介绍

背压是在 Observable 处理管道中,一些异步阶段无法足够快地处理这些值,并且需要一种方法来告诉上游生产者减速。

需要背压的经典案例是当生产者是热源时:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

在此示例中,主线程将向最终消费者生成 100 万个项目,该消费者正在后台线程上处理它。compute(int) 方法可能需要一些时间,但 Observable 操作链的开销也可能增加处理项目所需的时间。然而,带有 for 循环的生成线程无法知道这一点并保持了时间。

在内部,异步操作符具有缓冲区来保存这些元素,直到它们可以被处理。在经典的 Rx.NET 和早期的 RxJava 中,这些缓冲区是无界的,这意味着它们可能会占用示例中几乎所有 100 万个元素。例如,当一个程序中有 10 亿个元素或相同的 100 万个序列出现 1000 次时,问题就会开始,导致 OutOfMemoryError 并且由于过多的 GC 开销而导致通常速度减慢。

类似于错误处理成为一等公民并接受运算符处理它(通过 onErrorXXX 运算符),背压是数据流的另一个属性,程序员必须考虑和处理(通过 onBackpressureXXX 运算符)。

除了 PublishSubject 之外,还有其他运算符不支持背压,主要是由于功能原因。例如,运算符 interval 周期性地发出值,反压它会导致相对于挂钟的周期变化。

在现代的 RxJava 中,大多数异步操作符现在都有一个有界的内部缓冲区,如上面的 observeOn,任何溢出此缓冲区的尝试都将使用 MissingBackpressureException 终止整个序列。每个运算符的文档都有关于其背压行为的描述。

然而,背压在常规冷序列中更微妙地存在(其不会也不应该产生 MissingBackpressureException)。如果第一个示例被重写:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

没有错误,一切都运行顺畅,内存使用量很少。这样做的原因是许多源操作符可以按需生成值,因此运算符 observeOn 可以告诉 range 最多生成如此多的值,observeOn 缓冲区可以立即保持而不会溢出。

这个谈判是基于计算机科学的共同惯例概念(我叫你,你叫我)。运算符 range 通过调用其(Subscriber 的)setProducer,以 Producer 接口的实现形式向 observeOn 发送回调。作为回报,observeOn 用一个值来调用 Producer.request(n) 来告诉 range 它允许产生许多其他元素 (即,onNext)。然后,observeOn 有责任在正确的时间内调用 request 方法并使用正确的值来保持数据流动但不会溢出。

在最终消费者中表达背压很少是必要的(因为它们的直接上游和背压是同步的,自然是由于调用堆栈阻塞),但是可能更容易理解它的工作原理:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

这里的 onStart 实现表明 range 产生它的第一个值,然后在 onNext 中接收。一旦 compute(int) 完成,然后从 range 请求另一个值。在 range 的一个天真的实现中,这样的调用将递归地调用 onNext,导致 StackOverflowError,这当然是不可取的。

为了防止这种情况,运算符使用所谓的蹦床逻辑来阻止这种可重入的呼叫。在 range 的术语中,它会记住有一个 request(1) 调用,而它叫 onNext(),一旦 onNext() 返回,它将进行另一轮并用下一个整数值调用 onNext()。因此,如果交换两个,示例仍然是相同的:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

然而,对于 onStart 来说并非如此。尽管 Observable 基础设施保证每个 Subscriber 最多只调用一次,但是对 request(1) 的调用可能会立即触发元素的发射。如果在 onNext 调用 request(1) 之后有一个初始化逻辑,你可能会遇到异常:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

在这种同步情况下,NullPointerException 将在执行 onStart 时立即抛出。如果对 request(1) 的调用在某个其他线程上触发对 onNext 的异步调用,并在 onNext 的匹配中读取 name,则会发生更微妙的错误。

因此,应该在 onStart 中或甚至在此之前进行所有字段初始化,并最后调用 request()。运算符中 request() 的实现确保在必要时发生在之前的关系(或其他术语,内存释放或完全围栏)。