source: trip-planner-front/node_modules/minipass-pipeline/index.js@ eed0bf8

Last change on this file since eed0bf8 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 3.3 KB
Line 
1const Minipass = require('minipass')
2const EE = require('events')
3const isStream = s => s && s instanceof EE && (
4 typeof s.pipe === 'function' || // readable
5 (typeof s.write === 'function' && typeof s.end === 'function') // writable
6)
7
8const _head = Symbol('_head')
9const _tail = Symbol('_tail')
10const _linkStreams = Symbol('_linkStreams')
11const _setHead = Symbol('_setHead')
12const _setTail = Symbol('_setTail')
13const _onError = Symbol('_onError')
14const _onData = Symbol('_onData')
15const _onEnd = Symbol('_onEnd')
16const _onDrain = Symbol('_onDrain')
17const _streams = Symbol('_streams')
18class Pipeline extends Minipass {
19 constructor (opts, ...streams) {
20 if (isStream(opts)) {
21 streams.unshift(opts)
22 opts = {}
23 }
24
25 super(opts)
26 this[_streams] = []
27 if (streams.length)
28 this.push(...streams)
29 }
30
31 [_linkStreams] (streams) {
32 // reduce takes (left,right), and we return right to make it the
33 // new left value.
34 return streams.reduce((src, dest) => {
35 src.on('error', er => dest.emit('error', er))
36 src.pipe(dest)
37 return dest
38 })
39 }
40
41 push (...streams) {
42 this[_streams].push(...streams)
43 if (this[_tail])
44 streams.unshift(this[_tail])
45
46 const linkRet = this[_linkStreams](streams)
47
48 this[_setTail](linkRet)
49 if (!this[_head])
50 this[_setHead](streams[0])
51 }
52
53 unshift (...streams) {
54 this[_streams].unshift(...streams)
55 if (this[_head])
56 streams.push(this[_head])
57
58 const linkRet = this[_linkStreams](streams)
59 this[_setHead](streams[0])
60 if (!this[_tail])
61 this[_setTail](linkRet)
62 }
63
64 destroy (er) {
65 // set fire to the whole thing.
66 this[_streams].forEach(s =>
67 typeof s.destroy === 'function' && s.destroy())
68 return super.destroy(er)
69 }
70
71 // readable interface -> tail
72 [_setTail] (stream) {
73 this[_tail] = stream
74 stream.on('error', er => this[_onError](stream, er))
75 stream.on('data', chunk => this[_onData](stream, chunk))
76 stream.on('end', () => this[_onEnd](stream))
77 stream.on('finish', () => this[_onEnd](stream))
78 }
79
80 // errors proxied down the pipeline
81 // they're considered part of the "read" interface
82 [_onError] (stream, er) {
83 if (stream === this[_tail])
84 this.emit('error', er)
85 }
86 [_onData] (stream, chunk) {
87 if (stream === this[_tail])
88 super.write(chunk)
89 }
90 [_onEnd] (stream) {
91 if (stream === this[_tail])
92 super.end()
93 }
94 pause () {
95 super.pause()
96 return this[_tail] && this[_tail].pause && this[_tail].pause()
97 }
98
99 // NB: Minipass calls its internal private [RESUME] method during
100 // pipe drains, to avoid hazards where stream.resume() is overridden.
101 // Thus, we need to listen to the resume *event*, not override the
102 // resume() method, and proxy *that* to the tail.
103 emit (ev, ...args) {
104 if (ev === 'resume' && this[_tail] && this[_tail].resume)
105 this[_tail].resume()
106 return super.emit(ev, ...args)
107 }
108
109 // writable interface -> head
110 [_setHead] (stream) {
111 this[_head] = stream
112 stream.on('drain', () => this[_onDrain](stream))
113 }
114 [_onDrain] (stream) {
115 if (stream === this[_head])
116 this.emit('drain')
117 }
118 write (chunk, enc, cb) {
119 return this[_head].write(chunk, enc, cb) &&
120 (this.flowing || this.buffer.length === 0)
121 }
122 end (chunk, enc, cb) {
123 this[_head].end(chunk, enc, cb)
124 return this
125 }
126}
127
128module.exports = Pipeline
Note: See TracBrowser for help on using the repository browser.