source: trip-planner-front/node_modules/piscina/src/index.ts@ 6a3a178

Last change on this file since 6a3a178 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 34.0 KB
Line 
1import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'worker_threads';
2import { once } from 'events';
3import EventEmitterAsyncResource from 'eventemitter-asyncresource';
4import { AsyncResource } from 'async_hooks';
5import { cpus } from 'os';
6import { fileURLToPath, URL } from 'url';
7import { resolve } from 'path';
8import { inspect, types } from 'util';
9import assert from 'assert';
10import { Histogram, build } from 'hdr-histogram-js';
11import { performance } from 'perf_hooks';
12import hdrobj from 'hdr-histogram-percentiles-obj';
13import {
14 ReadyMessage,
15 RequestMessage,
16 ResponseMessage,
17 StartupMessage,
18 commonState,
19 kResponseCountField,
20 kRequestCountField,
21 kFieldCount,
22 Transferable,
23 Task,
24 TaskQueue,
25 kQueueOptions,
26 isTaskQueue,
27 isTransferable,
28 markMovable,
29 isMovable,
30 kTransferable,
31 kValue
32} from './common';
33import { version } from '../package.json';
34
/**
 * Determines how many logical CPUs the pool should assume when deriving its
 * default thread counts.
 *
 * @param probe Function returning one entry per logical CPU. Defaults to
 *   `os.cpus`; it is a parameter only so the fallback paths can be exercised.
 * @returns The number of entries reported by `probe`, but never less than 1.
 */
function detectCpuCount (probe : () => unknown[] = cpus) : number {
  try {
    // `os.cpus()` may legitimately return an empty array when no CPU
    // information is available. A count of 0 would produce a default
    // `maxThreads` of 0, so clamp to at least one CPU.
    return Math.max(probe().length, 1);
  } catch {
    /* istanbul ignore next */
    return 1;
  }
}

// Evaluated once at module load; feeds the default min/max thread counts.
const cpuCount : number = detectCpuCount();
43
/** Listener options passed to `addEventListener` by `onabort`. */
interface AbortSignalEventTargetAddOptions {
  once : boolean;
}

/** Abort signal that follows the DOM `EventTarget` shape. */
interface AbortSignalEventTarget {
  addEventListener : (
    name : 'abort',
    listener : () => void,
    options? : AbortSignalEventTargetAddOptions) => void;
  removeEventListener : (
    name : 'abort',
    listener : () => void) => void;
  aborted? : boolean;
}

/** Abort signal that follows the Node.js `EventEmitter` shape. */
interface AbortSignalEventEmitter {
  off : (name : 'abort', listener : () => void) => void;
  once : (name : 'abort', listener : () => void) => void;
}

/** Either of the two supported abort signal flavours. */
type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

/**
 * Registers `listener` so that it runs at most once when `abortSignal` emits
 * its 'abort' event, using whichever subscription API the signal exposes.
 */
function onabort (abortSignal : AbortSignalAny, listener : () => void) {
  if (!('addEventListener' in abortSignal)) {
    // EventEmitter flavour: `once` already gives single-shot semantics.
    abortSignal.once('abort', listener);
    return;
  }
  // EventTarget flavour: request single-shot delivery explicitly.
  abortSignal.addEventListener('abort', listener, { once: true });
}
/**
 * Error used to reject a task's promise when its abort signal fires (or was
 * already aborted when the task was submitted).
 */
class AbortError extends Error {
  constructor () {
    super('The task has been aborted');
  }

  // Exposed as a prototype getter so every instance reports the same name.
  get name () : string {
    return 'AbortError';
  }
}
77
// Type of the `resourceLimits` property as declared on `Worker` by the
// installed Node.js typings; falls back to `{}` when that property is not
// declared.
type ResourceLimits = Worker extends {
  resourceLimits? : infer T;
} ? T : {};
// Type of the `env` option accepted by the `Worker` constructor, inferred from
// its signature; resolves to `never` when the constructor shape does not match.
type EnvSpecifier = typeof Worker extends {
  new (filename : never, options?: { env: infer T }) : Worker;
} ? T : never;
84
/**
 * Default FIFO task queue, backed by a plain array.
 */
class ArrayTaskQueue implements TaskQueue {
  tasks : Task[] = [];

  /** Number of tasks currently waiting in the queue. */
  get size () : number { return this.tasks.length; }

  /**
   * Removes and returns the oldest queued task.
   *
   * @returns The task at the head of the queue, or `null` when the queue is
   *   empty (`Array.prototype.shift` alone would yield `undefined`, which
   *   contradicts the declared `Task | null` contract).
   */
  shift () : Task | null {
    return this.tasks.shift() ?? null;
  }

  /** Appends `task` to the tail of the queue. */
  push (task : Task) : void {
    this.tasks.push(task);
  }

  /**
   * Removes a specific task from the queue.
   *
   * @throws AssertionError if `task` is not currently queued.
   */
  remove (task : Task) : void {
    const index = this.tasks.indexOf(task);
    assert.notStrictEqual(index, -1);
    this.tasks.splice(index, 1);
  }
}
104
// Options accepted by the `Piscina` constructor. Every field is optional;
// missing fields are filled from `kDefaultOptions`.
interface Options {
  // Default worker file for tasks that do not name one; may be a path or a
  // `file:` URL string.
  filename? : string | null,
  // Default `name` sent along with tasks that do not specify one.
  name?: string,
  // Lower and upper bounds on the number of worker threads kept by the pool.
  minThreads? : number,
  maxThreads? : number,
  // Delay (passed to `setTimeout`) before an idle worker above `minThreads`
  // is removed.
  idleTimeout? : number,
  // Maximum number of queued tasks; 'auto' resolves to `maxThreads ** 2`.
  maxQueue? : number | 'auto',
  // Maximum number of tasks a single worker may run at the same time.
  concurrentTasksPerWorker? : number,
  // Forwarded to the worker in the startup message; when false the pool also
  // skips its synchronous pending-message processing.
  useAtomics? : boolean,
  // The following five options are forwarded unchanged to `new Worker(...)`.
  resourceLimits? : ResourceLimits,
  argv? : string[],
  execArgv? : string[],
  env? : EnvSpecifier,
  workerData? : any,
  // Custom queue implementation; an `ArrayTaskQueue` is used when omitted.
  taskQueue? : TaskQueue,
  // Forwarded to the worker in the startup message.
  // NOTE(review): presumably a scheduling-priority adjustment applied inside
  // the worker - confirm against worker.ts.
  niceIncrement? : number,
  // Forwarded unchanged to `new Worker(...)`.
  trackUnmanagedFds? : boolean,
}

