RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page
  • Scheduler Types ##
  • Using Schedulers ##
  • When to Use Which Scheduler ##
  • See Also

Was this helpful?

  1. Summary
  2. Getting Started With RxJS

Scheduling And Concurrency

PreviousSubjectsNextTesting And Debugging

Last updated 5 years ago

Was this helpful?

A scheduler controls when a subscription starts and when notifications are published. It consists of three components. It is first a data structure. When you schedule for tasks to be completed, they are put into the scheduler for queueing based on priority or other criteria. It also offers an execution context which denotes where the task is executed (e.g., in the immediately, current thread, or in another callback mechanism such as setTimeout or process.nextTick). Lastly, it has a clock which provides a notion of time for itself (by accessing the now method of a scheduler). Tasks being scheduled on a particular scheduler will adhere to the time denoted by that clock only.

Schedulers also introduce the notion of virtual time (denoted by the type), which does not correlate with real time that is used in our daily life. For example, a sequence that is specified to take 100 years to complete can be scheduled to complete in virtual time in a mere 5 minutes. This will be covered in the topic.

Scheduler Types ##

The various Scheduler types provided by Rx all implement the Scheduler methods. Each of these can be created and returned by using static properties of the Scheduler object. The ImmediateScheduler (by accessing the static property) will start the specified action immediately. The CurrentThreadScheduler (by accessing the static property) will schedule actions to be performed on the thread that makes the original call. The action is not executed immediately, but is placed in a queue and only executed after the current action is complete. The DefaultScheduler (by accessing the static property) will schedule actions to be performed on a asynchronous callback, which is optimized for the particular runtime, such as setImmediate or process.nextTick on Node.js or in the browser with a fallback to setTimeout.

Using Schedulers ##

You may have already used schedulers in your Rx code without explicitly stating the type of schedulers to be used. This is because all Observable operators that deal with concurrency have optional schedulers. If you do not provide the scheduler, RxJS will pick a default scheduler by using the principle of least concurrency. This means that the scheduler which introduces the least amount of concurrency that satisfies the needs of the operator is chosen. For example, for operators returning an observable with a finite and small number of messages, RxJS calls immediate. For operators returning a potentially large or infinite number of messages, currentThread is called. For operators which use timers, default is used.

Because RxJS uses the least concurrency scheduler, you can pick a different scheduler if you want to introduce concurrency for performance purpose. To specify a particular scheduler, you can use those operator methods that take a scheduler, e.g., return(42, Rx.Scheduler.default).

In the following example, the source observable sequence is producing values at a frantic pace. The default scheduler of the generate operator would place onNext messages on the currentThread.

var obs = Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x);

This will queue up on the observer quickly. We can improve this code by using the observeOn operator, which allows you to specify the context that you want to use to send pushed notifications (onNext) to observers. By default, the operator ensures that onNext will be called as many times as possible on the current thread. You can use its overloads and redirect the onNext outputs to a different context. In addition, you can use the operator to return a proxy observable that delegates actions to a specific scheduler. For example, for a UI-intensive application, you can delegate all background operations to be performed on a scheduler running in the background by using subscribeOn and passing to it the DefaultScheduler.

The following example will schedule any onNext notifications on the current Dispatcher, so that any value pushed out is sent on the UI thread. This is especially beneficial to Silverlight developers who use RxJS.

Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x
    )
    .observeOn(Rx.Scheduler.default)
    .subscribe(...);

Instead of using the observeOn operator to change the execution context on which the observable sequence produces messages, we can create concurrency in the right place to begin with. As operators parameterize introduction of concurrency by providing a scheduler argument overload, passing the right scheduler will lead to fewer places where the ObserveOn operator has to be used. For example, we can unblock the observer and subscribe to the UI thread directly by changing the scheduler used by the source, as in the following example. In this code, by using the generate method passing a scheduler, and providing the Rx.Scheduler.default instance, all values pushed out from this observable sequence will originate via an asynchronous callback.

Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x,
    Rx.Scheduler.default)
    .subscribe(...);

You should also note that by using the observeOn operator, an action is scheduled for each message that comes through the original observable sequence. This potentially changes timing information as well as puts additional stress on the system. If you have a query that composes various observable sequences running on many different execution contexts, and you are doing filtering in the query, it is best to place observeOn later in the query. This is because a query will potentially filter out a lot of messages, and placing the observeOn operator earlier in the query would do extra work on messages that would be filtered out anyway. Calling the observeOn operator at the end of the query will create the least performance impact.

Another advantage of specifying a scheduler type explicitly is that you can introduce concurrency for performance purpose, as illustrated by the following code.

seq.groupBy(...)
  .map(x => x.observeOn(Rx.Scheduler.default))
  .map(x => expensive(x))  // perform operations that are expensive on resources

When to Use Which Scheduler ##

To make things a little easier when you are creating your own operators, or using the standard built-in ones, which scheduler you should use. The following table lays out each scenario with the suggested scheduler.

Scenario

Scheduler

Constant Time Operations

Tail Recursive Operations

Iteration Operations

Time-based Operations

Asynchronous Conversions

Historical Data Operations

Unit Testing

See Also

Reference

VirtualTimeScheduler
Testing and Debugging Observable Sequences
immediate
currentThread
default
observeOn
subscribeOn
Testing and Debugging Observable Sequences
Rx.Scheduler.immediate
Rx.Scheduler.immediate
Rx.Scheduler.currentThread
Rx.Scheduler.default
Rx.Scheduler.default
Rx.HistoricalScheduler
Rx.TestScheduler