PublishReplay 如何工作
它在内部创建了一个 ReplaySubject
并使其与 multicast
兼容。ReplaySubject
的最小重放值是 1 发射。这导致以下结果:
- 首次订阅将触发
publishReplay(1)
内部订阅源流并通过ReplaySubject
管理所有排放,有效缓存最后 n (= 1)个排放 - 如果在源仍处于活动状态时启动第二个订阅,则
multicast()
将我们连接到相同的replaySubject
,我们将接收所有下一个排放,直到源流完成。 - 如果在源已经完成之后启动订阅,则 replaySubject 已缓存最后 n 个排放,并且它将仅在完成之前接收那些。
const source = Rx.Observable.from([1,2])
.mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
.do(null,null,() => console.log('source stream completed'))
.publishReplay(1)
.refCount();
// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));
// new subscription after the stream has completed already
setTimeout(() => {
source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>