source: trip-planner-front/node_modules/fastq/queue.js@ e29cc2e

Last change on this file since e29cc2e was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.4 KB
Line 
'use strict'

/* eslint-disable no-var */

// Third-party pool factory; fastqueue wraps Task with it and then uses the
// returned object's get()/release() to obtain and hand back task holders.
var reusify = require('reusify')
6
/**
 * Create a callback-based, in-memory work queue.
 *
 * Accepts either `fastqueue(context, worker, concurrency)` or, when no
 * `this` binding is needed, `fastqueue(worker, concurrency)`.
 *
 * @param {*} context - value used as `this` when calling `worker`; it is
 *   also stored on every task.
 * @param {Function} worker - invoked as `worker(value, cb)`; it is expected
 *   to call `cb(err, result)` once the task is finished.
 * @param {number} concurrency - maximum number of tasks handed to `worker`
 *   at the same time.
 * @returns {Object} the queue. Methods: `push`, `unshift`, `pause`,
 *   `resume`, `running`, `idle`, `length`, `getQueue`, `kill`,
 *   `killAndDrain`, `error`. Assignable hooks (default no-op): `drain`,
 *   `saturated`, `empty`. Plain properties: `paused`, `concurrency`.
 * @throws {Error} when `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  // Two-argument form: shift the arguments and run without a context.
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  // Waiting tasks form a singly linked list (Task#next).
  var queueHead = null
  var queueTail = null
  // Number of tasks currently handed to the worker.
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  /** @returns {number} how many tasks are currently being worked on. */
  function running () {
    return _running
  }

  /** Stop starting new tasks; tasks already running are left alone. */
  function pause () {
    self.paused = true
  }

  /** @returns {number} how many tasks are waiting (walks the list). */
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  /** @returns {Array} the values of the waiting tasks, head first. */
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  /**
   * Leave the paused state and start waiting tasks, but only into the
   * slots that are actually free: tasks that kept running while the queue
   * was paused still count against `concurrency`.
   */
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting. Borrow a slot and give it straight back so
      // that release() fires `drain` (once) if nothing is running either.
      _running++
      release()
      return
    }

    while (queueHead && _running < self.concurrency) {
      _running++
      release()
    }
  }

  /** @returns {boolean} true when nothing is running and nothing waits. */
  function idle () {
    return _running === 0 && self.length() === 0
  }

  /** Take a task holder from the pool and fill in the per-task fields. */
  function createTask (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    // Always refresh the handler: holders come from a pool and may still
    // carry the handler of a previous use.
    current.errorHandler = errorHandler

    return current
  }

  /**
   * Add a task at the end of the queue, or run it right away when a slot
   * is free and the queue is not paused.
   *
   * @param {*} value - passed to the worker.
   * @param {Function} [done] - called with `(err, result)` on completion.
   */
  function push (value, done) {
    var current = createTask(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        // First task that has to wait.
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Same as `push`, but a task that has to wait is placed at the front of
   * the queue.
   *
   * @param {*} value - passed to the worker.
   * @param {Function} [done] - called with `(err, result)` on completion.
   */
  function unshift (value, done) {
    var current = createTask(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Called when a slot becomes available (normally by Task#worked).
   * Returns `holder` to the pool, then either starts the next waiting task
   * in the same slot or gives the slot up.
   *
   * @param {Object} [holder] - the finished task, if any.
   */
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }
    var next = queueHead
    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        // The slot is reused, so _running is left unchanged.
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  /** Drop every waiting task and reset the `drain` hook to a no-op. */
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  /** Like `kill`, but calls the current `drain` hook once before reset. */
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  /**
   * Register a handler that is attached to every task created afterwards.
   *
   * @param {Function} handler - receives `(err, value)` of a finished task.
   */
  function error (handler) {
    errorHandler = handler
  }
}
178
/** Shared do-nothing function, used as the default for callbacks and hooks. */
function noop () {}
180
/**
 * Holder for one queued unit of work.
 *
 * Fields are filled in by whoever enqueues the task:
 *  - value:        payload handed to the worker
 *  - callback:     completion callback, invoked as (err, result)
 *  - next:         link to the following holder in the queue
 *  - release:      function that receives this holder once it is done
 *  - context:      `this` value for `callback`
 *  - errorHandler: optional function invoked as (err, value)
 */
function Task () {
  this.value = null
  this.callback = noop
  this.next = null
  this.release = noop
  this.context = null
  this.errorHandler = null

  var self = this

  /**
   * Completion function handed to the worker.
   *
   * @param {*} err - error reported by the worker, if any.
   * @param {*} result - result reported by the worker.
   */
  this.worked = function worked (err, result) {
    // Snapshot and clear the per-task fields before calling out, so the
    // holder no longer references the value or callback afterwards.
    var callback = self.callback
    var errorHandler = self.errorHandler
    var val = self.value
    self.value = null
    self.callback = noop
    // When set, the handler runs for every completion (err may be empty).
    if (errorHandler) {
      errorHandler(err, val)
    }
    callback.call(self.context, err, result)
    self.release(self)
  }
}
204
/**
 * Promise-flavoured variant of `fastqueue`.
 *
 * Accepts either `queueAsPromised(context, worker, concurrency)` or
 * `queueAsPromised(worker, concurrency)`.
 *
 * @param {*} context - `this` value for `worker`.
 * @param {Function} worker - called as `worker(value)`; must return a
 *   thenable whose outcome settles the task.
 * @param {number} concurrency - forwarded to `fastqueue`.
 * @returns {Object} the queue created by `fastqueue`, with `push` and
 *   `unshift` replaced by promise-returning versions and an additional
 *   `drained()` method.
 */
function queueAsPromised (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // Adapts the promise-returning worker to the (value, cb) shape that
  // fastqueue expects.
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  // Keep the callback-style originals before overriding them.
  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  /**
   * Enqueue `value` through the given callback-style method and expose
   * the outcome as a promise.
   */
  function enqueue (addCb, value) {
    var p = new Promise(function (resolve, reject) {
      addCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  /**
   * @param {*} value - task payload.
   * @returns {Promise} settles with the worker's outcome for `value`.
   */
  function push (value) {
    return enqueue(pushCb, value)
  }

  /**
   * Like `push`, but a task that has to wait goes to the front.
   *
   * @param {*} value - task payload.
   * @returns {Promise} settles with the worker's outcome for `value`.
   */
  function unshift (value) {
    return enqueue(unshiftCb, value)
  }

  /**
   * @returns {Promise} resolves the next time the queue drains, or right
   *   away when the queue is already idle (otherwise `drain` would never
   *   fire and the promise would stay pending forever).
   */
  function drained () {
    if (queue.idle()) {
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    return new Promise(function (resolve) {
      queue.drain = function onDrain () {
        // One-shot: put the previous hook back so repeated drained()
        // calls do not pile wrappers on top of each other. Skip this if
        // someone replaced `drain` in the meantime.
        if (queue.drain === onDrain) {
          queue.drain = previousDrain
        }
        previousDrain()
        resolve()
      }
    })
  }
}
281
// Public API: the callback-style factory is the module itself and the
// promise-returning factory is exposed as its `promise` property.
module.exports = fastqueue
module.exports.promise = queueAsPromised
Note: See TracBrowser for help on using the repository browser.