main
Last change
on this file since 0c6b92a was d565449, checked in by stefan toskovski <stefantoska84@…>, 3 months ago |
Update repo after prototype presentation
|
-
Property mode
set to
100644
|
File size:
1.5 KB
|
Rev | Line | |
---|
[d565449] | 1 | 'use strict';
|
---|
| 2 | const Queue = require('yocto-queue');
|
---|
| 3 |
|
---|
| 4 | const pLimit = concurrency => {
|
---|
| 5 | if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
|
---|
| 6 | throw new TypeError('Expected `concurrency` to be a number from 1 and up');
|
---|
| 7 | }
|
---|
| 8 |
|
---|
| 9 | const queue = new Queue();
|
---|
| 10 | let activeCount = 0;
|
---|
| 11 |
|
---|
| 12 | const next = () => {
|
---|
| 13 | activeCount--;
|
---|
| 14 |
|
---|
| 15 | if (queue.size > 0) {
|
---|
| 16 | queue.dequeue()();
|
---|
| 17 | }
|
---|
| 18 | };
|
---|
| 19 |
|
---|
| 20 | const run = async (fn, resolve, ...args) => {
|
---|
| 21 | activeCount++;
|
---|
| 22 |
|
---|
| 23 | const result = (async () => fn(...args))();
|
---|
| 24 |
|
---|
| 25 | resolve(result);
|
---|
| 26 |
|
---|
| 27 | try {
|
---|
| 28 | await result;
|
---|
| 29 | } catch {}
|
---|
| 30 |
|
---|
| 31 | next();
|
---|
| 32 | };
|
---|
| 33 |
|
---|
| 34 | const enqueue = (fn, resolve, ...args) => {
|
---|
| 35 | queue.enqueue(run.bind(null, fn, resolve, ...args));
|
---|
| 36 |
|
---|
| 37 | (async () => {
|
---|
| 38 | // This function needs to wait until the next microtask before comparing
|
---|
| 39 | // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
|
---|
| 40 | // when the run function is dequeued and called. The comparison in the if-statement
|
---|
| 41 | // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
|
---|
| 42 | await Promise.resolve();
|
---|
| 43 |
|
---|
| 44 | if (activeCount < concurrency && queue.size > 0) {
|
---|
| 45 | queue.dequeue()();
|
---|
| 46 | }
|
---|
| 47 | })();
|
---|
| 48 | };
|
---|
| 49 |
|
---|
| 50 | const generator = (fn, ...args) => new Promise(resolve => {
|
---|
| 51 | enqueue(fn, resolve, ...args);
|
---|
| 52 | });
|
---|
| 53 |
|
---|
| 54 | Object.defineProperties(generator, {
|
---|
| 55 | activeCount: {
|
---|
| 56 | get: () => activeCount
|
---|
| 57 | },
|
---|
| 58 | pendingCount: {
|
---|
| 59 | get: () => queue.size
|
---|
| 60 | },
|
---|
| 61 | clearQueue: {
|
---|
| 62 | value: () => {
|
---|
| 63 | queue.clear();
|
---|
| 64 | }
|
---|
| 65 | }
|
---|
| 66 | });
|
---|
| 67 |
|
---|
| 68 | return generator;
|
---|
| 69 | };
|
---|
| 70 |
|
---|
| 71 | module.exports = pLimit;
|
---|
Note:
See
TracBrowser
for help on using the repository browser.