介紹

背壓是在 Observable 處理管道中,一些非同步階段無法足夠快地處理這些值,並且需要一種方法來告訴上游生產者減速。

需要背壓的經典案例是當生產者是熱源時:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

在此示例中,主執行緒將向最終消費者生成 100 萬個專案,該消費者正在後臺執行緒上處理它。compute(int) 方法可能需要一些時間,但 Observable 操作鏈的開銷也可能增加處理專案所需的時間。然而,帶有 for 迴圈的生成執行緒無法知道這一點並保持了時間。

在內部,非同步操作符具有緩衝區來儲存這些元素,直到它們可以被處理。在經典的 Rx.NET 和早期的 RxJava 中,這些緩衝區是無界的,這意味著它們可能會佔用示例中幾乎所有 100 萬個元素。例如,當一個程式中有 10 億個元素或相同的 100 萬個序列出現 1000 次時,問題就會開始,導致 OutOfMemoryError 並且由於過多的 GC 開銷而導致通常速度減慢。

類似於錯誤處理成為一等公民並接受運算子處理它(通過 onErrorXXX 運算子),背壓是資料流的另一個屬性,程式設計師必須考慮和處理(通過 onBackpressureXXX 運算子)。

除了 PublishSubject 之外,還有其他運算子不支援背壓,主要是由於功能原因。例如,運算子 interval 週期性地發出值,反壓它會導致相對於掛鐘的週期變化。

在現代的 RxJava 中,大多數非同步操作符現在都有一個有界的內部緩衝區,如上面的 observeOn,任何溢位此緩衝區的嘗試都將使用 MissingBackpressureException 終止整個序列。每個運算子的文件都有關於其背壓行為的描述。

然而,背壓在常規冷序列中更微妙地存在(其不會也不應該產生 MissingBackpressureException)。如果第一個示例被重寫:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

沒有錯誤,一切都執行順暢,記憶體使用量很少。這樣做的原因是許多源操作符可以按需生成值,因此運算子 observeOn 可以告訴 range 最多生成如此多的值,observeOn 緩衝區可以立即保持而不會溢位。

這個談判是基於電腦科學的共同慣例概念(我叫你,你叫我)。運算子 range 通過呼叫其(Subscriber 的)setProducer,以 Producer 介面的實現形式向 observeOn 傳送回撥。作為回報,observeOn 用一個值來呼叫 Producer.request(n) 來告訴 range 它允許產生許多其他元素 (即,onNext)。然後,observeOn 有責任在正確的時間內呼叫 request 方法並使用正確的值來保持資料流動但不會溢位。

在最終消費者中表達背壓很少是必要的(因為它們的直接上游和背壓是同步的,自然是由於呼叫堆疊阻塞),但是可能更容易理解它的工作原理:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

這裡的 onStart 實現表明 range 產生它的第一個值,然後在 onNext 中接收。一旦 compute(int) 完成,然後從 range 請求另一個值。在 range 的一個天真的實現中,這樣的呼叫將遞迴地呼叫 onNext,導致 StackOverflowError,這當然是不可取的。

為了防止這種情況,運算子使用所謂的蹦床邏輯來阻止這種可重入的呼叫。在 range 的術語中,它會記住有一個 request(1) 呼叫,而它叫 onNext(),一旦 onNext() 返回,它將進行另一輪並用下一個整數值呼叫 onNext()。因此,如果交換兩個,示例仍然是相同的:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

然而,對於 onStart 來說並非如此。儘管 Observable 基礎設施保證每個 Subscriber 最多隻呼叫一次,但是對 request(1) 的呼叫可能會立即觸發元素的發射。如果在 onNext 呼叫 request(1) 之後有一個初始化邏輯,你可能會遇到異常:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

在這種同步情況下,NullPointerException 將在執行 onStart 時立即丟擲。如果對 request(1) 的呼叫在某個其他執行緒上觸發對 onNext 的非同步呼叫,並在 onNext 的匹配中讀取 name,則會發生更微妙的錯誤。

因此,應該在 onStart 中或甚至在此之前進行所有欄位初始化,並最後呼叫 request()。運算子中 request() 的實現確保在必要時發生在之前的關係(或其他術語,記憶體釋放或完全圍欄)。