主體及其內部狀態

在 Rx 中,受試者具有可以控制其行為的內部狀態。

一個常見的用例形式主題是訂閱多個 Observable。以下示例建立兩個不同的 Observable,併為兩者訂閱 Subject。然後它嘗試列印經過的所有值:

let subject = new Subject();
subject.subscribe(val => console.log(val));

Observable.range(1, 5).subscribe(subject);
Observable.from(['a', 'b', 'c']).subscribe(subject);

檢視現場演示: https//jsbin.com/pesumup/2/edit?js,console

這個例子只列印數字 1 - 5 並沒有列印任何字元 abc

1
2
3
4
5

問題是發生了什麼?這裡的問題是收到 complete 通知時 Subject 例項的內部狀態。當受試者收到 errorcomplete 通知時,它會將自己標記為停止並且永遠不會發出任何其他訊號

它需要是這樣的,因為 Subjects 基本上是 Observables,Observables 只能在流的末尾發出一個 completeerror 通知,但從不兩者。

上面示例的問題在於,第一個 Observable Observable.range() 也發出了 complete 通知,然後由 Subject 接收,因此在訂閱第二個 Observable 不會重新傳送任何值

通過設定完整的回撥,我們可以看到主題確實收到了 complete 通知。

subject.subscribe(val => console.log(val), null, () => console.log('complete'));

最後的輸出是相同的,它也列印 complete

1
2
3
4
5
complete

因此,如果我們不希望主題收到 complete 通知,我們可以手動傳送 next 訊號。這意味著我們不會直接訂閱主題,而是訂閱一個在主題上呼叫 next() 方法的回撥:

Observable.range(1, 5).subscribe(val => subject.next(val));
Observable.from(['a', 'b', 'c']).subscribe(val => subject.next(val));

檢視現場演示: https//jsbin.com/funeka/1/edit?js,console

1
2
3
4
5
a
b
c

請注意,這個完全相同的原則適用於我們使用 Subjects 的任何地方。

例如,publish()share() 等操作符以及在引擎蓋下使用相同 Subject 主題的所有變體都受此影響。