PublishSubject
PublishSubject
仅在订阅时间之后向源 Observable
发出的那些项目发送到 Observer
。
一个简单的 PublishSubject
示例:
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
输出:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
在上面的示例中,PublishSubject
订阅了 Observable
,其作用类似于时钟,并且每 500 毫秒发出一个项目(Long)。如输出中所示,PublishSubject
传递从源(clock
)到其订户(sub1
和 sub2
)的值。
PublishSubject
可以在创建物品后立即开始发射物品,而无需任何观察者,这会冒一个或多个物品丢失的风险,直到观察者能够进行消防。
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
输出:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
请注意,sub1
从 10
开始发出值。引入的 5 秒延迟导致物品丢失。这些不能再现。这基本上使得 PublishSubject
成为了一个节目。
另外,请注意,如果观察者在发出 n 个项目之后订阅了 PublishSubject
,则无法为该观察者再现这些 n 个项目。 **
下面是 PublishSubject
的大理石图
http://i.stack.imgur.com/UKFxw.jpg
在调用源 Observable
的 onCompleted
之前的任何时间点,PublishSubject
向所有已订阅的项目发送项目。
如果源 Observable
以错误终止,则 PublishSubject
将不会向后续观察者发出任何项目,但只会传递来自源 Observable 的错误通知。
http://i.stack.imgur.com/BlLyD.jpg
用例
假设你要创建一个应用程序,该应用程序将监视某个公司的股票价格并将其转发给所有请求它的客户。
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
在上面的示例用例中,PublishSubject
充当桥接器,将值从服务器传递到订阅 watcher
的所有客户端。
进一步阅读: