响应式系统中的背压


这篇文章是关于背压的简单介绍,并讲述了RxJava (v3)、Project Reactor和Kotlin的协程中是如何处理它的。

Back pressure (or backpressure) is a resistance or force opposing the desired flow of fluid through pipes, leading to friction loss and pressure drop.

The term back pressure is a misnomer, as pressure is a scalar quantity, so it has a magnitude but no direction.

后向压力(背压)是一种阻力或力,它与通过管道的期望流体流量相反,从而导致摩擦损失和压降。

这个术语是一个误称,因为压力是一个标量,所以它只有大小没有方向。

​ ——Wikipedia

在软件中,背压的含义略有关联,但仍有不同:假设有一个快的数据生产者和一个慢的数据消费者,背压就是一种 “倒逼 “生产者而使自己不被数据淹没的机制。

无论是基于reactstreams.org还是Java的java.util.concurrent.Flow,Reactive Streams都提供了四个构件:

  1. 一个产生元素的Publisher
  2. 一个接收到元素时做出反应的Subscriber
  3. 一个绑定PublisherSubscriberSubscription
  4. 以及一个Processor

下面是一个类图:

Reactive Streams class diagram

Subscription通过其中的request()方法成为背压的根源。

规范非常直白:

A Subscriber MUST signal demand via Subscription.request(long n) to receive onNext signals.

The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient “stop-and-wait” protocol.

Subscriber 必须通过Subscription.request(long n)发出信号请求,以接收onNext信号。

这条规则的目的是确立订阅者有责任决定何时以及它能够与愿意接收多少元素。为了避免重入订阅方法引起的信号重排序,强烈建议同步订阅者在任何信号处理的最后阶段调用Subscription方法。建议订阅者按照能够处理的上限请求数据,因为一次只请求一个元素会导致低效的 “停止-等待 “协议。

​ — Reactive Streams specifications for the JVM(JVM的Reactive Streams规范)

Reactive Streams(反应流)的规范非常可靠。它们还附带了基于java的TCK。

但如何管理由生产者发出的、不能被下游处理的项目,这不属于规范的范畴。虽然问题相当简单,但也可以有不同的解决方案。每个Reactive框架都提供了一些选项,让我们依次来看看。

RxJava 3中的背压

RxJava v3中提供了一些基本类:

说明
Flowable 包含0到N个项的流,支持Reactive-Streams和背压
Observable 包含0到N个项的流,不支持背压
Single 只包含一个项、或者一个错误的流
Maybe 不包含项、只包含一个项,或者包含一个错误的流
Completable 不包含项,但是要么包含一个结束符、或者一个错误信号的流

在这些类中,Flowable是唯一实现Reactive Streams和背压的类。 但是,提供背压并不是唯一的问题。 正如RxJava的Wiki所述:

Backpressure doesn’t make the problem of an overproducing Observable or an underconsuming Subscriber go away. It just moves the problem up the chain of operators to a point where it can be handled better.

背压并不能解决Observable过度生产或Subscriber消费不足的问题。它只是将问题在操作者的链条中向上移动,使其能够得到更好的处理。

Reactive pull backpressure isn’t magic

为了解决这个问题,RxJava提供了两种主要策略来处理“过度生产”的项:

  1. 将项存储在缓冲器中

    img

    请注意,如果你没有为缓冲区设置上限,则可能会导致OutOfMemoryError

  2. 丢弃项

    img

下图总结了实现这些策略的不同方法:

RxJava’s Flowable class diagram excerpt

请注意,onBackPressureLatest操作类似于使用onBackpressureBuffer(1)

img

与其它框架相比,RxJava提供了在发送所有项之后发送溢出异常信号的方法。这些方法可以让消费者在接收数据项的同时,仍然得到生产者丢弃数据项的通知。

Project Reactor中的背压

Project Reactor提供的策略与RxJava的类似。

不过API有一些细微的差别。例如,Project Reactor提供了一个方便的方法,可以在生产者溢出时抛出一个异常:

var stream = Stream.generate(Math::random);

// RxJava
Flowable.fromStream(stream)        
        .onBackpressureBuffer(0);   

// Project Reactor
Flux.fromStream(stream)            // 1
    .onBackpressureError();        // 2
  1. 创建Reactive Stream
  2. 如果生产者溢出则抛出错误

下面是Flux类图,重点介绍背压能力:

Project Reactor’s Flux class diagram excerpt

与其他框架相比,Project Reactor提供了方法为缓冲区的数据项设置TTL,以防止缓冲区溢出。

协程中的背压

协程确实也提供了相同的缓冲和丢弃功能。 协程的基类是Flow

Coroutines' Flow class diagram excerpt

你可以这样使用该类:

flow {                                  // 1
  while (true) emit(Math.random())      // 2
}.buffer(10)                            // 3
  1. 创建一个Flow,其内容由后面的块定义
  2. 定义Flow的内容
  3. 设置缓冲区容量为10

总结

总而言之,RxJava,Project Reactor和Kotlin协程都提供背压功能。 所有用户都通过提供两种策略来应对生产速度比其订阅者消费速度更快的生产者:要么缓存数据项,要么删除数据项。

参考:


文章作者: Guo Yaxiang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Guo Yaxiang !
评论
 上一篇
MapStruct使用指南 MapStruct使用指南
在本文中,我们探讨了MapStruct——一个用于创建映射器类的库。从基本映射到自定义方法和自定义映射器,此外, 我们还介绍了MapStruct提供的一些高级操作选项,包括依赖注入,数据类型映射、枚举映射和表达式使用。
下一篇 
实现Raft协议:Part 3 - 持久性和优化 实现Raft协议:Part 3 - 持久性和优化
本系列文章的写作目的,在于描述Raft协议的一个功能完备且经过严格测试的实现方式,并提供一些Raft工作方式的直观理解。