限速
远程服务的常见问题是速率限制。远程服务允许我们每个时间段仅发送有限数量的请求或数据量。
在 RxJS 5 中, bufferTime
运算符提供了一个非常类似的功能,特别是如果我们保留第二个参数未指定(它定义了我们想要创建新缓冲区的频率。如果我们将它保留为 undefined / null,它将创建一个新的缓冲区权限发射当前的一个)。
bufferTime
的典型用法如下所示:
bufferTime(1000, null, 5)
这将缓冲项目,直到满足两个条件之一。然后它将发出缓冲区并启动另一个缓冲区:
- 经营者一直在为
1000ms
收集物品 - 运算符已经收集了
5
项目
出于演示目的,我们可以创建一个非常快速发出的源 Observable,因此 bufferTime
将达到大小限制(5
)并且每次发射的次数通常超过一次 7:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
然后我们将用 bufferTime
和 concatMap
链接它。concatMap
运算符是我们迫使 1000ms
延迟的地方:
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.of(buffer).delay(1000))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
观看现场演示: https : //jsbin.com/kotibow/3/edit?js,console
我们还添加了 timestamp()
来查看排放时间,以确保延迟真的至少是 1000ms
。
请注意,我们根本不必使用 Observable.of(buffer)
。我们在这里使用它只是为了手动检查缓冲项的数量是否正确。
从控制台输出我们可以看到两次排放之间的延迟大致相当于 15:
Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }
现在我们还可以测试源缓慢发出的情况,以便 bufferTime
运算符将达到最大间隔条件:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(300));
观看现场演示: https : //jsbin.com/tuwowan/2/edit?js,console
然后输出应该在关于 2s
之后开始,因为 1s
需要 1s
才能发出,然后我们添加了 1s
延迟;
Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }
如果我们想在现实世界的应用程序中使用这种方法,我们将远程调用放入 concatMap
运算符。通过这种方式,我们可以控制是否要在远程服务的请求或响应之间强制切换。
例如,我们可以通过在 concatMap
回调中使用 forkJoin
来强制请求之间的最小 1s
延迟:
function mockHTTPRequest(buffer) {
return Observable.of(true).delay(Math.random() * 1500)
}
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.forkJoin(
mockHTTPRequest(buffer),
Observable.of(buffer).delay(1000)
))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
观看现场演示: https : //jsbin.com/xijaver/edit?js,console
感谢 forkJoin
,concatMap
需要等待两个 Observable 完成。
另一方面,如果我们想在响应之间强制 1s
延迟,我们只需在 mockHTTPRequest()
之后附加 delay()
运算符:
.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))
观看现场演示: https : //jsbin.com/munopot/2/edit?js,console