// `Options` after defaults have been applied: the fields the pool relies on
// are guaranteed to be present, and `maxQueue` has been resolved to a number.
interface FilledOptions extends Options {
  filename : string | null,
  name: string,
  minThreads : number,
  maxThreads : number,
  idleTimeout : number,
  maxQueue : number,
  concurrentTasksPerWorker : number,
  useAtomics: boolean,
  taskQueue : TaskQueue,
  niceIncrement : number
}
136
// Defaults merged underneath the user-supplied options when a pool is created.
// NOTE(review): `minThreads` and `maxThreads` are derived by plain division /
// multiplication, so they are fractional for odd CPU counts (e.g. 3 CPUs give
// 1.5 and 4.5) even though the constructor's error messages describe these
// options as integers. Left unchanged here because it alters pool sizing.
// NOTE(review): the `taskQueue` instance below is shared by every pool that
// reads it from `options`; the pool itself creates its own queue instead.
const kDefaultOptions : FilledOptions = {
  filename: null,
  name: 'default',
  minThreads: Math.max(cpuCount / 2, 1),
  maxThreads: cpuCount * 1.5,
  idleTimeout: 0,
  maxQueue: Infinity,
  concurrentTasksPerWorker: 1,
  useAtomics: true,
  taskQueue: new ArrayTaskQueue(),
  niceIncrement: 0,
  trackUnmanagedFds: true
};
150
// Per-task options accepted by `run()`.
interface RunOptions {
  // Objects to transfer (rather than copy) when the task is posted.
  transferList? : TransferList,
  // Worker file for this task; the pool-level filename is used when null.
  filename? : string | null,
  // Abort signal (EventTarget or EventEmitter flavour) used to cancel the task.
  signal? : AbortSignalAny | null,
  // Name sent with the task; the pool-level name is used when null.
  name? : string | null
}

// `RunOptions` with every field present.
interface FilledRunOptions extends RunOptions {
  transferList : TransferList | never,
  filename : string | null,
  signal : AbortSignalAny | null,
  name : string | null
}

// Used as the default value of the `options` parameter of `run()`.
const kDefaultRunOptions : FilledRunOptions = {
  transferList: undefined,
  filename: null,
  signal: null,
  name: null
};
171
/**
 * `Transferable` wrapper for an object that is itself the thing to transfer:
 * the same object is reported both as the transferable and as the value.
 */
class DirectlyTransferable implements Transferable {
  #wrapped : object;

  constructor (value : object) {
    this.#wrapped = value;
  }

  /** Object to place in the transfer list. */
  get [kTransferable] () : object {
    return this.#wrapped;
  }

  /** Object to send as the task payload. */
  get [kValue] () : object {
    return this.#wrapped;
  }
}
182
/**
 * `Transferable` wrapper for an `ArrayBufferView`: the view is the value that
 * gets sent, while its underlying buffer is what goes in the transfer list.
 */
class ArrayBufferViewTransferable implements Transferable {
  #source : ArrayBufferView;

  constructor (view : ArrayBufferView) {
    this.#source = view;
  }

  /** The view's backing buffer, to be placed in the transfer list. */
  get [kTransferable] () : object {
    return this.#source.buffer;
  }

  /** The view itself, sent as the task payload. */
  get [kValue] () : object {
    return this.#source;
  }
}
193
// Source of the `taskId` assigned to each TaskInfo; incremented once per task.
let taskIdCounter = 0;

// Completion callback of a task. `err` is `null` when the task succeeded
// (`TaskInfo.done` forwards `null` in that case), so the type must admit it.
type TaskCallback = (err : Error | null, result: any) => void;
// Grab the type of `transferList` off `MessagePort`. At the time of writing,
// only ArrayBuffer and MessagePort are valid, but let's avoid having to update
// our types here every time Node.js adds support for more objects.
type TransferList = MessagePort extends { postMessage(value : any, transferList : infer T) : any; } ? T : never;
type TransferListItem = TransferList extends (infer T)[] ? T : never;
202
/**
 * Converts a `file:` URL string into a file system path; any other string is
 * returned untouched.
 */
function maybeFileURLToPath (filename : string) : string {
  if (!filename.startsWith('file:')) {
    return filename;
  }
  return fileURLToPath(new URL(filename));
}
208
// Extend AsyncResource so that async relations between posting a task and
// receiving its result are visible to diagnostic tools.
/**
 * Book-keeping record for one submitted task: its payload, where it should
 * run, how to report completion, and (optionally) how it can be aborted.
 */
class TaskInfo extends AsyncResource implements Task {
  callback : TaskCallback;
  task : any;
  transferList : TransferList;
  filename : string;
  name : string;
  taskId : number;
  abortSignal : AbortSignalAny | null;
  // Set by the pool once an abort listener has been registered on the signal.
  abortListener : (() => void) | null = null;
  // Set when the task has been posted to a worker; null while it is queued.
  workerInfo : WorkerInfo | null = null;
  // `performance.now()` timestamps; `started` stays 0 until the task is posted.
  created : number;
  started : number;

