Node.js Stream ondata触发时机与顺序的探索

2020-06-17 05:47:54易采站长站整理

function Readable(options) {
this._read = options.read; // 将参数传递的read函数赋值到this._read
}
// 使用者通过调用read方法获取数据
Readable.prototype.read = function (size) {
var state = this._readableState;
// 模拟锁,一次_read如果没有返回(this.push),后续read不会继续调用_read读取数据
if (!state.reading) {
state.reading = true;
state.sync = true; // sync用于在push方法中指示_read内部是否同步调用了push
this._read(size);
state.sync = false;
}
// _read内部如果是同步调用push,数据会放入缓冲区
// _read内部如果是异步调用push且缓冲区没有内容,数据可能emit data返回
// 尝试从缓冲区(state.buffer)中获取大小为size的数据,如果获取成功则触发data事件
if (ret)
this.emit('data', ret);
return ret;
};
// 在this._read执行过程中通过this.push输出数据
Readable.prototype.push = function (chunk, encoding) {
var state = this._readableState;
// 本次_read获取到数据,打开锁
state.reading = false;
// 流动模式 & 缓冲区没有数据 & 非同步返回,则直接触发data事件
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0); // 触发下一次读取,_read异步push的话还是会到这里,类似flow中的保持流出于流动
}
else {
// 将数据放入缓冲区
state.length += chunk.length;
state.buffer.push(chunk);
}
};
// 暂停流动
Readable.prototype.pause = function() {
if (this._readableState.flowing !== false) {
this._readableState.flowing = false;
this.emit('pause');
}
return this;
};
function flow(stream) {
const state = stream._readableState;
while (state.flowing && stream.read() !== null);
}

data事件的触发时机与顺序

时机

data的触发只有两处:

流如果处于流动模式 & 缓冲区没有数据 & 异步调用push,此时数据不经过缓冲区,直接触发data事件
不满足上述情况时,push的数据会被放入缓冲区,然后再尝试从缓冲区读取指定size的数据并触发data事件

顺序

关于data的触发顺序,实际是由emit顺序决定,为讨论原始问题:“

increasedAwaitDrain
相关逻辑为什么可以被删除?”,将代码简化:


let count = 0;
src.on('data', chunk => {
let ret = dest.write(chunk);
if (!ret) {
count++;
src.pause();
}
});

当监听流的data事件时,流最终会通过resume并调用flow函数进入流动模式模式,即不断的调用read方法读取数据。接下来分析以下几种场景,当