从前面两篇文章,我们了解到。想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable。换句话说,每次传递数据时,都需要写如下的模板代码
readable.on('readable', (err) => {
if(err) throw err writable.write(readable.read())
})
为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据
readable.pipe(writable)现在,就让我们来看看它是如何实现的吧
pipe
首先需要先调用 Readable 的 pipe() 方法
// lib/_stream_readable.jsReadable.prototype.pipe = function(dest, pipeOpts) {
var src = this;
var state = this._readableState;
// 记录 Writable
switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [state.pipes, dest];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
// ...
src.once('end', endFn);
dest.on('unpipe', onunpipe);
// ...
dest.on('drain', ondrain);
// ...
src.on('data', ondata);
// ...
// 保证 error 事件触发时,onerror 首先被执行
prependListener(dest, 'error', onerror);
// ...
dest.once('close', onclose);
// ...
dest.once('finish', onfinish);
// ...
// 触发 Writable 的 pipe 事件
dest.emit('pipe', src);
// 将 Readable 改为 flow 模式
if (!state.flowing) {
debug('pipe resume');
src.resume();
}
return dest;
};
执行 pipe() 函数时,首先将 Writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 Readable 不是 flow 模式,就调用 resume() 将 Readable 改为 flow 模式
传递数据
Readable 从数据源获取到数据后,触发 data 事件,执行 ondata()
ondata() 相关代码:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
// 详情见 https://github.com/nodejs/node/issues/7278
var increasedAwaitDrain = false;
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
// 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);









