// Minipass is the base class of Pipeline; EE (EventEmitter) is used by
// isStream to recognize stream-like objects.
const Minipass = require('minipass')
const EE = require('events')
/**
 * Report whether `s` looks like a stream: an EventEmitter that is readable
 * (has a `pipe` method) or writable (has both `write` and `end` methods).
 *
 * `instanceof` is already false for null, undefined and primitives, so no
 * separate truthiness check is needed and the result is always a boolean.
 *
 * @param {*} s value to inspect
 * @returns {boolean} true when `s` is a readable or writable stream
 */
const isStream = s => s instanceof EE && (
  typeof s.pipe === 'function' || // readable
  (typeof s.write === 'function' && typeof s.end === 'function') // writable
)

// Symbol keys used for Pipeline's internal state and helper methods, so
// they do not collide with string-named members.
const _head = Symbol('_head')
const _tail = Symbol('_tail')
const _linkStreams = Symbol('_linkStreams')
const _setHead = Symbol('_setHead')
const _setTail = Symbol('_setTail')
const _onError = Symbol('_onError')
const _onData = Symbol('_onData')
const _onEnd = Symbol('_onEnd')
const _onDrain = Symbol('_onDrain')
const _streams = Symbol('_streams')
/**
 * A Minipass stream that presents a chain of streams as one stream.
 *
 * Writes are forwarded to the first stream in the chain (the "head"); data,
 * end/finish and error events are taken from the last stream (the "tail")
 * and re-emitted by the pipeline itself.
 */
class Pipeline extends Minipass {
  /**
   * @param {Object} [opts] options handed to the Minipass constructor. If a
   *   stream is passed in this position it is treated as the first stream
   *   of the chain and the options become {}.
   * @param {...Object} streams streams to link together, in order.
   */
  constructor (opts, ...streams) {
    if (isStream(opts)) {
      streams.unshift(opts)
      opts = {}
    }

    super(opts)
    this[_streams] = []
    if (streams.length)
      this.push(...streams)
  }

  /**
   * Pipe each stream into the next one and forward each stream's errors to
   * its successor.
   *
   * @param {Object[]} streams non-empty list of streams, in pipe order.
   * @returns {Object} the last stream in the list.
   */
  [_linkStreams] (streams) {
    // reduce takes (left,right), and we return right to make it the
    // new left value.
    return streams.reduce((src, dest) => {
      src.on('error', er => dest.emit('error', er))
      src.pipe(dest)
      return dest
    })
  }

  /**
   * Append streams to the end of the pipeline. The last one becomes the new
   * tail; if the pipeline was empty, the first one becomes the head.
   *
   * @param {...Object} streams streams to append, in order.
   */
  push (...streams) {
    // Nothing to add. Without this guard an empty call would either throw
    // (reduce() of an empty array with no initial value) or re-register the
    // tail listeners on the current tail, so every chunk would be written
    // to the output twice.
    if (!streams.length)
      return

    this[_streams].push(...streams)
    // link the current tail in front of the new streams
    if (this[_tail])
      streams.unshift(this[_tail])

    const linkRet = this[_linkStreams](streams)

    this[_setTail](linkRet)
    if (!this[_head])
      this[_setHead](streams[0])
  }

  /**
   * Prepend streams to the start of the pipeline. The first one becomes the
   * new head; if the pipeline was empty, the last one becomes the tail.
   *
   * @param {...Object} streams streams to prepend, in order.
   */
  unshift (...streams) {
    // Same reasoning as push(): an empty call must not throw or attach a
    // second 'drain' listener to the current head.
    if (!streams.length)
      return

    this[_streams].unshift(...streams)
    // link the new streams in front of the current head
    if (this[_head])
      streams.push(this[_head])

    const linkRet = this[_linkStreams](streams)
    this[_setHead](streams[0])
    if (!this[_tail])
      this[_setTail](linkRet)
  }

  /**
   * Destroy every stream in the pipeline that has a destroy() method, then
   * destroy the pipeline itself.
   *
   * @param {Error} [er] passed to Minipass's destroy(); the inner streams
   *   are destroyed without it.
   * @returns {*} whatever Minipass's destroy() returns.
   */
  destroy (er) {
    // set fire to the whole thing.
    this[_streams].forEach(s =>
      typeof s.destroy === 'function' && s.destroy())
    return super.destroy(er)
  }

  // readable interface -> tail
  /**
   * Record `stream` as the tail and subscribe to its output events. The
   * handlers ignore events from a stream that is no longer the tail.
   *
   * @param {Object} stream the new last stream of the pipeline.
   */
  [_setTail] (stream) {
    this[_tail] = stream
    stream.on('error', er => this[_onError](stream, er))
    stream.on('data', chunk => this[_onData](stream, chunk))
    stream.on('end', () => this[_onEnd](stream))
    stream.on('finish', () => this[_onEnd](stream))
  }

  // errors proxied down the pipeline
  // they're considered part of the "read" interface
  [_onError] (stream, er) {
    if (stream === this[_tail])
      this.emit('error', er)
  }

  // data from the current tail becomes the pipeline's own output; this uses
  // Minipass's write(), not the overridden write() that feeds the head.
  [_onData] (stream, chunk) {
    if (stream === this[_tail])
      super.write(chunk)
  }

  // the current tail ending or finishing ends the pipeline's output; this
  // uses Minipass's end(), not the overridden end() that ends the head.
  [_onEnd] (stream) {
    if (stream === this[_tail])
      super.end()
  }

  /**
   * Pause the pipeline and, when the tail has a pause() method, the tail.
   *
   * @returns {*} the result of the tail's pause(), or a falsy value when
   *   there is no tail or it cannot be paused.
   */
  pause () {
    super.pause()
    return this[_tail] && this[_tail].pause && this[_tail].pause()
  }

  // NB: Minipass calls its internal private [RESUME] method during
  // pipe drains, to avoid hazards where stream.resume() is overridden.
  // Thus, we need to listen to the resume *event*, not override the
  // resume() method, and proxy *that* to the tail.
  emit (ev, ...args) {
    if (ev === 'resume' && this[_tail] && this[_tail].resume)
      this[_tail].resume()
    return super.emit(ev, ...args)
  }

  // writable interface -> head
  /**
   * Record `stream` as the head and re-emit its 'drain' events. The handler
   * ignores events from a stream that is no longer the head.
   *
   * @param {Object} stream the new first stream of the pipeline.
   */
  [_setHead] (stream) {
    this[_head] = stream
    stream.on('drain', () => this[_onDrain](stream))
  }

  [_onDrain] (stream) {
    if (stream === this[_head])
      this.emit('drain')
  }

  /**
   * Write a chunk into the head of the pipeline. The pipeline must contain
   * at least one stream; with no head this throws.
   *
   * @returns {boolean} truthy only when the head accepted the chunk without
   *   signalling backpressure and the pipeline is flowing or has nothing
   *   buffered.
   */
  write (chunk, enc, cb) {
    return this[_head].write(chunk, enc, cb) &&
      (this.flowing || this.buffer.length === 0)
  }

  /**
   * End the head of the pipeline, optionally with a final chunk. The
   * pipeline must contain at least one stream; with no head this throws.
   *
   * @returns {Pipeline} this
   */
  end (chunk, enc, cb) {
    this[_head].end(chunk, enc, cb)
    return this
  }
}

module.exports = Pipeline