1 | 'use strict'
|
---|
2 | /*
|
---|
3 | * merge2
|
---|
4 | * https://github.com/teambition/merge2
|
---|
5 | *
|
---|
6 | * Copyright (c) 2014-2020 Teambition
|
---|
7 | * Licensed under the MIT license.
|
---|
8 | */
|
---|
9 | const Stream = require('stream')
|
---|
10 | const PassThrough = Stream.PassThrough
|
---|
11 | const slice = Array.prototype.slice
|
---|
12 |
|
---|
13 | module.exports = merge2
|
---|
14 |
|
---|
15 | function merge2 () {
|
---|
16 | const streamsQueue = []
|
---|
17 | const args = slice.call(arguments)
|
---|
18 | let merging = false
|
---|
19 | let options = args[args.length - 1]
|
---|
20 |
|
---|
21 | if (options && !Array.isArray(options) && options.pipe == null) {
|
---|
22 | args.pop()
|
---|
23 | } else {
|
---|
24 | options = {}
|
---|
25 | }
|
---|
26 |
|
---|
27 | const doEnd = options.end !== false
|
---|
28 | const doPipeError = options.pipeError === true
|
---|
29 | if (options.objectMode == null) {
|
---|
30 | options.objectMode = true
|
---|
31 | }
|
---|
32 | if (options.highWaterMark == null) {
|
---|
33 | options.highWaterMark = 64 * 1024
|
---|
34 | }
|
---|
35 | const mergedStream = PassThrough(options)
|
---|
36 |
|
---|
37 | function addStream () {
|
---|
38 | for (let i = 0, len = arguments.length; i < len; i++) {
|
---|
39 | streamsQueue.push(pauseStreams(arguments[i], options))
|
---|
40 | }
|
---|
41 | mergeStream()
|
---|
42 | return this
|
---|
43 | }
|
---|
44 |
|
---|
45 | function mergeStream () {
|
---|
46 | if (merging) {
|
---|
47 | return
|
---|
48 | }
|
---|
49 | merging = true
|
---|
50 |
|
---|
51 | let streams = streamsQueue.shift()
|
---|
52 | if (!streams) {
|
---|
53 | process.nextTick(endStream)
|
---|
54 | return
|
---|
55 | }
|
---|
56 | if (!Array.isArray(streams)) {
|
---|
57 | streams = [streams]
|
---|
58 | }
|
---|
59 |
|
---|
60 | let pipesCount = streams.length + 1
|
---|
61 |
|
---|
62 | function next () {
|
---|
63 | if (--pipesCount > 0) {
|
---|
64 | return
|
---|
65 | }
|
---|
66 | merging = false
|
---|
67 | mergeStream()
|
---|
68 | }
|
---|
69 |
|
---|
70 | function pipe (stream) {
|
---|
71 | function onend () {
|
---|
72 | stream.removeListener('merge2UnpipeEnd', onend)
|
---|
73 | stream.removeListener('end', onend)
|
---|
74 | if (doPipeError) {
|
---|
75 | stream.removeListener('error', onerror)
|
---|
76 | }
|
---|
77 | next()
|
---|
78 | }
|
---|
79 | function onerror (err) {
|
---|
80 | mergedStream.emit('error', err)
|
---|
81 | }
|
---|
82 | // skip ended stream
|
---|
83 | if (stream._readableState.endEmitted) {
|
---|
84 | return next()
|
---|
85 | }
|
---|
86 |
|
---|
87 | stream.on('merge2UnpipeEnd', onend)
|
---|
88 | stream.on('end', onend)
|
---|
89 |
|
---|
90 | if (doPipeError) {
|
---|
91 | stream.on('error', onerror)
|
---|
92 | }
|
---|
93 |
|
---|
94 | stream.pipe(mergedStream, { end: false })
|
---|
95 | // compatible for old stream
|
---|
96 | stream.resume()
|
---|
97 | }
|
---|
98 |
|
---|
99 | for (let i = 0; i < streams.length; i++) {
|
---|
100 | pipe(streams[i])
|
---|
101 | }
|
---|
102 |
|
---|
103 | next()
|
---|
104 | }
|
---|
105 |
|
---|
106 | function endStream () {
|
---|
107 | merging = false
|
---|
108 | // emit 'queueDrain' when all streams merged.
|
---|
109 | mergedStream.emit('queueDrain')
|
---|
110 | if (doEnd) {
|
---|
111 | mergedStream.end()
|
---|
112 | }
|
---|
113 | }
|
---|
114 |
|
---|
115 | mergedStream.setMaxListeners(0)
|
---|
116 | mergedStream.add = addStream
|
---|
117 | mergedStream.on('unpipe', function (stream) {
|
---|
118 | stream.emit('merge2UnpipeEnd')
|
---|
119 | })
|
---|
120 |
|
---|
121 | if (args.length) {
|
---|
122 | addStream.apply(null, args)
|
---|
123 | }
|
---|
124 | return mergedStream
|
---|
125 | }
|
---|
126 |
|
---|
127 | // check and pause streams for pipe.
|
---|
128 | function pauseStreams (streams, options) {
|
---|
129 | if (!Array.isArray(streams)) {
|
---|
130 | // Backwards-compat with old-style streams
|
---|
131 | if (!streams._readableState && streams.pipe) {
|
---|
132 | streams = streams.pipe(PassThrough(options))
|
---|
133 | }
|
---|
134 | if (!streams._readableState || !streams.pause || !streams.pipe) {
|
---|
135 | throw new Error('Only readable stream can be merged.')
|
---|
136 | }
|
---|
137 | streams.pause()
|
---|
138 | } else {
|
---|
139 | for (let i = 0, len = streams.length; i < len; i++) {
|
---|
140 | streams[i] = pauseStreams(streams[i], options)
|
---|
141 | }
|
---|
142 | }
|
---|
143 | return streams
|
---|
144 | }
|
---|