发送多个并行 HTTP 请求
Web 应用程序中一个非常常见的用例是执行多个异步(例如 HTTP)请求,并在它们到达时收集它们的结果或者同时收集它们的所有结果(例如,在 Angular2 中使用 HTTP 服务 )。
1.当他们到达时逐个收集异步响应
这通常使用 mergeMap()
运算符来完成,该运算符采用必须返回 Observable 的投影函数。即使先前的 Observable 尚未完成,运算符 mergeMap()
也会立即在内部订阅每个 Observable。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url))
.subscribe(val => console.log(val));
由于随机延迟,这会以不同的顺序打印对控制台的响应:
Response from url-3
Response from url-4
Response from url-2
Response from url-1
查看现场演示: https : //jsbin.com/xaqudan/2/edit?js,console
每个响应(通过 next
调用发出的项目)立即由 mergeMap()
重新发送。
为了我们发送多个 HTTP 请求的目的,有必要提一下 mergeMap()
总共可以带三个参数:
- 需要返回 Observable 的投影函数。
- 结果选择器功能允许我们在进一步发出结果之前修改结果。
- 并发订阅的 Observable 的数量。
控制并行请求的数量
使用第三个参数,我们可以控制我们将处理多少个并行请求(假设执行 HTTP 请求的每个 Observable 都是冷)。
在以下示例中,我们将同时仅运行 2 个请求。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(1000);
}
let urls = ['url-1', 'url-2', 'url-3', 'url-4'];
let start = (new Date()).getTime();
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url), undefined, 2)
.timestamp()
.map(stamp => [stamp.timestamp - start, stamp.value])
.subscribe(val => console.log(val));
观看现场演示: https : //jsbin.com/sojejal/4/edit?js,console
请注意,前两个请求在 1 秒后完成,而其他两个请求在 2 秒后完成。
[1004, "Response from url-1"]
[1010, "Response from url-2"]
[2007, "Response from url-3"]
[2012, "Response from url-4"]
处理错误
如果任何源 Observable 失败(发送 error
通知),mergeMap()
会再次发送错误作为 error
。如果我们希望每个 Observable 优雅地失败,我们需要使用例如 catch()
操作符。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000)
.map(value => {
if (url === 'url-3') {
throw new Error(`Error response from ${url}`)
}
return value;
});
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url).catch(() => Observable.empty()))
.subscribe(val => console.log(val));
url-3
的响应会抛出一个错误,该错误将作为 error
通知发送。这是后来被 catch()
运算符捕获并替换为 Observable.empty()
,这只是一个 complete
通知。因此,忽略此响应。
此示例的输出如下:
Response from url-4
Response from url-1
Response from url-2
观看现场演示: https : //jsbin.com/kuqumud/4/edit?js,console
2.立即收集所有异步响应
按照前面的例子,我们可以用 toArray()
运算符收集所有响应。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
.mergeMap(url => mockHTTPRequest(url))
.toArray()
.subscribe(val => console.log(val));
但是,使用 toArray()
算子会产生重要影响。订阅者是否收到结果不仅通过完成所有 HTTP 请求来控制,还通过完成源 Observable(在我们的例子中为 Observable.from
)来控制。这意味着我们不能使用永远不会完成的源 Observable(例如,Observable.fromEvent
)。
另一种实现相同结果的方法是使用 Observable.forkJoin()
,它将我们想要订阅的 Observable 数组作为参数,并等待所有它们发出至少一个值并完成。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
var observables = urls.map(url => mockHTTPRequest(url));
Observable.forkJoin(observables)
.subscribe(val => console.log(val));
这会将所有响应打印为单个数组:
["Response from url-1", "Response from url-2", "Response from url-3", "Response from url-4"]
观看现场演示: https : //jsbin.com/fomoye/2/edit?js,console
Observable.forkJoin()
还将一个结果选择器函数作为可选参数,它允许我们在进一步发出之前修改最终结果:
Observable.forkJoin(observables, (...results) => {
return results.length;
})
.subscribe(val => console.log(val));
这打印到控制台:
4
查看现场演示: https : //jsbin.com/muwiqic/1/edit?js,console
请注意,结果选择器函数的参数已解压缩。
处理错误
对于错误处理,我们可以使用与前面示例中相同的方法与 catch()
运算符。
但是,有一件重要的事情需要注意。forkJoin()
要求每个源 Observable 至少发出一个值。如果我们使用 catch(() => Observable.empty())
就像我们之前所做的那样,forkJoin()
永远不会发出任何东西,因为 Observable.empty()
只是一个通知。
这就是为什么我们需要使用例如 Observable.of(null)
,这是 null
值,然后是 complete
通知。
function mockHTTPRequest(url) {
return Observable.of(`Response from ${url}`)
.delay(Math.random() * 1000)
.map(value => {
if (url === 'url-3') {
throw new Error(`Error response from ${url}`)
}
return value;
});
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
var observables = urls.map(url => mockHTTPRequest(url).catch(() => Observable.of(null)));
Observable.forkJoin(observables)
.subscribe(val => console.log(val));
观看现场演示: https : //jsbin.com/yidiked/2/edit?js,console
这打印到控制台:
["Response from url-1", "Response from url-2", null, "Response from url-4"]
请注意,错误由 null
替换。如果我们只使用 Observable.empty()
,则 forkJoin()
将永远不会发出任何东西。