限速
遠端服務的常見問題是速率限制。遠端服務允許我們每個時間段僅傳送有限數量的請求或資料量。
在 RxJS 5 中, bufferTime 運算子提供了一個非常類似的功能,特別是如果我們保留第二個引數未指定(它定義了我們想要建立新緩衝區的頻率。如果我們將它保留為 undefined / null,它將建立一個新的緩衝區許可權發射當前的一個)。
bufferTime 的典型用法如下所示:
bufferTime(1000, null, 5)
這將緩衝專案,直到滿足兩個條件之一。然後它將發出緩衝區並啟動另一個緩衝區:
- 經營者一直在為
1000ms收集物品 - 運算子已經收集了
5專案
出於演示目的,我們可以建立一個非常快速發出的源 Observable,因此 bufferTime 將達到大小限制(5)並且每次發射的次數通常超過一次 7:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
然後我們將用 bufferTime 和 concatMap 連結它。concatMap 運算子是我們迫使 1000ms 延遲的地方:
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.of(buffer).delay(1000))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
觀看現場演示: https : //jsbin.com/kotibow/3/edit?js,console
我們還新增了 timestamp() 來檢視排放時間,以確保延遲真的至少是 1000ms。
請注意,我們根本不必使用 Observable.of(buffer)。我們在這裡使用它只是為了手動檢查緩衝項的數量是否正確。
從控制檯輸出我們可以看到兩次排放之間的延遲大致相當於 15:
Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }
現在我們還可以測試源緩慢發出的情況,以便 bufferTime 運算子將達到最大間隔條件:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(300));
觀看現場演示: https : //jsbin.com/tuwowan/2/edit?js,console
然後輸出應該在關於 2s 之後開始,因為 1s 需要 1s 才能發出,然後我們新增了 1s 延遲;
Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }
如果我們想在現實世界的應用程式中使用這種方法,我們將遠端呼叫放入 concatMap 運算子。通過這種方式,我們可以控制是否要在遠端服務的請求或響應之間強制切換。
例如,我們可以通過在 concatMap 回撥中使用 forkJoin 來強制請求之間的最小 1s 延遲:
function mockHTTPRequest(buffer) {
return Observable.of(true).delay(Math.random() * 1500)
}
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.forkJoin(
mockHTTPRequest(buffer),
Observable.of(buffer).delay(1000)
))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
觀看現場演示: https : //jsbin.com/xijaver/edit?js,console
感謝 forkJoin,concatMap 需要等待兩個 Observable 完成。
另一方面,如果我們想在響應之間強制 1s 延遲,我們只需在 mockHTTPRequest() 之後附加 delay() 運算子:
.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))
觀看現場演示: https : //jsbin.com/munopot/2/edit?js,console