  /**
   * @param task Payload to send to the worker. A value marked movable (see
   *   `Piscina.move()`) is unwrapped and its transferable is appended to the
   *   transfer list.
   * @param transferList Objects to transfer along with the payload.
   * @param filename Worker file the task should be run with.
   * @param name Name sent to the worker together with the task.
   * @param callback Invoked exactly once, from `done()`.
   * @param abortSignal Signal that may cancel the task, or null.
   * @param triggerAsyncId Async id recorded as the trigger of this resource.
   */
  constructor (
    task : any,
    transferList : TransferList,
    filename : string,
    name : string,
    callback : TaskCallback,
    abortSignal : AbortSignalAny | null,
    triggerAsyncId : number) {
    super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId });
    this.callback = callback;
    this.task = task;
    this.transferList = transferList;

    // If the task is a Transferable returned by
    // Piscina.move(), then add it to the transferList
    // automatically
    if (isMovable(task)) {
      // This condition should never be hit but typescript
      // complains if we dont do the check.
      /* istanbul ignore if */
      if (this.transferList == null) {
        this.transferList = [];
      }
      this.transferList =
        this.transferList.concat(task[kTransferable]);
      this.task = task[kValue];
    }

    this.filename = filename;
    this.name = name;
    this.taskId = taskIdCounter++;
    this.abortSignal = abortSignal;
    this.created = performance.now();
    this.started = 0;
  }

  /**
   * Hands out the payload and drops this record's reference to it.
   *
   * @returns The payload that was stored in `task`.
   */
  releaseTask () : any {
    const ret = this.task;
    this.task = null;
    return ret;
  }

  /**
   * Reports completion: runs the callback inside this async resource's scope,
   * emits the destroy event, and detaches the abort listener if there is one.
   *
   * @param err Failure reason, or null on success.
   * @param result Value produced by the task.
   */
  done (err : Error | null, result? : any) : void {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy(); // `TaskInfo`s are used only once.
    // If an abort signal was used, remove the listener from it when
    // done to make sure we do not accidentally leak.
    if (this.abortSignal && this.abortListener) {
      if ('removeEventListener' in this.abortSignal) {
        this.abortSignal.removeEventListener('abort', this.abortListener);
      } else {
        (this.abortSignal as AbortSignalEventEmitter).off(
          'abort', this.abortListener);
      }
    }
  }

  /**
   * Queue options attached to the payload under the `kQueueOptions` key.
   *
   * @returns The attached options, or null when the payload carries none.
   *   Payloads that are not objects/functions (primitives, or `null` after
   *   `releaseTask()`) cannot carry the key; the `in` operator would throw a
   *   TypeError on them, so they are reported as having no options.
   */
  get [kQueueOptions] () : object | null {
    const payload = this.task;
    const canCarryKey = payload !== null &&
      (typeof payload === 'object' || typeof payload === 'function');
    if (!canCarryKey) {
      return null;
    }
    return kQueueOptions in payload ? payload[kQueueOptions] : null;
  }
}
285
/**
 * Base class for resources that become usable some time after construction.
 * Interested parties register callbacks that run once the resource is marked
 * as ready.
 */
abstract class AsynchronouslyCreatedResource {
  // Callbacks waiting for readiness; `null` once the resource is ready.
  onreadyListeners : (() => void)[] | null = [];

  /**
   * Flags the resource as ready and runs every waiting callback in
   * registration order. Must not be called twice (asserts otherwise).
   */
  markAsReady () : void {
    const waiting = this.onreadyListeners;
    assert(waiting !== null);
    // Flip the state first so callbacks already observe a ready resource.
    this.onreadyListeners = null;
    for (const notify of waiting) {
      notify();
    }
  }

  /** Whether `markAsReady()` has already been called. */
  isReady () : boolean {
    return this.onreadyListeners === null;
  }

  /**
   * Runs `fn` once the resource is ready: immediately (synchronously) if it
   * already is, otherwise when `markAsReady()` is called.
   */
  onReady (fn : () => void) {
    const waiting = this.onreadyListeners;
    if (waiting !== null) {
      waiting.push(fn);
      return;
    }
    fn(); // Zalgo is okay here.
  }

  /** Current load of the resource, as defined by the subclass. */
  abstract currentUsage() : number;
}
312
/**
 * Collection of asynchronously created resources, split into those still
 * starting up and those ready for use. Subscribers are told whenever a ready
 * resource has spare capacity.
 */
class AsynchronouslyCreatedResourcePool<
  T extends AsynchronouslyCreatedResource> {
  pendingItems = new Set<T>();
  readyItems = new Set<T>();
  // A resource whose usage reaches this value is considered full.
  maximumUsage : number;
  onAvailableListeners : ((item : T) => void)[];

  constructor (maximumUsage : number) {
    this.maximumUsage = maximumUsage;
    this.onAvailableListeners = [];
  }

  /**
   * Adds `item` as pending and promotes it to ready (announcing it if it has
   * capacity) once it reports readiness, unless it was removed in between.
   */
  add (item : T) {
    this.pendingItems.add(item);
    const promote = () : void => {
      // `delete` reports whether the item was still pending.
      /* istanbul ignore else */
      if (this.pendingItems.delete(item)) {
        this.readyItems.add(item);
        this.maybeAvailable(item);
      }
    };
    item.onReady(promote);
  }

  /** Removes `item` from the pool regardless of its state. */
  delete (item : T) {
    this.pendingItems.delete(item);
    this.readyItems.delete(item);
  }

  /**
   * Picks a ready item to give work to: the first completely idle one, else
   * the least used one that is still below `maximumUsage`.
   *
   * @returns The chosen item, or null when every ready item is full.
   */
  findAvailable () : T | null {
    let best : T | null = null;
    let lowestUsage = this.maximumUsage;
    for (const item of this.readyItems) {
      const usage = item.currentUsage();
      if (usage === 0) {
        return item;
      }
      if (usage < lowestUsage) {
        lowestUsage = usage;
        best = item;
      }
    }
    return best;
  }

  /** Iterates pending items first, then ready items. */
  * [Symbol.iterator] () {
    yield * this.pendingItems;
    yield * this.readyItems;
  }

  /** Total number of items, pending and ready. */
  get size () {
    const { pendingItems, readyItems } = this;
    return pendingItems.size + readyItems.size;
  }

  /** Notifies every subscriber if `item` is below `maximumUsage`. */
  maybeAvailable (item : T) {
    const hasCapacity = item.currentUsage() < this.maximumUsage;
    /* istanbul ignore if */
    if (!hasCapacity) {
      return;
    }
    for (const notify of this.onAvailableListeners) {
      notify(item);
    }
  }

  /** Subscribes `fn` to availability notifications. */
  onAvailable (fn : (item : T) => void) {
    this.onAvailableListeners.push(fn);
  }
}
378
// Handler invoked for every response message received from a worker.
type ResponseCallback = (response : ResponseMessage) => void;

