source: trip-planner-front/node_modules/piscina/test/abort-task.ts@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.5 KB
Line 
1import { AbortController } from 'abort-controller';
2import { EventEmitter } from 'events';
3import Piscina from '..';
4import { test } from 'tap';
5import { resolve } from 'path';
6
// A task that is already executing must reject with the abort error once the
// AbortController tied to it is aborted.
test('tasks can be aborted through AbortController while running', async (t) => {
  const workerFile = resolve(__dirname, 'fixtures/notify-then-sleep.ts');
  const pool = new Piscina({ filename: workerFile });

  const flag = new Int32Array(new SharedArrayBuffer(4));
  const controller = new AbortController();
  // Deliberately not awaited: the rejection can only happen after abort()
  // is called further down.
  const pending = pool.runTask(flag, controller.signal);
  t.rejects(pending, /The task has been aborted/);

  // Block this thread while slot 0 still holds 0; presumably the fixture
  // writes 1 and notifies once the task has started.
  Atomics.wait(flag, 0, 0);
  t.equal(Atomics.load(flag, 0), 1);

  controller.abort();
});
22
// Same scenario as the AbortController variant, but the abort signal is a
// plain EventEmitter, exercised through both runTask() and run().
test('tasks can be aborted through EventEmitter while running', async (t) => {
  const workerFile = resolve(__dirname, 'fixtures/notify-then-sleep.ts');
  const pool = new Piscina({ filename: workerFile });

  const flag = new Int32Array(new SharedArrayBuffer(4));
  const abortEmitter = new EventEmitter();

  // Both submissions share the same buffer and the same emitter; neither
  // is awaited here because they only reject after 'abort' is emitted.
  const viaRunTask = pool.runTask(flag, abortEmitter);
  t.rejects(viaRunTask, /The task has been aborted/);
  const viaRun = pool.run(flag, { signal: abortEmitter });
  t.rejects(viaRun, /The task has been aborted/);

  // Block while slot 0 is still 0, then confirm it was set to 1.
  Atomics.wait(flag, 0, 0);
  t.equal(Atomics.load(flag, 0), 1);

  abortEmitter.emit('abort');
});
38
// With a single thread occupied by a blocking task, two abortable tasks are
// submitted and then aborted before the first task is released.
test('tasks can be aborted through EventEmitter before running', async (t) => {
  const pool = new Piscina({
    maxThreads: 1,
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts')
  });

  const blockerBuf = new Int32Array(new SharedArrayBuffer(4));
  const queuedBuf = new Int32Array(new SharedArrayBuffer(4));

  const blocker = pool.runTask(blockerBuf);
  const abortEmitter = new EventEmitter();
  t.rejects(pool.runTask(queuedBuf, abortEmitter), /The task has been aborted/);
  t.rejects(pool.run(queuedBuf, { signal: abortEmitter }), /The task has been aborted/);
  // Both abortable submissions are expected to be sitting in the queue.
  t.equal(pool.queueSize, 2);

  abortEmitter.emit('abort');

  // Release the worker that is busy with the first task, then wait for it.
  Atomics.store(blockerBuf, 0, 1);
  Atomics.notify(blockerBuf, 0, 1);
  await blocker;
});
62
// Even though the single worker accepts two concurrent tasks, an abortable
// task submitted after a regular one is expected to be queued instead.
test('abortable tasks will not share workers (abortable posted second)', async (t) => {
  const pool = new Piscina({
    concurrentTasksPerWorker: 2,
    maxThreads: 1,
    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts')
  });

  const blockerBuf = new Int32Array(new SharedArrayBuffer(4));
  const abortableBuf = new Int32Array(new SharedArrayBuffer(4));

  const blocker = pool.runTask(blockerBuf);
  const abortEmitter = new EventEmitter();
  const abortable = pool.runTask(abortableBuf, abortEmitter);
  t.rejects(abortable, /The task has been aborted/);
  // The abortable task must be waiting in the queue.
  t.equal(pool.queueSize, 1);

  abortEmitter.emit('abort');

  // Release the worker that is busy with the first task, then wait for it.
  Atomics.store(blockerBuf, 0, 1);
  Atomics.notify(blockerBuf, 0, 1);
  await blocker;
});
86
// When the abortable task is submitted first, a following regular task is
// expected to be queued rather than placed on the same worker.
test('abortable tasks will not share workers (abortable posted first)', async (t) => {
  const pool = new Piscina({
    concurrentTasksPerWorker: 2,
    maxThreads: 1,
    filename: resolve(__dirname, 'fixtures/eval.js')
  });

  const abortEmitter = new EventEmitter();
  // The first task never finishes on its own; it can only end via abort.
  const spinning = pool.runTask('while(true);', abortEmitter);
  t.rejects(spinning, /The task has been aborted/);
  const followUp = pool.runTask('42');
  t.equal(pool.queueSize, 1);

  abortEmitter.emit('abort');

  // After the abort, the second task must still complete with its result.
  const followUpResult = await followUp;
  t.equal(followUpResult, 42);
});
104
// Scenario: task 1 sleeps 100 ms and task 2 sleeps 300 ms. The abortable
// task 3 is still queued when task 1 finishes, yet it must not be picked up
// until task 2 has finished as well, because it is abortable.
test('abortable tasks will not share workers (on worker available)', async (t) => {
  const pool = new Piscina({
    concurrentTasksPerWorker: 2,
    maxThreads: 1,
    filename: resolve(__dirname, 'fixtures/sleep.js')
  });

  const [first, second, third] = await Promise.all([
    pool.runTask({ time: 100, a: 1 }),
    pool.runTask({ time: 300, a: 2 }),
    pool.runTask({ time: 100, a: 3 }, new EventEmitter())
  ]);

  // NOTE(review): the expected values 0, 1, 2 presumably reflect the order
  // in which the fixture handled the tasks - confirm against sleep.js.
  t.equal(first, 0);
  t.equal(second, 1);
  t.equal(third, 2);
});
128
// The pool is destroyed as soon as task 1 resolves. Task 2 (time: 300) and
// the abortable task 3 are both expected to reject with the
// worker-termination error instead of completing.
test('abortable tasks will not share workers (destroy workers)', async (t) => {
  const pool = new Piscina({
    concurrentTasksPerWorker: 2,
    maxThreads: 1,
    filename: resolve(__dirname, 'fixtures/sleep.js')
  });

  const shortTask = pool.runTask({ time: 100, a: 1 });
  // Tear the pool down once the short task is done; the result of
  // destroy() is intentionally not awaited or returned.
  shortTask.then(() => {
    pool.destroy();
  });

  const longTask = pool.runTask({ time: 300, a: 2 });
  t.rejects(longTask, /Terminating worker thread/);

  const abortableTask = pool.runTask({ time: 100, a: 3 }, new EventEmitter());
  t.rejects(abortableTask, /Terminating worker thread/);
});
150
// Submitting a task with a signal that is already aborted must reject, and
// the payload passed alongside it must keep its original length.
test('aborted AbortSignal rejects task immediately', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/move.ts')
  });

  // Trip the signal before any task is submitted.
  const controller = new AbortController();
  controller.abort();
  t.equal(controller.signal.aborted, true);

  // The buffer is listed for transfer, but the task is expected to abort
  // before the data is moved.
  const payload = new Uint8Array(new SharedArrayBuffer(4));
  const transferList = [payload.buffer];
  const pending = pool.runTask(payload, transferList, controller.signal);
  t.rejects(pending, /The task has been aborted/);

  t.equal(payload.length, 4);
});
168
// After a task with an abort signal completes normally, no 'abort'
// listeners may remain on the signal object.
test('task with AbortSignal cleans up properly', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.js')
  });

  const abortEmitter = new EventEmitter();
  await pool.runTask('1+1', abortEmitter);

  // getEventListeners is looked up dynamically and only used when present,
  // so the assertion is skipped on runtimes that do not provide it.
  const listenerLookup = (EventEmitter as any).getEventListeners;
  if (typeof listenerLookup === 'function') {
    const remaining = listenerLookup(abortEmitter, 'abort');
    t.equal(remaining.length, 0);
  }

  // Run once more with an AbortController signal; this only checks that
  // the task completes without throwing.
  const controller = new AbortController();
  await pool.runTask('1+1', controller.signal);
});
Note: See TracBrowser for help on using the repository browser.