限速

远程服务的常见问题是速率限制。远程服务允许我们每个时间段仅发送有限数量的请求或数据量。

在 RxJS 5 中, bufferTime 运算符提供了一个非常类似的功能,特别是如果我们保留第二个参数未指定(它定义了我们想要创建新缓冲区的频率。如果我们将它保留为 undefined / null,它将创建一个新的缓冲区权限发射当前的一个)。

bufferTime 的典型用法如下所示:

bufferTime(1000, null, 5)

这将缓冲项目,直到满足两个条件之一。然后它将发出缓冲区并启动另一个缓冲区:

  • 经营者一直在为 1000ms 收集物品
  • 运算符已经收集了 5 项目

出于演示目的,我们可以创建一个非常快速发出的源 Observable,因此 bufferTime 将达到大小限制(5)并且每次发射的次数通常超过一次 7:

const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));

然后我们将用 bufferTimeconcatMap 链接它。concatMap 运算符是我们迫使 1000ms 延迟的地方:

const startTime = (new Date()).getTime();

const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));

source.bufferTime(1000, null, 5)
  .concatMap(buffer => Observable.of(buffer).delay(1000))
  .timestamp()
  .map(obj => {
    obj.timestamp = obj.timestamp - startTime;
    return obj;
  })
  .subscribe(obj => console.log(obj));

观看现场演示: https//jsbin.com/kotibow/3/edit?js,console

我们还添加了 timestamp() 来查看排放时间,以确保延迟真的至少是 1000ms

请注意,我们根本不必使用 Observable.of(buffer)。我们在这里使用它只是为了手动检查缓冲项的数量是否正确。

从控制台输出我们可以看到两次排放之间的延迟大致相当于 15:

Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }

现在我们还可以测试源缓慢发出的情况,以便 bufferTime 运算符将达到最大间隔条件:

const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(300));

观看现场演示: https//jsbin.com/tuwowan/2/edit?js,console

然后输出应该在关于 2s 之后开始,因为 1s 需要 1s 才能发出,然后我们添加了 1s 延迟;

Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }

如果我们想在现实世界的应用程序中使用这种方法,我们将远程调用放入 concatMap 运算符。通过这种方式,我们可以控制是否要在远程服务的请求或响应之间强制切换。

例如,我们可以通过在 concatMap 回调中使用 forkJoin 来强制请求之间的最小 1s 延迟:

function mockHTTPRequest(buffer) {
  return Observable.of(true).delay(Math.random() * 1500)
}

const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));

source.bufferTime(1000, null, 5)
  .concatMap(buffer => Observable.forkJoin(
    mockHTTPRequest(buffer),
    Observable.of(buffer).delay(1000)
  ))
  .timestamp()
  .map(obj => {
    obj.timestamp = obj.timestamp - startTime;
    return obj;
  })
  .subscribe(obj => console.log(obj));

观看现场演示: https//jsbin.com/xijaver/edit?js,console

感谢 forkJoinconcatMap 需要等待两个 Observable 完成。

另一方面,如果我们想在响应之间强制 1s 延迟,我们只需在 mockHTTPRequest() 之后附加 delay() 运算符:

.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))

观看现场演示: https//jsbin.com/munopot/2/edit?js,console