快取 HTTP 響應
RxJS 的典型用例是建立 HTTP 請求並在一段時間內快取其結果。此外,我們總是希望一次只執行一個請求並共享其響應。
例如,以下程式碼快取 1 項最大值。 1000ms :
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1);
var counter = 1;
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100);
}
function requestCachedHttpResult() {
return updateRequest;
}
函式 makeMockHttpRequest()
模擬以 100ms
延遲到達的 HTTP 請求。
函式 requestCachedHttpResult()
是我們訂閱獲取實際或快取響應的地方。
使用 .publishReplay(1, 1000)
,我們使用 RxJS 組播在內部使用 ReplaySubject
並保留 1
專案以獲得最大的 1000ms
。然後 refCount()
用於始終只保留 source
的一個訂閱,即 Observable.defer()
。此 Observable 用於建立新請求並遞增 counter
以證明快取的值和新訂閱共享相同的資料。
當我們想要獲取當前資料時,我們稱之為 requestCachedHttpResult()
。為了確保在釋出資料後 Observer 將正確完成,我們使用了 take(1)
運算子。
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
這會使用 mockDataFetch()
建立一個請求並列印到控制檯:
1
一個更復雜的示例將在我們想要測試模擬的 HTTP 連線和響應被共享的不同時間呼叫多個請求。
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 50:", val))
, 50);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 200:", val))
, 200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1200:", val))
, 1200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1500:", val))
, 1500);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 3500:", val))
, 3500);
觀看現場演示: https : //jsbin.com/todude/5/edit?js,console
每個請求都會延遲傳送,並按以下順序傳送:
0
- 首次請求使 refCount()
訂閱其 source
,這使得 mockDataFetch()
呼叫。它的迴應將被推遲 22。此時 publishReplay()
內的 publishReplay()
運算子有一個 Observer。
50
- 第二個請求也訂閱了 ConnectableObservable
。此時 ConnectableObservable
裡面的 publishReplay()
運算子有兩個 Observer。它不會使用 makeMockHttpRequest()
建立另一個請求,因為 refCount()
已經訂閱了。
100
- 第一個響應準備就緒。它首先由 ReplaySubject
快取,然後重新傳送給訂閱 ConnectableObservable
的兩位觀察者。由於 take(1)
和取消訂閱,兩位觀察員都完成了。
200
- 訂閱 ReplaySubject
,它立即發出快取值,使 take(1)
完成 Observer 並立即取消訂閱。沒有發出 HTTP 請求,也沒有訂閱。
1200
- 與 0
的第一個匹配相同。此時快取的值已被丟棄,因為它比 1000ms
舊。
1500
- 與 200
的第四次活動相同。
3500
- 與 1200
的第一個匹配相同。
控制檯中的輸出如下:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3
在 RxJS 5 中,cache()
運算子涵蓋了類似的功能。然而,由於其功能有限,它在 5.0.0-rc.1
中被刪除。
處理錯誤
如果我們想要處理遠端服務(makeMockHttpRequest
函式)產生的錯誤,我們需要在它們合併到主 Observable 鏈之前捕獲它們,因為 publishReplay()
內部的 ReplaySubject
收到的任何錯誤都會將其內部狀態標記為 stopped
(閱讀更多這裡主題和它的內部狀態 )這絕對不是我們想要的。
在下面的例子中,我們在 counter === 2
模擬錯誤並用 catch()
操作符捕獲它。我們使用 catch()
只將 error
通知轉換為常規 next
,以便我們可以在觀察者中處理錯誤:
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100)
.map(i => {
if (i === 2) {
throw new Error('Invalid URL');
}
return i;
})
.catch(err => Observable.of(err));
}
觀看現場演示: https : //jsbin.com/kavihu/10/edit?js,console
這將列印以控制以下輸出。請注意 next
處理程式中收到的錯誤:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: [object Error] { ... }
Response 1500: [object Error] { ... }
Response 3500: 3
如果我們想要在每個觀察者中處理常規 error
通知中的錯誤,我們必須在 publishReplay()
運算子之後重新丟擲它們,原因如上所述。
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1)
.map(val => {
if (val instanceof Error) {
throw val;
}
return val;
});
請參閱現場演示: https : //jsbin.com/fabosam/5/edit? js, console (請注意,我們必須為每個觀察者新增錯誤回撥)。
Response 0: 1
Response 50: 1
Response 200: 1
error callback: Error: Invalid URL
error callback: Error: Invalid URL
Response 3500: 3