CountStream

読み込んだStreamを行ごとに分けて、先頭に行番号を出力するStreamを書いてみた。

count_stream.js
var stream = require('stream'),
    util = require('util'),
    StringDecoder = require('string_decoder').StringDecoder;

/**
 * A readable + writable stream (legacy `stream.Stream` API) that splits the
 * text written to it into lines and re-emits every line as a 'data' event
 * prefixed with its 1-based line number: "<n>: <line>\n".
 *
 * Events emitted: 'data' (one per line), 'drain' (after a write() returned
 * false and the stream was resumed), 'end' (once, after end() has been called
 * and all buffered text has been emitted).
 *
 * @constructor
 */
function CountStream() {
  stream.Stream.call(this);

  this.readable = true;
  this.writable = true;

  this.ended = false;     // end() has been called; no further input accepted
  this.paused = false;    // pause() has been called; 'data' must not be emitted
  this.encoding = '';     // default encoding used to decode Buffers

  this.count = 0;         // number of the last line emitted
  this.buffer = '';       // decoded text that has not been emitted yet

  this.needDrain = false; // a write() returned false; owe the writer a 'drain'
  this.finished = false;  // 'end' has been emitted
  this.decoder = null;    // StringDecoder holding partial multi-byte characters
  this.decoderEncoding = '';
}

util.inherits(CountStream, stream.Stream);

/**
 * Marks the stream as neither readable nor writable and drops any text that
 * has not been emitted yet. No further 'data' or 'end' events are emitted.
 */
CountStream.prototype.destroy = function() {
  this.readable = false;
  this.writable = false;
  this.buffer = '';
  this.decoder = null;
};

/**
 * Sets the default encoding used to decode Buffers passed to write().
 *
 * @param {string} encoding - Encoding name; a falsy value means UTF-8.
 */
CountStream.prototype.setEncoding = function(encoding) {
  this.encoding = encoding;
};

/**
 * Stops the emission of 'data' events. Text written while paused is buffered
 * and write() returns false so that the writer can apply backpressure.
 */
CountStream.prototype.pause = function() {
  this.paused = true;
};

/**
 * Resumes the emission of 'data' events: flushes the lines buffered while
 * paused, then either completes a pending end() or emits 'drain' if a
 * previous write() returned false.
 */
CountStream.prototype.resume = function() {
  if (!this.paused) {
    return;
  }
  this.paused = false;

  if (this.ended) {
    // end() was called while paused; finish the job now.
    this._finish();
    return;
  }

  this._emitLines();

  // A 'data' handler may have paused the stream again during the flush.
  if (this.needDrain && !this.paused && this.writable) {
    this.needDrain = false;
    this.emit('drain');
  }
};

/**
 * Appends data to the internal buffer and emits every complete line.
 *
 * @param {string|Buffer} data - Text to process.
 * @param {string} [encoding] - Encoding used when `data` is a Buffer;
 *     defaults to the one given to setEncoding(), then to UTF-8.
 * @returns {boolean} true if more data may be written right away; false if
 *     the stream is ended, destroyed or paused (wait for 'drain' when paused).
 */
CountStream.prototype.write = function(data, encoding) {
  if (this.ended || !this.writable) {
    return false;
  }

  this.buffer += Buffer.isBuffer(data) ? this._decode(data, encoding) : data;

  if (!this.paused) {
    this._emitLines();
  }

  // Checked after the flush as well: a 'data' handler (e.g. pipe()'s) may
  // have called pause() because its destination is full.
  if (this.paused) {
    this.needDrain = true;
    return false;
  }

  return true;
};

/**
 * Signals the end of input. Emits the remaining buffered text, including a
 * final line that has no trailing newline (emitted without one), and then
 * emits 'end'. If the stream is paused, this is deferred until resume().
 * Calling end() more than once has no effect.
 *
 * @param {string|Buffer} [data] - Last piece of text to write before ending.
 * @param {string} [encoding] - Encoding used when `data` is a Buffer.
 */
CountStream.prototype.end = function(data, encoding) {
  if (this.ended) {
    return;
  }

  if (data) {
    this.write(data, encoding);
  }

  if (this.decoder) {
    // Flush bytes of an incomplete trailing character, if any.
    this.buffer += this.decoder.end();
    this.decoder = null;
  }

  this.ended = true;
  this.writable = false;
  this._finish();
};

/**
 * Destroys the stream; see destroy().
 */
CountStream.prototype.destroySoon = function() {
  this.destroy();
};

/**
 * Decodes a Buffer to text, carrying the bytes of a multi-byte character
 * that is split across two chunks over to the next call.
 *
 * @param {Buffer} data - Bytes to decode.
 * @param {string} [encoding] - Encoding overriding the stream default.
 * @returns {string} The decoded text.
 * @private
 */
CountStream.prototype._decode = function(data, encoding) {
  var enc = encoding || this.encoding || 'utf8',
      pending = '';

  if (!this.decoder || this.decoderEncoding !== enc) {
    if (this.decoder) {
      // Encoding changed mid-stream: flush what the old decoder still holds.
      pending = this.decoder.end();
    }
    this.decoder = new StringDecoder(enc);
    this.decoderEncoding = enc;
  }

  return pending + this.decoder.write(data);
};

/**
 * Emits one 'data' event per complete line in the buffer, stopping as soon
 * as the stream gets paused or destroyed. The buffer is updated before each
 * emit so that handlers calling back into the stream see a consistent state.
 *
 * @private
 */
CountStream.prototype._emitLines = function() {
  var pos, line;

  while (!this.paused && this.readable) {
    pos = this.buffer.indexOf('\n');
    if (pos === -1) {
      break;
    }
    line = this.buffer.slice(0, pos);
    this.buffer = this.buffer.slice(pos + 1);
    this.emit('data', ++this.count + ': ' + line + '\n');
  }
};

/**
 * Completes a stream on which end() has been called: emits the remaining
 * lines and the trailing partial line, then 'end', then destroys the stream.
 * Returns early, to be retried by resume(), if the stream is paused.
 *
 * @private
 */
CountStream.prototype._finish = function() {
  var rest;

  if (this.finished) {
    return;
  }

  this._emitLines();
  if (this.paused || !this.readable) {
    return;
  }

  if (this.buffer !== '') {
    rest = this.buffer;
    this.buffer = '';
    // The input had no final newline, so none is added to the output either.
    this.emit('data', ++this.count + ': ' + rest);
    if (this.paused || !this.readable) {
      return;
    }
  }

  this.finished = true;
  this.emit('end');
  this.destroy();
};


/**
 * Factory for the module's only export.
 *
 * @returns {CountStream} A new, independent line-numbering stream.
 */
function create() {
  return new CountStream();
}

module.exports = { create: create };
index.js
// Demo: read count_stream.js, number its lines, and print them to stdout.
var countstream = require('./count_stream');
var c = countstream.create();

var fs = require('fs');
var source = fs.createReadStream('./count_stream.js');

source
  .pipe(c)
  .pipe(process.stdout);

行番号を付加して出力は出来たんだけど、for使ってるからなんかこう……writeでブロックしちゃう。
process.nextTick使うと良いのかな?