source: trip-planner-front/node_modules/merge2/index.js@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 3.2 KB
Line 
1'use strict'
2/*
3 * merge2
4 * https://github.com/teambition/merge2
5 *
6 * Copyright (c) 2014-2020 Teambition
7 * Licensed under the MIT license.
8 */
9const Stream = require('stream')
10const PassThrough = Stream.PassThrough
11const slice = Array.prototype.slice
12
13module.exports = merge2
14
15function merge2 () {
16 const streamsQueue = []
17 const args = slice.call(arguments)
18 let merging = false
19 let options = args[args.length - 1]
20
21 if (options && !Array.isArray(options) && options.pipe == null) {
22 args.pop()
23 } else {
24 options = {}
25 }
26
27 const doEnd = options.end !== false
28 const doPipeError = options.pipeError === true
29 if (options.objectMode == null) {
30 options.objectMode = true
31 }
32 if (options.highWaterMark == null) {
33 options.highWaterMark = 64 * 1024
34 }
35 const mergedStream = PassThrough(options)
36
37 function addStream () {
38 for (let i = 0, len = arguments.length; i < len; i++) {
39 streamsQueue.push(pauseStreams(arguments[i], options))
40 }
41 mergeStream()
42 return this
43 }
44
45 function mergeStream () {
46 if (merging) {
47 return
48 }
49 merging = true
50
51 let streams = streamsQueue.shift()
52 if (!streams) {
53 process.nextTick(endStream)
54 return
55 }
56 if (!Array.isArray(streams)) {
57 streams = [streams]
58 }
59
60 let pipesCount = streams.length + 1
61
62 function next () {
63 if (--pipesCount > 0) {
64 return
65 }
66 merging = false
67 mergeStream()
68 }
69
70 function pipe (stream) {
71 function onend () {
72 stream.removeListener('merge2UnpipeEnd', onend)
73 stream.removeListener('end', onend)
74 if (doPipeError) {
75 stream.removeListener('error', onerror)
76 }
77 next()
78 }
79 function onerror (err) {
80 mergedStream.emit('error', err)
81 }
82 // skip ended stream
83 if (stream._readableState.endEmitted) {
84 return next()
85 }
86
87 stream.on('merge2UnpipeEnd', onend)
88 stream.on('end', onend)
89
90 if (doPipeError) {
91 stream.on('error', onerror)
92 }
93
94 stream.pipe(mergedStream, { end: false })
95 // compatible for old stream
96 stream.resume()
97 }
98
99 for (let i = 0; i < streams.length; i++) {
100 pipe(streams[i])
101 }
102
103 next()
104 }
105
106 function endStream () {
107 merging = false
108 // emit 'queueDrain' when all streams merged.
109 mergedStream.emit('queueDrain')
110 if (doEnd) {
111 mergedStream.end()
112 }
113 }
114
115 mergedStream.setMaxListeners(0)
116 mergedStream.add = addStream
117 mergedStream.on('unpipe', function (stream) {
118 stream.emit('merge2UnpipeEnd')
119 })
120
121 if (args.length) {
122 addStream.apply(null, args)
123 }
124 return mergedStream
125}
126
127// check and pause streams for pipe.
128function pauseStreams (streams, options) {
129 if (!Array.isArray(streams)) {
130 // Backwards-compat with old-style streams
131 if (!streams._readableState && streams.pipe) {
132 streams = streams.pipe(PassThrough(options))
133 }
134 if (!streams._readableState || !streams.pause || !streams.pipe) {
135 throw new Error('Only readable stream can be merged.')
136 }
137 streams.pause()
138 } else {
139 for (let i = 0, len = streams.length; i < len; i++) {
140 streams[i] = pauseStreams(streams[i], options)
141 }
142 }
143 return streams
144}
Note: See TracBrowser for help on using the repository browser.