source: trip-planner-front/node_modules/piscina/src/worker.ts@ 76712b2

Last change on this file since 76712b2 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 6.6 KB
Line 
1import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'worker_threads';
2import { pathToFileURL } from 'url';
3import {
4 commonState,
5 ReadyMessage,
6 RequestMessage,
7 ResponseMessage,
8 StartupMessage,
9 kResponseCountField,
10 kRequestCountField,
11 isMovable,
12 kTransferable,
13 kValue
14} from './common';
15
// Record on the shared `commonState` object that this code runs inside a
// worker thread, and expose the `workerData` the thread was started with.
commonState.isWorkerThread = true;
commonState.workerData = workerData;

// Handlers that were already resolved, keyed by `${filename}/${name}`.
const handlerCache = new Map<string, Function>();
// Whether to block in `Atomics.wait()` between tasks. Overwritten with the
// value carried by the startup message.
let useAtomics = true;
21
// Get `import(x)` as a function that isn't transpiled to `require(x)` by
// TypeScript for dual ESM/CJS support.
// Load this lazily, so that there is no warning about the ESM loader being
// experimental (on Node v12.x) until we actually try to use it.
type ImportESM = (specifier : string) => Promise<any>;

// Note on the annotation: `(s : string) => Promise<any> | undefined` would
// describe a function that may *return* undefined. What is needed here is a
// function that may not have been created yet, hence the named alias.
let importESMCached : ImportESM | undefined;

// Returns the native dynamic `import()` wrapped in a function. The wrapper is
// created through `eval` on the first call and the same function is returned
// on every later call.
function getImportESM () : ImportESM {
  if (importESMCached === undefined) {
    // eslint-disable-next-line no-eval
    importESMCached = eval('(specifier) => import(specifier)') as ImportESM;
  }
  return importESMCached;
}
34
// Resolve the function that runs tasks targeting `filename`.
//
// The module is first loaded through the (transpiled) `import()`. If that
// does not produce a function, it is loaded again through the native ESM
// loader. In both cases the loaded value itself is used when it is a
// function; otherwise its `name` export is awaited and used.
//
// Resolves to the handler, or to `null` when neither attempt yields a
// function. A failure of the native ESM import rejects the returned promise.
async function getHandler (filename : string, name : string) : Promise<Function | null> {
  const cacheKey = `${filename}/${name}`;
  const cached = handlerCache.get(cacheKey);
  if (cached !== undefined) {
    return cached;
  }

  let candidate : unknown;
  try {
    // With our current set of TypeScript options, this is transpiled to
    // `require(filename)`.
    const loaded = await import(filename);
    candidate = typeof loaded === 'function' ? loaded : await loaded[name];
  } catch {
    // Deliberately ignored: the native ESM loader below is the fallback.
  }
  if (typeof candidate !== 'function') {
    const loaded = await getImportESM()(pathToFileURL(filename).href);
    candidate = typeof loaded === 'function' ? loaded : await loaded[name];
  }
  if (typeof candidate !== 'function') {
    return null;
  }

  // Keep the cache bounded by dropping the oldest entry once it has grown
  // past 1000 entries. This only matters in pathological cases.
  if (handlerCache.size > 1000) {
    const [[oldestKey]] = handlerCache;
    handlerCache.delete(oldestKey);
  }

  handlerCache.set(cacheKey, candidate);
  return candidate;
}
71
// Startup handshake with the parent thread. This message is expected only
// once, when the Worker starts. It carries the MessagePort on which tasks
// arrive, the shared Int32Array used for Atomics-based signalling, and the
// default task filename/export name so that the handler can be pre-loaded and
// cached before the first task.
parentPort!.on('message', (message : StartupMessage) => {
  useAtomics = message.useAtomics;
  const { port, sharedBuffer, filename, name, niceIncrement } = message;

  const boot = async () : Promise<void> => {
    if (niceIncrement !== 0 && process.platform === 'linux') {
      try {
        // ts-ignore because the dependency is not installed on Windows.
        // @ts-ignore
        (await import('nice-napi')).default(niceIncrement);
      } catch {
        // Best effort only: failing to load or apply nice-napi is ignored.
      }
    }

    // Warm the handler cache for the default task file, if there is one.
    if (filename !== null) {
      await getHandler(filename, name);
    }

    // Tell the parent we are ready before starting to listen for tasks.
    const readyMessage : ReadyMessage = { ready: true };
    parentPort!.postMessage(readyMessage);

    port.on('message', onMessage.bind(null, port, sharedBuffer));
    atomicsWaitLoop(port, sharedBuffer);
  };

  boot().catch(throwInNextTick);
});
99
// Number of tasks that have been received but not yet answered.
let currentTasks = 0;
// Most recent value of the request counter that this thread has observed.
let lastSeenRequestCount = 0;

