Source: stream.js

'use strict';

/**
 * Node's readable stream.
 *
 * <p>Only to be used in object mode.</p>
 *
 * @class Readable
 * @external
 * @see {@link http://nodejs.org/api/stream.html#stream_class_stream_readable stream.Readable on Node.js}
 */
/**
 * Reads the next available item.
 *
 * <p>To be repeatedly called until it returns <code>null</code> after the stream emits <code>readable</code>.<p>
 *
 * @example
 * stream.on('readable', function() {
 *   var item;
 *   while (null !== (item = stream.read())) {
 *     // handle item
 *   }
 * });
 *
 * @name external:Readable#read
 * @function
 * @return {Object} next available item or <code>null</code> if no item is currently available.
 */
/**
 * Readable event.
 *
 * <p>Notifies the stream can be read again.</p>
 *
 * @example
 * stream.on('readable', function() {
 *   // use stream.read() until it returns null
 * });
 *
 * @event external:Readable#readable
 * @type {Object}
 */
/**
 * Error event.
 *
 * <p>Called if there is an error while reading data.</p>
 *
 * @example
 * stream.on('error', function(err) {
 *   console.error(err);
 * });
 *
 * @event external:Readable#error
 * @type {Error}
 */
/**
 * End event.
 *
 * <p>Called after all data has been read.</p>
 *
 * @example
 * stream.on('end', function(err) {
 *   console.log('done with my stream');
 * });
 *
 * @event external:Readable#end
 */

var skeleton = require('./skeleton');

function updaterFromStreamBuilder(streamProducer) {
  function updater(item, done, fail) {
    var stream = streamProducer();
    stream.on('readable', function() {
      var i;
      while (null !== (i = stream.read())) {
        item(i);
      }
    });
    stream.on('error', fail);
    stream.on('end', done);
  }

  return updater;
}

function newSummarizer(streamProducer, serialize, digest, selector) {
  return skeleton.newSummarizer(updaterFromStreamBuilder(streamProducer), serialize, digest, selector);
}

function newResolver(streamProducer, remote, serialize, deserialize) {
  return skeleton.newResolver(updaterFromStreamBuilder(streamProducer), remote, serialize, deserialize);
}

/**
 * Stream handling.
 *
 * @module mathsync/stream
 */
module.exports = {

  /**
   * Creates a new summarizer.
   *
   * @example <caption>Taking items from a {@link https://github.com/maxogden/level.js level.js} database</caption>
   * var levelup = require('levelup');
   * var leveljs = require('level-js');
   * var db = levelup(name, { db : leveljs });
   * function serialize(item) {
   *   // ...
   * }
   * var summarizer = require('mathsync/stream').newSummarizer(function () {
   *   return db.createReadStream();
   * }, serialize);
   *
   * @name module:mathsync/stream.newSummarizer
   * @function
   * @param {function} streamProducer - the no-arg function returning a fresh {@link external:Readable stream} emitting local items each time it is called.
   * @param {Serial~Serialize} serialize - the item serializer.
   * @param {Digest~Digester} [digester] - the digester to use, defaults to SHA-1.
   * @param {BucketSelector~Selector} [selector] - how to place items in IBF buckets, uses 3 buckets by default.
   */
  newSummarizer : newSummarizer,

  /**
   * Creates a new resolver.
   *
   *
   * @example <caption>Taking items from a {@link https://github.com/maxogden/level.js level.js} database</caption>
   * var remote = ...
   * var levelup = require('levelup');
   * var leveljs = require('level-js');
   * var db = levelup(name, { db : leveljs });
   * function serialize(item) {
   *   // ...
   * }
   * function deserialize(buffer) {
   *   // ...
   * }
   * var resolver = require('mathsync/stream').newResolver(function () {
   *   return db.createReadStream();
   * }, remote, serialize, deserialize);
   *
   * @name module:mathsync/stream.newResolver
   * @function
   * @param {function} streamProducer - the no-arg function returning a fresh {@link external:Readable stream} emitting local items each time it is called.
   * @param {Summarizer} remote - summarizer producing summaires of the remote side.
   * @param {Serial~Serialize} serialize - the item serializer.
   * @param {Serial~Deserialize} deserialize - the item deserializer.
   */
  newResolver : newResolver
};