source: trip-planner-front/node_modules/webpack/lib/util/AsyncQueue.js@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 9.4 KB
Line 
1/*
2 MIT License http://www.opensource.org/licenses/mit-license.php
3 Author Tobias Koppers @sokra
4*/
5
6"use strict";
7
8const { SyncHook, AsyncSeriesHook } = require("tapable");
9const { makeWebpackError } = require("../HookWebpackError");
10const WebpackError = require("../WebpackError");
11const ArrayQueue = require("./ArrayQueue");
12
13const QUEUED_STATE = 0;
14const PROCESSING_STATE = 1;
15const DONE_STATE = 2;
16
17let inHandleResult = 0;
18
19/**
20 * @template T
21 * @callback Callback
22 * @param {WebpackError=} err
23 * @param {T=} result
24 */
25
26/**
27 * @template T
28 * @template K
29 * @template R
30 */
31class AsyncQueueEntry {
32 /**
33 * @param {T} item the item
34 * @param {Callback<R>} callback the callback
35 */
36 constructor(item, callback) {
37 this.item = item;
38 /** @type {typeof QUEUED_STATE | typeof PROCESSING_STATE | typeof DONE_STATE} */
39 this.state = QUEUED_STATE;
40 this.callback = callback;
41 /** @type {Callback<R>[] | undefined} */
42 this.callbacks = undefined;
43 this.result = undefined;
44 /** @type {WebpackError | undefined} */
45 this.error = undefined;
46 }
47}
48
49/**
50 * @template T
51 * @template K
52 * @template R
53 */
54class AsyncQueue {
55 /**
56 * @param {Object} options options object
57 * @param {string=} options.name name of the queue
58 * @param {number=} options.parallelism how many items should be processed at once
59 * @param {AsyncQueue<any, any, any>=} options.parent parent queue, which will have priority over this queue and with shared parallelism
60 * @param {function(T): K=} options.getKey extract key from item
61 * @param {function(T, Callback<R>): void} options.processor async function to process items
62 */
63 constructor({ name, parallelism, parent, processor, getKey }) {
64 this._name = name;
65 this._parallelism = parallelism || 1;
66 this._processor = processor;
67 this._getKey =
68 getKey || /** @type {(T) => K} */ (item => /** @type {any} */ (item));
69 /** @type {Map<K, AsyncQueueEntry<T, K, R>>} */
70 this._entries = new Map();
71 /** @type {ArrayQueue<AsyncQueueEntry<T, K, R>>} */
72 this._queued = new ArrayQueue();
73 /** @type {AsyncQueue<any, any, any>[]} */
74 this._children = undefined;
75 this._activeTasks = 0;
76 this._willEnsureProcessing = false;
77 this._needProcessing = false;
78 this._stopped = false;
79 this._root = parent ? parent._root : this;
80 if (parent) {
81 if (this._root._children === undefined) {
82 this._root._children = [this];
83 } else {
84 this._root._children.push(this);
85 }
86 }
87
88 this.hooks = {
89 /** @type {AsyncSeriesHook<[T]>} */
90 beforeAdd: new AsyncSeriesHook(["item"]),
91 /** @type {SyncHook<[T]>} */
92 added: new SyncHook(["item"]),
93 /** @type {AsyncSeriesHook<[T]>} */
94 beforeStart: new AsyncSeriesHook(["item"]),
95 /** @type {SyncHook<[T]>} */
96 started: new SyncHook(["item"]),
97 /** @type {SyncHook<[T, Error, R]>} */
98 result: new SyncHook(["item", "error", "result"])
99 };
100
101 this._ensureProcessing = this._ensureProcessing.bind(this);
102 }
103
104 /**
105 * @param {T} item an item
106 * @param {Callback<R>} callback callback function
107 * @returns {void}
108 */
109 add(item, callback) {
110 if (this._stopped) return callback(new WebpackError("Queue was stopped"));
111 this.hooks.beforeAdd.callAsync(item, err => {
112 if (err) {
113 callback(
114 makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`)
115 );
116 return;
117 }
118 const key = this._getKey(item);
119 const entry = this._entries.get(key);
120 if (entry !== undefined) {
121 if (entry.state === DONE_STATE) {
122 if (inHandleResult++ > 3) {
123 process.nextTick(() => callback(entry.error, entry.result));
124 } else {
125 callback(entry.error, entry.result);
126 }
127 inHandleResult--;
128 } else if (entry.callbacks === undefined) {
129 entry.callbacks = [callback];
130 } else {
131 entry.callbacks.push(callback);
132 }
133 return;
134 }
135 const newEntry = new AsyncQueueEntry(item, callback);
136 if (this._stopped) {
137 this.hooks.added.call(item);
138 this._root._activeTasks++;
139 process.nextTick(() =>
140 this._handleResult(newEntry, new WebpackError("Queue was stopped"))
141 );
142 } else {
143 this._entries.set(key, newEntry);
144 this._queued.enqueue(newEntry);
145 const root = this._root;
146 root._needProcessing = true;
147 if (root._willEnsureProcessing === false) {
148 root._willEnsureProcessing = true;
149 setImmediate(root._ensureProcessing);
150 }
151 this.hooks.added.call(item);
152 }
153 });
154 }
155
156 /**
157 * @param {T} item an item
158 * @returns {void}
159 */
160 invalidate(item) {
161 const key = this._getKey(item);
162 const entry = this._entries.get(key);
163 this._entries.delete(key);
164 if (entry.state === QUEUED_STATE) {
165 this._queued.delete(entry);
166 }
167 }
168
169 /**
170 * Waits for an already started item
171 * @param {T} item an item
172 * @param {Callback<R>} callback callback function
173 * @returns {void}
174 */
175 waitFor(item, callback) {
176 const key = this._getKey(item);
177 const entry = this._entries.get(key);
178 if (entry === undefined) {
179 return callback(
180 new WebpackError(
181 "waitFor can only be called for an already started item"
182 )
183 );
184 }
185 if (entry.state === DONE_STATE) {
186 process.nextTick(() => callback(entry.error, entry.result));
187 } else if (entry.callbacks === undefined) {
188 entry.callbacks = [callback];
189 } else {
190 entry.callbacks.push(callback);
191 }
192 }
193
194 /**
195 * @returns {void}
196 */
197 stop() {
198 this._stopped = true;
199 const queue = this._queued;
200 this._queued = new ArrayQueue();
201 const root = this._root;
202 for (const entry of queue) {
203 this._entries.delete(this._getKey(entry.item));
204 root._activeTasks++;
205 this._handleResult(entry, new WebpackError("Queue was stopped"));
206 }
207 }
208
209 /**
210 * @returns {void}
211 */
212 increaseParallelism() {
213 const root = this._root;
214 root._parallelism++;
215 /* istanbul ignore next */
216 if (root._willEnsureProcessing === false && root._needProcessing) {
217 root._willEnsureProcessing = true;
218 setImmediate(root._ensureProcessing);
219 }
220 }
221
222 /**
223 * @returns {void}
224 */
225 decreaseParallelism() {
226 const root = this._root;
227 root._parallelism--;
228 }
229
230 /**
231 * @param {T} item an item
232 * @returns {boolean} true, if the item is currently being processed
233 */
234 isProcessing(item) {
235 const key = this._getKey(item);
236 const entry = this._entries.get(key);
237 return entry !== undefined && entry.state === PROCESSING_STATE;
238 }
239
240 /**
241 * @param {T} item an item
242 * @returns {boolean} true, if the item is currently queued
243 */
244 isQueued(item) {
245 const key = this._getKey(item);
246 const entry = this._entries.get(key);
247 return entry !== undefined && entry.state === QUEUED_STATE;
248 }
249
250 /**
251 * @param {T} item an item
252 * @returns {boolean} true, if the item is currently queued
253 */
254 isDone(item) {
255 const key = this._getKey(item);
256 const entry = this._entries.get(key);
257 return entry !== undefined && entry.state === DONE_STATE;
258 }
259
260 /**
261 * @returns {void}
262 */
263 _ensureProcessing() {
264 while (this._activeTasks < this._parallelism) {
265 const entry = this._queued.dequeue();
266 if (entry === undefined) break;
267 this._activeTasks++;
268 entry.state = PROCESSING_STATE;
269 this._startProcessing(entry);
270 }
271 this._willEnsureProcessing = false;
272 if (this._queued.length > 0) return;
273 if (this._children !== undefined) {
274 for (const child of this._children) {
275 while (this._activeTasks < this._parallelism) {
276 const entry = child._queued.dequeue();
277 if (entry === undefined) break;
278 this._activeTasks++;
279 entry.state = PROCESSING_STATE;
280 child._startProcessing(entry);
281 }
282 if (child._queued.length > 0) return;
283 }
284 }
285 if (!this._willEnsureProcessing) this._needProcessing = false;
286 }
287
288 /**
289 * @param {AsyncQueueEntry<T, K, R>} entry the entry
290 * @returns {void}
291 */
292 _startProcessing(entry) {
293 this.hooks.beforeStart.callAsync(entry.item, err => {
294 if (err) {
295 this._handleResult(
296 entry,
297 makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
298 );
299 return;
300 }
301 let inCallback = false;
302 try {
303 this._processor(entry.item, (e, r) => {
304 inCallback = true;
305 this._handleResult(entry, e, r);
306 });
307 } catch (err) {
308 if (inCallback) throw err;
309 this._handleResult(entry, err, null);
310 }
311 this.hooks.started.call(entry.item);
312 });
313 }
314
315 /**
316 * @param {AsyncQueueEntry<T, K, R>} entry the entry
317 * @param {WebpackError=} err error, if any
318 * @param {R=} result result, if any
319 * @returns {void}
320 */
321 _handleResult(entry, err, result) {
322 this.hooks.result.callAsync(entry.item, err, result, hookError => {
323 const error = hookError
324 ? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
325 : err;
326
327 const callback = entry.callback;
328 const callbacks = entry.callbacks;
329 entry.state = DONE_STATE;
330 entry.callback = undefined;
331 entry.callbacks = undefined;
332 entry.result = result;
333 entry.error = error;
334
335 const root = this._root;
336 root._activeTasks--;
337 if (root._willEnsureProcessing === false && root._needProcessing) {
338 root._willEnsureProcessing = true;
339 setImmediate(root._ensureProcessing);
340 }
341
342 if (inHandleResult++ > 3) {
343 process.nextTick(() => {
344 callback(error, result);
345 if (callbacks !== undefined) {
346 for (const callback of callbacks) {
347 callback(error, result);
348 }
349 }
350 });
351 } else {
352 callback(error, result);
353 if (callbacks !== undefined) {
354 for (const callback of callbacks) {
355 callback(error, result);
356 }
357 }
358 }
359 inHandleResult--;
360 });
361 }
362
363 clear() {
364 this._entries.clear();
365 this._queued.clear();
366 this._activeTasks = 0;
367 this._willEnsureProcessing = false;
368 this._needProcessing = false;
369 this._stopped = false;
370 }
371}
372
373module.exports = AsyncQueue;
Note: See TracBrowser for help on using the repository browser.