.run() 方法的函数(如下所述),一旦项目开始处理就会被调用。
getData 函数返回的数据将传给工作线程。在
.init() 方法中,我们创建了 worker 并将它们保存在以下状态中:
private init() {
if (this.numberOfThreads < 1) {
return null;
} for (let i = 0; i < this.numberOfThreads; i += 1) {
const worker = new Worker(this.workerPath);
this.workersById[i] = worker;
this.activeWorkersById[i] = false;
}
}
为避免无限循环,我们首先要确保线程数 > 1。然后创建有效的 worker 数,并将它们的索引保存在
workersById 状态。我们在
activeWorkersById 状态中保存了它们当前是否正在运行的信息,默认情况下该状态始终为false。现在我们必须实现前面提到的
.run() 方法来设置一个 worker 可用的任务。
public run(getData: () => T) {
return new Promise<N>((resolve, reject) => {
const availableWorkerId = this.getInactiveWorkerId(); const queueItem: QueueItem<T, N> = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
},
};
if (availableWorkerId === -1) {
this.queue.push(queueItem);
return null;
}
this.runWorker(availableWorkerId, queueItem);
});
}
在 promise 函数里,我们首先通过调用
.getInactiveWorkerId() 来检查是否存在空闲的 worker 可以来处理数据:
private getInactiveWorkerId(): number {
for (let i = 0; i < this.numberOfThreads; i += 1) {
if (!this.activeWorkersById[i]) {
return i;
}
} return -1;
}
接下来,我们创建一个
queueItem,在其中保存传递给
.run() 方法的
getData 函数以及回调。在回调中,我们要么
resolve 或者
reject promise,这取决于 worker 是否将错误传递给回调。如果
availableWorkerId 的值是 -1,意味着当前没有可用的 worker,我们将
queueItem 添加到
queue









