PublishSubject
PublishSubject
僅在訂閱時間之後向源 Observable
發出的那些專案傳送到 Observer
。
一個簡單的 PublishSubject
示例:
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
輸出:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
在上面的示例中,PublishSubject
訂閱了 Observable
,其作用類似於時鐘,並且每 500 毫秒發出一個專案(Long)。如輸出中所示,PublishSubject
傳遞從源(clock
)到其訂戶(sub1
和 sub2
)的值。
PublishSubject
可以在建立物品後立即開始發射物品,而無需任何觀察者,這會冒一個或多個物品丟失的風險,直到觀察者能夠進行消防。
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
輸出:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
請注意,sub1
從 10
開始發出值。引入的 5 秒延遲導致物品丟失。這些不能再現。這基本上使得 PublishSubject
成為了一個節目。
另外,請注意,如果觀察者在發出 n 個專案之後訂閱了 PublishSubject
,則無法為該觀察者再現這些 n 個專案。 **
下面是 PublishSubject
的大理石圖
http://i.stack.imgur.com/UKFxw.jpg
在呼叫源 Observable
的 onCompleted
之前的任何時間點,PublishSubject
向所有已訂閱的專案傳送專案。
如果源 Observable
以錯誤終止,則 PublishSubject
將不會向後續觀察者發出任何專案,但只會傳遞來自源 Observable 的錯誤通知。
http://i.stack.imgur.com/BlLyD.jpg
用例
假設你要建立一個應用程式,該應用程式將監視某個公司的股票價格並將其轉發給所有請求它的客戶。
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
在上面的示例用例中,PublishSubject
充當橋接器,將值從伺服器傳遞到訂閱 watcher
的所有客戶端。
進一步閱讀: