source: imaps-frontend/node_modules/webpack/lib/util/AsyncQueue.js@ 79a0317

main
Last change on this file since 79a0317 was 79a0317, checked in by stefan toskovski <stefantoska84@…>, 3 days ago

F4 Finalna Verzija

  • Property mode set to 100644
File size: 10.3 KB
Line 
1/*
2 MIT License http://www.opensource.org/licenses/mit-license.php
3 Author Tobias Koppers @sokra
4*/
5
6"use strict";
7
8const { SyncHook, AsyncSeriesHook } = require("tapable");
9const { makeWebpackError } = require("../HookWebpackError");
10const WebpackError = require("../WebpackError");
11const ArrayQueue = require("./ArrayQueue");
12
13const QUEUED_STATE = 0;
14const PROCESSING_STATE = 1;
15const DONE_STATE = 2;
16
17let inHandleResult = 0;
18
19/**
20 * @template T
21 * @callback Callback
22 * @param {(WebpackError | null)=} err
23 * @param {(T | null)=} result
24 */
25
26/**
27 * @template T
28 * @template K
29 * @template R
30 */
31class AsyncQueueEntry {
32 /**
33 * @param {T} item the item
34 * @param {Callback<R>} callback the callback
35 */
36 constructor(item, callback) {
37 this.item = item;
38 /** @type {typeof QUEUED_STATE | typeof PROCESSING_STATE | typeof DONE_STATE} */
39 this.state = QUEUED_STATE;
40 /** @type {Callback<R> | undefined} */
41 this.callback = callback;
42 /** @type {Callback<R>[] | undefined} */
43 this.callbacks = undefined;
44 /** @type {R | null | undefined} */
45 this.result = undefined;
46 /** @type {WebpackError | null | undefined} */
47 this.error = undefined;
48 }
49}
50
51/**
52 * @template T, K
53 * @typedef {function(T): K} getKey
54 */
55
56/**
57 * @template T, R
58 * @typedef {function(T, Callback<R>): void} Processor
59 */
60
61/**
62 * @template T
63 * @template K
64 * @template R
65 */
66class AsyncQueue {
67 /**
68 * @param {object} options options object
69 * @param {string=} options.name name of the queue
70 * @param {number=} options.parallelism how many items should be processed at once
71 * @param {string=} options.context context of execution
72 * @param {AsyncQueue<any, any, any>=} options.parent parent queue, which will have priority over this queue and with shared parallelism
73 * @param {getKey<T, K>=} options.getKey extract key from item
74 * @param {Processor<T, R>} options.processor async function to process items
75 */
76 constructor({ name, context, parallelism, parent, processor, getKey }) {
77 this._name = name;
78 this._context = context || "normal";
79 this._parallelism = parallelism || 1;
80 this._processor = processor;
81 this._getKey =
82 getKey ||
83 /** @type {getKey<T, K>} */ (item => /** @type {T & K} */ (item));
84 /** @type {Map<K, AsyncQueueEntry<T, K, R>>} */
85 this._entries = new Map();
86 /** @type {ArrayQueue<AsyncQueueEntry<T, K, R>>} */
87 this._queued = new ArrayQueue();
88 /** @type {AsyncQueue<any, any, any>[] | undefined} */
89 this._children = undefined;
90 this._activeTasks = 0;
91 this._willEnsureProcessing = false;
92 this._needProcessing = false;
93 this._stopped = false;
94 /** @type {AsyncQueue<any, any, any>} */
95 this._root = parent ? parent._root : this;
96 if (parent) {
97 if (this._root._children === undefined) {
98 this._root._children = [this];
99 } else {
100 this._root._children.push(this);
101 }
102 }
103
104 this.hooks = {
105 /** @type {AsyncSeriesHook<[T]>} */
106 beforeAdd: new AsyncSeriesHook(["item"]),
107 /** @type {SyncHook<[T]>} */
108 added: new SyncHook(["item"]),
109 /** @type {AsyncSeriesHook<[T]>} */
110 beforeStart: new AsyncSeriesHook(["item"]),
111 /** @type {SyncHook<[T]>} */
112 started: new SyncHook(["item"]),
113 /** @type {SyncHook<[T, WebpackError | null | undefined, R | null | undefined]>} */
114 result: new SyncHook(["item", "error", "result"])
115 };
116
117 this._ensureProcessing = this._ensureProcessing.bind(this);
118 }
119
120 /**
121 * @returns {string} context of execution
122 */
123 getContext() {
124 return this._context;
125 }
126
127 /**
128 * @param {string} value context of execution
129 */
130 setContext(value) {
131 this._context = value;
132 }
133
134 /**
135 * @param {T} item an item
136 * @param {Callback<R>} callback callback function
137 * @returns {void}
138 */
139 add(item, callback) {
140 if (this._stopped) return callback(new WebpackError("Queue was stopped"));
141 this.hooks.beforeAdd.callAsync(item, err => {
142 if (err) {
143 callback(
144 makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`)
145 );
146 return;
147 }
148 const key = this._getKey(item);
149 const entry = this._entries.get(key);
150 if (entry !== undefined) {
151 if (entry.state === DONE_STATE) {
152 if (inHandleResult++ > 3) {
153 process.nextTick(() => callback(entry.error, entry.result));
154 } else {
155 callback(entry.error, entry.result);
156 }
157 inHandleResult--;
158 } else if (entry.callbacks === undefined) {
159 entry.callbacks = [callback];
160 } else {
161 entry.callbacks.push(callback);
162 }
163 return;
164 }
165 const newEntry = new AsyncQueueEntry(item, callback);
166 if (this._stopped) {
167 this.hooks.added.call(item);
168 this._root._activeTasks++;
169 process.nextTick(() =>
170 this._handleResult(newEntry, new WebpackError("Queue was stopped"))
171 );
172 } else {
173 this._entries.set(key, newEntry);
174 this._queued.enqueue(newEntry);
175 const root = this._root;
176 root._needProcessing = true;
177 if (root._willEnsureProcessing === false) {
178 root._willEnsureProcessing = true;
179 setImmediate(root._ensureProcessing);
180 }
181 this.hooks.added.call(item);
182 }
183 });
184 }
185
186 /**
187 * @param {T} item an item
188 * @returns {void}
189 */
190 invalidate(item) {
191 const key = this._getKey(item);
192 const entry =
193 /** @type {AsyncQueueEntry<T, K, R>} */
194 (this._entries.get(key));
195 this._entries.delete(key);
196 if (entry.state === QUEUED_STATE) {
197 this._queued.delete(entry);
198 }
199 }
200
201 /**
202 * Waits for an already started item
203 * @param {T} item an item
204 * @param {Callback<R>} callback callback function
205 * @returns {void}
206 */
207 waitFor(item, callback) {
208 const key = this._getKey(item);
209 const entry = this._entries.get(key);
210 if (entry === undefined) {
211 return callback(
212 new WebpackError(
213 "waitFor can only be called for an already started item"
214 )
215 );
216 }
217 if (entry.state === DONE_STATE) {
218 process.nextTick(() => callback(entry.error, entry.result));
219 } else if (entry.callbacks === undefined) {
220 entry.callbacks = [callback];
221 } else {
222 entry.callbacks.push(callback);
223 }
224 }
225
226 /**
227 * @returns {void}
228 */
229 stop() {
230 this._stopped = true;
231 const queue = this._queued;
232 this._queued = new ArrayQueue();
233 const root = this._root;
234 for (const entry of queue) {
235 this._entries.delete(
236 this._getKey(/** @type {AsyncQueueEntry<T, K, R>} */ (entry).item)
237 );
238 root._activeTasks++;
239 this._handleResult(
240 /** @type {AsyncQueueEntry<T, K, R>} */ (entry),
241 new WebpackError("Queue was stopped")
242 );
243 }
244 }
245
246 /**
247 * @returns {void}
248 */
249 increaseParallelism() {
250 const root = this._root;
251 root._parallelism++;
252 /* istanbul ignore next */
253 if (root._willEnsureProcessing === false && root._needProcessing) {
254 root._willEnsureProcessing = true;
255 setImmediate(root._ensureProcessing);
256 }
257 }
258
259 /**
260 * @returns {void}
261 */
262 decreaseParallelism() {
263 const root = this._root;
264 root._parallelism--;
265 }
266
267 /**
268 * @param {T} item an item
269 * @returns {boolean} true, if the item is currently being processed
270 */
271 isProcessing(item) {
272 const key = this._getKey(item);
273 const entry = this._entries.get(key);
274 return entry !== undefined && entry.state === PROCESSING_STATE;
275 }
276
277 /**
278 * @param {T} item an item
279 * @returns {boolean} true, if the item is currently queued
280 */
281 isQueued(item) {
282 const key = this._getKey(item);
283 const entry = this._entries.get(key);
284 return entry !== undefined && entry.state === QUEUED_STATE;
285 }
286
287 /**
288 * @param {T} item an item
289 * @returns {boolean} true, if the item is currently queued
290 */
291 isDone(item) {
292 const key = this._getKey(item);
293 const entry = this._entries.get(key);
294 return entry !== undefined && entry.state === DONE_STATE;
295 }
296
297 /**
298 * @returns {void}
299 */
300 _ensureProcessing() {
301 while (this._activeTasks < this._parallelism) {
302 const entry = this._queued.dequeue();
303 if (entry === undefined) break;
304 this._activeTasks++;
305 entry.state = PROCESSING_STATE;
306 this._startProcessing(entry);
307 }
308 this._willEnsureProcessing = false;
309 if (this._queued.length > 0) return;
310 if (this._children !== undefined) {
311 for (const child of this._children) {
312 while (this._activeTasks < this._parallelism) {
313 const entry = child._queued.dequeue();
314 if (entry === undefined) break;
315 this._activeTasks++;
316 entry.state = PROCESSING_STATE;
317 child._startProcessing(entry);
318 }
319 if (child._queued.length > 0) return;
320 }
321 }
322 if (!this._willEnsureProcessing) this._needProcessing = false;
323 }
324
325 /**
326 * @param {AsyncQueueEntry<T, K, R>} entry the entry
327 * @returns {void}
328 */
329 _startProcessing(entry) {
330 this.hooks.beforeStart.callAsync(entry.item, err => {
331 if (err) {
332 this._handleResult(
333 entry,
334 makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
335 );
336 return;
337 }
338 let inCallback = false;
339 try {
340 this._processor(entry.item, (e, r) => {
341 inCallback = true;
342 this._handleResult(entry, e, r);
343 });
344 } catch (err) {
345 if (inCallback) throw err;
346 this._handleResult(entry, /** @type {WebpackError} */ (err), null);
347 }
348 this.hooks.started.call(entry.item);
349 });
350 }
351
352 /**
353 * @param {AsyncQueueEntry<T, K, R>} entry the entry
354 * @param {(WebpackError | null)=} err error, if any
355 * @param {(R | null)=} result result, if any
356 * @returns {void}
357 */
358 _handleResult(entry, err, result) {
359 this.hooks.result.callAsync(entry.item, err, result, hookError => {
360 const error = hookError
361 ? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
362 : err;
363
364 const callback = /** @type {Callback<R>} */ (entry.callback);
365 const callbacks = entry.callbacks;
366 entry.state = DONE_STATE;
367 entry.callback = undefined;
368 entry.callbacks = undefined;
369 entry.result = result;
370 entry.error = error;
371
372 const root = this._root;
373 root._activeTasks--;
374 if (root._willEnsureProcessing === false && root._needProcessing) {
375 root._willEnsureProcessing = true;
376 setImmediate(root._ensureProcessing);
377 }
378
379 if (inHandleResult++ > 3) {
380 process.nextTick(() => {
381 callback(error, result);
382 if (callbacks !== undefined) {
383 for (const callback of callbacks) {
384 callback(error, result);
385 }
386 }
387 });
388 } else {
389 callback(error, result);
390 if (callbacks !== undefined) {
391 for (const callback of callbacks) {
392 callback(error, result);
393 }
394 }
395 }
396 inHandleResult--;
397 });
398 }
399
400 clear() {
401 this._entries.clear();
402 this._queued.clear();
403 this._activeTasks = 0;
404 this._willEnsureProcessing = false;
405 this._needProcessing = false;
406 this._stopped = false;
407 }
408}
409
410module.exports = AsyncQueue;
Note: See TracBrowser for help on using the repository browser.