傳送多個並行 HTTP 請求
Web 應用程式中一個非常常見的用例是執行多個非同步(例如 HTTP)請求,並在它們到達時收集它們的結果或者同時收集它們的所有結果(例如,在 Angular2 中使用 HTTP 服務 )。
1.當他們到達時逐個收集非同步響應
這通常使用 mergeMap()
運算子來完成,該運算子采用必須返回 Observable 的投影函式。即使先前的 Observable 尚未完成,運算子 mergeMap()
也會立即在內部訂閱每個 Observable。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url))
.subscribe(val => console.log(val));
由於隨機延遲,這會以不同的順序列印對控制檯的響應:
Response from url-3
Response from url-4
Response from url-2
Response from url-1
檢視現場演示: https : //jsbin.com/xaqudan/2/edit?js,console
每個響應(通過 next
呼叫發出的專案)立即由 mergeMap()
重新傳送。
為了我們傳送多個 HTTP 請求的目的,有必要提一下 mergeMap()
總共可以帶三個引數:
- 需要返回 Observable 的投影函式。
- 結果選擇器功能允許我們在進一步發出結果之前修改結果。
- 併發訂閱的 Observable 的數量。
控制並行請求的數量
使用第三個引數,我們可以控制我們將處理多少個並行請求(假設執行 HTTP 請求的每個 Observable 都是冷)。
在以下示例中,我們將同時僅執行 2 個請求。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(1000);
}
let urls = ['url-1', 'url-2', 'url-3', 'url-4'];
let start = (new Date()).getTime();
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url), undefined, 2)
.timestamp()
.map(stamp => [stamp.timestamp - start, stamp.value])
.subscribe(val => console.log(val));
觀看現場演示: https : //jsbin.com/sojejal/4/edit?js,console
請注意,前兩個請求在 1 秒後完成,而其他兩個請求在 2 秒後完成。
[1004, "Response from url-1"]
[1010, "Response from url-2"]
[2007, "Response from url-3"]
[2012, "Response from url-4"]
處理錯誤
如果任何源 Observable 失敗(傳送 error
通知),mergeMap()
會再次傳送錯誤作為 error
。如果我們希望每個 Observable 優雅地失敗,我們需要使用例如 catch()
操作符。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000)
.map(value => {
if (url === 'url-3') {
throw new Error(`Error response from ${url}`)
}
return value;
});
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url).catch(() => Observable.empty()))
.subscribe(val => console.log(val));
url-3
的響應會丟擲一個錯誤,該錯誤將作為 error
通知傳送。這是後來被 catch()
運算子捕獲並替換為 Observable.empty()
,這只是一個 complete
通知。因此,忽略此響應。
此示例的輸出如下:
Response from url-4
Response from url-1
Response from url-2
觀看現場演示: https : //jsbin.com/kuqumud/4/edit?js,console
2.立即收集所有非同步響應
按照前面的例子,我們可以用 toArray()
運算子收集所有響應。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url))
.toArray()
.subscribe(val => console.log(val));
但是,使用 toArray()
運算元會產生重要影響。訂閱者是否收到結果不僅通過完成所有 HTTP 請求來控制,還通過完成源 Observable(在我們的例子中為 Observable.from
)來控制。這意味著我們不能使用永遠不會完成的源 Observable(例如,Observable.fromEvent
)。
另一種實現相同結果的方法是使用 Observable.forkJoin()
,它將我們想要訂閱的 Observable 陣列作為引數,並等待所有它們發出至少一個值並完成。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
var observables = urls.map(url => mockHTTPRequest(url));
Observable.forkJoin(observables)
.subscribe(val => console.log(val));
這會將所有響應列印為單個陣列:
["Response from url-1", "Response from url-2", "Response from url-3", "Response from url-4"]
觀看現場演示: https : //jsbin.com/fomoye/2/edit?js,console
Observable.forkJoin()
還將一個結果選擇器函式作為可選引數,它允許我們在進一步發出之前修改最終結果:
Observable.forkJoin(observables, (...results) => {
return results.length;
})
.subscribe(val => console.log(val));
這列印到控制檯:
4
檢視現場演示: https : //jsbin.com/muwiqic/1/edit?js,console
請注意,結果選擇器函式的引數已解壓縮。
處理錯誤
對於錯誤處理,我們可以使用與前面示例中相同的方法與 catch()
運算子。
但是,有一件重要的事情需要注意。forkJoin()
要求每個源 Observable 至少發出一個值。如果我們使用 catch(() => Observable.empty())
就像我們之前所做的那樣,forkJoin()
永遠不會發出任何東西,因為 Observable.empty()
只是一個通知。
這就是為什麼我們需要使用例如 Observable.of(null)
,這是 null
值,然後是 complete
通知。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000)
.map(value => {
if (url === 'url-3') {
throw new Error(`Error response from ${url}`)
}
return value;
});
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
var observables = urls.map(url => mockHTTPRequest(url).catch(() => Observable.of(null)));
Observable.forkJoin(observables)
.subscribe(val => console.log(val));
觀看現場演示: https : //jsbin.com/yidiked/2/edit?js,console
這列印到控制檯:
["Response from url-1", "Response from url-2", null, "Response from url-4"]
請注意,錯誤由 null
替換。如果我們只使用 Observable.empty()
,則 forkJoin()
將永遠不會發出任何東西。