RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page
  • Creating New Operators ##
  • Testing Your New Operator ##
  • See Also ##

Was this helpful?

  1. Summary
  2. Getting Started With RxJS

Implementing Your Own Operators

PreviousTesting And DebuggingNextHow Do I...?

Last updated 5 years ago

Was this helpful?

You can extend RxJS by adding new operators for operations that are not provided by the base library, or by creating your own implementation of standard query operators to improve readability and performance. Writing a customized version of a standard operator is useful when you want to operate with in-memory objects and when the intended customization does not require a comprehensive view of the query.

Creating New Operators ##

RxJS offers a full set of operators that cover most of the possible operations on a set of entities. However, you might need an operator to add a particular semantic meaning to your query—especially if you can reuse that same operator several times in your code. Adding new operators to RxJS is a way to extend its capabilities. However, you can also improve code readability by wrapping existing operators into more specialized and meaningful ones.

For example, let's see how we might implement the method from or , which takes a set of attributes and does a deep comparison for equality. We might try implementing this from scratch using the Rx.Observable.create method such as the following code.

Rx.Observable.prototype.filterByProperties = properties => {
    var source = this,
        comparer = Rx.internals.isEqual;

    return Rx.Observable.create(observer => {
        // Our disposable is the subscription from the parent
        return source.subscribe(
            data => {

                try {
                    var shouldRun = true;

                    // Iterate the properties for deep equality
                    for (var prop in properties) {
                        if (!comparer(properties[prop], data[prop])) {
                            shouldRun = false;
                            break;
                        }
                    }
                } catch (e) {
                    observer.onError(e);
                }

                if (shouldRun) {
                    observer.onNext(data);
                }
            },
            observer.onError.bind(observer),
            observer.onCompleted.bind(observer)
        );
    });
};

Many existing operators, such as this, instead could be built using other basic operators for example in this case, filter or where. In fact, many existing operators are built using other basic operators. For example, the flatMap or selectMany operator is built by composing the map or select and mergeObservable operators, as the following code shows.

Rx.Observable.prototype.flatMap = selector => this.map(selector).mergeObservable();

We could rewrite it as the following to take advantage of already built in operators.

Rx.Observable.prototype.filterByProperties = properties => {
    var comparer = Rx.internals.isEqual;

    return this.filter(data => {

        // Iterate the properties for deep equality
        for (var prop in properties) {
            if (!comparer(properties[prop], data[prop])) {
                return false;
            }
        }

        return true;
    });
};

By reusing existing operators when you build a new one, you can take advantage of the existing performance or exception handling capabilities implemented in the RxJS libraries. When writing a custom operator, it is good practice not to leave any disposables unused; otherwise, you may find that resources could actually be leaked and cancellation may not work correctly.

Testing Your New Operator ##

var onNext = Rx.ReactiveTest.onNext,
    onCompleted = Rx.ReactiveTest.onCompleted,
    subscribe = Rx.ReactiveTest.subscribe;

test('filterProperties should yield with match', () => {

    var scheduler = new Rx.TestScheduler();

    var input = scheduler.createHotObservable(
        onNext(210, { 'name': 'curly', 'age': 30, 'quotes': ['Oh, a wise guy, eh?', 'Poifect!'] }),
        onNext(220, { 'name': 'moe', 'age': 40, 'quotes': ['Spread out!', 'You knucklehead!'] }),
        onCompleted(230)
    );

    var results = scheduler.startWithCreate(
        () => input.filterByProperties({ 'age': 40 })
    );

    collectionAssert.assertEqual(results.messages, [
        onNext(220, { 'name': 'moe', 'age': 40, 'quotes': ['Spread out!', 'You knucklehead!'] }),
        onCompleted(230)
    ]);

    collectionAssert.assertEqual(input.subscriptions, [
        subscribe(200, 230)
    ]);
});

In order for this to be successfully tested, we should check for when there is no data, empty, single matches, multiple matches and so forth.

See Also ##

Resources

Just because you have implemented a new operator doesn't mean you are finished. Now, let's write a test to verify its behavior from what we learned in the topic. We'll reuse the collectionAssert.assertEqual from the previous topic so it is not repeated here.

_.where
Lo-Dash
Underscore
Testing and Debugging
Testing and Debugging