index.js 976 字节
var Stream = require('stream');

function prop(propName) {
  return function (data) {
    return data[propName];
  };
}

module.exports = unique;
function unique(propName) {
  var keyfn = JSON.stringify;
  if (typeof propName === 'string') {
    keyfn = prop(propName);
  } else if (typeof propName === 'function') {
    keyfn = propName;
  }
  var seen = {};
  var s = new Stream();
  s.readable = true;
  s.writable = true;
  var pipes = 0;

  s.write = function (data) {
    var key = keyfn(data);
    if (seen[key] === undefined) {
      seen[key] = true;
      s.emit('data', data);
    }
  };

  var ended = 0;
  s.end = function (data) {
    if (arguments.length) s.write(data);
    ended++;
    if (ended === pipes || pipes === 0) {
      s.writable = false;
      s.emit('end');
    }
  };

  s.destroy = function (data) {
    s.writable = false;
  };

  s.on('pipe', function () {
    pipes++;
  });

  s.on('unpipe', function () {
    pipes--;
  });

  return s;
}