创建背压数据源

在处理背压时,创建背压数据源是相对容易的任务,因为库已经在 Observable 上提供静态方法来处理开发人员的背压。我们可以区分两种工厂方法:基于下游需求返回和生成元素的冷发电机和通常桥接非反应和/或非反压数据源的热推动器,并在上面层叠一些背压处理。他们。

只是

最基本的背压感知源是通过 just 创建的:

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

由于我们明确地没有在 onStart 中请求,因此不会打印任何内容。just 很棒,当有一个常数值时,我们想跳一个序列。

不幸的是,just 经常被误认为是一种动态计算内容以供 Subscribers 使用的方法:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

令某些人感到惊讶的是,这打印 1 次,而不是分别打印 1 和 2。如果重写了调用,很明显为什么它会这样:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue 被称为主程序的一部分,而不是响应订阅者的订阅。

fromCallable

人们真正需要的是方法 fromCallable

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

这里 computeValue 仅在用户订阅并且为每个订阅者打印预期的 1 和 2 时执行。当然,fromCallable 也适当地支持背压,并且除非请求,否则不会发出计算值。但请注意,无论如何计算确实会发生。如果计算本身应该延迟到下游实际请求,我们可以使用 justmap

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just 在被映射到 computeValue 的结果时才会发出其常量值,仍然会单独为每个用户调用。

如果数据已经作为一个对象数组,一个对象列表或任何 Iterable 源提供,相应的 from 重载将处理这些源的背压和发射:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

为了方便(并避免关于通用数组创建的警告),just 有 2 到 10 个参数重载,内部委托给 from

from(Iterable) 也提供了一个有趣的机会。许多价值生成可以以状态机的形式表达。每个请求的元素触发状态转换和返回值的计算。

编写像 Iterables 这样的状态机有点复杂(但是比编写 Observable 更容易使用它)并且与 C#不同,Java 没有任何编译器支持通过简单编写经典的代码来构建这样的状态机(使用 yield returnyield break)。一些库提供了一些帮助,例如 Google Guava 的 AbstractIterable 和 IxJava 的 Ix.generate()Ix.forloop()。这些本身就值得一个完整的系列,所以让我们看一些非常基本的 Iterable 源,它无限地重复一些常量值:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

如果我们通过经典 for 循环消耗 iterator,那将导致无限循环。由于我们从中构建了一个 Observable,我们可以表达我们的意愿,只消耗它的前 5 个,然后停止请求任何东西。这是在 Observables 内懒惰评估和计算的真正力量。

创建(SyncOnSubscribe)

有时,要转换为被动世界本身的数据源是同步(阻塞)和拉式,也就是说,我们必须调用一些 getread 方法来获取下一段数据。当然,人们可以把它变成一个东西,但是当这些资源与资源相关时,如果下游在它结束之前取消订阅序列,我们可能会泄漏这些资源。

为了处理这种情况,RxJava 有 SyncOnSubscribe 类。可以扩展它并实现其方法或使用其基于 lambda 的工厂方法之一来构建实例。

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

通常,SyncOnSubscribe 使用 3 个回调。

第一个回调允许创建一个每用户状态,例如示例中的 FileInputStream; 该文件将独立打开给每个用户。

第二个回调接受此状态对象并提供输出 Observer,其 onXXX 方法可以被调用以发出值。此回调执行的次数与下游请求的次数相同。在每次调用时,它必须最多调用 onNext 一次,然后选择 onErroronCompleted。在示例中,如果读取字节为负,指示文件结束,则调用 onCompleted(),如果读取了 IOException,则调用 onError

当下游取消​​订阅(关闭输入流)或前一个回调调用终端方法时,将调用最终的回调; 它允许释放资源。由于并非所有源都需要所有这些功能,因此 SyncOnSubscribe 的静态方法可以创建没有它们的实例。

不幸的是,JVM 和其他库中的许多方法调用抛出了已检查的异常,需要将其包装到 try-catches 中,因为此类使用的功能接口不允许抛出已检查的异常。

当然,我们可以模仿其他典型的来源,例如无限范围:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

在这个设置中,current0 开始,下次调用 lambda 时,参数 current 现在保持 1

有一个名为 AsyncOnSubscribeSyncOnSubscribe 的变体看起来非常相似,除了中间回调也采用表示来自下游的请求数量的长值,并且回调应该生成具有完全相同长度的 Observable。然后该源将所有这些 Observable 连接成单个序列。

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

关于这个类的有用性正在进行(热烈)讨论,并且通常不推荐,因为它通常会打破关于它将如何实际发出这些生成的值以及它将如何响应的期望,甚至是它将在何时收到的请求值。更复杂的消费者情景。

创建(发射器)

有时,被包装到 Observable 中的源已经很热(例如鼠标移动)或冷却但在其 API 中不能反压(例如异步网络回调)。

为了处理这种情况,最新版本的 RxJava 引入了 create(emitter) 工厂方法。它需要两个参数:

  • 将使用 Emitter<T> 接口的实例为每个传入订阅者调用的回调,
  • 一个 Emitter.BackpressureMode 枚举,要求开发人员指定要应用的背压行为。它有通常的模式,类似于 onBackpressureXXX,除了发信号通知 MissingBackpressureException 或完全忽略它内部的这种溢出。

请注意,它目前不支持那些背压模式的附加参数。如果需要那些定制,使用 NONE 作为背压模式并在生成的 Observable 上应用相关的 onBackpressureXXX 是可行的方法。

当人们想要与基于推送的源(例如 GUI 事件)交互时使用的第一种典型情况。这些 API 具有某种形式的 addListener / removeListener 调用,可以使用:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Emitter 使用相对简单; 可以在其上调用 onNextonErroronCompleted,运算符可以自行处理背压和取消订阅管理。此外,如果包装的 API 支持取消(例如示例中的侦听器删除),则可以使用 setCancellation(或 setSubscription 用于类似 Subscription 的资源)来注册在下游取消订阅或 onError /时调用的取消回调。在提供的 Emitter 实例上调用 onCompleted

这些方法一次只允许一个资源与发射器关联,而设置一个新资源会自动取消订阅旧资源。如果必须处理多个资源,请创建一个 CompositeSubscription,将其与发射器关联,然后将更多资源添加到 CompositeSubscription 本身:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

第二种情况通常涉及一些异步的,基于回调的 API,必须转换为 Observable

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

在这种情况下,委托以相同的方式工作。不幸的是,通常情况下,这些经典的回调式 API 不支持取消,但如果他们这样做,可以像在 previoius 示例中那样设置取消(尽管可能采用更复杂的方式)。注意使用 LATEST 背压模式; 如果我们知道只有一个值,我们不需要 BUFFER 策略,因为它分配一个永远不会被充分利用的默认 128 元素长缓冲区(必要时增长)。