这篇文章是关于背压的简单介绍,并讲述了RxJava (v3)、Project Reactor和Kotlin的协程中是如何处理它的。
Back pressure (or backpressure) is a resistance or force opposing the desired flow of fluid through pipes, leading to friction loss and pressure drop.
The term back pressure is a misnomer, as pressure is a scalar quantity, so it has a magnitude but no direction.
后向压力(背压)是一种阻力或力,它与通过管道的期望流体流量相反,从而导致摩擦损失和压降。
这个术语是一个误称,因为压力是一个标量,所以它只有大小没有方向。
——Wikipedia
在软件中,背压的含义略有关联,但仍有不同:假设有一个快的数据生产者和一个慢的数据消费者,背压就是一种 “倒逼 “生产者而使自己不被数据淹没的机制。
无论是基于reactstreams.org还是Java的java.util.concurrent.Flow
,Reactive Streams都提供了四个构件:
- 一个产生元素的
Publisher
- 一个接收到元素时做出反应的
Subscriber
- 一个绑定
Publisher
和Subscriber
的Subscription
- 以及一个
Processor
下面是一个类图:
Subscription
通过其中的request()
方法成为背压的根源。
规范非常直白:
A
Subscriber
MUST signal demand viaSubscription.request(long n)
to receiveonNext
signals.The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient “stop-and-wait” protocol.
Subscriber
必须通过Subscription.request(long n)
发出信号请求,以接收onNext
信号。这条规则的目的是确立订阅者有责任决定何时以及它能够与愿意接收多少元素。为了避免重入订阅方法引起的信号重排序,强烈建议同步订阅者在任何信号处理的最后阶段调用Subscription方法。建议订阅者按照能够处理的上限请求数据,因为一次只请求一个元素会导致低效的 “停止-等待 “协议。
— Reactive Streams specifications for the JVM(JVM的Reactive Streams规范)
Reactive Streams(反应流)的规范非常可靠。它们还附带了基于java的TCK。
但如何管理由生产者发出的、不能被下游处理的项目,这不属于规范的范畴。虽然问题相当简单,但也可以有不同的解决方案。每个Reactive框架都提供了一些选项,让我们依次来看看。
RxJava 3中的背压
RxJava v3中提供了一些基本类:
类 | 说明 |
---|---|
Flowable | 包含0到N个项的流,支持Reactive-Streams和背压 |
Observable | 包含0到N个项的流,不支持背压 |
Single | 只包含一个项、或者一个错误的流 |
Maybe | 不包含项、只包含一个项,或者包含一个错误的流 |
Completable | 不包含项,但是要么包含一个结束符、或者一个错误信号的流 |
在这些类中,Flowable
是唯一实现Reactive Streams和背压的类。 但是,提供背压并不是唯一的问题。 正如RxJava的Wiki所述:
Backpressure doesn’t make the problem of an overproducing Observable or an underconsuming Subscriber go away. It just moves the problem up the chain of operators to a point where it can be handled better.
背压并不能解决Observable过度生产或Subscriber消费不足的问题。它只是将问题在操作者的链条中向上移动,使其能够得到更好的处理。
为了解决这个问题,RxJava提供了两种主要策略来处理“过度生产”的项:
将项存储在缓冲器中
请注意,如果你没有为缓冲区设置上限,则可能会导致
OutOfMemoryError
。丢弃项
下图总结了实现这些策略的不同方法:
请注意,onBackPressureLatest
操作类似于使用onBackpressureBuffer(1)
:
与其它框架相比,RxJava提供了在发送所有项之后发送溢出异常信号的方法。这些方法可以让消费者在接收数据项的同时,仍然得到生产者丢弃数据项的通知。
Project Reactor中的背压
Project Reactor提供的策略与RxJava的类似。
不过API有一些细微的差别。例如,Project Reactor提供了一个方便的方法,可以在生产者溢出时抛出一个异常:
var stream = Stream.generate(Math::random);
// RxJava
Flowable.fromStream(stream)
.onBackpressureBuffer(0);
// Project Reactor
Flux.fromStream(stream) // 1
.onBackpressureError(); // 2
- 创建Reactive Stream
- 如果生产者溢出则抛出错误
下面是Flux
类图,重点介绍背压能力:
与其他框架相比,Project Reactor提供了方法为缓冲区的数据项设置TTL,以防止缓冲区溢出。
协程中的背压
协程确实也提供了相同的缓冲和丢弃功能。 协程的基类是Flow
。
你可以这样使用该类:
flow { // 1
while (true) emit(Math.random()) // 2
}.buffer(10) // 3
- 创建一个
Flow
,其内容由后面的块定义 - 定义
Flow
的内容 - 设置缓冲区容量为10
总结
总而言之,RxJava,Project Reactor和Kotlin协程都提供背压功能。 所有用户都通过提供两种策略来应对生产速度比其订阅者消费速度更快的生产者:要么缓存数据项,要么删除数据项。