RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page

Was this helpful?

  1. Summary
  2. Getting Started With RxJS
  3. Creating And Querying Observable Sequences

Transducers With Observable Sequences

PreviousError Handling With Observable SequencesNextBackpressure With Observable Sequences

Last updated 5 years ago

Was this helpful?

Much like Language Integrated Query (LINQ), Transducers are composable algorithmic transformations. They, like LINQ, are independent from the context of their input and output sources and specify only the essence of the transformation in terms of an individual element. Because transducers are decoupled from input or output sources, they can be used in many different processes - collections, streams, observables, etc. Transducers compose directly, without awareness of input or creation of intermediate aggregates. There are two major libraries currently out there, Cognitect's and James Long's which are both great for getting high performance over large amounts of data. Because it is collection type neutral, it is a perfect fit for RxJS to do transformations over large collections.

The word transduce is just a combination of transform and reduce. The reduce function is the base transformation; any other transformation can be expressed in terms of it (map, filter, etc).

var arr = [1, 2, 3, 4];

arr.reduce((result, x) => result.concat(x + 1), []);

// => [ 2, 3, 4, 5 ]

Using transducers, we can model the following behavior while breaking apart the map aspect of adding 1 to the concat operation, adding the seed and then the "collection" to transduce.

var arr = [1, 2, 3, 4];

function increment(x) { return x + 1; }
function concatItem(acc, x) { return acc.concat(x); }

transduce(map(increment), concatItem, [], arr);

// => [ 2, 3, 4, 5 ]

Using Cognitect's library, we can easily accomplish what we had above.

var t = transducers;

var arr = [1, 2, 3, 4];

function increment(x) { return x + 1; }

into([], t.comp(t.map(increment)), arr);

// => [ 2, 3, 4, 5 ]

We can go a step further and add filtering as well to get only even values.

var t = transducers;

var arr = [1, 2, 3, 4];

function increment(x) { return x + 1; }
function isEven(x) { return x % 2 === 0; }

into([], t.comp(t.map(increment), t.filter(isEven)), arr);

// => [ 2, 4 ]
var t = transducers;

var source = Rx.Observable.range(1, 4);

function increment(x) { return x + 1; }
function isEven(x) { return x % 2 === 0; }

var transduced = source.transduce(t.comp(t.map(increment), t.filter(isEven)));

transduced.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e),
  () => console.log('onCompleted'));

// => Next: 2
// => Next: 4
// => Completed

Note that this above example also works the same with transducers.js as well with little to no modification. This example will in fact work faster than the traditional LINQ style (as of now) which most use currently.

var source = Rx.Observable.range(1, 4);

function increment(x) { return x + 1; }
function isEven(x) { return x % 2 === 0; }

var transduced = source.map(increment).filter(isEven);

transduced.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e),
  () => console.log('onCompleted'));

// => Next: 2
// => Next: 4
// => Completed

This opens up a wide new set of possibilities making RxJS even faster over large collections with no intermediate Observable sequences.

Since it works so well using Arrays, there's no reason why it cannot work for Observable sequences as well. To that end, we have introduced the method which acts exactly like it does for Arrays, but for Observable sequences. Once again, let's go over the above example, this time using an Observable sequence.

transducer-js
transducers.js
transducers-js
transduce