// Factories for the errors the pool raises itself. Each call builds a fresh
// `Error` so that every rejection gets its own stack trace.
const Errors = {
  ThreadTermination () {
    return new Error('Terminating worker thread');
  },
  FilenameNotProvided () {
    return new Error('filename must be provided to run() or in options object');
  },
  TaskQueueAtLimit () {
    return new Error('Task queue is at limit');
  },
  NoTaskQueueAvailable () {
    return new Error('No task queue available and all Workers are busy');
  }
};
391
// Pool-side handle for one worker thread: the Worker itself, the message port
// used to exchange tasks and responses with it, the tasks currently posted to
// it, and the shared counters used to signal new requests/responses.
class WorkerInfo extends AsynchronouslyCreatedResource {
  worker : Worker;
  // Tasks posted to this worker that have not completed yet, keyed by taskId.
  taskInfos : Map<number, TaskInfo>;
  // Timer that removes this worker once it has been idle; null when unset.
  idleTimeout : NodeJS.Timeout | null = null; // eslint-disable-line no-undef
  port : MessagePort;
  // Shared counters, indexed by kRequestCountField / kResponseCountField.
  sharedBuffer : Int32Array;
  // Value of the response counter when it was last inspected.
  lastSeenResponseCount : number = 0;
  onMessage : ResponseCallback;

  // Wires up the port so every incoming message goes through
  // `_handleResponse`, and allocates the shared counter buffer.
  constructor (
    worker : Worker,
    port : MessagePort,
    onMessage : ResponseCallback) {
    super();
    this.worker = worker;
    this.port = port;
    this.port.on('message',
      (message : ResponseMessage) => this._handleResponse(message));
    this.onMessage = onMessage;
    this.taskInfos = new Map();
    this.sharedBuffer = new Int32Array(
      new SharedArrayBuffer(kFieldCount * Int32Array.BYTES_PER_ELEMENT));
  }

  // Terminates the worker, closes the port, cancels the idle timer, and fails
  // every task still posted to this worker with a thread-termination error.
  destroy () : void {
    this.worker.terminate();
    this.port.close();
    this.clearIdleTimeout();
    for (const taskInfo of this.taskInfos.values()) {
      taskInfo.done(Errors.ThreadTermination());
    }
    this.taskInfos.clear();
  }

  // Cancels the pending idle timer, if any.
  clearIdleTimeout () : void {
    if (this.idleTimeout !== null) {
      clearTimeout(this.idleTimeout);
      this.idleTimeout = null;
    }
  }

  // Refs the port (see `unref` for why the Worker itself is not touched).
  // Returns `this` for chaining.
  ref () : WorkerInfo {
    this.port.ref();
    return this;
  }

  // Unrefs the port. Returns `this` for chaining.
  unref () : WorkerInfo {
    // Note: Do not call ref()/unref() on the Worker itself since that may cause
    // a hard crash, see https://github.com/nodejs/node/pull/33394.
    this.port.unref();
    return this;
  }

  // Forwards a response to the `onMessage` handler, then unrefs the port when
  // no tasks remain on this worker.
  _handleResponse (message : ResponseMessage) : void {
    this.onMessage(message);

    if (this.taskInfos.size === 0) {
      // No more tasks running on this Worker means it should not keep the
      // process running.
      this.unref();
    }
  }

  // Sends a task to the worker. If posting throws, the task is completed with
  // that error and nothing is recorded; otherwise the task is tracked, the
  // port is ref'd, the idle timer is cancelled and the request counter is
  // bumped to wake the worker.
  postTask (taskInfo : TaskInfo) {
    assert(!this.taskInfos.has(taskInfo.taskId));
    const message : RequestMessage = {
      task: taskInfo.releaseTask(),
      taskId: taskInfo.taskId,
      filename: taskInfo.filename,
      name: taskInfo.name
    };

    try {
      this.port.postMessage(message, taskInfo.transferList);
    } catch (err) {
      // This would mostly happen if e.g. message contains unserializable data
      // or transferList is invalid.
      taskInfo.done(err);
      return;
    }

    taskInfo.workerInfo = this;
    this.taskInfos.set(taskInfo.taskId, taskInfo);
    this.ref();
    this.clearIdleTimeout();

    // Inform the worker that there are new messages posted, and wake it up
    // if it is waiting for one.
    Atomics.add(this.sharedBuffer, kRequestCountField, 1);
    Atomics.notify(this.sharedBuffer, kRequestCountField, 1);
  }

  // Synchronously drains responses already sitting on the port when the
  // shared response counter has changed since the last look.
  processPendingMessages () {
    // If we *know* that there are more messages than we have received using
    // 'message' events yet, then try to load and handle them synchronously,
    // without the need to wait for more expensive events on the event loop.
    // This would usually break async tracking, but in our case, we already have
    // the extra TaskInfo/AsyncResource layer that rectifies that situation.
    const actualResponseCount =
      Atomics.load(this.sharedBuffer, kResponseCountField);
    if (actualResponseCount !== this.lastSeenResponseCount) {
      this.lastSeenResponseCount = actualResponseCount;

      let entry;
      while ((entry = receiveMessageOnPort(this.port)) !== undefined) {
        this._handleResponse(entry.message);
      }
    }
  }

  // True when this worker runs exactly one task and that task has an abort
  // signal.
  isRunningAbortableTask () : boolean {
    // If there are abortable tasks, we are running one at most per Worker.
    if (this.taskInfos.size !== 1) return false;
    const [[, task]] = this.taskInfos;
    return task.abortSignal !== null;
  }

