RxJs Book
  • Introduction
  • Summary
    • Why RxJS?
    • RxJS Design Guidelines
      • Introduction
      • When To Use RxJS
      • The RxJS Contract
      • Using RxJS
      • Operator Implementations
    • Getting Started With RxJS
      • What Are The Reactive Extensions?
      • Exploring Major Concepts In RxJS
      • Creating And Querying Observable Sequences
        • Creating And Subscribing To Simple Observable Sequences
        • Bridging To Events
        • Bridging To Callbacks
        • Bridging To Promises
        • Generators And Observable Sequences
        • Querying Observable Sequences
        • Error Handling With Observable Sequences
        • Transducers With Observable Sequences
        • Backpressure With Observable Sequences
        • Operators By Category
      • Subjects
      • Scheduling And Concurrency
      • Testing And Debugging
      • Implementing Your Own Operators
    • How Do I...?
      • How Do I Wrap An Existing API?
      • How Do I Integrate jQuery With RxJS?
      • How Do I Integrate Angular.js With RxJS?
      • How Do I Create A Simple Event Emitter?
    • Mapping RxJS From Different Libraries
      • For Bacon.js Users
      • For Async.js Users
    • Config
      • Promise
      • useNativeEvents
    • Helpers
      • defaultComparer
      • defaultSubComparer
      • defaultError
      • identity
      • just
      • isPromise
      • noop
      • pluck
    • Observable
      • Observable Methods
        • amb
        • case
        • catch
        • combineLatest
        • concat
        • create
        • defer
        • empty
        • for | forIn
        • forkJoin
        • from
        • fromCallback
        • fromEvent
        • fromEventPattern
        • fromNodeCallback
        • fromPromise
        • generate
        • generateWithAbsoluteTime
        • generateWithRelativeTime
        • if
        • interval
        • isObservable
        • just | return
        • merge
        • mergeDelayError
        • never
        • of
        • ofWithScheduler
        • onErrorResumeNext
        • pairs
        • range
        • repeat
        • spawn
        • start
        • startAsync
        • throw
        • timer
        • toAsync
        • using
        • when
        • while | whileDo
        • wrap
        • zip
      • Observable Instance Methods
        • amb
        • and
        • asObservable
        • average
        • buffer
        • bufferWithCount
        • bufferWithTime
        • bufferWithTimeOrCount
        • catch
        • combineLatest
        • concat
        • concatAll
        • concatMapObserver | selectConcatObserver
        • controlled
        • count
        • debounce
        • defaultIfEmpty
        • delay
        • delaySubscription
        • dematerialize
        • distinct
        • distinctUntilChanged
        • do | doAction | tap
        • doOnCompleted | tapOnCompleted
        • doOnError | tapOnError
        • doOnNext | tapOnNext
        • doWhile
        • elementAt
        • every
        • expand
        • extend | manySelect
        • filter | where
        • finally
        • find
        • findIndex
        • first
        • flatMap | selectMany
        • flatMapConcat | concatMap
        • flatMapFirst | selectSwitchFirst
        • flatMapLatest
        • flatMapObserver | selectManyObserver
        • flatMapWithMaxConcurrent
        • forkJoin
        • groupBy
        • groupByUntil
        • groupJoin
        • ignoreElements
        • includes
        • indexOf
        • isEmpty
        • join
        • jortSort
        • jortSortUntil
        • last
        • lastIndexOf
        • let | letBind
        • map | select
        • materialize
        • max
        • maxBy
        • merge
        • mergeAll
        • min
        • minBy
        • multicast
        • observeOn
        • onErrorResumeNext
        • pairwise
        • partition
        • pausable
        • pausableBuffered
        • pipe
        • pluck
        • publish
        • publishLast
        • publishValue
        • reduce
        • repeat
        • replay
        • retry
        • retryWhen
        • scan
        • sequenceEqual
        • share
        • shareReplay
        • shareValue
        • single
        • singleInstance
        • skip
        • skipLast
        • skipLastWithTime
        • skipUntil
        • skipUntilWithTime
        • skipWhile
        • skipWithTime
        • slice
        • some
        • startWith
        • subscribe | forEach
        • subscribeOn
        • subscribeOnCompleted
        • subscribeOnError
        • subscribeOnNext
        • sum
        • switch
        • switchFirst
        • take
        • takeLast
        • takeLastBuffer
        • takeLastBufferWithTime
        • takeLastWithTime
        • takeUntil
        • takeUntilWithTime
        • takeWhile
        • takeWithTime
        • thenDo
        • throttle
        • throttleLatest | sample
        • timeInterval
        • timeout
        • timestamp
        • toArray
        • toMap
        • toPromise
        • toSet
        • transduce
        • window
        • windowWithCount
        • windowWithTime
        • windowWithTimeOrCount
        • withLatestFrom
        • zip
        • zipIterable
    • Observer
      • Observer Methods
        • create
        • fromNotifier
      • Observer Instance Methods
        • asObserver
        • checked
        • notifyOn
        • onCompleted
        • onError
        • onNext
        • toNotifier
    • Notification
      • Notification Methods
        • createOnNext
        • createOnError
        • createOnCompleted
      • Notification Instance Methods
        • accept
        • toObservable
      • Notification Properties
        • exception
        • hasValue
        • kind
        • value
    • Subjects
      • Rx.AsyncSubject
      • Rx.BehaviorSubject
      • Rx.ReplaySubject
      • Rx.Subject
    • Schedulers
      • Rx.HistoricalScheduler
      • Rx.Scheduler
      • Rx.VirtualTimeScheduler
    • Disposables
      • Rx.CompositeDisposable
      • Rx.Disposable
      • Rx.RefCountDisposable
      • Rx.SerialDisposable
      • Rx.SingleAssignmentDisposable
    • Testing
      • Rx.ReactiveTest
      • Rx.Recorded
      • Rx.Subscription
      • Rx.TestScheduler
    • Bindings
      • DOM
        • Ajax
          • ajax
          • ajaxCold
          • get
          • get_Json
          • post
        • JSONP
          • jsonpRequest
          • jsonpRequestCold
        • Web Sockets
          • fromWebSocket
        • Web Workers
          • fromWebWorker
        • Mutation Observers
          • fromMutationObserver
        • Geolocation
          • getCurrentPosition
          • watchPosition
        • Schedulers
          • requestAnimationFrame
          • mutationObserver
      • jQuery
      • AngularJS
        • Factories
          • rx
          • observeOnScope
        • Observable Methods
          • safeApply
        • $rootScope Methods
          • $toObservable
          • $eventToObservable
          • $createObservableFunction
      • Facebook React
      • Ractive.js
      • Node.js
        • Callback Handlers
          • fromCallback
          • fromNodeCallback
        • Event Handlers
          • fromEvent
          • toEventEmitter
        • Stream Handlers
          • fromStream
          • fromReadableStream
          • fromWritableStream
          • fromTransformStream
          • writeToStream
  • Recipes
  • Which Operator do I use?
    • Creation Operators
    • Instance Operators
