source: imaps-frontend/node_modules/combined-stream/lib/combined_stream.js@ 0c6b92a

main
Last change on this file since 0c6b92a was d565449, checked in by stefan toskovski <stefantoska84@…>, 3 months ago

Update repo after prototype presentation

  • Property mode set to 100644
File size: 4.6 KB
Line 
// Dependencies: Node's util/stream built-ins plus the third-party
// delayed-stream wrapper used by append().
var util = require('util');
var Stream = require('stream').Stream;
var DelayedStream = require('delayed-stream');

// The constructor is a function declaration further down; hoisting makes it
// available for export here.
module.exports = CombinedStream;
/**
 * Stream that emits the content of every appended source, one source after
 * another, in the order they were appended.
 *
 * Public fields (may be overridden through CombinedStream.create(options)):
 *   - writable     false until the first resume(), false again after a reset.
 *   - readable     always initialised to true.
 *   - dataSize     recomputed by _updateDataSize() from the queued sources.
 *   - maxDataSize  limit compared against dataSize by _checkDataSize()
 *                  (2 * 1024 * 1024 by default).
 *   - pauseStreams when truthy, appended streams are paused on append and
 *                  pause()/resume() are forwarded to the current source.
 *
 * @constructor
 */
function CombinedStream() {
  this.writable = false;
  this.readable = true;
  this.dataSize = 0;
  this.maxDataSize = 2 * 1024 * 1024;
  this.pauseStreams = true;

  // Set on the first resume(); guards the one-time start of the queue.
  this._released = false;
  // Pending sources: streams, generator callbacks, or literal values.
  this._streams = [];
  // Source currently being forwarded, or null between sources.
  this._currentStream = null;
  // Re-entrancy bookkeeping used by _getNext().
  this._insideLoop = false;
  this._pendingNext = false;
}
util.inherits(CombinedStream, Stream);
20
/**
 * Factory: builds an instance of the constructor it is called on and copies
 * every enumerable property of `options` onto it.
 *
 * @param {Object} [options] Property overrides (e.g. maxDataSize,
 *   pauseStreams). A falsy value is treated as "no overrides".
 * @returns {CombinedStream} The newly created instance.
 */
CombinedStream.create = function(options) {
  // `new this()` rather than `new CombinedStream()` so the factory also works
  // when it is invoked on another constructor that carries this function.
  var combinedStream = new this();
  var overrides = options || {};

  for (var key in overrides) {
    combinedStream[key] = overrides[key];
  }

  return combinedStream;
};
31
/**
 * Decides whether a queued item should be treated as a stream rather than as
 * a literal value or a generator callback.
 *
 * The test is purely negative: anything that is not a function, string,
 * boolean, number or Buffer counts as stream-like. That includes plain
 * objects, null and undefined.
 *
 * @param {*} stream Candidate value.
 * @returns {boolean} true when the value is none of the excluded types.
 */
CombinedStream.isStreamLike = function(stream) {
  var type = typeof stream;

  return type !== 'function'
    && type !== 'string'
    && type !== 'boolean'
    && type !== 'number'
    && !Buffer.isBuffer(stream);
};
39
/**
 * Queues a source to be emitted after everything appended before it.
 *
 * Stream-like values that are not already DelayedStream instances are
 * wrapped with DelayedStream.create(); the wrapper (not the original) is
 * what gets queued. Every other value (string, Buffer, number, boolean or
 * generator function) is queued unchanged.
 *
 * @param {*} stream Source to enqueue.
 * @returns {CombinedStream} this, to allow chaining.
 */
CombinedStream.prototype.append = function(stream) {
  var queued = stream;

  if (CombinedStream.isStreamLike(stream)) {
    if (!(stream instanceof DelayedStream)) {
      // The size limit is enforced by this object (see the 'data' listener
      // below), so the wrapper itself is created without one.
      queued = DelayedStream.create(stream, {
        maxDataSize: Infinity,
        pauseStream: this.pauseStreams,
      });
      // Listen on the original stream so the combined size is re-checked on
      // every chunk it produces.
      stream.on('data', this._checkDataSize.bind(this));
    }

    this._handleErrors(queued);

    if (this.pauseStreams) {
      queued.pause();
    }
  }

  this._streams.push(queued);
  return this;
};
63
/**
 * Pipes this stream into `dest` using the base Stream implementation and
 * then calls resume(), so piping also starts the flow of queued sources.
 *
 * @param {Object} dest Destination stream.
 * @param {Object} [options] Passed through to Stream.prototype.pipe.
 * @returns {Object} dest, so pipes can be chained.
 */
CombinedStream.prototype.pipe = function(dest, options) {
  Stream.prototype.pipe.call(this, dest, options);
  this.resume();
  return dest;
};
69
/**
 * Advances to the next queued source.
 *
 * _realGetNext() can call back into _getNext() before it returns (for
 * example when a literal value is written and the queue advances straight
 * away). Instead of recursing, a nested call only sets _pendingNext and the
 * outermost call keeps looping until no further advance was requested.
 */
CombinedStream.prototype._getNext = function() {
  this._currentStream = null;

  if (this._insideLoop) {
    // Nested call: ask the active loop below to run one more iteration.
    this._pendingNext = true;
    return;
  }

  this._insideLoop = true;
  try {
    do {
      this._pendingNext = false;
      this._realGetNext();
    } while (this._pendingNext);
  } finally {
    // Always release the guard, even if _realGetNext() throws.
    this._insideLoop = false;
  }
};
88
/**
 * Takes the next item off the queue and dispatches it:
 *   - nothing left (shift() yields undefined): end the combined stream;
 *   - a function: call it with a callback and forward whatever it hands
 *     back, attaching the size check and error forwarding first if that
 *     value is stream-like;
 *   - anything else: forward it directly.
 *
 * Note that a queued `undefined` value is indistinguishable from an empty
 * queue and therefore also ends the stream.
 */
CombinedStream.prototype._realGetNext = function() {
  var next = this._streams.shift();

  if (typeof next === 'undefined') {
    this.end();
    return;
  }

  if (typeof next !== 'function') {
    this._pipeNext(next);
    return;
  }

  var self = this;
  next(function(resolved) {
    if (CombinedStream.isStreamLike(resolved)) {
      resolved.on('data', self._checkDataSize.bind(self));
      self._handleErrors(resolved);
    }

    self._pipeNext(resolved);
  });
};
114
/**
 * Makes `stream` the current source and forwards its content.
 *
 * A stream-like source is piped into this object with `end: false`, and its
 * own 'end' event advances the queue. Any other value is written out as a
 * single chunk and the queue advances immediately.
 *
 * @param {*} stream Stream-like source or literal value to emit.
 */
CombinedStream.prototype._pipeNext = function(stream) {
  this._currentStream = stream;

  if (!CombinedStream.isStreamLike(stream)) {
    this.write(stream);
    this._getNext();
    return;
  }

  stream.on('end', this._getNext.bind(this));
  // end: false keeps one finished source from ending the combined stream.
  stream.pipe(this, {end: false});
};
129
/**
 * Forwards 'error' events of a source to this stream via _emitError().
 *
 * @param {Object} stream Source exposing an `on(event, listener)` method.
 */
CombinedStream.prototype._handleErrors = function(stream) {
  var self = this;

  stream.on('error', function(err) {
    self._emitError(err);
  });
};
136
/**
 * Re-emits a chunk received from the current source (or a literal value) as
 * a 'data' event of this stream. Returns nothing.
 *
 * @param {*} data Chunk to emit.
 */
CombinedStream.prototype.write = function(data) {
  this.emit('data', data);
};
140
/**
 * Pauses the current source (when it has a pause() method) and emits
 * 'pause'. Does nothing at all, and emits no event, when pauseStreams is
 * falsy.
 */
CombinedStream.prototype.pause = function() {
  if (!this.pauseStreams) {
    return;
  }

  var current = this._currentStream;
  if (current && typeof current.pause === 'function') {
    current.pause();
  }
  this.emit('pause');
};
149
/**
 * Starts the stream on the first call (marks it released and writable, then
 * pulls the first queued source). On every call, resumes the current source
 * when pauseStreams is truthy and the source has a resume() method, then
 * emits 'resume'.
 */
CombinedStream.prototype.resume = function() {
  if (!this._released) {
    this._released = true;
    this.writable = true;
    this._getNext();
  }

  // Read after _getNext(): the first call has just selected the source.
  var current = this._currentStream;
  if (this.pauseStreams && current && typeof current.resume === 'function') {
    current.resume();
  }
  this.emit('resume');
};
160
/**
 * Clears the internal state via _reset() and emits 'end'.
 */
CombinedStream.prototype.end = function() {
  this._reset();
  this.emit('end');
};
165
/**
 * Clears the internal state via _reset() and emits 'close'.
 */
CombinedStream.prototype.destroy = function() {
  this._reset();
  this.emit('close');
};
170
/**
 * Returns the stream to an idle state: marks it not writable, replaces the
 * queue with a new empty array and forgets the current source. The
 * `_released` flag is left untouched.
 */
CombinedStream.prototype._reset = function() {
  this.writable = false;
  this._streams = [];
  this._currentStream = null;
};
176
/**
 * Recomputes dataSize and raises an error through _emitError() once it is
 * strictly greater than maxDataSize. Registered as a 'data' listener on
 * appended streams.
 */
CombinedStream.prototype._checkDataSize = function() {
  this._updateDataSize();
  if (this.dataSize <= this.maxDataSize) {
    return;
  }

  // NOTE(review): the text names DelayedStream although the limit belongs to
  // this object. Kept verbatim because callers may match on the message.
  var message =
    'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.';
  this._emitError(new Error(message));
};
187
/**
 * Recomputes dataSize as the sum of the `dataSize` property of every queued
 * source plus that of the current source. Items without a truthy `dataSize`
 * (for example literal strings) contribute nothing.
 */
CombinedStream.prototype._updateDataSize = function() {
  this.dataSize = 0;

  var self = this;
  this._streams.forEach(function(queued) {
    if (queued.dataSize) {
      self.dataSize += queued.dataSize;
    }
  });

  var current = this._currentStream;
  if (current && current.dataSize) {
    this.dataSize += current.dataSize;
  }
};
204
/**
 * Clears the internal state via _reset() and then emits 'error' with the
 * given error, so the queue is already dropped when listeners run.
 *
 * @param {Error} err Error to report.
 */
CombinedStream.prototype._emitError = function(err) {
  this._reset();
  this.emit('error', err);
};
Note: See TracBrowser for help on using the repository browser.