onBackpressureXXX 运算符
当应用程序使用 MissingBackpressureException
失败时,大多数开发人员会遇到背压,而异常通常指向 observeOn
运算符。实际原因通常是通过 create()
创建的 PublishSubject
,timer()
或 interval()
或自定义运算符的非反压使用。
有几种方法可以处理这种情况。
增加缓冲区大小
有时这种溢出是由于突发源而发生的。突然,用户太快地点击了屏幕,并且 Android 上的 observeOn
默认的 16 元素内部缓冲区溢出。
最近版本的 RxJava 中的大多数背压敏感运算符现在允许程序员指定其内部缓冲区的大小。相关参数通常称为 bufferSize
,prefetch
或 capacityHint
。鉴于介绍中的溢出示例,我们可以增加 observeOn
的缓冲区大小,以便为所有值提供足够的空间。
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.computation(), 1024 * 1024)
.subscribe(e -> { }, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
但请注意,通常情况下,这可能只是一个临时修复,因为如果源过量生成预测的缓冲区大小,仍可能发生溢出。在这种情况下,可以使用以下运算符之一。
使用标准运算符批处理/跳过值
如果可以批量处理源数据,则可以通过使用标准批处理操作符之一(按大小和/或按时间)来降低 MissingBackpressureException
的可能性。
PublishSubject<Integer> source = PublishSubject.create();
source
.buffer(1024)
.observeOn(Schedulers.computation(), 1024)
.subscribe(list -> {
list.parallelStream().map(e -> e * e).first();
}, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
如果可以安全地忽略某些值,可以使用采样(使用时间或其他 Observable)和限制运算符(throttleFirst
,throttleLast
,throttleWithTimeout
)。
PublishSubject<Integer> source = PublishSubject.create();
source
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation(), 1024)
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
请注意,这些运算符只会降低下游的价值接收率,因此它们仍然可能会导致这种情况发生。
onBackpressureBuffer()
该无操作器形式的运算符在上游源和下游运算符之间重新引入无界缓冲区。无限制意味着只要 JVM 没有内存不足,它就可以处理来自突发源的几乎任何数量。
Observable.range(1, 1_000_000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation(), 8)
.subscribe(e -> { }, Throwable::printStackTrace);
在这个例子中,observeOn
具有非常低的缓冲区大小但是没有 MissingBackpressureException
,因为 onBackpressureBuffer
吸收所有 100 万个值并将小批量交给 observeOn
。
但请注意,onBackpressureBuffer
以无限制的方式消耗其来源,即不对其施加任何背压。这导致即使是背压支持源如 range
也将完全实现。
onBackpressureBuffer
还有 4 个额外的重载
onBackpressureBuffer(int capacity)
这是一个有界的版本,在其缓冲区达到给定容量的情况下发出信号。
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
随着越来越多的运算符现在允许设置其缓冲区大小,此运算符的相关性正在下降。对于其余部分,这提供了一个扩展其内部缓冲区的机会,通过使用 onBackpressureBuffer
比默认值更大的数字。
onBackpressureBuffer(int capacity,Action0 onOverflow)
如果发生溢出,此重载会调用(共享)操作。它的用处相当有限,因为没有提供有关溢出的其他信息而不是当前的调用堆栈。
onBackpressureBuffer(int capacity,Action0 onOverflow,BackpressureOverflow.Strategy strategy)
这种重载实际上更有用,因为它可以定义在达到容量时要做什么。BackpressureOverflow.Strategy
实际上是一个接口,但是类 BackpressureOverflow
提供了 4 个静态字段,其实现代表了典型的操作:
ON_OVERFLOW_ERROR
:这是前两个重载的默认行为,表示一个BufferOverflowException
ON_OVERFLOW_DEFAULT
:目前和ON_OVERFLOW_ERROR
相同ON_OVERFLOW_DROP_LATEST
:如果发生溢出,将简单地忽略当前值,并且只有下游请求才会传递旧值。ON_OVERFLOW_DROP_OLDEST
:删除缓冲区中最旧的元素并将当前值添加到缓冲区中。
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
请注意,最后两个策略会导致流中断,因为它们会丢弃元素。此外,他们不会发出信号 34。
onBackpressureDrop()
每当下游未准备好接收值时,此运算符将从序列中删除该 elemenet。人们可以将其视为 0 容量 onBackpressureBuffer
与策略 ON_OVERFLOW_DROP_LATEST
。
当人们可以安全地忽略来自源的值(例如鼠标移动或当前 GPS 位置信号)时,此运算符非常有用,因为稍后会有更多的最新值。
component.mouseMoves()
.onBackpressureDrop()
.observeOn(Schedulers.computation(), 1)
.subscribe(event -> compute(event.x, event.y));
它可能与源操作符 interval()
结合使用。例如,如果想要执行某些定期后台任务但每次迭代的持续时间可能比期间长,则可以安全地删除多余的间隔通知,因为稍后会有更多:
Observable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);
此运算符存在一个重载:onBackpressureDrop(Action1<? super T> onDrop)
,其中调用(共享)操作并删除该值。该变体允许自身清理值(例如,释放相关资源)。
onBackpressureLatest()
最终运算符仅保留最新值,并实际覆盖较旧的未传递值。人们可以将此视为 onBackpressureBuffer
的变体,其容量为 1,策略为 ON_OVERFLOW_DROP_OLDEST
。
与 onBackpressureDrop
不同,如果下游恰好落后,总有一个可供消费的价值。这在类似遥测的情况下非常有用,在这种情况下,数据可能会以某种突发模式出现,但只有最新的数据才会对处理感兴趣。
例如,如果用户在屏幕上点击了很多,我们仍然希望对其最新输入作出反应。
component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);
在这种情况下使用 onBackpressureDrop
会导致最后一次点击被丢弃并让用户想知道为什么没有执行业务逻辑。