1 | import Piscina from '..';
|
---|
2 | import { test } from 'tap';
|
---|
3 | import { resolve } from 'path';
|
---|
4 | import { Task, TaskQueue } from '../dist/src/common';
|
---|
5 |
|
---|
/**
 * Submits four tasks to a pool created with minThreads 2 and maxThreads 3 and
 * pins the pool's bookkeeping after every submission: the thread count stays
 * at 2 for the first two tasks, becomes 3 for the third, and the fourth task
 * shows up in `queueSize` while the thread count stays at 3.
 */
test('will put items into a task queue until they can run', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 2,
    maxThreads: 3
  });

  t.equal(pool.threads.length, 2);
  t.equal(pool.queueSize, 0);

  // One 4-byte shared Int32Array per task; each is written and notified below.
  const buffers = Array.from(
    { length: 4 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );

  // Expected pool state right after submitting the buffer at the same index.
  const expectedAfterSubmit = [
    { threads: 2, queued: 0 },
    { threads: 2, queued: 0 },
    { threads: 3, queued: 0 },
    { threads: 3, queued: 1 }
  ];

  const results = expectedAfterSubmit.map(({ threads, queued }, index) => {
    const pending = pool.runTask(buffers[index]);
    t.equal(pool.threads.length, threads);
    t.equal(pool.queueSize, queued);
    return pending;
  });

  // Store 1 into every buffer and wake one waiter on it.
  // NOTE(review): presumably the fixture waits on slot 0 of its buffer - confirm
  // against fixtures/wait-for-notify.ts.
  buffers.forEach((buffer) => {
    Atomics.store(buffer, 0, 1);
    Atomics.notify(buffer, 0, 1);
  });

  // Once the first task has settled the queue is expected to be empty again.
  await results[0];
  t.equal(pool.queueSize, 0);

  await Promise.all(results);
});
|
---|
51 |
|
---|
/**
 * With maxThreads 1 and maxQueue 2, submits four never-ending tasks. The
 * first three are expected to reject with "Terminating worker thread" (the
 * pool is destroyed at the end of the test) while `queueSize` climbs
 * 0 -> 1 -> 2; the fourth is expected to reject with "Task queue is at limit".
 */
test('will reject items over task queue limit', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 2
  });
  const busyLoop = 'while (true) {}';

  // minThreads is 0, so no worker exists before the first submission.
  t.equal(pool.threads.length, 0);
  t.equal(pool.queueSize, 0);

  // Expected queue size right after each of the first three submissions.
  for (const queuedAfterSubmit of [0, 1, 2]) {
    t.rejects(pool.runTask(busyLoop), /Terminating worker thread/);
    t.equal(pool.threads.length, 1);
    t.equal(pool.queueSize, queuedAfterSubmit);
  }

  // The fourth submission exceeds maxQueue.
  t.rejects(pool.runTask(busyLoop), /Task queue is at limit/);
  await pool.destroy();
});
|
---|
78 |
|
---|
/**
 * With maxQueue 0 and a single allowed thread (started lazily, minThreads 0),
 * the first never-ending task takes the worker and the second submission is
 * expected to reject with "No task queue available and all Workers are busy".
 */
test('will reject items when task queue is unavailable', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 0
  });
  const busyLoop = 'while (true) {}';

  t.equal(pool.threads.length, 0);
  t.equal(pool.queueSize, 0);

  // First submission: a worker appears and nothing is queued. The task only
  // settles (by rejecting) when the pool is destroyed below.
  const occupying = pool.runTask(busyLoop);
  t.rejects(occupying, /Terminating worker thread/);
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  // Second submission: rejected because there is no queue to hold it.
  const overflow = pool.runTask(busyLoop);
  t.rejects(overflow, /No task queue available and all Workers are busy/);
  await pool.destroy();
});
|
---|
97 |
|
---|
/**
 * Same scenario as the lazily-started variant, but with minThreads equal to
 * maxThreads (1): the worker already exists before any task is submitted, and
 * the second submission is still expected to reject for lack of a task queue.
 */
test('will reject items when task queue is unavailable (fixed thread count)', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.ts'),
    minThreads: 1,
    maxThreads: 1,
    maxQueue: 0
  });
  const busyLoop = 'while (true) {}';

  // The single worker is present from the start.
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  // First submission: thread count and queue size are unchanged. The task only
  // settles (by rejecting) when the pool is destroyed below.
  const occupying = pool.runTask(busyLoop);
  t.rejects(occupying, /Terminating worker thread/);
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  // Second submission: rejected because maxQueue is 0.
  const overflow = pool.runTask(busyLoop);
  t.rejects(overflow, /No task queue available and all Workers are busy/);
  await pool.destroy();
});
|
---|
116 |
|
---|
/**
 * With concurrentTasksPerWorker 2, maxThreads 1 and maxQueue 0, two tasks are
 * submitted back to back. After each submission the pool is expected to have
 * exactly one thread and an empty queue. Neither buffer is ever notified;
 * both task promises are expected to reject (no message pattern is checked).
 */
test('tasks can share a Worker if requested (both tests blocking)', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 0,
    concurrentTasksPerWorker: 2
  });

  t.equal(pool.threads.length, 0);
  t.equal(pool.queueSize, 0);

  // Two identical rounds: submit a task with a fresh shared buffer, then check
  // that the pool still has a single thread and nothing queued.
  for (let round = 0; round < 2; round++) {
    const neverNotified = new Int32Array(new SharedArrayBuffer(4));
    t.rejects(pool.runTask(neverNotified));
    t.equal(pool.threads.length, 1);
    t.equal(pool.queueSize, 0);
  }

  await pool.destroy();
});
|
---|
139 |
|
---|
/**
 * With concurrentTasksPerWorker 2 on a single-thread, queue-less pool, one
 * task is released through its shared buffer and awaited, while a second
 * task (a very long timer evaluated through fixtures/eval.js) is expected to
 * reject with "Terminating worker thread" when the pool is destroyed.
 */
