[6a3a178] | 1 | const Minipass = require('minipass')
|
---|
| 2 | const EE = require('events')
|
---|
/**
 * Duck-type check for a stream: an EventEmitter that is readable (has a
 * `pipe` method) or writable (has both `write` and `end` methods).
 *
 * A falsy argument is returned unchanged, so callers should only rely on
 * the truthiness of the result.
 */
const isStream = s => {
  if (!s)
    return s
  if (!(s instanceof EE))
    return false
  // readable
  if (typeof s.pipe === 'function')
    return true
  // writable
  return typeof s.write === 'function' && typeof s.end === 'function'
}
|
---|
| 7 |
|
---|
// Private Symbol keys used by Pipeline for its internal state and helper
// methods. Bindings and descriptions are paired by position, so keep the
// two lists in the same order.
const [
  _head,
  _tail,
  _linkStreams,
  _setHead,
  _setTail,
  _onError,
  _onData,
  _onEnd,
  _onDrain,
  _streams,
] = [
  '_head',
  '_tail',
  '_linkStreams',
  '_setHead',
  '_setTail',
  '_onError',
  '_onData',
  '_onEnd',
  '_onDrain',
  '_streams',
].map(description => Symbol(description))
|
---|
/**
 * A Minipass stream that wraps a chain of streams piped into one another.
 *
 * Writes to the Pipeline are handed to the first stream in the chain (the
 * "head"). Data, end/finish and error events from the last stream (the
 * "tail") are surfaced on the Pipeline itself.
 */
class Pipeline extends Minipass {
  /**
   * @param {object} [opts] options handed to the Minipass constructor. If
   *   this argument is itself a stream, it is treated as the first stream
   *   of the chain and an empty options object is used instead.
   * @param {...object} streams streams to chain together, in order.
   */
  constructor (opts, ...streams) {
    if (isStream(opts)) {
      streams.unshift(opts)
      opts = {}
    }

    super(opts)
    this[_streams] = []
    if (streams.length)
      this.push(...streams)
  }

  /**
   * Pipe each stream into the one after it, and forward 'error' events
   * from each stream to its successor.
   *
   * @param {object[]} streams non-empty list of streams to link.
   * @returns {object} the last stream in the list.
   */
  [_linkStreams] (streams) {
    // reduce takes (left,right), and we return right to make it the
    // new left value.
    return streams.reduce((src, dest) => {
      src.on('error', er => dest.emit('error', er))
      src.pipe(dest)
      return dest
    })
  }

  /**
   * Append streams to the end of the chain. The last one becomes the new
   * tail; if there was no head yet, the first one becomes the head.
   *
   * @param {...object} streams streams to add after the current tail.
   */
  push (...streams) {
    // Nothing to add. Without this guard an empty pipeline would call
    // reduce() on an empty array (TypeError), and a populated pipeline
    // would re-register the tail listeners on the same stream, causing
    // every chunk to be written twice.
    if (!streams.length)
      return

    this[_streams].push(...streams)
    if (this[_tail])
      streams.unshift(this[_tail])

    const linkRet = this[_linkStreams](streams)

    this[_setTail](linkRet)
    if (!this[_head])
      this[_setHead](streams[0])
  }

  /**
   * Prepend streams to the start of the chain. The first one becomes the
   * new head; if there was no tail yet, the last one becomes the tail.
   *
   * @param {...object} streams streams to add before the current head.
   */
  unshift (...streams) {
    // Nothing to add. Without this guard an empty pipeline would call
    // reduce() on an empty array (TypeError), and a populated pipeline
    // would re-register the drain listener on the current head, causing
    // duplicate 'drain' events.
    if (!streams.length)
      return

    this[_streams].unshift(...streams)
    if (this[_head])
      streams.push(this[_head])

    const linkRet = this[_linkStreams](streams)
    this[_setHead](streams[0])
    if (!this[_tail])
      this[_setTail](linkRet)
  }

