/*
 * source: trip-planner-front/node_modules/minipass/index.js@ ceaed42
 *
 * Last change on this file since ceaed42 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago
 *
 * initial commit
 *
 *   • Property mode set to 100644
 * File size: 14.3 KB
 */
'use strict'
// Use the real `process` object when one exists; otherwise fall back to a
// stub exposing only the stdout/stderr slots this module compares against.
const proc = typeof process === 'object' && process ? process : {
  stdout: null,
  stderr: null,
}
const EE = require('events')
const Stream = require('stream')
const Yallist = require('yallist')
const SD = require('string_decoder').StringDecoder
10
// Symbols used as property and method keys on the stream so that internal
// state cannot collide with (or be overwritten through) string-named members.
const EOF = Symbol('EOF')
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
const EMITTED_END = Symbol('emittedEnd')
const EMITTING_END = Symbol('emittingEnd')
const EMITTED_ERROR = Symbol('emittedError')
const CLOSED = Symbol('closed')
const READ = Symbol('read')
const FLUSH = Symbol('flush')
const FLUSHCHUNK = Symbol('flushChunk')
const ENCODING = Symbol('encoding')
const DECODER = Symbol('decoder')
const FLOWING = Symbol('flowing')
const PAUSED = Symbol('paused')
const RESUME = Symbol('resume')
const BUFFERLENGTH = Symbol('bufferLength')
const BUFFERPUSH = Symbol('bufferPush')
const BUFFERSHIFT = Symbol('bufferShift')
const OBJECTMODE = Symbol('objectMode')
const DESTROYED = Symbol('destroyed')
30
// TODO remove when Node v8 support drops
// Setting global._MP_NO_ITERATOR_SYMBOLS_ to '1' swaps the well-known iterator
// symbols for private placeholders, so the iterator methods are defined under
// keys that the language's iteration protocols never look up.
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
const ASYNCITERATOR = doIter && Symbol.asyncIterator
  || Symbol('asyncIterator not implemented')
const ITERATOR = doIter && Symbol.iterator
  || Symbol('iterator not implemented')
37
// events that mean 'the stream is over'
// these are treated specially, and re-emitted
// if they are listened for after emitting.
const isEndish = ev =>
  ev === 'end' ||
  ev === 'finish' ||
  ev === 'prefinish'