  // Number of tasks on this worker, or Infinity while it runs an abortable
  // task, so that usage comparisons never treat it as having spare capacity.
  currentUsage () : number {
    if (this.isRunningAbortableTask()) return Infinity;
    return this.taskInfos.size;
  }
}
514
/**
 * Internal engine behind `Piscina`: owns the worker threads, the task queue
 * and the run/wait time histograms, and decides where each task runs.
 */
class ThreadPool {
  publicInterface : Piscina;
  workers : AsynchronouslyCreatedResourcePool<WorkerInfo>;
  options : FilledOptions;
  taskQueue : TaskQueue;
  // Abortable tasks that were taken off `taskQueue` but could not be given to
  // the worker that was available at the time; drained before `taskQueue`.
  skipQueue : TaskInfo[] = [];
  completed : number = 0;
  runTime : Histogram;
  waitTime : Histogram;
  start : number = performance.now();
  // Re-entrancy guard for `_processPendingMessages`.
  inProcessPendingMessages : boolean = false;
  // True only while the constructor spawns the initial workers.
  startingUp : boolean = false;
  // Latched once a worker errors before it became ready; stops respawning.
  workerFailsDuringBootstrap : boolean = false;

  /**
   * @param publicInterface The `Piscina` instance events are emitted on.
   * @param options User options; merged over `kDefaultOptions`.
   */
  constructor (publicInterface : Piscina, options : Options) {
    this.publicInterface = publicInterface;
    this.taskQueue = options.taskQueue || new ArrayTaskQueue();
    this.runTime = build({ lowestDiscernibleValue: 1 });
    this.waitTime = build({ lowestDiscernibleValue: 1 });

    const filename =
      options.filename ? maybeFileURLToPath(options.filename) : null;
    this.options = { ...kDefaultOptions, ...options, filename, maxQueue: 0 };
    // The >= and <= could be > and < but this way we get 100 % coverage 🙃
    if (options.maxThreads !== undefined &&
        this.options.minThreads >= options.maxThreads) {
      this.options.minThreads = options.maxThreads;
    }
    if (options.minThreads !== undefined &&
        this.options.maxThreads <= options.minThreads) {
      this.options.maxThreads = options.minThreads;
    }
    if (options.maxQueue === 'auto') {
      this.options.maxQueue = this.options.maxThreads ** 2;
    } else {
      this.options.maxQueue = options.maxQueue ?? kDefaultOptions.maxQueue;
    }

    this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
      this.options.concurrentTasksPerWorker);
    this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));

    this.startingUp = true;
    this._ensureMinimumWorkers();
    this.startingUp = false;
  }

  /** Spawns workers until the pool holds at least `minThreads` of them. */
  _ensureMinimumWorkers () : void {
    while (this.workers.size < this.options.minThreads) {
      this._addNewWorker();
    }
  }

  /**
   * Starts one worker thread, hands it its message port and shared buffer,
   * and installs the handlers for its responses, readiness and errors.
   */
  _addNewWorker () : void {
    const pool = this;
    const worker = new Worker(resolve(__dirname, 'worker.js'), {
      env: this.options.env,
      argv: this.options.argv,
      execArgv: this.options.execArgv,
      resourceLimits: this.options.resourceLimits,
      workerData: this.options.workerData,
      trackUnmanagedFds: this.options.trackUnmanagedFds
    });

    const { port1, port2 } = new MessageChannel();
    const workerInfo = new WorkerInfo(worker, port1, onMessage);
    if (this.startingUp) {
      // There is no point in waiting for the initial set of Workers to indicate
      // that they are ready, we just mark them as such from the start.
      workerInfo.markAsReady();
    }

    const message : StartupMessage = {
      filename: this.options.filename,
      name: this.options.name,
      port: port2,
      sharedBuffer: workerInfo.sharedBuffer,
      useAtomics: this.options.useAtomics,
      niceIncrement: this.options.niceIncrement
    };
    worker.postMessage(message, [port2]);

    function onMessage (message : ResponseMessage) {
      const { taskId, result } = message;
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, which marks it as
      // free again.
      const taskInfo = workerInfo.taskInfos.get(taskId);
      workerInfo.taskInfos.delete(taskId);

      pool.workers.maybeAvailable(workerInfo);

      /* istanbul ignore if */
      if (taskInfo === undefined) {
        const err = new Error(
          `Unexpected message from Worker: ${inspect(message)}`);
        pool.publicInterface.emit('error', err);
      } else {
        taskInfo.done(message.error, result);
      }

      pool._processPendingMessages();
    }

    worker.on('message', (message : ReadyMessage) => {
      if (message.ready === true) {
        if (workerInfo.currentUsage() === 0) {
          workerInfo.unref();
        }

        if (!workerInfo.isReady()) {
          workerInfo.markAsReady();
        }
        return;
      }

      worker.emit('error', new Error(
        `Unexpected message on Worker: ${inspect(message)}`));
    });

    worker.on('error', (err : Error) => {
      // Work around the bug in https://github.com/nodejs/node/pull/33394
      worker.ref = () => {};

      // In case of an uncaught exception: Call the callback that was passed to
      // `postTask` with the error, or emit an 'error' event if there is none.
      const taskInfos = [...workerInfo.taskInfos.values()];
      workerInfo.taskInfos.clear();

      // Remove the worker from the list and potentially start a new Worker to
      // replace the current one.
      this._removeWorker(workerInfo);

      if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
        this._ensureMinimumWorkers();
      } else {
        // Do not start new workers over and over if they already fail during
        // bootstrap, there's no point.
        this.workerFailsDuringBootstrap = true;
      }

      if (taskInfos.length > 0) {
        for (const taskInfo of taskInfos) {
          taskInfo.done(err, null);
        }
      } else {
        this.publicInterface.emit('error', err);
      }
    });

    worker.unref();
    port1.on('close', () => {
      // The port is only closed if the Worker stops for some reason, but we
      // always .unref() the Worker itself. We want to receive e.g. 'error'
      // events on it, so we ref it once we know it's going to exit anyway.
      worker.ref();
    });

    this.workers.add(workerInfo);
  }

  /**
   * Lets every worker synchronously drain responses it already received.
   * No-op when atomics are disabled or when already running (re-entrancy).
   */
  _processPendingMessages () {
    if (this.inProcessPendingMessages || !this.options.useAtomics) {
      return;
    }

    this.inProcessPendingMessages = true;
    try {
      for (const workerInfo of this.workers) {
        workerInfo.processPendingMessages();
      }
    } finally {
      this.inProcessPendingMessages = false;
    }
  }

  /** Destroys `workerInfo` and drops it from the pool. */
  _removeWorker (workerInfo : WorkerInfo) : void {
    workerInfo.destroy();

    this.workers.delete(workerInfo);
  }

  /**
   * Removes the abort listener that `runTask` registered for `taskInfo`, if
   * any, using whichever API the signal exposes.
   */
  _detachAbortListener (taskInfo : TaskInfo) : void {
    const { abortSignal, abortListener } = taskInfo;
    if (abortSignal === null || abortListener === null) {
      return;
    }
    if ('removeEventListener' in abortSignal) {
      abortSignal.removeEventListener('abort', abortListener);
    } else {
      abortSignal.off('abort', abortListener);
    }
    taskInfo.abortListener = null;
  }

  /**
   * Called when a worker has spare capacity: gives it the next waiting task
   * (skip queue first), or schedules its removal if it is idle and surplus.
   */
  _onWorkerAvailable (workerInfo : WorkerInfo) : void {
    while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
      workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
      // The skipQueue will have tasks that we previously shifted off
      // the task queue but had to skip over... we have to make sure
      // we drain that before we drain the taskQueue.
      const taskInfo = this.skipQueue.shift() ||
                       this.taskQueue.shift() as TaskInfo;
      // If the task has an abortSignal and the worker has any other
      // tasks, we cannot distribute the task to it. Skip for now.
      if (taskInfo.abortSignal && workerInfo.taskInfos.size > 0) {
        this.skipQueue.push(taskInfo);
        break;
      }
      const now = performance.now();
      this.waitTime.recordValue(now - taskInfo.created);
      taskInfo.started = now;
      workerInfo.postTask(taskInfo);
      this._maybeDrain();
      return;
    }

    if (workerInfo.taskInfos.size === 0 &&
        this.workers.size > this.options.minThreads) {
      workerInfo.idleTimeout = setTimeout(() => {
        assert.strictEqual(workerInfo.taskInfos.size, 0);
        if (this.workers.size > this.options.minThreads) {
          this._removeWorker(workerInfo);
        }
      }, this.options.idleTimeout).unref();
    }
  }

  /**
   * Submits a task: posts it to an available worker, or queues it (possibly
   * spawning a new worker), or rejects when there is no room.
   *
   * @param task Payload for the worker.
   * @param options Per-task overrides (file, name, transfer list, signal).
   * @returns Promise settling with the task's result; rejects with
   *   `AbortError` when the signal fires, or with one of the `Errors` when
   *   the task cannot be accepted.
   */
  runTask (
    task : any,
    options : RunOptions) : Promise<any> {
    let {
      filename,
      name
    } = options;
    const {
      transferList = [],
      signal = null
    } = options;
    if (filename == null) {
      filename = this.options.filename;
    }
    if (name == null) {
      name = this.options.name;
    }
    if (typeof filename !== 'string') {
      return Promise.reject(Errors.FilenameNotProvided());
    }
    filename = maybeFileURLToPath(filename);

    let resolve : (result : any) => void;
    let reject : (err : Error) => void;
    // eslint-disable-next-line
    const ret = new Promise((res, rej) => { resolve = res; reject = rej; });
    const taskInfo = new TaskInfo(
      task,
      transferList,
      filename,
      name,
      (err : Error | null, result : any) => {
        this.completed++;
        if (taskInfo.started) {
          this.runTime.recordValue(performance.now() - taskInfo.started);
        }
        if (err !== null) {
          reject(err);
        } else {
          resolve(result);
        }
      },
      signal,
      this.publicInterface.asyncResource.asyncId());

    // Rejection for a task that is neither queued nor posted. The abort
    // listener must be detached first: left in place, a later abort would try
    // to remove the task from a queue it never entered.
    const rejectWithoutQueueing = (err : Error) : Promise<never> => {
      this._detachAbortListener(taskInfo);
      return Promise.reject(err);
    };

    if (signal !== null) {
      // If the AbortSignal has an aborted property and it's truthy,
      // reject immediately.
      if ((signal as AbortSignalEventTarget).aborted) {
        return Promise.reject(new AbortError());
      }
      taskInfo.abortListener = () => {
        // Call reject() first to make sure we always reject with the AbortError
        // if the task is aborted, not with an Error from the possible
        // thread termination below.
        reject(new AbortError());

        if (taskInfo.workerInfo !== null) {
          // Already running: We cancel the Worker this is running on.
          this._removeWorker(taskInfo.workerInfo);
          this._ensureMinimumWorkers();
          return;
        }

        // Not yet running: the task waits either in the skip queue (it was
        // shifted off the task queue but could not be placed yet) or in the
        // task queue itself. Remove it from wherever it is.
        const skippedIndex = this.skipQueue.indexOf(taskInfo);
        if (skippedIndex !== -1) {
          this.skipQueue.splice(skippedIndex, 1);
        } else {
          this.taskQueue.remove(taskInfo);
        }
      };
      onabort(signal, taskInfo.abortListener);
    }

    // If there is a task queue, there's no point in looking for an available
    // Worker thread. Add this task to the queue, if possible.
    if (this.taskQueue.size > 0) {
      const totalCapacity = this.options.maxQueue + this.pendingCapacity();
      if (this.taskQueue.size >= totalCapacity) {
        if (this.options.maxQueue === 0) {
          return rejectWithoutQueueing(Errors.NoTaskQueueAvailable());
        } else {
          return rejectWithoutQueueing(Errors.TaskQueueAtLimit());
        }
      } else {
        if (this.workers.size < this.options.maxThreads) {
          this._addNewWorker();
        }
        this.taskQueue.push(taskInfo);
      }

      return ret;
    }

    // Look for a Worker with a minimum number of tasks it is currently running.
    let workerInfo : WorkerInfo | null = this.workers.findAvailable();

    // If we want the ability to abort this task, use only workers that have
    // no running tasks.
    if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
      workerInfo = null;
    }

    // If no Worker was found, or that Worker was handling another task in some
    // way, and we still have the ability to spawn new threads, do so.
    let waitingForNewWorker = false;
    if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
        this.workers.size < this.options.maxThreads) {
      this._addNewWorker();
      waitingForNewWorker = true;
    }

    // If no Worker is found, try to put the task into the queue.
    if (workerInfo === null) {
      if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
        return rejectWithoutQueueing(Errors.NoTaskQueueAvailable());
      } else {
        this.taskQueue.push(taskInfo);
      }

      return ret;
    }

    // TODO(addaleax): Clean up the waitTime/runTime recording.
    const now = performance.now();
    this.waitTime.recordValue(now - taskInfo.created);
    taskInfo.started = now;
    workerInfo.postTask(taskInfo);
    this._maybeDrain();
    return ret;
  }

  /** Task slots offered by workers that are still starting up. */
  pendingCapacity () : number {
    return this.workers.pendingItems.size *
      this.options.concurrentTasksPerWorker;
  }

  /** Emits 'drain' on the public interface when nothing is left waiting. */
  _maybeDrain () {
    if (this.taskQueue.size === 0 && this.skipQueue.length === 0) {
      this.publicInterface.emit('drain');
    }
  }

  /**
   * Fails every waiting task with a thread-termination error, removes all
   * workers, and resolves once each of them has emitted 'exit'.
   */
  async destroy () {
    while (this.skipQueue.length > 0) {
      const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
      taskInfo.done(Errors.ThreadTermination());
    }
    while (this.taskQueue.size > 0) {
      const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
      taskInfo.done(Errors.ThreadTermination());
    }

    const exitEvents : Promise<any[]>[] = [];
    while (this.workers.size > 0) {
      const [workerInfo] = this.workers;
      exitEvents.push(once(workerInfo.worker, 'exit'));
      this._removeWorker(workerInfo);
    }

    await Promise.all(exitEvents);
  }
}
888
/**
 * Public worker pool API. Validates user input and delegates the actual work
 * to an internal `ThreadPool`.
 */
