source: trip-planner-front/node_modules/piscina/test/task-queue.ts

Last change on this file was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 7.2 KB
RevLine 
[6a3a178]1import Piscina from '..';
2import { test } from 'tap';
3import { resolve } from 'path';
4import { Task, TaskQueue } from '../dist/src/common';
5
// Submits four tasks to a pool configured with 2..3 threads and checks that
// the pool first grows to its thread limit and only then starts queueing.
test('will put items into a task queue until they can run', async ({ equal }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 2,
    maxThreads: 3
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  expectPoolState(2, 0);

  // One shared 32-bit cell per task, used below to signal the workers.
  // NOTE(review): presumably the fixture blocks until its cell is notified;
  // confirm against fixtures/wait-for-notify.ts.
  const buffers = Array.from(
    { length: 4 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );

  // Expected [threads, queueSize] right after submitting task #i.
  const expectedStates = [[2, 0], [2, 0], [3, 0], [3, 1]];

  const pending = [];
  for (let i = 0; i < buffers.length; i++) {
    pending.push(pool.runTask(buffers[i]));
    expectPoolState(expectedStates[i][0], expectedStates[i][1]);
  }

  // Flip every cell to 1 and wake one waiter on it.
  for (const cell of buffers) {
    Atomics.store(cell, 0, 1);
    Atomics.notify(cell, 0, 1);
  }

  // By the time the first task has settled, the queue is expected to be empty.
  await pending[0];
  equal(pool.queueSize, 0);

  await Promise.all(pending);
});
51
// With a single worker and maxQueue = 2, three never-ending tasks are accepted
// (one running, two queued) and the fourth is refused immediately.
test('will reject items over task queue limit', async ({ equal, rejects }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 2
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  expectPoolState(0, 0);

  const busyLoop = 'while (true) {}';
  const terminated = /Terminating worker thread/;

  // These rejections are deliberately NOT awaited here: the tasks never
  // finish on their own and are only expected to reject once the pool is
  // destroyed at the end of the test.
  for (const queued of [0, 1, 2]) {
    rejects(pool.runTask(busyLoop), terminated);
    expectPoolState(1, queued);
  }

  // The queue already holds maxQueue entries, so this submission is refused.
  rejects(pool.runTask(busyLoop), /Task queue is at limit/);
  await pool.destroy();
});
78
// With maxQueue = 0 and a single (lazily started) worker, a second task
// submitted while the first is still running must be refused outright.
test('will reject items when task queue is unavailable', async ({ equal, rejects }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 0
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  const busyLoop = 'while (true) {}';

  expectPoolState(0, 0);

  // Not awaited on purpose: this task only rejects when the pool is destroyed.
  rejects(pool.runTask(busyLoop), /Terminating worker thread/);
  expectPoolState(1, 0);

  rejects(pool.runTask(busyLoop), /No task queue available and all Workers are busy/);
  await pool.destroy();
});
97
// Same scenario as the lazily-started variant, but with minThreads equal to
// maxThreads so the single worker is expected to exist before any task runs.
test('will reject items when task queue is unavailable (fixed thread count)', async ({ equal, rejects }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.ts'),
    minThreads: 1,
    maxThreads: 1,
    maxQueue: 0
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  const busyLoop = 'while (true) {}';

  expectPoolState(1, 0);

  // Not awaited on purpose: this task only rejects when the pool is destroyed.
  rejects(pool.runTask(busyLoop), /Terminating worker thread/);
  expectPoolState(1, 0);

  rejects(pool.runTask(busyLoop), /No task queue available and all Workers are busy/);
  await pool.destroy();
});
116
// With concurrentTasksPerWorker = 2 and no queue, two tasks are accepted by
// the single worker; neither is ever notified, so both are expected to reject
// when the pool is destroyed.
test('tasks can share a Worker if requested (both tests blocking)', async ({ equal, rejects }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 0,
    concurrentTasksPerWorker: 2
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  expectPoolState(0, 0);

  // Submit two tasks, each with its own fresh shared cell. After either
  // submission there should be exactly one thread and nothing queued.
  for (let submitted = 0; submitted < 2; submitted++) {
    rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
    expectPoolState(1, 0);
  }

  await pool.destroy();
});
139
// Two tasks share the single worker: the first is released through its shared
// cell and completes, the second (a very long timer run through the eval
// fixture) is expected to reject when the pool is destroyed.
test('tasks can share a Worker if requested (one test finishes)', async ({ equal, rejects }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 0,
    maxThreads: 1,
    maxQueue: 0,
    concurrentTasksPerWorker: 2
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  const buffers = Array.from(
    { length: 2 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );

  expectPoolState(0, 0);

  const firstTask = pool.runTask(buffers[0]);
  expectPoolState(1, 0);

  // The second argument overrides the pool's default filename for this task.
  const neverSettles = pool.runTask(
    'new Promise((resolve) => setTimeout(resolve, 1000000))',
    resolve(__dirname, 'fixtures/eval.js')
  );
  // Not awaited on purpose: it only rejects once destroy() runs below.
  rejects(neverSettles, /Terminating worker thread/);
  expectPoolState(1, 0);

  // Release the first task by flipping its cell and waking one waiter.
  Atomics.store(buffers[0], 0, 1);
  Atomics.notify(buffers[0], 0, 1);

  await firstTask;
  expectPoolState(1, 0);

  await pool.destroy();
});
176
// Two tasks share the single worker and both are released; afterwards each
// shared cell is expected to hold -1 and the pool to be idle with one thread.
test('tasks can share a Worker if requested (both tests finish)', async ({ equal }) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
    minThreads: 1,
    maxThreads: 1,
    maxQueue: 0,
    concurrentTasksPerWorker: 2
  });

  // Asserts the pool's thread count first, then its queue depth.
  const expectPoolState = (threadCount: number, queued: number) => {
    equal(pool.threads.length, threadCount);
    equal(pool.queueSize, queued);
  };

  const buffers = Array.from(
    { length: 2 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );

  expectPoolState(1, 0);

  const firstTask = pool.runTask(buffers[0]);
  expectPoolState(1, 0);

  const secondTask = pool.runTask(buffers[1]);
  expectPoolState(1, 0);

  // The phases must stay separate and in this order: store on both cells,
  // then notify both, then wait on both.
  for (const cell of buffers) {
    Atomics.store(cell, 0, 1);
  }
  for (const cell of buffers) {
    Atomics.notify(cell, 0, 1);
  }
  // Atomics.wait blocks this thread only while the cell still holds 1.
  for (const cell of buffers) {
    Atomics.wait(cell, 0, 1);
  }

  await firstTask;
  equal(buffers[0][0], -1);
  await secondTask;
  equal(buffers[1][0], -1);

  expectPoolState(1, 0);
});
217
// Plugs a user-supplied TaskQueue implementation into the pool and verifies
// that the pool calls its `size`, `push` and `shift` members, and that each
// queued task exposes the per-task options under Piscina.queueOptionsSymbol.
test('custom task queue works', async ({ equal, ok }) => {
  let sizeCalled = false;
  let shiftCalled = false;
  let pushCalled = false;

  // Minimal FIFO queue that records which members the pool invoked.
  class CustomTaskPool implements TaskQueue {
    tasks: Task[] = [];

    get size () : number {
      sizeCalled = true;
      return this.tasks.length;
    }

    shift () : Task | null {
      shiftCalled = true;
      return this.tasks.length > 0 ? this.tasks.shift() as Task : null;
    }

    push (task : Task) : void {
      pushCalled = true;
      this.tasks.push(task);

      // Every queued task must carry the options slot; it is expected to be
      // null when the caller attached no options (the `{ a: 3 }` task).
      ok(Piscina.queueOptionsSymbol in task);
      if ((task as any).task.a === 3) {
        equal(task[Piscina.queueOptionsSymbol], null);
      } else {
        equal(task[Piscina.queueOptionsSymbol].option,
          (task as any).task.a);
      }
    }

    remove (task : Task) : void {
      const index = this.tasks.indexOf(task);
      // indexOf yields -1 for a task that is not queued, and splice(-1, 1)
      // would then silently drop the LAST queued task. Only remove on a hit.
      if (index !== -1) {
        this.tasks.splice(index, 1);
      }
    }
  }

  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.js'),
    taskQueue: new CustomTaskPool(),
    // Setting maxThreads low enough to ensure we queue
    maxThreads: 1,
    minThreads: 1
  });

  // Returns a copy of `task` with `{ option }` attached under the queue
  // options symbol so the custom queue can read it back in push().
  function makeTask (task : object, option : number) {
    return { ...task, [Piscina.queueOptionsSymbol]: { option } };
  }

  const ret = await Promise.all([
    pool.runTask(makeTask({ a: 1 }, 1)),
    pool.runTask(makeTask({ a: 2 }, 2)),
    pool.runTask({ a: 3 }) // No queueOptionsSymbol attached
  ]);

  equal(ret[0].a, 1);
  equal(ret[1].a, 2);
  equal(ret[2].a, 3);

  ok(sizeCalled);
  ok(pushCalled);
  ok(shiftCalled);
});
Note: See TracBrowser for help on using the repository browser.