冷熱觀測資料

可觀察量大致分為 HotCold,具體取決於它們的排放行為。
Cold Observable 是根據請求(訂閱)開始發出的,而 Hot Observable 是不管訂閱而發出的 Hot Observable


冷可觀察

/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2

上面程式碼的輸出看起來像(可能會有所不同):

sub1, 0    -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0    -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2

請注意,即使 sub2 啟動較晚,它也會從一開始就接收值。總而言之,Cold Observable 僅在需要時發出物品。多個請求啟動多個管道。


熱觀察

注意:Hot observables 會發出獨立於各個訂閱的值。他們有自己的時間表,無論有人在聽,都會發生事件。

Cold Observale 可以通過簡單的 publish 轉換為 Hot Observable

Observable.interval(500, TimeUnit.MILLISECONDS)
    .publish(); // publish converts cold to hot

publish 返回一個 ConnectableObservable,它增加了與 observable 連線斷開**連線的功能。

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));

以上輸出:

sub1, 0  -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2  -> subscriber2 starts
sub1, 3
sub2, 3

請注意,即使 sub2 開始觀察較晚,它也與 sub1 同步。
斷開連線有點複雜! 斷開發生在 Subscription 而不是 Observable

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription

System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();

以上產生:

sub1, 0   -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2   -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting  -> reconnect after unsubscribe
sub1, 0
...

斷開連線後,Observable 基本上終止並在新增新訂閱時重新啟動。

Hot Observable 可用於建立 EventBus。這種 EventBuses 通常很輕且超快。RxBus 的唯一缺點是必須手動實現所有事件並將其傳遞給匯流排。