RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page
  • Using Subjects ##
  • Different types of Subjects ##

Was this helpful?

  1. Summary
  2. Getting Started With RxJS

Subjects

PreviousOperators By CategoryNextScheduling And Concurrency

Last updated 5 years ago

Was this helpful?

The class inherits both and , in the sense that it is both an observer and an observable. You can use a subject to subscribe all the observers, and then subscribe the subject to a backend data source. In this way, the subject can act as a proxy for a group of subscribers and a source. You can use subjects to implement a custom observable with caching, buffering and time shifting. In addition, you can use subjects to broadcast data to multiple subscribers.

By default, subjects do not perform any synchronization across threads. They do not take a scheduler but rather assume that all serialization and grammatical correctness are handled by the caller of the subject. A subject simply broadcasts to all subscribed observers in the thread-safe list of subscribers. Doing so has the advantage of reducing overhead and improving performance.

Using Subjects ##

In the following example, we create a subject, subscribe to that subject and then use the same subject to publish values to the observer. By doing so, we combine the publication and subscription into the same source.

In addition to taking an Observer, the method can also take a function for onNext, which means that the action will be executed every time an item is published. In our sample, whenever onNext is invoked, the item will be written to the console.

var subject = new Rx.Subject();

var subscription = subject.subscribe(
    x => console.log('onNext: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

subject.onNext(1);
// => onNext: 1

subject.onNext(2);
// => onNext: 2

subject.onCompleted();
// => onCompleted

subscription.dispose();

The following example illustrates the proxy and broadcast nature of a Subject. We first create a source sequence which produces an integer every 1 second. We then create a Subject, and pass it as an observer to the source so that it will receive all the values pushed out by this source sequence. After that, we create another two subscriptions, this time with the subject as the source. The subSubject1 and subSubject2 subscriptions will then receive any value passed down (from the source) by the Subject.

// Every second
var source = Rx.Observable.interval(1000);

var subject = new Rx.Subject();

var subSource = source.subscribe(subject);

var subSubject1 = subject.subscribe(
    x => console.log('Value published to observer #1: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

var subSubject2 = subject.subscribe(
    x => console.log('Value published to observer #2: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

setTimeout(() => {
    // Clean up
    subject.onCompleted();
    subSubject1.dispose();
    subSubject2.dispose();
}, 5000);

// => Value published to observer #1: 0
// => Value published to observer #2: 0
// => Value published to observer #1: 1
// => Value published to observer #2: 1
// => Value published to observer #1: 2
// => Value published to observer #2: 2
// => Value published to observer #1: 3
// => Value published to observer #2: 3
// => onCompleted
// => onCompleted

Different types of Subjects ##

The Subject object in the RxJS library is a basic implementation, but you can create your own using the method. There are other implementations of Subjects that offer different functionalities. All of these types store some (or all of) values pushed to them via onNext, and broadcast them back to their observers. In this way, they convert a Cold Observable into a Hot one. This means that if you Subscribe to any of these more than once (i.e. subscribe -> unsubscribe -> subscribe again), you will see at least one of the same value again. For more information on hot and cold observables, see the last section of the topic.

stores all the values that it has published. Therefore, when you subscribe to it, you automatically receive an entire history of values that it has published, even though your subscription might have come in after certain values have been pushed out. is similar to ReplaySubject, except that it only stores the last value it published. BehaviourSubject also requires a default value upon initialization. This value is sent to observers when no other value has been received by the subject yet. This means that all subscribers will receive a value instantly on subscribe, unless the Subject has already completed. is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the sequence is completed. You can use the AsyncSubject type for situations when the source observable is hot and might complete before any observer can subscribe to it. In this case, AsyncSubject can still provide the last value and publish it to any future subscribers.

Subject
Observable
Observer
subscribe
Subject.create
Creating and Subscribing to Simple Observable Sequences
ReplaySubject
BehaviourSubject
AsyncSubject