缓存 HTTP 响应
RxJS 的典型用例是创建 HTTP 请求并在一段时间内缓存其结果。此外,我们总是希望一次只运行一个请求并共享其响应。
例如,以下代码缓存 1 项最大值。 1000ms :
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1);
var counter = 1;
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100);
}
function requestCachedHttpResult() {
return updateRequest;
}
函数 makeMockHttpRequest()
模拟以 100ms
延迟到达的 HTTP 请求。
函数 requestCachedHttpResult()
是我们订阅获取实际或缓存响应的地方。
使用 .publishReplay(1, 1000)
,我们使用 RxJS 组播在内部使用 ReplaySubject
并保留 1
项目以获得最大的 1000ms
。然后 refCount()
用于始终只保留 source
的一个订阅,即 Observable.defer()
。此 Observable 用于创建新请求并递增 counter
以证明缓存的值和新订阅共享相同的数据。
当我们想要获取当前数据时,我们称之为 requestCachedHttpResult()
。为了确保在发布数据后 Observer 将正确完成,我们使用了 take(1)
运算符。
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
这会使用 mockDataFetch()
创建一个请求并打印到控制台:
1
一个更复杂的示例将在我们想要测试模拟的 HTTP 连接和响应被共享的不同时间调用多个请求。
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 50:", val))
, 50);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 200:", val))
, 200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1200:", val))
, 1200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1500:", val))
, 1500);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 3500:", val))
, 3500);
观看现场演示: https : //jsbin.com/todude/5/edit?js,console
每个请求都会延迟发送,并按以下顺序发送:
0
- 首次请求使 refCount()
订阅其 source
,这使得 mockDataFetch()
呼叫。它的回应将被推迟 22。此时 publishReplay()
内的 publishReplay()
运算符有一个 Observer。
50
- 第二个请求也订阅了 ConnectableObservable
。此时 ConnectableObservable
里面的 publishReplay()
运算符有两个 Observer。它不会使用 makeMockHttpRequest()
创建另一个请求,因为 refCount()
已经订阅了。
100
- 第一个响应准备就绪。它首先由 ReplaySubject
缓存,然后重新发送给订阅 ConnectableObservable
的两位观察者。由于 take(1)
和取消订阅,两位观察员都完成了。
200
- 订阅 ReplaySubject
,它立即发出缓存值,使 take(1)
完成 Observer 并立即取消订阅。没有发出 HTTP 请求,也没有订阅。
1200
- 与 0
的第一个匹配相同。此时缓存的值已被丢弃,因为它比 1000ms
旧。
1500
- 与 200
的第四次活动相同。
3500
- 与 1200
的第一个匹配相同。
控制台中的输出如下:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3
在 RxJS 5 中,cache()
运算符涵盖了类似的功能。然而,由于其功能有限,它在 5.0.0-rc.1
中被删除。
处理错误
如果我们想要处理远程服务(makeMockHttpRequest
函数)产生的错误,我们需要在它们合并到主 Observable 链之前捕获它们,因为 publishReplay()
内部的 ReplaySubject
收到的任何错误都会将其内部状态标记为 stopped
(阅读更多这里主题和它的内部状态 )这绝对不是我们想要的。
在下面的例子中,我们在 counter === 2
模拟错误并用 catch()
操作符捕获它。我们使用 catch()
只将 error
通知转换为常规 next
,以便我们可以在观察者中处理错误:
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100)
.map(i => {
if (i === 2) {
throw new Error('Invalid URL');
}
return i;
})
.catch(err => Observable.of(err));
}
观看现场演示: https : //jsbin.com/kavihu/10/edit?js,console
这将打印以控制以下输出。请注意 next
处理程序中收到的错误:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: [object Error] { ... }
Response 1500: [object Error] { ... }
Response 3500: 3
如果我们想要在每个观察者中处理常规 error
通知中的错误,我们必须在 publishReplay()
运算符之后重新抛出它们,原因如上所述。
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1)
.map(val => {
if (val instanceof Error) {
throw val;
}
return val;
});
请参阅现场演示: https : //jsbin.com/fabosam/5/edit? js, console (请注意,我们必须为每个观察者添加错误回调)。
Response 0: 1
Response 50: 1
Response 200: 1
error callback: Error: Invalid URL
error callback: Error: Invalid URL
Response 3500: 3