class Piscina extends EventEmitterAsyncResource {
  #pool : ThreadPool;

  /**
   * @param options Pool configuration; see `Options`.
   * @throws TypeError when an option has the wrong type or is out of range.
   * @throws RangeError when `minThreads` exceeds `maxThreads`.
   */
  constructor (options : Options = {}) {
    super({ ...options, name: 'Piscina' });

    // True when `value` is not a number or lies below `min`. Written with `<`
    // on purpose so that NaN is treated exactly as a direct comparison would.
    const notNumberOrBelow = (value : unknown, min : number) : boolean =>
      typeof value !== 'number' || value < min;
    const notStringOrNullish = (value : unknown) : boolean =>
      typeof value !== 'string' && value != null;
    const notBoolean = (value : unknown) : boolean =>
      typeof value !== 'boolean';

    const {
      filename,
      name,
      minThreads,
      maxThreads,
      idleTimeout,
      maxQueue,
      concurrentTasksPerWorker,
      useAtomics,
      resourceLimits,
      taskQueue,
      niceIncrement,
      trackUnmanagedFds
    } = options;

    if (notStringOrNullish(filename)) {
      throw new TypeError('options.filename must be a string or null');
    }
    if (notStringOrNullish(name)) {
      throw new TypeError('options.name must be a string or null');
    }
    if (minThreads !== undefined && notNumberOrBelow(minThreads, 0)) {
      throw new TypeError('options.minThreads must be a non-negative integer');
    }
    if (maxThreads !== undefined && notNumberOrBelow(maxThreads, 1)) {
      throw new TypeError('options.maxThreads must be a positive integer');
    }
    if (minThreads !== undefined && maxThreads !== undefined &&
        minThreads > maxThreads) {
      throw new RangeError('options.minThreads and options.maxThreads must not conflict');
    }
    if (idleTimeout !== undefined && notNumberOrBelow(idleTimeout, 0)) {
      throw new TypeError('options.idleTimeout must be a non-negative integer');
    }
    if (maxQueue !== undefined && maxQueue !== 'auto' &&
        notNumberOrBelow(maxQueue, 0)) {
      throw new TypeError('options.maxQueue must be a non-negative integer');
    }
    if (concurrentTasksPerWorker !== undefined &&
        notNumberOrBelow(concurrentTasksPerWorker, 1)) {
      throw new TypeError(
        'options.concurrentTasksPerWorker must be a positive integer');
    }
    if (useAtomics !== undefined && notBoolean(useAtomics)) {
      throw new TypeError('options.useAtomics must be a boolean value');
    }
    if (resourceLimits !== undefined &&
        (typeof resourceLimits !== 'object' || resourceLimits === null)) {
      throw new TypeError('options.resourceLimits must be an object');
    }
    if (taskQueue !== undefined && !isTaskQueue(taskQueue)) {
      throw new TypeError('options.taskQueue must be a TaskQueue object');
    }
    if (niceIncrement !== undefined && notNumberOrBelow(niceIncrement, 0)) {
      throw new TypeError('options.niceIncrement must be a non-negative integer');
    }
    if (trackUnmanagedFds !== undefined && notBoolean(trackUnmanagedFds)) {
      throw new TypeError('options.trackUnmanagedFds must be a boolean value');
    }

    this.#pool = new ThreadPool(this, options);
  }

