傳送多個並行 HTTP 請求

Web 應用程式中一個非常常見的用例是執行多個非同步(例如 HTTP)請求,並在它們到達時收集它們的結果或者同時收集它們的所有結果(例如,在 Angular2 中使用 HTTP 服務 )。

1.當他們到達時逐個收集非同步響應

這通常使用 mergeMap() 運算子來完成,運算子采用必須返回 Observable 的投影函式。即使先前的 Observable 尚未完成,運算子 mergeMap() 也會立即在內部訂閱每個 Observable。

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(Math.random() * 1000);
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url))
  .subscribe(val => console.log(val));

由於隨機延遲,這會以不同的順序列印對控制檯的響應:

Response from url-3
Response from url-4
Response from url-2
Response from url-1

檢視現場演示: https//jsbin.com/xaqudan/2/edit?js,console

每個響應(通過 next 呼叫發出的專案)立即由 mergeMap() 重新傳送。

為了我們傳送多個 HTTP 請求的目的,有必要提一下 mergeMap() 總共可以帶三個引數:

  1. 需要返回 Observable 的投影函式。
  2. 結果選擇器功能允許我們在進一步發出結果之前修改結果。
  3. 併發訂閱的 Observable 的數量。

控制並行請求的數量

使用第三個引數,我們可以控制我們將處理多少個並行請求(假設執行 HTTP 請求的每個 Observable 都是)。

在以下示例中,我們將同時僅執行 2 個請求。

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(1000);
}

let urls = ['url-1', 'url-2', 'url-3', 'url-4'];
let start = (new Date()).getTime();

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url), undefined, 2)
  .timestamp()
  .map(stamp => [stamp.timestamp - start, stamp.value])
  .subscribe(val => console.log(val));

觀看現場演示: https//jsbin.com/sojejal/4/edit?js,console

請注意,前兩個請求在 1 秒後完成,而其他兩個請求在 2 秒後完成。

[1004, "Response from url-1"]
[1010, "Response from url-2"]
[2007, "Response from url-3"]
[2012, "Response from url-4"]

處理錯誤

如果任何源 Observable 失敗(傳送 error 通知),mergeMap() 會再次傳送錯誤作為 error。如果我們希望每個 Observable 優雅地失敗,我們需要使用例如 catch() 操作符。

function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000)
    .map(value => {
      if (url === 'url-3') {
        throw new Error(`Error response from ${url}`)
      }
      return value;
    });
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url).catch(() => Observable.empty()))
  .subscribe(val => console.log(val));

url-3 的響應會丟擲一個錯誤,該錯誤將作為 error 通知傳送。這是後來被 catch() 運算子捕獲並替換為 Observable.empty(),這只是一個 complete 通知。因此,忽略此響應。

此示例的輸出如下:

Response from url-4
Response from url-1
Response from url-2

觀看現場演示: https//jsbin.com/kuqumud/4/edit?js,console

2.立即收集所有非同步響應

按照前面的例子,我們可以用 toArray() 運算子收集所有響應。

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(Math.random() * 1000);
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url))
  .toArray()
  .subscribe(val => console.log(val));

但是,使用 toArray() 運算元會產生重要影響。訂閱者是否收到結果不僅通過完成所有 HTTP 請求來控制,還通過完成源 Observable(在我們的例子中為 Observable.from)來控制。這意味著我們不能使用永遠不會完成的源 Observable(例如,Observable.fromEvent)。

另一種實現相同結果的方法是使用 Observable.forkJoin() ,它將我們想要訂閱的 Observable 陣列作為引數,並等待所有它們發出至少一個值並完成

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(Math.random() * 1000);
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
var observables = urls.map(url => mockHTTPRequest(url));

Observable.forkJoin(observables)
  .subscribe(val => console.log(val));

這會將所有響應列印為單個陣列:

["Response from url-1", "Response from url-2", "Response from url-3", "Response from url-4"]

觀看現場演示: https//jsbin.com/fomoye/2/edit?js,console

Observable.forkJoin() 還將一個結果選擇器函式作為可選引數,它允許我們在進一步發出之前修改最終結果:

Observable.forkJoin(observables, (...results) => {
    return results.length;
  })
  .subscribe(val => console.log(val));

這列印到控制檯:

4

檢視現場演示: https//jsbin.com/muwiqic/1/edit?js,console

請注意,結果選擇器函式的引數已解壓縮。

處理錯誤

對於錯誤處理,我們可以使用與前面示例中相同的方法與 catch() 運算子。

但是,有一件重要的事情需要注意。forkJoin() 要求每個源 Observable 至少發出一個值。如果我們使用 catch(() => Observable.empty()) 就像我們之前所做的那樣,forkJoin() 永遠不會發出任何東西,因為 Observable.empty() 只是一個通知。
這就是為什麼我們需要使用例如 Observable.of(null),這是 null 值,然後是 complete 通知。

function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000)
    .map(value => {
      if (url === 'url-3') {
        throw new Error(`Error response from ${url}`)
      }
      return value;
    });
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

var observables = urls.map(url => mockHTTPRequest(url).catch(() => Observable.of(null)));

Observable.forkJoin(observables)
  .subscribe(val => console.log(val));

觀看現場演示: https//jsbin.com/yidiked/2/edit?js,console

這列印到控制檯:

["Response from url-1", "Response from url-2", null, "Response from url-4"]

請注意,錯誤由 null 替換。如果我們只使用 Observable.empty(),則 forkJoin() 將永遠不會發出任何東西。