// Block on the shared request counter while no task is running, and dispatch
// incoming task messages synchronously. Does nothing when Atomics are
// disabled.
function atomicsWaitLoop (port : MessagePort, sharedBuffer : Int32Array) {
  if (!useAtomics) return;

  // We get here either right after the startup message or right after
  // finishing a task. In both situations the *only* thing we expect next is a
  // 'message' on `port`. Delivering it as an event would cost a C++ → JS
  // boundary crossing, including async tracking. So, while no task is
  // running, we instead sleep in Atomics.wait() until the parent thread
  // signals, and then pull the message off the port directly.
  // The one catch is that asynchronous operations that are still running
  // cannot make progress while we wait. Generally, tasks should not spawn
  // asynchronous operations without waiting for them to finish, though.
  while (currentTasks === 0) {
    // Sleep until the number of requests posted by the parent thread differs
    // from the number we have seen so far.
    Atomics.wait(sharedBuffer, kRequestCountField, lastSeenRequestCount);
    lastSeenRequestCount = Atomics.load(sharedBuffer, kRequestCountField);

    // Messages have to be read *after* lastSeenRequestCount was updated in
    // order to avoid race conditions.
    for (
      let entry = receiveMessageOnPort(port);
      entry !== undefined;
      entry = receiveMessageOnPort(port)
    ) {
      onMessage(port, sharedBuffer, entry.message);
    }
  }
}
131
// Run one task and post its outcome back to the parent thread.
//
// port:         MessagePort the response is posted on.
// sharedBuffer: shared Int32Array whose response counter is incremented once
//               the response has been posted.
// message:      the request; carries the task id, the task payload, and the
//               filename/export name used to look up the handler.
//
// The work happens asynchronously. A failure while resolving or running the
// handler is reported to the parent through the `error` field of the
// response; anything thrown outside of that is rethrown on the next tick.
function onMessage (
  port : MessagePort,
  sharedBuffer : Int32Array,
  message : RequestMessage) {
  currentTasks++;
  const { taskId, task, filename, name } = message;

  (async function () {
    let response : ResponseMessage;
    let transferList : any[] = [];
    try {
      const handler = await getHandler(filename, name);
      if (handler === null) {
        throw new Error(`No handler function exported from ${filename}`);
      }
      let result = await handler(task);
      if (isMovable(result)) {
        // `concat()` returns a new array rather than appending in place, so
        // its result must be kept. Otherwise the transfer list stays empty
        // and the value is cloned instead of transferred.
        transferList = transferList.concat(result[kTransferable]);
        result = result[kValue];
      }
      response = {
        taskId,
        result: result,
        error: null
      };

      // If the task used e.g. console.log(), wait for the stream to drain
      // before potentially entering the `Atomics.wait()` loop, and before
      // returning the result so that messages will always be printed even
      // if the process would otherwise be ready to exit.
      if (process.stdout.writableLength > 0) {
        await new Promise((resolve) => process.stdout.write('', resolve));
      }
      if (process.stderr.writableLength > 0) {
        await new Promise((resolve) => process.stderr.write('', resolve));
      }
    } catch (error) {
      response = {
        taskId,
        result: null,
        // It may be worth taking a look at the error cloning algorithm we
        // use in Node.js core here, it's quite a bit more flexible
        error
      };
    }
    currentTasks--;

    // Post the response to the parent thread, and let it know that we have
    // an additional message available. If possible, use Atomics.wait()
    // to wait for the next message.
    port.postMessage(response, transferList);
    Atomics.add(sharedBuffer, kResponseCountField, 1);
    atomicsWaitLoop(port, sharedBuffer);
  })().catch(throwInNextTick);
}
187
// Rethrow `error` from a `process.nextTick()` callback, i.e. outside of the
// promise chain that caught it.
function throwInNextTick (error : Error) {
  const rethrow = () : never => {
    throw error;
  };
  process.nextTick(rethrow);
}
Note: See TracBrowser for help on using the repository browser.