CountStream
読み込んだStreamを行ごとに分けて、先頭に行番号を出力するStreamを書いてみた。
count_stream.js
var stream = require('stream'), util = require('util'); function CountStream() { stream.Stream.call(this); this.readable = true; this.writable = true; this.ended = false; this.paused = false; this.encoding = ''; this.count = 0; this.buffer = ''; } util.inherits(CountStream, stream.Stream); CountStream.prototype.destroy = function() { this.readable = false; this.writable = false; }; CountStream.prototype.setEncoding = function(encoding) { this.encoding = encoding; }; CountStream.prototype.pause = function() { this.paused = true; }; CountStream.prototype.resume = function() { this.paused = false; }; CountStream.prototype.write = function(data, encoding) { var str, pos; if (this.ended) { return false; } str = (Buffer.isBuffer(data)) ? data.toString(encoding || this.encoding) : data; this.buffer += str; if (this.paused) { return true; } pos = this.buffer.indexOf('\n'); if (pos !== -1) { var lines = this.buffer.split('\n'); for (var i = 0, len = lines.length - 1; i < len; ++i) { this.emit('data', ++this.count + ': ' + lines[i] + '\n'); } this.buffer = lines[i]; } return true; }; CountStream.prototype.end = function(data, encoding) { if (data) { this.write(data, encoding); } this.emit('end'); this.destroy(); this.ended = true; }; CountStream.prototype.destroySoon = function() { this.destroy(); }; module.exports = { create: function() { return new CountStream; } };
index.js
// Reads ./count_stream.js, runs it through a CountStream so that each line is
// prefixed with its line number, and prints the result to standard output.
var fs = require('fs');
var countstream = require('./count_stream');

var c = countstream.create();
var source = fs.createReadStream('./count_stream.js');

source.pipe(c).pipe(process.stdout);
行番号を付加して出力は出来たんだけど、forループで同期的に全行を処理してるから、writeの中でブロックしちゃう。
process.nextTick使うと良いのかな?