Node.js pipe实现源码解析

2020-06-17 06:47:29易采站长站整理

src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
// 进入 pause 模式
src.pause();
}
}

在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 Writable

此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitDrain 多次增加,不能清零,Readable 卡住

当不能再向 Writable 写入数据时,Readable 会进入 pause 模式,直到所有的 drain 事件触发

触发 drain 事件,执行 ondrain()


// lib/_stream_readable.js

var ondrain = pipeOnDrain(src);

function pipeOnDrain(src) {
return function() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
// awaitDrain === 0,且有 data 监听器
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}

每个 drain 事件触发时,都会减少 awaitDrain,直到 awaitDrain 为 0。此时,调用 flow(src),使 Readable 进入 flow 模式

到这里,整个数据传递循环已经建立,数据会顺着循环源源不断的流入 Writable,直到所有数据写入完成

unpipe

不管写入过程中是否出现错误,最后都会执行 unpipe()


// lib/_stream_readable.js

// ...

function unpipe() {
debug('unpipe');
src.unpipe(dest);
}

// ...

Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var unpipeInfo = { hasUnpiped: false };

// 啥也没有
if (state.pipesCount === 0)
return this;

// 只有一个
if (state.pipesCount === 1) {
if (dest && dest !== state.pipes)
return this;
// 没有指定就 unpipe 所有
if (!dest)
dest = state.pipes;

state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this, unpipeInfo);
return this;
}

// 没有指定就 unpipe 所有
if (!dest) {
var dests = state.pipes;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this, unpipeInfo);
return this;
}

// 找到指定 Writable,并 unpipe
var index = state.pipes.indexOf(dest);
if (index === -1)
return this;

state.pipes.splice(index, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];

dest.emit('unpipe', this, unpipeInfo);

return this;
};

Readable.prototype.unpipe() 函数会根据 state.pipes 属性和 dest 参数,选择执行策略。最后会触发 dest 的 unpipe 事件