RxJs Book

Querying Observable Sequences

Last updated 5 years ago

In Bridging to Events, we converted existing DOM and Node.js events into observable sequences to subscribe to them. In this topic, we will look at the first-class nature of observable sequences as Observable objects, for which the RxJS library supplies generic query operators. Most operators take an observable sequence, perform some logic on it, and output another observable sequence. In addition, as you can see from our code samples, you can chain multiple operators on a source sequence to tailor the resulting sequence to your exact requirements.

Using Different Operators ##

We have already used the create and range operators in previous topics to create and return simple sequences. We have also used the fromEvent and fromEventPattern operators to convert existing events into observable sequences. In this topic, we will use other operators of the Observable type so that you can filter, group and transform data. Such operators take one or more observable sequences as input and produce one or more observable sequences as output.

Combining different sequences ##

In this section, we will examine some of the operators that combine several observable sequences into a single observable sequence. Notice that data is not transformed when we combine sequences. In the following sample, we use the concat operator to combine two sequences into a single sequence and subscribe to it. For illustration purposes, we will use the very simple range(x, y) operator, which creates a sequence of y sequential integers starting at x.

var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(1, 3);

source1.concat(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 2
// => 3
// => 1
// => 2
// => 3

// merge: both sources are active at the same time
var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(1, 3);

source1.merge(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 1
// => 2
// => 2
// => 3
// => 3

// catch: source2 is used only if source1 fails, so 4, 5, 6 never appear here
var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(4, 3);

source1.catch(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 2
// => 3

// onErrorResumeNext: continue with source2 even though source1 ends with an error
var source1 = Rx.Observable.throw(new Error('An error has occurred.'));
var source2 = Rx.Observable.range(1, 3);

source1.onErrorResumeNext(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 2
// => 3

Projection ##

In the following example, we project a sequence of strings into a series of integers representing the length.

var array = ['Reactive', 'Extensions', 'RxJS'];

var seqString = Rx.Observable.from(array);

var seqNum = seqString.map(x => x.length);

seqNum
   .subscribe(console.log.bind(console));

// => 8
// => 10
// => 4

// Project each mouse move event to a point with x and y
var move = Rx.Observable.fromEvent(document, 'mousemove');

var points = move.map(e => ({x: e.clientX, y: e.clientY }));

points.subscribe(
    pos => console.log('Mouse at point ' + pos.x + ', ' + pos.y));

The observable returned from selectMany or flatMap publishes onCompleted after the source sequence and all mini observable sequences produced by the selector have completed. It fires onError when an error has occurred in the source stream, when an exception was thrown by the selector function, or when an error occurred in any of the mini observable sequences.

In the following example, we first create a source sequence which produces an integer every 5 seconds, and decide to just take the first 2 values produced (by using the take operator). We then use selectMany or flatMap to project each of these integers using another sequence of {100, 101, 102}. By doing so, two mini observable sequences are produced, {100, 101, 102} and {100, 101, 102}. These are finally flattened into a single stream of integers of {100, 101, 102, 100, 101, 102} and pushed to the observer.

var source1 = Rx.Observable.interval(5000).take(2);
var proj = Rx.Observable.range(100, 3);
var resultSeq = source1.flatMap(proj);

var subscription = resultSeq.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e.message),
  () => console.log('onCompleted'));

// => onNext: 100
// => onNext: 101
// => onNext: 102
// => onNext: 100
// => onNext: 101
// => onNext: 102
// => onCompleted

Filtering ##

var seq = Rx.Observable.generate(
    0,
    i => i < 10,
    i => i + 1,
    i => i * i);

var source = seq.filter(n => n < 5);

var subscription = source.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e.message),
  () => console.log('onCompleted'));

// => onNext: 0
// => onNext: 1
// => onNext: 4
// => onCompleted

The following example is an extension of the projection example you have seen earlier in this topic. In that sample, we have used the select or map operator to project the event arguments into a point with x and y. In the following example, we use the filter or where and select or map operators to pick only those mouse movements that we are interested in. In this case, we filter the mouse moves to those over the first bisector (where the x and y coordinates are equal).

var move = Rx.Observable.fromEvent(document, 'mousemove');

var points = move.map(e => ({ x: e.clientX, y: e.clientY }));

var overfirstbisector = points.filter(pos => pos.x === pos.y);

var movesub = overfirstbisector.subscribe(pos => console.log('mouse at ' + pos.x + ', ' + pos.y));

Time-based Operation ##

You can use the buffer operators to perform time-based operations.

Buffering an observable sequence means that its values are put into a buffer based on either a specified timespan or a count threshold. This is especially helpful when you expect the sequence to push out a tremendous amount of data and the subscriber does not have the resources to process each value as it arrives. By buffering the results based on time or count, and only returning a batch of values when the criterion is met (or when the source sequence has completed), the subscriber can process onNext calls at its own pace.

In the following example, we first create a simple sequence that produces an integer every second. We then use the bufferWithCount operator and specify that each buffer will hold 5 items from the sequence. onNext is called when the buffer is full. We then tally the sum of the buffer using Array.reduce. The buffer is automatically flushed and another cycle begins. The printout will be 10, 35, 60… in which 10=0+1+2+3+4, 35=5+6+7+8+9, and so on.

var seq = Rx.Observable.interval(1000);

var bufSeq = seq.bufferWithCount(5);

bufSeq
    .map(arr => arr.reduce((acc, x) => acc + x, 0))
    .subscribe(console.log.bind(console));

// => 10
// => 35
// => 60
// ...
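
The arithmetic behind 10, 35, 60 can be checked without RxJS or a timer. The following is a minimal sketch in plain JavaScript; `bufferWithCountSketch` is a hypothetical helper written for this illustration, not part of the library, and it works on an array rather than on a live sequence.

```javascript
// Hypothetical helper: split values into consecutive buffers of `count` items.
function bufferWithCountSketch(values, count) {
  const buffers = [];
  for (let i = 0; i < values.length; i += count) {
    buffers.push(values.slice(i, i + count));
  }
  return buffers;
}

// The first 15 values an interval would produce: 0 through 14.
const firstValues = Array.from({ length: 15 }, (_, i) => i);

// Sum each buffer, as the sample does with Array.reduce.
const sums = bufferWithCountSketch(firstValues, 5)
  .map(arr => arr.reduce((acc, x) => acc + x, 0));

console.log(sums.join(', '));
```

Each buffer holds five consecutive integers, so the three sums are 10, 35 and 60.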

We can also create a buffer with a specified time span in milliseconds. In the following example, the buffer will hold items that have accumulated for 3 seconds. The printout will be 3, 12, 21… in which 3=0+1+2, 12=3+4+5, and so on.

var seq = Rx.Observable.interval(1000);

var bufSeq = seq.bufferWithTime(3000);

bufSeq
    .map(arr => arr.reduce((acc, x) => acc + x, 0))
    .subscribe(console.log.bind(console));

Note that if you are using any of the buffer* or window* operators, you have to make sure that each buffer or window they produce is not empty before filtering on it.

Operators by Categories ##

See Also ##

Reference

  • Observable

Concepts

  • Operators by Categories

In the concat sample under "Combining different sequences", notice that the resultant sequence is 1,2,3,1,2,3. This is because when you use the concat operator, the second sequence (source2) does not become active until the first sequence (source1) has finished pushing all its values. Only after source1 has completed does source2 start to push values to the resultant sequence. The subscriber then gets all the values from the resultant sequence.
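
The ordering that concat produces can be reproduced with a small stand-in for an observable. This is a minimal sketch, not the RxJS implementation: `fromArray` and `concat` below are hypothetical helpers written for this illustration, and a "sequence" is simply a function that accepts an observer with `onNext` and `onCompleted`.

```javascript
// Hypothetical stand-in: a sequence is a function that accepts an observer.
function fromArray(values) {
  return function subscribe(observer) {
    values.forEach(v => observer.onNext(v));
    observer.onCompleted();
  };
}

// concat-like: subscribe to the second sequence only after the first completes.
function concat(first, second) {
  return function subscribe(observer) {
    first({
      onNext: v => observer.onNext(v),
      onCompleted: () => second(observer)
    });
  };
}

const concatResult = [];
concat(fromArray([1, 2, 3]), fromArray([1, 2, 3]))({
  onNext: v => concatResult.push(v),
  onCompleted: () => concatResult.push('done')
});

console.log(concatResult.join(','));
```

The second sequence is not even subscribed to until the first one signals completion, which is why its values can never appear early.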

Compare this with the merge operator. If you run the merge sample, you will get 1,1,2,2,3,3. This is because the two sequences are active at the same time and values are pushed out as they occur in the sources. The resultant sequence only completes when the last source sequence has finished pushing values.
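
One way to picture the interleaving is to treat each source as a list of timestamped values. In the sketch below, the timestamps and the helper `mergeByTime` are made up for illustration; real RxJS sources push values through schedulers rather than carrying explicit times.

```javascript
// Each source is modelled as timestamped values (the times are made up).
const timeline1 = [{ t: 0, v: 1 }, { t: 10, v: 2 }, { t: 20, v: 3 }];
const timeline2 = [{ t: 0, v: 1 }, { t: 10, v: 2 }, { t: 20, v: 3 }];

// merge-like: both sources are live at once, so values come out in time order.
// Array.prototype.sort is stable, so values with equal times keep source order.
function mergeByTime(a, b) {
  return a.concat(b).sort((x, y) => x.t - y.t).map(e => e.v);
}

const merged = mergeByTime(timeline1, timeline2);
console.log(merged.join(','));
```

Because both timelines produce a value at each instant, the merged output alternates between the two sources instead of draining one before the other.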

Another comparison can be made with the catch operator. In this case, if source1 completes without any error, then source2 will not start. Therefore, if you run the catch sample, you will get 1,2,3 only, since source2 (which produces 4,5,6) is ignored.

Finally, let’s look at onErrorResumeNext. This operator will move on to source2 even if source1 cannot be completed due to an error. In the onErrorResumeNext sample, even though source1 represents a sequence that terminates with an exception (created with the throw operator), the subscriber will receive the values (1,2,3) published by source2. Therefore, if you expect either source sequence to produce an error, it is a safer bet to use onErrorResumeNext so that the subscriber still receives some values.
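
The difference between the two operators can be sketched with the same kind of stand-in for an observable. Everything below is a hypothetical illustration rather than the library's code: `catchWith` and `resumeNext` only mimic the behaviour described here, and `collect` is a helper that records what a subscriber would see.

```javascript
// Hypothetical stand-ins: a sequence is a function that accepts an observer.
function fromArray(values) {
  return observer => {
    values.forEach(v => observer.onNext(v));
    observer.onCompleted();
  };
}

function throwError(error) {
  return observer => observer.onError(error);
}

// catch-like: switch to the fallback only when the source fails.
function catchWith(source, fallback) {
  return observer => source({
    onNext: v => observer.onNext(v),
    onError: () => fallback(observer),
    onCompleted: () => observer.onCompleted()
  });
}

// onErrorResumeNext-like: continue with the next sequence either way.
function resumeNext(source, next) {
  return observer => source({
    onNext: v => observer.onNext(v),
    onError: () => next(observer),
    onCompleted: () => next(observer)
  });
}

// Record everything a subscriber would receive.
function collect(sequence) {
  const seen = [];
  sequence({
    onNext: v => seen.push(v),
    onError: e => seen.push('error: ' + e.message),
    onCompleted: () => seen.push('done')
  });
  return seen;
}

const ok = fromArray([1, 2, 3]);
const failing = throwError(new Error('An error has occurred.'));
const backup = fromArray([4, 5, 6]);

const catchOk = collect(catchWith(ok, backup));           // backup is never used
const catchFailing = collect(catchWith(failing, backup)); // backup replaces the failed source
const resumeOk = collect(resumeNext(ok, backup));         // backup follows normal completion
const resumeFailing = collect(resumeNext(failing, backup)); // backup follows the error

console.log(catchOk, catchFailing, resumeOk, resumeFailing);
```

In this sketch the catch-like helper ignores the backup when the source succeeds, while the resume-like helper always continues with it, whether the source completed or failed.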

The select or map operator can translate each element of an observable sequence into another form.

In the mouse move sample under "Projection", which is an extension of the event conversion example we saw in the Bridging with Existing Events topic, we use the select or map operator to project the event arguments to a point of x and y. This way, we are transforming a mouse move event sequence into a data type that can be parsed and manipulated further, as can be seen in the "Filtering" section.

Finally, let’s look at the selectMany or flatMap operator. The selectMany or flatMap operator has many overloads, one of which takes a selector function argument. This selector function is invoked on every value pushed out by the source observable. For each of these values, the selector projects it into a mini observable sequence. At the end, the selectMany or flatMap operator flattens all of these mini sequences into a single resultant sequence, which is then pushed to the subscriber.
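
The flattening step can be sketched with a small stand-in for an observable. This is an illustration under simplifying assumptions, not the RxJS implementation: `fromArray` and `flatMap` are hypothetical helpers, everything runs synchronously, and error handling is left out.

```javascript
// Hypothetical stand-in: a sequence is a function that accepts an observer.
function fromArray(values) {
  return observer => {
    values.forEach(v => observer.onNext(v));
    observer.onCompleted();
  };
}

// flatMap-like: project each source value to an inner sequence, forward the
// inner values, and complete only when the source and every inner sequence
// have completed.
function flatMap(source, selector) {
  return observer => {
    let active = 1; // the source itself counts as one active sequence
    const finishOne = () => {
      active -= 1;
      if (active === 0) observer.onCompleted();
    };
    source({
      onNext: v => {
        active += 1;
        selector(v)({ onNext: x => observer.onNext(x), onCompleted: finishOne });
      },
      onCompleted: finishOne
    });
  };
}

const flat = [];
flatMap(fromArray([0, 1]), () => fromArray([100, 101, 102]))({
  onNext: x => flat.push(x),
  onCompleted: () => flat.push('done')
});

console.log(flat.join(','));
```

Two source values produce two inner sequences of 100, 101, 102, and the completion signal is held back until the source and both inner sequences have finished.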

In the first sample under "Filtering", we use the generate operator to create a simple observable sequence of numbers. The generate operator has several versions, including ones with relative and absolute time scheduling. In our example, it takes an initial state (0), a condition that decides when to terminate (continue while the state is less than 10), an iterator (+1), and a result selector (the square of the current value). We then use the filter or where operator to print out only those results smaller than 5.
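
The four arguments to generate correspond closely to the parts of a for loop. The sketch below is a plain JavaScript analogy, assuming a hypothetical helper `generateToArray` that collects the results into an array instead of pushing them to an observer.

```javascript
// Plain-loop analogy of generate(initialState, condition, iterate, resultSelector).
function generateToArray(initialState, condition, iterate, resultSelector) {
  const results = [];
  for (let state = initialState; condition(state); state = iterate(state)) {
    results.push(resultSelector(state));
  }
  return results;
}

// Squares of 0 through 9, then only those smaller than 5.
const squares = generateToArray(0, i => i < 10, i => i + 1, i => i * i);
const smallSquares = squares.filter(n => n < 5);

console.log(squares.join(','));
console.log(smallSquares.join(','));
```

Only the squares 0, 1 and 4 pass the filter, matching the three onNext lines in the sample output.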

The Operators by Categories topic lists all of the major operators implemented by the Observable type by their categories; specifically: creation, conversion, combine, functional, mathematical, time, exceptions, miscellaneous, selection and primitives.
