介绍
背压是在 Observable
处理管道中,一些异步阶段无法足够快地处理这些值,并且需要一种方法来告诉上游生产者减速。
需要背压的经典案例是当生产者是热源时:
PublishSubject<Integer> source = PublishSubject.create();
source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Thread.sleep(10_000);
在此示例中,主线程将向最终消费者生成 100 万个项目,该消费者正在后台线程上处理它。compute(int)
方法可能需要一些时间,但 Observable
操作链的开销也可能增加处理项目所需的时间。然而,带有 for 循环的生成线程无法知道这一点并保持了时间。
在内部,异步操作符具有缓冲区来保存这些元素,直到它们可以被处理。在经典的 Rx.NET 和早期的 RxJava 中,这些缓冲区是无界的,这意味着它们可能会占用示例中几乎所有 100 万个元素。例如,当一个程序中有 10 亿个元素或相同的 100 万个序列出现 1000 次时,问题就会开始,导致 OutOfMemoryError
并且由于过多的 GC 开销而导致通常速度减慢。
类似于错误处理成为一等公民并接受运算符处理它(通过 onErrorXXX
运算符),背压是数据流的另一个属性,程序员必须考虑和处理(通过 onBackpressureXXX
运算符)。
除了 PublishSubject
之外,还有其他运算符不支持背压,主要是由于功能原因。例如,运算符 interval
周期性地发出值,反压它会导致相对于挂钟的周期变化。
在现代的 RxJava 中,大多数异步操作符现在都有一个有界的内部缓冲区,如上面的 observeOn
,任何溢出此缓冲区的尝试都将使用 MissingBackpressureException
终止整个序列。每个运算符的文档都有关于其背压行为的描述。
然而,背压在常规冷序列中更微妙地存在(其不会也不应该产生 MissingBackpressureException
)。如果第一个示例被重写:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
没有错误,一切都运行顺畅,内存使用量很少。这样做的原因是许多源操作符可以按需生成值,因此运算符 observeOn
可以告诉 range
最多生成如此多的值,observeOn
缓冲区可以立即保持而不会溢出。
这个谈判是基于计算机科学的共同惯例概念(我叫你,你叫我)。运算符 range
通过调用其(Subscriber
的)setProducer
,以 Producer
接口的实现形式向 observeOn
发送回调。作为回报,observeOn
用一个值来调用 Producer.request(n)
来告诉 range
它允许产生许多其他元素 (即,onNext
)。然后,observeOn
有责任在正确的时间内调用 request
方法并使用正确的值来保持数据流动但不会溢出。
在最终消费者中表达背压很少是必要的(因为它们的直接上游和背压是同步的,自然是由于调用堆栈阻塞),但是可能更容易理解它的工作原理:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
public void onNext(Integer v) {
compute(v);
request(1);
}
@Override
public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
});
这里的 onStart
实现表明 range
产生它的第一个值,然后在 onNext
中接收。一旦 compute(int)
完成,然后从 range
请求另一个值。在 range
的一个天真的实现中,这样的调用将递归地调用 onNext
,导致 StackOverflowError
,这当然是不可取的。
为了防止这种情况,运算符使用所谓的蹦床逻辑来阻止这种可重入的呼叫。在 range
的术语中,它会记住有一个 request(1)
调用,而它叫 onNext()
,一旦 onNext()
返回,它将进行另一轮并用下一个整数值调用 onNext()
。因此,如果交换两个,示例仍然是相同的:
@Override
public void onNext(Integer v) {
request(1);
compute(v);
}
然而,对于 onStart
来说并非如此。尽管 Observable
基础设施保证每个 Subscriber
最多只调用一次,但是对 request(1)
的调用可能会立即触发元素的发射。如果在 onNext
调用 request(1)
之后有一个初始化逻辑,你可能会遇到异常:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
String name;
@Override
public void onStart() {
request(1);
name = "RangeExample";
}
@Override
public void onNext(Integer v) {
compute(name.length + v);
request(1);
}
// ... rest is the same
});
在这种同步情况下,NullPointerException
将在执行 onStart
时立即抛出。如果对 request(1)
的调用在某个其他线程上触发对 onNext
的异步调用,并在 onNext
的匹配中读取 name
,则会发生更微妙的错误。
因此,应该在 onStart
中或甚至在此之前进行所有字段初始化,并最后调用 request()
。运算符中 request()
的实现确保在必要时发生在之前的关系(或其他术语,内存释放或完全围栏)。