  /** @deprecated Use run(task, options) instead **/
  runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any>;

  /** @deprecated Use run(task, options) instead **/
  runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;

  /** @deprecated Use run(task, options) instead **/
  runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;

  /** @deprecated Use run(task, options) instead **/
  runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any>;

  /** @deprecated Use run(task, options) instead **/
  runTask (task : any, transferList? : any, filename? : any, signal? : any) {
    // Positional arguments may be omitted from the left, so work out which
    // slot each supplied value really belongs to.
    const isNonArrayObject = (value : any) : boolean =>
      typeof value === 'object' && !Array.isArray(value);
    const fail = (reason : string) : Promise<never> =>
      Promise.reject(new TypeError(reason));

    // If transferList is a string or AbortSignal, shift it.
    if (isNonArrayObject(transferList) || typeof transferList === 'string') {
      signal = filename as (AbortSignalAny | undefined);
      filename = transferList;
      transferList = undefined;
    }
    // If filename is an AbortSignal, shift it.
    if (isNonArrayObject(filename)) {
      signal = filename;
      filename = undefined;
    }

    if (transferList !== undefined && !Array.isArray(transferList)) {
      return fail('transferList argument must be an Array');
    }
    if (filename !== undefined && typeof filename !== 'string') {
      return fail('filename argument must be a string');
    }
    if (signal !== undefined && typeof signal !== 'object') {
      return fail('signal argument must be an object');
    }

    const runOptions : RunOptions = {
      transferList,
      filename: filename || null,
      name: 'default',
      signal: signal || null
    };
    return this.#pool.runTask(task, runOptions);
  }

