基本的例子
调度程序是关于处理单元的 RxJava 抽象。调度程序可以由 Executor 服务支持,但你可以实现自己的调度程序实现。
一个 Scheduler
应该满足这个要求:
- 应按顺序处理未延迟的任务(FIFO 顺序)
- 任务可以延迟
Scheduler
可以在某些运算符中用作参数(例如:delay
),或者与 subscribeOn
/ observeOn
方法一起使用。
对于某些运算符,Scheduler
将用于处理特定运算符的任务。例如,delay
将安排将发出下一个值的延迟任务。这是一个将保留并稍后执行的 Scheduler
。
subscribeOn
每个 tihuan 可以使用一次 10。它将定义订阅的代码将在哪个 Scheduler
执行。
observeOn
可以每次使用多次 13。它将定义 Scheduler
将用于执行 observeOn
方法之后定义的所有任务。observeOn
将帮助你执行跳线。
subscribeOn 特定的调度程序
// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
用特定的 Scheduler 观察
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.io())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the computation scheduler
.observeOn(Schedulers.computation())
.map(str -> str + " -> " + Thread.currentThread().getName())
// next tasks will be executed in the io scheduler
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
使用运算符指定特定的调度程序
一些运算符可以将 Scheduler
作为参数。
Observable.just(1)
// the onNext method of the delay operator will be executed in a new thread
.delay(1, TimeUnit.SECONDS, Schedulers.newThread())
.subscribe(System.out::println);
发布到订阅者:
TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
线程池:
this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);
Web Socket Observable:
final Subscription subscribe = socket.webSocketObservable()
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<RxEvent>() {
@Override
public void call(RxEvent rxEvent) {
System.out.println("Event: " + rxEvent);
}
})
.subscribe();