RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page
  • Observable / Observer ##
  • Caution:
  • See Also

Was this helpful?

  1. Summary
  2. Getting Started With RxJS

Exploring Major Concepts In RxJS

PreviousWhat Are The Reactive Extensions?NextCreating And Querying Observable Sequences

Last updated 5 years ago

Was this helpful?

This topic describes the major Reactive Extensions for JavaScript (Rx) objects used to represent observable sequences and subscribe to them.

The Observable / Observer objects are available in the core distribution of RxJS.

Observable / Observer ##

Rx exposes asynchronous and event-based data sources as push-based, observable sequences abstracted by the object in the core distribution of RxJS. It represents a data source that can be observed, meaning that it can send data to anyone who is interested.

As described in , the other half of the push model is represented by the object, which represents an observer who registers an interest through a subscription. Items are subsequently handed to the observer from the observable sequence to which it subscribes.

In order to receive notifications from an observable collection, you use the method of Observable to hand it an Observer object. In return for this observer, the method returns a Disposable object that acts as a handle for the subscription. This allows you to clean up the subscription after you are done. Calling dispose on this object detaches the observer from the source so that notifications are no longer delivered. As you can infer, in RxJS you do not need to explicitly unsubscribe from an event as in the common JavaScript event model.

Observers support three publication events, reflected by the object's methods. The can be called zero or more times, when the observable data source has data available. For example, an observable data source used for mouse move events can send out an event object every time the mouse has moved. The other two methods are used to indicate completion or errors.

The following lists the Observable / Observer objects in addition to the Disposable object.

/**
 * Defines a method to release allocated resources.
 */
function Disposable() { }

/**
 * Performs application-defined tasks associated with freeing, releasing, or resetting resources.
 */
Disposable.prototype.dispose =  () => { ... }

/**
 * Defines a provider for push-based notification.
 */
function Observable() { }

/**
 * Notifies the provider that an observer is to receive notifications.
 *
 * @param {Observer} observer The object that is to receive notifications.
 * @returns {Disposable} A reference to disposable that allows observers to stop receiving notifications before the provider has finished sending them.
 */
Observable.prototype.subscribe =  observer => { ... }

/**
 * Provides a mechanism for receiving push-based notifications.
 */
function Observer() { }

/**
 * Provides the observer with new data.
 *
 * @param {Any} value The current notification information.
 */
Observer.prototype.onNext = value => { ... };

/**
 * Notifies the observer that the provider has experienced an error condition.
 *
 * @param {Error} error An object that provides additional information about the error.
 */
Observer.prototype.onError = error => { ... };

/**
 * Notifies the observer that the provider has finished sending push-based notifications.
 */
Observer.prototype.onCompleted = () => { ... };

RxJS also provides subscribe capabilities so that you can avoid implementing the Observer object yourself. For each publication event (onNext, onError, onCompleted) of an observable sequence, you can specify a function that will be invoked, as shown in the following example. If you do not specify an action for an event, the default behavior will occur.

// Creates an observable sequence of 5 integers, starting from 1
var source = Rx.Observable.range(1, 5);

// Prints out each item
var subscription = source.subscribe(
    x => console.log('onNext: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted

Caution:

See Also

Concepts

Other Resources

You can treat the observable sequence (such as a sequence of mouse-over events) as if it were a normal collection. Thus you can write queries over the collection to do things like filtering, grouping, composing, etc. To make observable sequences more useful, the RxJS libraries provide many factory operators so that you do not need to implement any of these on your own. This will be covered in the topic.

You do not need to implement the Observable/Observer objects yourself. Rx provides internal implementations of these interfaces for you and exposes them through various extension methods provided by the Observable and Observer types. See the topic for more information.

Observable
What is RxJS
Observer
subscribe
subscribe
onNext
Querying Observable Sequences
Creating and Querying Observable Sequences
Querying Observable Sequences
Creating and Querying Observable Sequences