Powered by GitBook
On this page

Was this helpful?

  1. Summary
  2. How Do I...?

How Do I Wrap An Existing API?

PreviousHow Do I...?NextHow Do I Integrate jQuery With RxJS?

Last updated 5 years ago

Was this helpful?

One question that often comes up is how can I wrap an existing API into an Observable sequence? The answer is fairly simple and not a lot of lines of code to make that happen.

To make this a bit more concrete, let's take a familiar HTML5 API like , in particular, the navigator.geolocation.watchPosition method.

The typical use of this method might be the following where we would hook up an event handler to listen for success and errors on watching the geolocation by using the navigator.geolocation.watchPosition method. When one wishes to terminate listening for geolocation updates, you simply call the navigator.geolocation.clearWatch method passing in the watch ID returned from the watchPosition method.

function watchPositionChanged(e) {
    // Do something with the coordinates
}

function watchPositionError(e) {
    // Handle position error
}

var watchId = navigator.geolocation.watchPosition(
    watchPositionChanged,
    watchPositionError);

var stopWatching = document.querySelector('#stopWatching');
stopWatching.addEventListener('click', stopWatchingClicked, false);

// Clear watching upon click
function stopWatchingClicked(e) {
    navigator.geolocation.clearWatch(watchId)
}
function watchPosition(geolocationOptions) {
    return Rx.Observable.create(observer => {
        var watchId = window.navigator.geolocation.watchPosition(
            function successHandler (loc) {
                observer.onNext(loc);
            }, 
            function errorHandler (err) {
                observer.onError(err);
            }, 
            geolocationOptions);

        return () => {
            window.navigator.geolocation.clearWatch(watchId);
        };
    });
 }

Our final result should look like the following:

function watchPosition(geolocationOptions) {
    return Rx.Observable.create(observer => {
        var watchId = window.navigator.geolocation.watchPosition(
            function successHandler (loc) {
                observer.onNext(loc);
            }, 
            function errorHandler (err) {
                observer.onError(err);
            }, 
            geolocationOptions);

        return () => {
            window.navigator.geolocation.clearWatch(watchId);
        };
    }).publish().refCount();
 }

And now we can consume the geolocation such as:

var source = watchPosition();

var subscription = source.subscribe(
    position => console.log(`Next: ${position.coords.latitude}, ${position.coords.longitude}`),
    err => {
        var message = '';
        switch (err.code) {
            case err.PERMISSION_DENIED:
                message = 'Permission denied';
                break;
            case err.POSITION_UNAVAILABLE:
                message = 'Position unavailable';
                break;
            case err.PERMISSION_DENIED_TIMEOUT:
                message = 'Position timeout';
                break;
        }
        console.log('Error: ' + message);
    },
    () => console.log('Completed')
);

In order to wrap this, we'll need to use the method. From this, we can yield values to the observer or handle the errors. Let's see how the code might look, creating a watchPosition method which takes geolocation options.

We need to also be aware of ensuring we're not adding too many watchPosition calls as we compose it together with other observable sequences. To do that, we'll need to utilize the and methods from rx.binding.js.

Geolocation API
Rx.Observable.create
publish
refCount