# minipass

A _very_ minimal implementation of a [PassThrough
stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough)

[It's very
fast](https://docs.google.com/spreadsheets/d/1oObKSrVwLX_7Ut4Z6g3fZW-AX1j1-k6w-cDsrkaSbHM/edit#gid=0)
for objects, strings, and buffers.

Supports `pipe()`ing (including multi-`pipe()` and backpressure transmission),
buffering data until either a `data` event handler or `pipe()` is added (so
you don't lose the first chunk), and most other cases where PassThrough is
a good idea.

There is a `read()` method, but it's much more efficient to consume data
from this stream via `'data'` events or by calling `pipe()` into some other
stream. Calling `read()` requires the buffer to be flattened in some
cases, which requires copying memory.

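To see why `read(n)` can force a copy, here is a rough sketch of reading across chunk boundaries. It is not Minipass's actual implementation: `readBytes` is a hypothetical helper written for this illustration, and it uses only Node's `Buffer` API.

```js
// Hypothetical helper, for illustration only (not part of Minipass).
// `chunks` is an array of Buffers as they were written; `n` is the
// number of bytes requested.
function readBytes (chunks, n) {
  const total = chunks.reduce((sum, c) => sum + c.length, 0)
  if (n > total) return null // not enough data buffered

  // Fast path: the first chunk is exactly what was asked for, no copy.
  if (chunks.length && chunks[0].length === n) return chunks.shift()

  // Slow path: flatten everything into one Buffer (this copies memory),
  // hand back the first n bytes, and keep the remainder buffered.
  const flat = Buffer.concat(chunks, total)
  chunks.length = 0
  if (n < total) chunks.push(flat.subarray(n))
  return flat.subarray(0, n)
}

const chunks = [Buffer.from('ab'), Buffer.from('cd')]
const out = readBytes(chunks, 3)
console.log(out.toString(), chunks.map(c => c.toString())) // abc [ 'd' ]
```

A `'data'` listener or a `pipe()` destination never needs the slow path, because each chunk is handed over whole.
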
There is also no `unpipe()` method. Once you start piping, there is no
stopping it!

If you set `objectMode: true` in the options, then whatever is written will
be emitted. Otherwise, it'll do a minimal amount of Buffer copying to
ensure proper Streams semantics when `read(n)` is called.

`objectMode` can also be set by doing `stream.objectMode = true`, or by
writing any non-string/non-buffer data. `objectMode` cannot be set to
false once it is set.

This is not a `through` or `through2` stream. It doesn't transform the
data, it just passes it right through. If you want to transform the data,
extend the class, and override the `write()` method. Once you're done
transforming the data however you want, call `super.write()` with the
transform output.

For some examples of streams that extend Minipass in various ways, check
out:

- [minizlib](http://npm.im/minizlib)
- [fs-minipass](http://npm.im/fs-minipass)
- [tar](http://npm.im/tar)
- [minipass-collect](http://npm.im/minipass-collect)
- [minipass-flush](http://npm.im/minipass-flush)
- [minipass-pipeline](http://npm.im/minipass-pipeline)
- [tap](http://npm.im/tap)
- [tap-parser](http://npm.im/tap-parser)
- [treport](http://npm.im/treport)
- [minipass-fetch](http://npm.im/minipass-fetch)
- [pacote](http://npm.im/pacote)
- [make-fetch-happen](http://npm.im/make-fetch-happen)
- [cacache](http://npm.im/cacache)
- [ssri](http://npm.im/ssri)
- [npm-registry-fetch](http://npm.im/npm-registry-fetch)
- [minipass-json-stream](http://npm.im/minipass-json-stream)
- [minipass-sized](http://npm.im/minipass-sized)

## Differences from Node.js Streams

There are several things that make Minipass streams different from (and in
some ways superior to) Node.js core streams.

Please read these caveats if you are familiar with node-core streams and
intend to use Minipass streams in your programs.

### Timing

Minipass streams are designed to support synchronous use-cases. Thus, data
is emitted as soon as it is available, always. It is buffered until read,
but no longer. Another way to look at it is that Minipass streams are
exactly as synchronous as the logic that writes into them.

This can be surprising if your code relies on `PassThrough.write()` always
providing data on the next tick rather than the current one, or being able
to call `resume()` and not have the entire buffer disappear immediately.

However, without this synchronicity guarantee, there would be no way for
Minipass to achieve the speeds it does, or support the synchronous use
cases that it does. Simply put, waiting takes time.

This non-deferring approach makes Minipass streams much easier to reason
about, especially in the context of Promises and other flow-control
mechanisms.

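The timing guarantee can be illustrated with a toy model built on Node's `EventEmitter`. The `TinySync` class below is a hypothetical stand-in written for this example, not Minipass's implementation. It only shows the idea that a `data` listener runs before `write()` returns, and that buffered chunks are flushed the moment a listener is attached.

```js
const EventEmitter = require('events')

// Toy model only: buffers until there is a 'data' listener,
// then emits synchronously.
class TinySync extends EventEmitter {
  constructor () {
    super()
    this.buffer = []
  }

  write (chunk) {
    if (this.listenerCount('data') === 0) {
      this.buffer.push(chunk) // nowhere to go yet
      return false
    }
    this.emit('data', chunk) // delivered before write() returns
    return true
  }

  on (ev, fn) {
    super.on(ev, fn)
    if (ev === 'data') {
      // flush anything buffered, right now, in the same tick
      while (this.buffer.length) this.emit('data', this.buffer.shift())
    }
    return this
  }
}

const log = []
const s = new TinySync()
log.push('write a -> ' + s.write('a')) // buffered, returns false
s.on('data', c => log.push('data ' + c)) // 'a' is flushed immediately
log.push('write b -> ' + s.write('b')) // 'b' is delivered during the call
console.log(log)
// [ 'write a -> false', 'data a', 'data b', 'write b -> true' ]
```

Note that `'data b'` is logged before `write('b')` has returned: nothing is deferred to a later tick.
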
### No High/Low Water Marks

Node.js core streams will optimistically fill up a buffer, returning `true`
on all writes until the limit is hit, even if the data has nowhere to go.
Then, they will not attempt to draw more data in until the buffer size dips
below a minimum value.

Minipass streams are much simpler. The `write()` method will return `true`
if the data has somewhere to go (which is to say, given the timing
guarantees, that the data is already there by the time `write()` returns).

If the data has nowhere to go, then `write()` returns false, and the data
sits in a buffer, to be drained out immediately as soon as anyone consumes
it.

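The node-core half of this comparison can be checked with nothing but the built-in `stream` module. In this small sketch, a `PassThrough` with no consumer attached still accepts a write and returns `true`, and only reports backpressure once its `highWaterMark` is reached:

```js
const { PassThrough } = require('stream')

// No 'data' listener and no pipe(): the data has nowhere to go.
const pt = new PassThrough({ highWaterMark: 1024 })

const small = pt.write(Buffer.alloc(512)) // under the limit
const large = pt.write(Buffer.alloc(2048)) // over the limit

console.log(small, large) // true false
```

Going by the description above, a Minipass stream in the same situation would return `false` from the very first write, since nothing is consuming the data yet.
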
### Hazards of Buffering (or: Why Minipass Is So Fast)

Since data written to a Minipass stream is immediately written all the way
through the pipeline, and `write()` always returns true/false based on
whether the data was fully flushed, backpressure is communicated
immediately to the upstream caller. This minimizes buffering.

Consider this case:

```js
const {PassThrough} = require('stream')
const p1 = new PassThrough({ highWaterMark: 1024 })
const p2 = new PassThrough({ highWaterMark: 1024 })
const p3 = new PassThrough({ highWaterMark: 1024 })
const p4 = new PassThrough({ highWaterMark: 1024 })

p1.pipe(p2).pipe(p3).pipe(p4)
p4.on('data', () => console.log('made it through'))

// this returns false and buffers, then writes to p2 on next tick (1)
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
// on next tick (4)
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
// 'drain' on next tick (5)
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
// tick (7)

p1.write(Buffer.alloc(2048)) // returns false
```

Along the way, the data was buffered and deferred at each stage, and
multiple event deferrals happened, for an unblocked pipeline where it was
perfectly safe to write all the way through!

Furthermore, setting a `highWaterMark` of `1024` might lead someone reading
the code to think an advisory maximum of 1KiB is being set for the
pipeline. However, the actual advisory buffering level is the _sum_ of
`highWaterMark` values, since each one has its own bucket.

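The "sum of buckets" point can be made concrete with the `writableHighWaterMark` and `readableHighWaterMark` properties that node-core streams expose. This is only a sketch of the arithmetic for the four-stage pipeline above:

```js
const { PassThrough } = require('stream')

// Four stages, each configured like p1..p4 above.
const stages = [1, 2, 3, 4].map(() => new PassThrough({ highWaterMark: 1024 }))

// Each stage keeps its own writable-side bucket...
const writableTotal = stages.reduce(
  (n, s) => n + s.writableHighWaterMark, 0)

// ...and, being a duplex stream, its own readable-side bucket as well.
const bothSides = stages.reduce(
  (n, s) => n + s.writableHighWaterMark + s.readableHighWaterMark, 0)

console.log(writableTotal, bothSides) // 4096 8192
```

So the advisory level for the whole pipeline is several times the 1KiB that a single `highWaterMark: 1024` suggests.
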
Consider the Minipass case:

```js
const Minipass = require('minipass')
const m1 = new Minipass()
const m2 = new Minipass()
const m3 = new Minipass()
const m4 = new Minipass()

m1.pipe(m2).pipe(m3).pipe(m4)
m4.on('data', () => console.log('made it through'))

// m1 is flowing, so it writes the data to m2 immediately
// m2 is flowing, so it writes the data to m3 immediately
// m3 is flowing, so it writes the data to m4 immediately
// m4 is flowing, so it fires the 'data' event immediately, returns true
// m4's write returned true, so m3 is still flowing, returns true
// m3's write returned true, so m2 is still flowing, returns true
// m2's write returned true, so m1 is still flowing, returns true
// No event deferrals or buffering along the way!

m1.write(Buffer.alloc(2048)) // returns true
```

You almost certainly want written data to be buffered when it cannot be
delivered yet, and you almost certainly never want to buffer data that
could be flushed all the way through. Neither node-core streams nor
Minipass ever fail to buffer written data, but node-core streams do a lot
of unnecessary buffering and pausing.

As always, the faster implementation is the one that does less stuff and
waits less time to do it.

### Immediately emit `end` for empty streams (when not paused)

If a stream is not paused, and `end()` is called before writing any data
into it, then it will emit `end` immediately.

If you have logic that occurs on the `end` event which you don't want to
potentially happen immediately (for example, closing file descriptors,
moving on to the next entry in an archive parse stream, etc.) then be sure
to call `stream.pause()` on creation, and then `stream.resume()` once you
are ready to respond to the `end` event.

### Emit `end` When Asked

One hazard of immediately emitting `'end'` is that you may not yet have had
a chance to add a listener. In order to avoid this hazard, Minipass
streams safely re-emit the `'end'` event if a new listener is added after
`'end'` has been emitted.

That is, if you do `stream.on('end', someFunction)`, and the stream has
already emitted `end`, then it will call the handler right away. (You can
think of this somewhat like attaching a new `.then(fn)` to a
previously-resolved Promise.)

To avoid calling a handler more than once when it does not expect multiple
`end` events, all listeners are removed from the `'end'` event whenever it
is emitted.

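The re-emit behavior can be modeled in a few lines on top of Node's `EventEmitter`. The `ReplayEnd` class here is a hypothetical toy written for this example, not the Minipass source. It shows the two halves of the rule: listeners are dropped once `end` fires, and a late listener gets its own `end` right away.

```js
const EventEmitter = require('events')

// Toy model only (hypothetical class, not Minipass itself).
class ReplayEnd extends EventEmitter {
  constructor () {
    super()
    this.emittedEnd = false
  }

  emit (ev, ...args) {
    if (ev !== 'end') return super.emit(ev, ...args)
    this.emittedEnd = true
    const ret = super.emit('end', ...args)
    // drop the listeners so nobody sees 'end' twice
    this.removeAllListeners('end')
    return ret
  }

  on (ev, fn) {
    super.on(ev, fn)
    // late subscriber: replay 'end' for it right away
    if (ev === 'end' && this.emittedEnd) this.emit('end')
    return this
  }
}

const calls = []
const s = new ReplayEnd()
s.on('end', () => calls.push('early'))
s.emit('end')
s.on('end', () => calls.push('late')) // called immediately
console.log(calls) // [ 'early', 'late' ]
```

The early handler runs exactly once, even though `end` is emitted a second time for the late listener.
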
### Impact of "immediate flow" on Tee-streams

A "tee stream" is a stream piping to multiple destinations:

```js
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
tee.write('foo') // goes to both destinations
```

Since Minipass streams _immediately_ process any pending data through the
pipeline when a new pipe destination is added, this can have surprising
effects, especially when a stream comes in from some other function and may
or may not have data in its buffer.

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
src.pipe(dest2) // gets nothing!
```

The solution is to create a dedicated tee-stream junction that pipes to
both locations, and then pipe to _that_ instead.

```js
// Safe example: tee to both places
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
src.pipe(tee) // tee gets 'foo', pipes to both locations
```

The same caveat applies to `on('data')` event listeners. The first one
added will _immediately_ receive all of the data, leaving nothing for the
second:

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.on('data', handler1) // receives 'foo' right away
src.on('data', handler2) // nothing to see here!
```

A dedicated tee-stream can be used in this case as well:

```js
// Safe example: tee to both data handlers
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.on('data', handler1)
tee.on('data', handler2)
src.pipe(tee)
```

## USAGE

It's a stream! Use it like a stream and it'll most likely do what you
want.

```js
const Minipass = require('minipass')
const mp = new Minipass(options) // optional: { encoding, objectMode }
mp.write('foo')
mp.pipe(someOtherStream)
mp.end('bar')
```

### OPTIONS

* `encoding` How would you like the data coming _out_ of the stream to be
  encoded? Accepts any values that can be passed to `Buffer.toString()`.
* `objectMode` Emit data exactly as it comes in. This will be flipped on
  by default if you `write()` something other than a string or Buffer at any
  point. Setting `objectMode: true` will prevent setting any encoding
  value.

### API

Implements the user-facing portions of Node.js's `Readable` and `Writable`
streams.

### Methods

* `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the
  base Minipass class, the same data will come out.) Returns `false` if
  the stream will buffer the next write, or true if it's still in "flowing"
  mode.
* `end([chunk, [encoding]], [callback])` - Signal that you have no more
  data to write. This will queue an `end` event to be fired when all the
  data has been consumed.
* `setEncoding(encoding)` - Set the encoding for data coming out of the
  stream. This can only be done once.
* `pause()` - No more data for a while, please. This also prevents `end`
  from being emitted for empty streams until the stream is resumed.
* `resume()` - Resume the stream. If there's data in the buffer, it is all
  discarded. Any buffered events are immediately emitted.
* `pipe(dest)` - Send all output to the stream provided. There is no way
  to unpipe. When data is emitted, it is immediately written to any and
  all pipe destinations.
* `on(ev, fn)`, `emit(ev, ...data)` - Minipass streams are EventEmitters.
  Some events are given special treatment, however. (See below under
  "events".)
* `promise()` - Returns a Promise that resolves when the stream emits
  `end`, or rejects if the stream emits `error`.
* `collect()` - Return a Promise that resolves on `end` with an array
  containing each chunk of data that was emitted, or rejects if the stream
  emits `error`. Note that this consumes the stream data.
* `concat()` - Same as `collect()`, but concatenates the data into a single
  Buffer object. Will reject the returned promise if the stream is in
  objectMode, or if it goes into objectMode by the end of the data.
* `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not
  provided, then consume all of it. If `n` bytes are not available, then
  it returns null. **Note** consuming streams in this way is less
  efficient, and can lead to unnecessary Buffer copying.
* `destroy([er])` - Destroy the stream. If an error is provided, then an
  `'error'` event is emitted. If the stream has a `close()` method, and
  has not emitted a `'close'` event yet, then `stream.close()` will be
  called. Any Promises returned by `.promise()`, `.collect()` or
  `.concat()` will be rejected. After being destroyed, writing to the
  stream will emit an error. No more data will be emitted if the stream is
  destroyed, even if it was previously buffered.

### Properties

* `bufferLength` Read-only. Total number of bytes buffered, or in the case
  of objectMode, the total number of objects.
* `encoding` The encoding that has been set. (Setting this is equivalent
  to calling `setEncoding(enc)` and has the same prohibition against
  setting multiple times.)
* `flowing` Read-only. Boolean indicating whether a chunk written to the
  stream will be immediately emitted.
* `emittedEnd` Read-only. Boolean indicating whether the end-ish events
  (i.e., `end`, `prefinish`, `finish`) have been emitted. Note that
  listening on any end-ish event will immediately re-emit it if it has
  already been emitted.
* `writable` Whether the stream is writable. Default `true`. Set to
  `false` when `end()` is called.
* `readable` Whether the stream is readable. Default `true`.
* `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written
  to the stream that have not yet been emitted. (It's probably a bad idea
  to mess with this.)
* `pipes` A [yallist](http://npm.im/yallist) linked list of streams that
  this stream is piping into. (It's probably a bad idea to mess with
  this.)
* `destroyed` A getter that indicates whether the stream was destroyed.
* `paused` True if the stream has been explicitly paused, otherwise false.
* `objectMode` Indicates whether the stream is in `objectMode`. Once set
  to `true`, it cannot be set to `false`.

### Events

* `data` Emitted when there's data to read. Argument is the data to read.
  This is never emitted while not flowing. If a listener is attached, that
  will resume the stream.
* `end` Emitted when there's no more data to read. This will be emitted
  immediately for empty streams when `end()` is called. If a listener is
  attached, and `end` was already emitted, then it will be emitted again.
  All listeners are removed when `end` is emitted.
* `prefinish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'end'`.
* `finish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'prefinish'`.
* `close` An indication that an underlying resource has been released.
  Minipass does not emit this event, but will defer it until after `end`
  has been emitted, since it throws off some stream libraries otherwise.
* `drain` Emitted when the internal buffer empties, and it is again
  suitable to `write()` into the stream.
* `readable` Emitted when data is buffered and ready to be read by a
  consumer.
* `resume` Emitted when stream changes state from buffering to flowing
  mode. (That is, when `resume` is called, `pipe` is called, or a `data`
  event listener is added.)

### Static Methods

* `Minipass.isStream(stream)` Returns `true` if the argument is a stream,
  and false otherwise. To be considered a stream, the object must be
  either an instance of Minipass, or an EventEmitter that has either a
  `pipe()` method, or both `write()` and `end()` methods. (Pretty much any
  stream in node-land will return `true` for this.)

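The duck-typing rule described for `isStream` can be restated as a small predicate. This is a sketch of the rule as worded above, not the library's source: `looksLikeStream` is a hypothetical name, and the "instance of Minipass" check is left out so the example needs only Node core.

```js
const EventEmitter = require('events')
const { PassThrough } = require('stream')

// Hypothetical restatement of the rule: an EventEmitter with either a
// pipe() method, or both write() and end() methods.
const looksLikeStream = s =>
  !!s && s instanceof EventEmitter && (
    typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function')
  )

console.log(looksLikeStream(new PassThrough())) // true
console.log(looksLikeStream(new EventEmitter())) // false: no stream methods
console.log(looksLikeStream({ pipe () {} })) // false: not an EventEmitter
console.log(looksLikeStream(null)) // false
```
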
## EXAMPLES

Here are some examples of things you can do with Minipass streams.

### simple "are you done yet" promise

```js
mp.promise().then(() => {
  // stream is finished
}, er => {
  // stream emitted an error
})
```

### collecting

```js
mp.collect().then(all => {
  // all is an array of all the data emitted
  // encoding is supported in this case, so
  // the result will be a collection of strings if
  // an encoding is specified, or buffers/objects if not.
  //
  // In an async function, you may do
  // const data = await stream.collect()
})
```

### collecting into a single blob

This is a bit slower because it concatenates the data into one chunk for
you, but if you're going to do it yourself anyway, it's convenient this
way:

```js
mp.concat().then(onebigchunk => {
  // onebigchunk is a string if the stream
  // had an encoding set, or a buffer otherwise.
})
```

### iteration

You can iterate over streams synchronously or asynchronously in platforms
that support it.

Synchronous iteration will end when the currently available data is
consumed, even if the `end` event has not been reached. In string and
buffer mode, the data is concatenated, so unless multiple writes are
occurring in the same tick as the `read()`, sync iteration loops will
generally only have a single iteration.

To consume chunks in this way exactly as they have been written, with no
flattening, create the stream with the `{ objectMode: true }` option.

```js
const mp = new Minipass({ objectMode: true })
mp.write('a')
mp.write('b')
for (let letter of mp) {
  console.log(letter) // a, b
}
mp.write('c')
mp.write('d')
for (let letter of mp) {
  console.log(letter) // c, d
}
mp.write('e')
mp.end()
for (let letter of mp) {
  console.log(letter) // e
}
for (let letter of mp) {
  console.log(letter) // nothing
}
```

Asynchronous iteration will continue until the end event is reached,
consuming all of the data.

```js
const mp = new Minipass({ encoding: 'utf8' })

// some source of some data
let i = 5
const inter = setInterval(() => {
  if (i-- > 0)
    mp.write(Buffer.from('foo\n', 'utf8'))
  else {
    mp.end()
    clearInterval(inter)
  }
}, 100)

// consume the data with asynchronous iteration
async function consume () {
  for await (let chunk of mp) {
    console.log(chunk)
  }
  return 'ok'
}

consume().then(res => console.log(res))
// logs `foo\n` 5 times, and then `ok`
```

### subclass that `console.log()`s everything written into it

```js
class Logger extends Minipass {
  write (chunk, encoding, callback) {
    console.log('WRITE', chunk, encoding)
    return super.write(chunk, encoding, callback)
  }
  end (chunk, encoding, callback) {
    console.log('END', chunk, encoding)
    return super.end(chunk, encoding, callback)
  }
}

someSource.pipe(new Logger()).pipe(someDest)
```

### same thing, but using an inline anonymous class

```js
// js classes are fun
someSource
  .pipe(new (class extends Minipass {
    emit (ev, ...data) {
      // let's also log events, because debugging some weird thing
      console.log('EMIT', ev)
      return super.emit(ev, ...data)
    }
    write (chunk, encoding, callback) {
      console.log('WRITE', chunk, encoding)
      return super.write(chunk, encoding, callback)
    }
    end (chunk, encoding, callback) {
      console.log('END', chunk, encoding)
      return super.end(chunk, encoding, callback)
    }
  }))
  .pipe(someDest)
```

### subclass that defers 'end' for some reason

```js
class SlowEnd extends Minipass {
  emit (ev, ...args) {
    if (ev === 'end') {
      console.log('going to end, hold on a sec')
      setTimeout(() => {
        console.log('ok, ready to end now')
        super.emit('end', ...args)
      }, 100)
    } else {
      return super.emit(ev, ...args)
    }
  }
}
```

### transform that creates newline-delimited JSON

```js
class NDJSONEncode extends Minipass {
  write (obj, cb) {
    try {
      // JSON.stringify can throw, emit an error on that
      return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
    } catch (er) {
      this.emit('error', er)
    }
  }
  end (obj, cb) {
    if (typeof obj === 'function') {
      cb = obj
      obj = undefined
    }
    if (obj !== undefined) {
      this.write(obj)
    }
    return super.end(cb)
  }
}
```

### transform that parses newline-delimited JSON

```js
class NDJSONDecode extends Minipass {
  constructor (options) {
    // always be in object mode, as far as Minipass is concerned
    super({ objectMode: true })
    this._jsonBuffer = ''
  }
  write (chunk, encoding, cb) {
    if (typeof chunk === 'string' &&
        typeof encoding === 'string' &&
        encoding !== 'utf8') {
      chunk = Buffer.from(chunk, encoding).toString()
    } else if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    if (typeof encoding === 'function') {
      cb = encoding
    }
    // keep the trailing partial line buffered until the rest arrives
    const jsonData = (this._jsonBuffer + chunk).split('\n')
    this._jsonBuffer = jsonData.pop()
    for (let i = 0; i < jsonData.length; i++) {
      try {
        // JSON.parse can throw, emit an error on that
        super.write(JSON.parse(jsonData[i]))
      } catch (er) {
        this.emit('error', er)
        continue
      }
    }
    if (cb)
      cb()
  }
}
```