はじめてのWritable Stream + Readable/Writable(Both?) Stream

昨日に引き続き、Streamを書いてました。ReadableもしくはWritable単体だと理解してるつもりなんだけど、両方を扱うStreamを書いてると、途端に???って感じになる……
そしてWritable Streamのソースコードをなくしたw

Readable/Writable Stream

line_stream.js

LineStreamとか言う名前だけどなにもしないw

var stream = require('stream'),
    util = require('util');

function LineStream() {
  stream.Stream.call(this);

  this.readable = true;
  this.writable = true;

  this.ended = false;
  this.paused = false;

  this.encoding = '';
  this.buffer = '';
}

util.inherits(LineStream, stream.Stream);

/** stream */

LineStream.prototype.destroy = function() {
  this.readable = false;
  this.writable = false;
};

/** readable stream */

LineStream.prototype.setEncoding = function(encoding) {
  this.encoding = encoding;
};

LineStream.prototype.pause = function() {
  this.paused = true;
};

LineStream.prototype.resume = function() {
  this.paused = false;
};

/** writable stream */

LineStream.prototype.write = function(data, encoding) {
  if (this.ended || this.paused) {
    return;
  }

  this.emit('data',
      (Buffer.isBuffer(data)) ? data.toString(encoding) : data);

  return true;
};

LineStream.prototype.end = function(data, encoding) {
  if (data) {
    this.write(data, encoding);
  }
  this.emit('end');
  this.destroy();
  this.ended = true;
};

LineStream.prototype.destroySoon = function() {
  this.destroy();
};

module.exports = {
  create: function() {
    return new LineStream;
  }
};


LineStreamを扱うスクリプトを二種類書いてみたけど、とりあえず両方とも想定通りに動くみたい。

index.js - 1
var linestream = require('./line_stream'),
    l = linestream.create();

require('fs')
  .createReadStream('./line_stream.js', {
    bufferSize: 16
  })
  .pipe(l)
  .pipe(process.stdout);
index.js - 2
var linestream = require('./line_stream'),
    l = linestream.create();

l.on('data', function(data) {
  console.log(data);
});
l.on('end', function() {
  console.log('end');
});

require('fs')
  .createReadStream('./line_stream.js', {
    bufferSize: 16
  })
  .pipe(l);


むむむむ。むずかしい……

CountStream

読み込んだStreamを行ごとに分けて、先頭に行番号を出力するStreamを書いてみた。

count_stream.js
var stream = require('stream'),
    util = require('util');

function CountStream() {
  stream.Stream.call(this);

  this.readable = true;
  this.writable = true;

  this.ended = false;
  this.paused = false;
  this.encoding = '';

  this.count = 0;
  this.buffer = '';
}

util.inherits(CountStream, stream.Stream);

CountStream.prototype.destroy = function() {
  this.readable = false;
  this.writable = false;
};

CountStream.prototype.setEncoding = function(encoding) {
  this.encoding = encoding;
};
CountStream.prototype.pause = function() {
  this.paused = true;
};
CountStream.prototype.resume = function() {
  this.paused = false;
};
CountStream.prototype.write = function(data, encoding) {
  var str, pos;

  if (this.ended) {
    return false;
  }

  str = (Buffer.isBuffer(data)) ?
    data.toString(encoding || this.encoding) : data;

  this.buffer += str;

  if (this.paused) {
    return true;
  }

  pos = this.buffer.indexOf('\n');
  if (pos !== -1) {
    var lines = this.buffer.split('\n');
    for (var i = 0, len = lines.length - 1; i < len; ++i) {
      this.emit('data', ++this.count + ': ' + lines[i] + '\n');
    }
    this.buffer = lines[i];
  }

  return true;
};
CountStream.prototype.end = function(data, encoding) {
  if (data) {
    this.write(data, encoding);
  }
  this.emit('end');
  this.destroy();
  this.ended = true;
};
CountStream.prototype.destroySoon = function() {
  this.destroy();
};


module.exports = {
  create: function() {
    return new CountStream;
  }
};
index.js
var countstream = require('./count_stream'),
    c = countstream.create();

require('fs').createReadStream('./count_stream.js').pipe(c).pipe(process.stdout);

行数を付加して出力は出来たんだけど、for使ってるからなんかこう……writeでブロックしちゃう。
process.nextTick使うと良いのかな?

起動中のプロセスのデバッガを有効にしてデバッグする

node.jsで普通に起動したプロセスを後からデバッガを有効にしてデバッグする方法があるのは知ってたのですが、実際にやったことが無かったので試してみました。

普通に起動させる

とりあえずHTTPサーバを書きます。

index.js
require('http').createServer(function(req, res) {
  res.writeHead(200);
  res.end('Hello');
}).listen(3000);

普通に起動させます。

$ node index.js

デバッガを起動する

$ ps ax | grep node
$ kill -SIGUSR1 (pid)

上で起動させたnodeのpidを調べて、SIGUSR1を投げます。

node-inspectorからデバッグしてみる

$ npm install -g node-inspector
$ node-inspector

あとはGoogle Chromeからhttp://0.0.0.0:8080/debug?port=5858を開くと現在動作しているnodeのコードが表示できるので、ブレークポイントとかを置いてアクセスすると止まります。