Source: observable.js

'use strict';

/**
 * RxJS Observable.
 *
 * @class Observable
 * @external
 * @see {@link https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md Rx.Observable}
 */
/**
 * Subscribes to the observable.
 *
 * @example
 * observable.subscribe(
 *     function (x) { console.log('New item', x); },
 *     function (err) { console.error(err); },
 *     function () { console.log('Completed'); }
 * );
 *
 * @name external:Observable#subscribe
 * @function
 * @param {Function} onNext - the callback called for each element in the sequence.
 * @param {Function} onError - the callback called upon exceptional termination of the sequence.
 * @param {Function} onNext - the callback called upon graceful termination of the sequence.
 * @see {@link https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/subscribe.md Rx.Observable.subscribe}
 */

var skeleton = require('./skeleton');

function updaterFromObservable(observable) {
  function updater(item, done, fail) {
    observable.subscribe(item, fail, done);
  }
  return updater;
}

function newSummarizer(observable, serialize, digest, selector) {
  return skeleton.newSummarizer(updaterFromObservable(observable), serialize, digest, selector);
}

function newResolver(observable, remote, serialize, deserialize) {
  return skeleton.newResolver(updaterFromObservable(observable), remote, serialize, deserialize);
}

/**
 * Observable handling.
 *
 * @module mathsync/observable
 */
module.exports = {

  /**
   * Creates a new summarizer.
   *
   * <p>Each summary creates a new subscription to the observable and drains it until it reports being completed.</p>
   *
   * @example <caption>Taking items from local storage</caption>
   * var local = Rx.Observable.create(function (observer) {
   *   for (var i = 0; i < localStorage.length; i++) {
   *     observer.onNext(JSON.parse(localStorage.getItem(localStorage.key(i))));
   *   }
   *   observer.onCompleted();
   * }
   * function serialize(item) {
   *   // ...
   * }
   * var summarizer = require('mathsync/observable').newSummarizer(local, serialize);
   *
   * @name module:mathsync/observable.newSummarizer
   * @function
   * @param {external:Observable} observable - the observable to read local items from.
   * @param {Serial~Serialize} serialize - the item serializer.
   * @param {Digest~Digester} [digester] - the digester to use, defaults to SHA-1.
   * @param {BucketSelector~Selector} [selector] - how to place items in IBF buckets, uses 3 buckets by default.
   */
  newSummarizer : newSummarizer,

  /**
   * Creates a new resolver.
   *
   * <p>Each summary creates a new subscription to the observable and drains it until it reports being completed.</p>
   *
   * @example <caption>Taking items from local storage</caption>
   * var local = Rx.Observable.create(function (observer) {
   *   for (var i = 0; i < localStorage.length; i++) {
   *     observer.onNext(JSON.parse(localStorage.getItem(localStorage.key(i))));
   *   }
   *   observer.onCompleted();
   * }
   * function serialize(item) {
   *   // ...
   * }
   * function deserialize(buffer) {
   *   // ...
   * }
   * var resolver = require('mathsync/observable').newResolver(local, remote, serialize, deserialize);
   *
   * @name module:mathsync/observable.newResolver
   * @function
   * @param {external:Observable} observable - the observable to read local items from.
   * @param {Summarizer} remote - summarizer producing summaires of the remote side.
   * @param {Serial~Serialize} serialize - the item serializer.
   * @param {Serial~Deserialize} deserialize - the item deserializer.
   */
  newResolver : newResolver
};