PublishSubject

PublishSubject 仅在订阅时间之后向源 Observable 发出的那些项目发送到 Observer

一个简单的 PublishSubject 示例:

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

输出:

sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

在上面的示例中,PublishSubject 订阅了 Observable,其作用类似于时钟,并且每 500 毫秒发出一个项目(Long)。如输出中所示,PublishSubject 传递从源(clock)到其订户(sub1sub2)的值。

PublishSubject 可以在创建物品后立即开始发射物品,而无需任何观察者,这会冒一个或多个物品丢失的风险,直到观察者能够进行消防。

createClock(); // 3 lines moved for brevity. same as above example

Thread.sleep(5000); // introduces a delay before first subscribe

sub1andsub2(); // 6 lines moved for brevity. same as above example

输出:

sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

请注意,sub110 开始发出值。引入的 5 秒延迟导致物品丢失。这些不能再现。这基本上使得 PublishSubject 成为了一个节目。

另外,请注意,如果观察者在发出 n 个项目之后订阅了 PublishSubject,则无法为该观察者再现这些 n 个项目。 **

下面是 PublishSubject 的大理石图

http://i.stack.imgur.com/UKFxw.jpg

在调用源 ObservableonCompleted 之前的任何时间点,PublishSubject 向所有已订阅的项目发送项目。

如果源 Observable 以错误终止,则 PublishSubject 将不会向后续观察者发出任何项目,但只会传递来自源 Observable 的错误通知。

http://i.stack.imgur.com/BlLyD.jpg

用例
假设你要创建一个应用程序,该应用程序将监视某个公司的股票价格并将其转发给所有请求它的客户。

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

在上面的示例用例中,PublishSubject 充当桥接器,将值从服务器传递到订阅 watcher 的所有客户端。

进一步阅读: