はじめてのDuplexStream...でなくてTransformStream(stream2)

Readable, Writableときて、DuplexStreamを書いてたんですが意味がわからなかったのでTransformStreamを書いてみました。
で、TransformStreamで一番の目的のものが書けそうだったのでDuplexStreamは放置することに。

var stream = require('stream'),
    util = require('util');

function TStream(options) {
  options || (options = {});
  stream.Transform.call(this, options);
  this.buffer_ = [];
}

util.inherits(TStream, stream.Transform);

TStream.prototype._transform = function(chunk, encoding, callback) {
  var str = chunk.toString(encoding),
      qqq = (this.buffer_.pop() || '') + str,
      arr = qqq.split('\n'),
      i, len;

  this.buffer_.push(arr.pop());

  for (i = 0, len = arr.length; i < len; ++i) {
    this.push(arr[i] + '\n');
  }

  callback(null);
};

TStream.prototype._flush = function(callback) {
  var i, len;

  if (this.buffer_.length > 0) {
    for (i = 0, len = this.buffer_.length; i < len; ++i) {
      this.push(this.buffer_[i]);
    }
  }
  callback(null);
};



var aaa = new TStream;

aaa.on('readable', function() {
  var buf;

  while ((buf = aaa.read(1)) !== null) {
    process.stdout.write(buf);
  }

  console.log('---');
});
aaa.on('end', function() {
  console.log('end');
});

require('fs').readFile(__filename, 'utf8', function(err, data) {
  if (err) throw err;
  var i, len,
      arr = data.split('\n');

  //aaa.write(data);
  for (i = 0, len = arr.length; i < len; ++i) {
    process.nextTick(function() {
      aaa.write(this.str + '\n');
    }.bind({ str: arr[i] }));
  }
});

//fs.createReadStream(__filename).pipe(aaa).pipe(process.stdout);

出力されたものを\nで区切って渡すストリームです。


いい感じに内部で良きに計らってくれるみたいで、書く方としては_transformで渡されたデータを加工してpushしてあげるだけなので大変ラクチンです。
_flushにはパイプなどが閉じられるとき、最後にやって欲しい事を書いてあげるだけでOKなのです。
stream1では似たようなことを自分で書いてあげないといけなかったので大変に良いですね。その上速度も上々(だと思う)とは言う事なしです。