主体及其内部状态
在 Rx 中,受试者具有可以控制其行为的内部状态。
一个常见的用例形式主题是订阅多个 Observable。以下示例创建两个不同的 Observable,并为两者订阅 Subject。然后它尝试打印经过的所有值:
let subject = new Subject();
subject.subscribe(val => console.log(val));
Observable.range(1, 5).subscribe(subject);
Observable.from(['a', 'b', 'c']).subscribe(subject);
查看现场演示: https : //jsbin.com/pesumup/2/edit?js,console
这个例子只打印数字 1 - 5
并没有打印任何字符 a
,b
,c
。
1
2
3
4
5
问题是发生了什么?这里的问题是收到 complete
通知时 Subject 实例的内部状态。当受试者收到 error
或 complete
通知时,它会将自己标记为停止并且永远不会发出任何其他信号 。
它需要是这样的,因为 Subjects 基本上是 Observables,Observables 只能在流的末尾发出一个 complete
或 error
通知,但从不两者。
上面示例的问题在于,第一个 Observable Observable.range()
也发出了 complete
通知,然后由 Subject 接收,因此在订阅第二个 Observable 时它不会重新发送任何值 。
通过设置完整的回调,我们可以看到主题确实收到了 complete
通知。
subject.subscribe(val => console.log(val), null, () => console.log('complete'));
最后的输出是相同的,它也打印 complete
。
1
2
3
4
5
complete
因此,如果我们不希望主题收到 complete
通知,我们可以手动发送 next
信号。这意味着我们不会直接订阅主题,而是订阅一个在主题上调用 next()
方法的回调:
Observable.range(1, 5).subscribe(val => subject.next(val));
Observable.from(['a', 'b', 'c']).subscribe(val => subject.next(val));
查看现场演示: https : //jsbin.com/funeka/1/edit?js,console
1
2
3
4
5
a
b
c
请注意,这个完全相同的原则适用于我们使用 Subjects 的任何地方。
例如,publish()
,share()
等操作符以及在引擎盖下使用相同 Subject 主题的所有变体都受此影响。