RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page
  • Hot and Cold Observables and Multicast ##
  • Lossy Backpressure ##
  • Debounce ###
  • Throttling ###
  • Sampling Observables ###
  • Pausable Observables ###
  • Loss-less Backpressure ##
  • Buffers and Windows ###
  • Pausable Buffers ###
  • Controlled Observables ###
  • Future Work ###

Was this helpful?

  1. Summary
  2. Getting Started With RxJS
  3. Creating And Querying Observable Sequences

Backpressure With Observable Sequences

PreviousTransducers With Observable SequencesNextOperators By Category

Last updated 5 years ago

Was this helpful?

When it comes to streaming data, streams can be overly chatty in which the consumer cannot keep up with the producer. To that end, we need mechanisms to control the source so that the consumer does not get overwhelmed. These mechanisms can come in either the form of lossy or loss-less operations, each of which depends on the requirements. For example, if you miss a few mouse movements, it may not be a problem, however, if you miss a few bank transactions, that could definitely be a problem. This section covers which techniques you can use to handle backpressure in either lossy or loss-less ways.

For example, imagine using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other. A naive implementation of the zip operator would have to maintain an ever-expanding buffer of items emitted by the faster Observable to eventually combine with items emitted by the slower one. This could cause RxJS to seize an unwieldy amount of system resources.

Hot and Cold Observables and Multicast ##

A cold Observable emits a particular sequence of items, but can begin emitting this sequence when its Observer finds it to be convenient, and at whatever rate the Observer desires, without disrupting the integrity of the sequence. For example if you convert an iterable such as array, Map, Set, or generator into an Observable, that Observable will emit the same sequence of items no matter when it is later subscribed to or how frequently those items are observed. Examples of items emitted by a cold Observable might include the results of a database query, file retrieval, or web request.

A hot Observable begins generating items to emit immediately when it is created. Subscribers typically begin observing the sequence of items emitted by a hot Observable from somewhere in the middle of the sequence, beginning with the first item emitted by the Observable subsequent to the establishment of the subscription. Such an Observable emits items at its own pace, and it is up to its observers to keep up. Examples of items emitted by a hot Observable might include mouse & keyboard events, system events, or stock prices.

When a cold Observable is multi-cast (when it is converted into a ConnectableObservable and its connect method is called), it effectively becomes hot and for the purposes of backpressure and flow-control it should be treated as a hot Observable.

Cold Observables are ideal subjects for the reactive pull model of backpressure described below. Hot observables are typically not designed to cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the pausableBuffered or pausable operators, throttling, buffers, or windows.

Lossy Backpressure ##

There are a number of ways that an observable sequence can be controlled so that the consumer does not get overwhelmed through lossy operations, meaning that packets will be dropped in between pause and resume actions.

Debounce ###

The first technique for lossy backpressure is called which only emits an item from the source Observable after a particular timespan has passed without the Observable emitting any other items. This is useful in scenarios such as if the user is typing too fast and you do not want to yield every keystroke, and instead wait half a second after the person stopped typing before yielding the value.

var debounced = Rx.Observable.fromEvent(input, 'keyup')
  .map(e => e.target.value)
  .debounce(500 /* ms */);

debounced.subscribeOnNext(value => console.log('Input value: %s', value));

Throttling ###

var throttled = Rx.Observable.fromEvent(window, 'resize')
  .throttleFirst(250 /* ms */);

throttled.subscribeOnNext(e => {
  console.log('Window inner height: %d', window.innerHeight);
  console.log('Window inner width: %d', window.innerWidth);
});

Sampling Observables ###

var sampled = getStockData()
  .sample(5000 /* ms */);

sampled.subscribeOnNext(data => console.log('Stock data: %o', data));

Pausable Observables ###

var pausable = getSomeObservableSource()
  .pausable();

pausable.subscribeOnNext(data => console.log('Data: %o', data));

pausable.pause();

// Resume in five seconds
setTimeout(() => pausable.resume(), 5000);

Loss-less Backpressure ##

In addition to supporting lossy backpressure mechanisms, RxJS also supports ways of getting the data in such a way that it is able to be fully consumed by the consumer at its own pace. There are a number of strategies at work including using buffers that work with timespans, count or both, pausable buffers, reactive pull, etc.