  /**
   * Submits a task to the pool.
   *
   * @param task Payload for the worker.
   * @param options Per-task options; see `RunOptions`.
   * @returns Promise for the task's result; rejects with a TypeError when an
   *   option has the wrong type.
   */
  run (task : any, options : RunOptions = kDefaultRunOptions) {
    const fail = (reason : string) : Promise<never> =>
      Promise.reject(new TypeError(reason));

    if (options === null || typeof options !== 'object') {
      return fail('options must be an object');
    }

    const { transferList, filename, name, signal } = options;

    if (transferList !== undefined && !Array.isArray(transferList)) {
      return fail('transferList argument must be an Array');
    }
    if (filename != null && typeof filename !== 'string') {
      return fail('filename argument must be a string');
    }
    if (name != null && typeof name !== 'string') {
      return fail('name argument must be a string');
    }
    if (signal != null && typeof signal !== 'object') {
      return fail('signal argument must be an object');
    }
    return this.#pool.runTask(task, { transferList, filename, name, signal });
  }

  /** Shuts the pool down; resolves once the underlying pool has finished. */
  destroy () {
    return this.#pool.destroy();
  }

  /** The options in effect, with defaults applied. */
  get options () : FilledOptions {
    return this.#pool.options;
  }

  /** The `Worker` objects currently owned by the pool. */
  get threads () : Worker[] {
    const workers : Worker[] = [];
    for (const info of this.#pool.workers) {
      workers.push(info.worker);
    }
    return workers;
  }

  /**
   * Number of queued tasks beyond what workers still starting up will take
   * on; never negative.
   */
  get queueSize () : number {
    const { taskQueue } = this.#pool;
    const backlog = taskQueue.size - this.#pool.pendingCapacity();
    return Math.max(backlog, 0);
  }

  /** Number of tasks whose completion callback has run. */
  get completed () : number {
    return this.#pool.completed;
  }

  /** Summary (including percentiles) of the wait-time histogram. */
  get waitTime () : any {
    const histogram = this.#pool.waitTime;
    return hdrobj.addPercentiles(histogram, hdrobj.histAsObj(histogram));
  }

  /** Summary (including percentiles) of the run-time histogram. */
  get runTime () : any {
    const histogram = this.#pool.runTime;
    return hdrobj.addPercentiles(histogram, hdrobj.histAsObj(histogram));
  }

  /**
   * Approximate fraction of the pool's theoretical compute time that has
   * actually been spent running tasks.
   */
  get utilization () : number {
    // The capacity is the max compute time capacity of the
    // pool to this point in time as determined by the length
    // of time the pool has been running multiplied by the
    // maximum number of threads.
    const capacity = this.duration * this.#pool.options.maxThreads;
    const { runTime } = this.#pool;
    const totalMeanRuntime = runTime.mean * runTime.totalCount;

    // We calculate the approximate pool utilization by multiplying
    // the mean run time of all tasks by the number of runtime
    // samples taken and dividing that by the capacity. The
    // theory here is that capacity represents the absolute upper
    // limit of compute time this pool could ever attain (but
    // never will for a variety of reasons). Multiplying the
    // mean run time by the number of tasks sampled yields an
    // approximation of the realized compute time. The utilization
    // then becomes a point-in-time measure of how active the
    // pool is.
    return totalMeanRuntime / capacity;
  }

  /** Time elapsed since the pool was created, per `performance.now()`. */
  get duration () : number {
    const startedAt = this.#pool.start;
    return performance.now() - startedAt;
  }

  static get isWorkerThread () : boolean {
    return commonState.isWorkerThread;
  }

  static get workerData () : any {
    return commonState.workerData;
  }

  static get version () : string {
    return version;
  }

  static get Piscina () {
    return Piscina;
  }

  /**
   * Marks a value so that it is transferred rather than copied when used as a
   * task payload. Non-object values are returned unchanged. Objects that are
   * not already `Transferable` are wrapped first.
   */
  static move (val : Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort) {
    if (val != null && typeof val === 'object' && typeof val !== 'function') {
      if (!isTransferable(val)) {
        const isView = (types as any).isArrayBufferView(val);
        if (isView) {
          val = new ArrayBufferViewTransferable(val as ArrayBufferView);
        } else {
          val = new DirectlyTransferable(val);
        }
      }
      markMovable(val);
    }
    return val;
  }

  static get transferableSymbol () { return kTransferable; }

  static get valueSymbol () { return kValue; }

  static get queueOptionsSymbol () { return kQueueOptions; }
}
1124
1125export = Piscina;
Note: See TracBrowser for help on using the repository browser.