主体及其内部状态

在 Rx 中,受试者具有可以控制其行为的内部状态。

一个常见的用例形式主题是订阅多个 Observable。以下示例创建两个不同的 Observable,并为两者订阅 Subject。然后它尝试打印经过的所有值:

let subject = new Subject();
subject.subscribe(val => console.log(val));

Observable.range(1, 5).subscribe(subject);
Observable.from(['a', 'b', 'c']).subscribe(subject);

查看现场演示: https//jsbin.com/pesumup/2/edit?js,console

这个例子只打印数字 1 - 5 并没有打印任何字符 abc

1
2
3
4
5

问题是发生了什么?这里的问题是收到 complete 通知时 Subject 实例的内部状态。当受试者收到 errorcomplete 通知时,它会将自己标记为停止并且永远不会发出任何其他信号

它需要是这样的,因为 Subjects 基本上是 Observables,Observables 只能在流的末尾发出一个 completeerror 通知,但从不两者。

上面示例的问题在于,第一个 Observable Observable.range() 也发出了 complete 通知,然后由 Subject 接收,因此在订阅第二个 Observable 不会重新发送任何值

通过设置完整的回调,我们可以看到主题确实收到了 complete 通知。

subject.subscribe(val => console.log(val), null, () => console.log('complete'));

最后的输出是相同的,它也打印 complete

1
2
3
4
5
complete

因此,如果我们不希望主题收到 complete 通知,我们可以手动发送 next 信号。这意味着我们不会直接订阅主题,而是订阅一个在主题上调用 next() 方法的回调:

Observable.range(1, 5).subscribe(val => subject.next(val));
Observable.from(['a', 'b', 'c']).subscribe(val => subject.next(val));

查看现场演示: https//jsbin.com/funeka/1/edit?js,console

1
2
3
4
5
a
b
c

请注意,这个完全相同的原则适用于我们使用 Subjects 的任何地方。

例如,publish()share() 等操作符以及在引擎盖下使用相同 Subject 主题的所有变体都受此影响。