Buffers and Windows ###

The first strategy of dealing with an overly chatty producer is through the use of buffers. This allows the consumer to set either the number of items they wish to wait for at a time, or a particular timespan, or both, whichever comes first. This is useful in a number of cases, for example if you want some data within a window for comparison purposes in addition to chunking up data as you need it.

var codes = [
  38, // up
  38, // up
  40, // down
  40, // down
  37, // left
  39, // right
  37, // left
  39, // right
  66, // b
  65  // a
];

function isKonamiCode(buffer) {
  return codes.toString() === buffer.toString();
}

var keys = Rx.Observable.fromEvent(document, 'keyup')
  .map(e => e.keyCode)
  .bufferWithCount(10, 1)
  .filter(isKonamiCode)
  .subscribeOnNext(() => console.log('KONAMI!'));
var source = getStockData()
  .bufferWithTime(5000, 1000) // time in milliseconds
  .subscribeOnNext(data => data.forEach(d => console.log('Stock: %o', d)));
var source = getStockData()
  .bufferWithTimeOrCount(5000 /* ms */, 100 /* items */)
  .subscribeOnNext(data => data.forEach(d => console.log('Stock: %o', d)));

Pausable Buffers ###

var source = getStockData()
  .pausableBuffered();

source.subscribeOnNext(stock => console.log('Stock data: %o', stock));

source.pause();

// Resume after five seconds
setTimeout(() => {
  // Drains the buffer and subscribeOnNext is called with the data
  source.resume();
}, 5000);

Controlled Observables ###

var source = getStockData()
  .controlled();

source.subscribeOnNext(stock => console.log('Stock data: %o', stock));

source.request(2);

// Keep getting more after 5 seconds
setInterval(=> source.request(2), 5000);

Future Work ###

This is of course only the beginning of the work with backpressure as there are many other strategies that can be considered. In future versions of RxJS, the idea of the controlled observable will be baked into the subscription itself which then allows the backpressure to be an essential part of the contract or requesting n number of items.

Another technique to deal with an observable sequence which is producing too much for the consumer is through throttling with the use of the method which emits the first items emitted by an Observable within periodic time intervals. Throttling can be especially useful for rate limiting execution of handlers on events like resize and scroll.

You can also at certain intervals extract values from the observable sequence using the method. This is useful if you want values from say a stock ticker every five seconds or so without having to consume the entire observable sequence.

The ability to pause and resume is also a powerful concept which is offered in RxJS in both lossy and loss-less versions. In the case of lossy backpressure, the operator can be used to stop listening and then resume listening at a later time by calling pause and resume respectively on the observable sequence. For example we can take some observable sequence and call pausable, then call pause to pause the sequence and resume within 5 seconds. Note that any data that comes in between the pause and resume are lost. Note that this only works for hot observables and is unsuitable for cold observables as they will restart upon resume.

The method allows us to specify the number of items that you wish to capture in a buffer array before yielding it to the consumer. An impractical yet fun use of this is to calculate whether the user has input the Konami Code for example.

On the other hand, you can also get the data within a buffer for a given amount of time with the method. This is useful for example if you are tracking volume of data that is coming across the network, which can then be handled uniformly.

In order to keep buffers from filling too quickly, there is a method to cap the buffer by specifying ceilings for count and timespan, whichever occurs first. For example, the network could be particularly quick with the data for the specified time, and other times not, so to keep the data levels even, you can specify this threshold via the method

The method is great at dealing with hot observables where you would want to pause and resume while dropping data, however, you may want to preserve that data between the pause and resume calls. To that end, we have introduced the method which keeps a running buffer between pause is called and is drained when resume is called. This then leaves the discretion up to the developer to decide when to pause and resume and in the mean time, no data is lost.

In more advanced scenarios, you may want to control the absolute number of items that you receive at a given time, and the rest is buffered via the controlled method. For example, you can pull 10 items, followed by 20 items, and is up to the discretion of the developer. This is more in-line with the efforts from the effort to effectively turn the push stream into a push/pull stream.

debounce
throttleFirst
sample
pausable
bufferWithCount
bufferWithTime
bufferWithTimeOrCount
pausable
pausableBuffered
Reactive Streams