限速
遠端服務的常見問題是速率限制。遠端服務允許我們每個時間段僅傳送有限數量的請求或資料量。
在 RxJS 5 中, bufferTime
運算子提供了一個非常類似的功能,特別是如果我們保留第二個引數未指定(它定義了我們想要建立新緩衝區的頻率。如果我們將它保留為 undefined / null,它將建立一個新的緩衝區許可權發射當前的一個)。
bufferTime
的典型用法如下所示:
bufferTime(1000, null, 5)
這將緩衝專案,直到滿足兩個條件之一。然後它將發出緩衝區並啟動另一個緩衝區:
- 經營者一直在為
1000ms
收集物品 - 運算子已經收集了
5
專案
出於演示目的,我們可以建立一個非常快速發出的源 Observable,因此 bufferTime
將達到大小限制(5
)並且每次發射的次數通常超過一次 7:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
然後我們將用 bufferTime
和 concatMap
連結它。concatMap
運算子是我們迫使 1000ms
延遲的地方:
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.of(buffer).delay(1000))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
觀看現場演示: https : //jsbin.com/kotibow/3/edit?js,console
我們還新增了 timestamp()
來檢視排放時間,以確保延遲真的至少是 1000ms
。
請注意,我們根本不必使用 Observable.of(buffer)
。我們在這裡使用它只是為了手動檢查緩衝項的數量是否正確。
從控制檯輸出我們可以看到兩次排放之間的延遲大致相當於 15:
Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }
現在我們還可以測試源緩慢發出的情況,以便 bufferTime
運算子將達到最大間隔條件:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(300));
觀看現場演示: https : //jsbin.com/tuwowan/2/edit?js,console
然後輸出應該在關於 2s
之後開始,因為 1s
需要 1s
才能發出,然後我們新增了 1s
延遲;
Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }
如果我們想在現實世界的應用程式中使用這種方法,我們將遠端呼叫放入 concatMap
運算子。通過這種方式,我們可以控制是否要在遠端服務的請求或響應之間強制切換。
例如,我們可以通過在 concatMap
回撥中使用 forkJoin
來強制請求之間的最小 1s
延遲:
function mockHTTPRequest(buffer) {
return Observable.of(true).delay(Math.random() * 1500)
}
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.forkJoin(
mockHTTPRequest(buffer),
Observable.of(buffer).delay(1000)
))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
觀看現場演示: https : //jsbin.com/xijaver/edit?js,console
感謝 forkJoin
,concatMap
需要等待兩個 Observable 完成。
另一方面,如果我們想在響應之間強制 1s
延遲,我們只需在 mockHTTPRequest()
之後附加 delay()
運算子:
.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))
觀看現場演示: https : //jsbin.com/munopot/2/edit?js,console