source: imaps-frontend/node_modules/fastq/queue.js@ 0c6b92a

main
Last change on this file since 0c6b92a was d565449, checked in by stefan toskovski <stefantoska84@…>, 3 months ago

Update repo after prototype presentation

  • Property mode set to 100644
File size: 6.0 KB
Line 
1'use strict'
2
3/* eslint-disable no-var */
4
5var reusify = require('reusify')
6
/**
 * Creates a callback-based work queue that hands at most `_concurrency`
 * tasks to `worker` at the same time and keeps the rest in a linked list.
 *
 * Can be called as `fastqueue(context, worker, concurrency)` or, when the
 * first argument is a function, as `fastqueue(worker, concurrency)`.
 *
 * @param {*} context Value used as `this` when `worker` is invoked.
 * @param {Function} worker Invoked as `worker(value, done)`; `done(err, result)`
 *   must be called when the task has finished.
 * @param {number} _concurrency Maximum number of tasks in flight (>= 1).
 * @returns {object} The queue object (push, unshift, pause, resume, ...).
 * @throws {Error} When the concurrency is not equal to or greater than 1.
 */
function fastqueue (context, worker, _concurrency) {
  // Support the (worker, concurrency) call form by shifting the arguments.
  if (typeof context === 'function') {
    _concurrency = worker
    worker = context
    context = null
  }

  // Written as a negated >= so that NaN and undefined are rejected as well.
  if (!(_concurrency >= 1)) {
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task) // get() hands out a Task holder, release() takes it back
  var queueHead = null // first waiting task, linked through `.next`
  var queueTail = null // last waiting task
  var _running = 0 // number of tasks currently handed to the worker
  var errorHandler = null // handler installed through error()

  var self = {
    push: push,
    // Hook called when the last running task finishes and nothing is waiting.
    drain: noop,
    // Hook called when a task becomes the first one in the waiting list.
    saturated: noop,
    pause: pause,
    paused: false,

    get concurrency () {
      return _concurrency
    },
    set concurrency (value) {
      if (!(value >= 1)) {
        throw new Error('fastqueue concurrency must be equal to or greater than 1')
      }
      _concurrency = value

      if (self.paused) return
      // A higher limit may free slots: start waiting tasks until it is reached.
      for (; queueHead && _running < _concurrency;) {
        _running++
        release()
      }
    },

    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    // Hook called when the last waiting task has been handed to the worker.
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  /** @returns {number} Number of tasks currently handed to the worker. */
  function running () {
    return _running
  }

  /** Stops starting new tasks; tasks already running are not interrupted. */
  function pause () {
    self.paused = true
  }

  /** @returns {number} Number of waiting tasks, counted by walking the list. */
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  /** @returns {Array} The values of the waiting tasks, in processing order. */
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  /** Leaves the paused state and starts as many waiting tasks as allowed. */
  function resume () {
    if (!self.paused) return
    self.paused = false
    if (queueHead === null) {
      // Nothing is waiting: claim a slot and release it right away, which
      // calls drain() when no task is running.
      _running++
      release()
      return
    }
    for (; queueHead && _running < _concurrency;) {
      _running++
      release()
    }
  }

  /** @returns {boolean} True when no task is running and none is waiting. */
  function idle () {
    // The waiting list is empty exactly when it has no head, so there is no
    // need to walk it with length() just to compare the count with zero.
    return _running === 0 && !queueHead
  }

  /**
   * Adds a task at the end of the queue, or runs it immediately when a slot
   * is free and the queue is not paused.
   *
   * @param {*} value Value passed to the worker.
   * @param {Function} [done] Called as `done(err, result)` on completion.
   */
  function push (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    current.errorHandler = errorHandler

    if (_running >= _concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Adds a task at the front of the queue, or runs it immediately when a
   * slot is free and the queue is not paused.
   *
   * @param {*} value Value passed to the worker.
   * @param {Function} [done] Called as `done(err, result)` on completion.
   */
  function unshift (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    current.errorHandler = errorHandler

    if (_running >= _concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Called when a slot becomes available: either a task finished (its holder
   * is passed in and returned to the cache) or a caller claimed a slot with
   * `_running++` beforehand. The slot is handed to the next waiting task or,
   * when there is none, given up.
   *
   * @param {object} [holder] Task holder that has just completed.
   */
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }
    var next = queueHead
    if (next && _running <= _concurrency) {
      if (!self.paused) {
        // Detach the head; the tail is cleared too when it was the only item.
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        // Paused: give the slot up without starting the waiting task.
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  /** Drops every waiting task and resets the drain hook without calling it. */
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  /** Drops every waiting task, calls the drain hook once, then resets it. */
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  /**
   * Installs the error handler attached to tasks added from now on.
   *
   * @param {Function} handler Called as `handler(err, value)` when a task completes.
   */
  function error (handler) {
    errorHandler = handler
  }
}
200
/** Shared do-nothing function used as the default for optional callbacks and hooks. */
function noop () {}
202
/**
 * Reusable holder for one queued unit of work.
 *
 * The queue fills in `value`, `callback`, `context`, `release` and
 * `errorHandler` before the task is used; `next` links waiting tasks.
 * `worked` is created once per holder and is the completion callback that
 * is passed to the worker.
 */
function Task () {
  this.value = null
  this.callback = noop
  this.next = null
  this.release = noop
  this.context = null
  this.errorHandler = null

  var self = this

  /**
   * Completion callback handed to the worker.
   *
   * @param {*} err Error reported by the worker, if any.
   * @param {*} result Result reported by the worker.
   */
  this.worked = function worked (err, result) {
    var callback = self.callback
    var errorHandler = self.errorHandler
    var release = self.release
    var val = self.value

    // Detach everything before calling out. If the worker invokes this
    // callback a second time, the repeat call then finds only no-ops instead
    // of running the error handler and release() again.
    self.value = null
    self.callback = noop
    self.errorHandler = null
    self.release = noop

    // The error handler is called for every completion, with or without an error.
    if (errorHandler) {
      errorHandler(err, val)
    }
    callback.call(self.context, err, result)
    release.call(self, self)
  }
}
226
/**
 * Promise-based variant of fastqueue: `worker` returns a promise, and
 * `push` / `unshift` return a promise for the task's outcome.
 *
 * Can be called as `queueAsPromised(context, worker, concurrency)` or, when
 * the first argument is a function, as `queueAsPromised(worker, concurrency)`.
 *
 * @param {*} context Value used as `this` when `worker` is invoked.
 * @param {Function} worker Invoked as `worker(value)`; must return a thenable.
 * @param {number} _concurrency Maximum number of tasks in flight.
 * @returns {object} The fastqueue object with `push` and `unshift` replaced
 *   by promise-returning versions and a `drained()` method added.
 */
function queueAsPromised (context, worker, _concurrency) {
  // Support the (worker, concurrency) call form by shifting the arguments.
  if (typeof context === 'function') {
    _concurrency = worker
    worker = context
    context = null
  }

  // Adapts the promise-returning worker to the (value, callback) worker
  // signature used by fastqueue.
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, _concurrency)

  // Keep the callback-style methods before they are replaced below.
  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  // Adds `value` through the callback-style `add` method and returns a
  // promise that settles with the task's outcome.
  function enqueue (add, value) {
    var p = new Promise(function (resolve, reject) {
      add(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  /**
   * Adds a task at the end of the queue.
   *
   * @param {*} value Value passed to the worker.
   * @returns {Promise} Settles with the worker's result or error.
   */
  function push (value) {
    return enqueue(pushCb, value)
  }

  /**
   * Adds a task at the front of the queue.
   *
   * @param {*} value Value passed to the worker.
   * @returns {Promise} Settles with the worker's result or error.
   */
  function unshift (value) {
    return enqueue(unshiftCb, value)
  }

  /**
   * @returns {Promise} Resolves once the queue is idle: immediately when it
   *   already is, otherwise the next time the drain hook is called.
   */
  function drained () {
    if (queue.idle()) {
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    return new Promise(function (resolve) {
      queue.drain = function onDrain () {
        // Put the previous handler back before anything else so repeated
        // drained() calls do not pile up wrappers around the drain hook.
        // The identity check leaves a handler installed later untouched.
        if (queue.drain === onDrain) {
          queue.drain = previousDrain
        }
        if (typeof previousDrain === 'function') {
          previousDrain()
        }
        resolve()
      }
    })
  }
}
309
// Public API: the callback-style factory is the module itself and the
// promise-returning variant is exposed as its `promise` property.
module.exports = fastqueue
module.exports.promise = queueAsPromised
Note: See TracBrowser for help on using the repository browser.