source: trip-planner-front/node_modules/piscina/dist/src/worker.js@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 7.8 KB
Line 
1"use strict";
// TypeScript interop helper. Exposes property `key` of `source` on `target`
// under the name `alias` (or `key` when no alias is given). When
// Object.create is available the binding is a live, enumerable getter;
// otherwise the current value is copied once.
var __createBinding = (this && this.__createBinding) || (Object.create
  ? function (target, source, key, alias) {
    const exportedAs = alias === undefined ? key : alias;
    Object.defineProperty(target, exportedAs, {
      enumerable: true,
      get: () => source[key]
    });
  }
  : function (target, source, key, alias) {
    const exportedAs = alias === undefined ? key : alias;
    target[exportedAs] = source[key];
  });
// TypeScript interop helper. Stores `value` as the `default` export of
// `target`: as an enumerable data property defined via
// Object.defineProperty when Object.create is available, or by plain
// assignment otherwise.
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create
  ? function (target, value) {
    const descriptor = { enumerable: true, value: value };
    Object.defineProperty(target, "default", descriptor);
  }
  : function (target, value) {
    target["default"] = value;
  });
// TypeScript interop helper emulating `import * as ns from '...'` for a
// CommonJS module. Objects flagged with `__esModule` are returned untouched;
// anything else is wrapped in a fresh namespace object that re-exports every
// own property except "default" and carries the original value as `default`.
var __importStar = (this && this.__importStar) || function (mod) {
  if (mod && mod.__esModule) {
    return mod;
  }
  const namespace = {};
  if (mod != null) {
    for (const key in mod) {
      if (key === "default" || !Object.prototype.hasOwnProperty.call(mod, key)) {
        continue;
      }
      __createBinding(namespace, mod, key);
    }
  }
  __setModuleDefault(namespace, mod);
  return namespace;
};
21Object.defineProperty(exports, "__esModule", { value: true });
22const worker_threads_1 = require("worker_threads");
23const url_1 = require("url");
24const common_1 = require("./common");
// Record on the shared state object that this module is running inside a
// worker thread, and expose the workerData handed to the Worker.
Object.assign(common_1.commonState, {
  isWorkerThread: true,
  workerData: worker_threads_1.workerData
});
// Resolved task handlers, keyed by `${filename}/${name}`.
const handlerCache = new Map();
// Overwritten by the startup message from the parent thread.
let useAtomics = true;
// Returns a function wrapping the native dynamic `import(x)`. It is built
// through eval so that TypeScript does not transpile it into `require(x)`,
// which keeps dual ESM/CJS support working.
// The function is created on first use only, so that no warning about the
// experimental ESM loader (Node v12.x) is printed until it is really needed.
let importESMCached;
function getImportESM() {
  if (importESMCached !== undefined) {
    return importESMCached;
  }
  // eslint-disable-next-line no-eval
  importESMCached = eval('(specifier) => import(specifier)');
  return importESMCached;
}
// Resolve the function that is invoked for a posted task. The module at
// `filename` is loaded; if the module itself is not a function, its export
// called `name` is used instead. Resolves to null when no function is found.
// Successful lookups are memoised in `handlerCache`.
async function getHandler(filename, name) {
  const cacheKey = `${filename}/${name}`;
  const cached = handlerCache.get(cacheKey);
  if (cached !== undefined) {
    return cached;
  }
  let candidate;
  try {
    // With our current set of TypeScript options, this is transpiled to
    // `require(filename)`.
    candidate = await Promise.resolve().then(() => __importStar(require(filename)));
    if (typeof candidate !== 'function') {
      candidate = await (candidate[name]);
    }
  }
  catch { }
  // Fall back to the native ESM loader when the CommonJS attempt failed or
  // did not produce a function.
  if (typeof candidate !== 'function') {
    const importESM = getImportESM();
    const fileUrl = url_1.pathToFileURL(filename).href;
    candidate = await importESM(fileUrl);
    if (typeof candidate !== 'function') {
      candidate = await (candidate[name]);
    }
  }
  if (typeof candidate !== 'function') {
    return null;
  }
  // Keep the cache bounded by evicting the oldest entry. This should not
  // usually matter and only exists for pathological cases.
  if (handlerCache.size > 1000) {
    const oldestKey = handlerCache.keys().next().value;
    handlerCache.delete(oldestKey);
  }
  handlerCache.set(cacheKey, candidate);
  return candidate;
}
// Startup message from the parent thread; expected exactly once, when the
// Worker starts. It carries the MessagePort on which tasks arrive, a
// SharedArrayBuffer for fast signalling through Atomics, and the default
// task filename/name so that the handler can be pre-loaded and cached.
worker_threads_1.parentPort.on('message', (message) => {
  useAtomics = message.useAtomics;
  const { port, sharedBuffer, filename, name, niceIncrement } = message;
  const startup = async () => {
    try {
      if (niceIncrement !== 0 && process.platform === 'linux') {
        // ts-ignore because the dependency is not installed on Windows.
        // @ts-ignore
        const niceModule = await Promise.resolve().then(() => __importStar(require('nice-napi')));
        niceModule.default(niceIncrement);
      }
    }
    catch { }
    if (filename !== null) {
      await getHandler(filename, name);
    }
    // Tell the parent we are ready, then start accepting tasks.
    worker_threads_1.parentPort.postMessage({ ready: true });
    port.on('message', (taskMessage) => onMessage(port, sharedBuffer, taskMessage));
    atomicsWaitLoop(port, sharedBuffer);
  };
  startup().catch(throwInNextTick);
});
// Number of tasks currently being processed by onMessage().
let currentTasks = 0;
// Last value read from the request counter in the shared buffer.
let lastSeenRequestCount = 0;
// Blocks on the shared request counter while no task is running, then pulls
// pending task messages straight off `port` and hands them to onMessage().
// Does nothing when Atomics-based signalling is turned off.
function atomicsWaitLoop(port, sharedBuffer) {
  if (useAtomics) {
    // We get here either right after the startup message or when a task has
    // finished. In both situations the *only* thing expected next is a
    // 'message' on `port`. Delivering it as an event would cost a C++ → JS
    // boundary crossing, including async tracking. So while no task is
    // running we wait for a signal from the parent thread via Atomics.wait()
    // and read the message from the port directly.
    // The one catch is that asynchronous operations still in flight cannot
    // make progress meanwhile. Generally, tasks should not spawn asynchronous
    // operations without waiting for them to finish, though.
    while (currentTasks === 0) {
      // Sleep until the number of requests posted by the parent thread no
      // longer matches the number of requests we have seen.
      Atomics.wait(sharedBuffer, common_1.kRequestCountField, lastSeenRequestCount);
      lastSeenRequestCount = Atomics.load(sharedBuffer, common_1.kRequestCountField);
      // Messages must be read *after* lastSeenRequestCount was refreshed in
      // order to avoid race conditions.
      for (;;) {
        const received = worker_threads_1.receiveMessageOnPort(port);
        if (received === undefined) {
          break;
        }
        onMessage(port, sharedBuffer, received.message);
      }
    }
  }
}
/**
 * Runs one task received from the parent thread and posts the outcome back.
 *
 * Increments `currentTasks` while the task is in flight. The handler is
 * resolved through getHandler(); its (awaited) return value is sent back as
 * `{ taskId, result, error: null }`, and anything thrown along the way is
 * sent back as `{ taskId, result: null, error }`. After posting, the response
 * counter in `sharedBuffer` is incremented and atomicsWaitLoop() is re-entered.
 *
 * @param port MessagePort used to send the response to the parent thread.
 * @param sharedBuffer Shared typed array holding the request/response counters.
 * @param message Task message with `taskId`, `task`, `filename` and `name`.
 */
