缓存 HTTP 响应

RxJS 的典型用例是创建 HTTP 请求并在一段时间内缓存其结果。此外,我们总是希望一次只运行一个请求并共享其响应。

例如,以下代码缓存 1 项最大值。 1000ms

var updateRequest = Observable.defer(() => makeMockHttpRequest())
    .publishReplay(1, 1000)
    .refCount()
    .take(1);

var counter = 1;    
function makeMockHttpRequest() {
    return Observable.of(counter++)
        .delay(100);
}

function requestCachedHttpResult() {
    return updateRequest;
}

函数 makeMockHttpRequest() 模拟以 100ms 延迟到达的 HTTP 请求。

函数 requestCachedHttpResult() 是我们订阅获取实际或缓存响应的地方。

使用 .publishReplay(1, 1000),我们使用 RxJS 组播在内部使用 ReplaySubject 并保留 1 项目以获得最大的 1000ms。然后 refCount() 用于始终只保留 source 的一个订阅,即 Observable.defer()。此 Observable 用于创建新请求并递增 counter 以证明缓存的值和新订阅共享相同的数据。

当我们想要获取当前数据时,我们称之为 requestCachedHttpResult()。为了确保在发布数据后 Observer 将正确完成,我们使用了 take(1) 运算符。

requestCachedHttpResult()
    .subscribe(val => console.log("Response 0:", val));

这会使用 mockDataFetch() 创建一个请求并打印到控制台:

1

一个更复杂的示例将在我们想要测试模拟的 HTTP 连接和响应被共享的不同时间调用多个请求。

requestCachedHttpResult()
    .subscribe(val => console.log("Response 0:", val));

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 50:", val))
, 50);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 200:", val))
, 200);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 1200:", val))
, 1200);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 1500:", val))
, 1500);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 3500:", val))
, 3500);

观看现场演示: https//jsbin.com/todude/5/edit?js,console

每个请求都会延迟发送,并按以下顺序发送:

0 - 首次请求使 refCount() 订阅其 source,这使得 mockDataFetch() 呼叫。它的回应将被推迟 22。此时 publishReplay() 内的 publishReplay() 运算符有一个 Observer。

50 - 第二个请求也订阅了 ConnectableObservable。此时 ConnectableObservable 里面的 publishReplay() 运算符有两个 Observer。它不会使用 makeMockHttpRequest() 创建另一个请求,因为 refCount() 已经订阅了。

100 - 第一个响应准备就绪。它首先由 ReplaySubject 缓存,然后重新发送给订阅 ConnectableObservable两位观察者。由于 take(1) 和取消订阅,两位观察员都完成了。

200 - 订阅 ReplaySubject,它立即发出缓存值,使 take(1) 完成 Observer 并立即取消订阅。没有发出 HTTP 请求,也没有订阅。

1200 - 与 0 的第一个匹配相同。此时缓存的值已被丢弃,因为它比 1000ms 旧。

1500 - 与 200 的第四次活动​​相同。

3500 - 与 1200 的第一个匹配相同。

控制台中的输出如下:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3

在 RxJS 5 中,cache() 运算符涵盖了类似的功能。然而,由于其功能有限,它在 5.0.0-rc.1被删除。

处理错误

如果我们想要处理远程服务(makeMockHttpRequest 函数)产生的错误,我们需要在它们合并到主 Observable 链之前捕获它们,因为 publishReplay() 内部的 ReplaySubject 收到的任何错误都会将其内部状态标记为 stopped(阅读更多这里主题和它的内部状态 )这绝对不是我们想要的。

在下面的例子中,我们在 counter === 2 模拟错误并用 catch() 操作符捕获它。我们使用 catch() 只将 error 通知转换为常规 next,以便我们可以在观察者中处理错误:

function makeMockHttpRequest() {
    return Observable.of(counter++)
        .delay(100)
        .map(i => {
            if (i === 2) {
                throw new Error('Invalid URL');
            }
            return i;
        })
        .catch(err => Observable.of(err));
}

观看现场演示: https//jsbin.com/kavihu/10/edit?js,console

这将打印以控制以下输出。请注意 next 处理程序中收到的错误:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: [object Error] { ... }
Response 1500: [object Error] { ... }
Response 3500: 3

如果我们想要在每个观察者中处理常规 error 通知中的错误,我们必须在 publishReplay() 运算符之后重新抛出它们,原因如上所述。

var updateRequest = Observable.defer(() => makeMockHttpRequest())
    .publishReplay(1, 1000)
    .refCount()
    .take(1)
    .map(val => {
        if (val instanceof Error) {
            throw val;
        }
        return val;
    });

请参阅现场演示: https//jsbin.com/fabosam/5/edit? js, console (请注意,我们必须为每个观察者添加错误回调)。

Response 0: 1
Response 50: 1
Response 200: 1
error callback: Error: Invalid URL
error callback: Error: Invalid URL
Response 3500: 3