  /**
   * Destroy every stream in the chain that has a destroy() method, then
   * destroy the Pipeline itself.
   *
   * @param {Error} [er] passed to Minipass's destroy(); it is not passed
   *   to the individual streams.
   * @returns whatever Minipass's destroy() returns.
   */
  destroy (er) {
    // set fire to the whole thing.
    this[_streams].forEach(s =>
      typeof s.destroy === 'function' && s.destroy())
    return super.destroy(er)
  }

  // readable interface -> tail
  /**
   * Record `stream` as the tail and subscribe to its output events.
   * Listeners on a previous tail are not removed; the handlers ignore
   * events from any stream that is no longer the current tail.
   */
  [_setTail] (stream) {
    this[_tail] = stream
    stream.on('error', er => this[_onError](stream, er))
    stream.on('data', chunk => this[_onData](stream, chunk))
    stream.on('end', () => this[_onEnd](stream))
    // NOTE(review): presumably so that a write-only tail, which finishes
    // rather than ends, also ends the pipeline - confirm.
    stream.on('finish', () => this[_onEnd](stream))
  }

  // errors proxied down the pipeline
  // they're considered part of the "read" interface
  [_onError] (stream, er) {
    if (stream === this[_tail])
      this.emit('error', er)
  }

  // Data coming out of the tail is written into the underlying Minipass,
  // bypassing the overridden write() which targets the head.
  [_onData] (stream, chunk) {
    if (stream === this[_tail])
      super.write(chunk)
  }

  // The tail is done: end the underlying Minipass, bypassing the
  // overridden end() which targets the head.
  [_onEnd] (stream) {
    if (stream === this[_tail])
      super.end()
  }

  /**
   * Pause the Pipeline and, when the tail has a pause() method, the tail.
   *
   * @returns the tail's pause() result, or a falsy value when there is
   *   no tail or it cannot be paused.
   */
  pause () {
    super.pause()
    return this[_tail] && this[_tail].pause && this[_tail].pause()
  }

  // NB: Minipass calls its internal private [RESUME] method during
  // pipe drains, to avoid hazards where stream.resume() is overridden.
  // Thus, we need to listen to the resume *event*, not override the
  // resume() method, and proxy *that* to the tail.
  emit (ev, ...args) {
    if (ev === 'resume' && this[_tail] && this[_tail].resume)
      this[_tail].resume()
    return super.emit(ev, ...args)
  }

  // writable interface -> head
  /**
   * Record `stream` as the head and subscribe to its 'drain' event.
   * Listeners on a previous head are not removed; the handler ignores
   * events from any stream that is no longer the current head.
   */
  [_setHead] (stream) {
    this[_head] = stream
    stream.on('drain', () => this[_onDrain](stream))
  }

  [_onDrain] (stream) {
    if (stream === this[_head])
      this.emit('drain')
  }

  /**
   * Write a chunk to the head of the chain.
   *
   * @returns {boolean} true only when the head accepted the chunk without
   *   backpressure and the Pipeline's own output side is either flowing
   *   or has nothing buffered.
   * @throws {TypeError} if no stream has been added yet (there is no head).
   */
  write (chunk, enc, cb) {
    // bufferLength is Minipass's documented read-only count of buffered
    // bytes (or objects in objectMode). It is used instead of reaching
    // into the `buffer` array, which is not part of the documented API.
    return this[_head].write(chunk, enc, cb) &&
      (this.flowing || this.bufferLength === 0)
  }

  /**
   * End the head of the chain, optionally writing a final chunk.
   *
   * @returns {Pipeline} this
   * @throws {TypeError} if no stream has been added yet (there is no head).
   */
  end (chunk, enc, cb) {
    this[_head].end(chunk, enc, cb)
    return this
  }
}
|
---|
| 127 |
|
---|
| 128 | module.exports = Pipeline
|
---|