介紹
背壓是在 Observable
處理管道中,一些非同步階段無法足夠快地處理這些值,並且需要一種方法來告訴上游生產者減速。
需要背壓的經典案例是當生產者是熱源時:
PublishSubject<Integer> source = PublishSubject.create();
source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Thread.sleep(10_000);
在此示例中,主執行緒將向最終消費者生成 100 萬個專案,該消費者正在後臺執行緒上處理它。compute(int)
方法可能需要一些時間,但 Observable
操作鏈的開銷也可能增加處理專案所需的時間。然而,帶有 for 迴圈的生成執行緒無法知道這一點並保持了時間。
在內部,非同步操作符具有緩衝區來儲存這些元素,直到它們可以被處理。在經典的 Rx.NET 和早期的 RxJava 中,這些緩衝區是無界的,這意味著它們可能會佔用示例中幾乎所有 100 萬個元素。例如,當一個程式中有 10 億個元素或相同的 100 萬個序列出現 1000 次時,問題就會開始,導致 OutOfMemoryError
並且由於過多的 GC 開銷而導致通常速度減慢。
類似於錯誤處理成為一等公民並接受運算子處理它(通過 onErrorXXX
運算子),背壓是資料流的另一個屬性,程式設計師必須考慮和處理(通過 onBackpressureXXX
運算子)。
除了 PublishSubject
之外,還有其他運算子不支援背壓,主要是由於功能原因。例如,運算子 interval
週期性地發出值,反壓它會導致相對於掛鐘的週期變化。
在現代的 RxJava 中,大多數非同步操作符現在都有一個有界的內部緩衝區,如上面的 observeOn
,任何溢位此緩衝區的嘗試都將使用 MissingBackpressureException
終止整個序列。每個運算子的文件都有關於其背壓行為的描述。
然而,背壓在常規冷序列中更微妙地存在(其不會也不應該產生 MissingBackpressureException
)。如果第一個示例被重寫:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
沒有錯誤,一切都執行順暢,記憶體使用量很少。這樣做的原因是許多源操作符可以按需生成值,因此運算子 observeOn
可以告訴 range
最多生成如此多的值,observeOn
緩衝區可以立即保持而不會溢位。
這個談判是基於電腦科學的共同慣例概念(我叫你,你叫我)。運算子 range
通過呼叫其(Subscriber
的)setProducer
,以 Producer
介面的實現形式向 observeOn
傳送回撥。作為回報,observeOn
用一個值來呼叫 Producer.request(n)
來告訴 range
它允許產生許多其他元素 (即,onNext
)。然後,observeOn
有責任在正確的時間內呼叫 request
方法並使用正確的值來保持資料流動但不會溢位。
在最終消費者中表達背壓很少是必要的(因為它們的直接上游和背壓是同步的,自然是由於呼叫堆疊阻塞),但是可能更容易理解它的工作原理:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
public void onNext(Integer v) {
compute(v);
request(1);
}
@Override
public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
});
這裡的 onStart
實現表明 range
產生它的第一個值,然後在 onNext
中接收。一旦 compute(int)
完成,然後從 range
請求另一個值。在 range
的一個天真的實現中,這樣的呼叫將遞迴地呼叫 onNext
,導致 StackOverflowError
,這當然是不可取的。
為了防止這種情況,運算子使用所謂的蹦床邏輯來阻止這種可重入的呼叫。在 range
的術語中,它會記住有一個 request(1)
呼叫,而它叫 onNext()
,一旦 onNext()
返回,它將進行另一輪並用下一個整數值呼叫 onNext()
。因此,如果交換兩個,示例仍然是相同的:
@Override
public void onNext(Integer v) {
request(1);
compute(v);
}
然而,對於 onStart
來說並非如此。儘管 Observable
基礎設施保證每個 Subscriber
最多隻呼叫一次,但是對 request(1)
的呼叫可能會立即觸發元素的發射。如果在 onNext
呼叫 request(1)
之後有一個初始化邏輯,你可能會遇到異常:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
String name;
@Override
public void onStart() {
request(1);
name = "RangeExample";
}
@Override
public void onNext(Integer v) {
compute(name.length + v);
request(1);
}
// ... rest is the same
});
在這種同步情況下,NullPointerException
將在執行 onStart
時立即丟擲。如果對 request(1)
的呼叫在某個其他執行緒上觸發對 onNext
的非同步呼叫,並在 onNext
的匹配中讀取 name
,則會發生更微妙的錯誤。
因此,應該在 onStart
中或甚至在此之前進行所有欄位初始化,並最後呼叫 request()
。運算子中 request()
的實現確保在必要時發生在之前的關係(或其他術語,記憶體釋放或完全圍欄)。