快取 HTTP 響應
RxJS 的典型用例是建立 HTTP 請求並在一段時間內快取其結果。此外,我們總是希望一次只執行一個請求並共享其響應。
例如,以下程式碼快取 1 項最大值。 1000ms :
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1);
var counter = 1;
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100);
}
function requestCachedHttpResult() {
return updateRequest;
}
函式 makeMockHttpRequest() 模擬以 100ms 延遲到達的 HTTP 請求。
函式 requestCachedHttpResult() 是我們訂閱獲取實際或快取響應的地方。
使用 .publishReplay(1, 1000),我們使用 RxJS 組播在內部使用 ReplaySubject 並保留 1 專案以獲得最大的 1000ms。然後 refCount() 用於始終只保留 source 的一個訂閱,即 Observable.defer()。此 Observable 用於建立新請求並遞增 counter 以證明快取的值和新訂閱共享相同的資料。
當我們想要獲取當前資料時,我們稱之為 requestCachedHttpResult()。為了確保在釋出資料後 Observer 將正確完成,我們使用了 take(1) 運算子。
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
這會使用 mockDataFetch() 建立一個請求並列印到控制檯:
1
一個更復雜的示例將在我們想要測試模擬的 HTTP 連線和響應被共享的不同時間呼叫多個請求。
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 50:", val))
, 50);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 200:", val))
, 200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1200:", val))
, 1200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1500:", val))
, 1500);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 3500:", val))
, 3500);
觀看現場演示: https : //jsbin.com/todude/5/edit?js,console
每個請求都會延遲傳送,並按以下順序傳送:
0 - 首次請求使 refCount() 訂閱其 source,這使得 mockDataFetch() 呼叫。它的迴應將被推遲 22。此時 publishReplay() 內的 publishReplay() 運算子有一個 Observer。
50 - 第二個請求也訂閱了 ConnectableObservable。此時 ConnectableObservable 裡面的 publishReplay() 運算子有兩個 Observer。它不會使用 makeMockHttpRequest() 建立另一個請求,因為 refCount() 已經訂閱了。
100 - 第一個響應準備就緒。它首先由 ReplaySubject 快取,然後重新傳送給訂閱 ConnectableObservable 的兩位觀察者。由於 take(1) 和取消訂閱,兩位觀察員都完成了。
200 - 訂閱 ReplaySubject,它立即發出快取值,使 take(1) 完成 Observer 並立即取消訂閱。沒有發出 HTTP 請求,也沒有訂閱。
1200 - 與 0 的第一個匹配相同。此時快取的值已被丟棄,因為它比 1000ms 舊。
1500 - 與 200 的第四次活動相同。
3500 - 與 1200 的第一個匹配相同。
控制檯中的輸出如下:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3
在 RxJS 5 中,cache() 運算子涵蓋了類似的功能。然而,由於其功能有限,它在 5.0.0-rc.1 中被刪除。
處理錯誤
如果我們想要處理遠端服務(makeMockHttpRequest 函式)產生的錯誤,我們需要在它們合併到主 Observable 鏈之前捕獲它們,因為 publishReplay() 內部的 ReplaySubject 收到的任何錯誤都會將其內部狀態標記為 stopped(閱讀更多這裡主題和它的內部狀態 )這絕對不是我們想要的。
在下面的例子中,我們在 counter === 2 模擬錯誤並用 catch() 操作符捕獲它。我們使用 catch() 只將 error 通知轉換為常規 next,以便我們可以在觀察者中處理錯誤:
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100)
.map(i => {
if (i === 2) {
throw new Error('Invalid URL');
}
return i;
})
.catch(err => Observable.of(err));
}
觀看現場演示: https : //jsbin.com/kavihu/10/edit?js,console
這將列印以控制以下輸出。請注意 next 處理程式中收到的錯誤:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: [object Error] { ... }
Response 1500: [object Error] { ... }
Response 3500: 3
如果我們想要在每個觀察者中處理常規 error 通知中的錯誤,我們必須在 publishReplay() 運算子之後重新丟擲它們,原因如上所述。
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1)
.map(val => {
if (val instanceof Error) {
throw val;
}
return val;
});
請參閱現場演示: https : //jsbin.com/fabosam/5/edit? js, console (請注意,我們必須為每個觀察者新增錯誤回撥)。
Response 0: 1
Response 50: 1
Response 200: 1
error callback: Error: Invalid URL
error callback: Error: Invalid URL
Response 3500: 3