冷热观测资料
可观察量大致分为 Hot
或 Cold
,具体取决于它们的排放行为。
Cold Observable
是根据请求(订阅)开始发出的,而 Hot Observable
是不管订阅而发出的 Hot Observable
。
冷可观察
/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2
上面代码的输出看起来像(可能会有所不同):
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0 -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2
请注意,即使 sub2
启动较晚,它也会从一开始就接收值。总而言之,Cold Observable
仅在需要时发出物品。多个请求启动多个管道。
热观察
注意:Hot observables 会发出独立于各个订阅的值。他们有自己的时间表,无论有人在听,都会发生事件。
Cold Observale
可以通过简单的 publish
转换为 Hot Observable
。
Observable.interval(500, TimeUnit.MILLISECONDS)
.publish(); // publish converts cold to hot
publish
返回一个 ConnectableObservable
,它增加了与 observable 连接和断开**连接的功能。
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
以上输出:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
请注意,即使 sub2
开始观察较晚,它也与 sub1
同步。
断开连接有点复杂! 断开发生在 Subscription
而不是 Observable
。
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription
System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();
以上产生:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting -> reconnect after unsubscribe
sub1, 0
...
断开连接后,Observable
基本上终止并在添加新订阅时重新启动。
Hot Observable
可用于创建 EventBus
。这种 EventBuses 通常很轻且超快。RxBus 的唯一缺点是必须手动实现所有事件并将其传递给总线。