Node.js 多线程完全指南总结

2020-06-17 06:09:52易采站长站整理
是传递给工作池
.run()
方法的函数(如下所述),一旦项目开始处理就会被调用。
getData
函数返回的数据将传给工作线程。

.init()
方法中,我们创建了 worker 并将它们保存在以下状态中:


private init() {
if (this.numberOfThreads < 1) {
return null;
}

for (let i = 0; i < this.numberOfThreads; i += 1) {
const worker = new Worker(this.workerPath);

this.workersById[i] = worker;
this.activeWorkersById[i] = false;
}
}

为避免无限循环,我们首先要确保线程数 > 1。然后创建有效的 worker 数,并将它们的索引保存在

workersById
状态。我们在
activeWorkersById
状态中保存了它们当前是否正在运行的信息,默认情况下该状态始终为false。

现在我们必须实现前面提到的

.run()
方法来设置一个 worker 可用的任务。


public run(getData: () => T) {
return new Promise<N>((resolve, reject) => {
const availableWorkerId = this.getInactiveWorkerId();

const queueItem: QueueItem<T, N> = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
},
};

if (availableWorkerId === -1) {
this.queue.push(queueItem);

return null;
}

this.runWorker(availableWorkerId, queueItem);
});
}

在 promise 函数里,我们首先通过调用

.getInactiveWorkerId()
来检查是否存在空闲的 worker 可以来处理数据:


private getInactiveWorkerId(): number {
for (let i = 0; i < this.numberOfThreads; i += 1) {
if (!this.activeWorkersById[i]) {
return i;
}
}

return -1;
}

接下来,我们创建一个

queueItem
,在其中保存传递给
.run()
方法的
getData
函数以及回调。在回调中,我们要么
resolve
或者
reject
promise,这取决于 worker 是否将错误传递给回调。

如果

availableWorkerId
的值是 -1,意味着当前没有可用的 worker,我们将
queueItem
添加到
queue