1 | import Piscina from '..';
2 | import { test } from 'tap';
3 | import { resolve } from 'path';
4 | import { Task, TaskQueue } from '../dist/src/common';
5 |
6 | test('will put items into a task queue until they can run', async ({ equal }) => {
7 | const pool = new Piscina({
8 | filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
9 | minThreads: 2,
10 | maxThreads: 3
11 | });
12 |
13 | equal(pool.threads.length, 2);
14 | equal(pool.queueSize, 0);
15 |
16 | const buffers = [
17 | new Int32Array(new SharedArrayBuffer(4)),
18 | new Int32Array(new SharedArrayBuffer(4)),
19 | new Int32Array(new SharedArrayBuffer(4)),
20 | new Int32Array(new SharedArrayBuffer(4))
21 | ];
22 |
23 | const results = [];
24 |
25 | results.push(pool.runTask(buffers[0]));
26 | equal(pool.threads.length, 2);
27 | equal(pool.queueSize, 0);
28 |
29 | results.push(pool.runTask(buffers[1]));
30 | equal(pool.threads.length, 2);
31 | equal(pool.queueSize, 0);
32 |
33 | results.push(pool.runTask(buffers[2]));
34 | equal(pool.threads.length, 3);
35 | equal(pool.queueSize, 0);
36 |
37 | results.push(pool.runTask(buffers[3]));
38 | equal(pool.threads.length, 3);
39 | equal(pool.queueSize, 1);
40 |
41 | for (const buffer of buffers) {
42 | Atomics.store(buffer, 0, 1);
43 | Atomics.notify(buffer, 0, 1);
44 | }
45 |
46 | await results[0];
47 | equal(pool.queueSize, 0);
48 |
49 | await Promise.all(results);
50 | });
51 |
52 | test('will reject items over task queue limit', async ({ equal, rejects }) => {
53 | const pool = new Piscina({
54 | filename: resolve(__dirname, 'fixtures/eval.ts'),
55 | minThreads: 0,
56 | maxThreads: 1,
57 | maxQueue: 2
58 | });
59 |
60 | equal(pool.threads.length, 0);
61 | equal(pool.queueSize, 0);
62 |
63 | rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
64 | equal(pool.threads.length, 1);
65 | equal(pool.queueSize, 0);
66 |
67 | rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
68 | equal(pool.threads.length, 1);
69 | equal(pool.queueSize, 1);
70 |
71 | rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
72 | equal(pool.threads.length, 1);
73 | equal(pool.queueSize, 2);
74 |
75 | rejects(pool.runTask('while (true) {}'), /Task queue is at limit/);
76 | await pool.destroy();
77 | });
78 |
79 | test('will reject items when task queue is unavailable', async ({ equal, rejects }) => {
80 | const pool = new Piscina({
81 | filename: resolve(__dirname, 'fixtures/eval.ts'),
82 | minThreads: 0,
83 | maxThreads: 1,
84 | maxQueue: 0
85 | });
86 |
87 | equal(pool.threads.length, 0);
88 | equal(pool.queueSize, 0);
89 |
90 | rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
91 | equal(pool.threads.length, 1);
92 | equal(pool.queueSize, 0);
93 |
94 | rejects(pool.runTask('while (true) {}'), /No task queue available and all Workers are busy/);
95 | await pool.destroy();
96 | });
97 |
98 | test('will reject items when task queue is unavailable (fixed thread count)', async ({ equal, rejects }) => {
99 | const pool = new Piscina({
100 | filename: resolve(__dirname, 'fixtures/eval.ts'),
101 | minThreads: 1,
102 | maxThreads: 1,
103 | maxQueue: 0
104 | });
105 |
106 | equal(pool.threads.length, 1);
107 | equal(pool.queueSize, 0);
108 |
109 | rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
110 | equal(pool.threads.length, 1);
111 | equal(pool.queueSize, 0);
112 |
113 | rejects(pool.runTask('while (true) {}'), /No task queue available and all Workers are busy/);
114 | await pool.destroy();
115 | });
116 |
117 | test('tasks can share a Worker if requested (both tests blocking)', async ({ equal, rejects }) => {
118 | const pool = new Piscina({
119 | filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
120 | minThreads: 0,
121 | maxThreads: 1,
122 | maxQueue: 0,
123 | concurrentTasksPerWorker: 2
124 | });
125 |
126 | equal(pool.threads.length, 0);
127 | equal(pool.queueSize, 0);
128 |
129 | rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
130 | equal(pool.threads.length, 1);
131 | equal(pool.queueSize, 0);
132 |
133 | rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
134 | equal(pool.threads.length, 1);
135 | equal(pool.queueSize, 0);
136 |
137 | await pool.destroy();
138 | });
139 |
140 | test('tasks can share a Worker if requested (one test finishes)', async ({ equal, rejects }) => {
141 | const pool = new Piscina({
142 | filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
143 | minThreads: 0,
144 | maxThreads: 1,
145 | maxQueue: 0,
146 | concurrentTasksPerWorker: 2
147 | });
148 |
149 | const buffers = [
150 | new Int32Array(new SharedArrayBuffer(4)),
151 | new Int32Array(new SharedArrayBuffer(4))
152 | ];
153 |
154 | equal(pool.threads.length, 0);
155 | equal(pool.queueSize, 0);
156 |
157 | const firstTask = pool.runTask(buffers[0]);
158 | equal(pool.threads.length, 1);
159 | equal(pool.queueSize, 0);
160 |
161 | rejects(pool.runTask(
162 | 'new Promise((resolve) => setTimeout(resolve, 1000000))',
163 | resolve(__dirname, 'fixtures/eval.js')), /Terminating worker thread/);
164 | equal(pool.threads.length, 1);
165 | equal(pool.queueSize, 0);
166 |
167 | Atomics.store(buffers[0], 0, 1);
168 | Atomics.notify(buffers[0], 0, 1);
169 |
170 | await firstTask;
171 | equal(pool.threads.length, 1);
172 | equal(pool.queueSize, 0);
173 |
174 | await pool.destroy();
175 | });
176 |
177 | test('tasks can share a Worker if requested (both tests finish)', async ({ equal }) => {
178 | const pool = new Piscina({
179 | filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
180 | minThreads: 1,
181 | maxThreads: 1,
182 | maxQueue: 0,
183 | concurrentTasksPerWorker: 2
184 | });
185 |
186 | const buffers = [
187 | new Int32Array(new SharedArrayBuffer(4)),
188 | new Int32Array(new SharedArrayBuffer(4))
189 | ];
190 |
191 | equal(pool.threads.length, 1);
192 | equal(pool.queueSize, 0);
193 |
194 | const firstTask = pool.runTask(buffers[0]);
195 | equal(pool.threads.length, 1);
196 | equal(pool.queueSize, 0);
197 |
198 | const secondTask = pool.runTask(buffers[1]);
199 | equal(pool.threads.length, 1);
200 | equal(pool.queueSize, 0);
201 |
202 | Atomics.store(buffers[0], 0, 1);
203 | Atomics.store(buffers[1], 0, 1);
204 | Atomics.notify(buffers[0], 0, 1);
205 | Atomics.notify(buffers[1], 0, 1);
206 | Atomics.wait(buffers[0], 0, 1);
207 | Atomics.wait(buffers[1], 0, 1);
208 |
209 | await firstTask;
210 | equal(buffers[0][0], -1);
211 | await secondTask;
212 | equal(buffers[1][0], -1);
213 |
214 | equal(pool.threads.length, 1);
215 | equal(pool.queueSize, 0);
216 | });
217 |
218 | test('custom task queue works', async ({ equal, ok }) => {
219 | let sizeCalled : boolean = false;
220 | let shiftCalled : boolean = false;
221 | let pushCalled : boolean = false;
222 |
223 | class CustomTaskPool implements TaskQueue {
224 | tasks: Task[] = [];
225 |
226 | get size () : number {
227 | sizeCalled = true;
228 | return this.tasks.length;
229 | }
230 |
231 | shift () : Task | null {
232 | shiftCalled = true;
233 | return this.tasks.length > 0 ? this.tasks.shift() as Task : null;
234 | }
235 |
236 | push (task : Task) : void {
237 | pushCalled = true;
238 | this.tasks.push(task);
239 |
240 | ok(Piscina.queueOptionsSymbol in task);
241 | if ((task as any).task.a === 3) {
242 | equal(task[Piscina.queueOptionsSymbol], null);
243 | } else {
244 | equal(task[Piscina.queueOptionsSymbol].option,
245 | (task as any).task.a);
246 | }
247 | }
248 |
249 | remove (task : Task) : void {
250 | const index = this.tasks.indexOf(task);
251 | this.tasks.splice(index, 1);
252 | }
253 | };
254 |
255 | const pool = new Piscina({
256 | filename: resolve(__dirname, 'fixtures/eval.js'),
257 | taskQueue: new CustomTaskPool(),
258 | // Setting maxThreads low enough to ensure we queue
259 | maxThreads: 1,
260 | minThreads: 1
261 | });
262 |
263 | function makeTask (task, option) {
264 | return { ...task, [Piscina.queueOptionsSymbol]: { option } };
265 | }
266 |
267 | const ret = await Promise.all([
268 | pool.runTask(makeTask({ a: 1 }, 1)),
269 | pool.runTask(makeTask({ a: 2 }, 2)),
270 | pool.runTask({ a: 3 }) // No queueOptionsSymbol attached
271 | ]);
272 |
273 | equal(ret[0].a, 1);
274 | equal(ret[1].a, 2);
275 | equal(ret[2].a, 3);
276 |
277 | ok(sizeCalled);
278 | ok(pushCalled);
279 | ok(shiftCalled);
280 | });