45
// True for ArrayBuffer instances, and for objects whose constructor is named
// 'ArrayBuffer' and that report a non-negative byteLength (covers values that
// fail the instanceof check against the local ArrayBuffer).
// null is rejected explicitly: typeof null is 'object', so without the guard
// reading null.constructor would throw a TypeError.
const isArrayBuffer = b => b instanceof ArrayBuffer || !!(
  b !== null &&
  typeof b === 'object' &&
  b.constructor &&
  b.constructor.name === 'ArrayBuffer' &&
  b.byteLength >= 0
)
51
// True for typed arrays and DataViews, but not for Node Buffers (which are
// also ArrayBuffer views and are handled as-is by the stream).
const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
53
54module.exports = class Minipass extends Stream {
55 constructor (options) {
56 super()
57 this[FLOWING] = false
58 // whether we're explicitly paused
59 this[PAUSED] = false
60 this.pipes = new Yallist()
61 this.buffer = new Yallist()
62 this[OBJECTMODE] = options && options.objectMode || false
63 if (this[OBJECTMODE])
64 this[ENCODING] = null
65 else
66 this[ENCODING] = options && options.encoding || null
67 if (this[ENCODING] === 'buffer')
68 this[ENCODING] = null
69 this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
70 this[EOF] = false
71 this[EMITTED_END] = false
72 this[EMITTING_END] = false
73 this[CLOSED] = false
74 this[EMITTED_ERROR] = null
75 this.writable = true
76 this.readable = true
77 this[BUFFERLENGTH] = 0
78 this[DESTROYED] = false
79 }
80
81 get bufferLength () { return this[BUFFERLENGTH] }
82
83 get encoding () { return this[ENCODING] }
84 set encoding (enc) {
85 if (this[OBJECTMODE])
86 throw new Error('cannot set encoding in objectMode')
87
88 if (this[ENCODING] && enc !== this[ENCODING] &&
89 (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
90 throw new Error('cannot change encoding')
91
92 if (this[ENCODING] !== enc) {
93 this[DECODER] = enc ? new SD(enc) : null
94 if (this.buffer.length)
95 this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
96 }
97
98 this[ENCODING] = enc
99 }
100
101 setEncoding (enc) {
102 this.encoding = enc
103 }
104
105 get objectMode () { return this[OBJECTMODE] }
106 set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om }
107
108 write (chunk, encoding, cb) {
109 if (this[EOF])
110 throw new Error('write after end')
111
112 if (this[DESTROYED]) {
113 this.emit('error', Object.assign(
114 new Error('Cannot call write after a stream was destroyed'),
115 { code: 'ERR_STREAM_DESTROYED' }
116 ))
117 return true
118 }
119
120 if (typeof encoding === 'function')
121 cb = encoding, encoding = 'utf8'
122
123 if (!encoding)
124 encoding = 'utf8'
125
126 // convert array buffers and typed array views into buffers
127 // at some point in the future, we may want to do the opposite!
128 // leave strings and buffers as-is
129 // anything else switches us into object mode
130 if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
131 if (isArrayBufferView(chunk))
132 chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
133 else if (isArrayBuffer(chunk))
134 chunk = Buffer.from(chunk)
135 else if (typeof chunk !== 'string')
136 // use the setter so we throw if we have encoding set
137 this.objectMode = true
138 }
139
140 // this ensures at this point that the chunk is a buffer or string
141 // don't buffer it up or send it to the decoder
142 if (!this.objectMode && !chunk.length) {
143 if (this[BUFFERLENGTH] !== 0)
144 this.emit('readable')
145 if (cb)
146 cb()
147 return this.flowing
148 }
149
150 // fast-path writing strings of same encoding to a stream with
151 // an empty buffer, skipping the buffer/decoder dance
152 if (typeof chunk === 'string' && !this[OBJECTMODE] &&
153 // unless it is a string already ready for us to use
154 !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
155 chunk = Buffer.from(chunk, encoding)
156 }
157
158 if (Buffer.isBuffer(chunk) && this[ENCODING])
159 chunk = this[DECODER].write(chunk)
160
161 if (this.flowing) {
162 // if we somehow have something in the buffer, but we think we're
163 // flowing, then we need to flush all that out first, or we get
164 // chunks coming in out of order. Can't emit 'drain' here though,
165 // because we're mid-write, so that'd be bad.
166 if (this[BUFFERLENGTH] !== 0)
167 this[FLUSH](true)
168 this.emit('data', chunk)
169 } else
170 this[BUFFERPUSH](chunk)
171
172 if (this[BUFFERLENGTH] !== 0)
173 this.emit('readable')
174
175 if (cb)
176 cb()
177
178 return this.flowing
179 }
180
181 read (n) {
182 if (this[DESTROYED])
183 return null
184
185 try {
186 if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
187 return null
188
189 if (this[OBJECTMODE])
190 n = null
191
192 if (this.buffer.length > 1 && !this[OBJECTMODE]) {
193 if (this.encoding)
194 this.buffer = new Yallist([
195 Array.from(this.buffer).join('')
196 ])
197 else
198 this.buffer = new Yallist([
199 Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
200 ])
201 }
202
203 return this[READ](n || null, this.buffer.head.value)
204 } finally {
205 this[MAYBE_EMIT_END]()
206 }
207 }
208
209 [READ] (n, chunk) {
210 if (n === chunk.length || n === null)
211 this[BUFFERSHIFT]()
212 else {
213 this.buffer.head.value = chunk.slice(n)
214 chunk = chunk.slice(0, n)
215 this[BUFFERLENGTH] -= n
216 }
217
218 this.emit('data', chunk)
219
220 if (!this.buffer.length && !this[EOF])
221 this.emit('drain')
222
223 return chunk
224 }
225
226 end (chunk, encoding, cb) {
227 if (typeof chunk === 'function')
228 cb = chunk, chunk = null
229 if (typeof encoding === 'function')
230 cb = encoding, encoding = 'utf8'
231 if (chunk)
232 this.write(chunk, encoding)
233 if (cb)
234 this.once('end', cb)
235 this[EOF] = true
236 this.writable = false
237
238 // if we haven't written anything, then go ahead and emit,
239 // even if we're not reading.
240 // we'll re-emit if a new 'end' listener is added anyway.
241 // This makes MP more suitable to write-only use cases.
242 if (this.flowing || !this[PAUSED])
243 this[MAYBE_EMIT_END]()
244 return this
245 }
246
247 // don't let the internal resume be overwritten
248 [RESUME] () {
249 if (this[DESTROYED])
250 return
251
252 this[PAUSED] = false
253 this[FLOWING] = true
254 this.emit('resume')
255 if (this.buffer.length)
256 this[FLUSH]()
257 else if (this[EOF])
258 this[MAYBE_EMIT_END]()
259 else
260 this.emit('drain')
261 }
262
263 resume () {
264 return this[RESUME]()
265 }
266
267 pause () {
268 this[FLOWING] = false
269 this[PAUSED] = true
270 }
271
272 get destroyed () {
273 return this[DESTROYED]
274 }
275
276 get flowing () {
277 return this[FLOWING]
278 }
279
280 get paused () {
281 return this[PAUSED]
282 }
283
284 [BUFFERPUSH] (chunk) {
285 if (this[OBJECTMODE])
286 this[BUFFERLENGTH] += 1
287 else
288 this[BUFFERLENGTH] += chunk.length
289 return this.buffer.push(chunk)
290 }
291
292 [BUFFERSHIFT] () {
293 if (this.buffer.length) {
294 if (this[OBJECTMODE])
295 this[BUFFERLENGTH] -= 1
296 else
297 this[BUFFERLENGTH] -= this.buffer.head.value.length
298 }
299 return this.buffer.shift()
300 }
301
302 [FLUSH] (noDrain) {
303 do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
304
305 if (!noDrain && !this.buffer.length && !this[EOF])
306 this.emit('drain')
307 }
308
309 [FLUSHCHUNK] (chunk) {
310 return chunk ? (this.emit('data', chunk), this.flowing) : false
311 }
312
313 pipe (dest, opts) {
314 if (this[DESTROYED])
315 return
316
317 const ended = this[EMITTED_END]
318 opts = opts || {}
319 if (dest === proc.stdout || dest === proc.stderr)
320 opts.end = false
321 else
322 opts.end = opts.end !== false
323
324 const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
325 this.pipes.push(p)
326
327 dest.on('drain', p.ondrain)
328 this[RESUME]()
329 // piping an ended stream ends immediately
330 if (ended && p.opts.end)
331 p.dest.end()
332 return dest
333 }
334
335 addListener (ev, fn) {
336 return this.on(ev, fn)
337 }
338
339 on (ev, fn) {
340 try {
341 return super.on(ev, fn)
342 } finally {
343 if (ev === 'data' && !this.pipes.length && !this.flowing)
344 this[RESUME]()
345 else if (isEndish(ev) && this[EMITTED_END]) {
346 super.emit(ev)
347 this.removeAllListeners(ev)
348 } else if (ev === 'error' && this[EMITTED_ERROR]) {
349 fn.call(this, this[EMITTED_ERROR])
350 }
351 }
352 }
353
354 get emittedEnd () {
355 return this[EMITTED_END]
356 }
357
358 [MAYBE_EMIT_END] () {
359 if (!this[EMITTING_END] &&
360 !this[EMITTED_END] &&
361 !this[DESTROYED] &&
362 this.buffer.length === 0 &&
363 this[EOF]) {
364 this[EMITTING_END] = true
365 this.emit('end')
366 this.emit('prefinish')
367 this.emit('finish')
368 if (this[CLOSED])
369 this.emit('close')
370 this[EMITTING_END] = false
371 }
372 }
373
374 emit (ev, data) {
375 // error and close are only events allowed after calling destroy()
376 if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
377 return
378 else if (ev === 'data') {
379 if (!data)
380 return
381
382 if (this.pipes.length)
383 this.pipes.forEach(p =>
384 p.dest.write(data) === false && this.pause())
385 } else if (ev === 'end') {
386 // only actual end gets this treatment
387 if (this[EMITTED_END] === true)
388 return
389
390 this[EMITTED_END] = true
391 this.readable = false
392
393 if (this[DECODER]) {
394 data = this[DECODER].end()
395 if (data) {
396 this.pipes.forEach(p => p.dest.write(data))
397 super.emit('data', data)
398 }
399 }
400
401 this.pipes.forEach(p => {
402 p.dest.removeListener('drain', p.ondrain)
403 if (p.opts.end)
404 p.dest.end()
405 })
406 } else if (ev === 'close') {
407 this[CLOSED] = true
408 // don't emit close before 'end' and 'finish'
409 if (!this[EMITTED_END] && !this[DESTROYED])
410 return
411 } else if (ev === 'error') {
412 this[EMITTED_ERROR] = data
413 }
414
415 // TODO: replace with a spread operator when Node v4 support drops
416 const args = new Array(arguments.length)
417 args[0] = ev
418 args[1] = data
419 if (arguments.length > 2) {
420 for (let i = 2; i < arguments.length; i++) {
421 args[i] = arguments[i]
422 }
423 }
424
425 try {
426 return super.emit.apply(this, args)
427 } finally {
428 if (!isEndish(ev))
429 this[MAYBE_EMIT_END]()
430 else
431 this.removeAllListeners(ev)
432 }
433 }
434
435 // const all = await stream.collect()
436 collect () {
437 const buf = []
438 if (!this[OBJECTMODE])
439 buf.dataLength = 0
440 // set the promise first, in case an error is raised
441 // by triggering the flow here.
442 const p = this.promise()
443 this.on('data', c => {
444 buf.push(c)
445 if (!this[OBJECTMODE])
446 buf.dataLength += c.length
447 })
448 return p.then(() => buf)
449 }
450
451 // const data = await stream.concat()
452 concat () {
453 return this[OBJECTMODE]
454 ? Promise.reject(new Error('cannot concat in objectMode'))
455 : this.collect().then(buf =>
456 this[OBJECTMODE]
457 ? Promise.reject(new Error('cannot concat in objectMode'))
458 : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
459 }
460
461 // stream.promise().then(() => done, er => emitted error)
462 promise () {
463 return new Promise((resolve, reject) => {
464 this.on(DESTROYED, () => reject(new Error('stream destroyed')))
465 this.on('error', er => reject(er))
466 this.on('end', () => resolve())
467 })
468 }
469
470 // for await (let chunk of stream)
471 [ASYNCITERATOR] () {
472 const next = () => {
473 const res = this.read()
474 if (res !== null)
475 return Promise.resolve({ done: false, value: res })
476
477 if (this[EOF])
478 return Promise.resolve({ done: true })
479
480 let resolve = null
481 let reject = null
482 const onerr = er => {
483 this.removeListener('data', ondata)
484 this.removeListener('end', onend)
485 reject(er)
486 }
487 const ondata = value => {
488 this.removeListener('error', onerr)
489 this.removeListener('end', onend)
490 this.pause()
491 resolve({ value: value, done: !!this[EOF] })
492 }
493 const onend = () => {
494 this.removeListener('error', onerr)
495 this.removeListener('data', ondata)
496 resolve({ done: true })
497 }
498 const ondestroy = () => onerr(new Error('stream destroyed'))
499 return new Promise((res, rej) => {
500 reject = rej
501 resolve = res
502 this.once(DESTROYED, ondestroy)
503 this.once('error', onerr)
504 this.once('end', onend)
505 this.once('data', ondata)
506 })
507 }
508
509 return { next }
510 }
511
512 // for (let chunk of stream)
513 [ITERATOR] () {
514 const next = () => {
515 const value = this.read()
516 const done = value === null
517 return { value, done }
518 }
519 return { next }
520 }
521
522 destroy (er) {
523 if (this[DESTROYED]) {
524 if (er)
525 this.emit('error', er)
526 else
527 this.emit(DESTROYED)
528 return this
529 }
530
531 this[DESTROYED] = true
532
533 // throw away all buffered data, it's never coming out
534 this.buffer = new Yallist()
535 this[BUFFERLENGTH] = 0
536
537 if (typeof this.close === 'function' && !this[CLOSED])
538 this.close()
539
540 if (er)
541 this.emit('error', er)
542 else // if no error to emit, still reject pending promises
543 this.emit(DESTROYED)
544
545 return this
546 }
547
548 static isStream (s) {
549 return !!s && (s instanceof Minipass || s instanceof Stream ||
550 s instanceof EE && (
551 typeof s.pipe === 'function' || // readable
552 (typeof s.write === 'function' && typeof s.end === 'function') // writable
553 ))
554 }
555}
// Note: See TracBrowser for help on using the repository browser.