function onMessage(port, sharedBuffer, message) {
  currentTasks++;
  const { taskId, task, filename, name } = message;
  (async function () {
    let response;
    let transferList = [];
    try {
      const handler = await getHandler(filename, name);
      if (handler === null) {
        throw new Error(`No handler function exported from ${filename}`);
      }
      let result = await handler(task);
      if (common_1.isMovable(result)) {
        // Array.prototype.concat() returns a new array and leaves the
        // receiver untouched, so the result has to be assigned back.
        // Otherwise the transfer list stays empty and the value is cloned
        // instead of transferred.
        transferList = transferList.concat(result[common_1.kTransferable]);
        result = result[common_1.kValue];
      }
      response = {
        taskId,
        result: result,
        error: null
      };
      // If the task used e.g. console.log(), wait for the stream to drain
      // before potentially entering the `Atomics.wait()` loop, and before
      // returning the result so that messages will always be printed even
      // if the process would otherwise be ready to exit.
      if (process.stdout.writableLength > 0) {
        await new Promise((resolve) => process.stdout.write('', resolve));
      }
      if (process.stderr.writableLength > 0) {
        await new Promise((resolve) => process.stderr.write('', resolve));
      }
    }
    catch (error) {
      response = {
        taskId,
        result: null,
        // It may be worth taking a look at the error cloning algorithm we
        // use in Node.js core here, it's quite a bit more flexible
        error
      };
    }
    currentTasks--;
    // Post the response to the parent thread, and let it know that we have
    // an additional message available. If possible, use Atomics.wait()
    // to wait for the next message.
    port.postMessage(response, transferList);
    Atomics.add(sharedBuffer, common_1.kResponseCountField, 1);
    atomicsWaitLoop(port, sharedBuffer);
  })().catch(throwInNextTick);
}
// Re-throws `error` from a process.nextTick() callback, i.e. outside of the
// promise chain that caught it.
function throwInNextTick(error) {
  const rethrow = () => {
    throw error;
  };
  process.nextTick(rethrow);
}
183//# sourceMappingURL=worker.js.map
Note: See TracBrowser for help on using the repository browser.