Backpressure With Observable Sequences
When streaming data, a producer can be so chatty that the consumer cannot keep up with it. To that end, we need mechanisms to control the source so that the consumer does not get overwhelmed. These mechanisms can be either lossy or loss-less, and the right choice depends on the requirements. For example, missing a few mouse movements may not be a problem, but missing a few bank transactions definitely could be. This section covers techniques you can use to handle backpressure in both lossy and loss-less ways.
For example, imagine using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other. A naive implementation of the zip operator would have to maintain an ever-expanding buffer of items emitted by the faster Observable to eventually combine with items emitted by the slower one. This could cause RxJS to seize an unwieldy amount of system resources.
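The buffer growth described above can be illustrated with a small simulation. This is only a sketch in plain JavaScript, not the RxJS zip operator: `simulateNaiveZip` is a hypothetical helper that assumes the fast source produces exactly two items for every item of the slow source.

```javascript
// Hypothetical simulation of a naive zip: the fast source emits two items
// per tick, the slow source emits one, and unmatched items wait in a buffer.
function simulateNaiveZip(ticks) {
  const pending = []; // items from the fast source awaiting a partner
  const pairs = [];
  let fastCount = 0;
  for (let tick = 0; tick < ticks; tick++) {
    pending.push(fastCount++, fastCount++); // two fast items arrive
    const slowItem = `s${tick}`;            // one slow item arrives
    pairs.push([pending.shift(), slowItem]); // only one pair can be formed
  }
  return { pairs, backlog: pending.length };
}

const shortRun = simulateNaiveZip(10);
const longRun = simulateNaiveZip(1000);
console.log(shortRun.backlog, longRun.backlog); // 10 1000
```

The backlog grows by one item per tick and never shrinks, which is why an unbounded zip over infinite sources needs some form of flow control.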
A cold Observable emits a particular sequence of items, but can begin emitting this sequence when its Observer finds it to be convenient, and at whatever rate the Observer desires, without disrupting the integrity of the sequence. For example, if you convert an iterable such as an array, Map, Set, or generator into an Observable, that Observable will emit the same sequence of items no matter when it is later subscribed to or how frequently those items are observed. Examples of items emitted by a cold Observable might include the results of a database query, file retrieval, or web request.
A hot Observable begins generating items to emit immediately when it is created. Subscribers typically begin observing the sequence of items emitted by a hot Observable from somewhere in the middle of the sequence, beginning with the first item emitted by the Observable subsequent to the establishment of the subscription. Such an Observable emits items at its own pace, and it is up to its observers to keep up. Examples of items emitted by a hot Observable might include mouse & keyboard events, system events, or stock prices.
When a cold Observable is multi-cast (when it is converted into a ConnectableObservable and its connect method is called), it effectively becomes hot, and for the purposes of backpressure and flow control it should be treated as a hot Observable.
Cold Observables are ideal subjects for the reactive pull model of backpressure described below. Hot Observables are typically not designed to cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the pausableBuffered or pausable operators, throttling, buffers, or windows.
There are a number of ways to control an observable sequence through lossy operations so that the consumer does not get overwhelmed. Lossy means that some items are dropped, for example those arriving between pause and resume actions.
The first technique for lossy backpressure is called debounce, which only emits an item from the source Observable after a particular timespan has passed without the Observable emitting any other items. This is useful, for example, when the user is typing quickly and you do not want to yield every keystroke, but instead want to wait half a second after the person has stopped typing before yielding the value.
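The wait-for-quiet behaviour described above (debouncing) can be sketched over a list of timestamped events. This is a plain JavaScript illustration rather than the library's implementation: `debounceEvents` and the `{ time, value }` event shape are assumptions made for the example, with times in milliseconds.

```javascript
// Hypothetical helper: keeps only the events that are followed by at least
// `quietMs` of silence. `events` must be sorted by time.
function debounceEvents(events, quietMs) {
  const out = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    // The last event always qualifies because nothing follows it.
    if (!next || next.time - event.time >= quietMs) {
      // The value is yielded once the quiet period has elapsed.
      out.push({ time: event.time + quietMs, value: event.value });
    }
  });
  return out;
}

// The user types "rxj" quickly, pauses, then adds an "s".
const keystrokes = [
  { time: 0, value: "r" },
  { time: 100, value: "rx" },
  { time: 200, value: "rxj" },
  { time: 1000, value: "rxjs" },
];
const debounced = debounceEvents(keystrokes, 500);
console.log(debounced.map((e) => e.value)); // [ 'rxj', 'rxjs' ]
```

Only the values that were followed by half a second of silence survive; the intermediate keystrokes are dropped.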
In addition to supporting lossy backpressure mechanisms, RxJS also supports loss-less ones, in which all of the data is retained so that the consumer can fully consume it at its own pace. There are a number of strategies available, including buffers that work with timespans, counts, or both, pausable buffers, and reactive pull.
The first strategy for dealing with an overly chatty producer is the use of buffers. This allows the consumer to set either the number of items they wish to wait for at a time, or a particular timespan, or both, whichever comes first. This is useful in a number of cases, for example if you want some data within a window for comparison purposes, or if you want to chunk up data as you need it.
This is of course only the beginning of the work with backpressure, as there are many other strategies that can be considered. In future versions of RxJS, the idea of the controlled observable will be baked into the subscription itself, which then allows backpressure to be an essential part of the contract of requesting n items.
Another technique for dealing with an observable sequence that produces too much for the consumer is throttling, through the throttle method, which emits the first item emitted by an Observable within each periodic time interval. Throttling can be especially useful for rate limiting the execution of handlers on events like resize and scroll.
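A minimal sketch of this first-item-per-interval behaviour, again in plain JavaScript over timestamped events rather than with the library itself. `throttleEvents` and the `{ time, value }` shape are hypothetical names chosen for illustration.

```javascript
// Hypothetical helper: lets an event through only if at least `windowMs`
// has passed since the previous event that was let through.
function throttleEvents(events, windowMs) {
  const out = [];
  let lastEmit = -Infinity;
  for (const event of events) {
    if (event.time - lastEmit >= windowMs) {
      out.push(event);
      lastEmit = event.time;
    }
  }
  return out;
}

// Scroll events arriving in bursts (times in milliseconds).
const scrolls = [0, 50, 120, 300, 310, 650].map((time) => ({ time, value: "scroll" }));
const throttled = throttleEvents(scrolls, 300);
console.log(throttled.map((e) => e.time)); // [ 0, 300, 650 ]
```

Unlike debouncing, the first event of a burst is handled immediately and the rest of the burst is dropped.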
You can also extract values from the observable sequence at certain intervals using the sample method. This is useful if you want values from, say, a stock ticker every five seconds or so without having to consume the entire observable sequence.
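The interval-based extraction described above can be sketched as follows. This is an illustration in plain JavaScript, not the library's implementation; `sampleEvents`, its `endTime` parameter, and the price data are assumptions made for the example. The sketch skips an interval in which no new value arrived.

```javascript
// Hypothetical helper: at every `intervalMs` tick up to `endTime`, yields
// the most recent value that arrived since the previous tick, if any.
function sampleEvents(events, intervalMs, endTime) {
  const out = [];
  let i = 0;
  for (let tick = intervalMs; tick <= endTime; tick += intervalMs) {
    let latest;
    let hasNew = false;
    while (i < events.length && events[i].time <= tick) {
      latest = events[i].value;
      hasNew = true;
      i++;
    }
    if (hasNew) out.push({ time: tick, value: latest });
  }
  return out;
}

// Made-up stock prices with irregular arrival times (milliseconds).
const prices = [
  { time: 1000, value: 101 },
  { time: 2000, value: 102 },
  { time: 4500, value: 103 },
  { time: 5200, value: 104 },
  { time: 9000, value: 105 },
];
const sampled = sampleEvents(prices, 5000, 15000);
console.log(sampled.map((e) => e.value)); // [ 103, 105 ]
```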
The ability to pause and resume is also a powerful concept, which RxJS offers in both lossy and loss-less versions. In the case of lossy backpressure, the pausable operator can be used to stop listening and then resume listening at a later time by calling pause and resume respectively on the observable sequence. For example, we can take some observable sequence and call pausable, then call pause to pause the sequence and resume within 5 seconds. Note that any data that comes in between the pause and resume calls is lost. Note also that this only works for hot observables and is unsuitable for cold observables, as they will restart upon resume.
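The lossy pause and resume behaviour can be modelled with a small gate in front of the consumer. This is a sketch only: `PausableGate` is a hypothetical class, not the library's operator, and it assumes the gate starts in the flowing state, which may differ from the library's default.

```javascript
// Hypothetical lossy gate: values pushed while paused are simply dropped.
class PausableGate {
  constructor(onValue) {
    this.onValue = onValue;
    this.paused = false; // assumption: starts flowing
  }
  pause() {
    this.paused = true;
  }
  resume() {
    this.paused = false;
  }
  push(value) {
    if (!this.paused) this.onValue(value);
  }
}

const seenLossy = [];
const lossyGate = new PausableGate((value) => seenLossy.push(value));
lossyGate.push(1);
lossyGate.push(2);
lossyGate.pause();
lossyGate.push(3); // dropped
lossyGate.push(4); // dropped
lossyGate.resume();
lossyGate.push(5);
console.log(seenLossy); // [ 1, 2, 5 ]
```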
The bufferWithCount method allows you to specify the number of items that you wish to capture in a buffer array before yielding it to the consumer. An impractical yet fun use of this is to detect whether the user has entered the Konami Code.
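As a rough illustration of count-based buffering, the sketch below groups key codes into overlapping buffers of ten and checks each one against the Konami Code. It is plain JavaScript, not the library's operator: `slidingBuffers` is a hypothetical helper, and unlike a real stream operator it works on a finished array and yields only full buffers.

```javascript
// Hypothetical helper: yields buffers of `count` items, starting a new
// buffer every `skip` items (skip = count gives non-overlapping chunks).
function slidingBuffers(items, count, skip = count) {
  const buffers = [];
  for (let start = 0; start + count <= items.length; start += skip) {
    buffers.push(items.slice(start, start + count));
  }
  return buffers;
}

// Key codes for: up up down down left right left right B A
const konami = [38, 38, 40, 40, 37, 39, 37, 39, 66, 65];
const isKonami = (buffer) => buffer.every((code, i) => code === konami[i]);

// The user presses Enter (13), then the code, then Enter again.
const pressed = [13, ...konami, 13];
const codeEntered = slidingBuffers(pressed, 10, 1).some(isKonami);
console.log(codeEntered); // true
```

A skip of 1 makes the buffers overlap, so the code is detected wherever it starts in the key stream.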
On the other hand, you can also collect the data into a buffer for a given amount of time with the bufferWithTime method. This is useful, for example, if you are tracking the volume of data coming across the network, which can then be handled uniformly.
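Time-based buffering can be sketched by assigning each timestamped event to a fixed window. This is an illustration in plain JavaScript, not the library's implementation; `bufferByTime`, the `{ time, value }` shape, and the byte counts are assumptions made for the example.

```javascript
// Hypothetical helper: groups event values into consecutive windows of
// `spanMs` milliseconds. Windows with no traffic become empty buffers.
function bufferByTime(events, spanMs) {
  const buffers = [];
  for (const event of events) {
    const index = Math.floor(event.time / spanMs);
    while (buffers.length <= index) buffers.push([]);
    buffers[index].push(event.value);
  }
  return buffers;
}

// Made-up packet sizes in bytes, with arrival times in milliseconds.
const packets = [
  { time: 100, value: 512 },
  { time: 400, value: 1024 },
  { time: 1200, value: 256 },
  { time: 3100, value: 2048 },
];
const perSecond = bufferByTime(packets, 1000);
const bytesPerSecond = perSecond.map((buffer) => buffer.reduce((sum, n) => sum + n, 0));
console.log(bytesPerSecond); // [ 1536, 256, 0, 2048 ]
```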
To keep buffers from filling too quickly, you can cap the buffer by specifying ceilings for both count and timespan, whichever occurs first. For example, the network could deliver data particularly quickly during some time windows and slowly during others, so to keep the data levels even, you can specify these thresholds via the bufferWithTimeOrCount method.
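The whichever-comes-first rule can be sketched as below. This is plain JavaScript over timestamped events, not the library's implementation; `bufferByTimeOrCount` is a hypothetical helper, and it assumes the timer restarts whenever a buffer is flushed because the count was reached.

```javascript
// Hypothetical helper: flushes the current buffer when it holds `maxCount`
// items or when `spanMs` has elapsed since the buffer was opened.
function bufferByTimeOrCount(events, spanMs, maxCount) {
  const buffers = [];
  let current = [];
  let windowStart = 0;
  for (const event of events) {
    // Close every window whose timespan elapsed before this event arrived.
    while (event.time >= windowStart + spanMs) {
      buffers.push(current);
      current = [];
      windowStart += spanMs;
    }
    current.push(event.value);
    if (current.length === maxCount) {
      buffers.push(current);
      current = [];
      windowStart = event.time; // assumption: timer restarts after a count flush
    }
  }
  if (current.length > 0) buffers.push(current); // flush the remainder at the end
  return buffers;
}

// A fast burst followed by slower traffic (times in milliseconds).
const traffic = [
  { time: 10, value: "a" },
  { time: 20, value: "b" },
  { time: 30, value: "c" },
  { time: 40, value: "d" },
  { time: 1500, value: "e" },
  { time: 1600, value: "f" },
];
const capped = bufferByTimeOrCount(traffic, 1000, 3);
console.log(capped); // [ [ 'a', 'b', 'c' ], [ 'd' ], [ 'e', 'f' ] ]
```

The burst is cut off by the count ceiling, while the slower traffic is released by the timespan.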
The pausable method is great at dealing with hot observables where you want to pause and resume while dropping data. However, you may want to preserve the data that arrives between the pause and resume calls. To that end, we have introduced the pausableBuffered method, which keeps a running buffer from the moment pause is called and drains it when resume is called. This leaves it to the developer's discretion when to pause and resume, and in the meantime no data is lost.
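The loss-less variant differs from the lossy one only in what happens while paused: values are queued instead of dropped, and the queue is drained on resume. The sketch below models that idea in plain JavaScript; `BufferedGate` is a hypothetical class, not the library's operator, and it assumes the gate starts in the flowing state.

```javascript
// Hypothetical loss-less gate: values pushed while paused are queued and
// delivered in order when resume() is called.
class BufferedGate {
  constructor(onValue) {
    this.onValue = onValue;
    this.paused = false; // assumption: starts flowing
    this.queue = [];
  }
  pause() {
    this.paused = true;
  }
  resume() {
    this.paused = false;
    while (this.queue.length > 0) this.onValue(this.queue.shift());
  }
  push(value) {
    if (this.paused) this.queue.push(value);
    else this.onValue(value);
  }
}

const seenBuffered = [];
const bufferedGate = new BufferedGate((value) => seenBuffered.push(value));
bufferedGate.push(1);
bufferedGate.push(2);
bufferedGate.pause();
bufferedGate.push(3); // queued
bufferedGate.push(4); // queued
const seenWhilePaused = seenBuffered.slice();
bufferedGate.resume(); // drains 3 and 4
bufferedGate.push(5);
console.log(seenWhilePaused, seenBuffered); // [ 1, 2 ] [ 1, 2, 3, 4, 5 ]
```

The trade-off is memory: the queue grows for as long as the gate stays paused.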
In more advanced scenarios, you may want to control the absolute number of items that you receive at a given time, with the rest being buffered, via the controlled method. For example, you can pull 10 items, followed by 20 items, at the discretion of the developer. This is in line with efforts to effectively turn the push stream into a push/pull stream.
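The request-driven model described above can be sketched with a queue that releases items only on demand. This is a plain JavaScript illustration, not the library's implementation: `ControlledQueue` and its `request` method are hypothetical names for the example, and the sketch assumes that unused demand carries over to items that arrive later.

```javascript
// Hypothetical push/pull bridge: the producer pushes, but the consumer only
// receives as many items as it has asked for through request(n).
class ControlledQueue {
  constructor(onValue) {
    this.onValue = onValue;
    this.queue = [];
    this.requested = 0; // outstanding demand from the consumer
  }
  push(value) {
    if (this.requested > 0) {
      this.requested--;
      this.onValue(value);
    } else {
      this.queue.push(value); // no demand yet, so hold on to the item
    }
  }
  request(n) {
    this.requested += n;
    while (this.requested > 0 && this.queue.length > 0) {
      this.requested--;
      this.onValue(this.queue.shift());
    }
  }
}

const pulled = [];
const controlledQueue = new ControlledQueue((value) => pulled.push(value));
[1, 2, 3, 4, 5].forEach((n) => controlledQueue.push(n)); // nothing delivered yet
const beforeRequest = pulled.slice();
controlledQueue.request(2); // delivers 1 and 2
const afterFirstRequest = pulled.slice();
controlledQueue.request(4); // delivers 3, 4, 5 and leaves demand for one more
controlledQueue.push(6); // delivered immediately
controlledQueue.push(7); // queued, since demand is exhausted
console.log(beforeRequest, afterFirstRequest, pulled);
// [] [ 1, 2 ] [ 1, 2, 3, 4, 5, 6 ]
```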