什么是 stream
Stream 借鉴自 Unix 编程哲学中的 pipe。
Unix shell 命令中觉的管道流式操作 | 将上一个命令的输出作为下一个命令的输入。node stream 中则是通过 .pip() 方法来进行的。
一个 stream 的运用场景。从服务器读取文件并返回给页面。
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);
好处:
stream 的种类
分五种:
- readable
- writable
- duplex
- transform
- classic
readable
readable 类型的流产生数据,可通过 .pip() 输送到能够消费流数据的地方,比如 writable,transform,duplex
一个 readable stream 示例:
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
运行结果:
$ node read0.js
beep boop
_read 方法与按需输出
上面 rs.push(null) 表示没有更多数据了。
上面从代码直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费。因为消费者有可能并不能立即消费这些内容,直接 push 数据后消耗不必要的资源。
更好的做法是,让 readable 流只在消费者需要数据的时候再 push 。这是通过定义能 raedable 对象定义 ._read 方法来完成的。
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
运行结果:
$ node read1.js
abcdefghijklmnopqrstuvwxyz
这种方式下,定义了 readable 流产生数据的方法 ._read ,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。
_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然,_read 方法的实现中是可以忽略这个入参的。
下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);
输出任意数据
上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数,Readable({ objectMode: true })
消费 readable 流产生的数据
这一段没看太懂
writable 流
writable 流可作为 .pip() 的对象。
src.pipe(writableStream)
创建 writable 流
需要实现 ._write(chunk, enc, next) 方法,其中:
chunk 为接收到的数据
enc 当 opts.decodeString 为 false 且收到的数据这字符串时,它表示字符串的编码
next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败
默认情况下,获取到的字符串数据会转为 Buffer ,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。
一个 writable 示例:
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
向 writable 流写入数据
通过调用 writable 流的 write 方法来写入。
process.stdout.write('beep boop\n');
通过调用 end() 来结束数据的写入。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
duplex
双工类型的流,同时具有 writable 和 readable 流的功能。node 内建的 zlib ,TCP sockets 以及 crypto 都是双工类型的。
所以可对双工类型的流进行如下操作:
a.pip(b).pip(a)
transform
一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam 。
classic stream
这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。
classic readable stream
当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。
.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。
classic readable 流的创建
一个 classic readable 流的创建示例:
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
从 classic readable 流读取数据
数据读取是通过监听流上的 data 与 end 事件。
一个从 classic readable 流读取数据的示例:
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 through 或 concat-stream 来进行。
classic writable stream
只需要实现 .write(buf) , .end(buf) 及 .destroy() 方法即可,比较简单。
内建的流对象
总结
本质上,所有流都是 EventEmitter ,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。
参考
|