test('tasks can share a Worker if requested (one test finishes)', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 0,
    concurrentTasksPerWorker: 2
  });

  // Two buffers are allocated, as in the original; only the first is used.
  const buffers = Array.from(
    { length: 2 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );
  const releaseFlag = buffers[0];

  t.equal(pool.threads.length, 0);
  t.equal(pool.queueSize, 0);

  const firstTask = pool.runTask(releaseFlag);
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  // The second argument points this one task at a different fixture file.
  // NOTE(review): presumably a per-task filename override - confirm against
  // the runTask signature.
  const evalFixture = resolve(__dirname, 'fixtures/eval.js');
  const longRunning = pool.runTask(
    'new Promise((resolve) => setTimeout(resolve, 1000000))',
    evalFixture
  );
  t.rejects(longRunning, /Terminating worker thread/);
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  // Release the first task: store 1 in slot 0 and wake one waiter.
  Atomics.store(releaseFlag, 0, 1);
  Atomics.notify(releaseFlag, 0, 1);

  await firstTask;
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  await pool.destroy();
});
|
---|
176 |
|
---|
/**
 * With concurrentTasksPerWorker 2 on a fixed single-thread, queue-less pool,
 * two tasks are submitted, both buffers are set to 1 and notified, and both
 * task promises are awaited. Slot 0 of each buffer is expected to hold -1
 * afterwards, and the pool is expected to still have one thread and an empty
 * queue.
 */
test('tasks can share a Worker if requested (both tests finish)', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 1,
    maxThreads: 1,
    maxQueue: 0,
    concurrentTasksPerWorker: 2
  });

  const buffers = Array.from(
    { length: 2 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );

  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  const firstTask = pool.runTask(buffers[0]);
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  const secondTask = pool.runTask(buffers[1]);
  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);

  // Same order as three straight-line pairs: both stores, then both
  // notifies, then both waits.
  for (const buffer of buffers) {
    Atomics.store(buffer, 0, 1);
  }
  for (const buffer of buffers) {
    Atomics.notify(buffer, 0, 1);
  }
  // Atomics.wait blocks this thread only while slot 0 still holds 1.
  // NOTE(review): presumably the fixture overwrites the slot with -1 and
  // notifies - confirm against fixtures/wait-for-notify.ts.
  for (const buffer of buffers) {
    Atomics.wait(buffer, 0, 1);
  }

  await firstTask;
  t.equal(buffers[0][0], -1);
  await secondTask;
  t.equal(buffers[1][0], -1);

  t.equal(pool.threads.length, 1);
  t.equal(pool.queueSize, 0);
});
|
---|
217 |
|
---|
/**
 * Plugs a user-defined `TaskQueue` into a single-thread pool and checks that:
 *  - the pool calls the queue's `size`, `push` and `shift` members,
 *  - every task handed to `push` carries `Piscina.queueOptionsSymbol`, whose
 *    value is the options object attached by the caller, or `null` when the
 *    caller attached none,
 *  - each task resolves to a value whose `a` matches the submitted one.
 */
test('custom task queue works', async ({ equal, ok }) => {
  let sizeCalled = false;
  let shiftCalled = false;
  let pushCalled = false;

  // Array-backed FIFO queue that records which of its members were used.
  class CustomTaskPool implements TaskQueue {
    tasks: Task[] = [];

    get size () : number {
      sizeCalled = true;
      return this.tasks.length;
    }

    shift () : Task | null {
      shiftCalled = true;
      return this.tasks.length > 0 ? this.tasks.shift() as Task : null;
    }

    push (task : Task) : void {
      pushCalled = true;
      this.tasks.push(task);

      // The symbol must always be present on a queued task.
      ok(Piscina.queueOptionsSymbol in task);
      if ((task as any).task.a === 3) {
        // This task was submitted without queue options (see below).
        equal(task[Piscina.queueOptionsSymbol], null);
      } else {
        // makeTask() stores `option` equal to the task's own `a` value.
        equal(task[Piscina.queueOptionsSymbol].option,
          (task as any).task.a);
      }
    }

    remove (task : Task) : void {
      const index = this.tasks.indexOf(task);
      // indexOf() yields -1 for a task that is not queued, and splice(-1, 1)
      // would then silently drop the LAST queued task. Only remove on a hit.
      if (index !== -1) {
        this.tasks.splice(index, 1);
      }
    }
  }

  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.js'),
    taskQueue: new CustomTaskPool(),
    // Setting maxThreads low enough to ensure we queue
    maxThreads: 1,
    minThreads: 1
  });

  // Copies `task` and attaches `{ option }` under the queue-options symbol.
  function makeTask (task : Record<string, unknown>, option : number) {
    return { ...task, [Piscina.queueOptionsSymbol]: { option } };
  }

  const ret = await Promise.all([
    pool.runTask(makeTask({ a: 1 }, 1)),
    pool.runTask(makeTask({ a: 2 }, 2)),
    pool.runTask({ a: 3 }) // No queueOptionsSymbol attached
  ]);

  equal(ret[0].a, 1);
  equal(ret[1].a, 2);
  equal(ret[2].a, 3);

  ok(sizeCalled);
  ok(pushCalled);
  ok(shiftCalled);
});
|
---|