# minipass

A _very_ minimal implementation of a [PassThrough
stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough).

[It's very
fast](https://docs.google.com/spreadsheets/d/1oObKSrVwLX_7Ut4Z6g3fZW-AX1j1-k6w-cDsrkaSbHM/edit#gid=0)
for objects, strings, and buffers.

Supports `pipe()`ing (including multi-`pipe()` and backpressure transmission),
buffering data until either a `data` event handler or `pipe()` is added (so
you don't lose the first chunk), and most other cases where PassThrough is
a good idea.

There is a `read()` method, but it's much more efficient to consume data
from this stream via `'data'` events or by calling `pipe()` into some other
stream. Calling `read()` requires the buffer to be flattened in some
cases, which requires copying memory.

There is also no `unpipe()` method. Once you start piping, there is no
stopping it!

If you set `objectMode: true` in the options, then whatever is written will
be emitted as-is. Otherwise, it'll do a minimal amount of Buffer copying to
ensure proper Streams semantics when `read(n)` is called.

`objectMode` can also be set by doing `stream.objectMode = true`, or by
writing any non-string/non-buffer data. `objectMode` cannot be set to
`false` once it is set.

This is not a `through` or `through2` stream. It doesn't transform the
data; it just passes it right through. If you want to transform the data,
extend the class and override the `write()` method. Once you're done
transforming the data however you want, call `super.write()` with the
transform output.

For some examples of streams that extend Minipass in various ways, check
out:

- [minizlib](http://npm.im/minizlib)
- [fs-minipass](http://npm.im/fs-minipass)
- [tar](http://npm.im/tar)
- [minipass-collect](http://npm.im/minipass-collect)
- [minipass-flush](http://npm.im/minipass-flush)
- [minipass-pipeline](http://npm.im/minipass-pipeline)
- [tap](http://npm.im/tap)
- [tap-parser](http://npm.im/tap-parser)
- [treport](http://npm.im/treport)
- [minipass-fetch](http://npm.im/minipass-fetch)
- [pacote](http://npm.im/pacote)
- [make-fetch-happen](http://npm.im/make-fetch-happen)
- [cacache](http://npm.im/cacache)
- [ssri](http://npm.im/ssri)
- [npm-registry-fetch](http://npm.im/npm-registry-fetch)
- [minipass-json-stream](http://npm.im/minipass-json-stream)
- [minipass-sized](http://npm.im/minipass-sized)

## Differences from Node.js Streams

There are several things that make Minipass streams different from (and in
some ways superior to) Node.js core streams.

Please read these caveats if you are familiar with node-core streams and
intend to use Minipass streams in your programs.

### Timing

Minipass streams are designed to support synchronous use cases. Thus, data
is emitted as soon as it is available, always. It is buffered until read,
but no longer. Another way to look at it is that Minipass streams are
exactly as synchronous as the logic that writes into them.

This can be surprising if your code relies on `PassThrough.write()` always
providing data on the next tick rather than the current one, or being able
to call `resume()` and not have the entire buffer disappear immediately.

However, without this synchronicity guarantee, there would be no way for
Minipass to achieve the speeds it does, or support the synchronous use
cases that it does. Simply put, waiting takes time.

This non-deferring approach makes Minipass streams much easier to reason
about, especially in the context of Promises and other flow-control
mechanisms.

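To make the timing model concrete, here is a toy sketch built on Node's `EventEmitter`. The `SyncPass` class is hypothetical and is not Minipass's implementation; it only mirrors the behavior described above: chunks wait in a buffer until a `'data'` listener is added, and once one is, each write is emitted before `write()` returns.

```javascript
const { EventEmitter } = require('events')

// Hypothetical toy model of synchronous pass-through timing.
// NOT Minipass's implementation; it only mirrors the described behavior.
class SyncPass extends EventEmitter {
  constructor () {
    super()
    this.buffer = []
    this.flowing = false
  }

  on (ev, fn) {
    const ret = super.on(ev, fn)
    // Adding a 'data' listener starts the flow and drains the buffer now.
    if (ev === 'data' && !this.flowing) {
      this.flowing = true
      while (this.buffer.length > 0) this.emit('data', this.buffer.shift())
    }
    return ret
  }

  write (chunk) {
    if (this.flowing) {
      // Emitted synchronously, before write() returns.
      this.emit('data', chunk)
      return true
    }
    // Nowhere to go yet: keep it and report backpressure.
    this.buffer.push(chunk)
    return false
  }
}

const log = []
const s = new SyncPass()
log.push(`write a -> ${s.write('a')}`) // buffered, no listener yet
s.on('data', c => log.push(`data ${c}`)) // 'a' is delivered right here
log.push(`write b -> ${s.write('b')}`) // 'data b' is logged before this entry
console.log(log)
```

The log reads `write a -> false`, `data a`, `data b`, `write b -> true`: nothing is deferred to a later tick, so the order of events is exactly the order of the calls that caused them.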
### No High/Low Water Marks

Node.js core streams will optimistically fill up a buffer, returning `true`
on all writes until the limit is hit, even if the data has nowhere to go.
Then, they will not attempt to draw more data in until the buffer size dips
below a minimum value.

Minipass streams are much simpler. The `write()` method will return `true`
if the data has somewhere to go (which is to say, given the timing
guarantees, that the data is already there by the time `write()` returns).

If the data has nowhere to go, then `write()` returns `false`, and the data
sits in a buffer, to be drained out immediately as soon as anyone consumes
it.

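For contrast, the node-core behavior described above can be observed with a plain `PassThrough` that nobody reads from: `write()` keeps returning `true` until the internal buffers reach `highWaterMark`, even though the data has nowhere to go. The 16-byte limit and 4-byte chunks are arbitrary choices for this sketch, and the exact number of `true` results may differ between Node.js versions.

```javascript
const { PassThrough } = require('stream')

// A node-core PassThrough with a small limit and no consumer attached.
const p = new PassThrough({ highWaterMark: 16 })

const results = []
for (let i = 0; i < 10; i++) {
  // Each chunk is 4 bytes; nothing ever reads from `p`.
  results.push(p.write(Buffer.alloc(4)))
}

// Early writes report success even though nothing consumes the data;
// only once the buffers fill up does write() start returning false.
console.log(results)
```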
### Hazards of Buffering (or: Why Minipass Is So Fast)

Since data written to a Minipass stream is immediately written all the way
through the pipeline, and `write()` always returns `true`/`false` based on
whether the data was fully flushed, backpressure is communicated
immediately to the upstream caller. This minimizes buffering.

Consider this case:

```js
const {PassThrough} = require('stream')
const p1 = new PassThrough({ highWaterMark: 1024 })
const p2 = new PassThrough({ highWaterMark: 1024 })
const p3 = new PassThrough({ highWaterMark: 1024 })
const p4 = new PassThrough({ highWaterMark: 1024 })

p1.pipe(p2).pipe(p3).pipe(p4)
p4.on('data', () => console.log('made it through'))

// this returns false and buffers, then writes to p2 on next tick (1)
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
// on next tick (4)
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
// 'drain' on next tick (5)
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
// tick (7)

p1.write(Buffer.alloc(2048)) // returns false
```

Along the way, the data was buffered and deferred at each stage, and
multiple event deferrals happened, for an unblocked pipeline where it was
perfectly safe to write all the way through!

Furthermore, setting a `highWaterMark` of `1024` might lead someone reading
the code to think an advisory maximum of 1KiB is being set for the
pipeline. However, the actual advisory buffering level is the _sum_ of
`highWaterMark` values, since each one has its own bucket.

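Each stage in that pipeline reports its own limit through the standard `writableHighWaterMark` property, so adding them up gives the pipeline-wide advisory level. A quick check, rebuilding the same four-stage pipeline:

```javascript
const { PassThrough } = require('stream')

// Same four-stage pipeline as above, each stage with a 1 KiB limit.
const stages = [1, 2, 3, 4].map(() => new PassThrough({ highWaterMark: 1024 }))
stages.reduce((src, dest) => src.pipe(dest))

// Every stage has its own bucket, so the advisory level for the whole
// pipeline is the sum of the per-stage limits, not 1024.
const total = stages.reduce((sum, s) => sum + s.writableHighWaterMark, 0)
console.log(total) // 4096
```

Each `PassThrough` is a duplex stream with a readable-side buffer as well (reported by `readableHighWaterMark`, also 1024 here), so the amount of data that can pile up in practice may be larger still.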
Consider the Minipass case:

```js
const Minipass = require('minipass')
const m1 = new Minipass()
const m2 = new Minipass()
const m3 = new Minipass()
const m4 = new Minipass()

m1.pipe(m2).pipe(m3).pipe(m4)
m4.on('data', () => console.log('made it through'))

// m1 is flowing, so it writes the data to m2 immediately
// m2 is flowing, so it writes the data to m3 immediately
// m3 is flowing, so it writes the data to m4 immediately
// m4 is flowing, so it fires the 'data' event immediately, returns true
// m4's write returned true, so m3 is still flowing, returns true
// m3's write returned true, so m2 is still flowing, returns true
// m2's write returned true, so m1 is still flowing, returns true
// No event deferrals or buffering along the way!

m1.write(Buffer.alloc(2048)) // returns true
```

It is extremely unlikely that you _don't_ want to buffer any data written,
or _ever_ buffer data that can be flushed all the way through. Neither
node-core streams nor Minipass ever fail to buffer written data, but
node-core streams do a lot of unnecessary buffering and pausing.

As always, the faster implementation is the one that does less stuff and
waits less time to do it.

### Immediately emit `end` for empty streams (when not paused)

If a stream is not paused, and `end()` is called before writing any data
into it, then it will emit `end` immediately.

If you have logic that occurs on the `end` event which you don't want to
potentially happen immediately (for example, closing file descriptors,
moving on to the next entry in an archive parse stream, etc.) then be sure
to call `stream.pause()` on creation, and then `stream.resume()` once you
are ready to respond to the `end` event.

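The pause-on-creation pattern can be sketched with another toy model. `ToyStream` is hypothetical (not Minipass's code) and holds no data at all, so the only thing standing between `end()` and the `'end'` event is whether the stream is paused:

```javascript
const { EventEmitter } = require('events')

// Hypothetical toy model of the "empty stream ends immediately" rule.
class ToyStream extends EventEmitter {
  constructor () {
    super()
    this.paused = false
    this.ended = false // end() has been called
    this.emittedEnd = false
  }

  pause () {
    this.paused = true
  }

  resume () {
    this.paused = false
    this.maybeEmitEnd()
  }

  end () {
    this.ended = true
    this.maybeEmitEnd()
  }

  maybeEmitEnd () {
    // No buffered data in this toy, so 'end' only waits on pause().
    if (this.ended && !this.paused && !this.emittedEnd) {
      this.emittedEnd = true
      this.emit('end')
    }
  }
}

const events = []

const eager = new ToyStream()
eager.on('end', () => events.push('eager end'))
eager.end() // emits 'end' synchronously
events.push('after eager.end()')

const careful = new ToyStream()
careful.pause() // pause on creation
careful.on('end', () => events.push('careful end'))
careful.end() // 'end' is held back
events.push('after careful.end()')
careful.resume() // ready now: 'end' fires here
console.log(events)
```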
### Emit `end` When Asked

One hazard of immediately emitting `'end'` is that you may not yet have had
a chance to add a listener. In order to avoid this hazard, Minipass
streams safely re-emit the `'end'` event if a new listener is added after
`'end'` has been emitted.

That is, if you do `stream.on('end', someFunction)`, and the stream has
already emitted `end`, then it will call the handler right away. (You can
think of this somewhat like attaching a new `.then(fn)` to a
previously-resolved Promise.)

To avoid calling a handler more than once when it does not expect multiple
ends, all listeners are removed from the `'end'` event whenever it is
emitted.

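The late-listener rule can be sketched in a few lines. `LateEnd` below is a hypothetical toy, not Minipass's implementation: it overrides `on()` so that an `'end'` listener added after the fact runs right away, and clears all `'end'` listeners each time the event fires, so no handler is ever called twice.

```javascript
const { EventEmitter } = require('events')

// Hypothetical toy model of re-emitting 'end' for late listeners.
class LateEnd extends EventEmitter {
  constructor () {
    super()
    this.emittedEnd = false
  }

  emit (ev, ...args) {
    const ret = super.emit(ev, ...args)
    if (ev === 'end') {
      this.emittedEnd = true
      // Drop listeners so nobody is called for a second 'end'.
      this.removeAllListeners('end')
    }
    return ret
  }

  on (ev, fn) {
    const ret = super.on(ev, fn)
    // Like .then() on a settled Promise: late 'end' listeners run now.
    if (ev === 'end' && this.emittedEnd) this.emit('end')
    return ret
  }
}

const calls = []
const s = new LateEnd()
s.on('end', () => calls.push('early'))
s.emit('end')
s.on('end', () => calls.push('late')) // called immediately
s.on('end', () => calls.push('later')) // called immediately, exactly once
console.log(calls)
```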
### Impact of "immediate flow" on Tee-streams

A "tee stream" is a stream piping to multiple destinations:

```js
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
tee.write('foo') // goes to both destinations
```

Since Minipass streams _immediately_ process any pending data through the
pipeline when a new pipe destination is added, this can have surprising
effects, especially when a stream comes in from some other function and may
or may not have data in its buffer.

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
src.pipe(dest2) // gets nothing!
```

The solution is to create a dedicated tee-stream junction that pipes to
both locations, and then pipe to _that_ instead.

```js
// Safe example: tee to both places
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
src.pipe(tee) // tee gets 'foo', pipes to both locations
```

The same caveat applies to `on('data')` event listeners. The first one
added will _immediately_ receive all of the data, leaving nothing for the
second:

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.on('data', handler1) // receives 'foo' right away
src.on('data', handler2) // nothing to see here!
```

A dedicated tee-stream works in this case as well:

```js
// Safe example: tee to both data handlers
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.on('data', handler1)
tee.on('data', handler2)
src.pipe(tee)
```

## USAGE

It's a stream! Use it like a stream and it'll most likely do what you
want.

```js
const Minipass = require('minipass')
const mp = new Minipass(options) // optional: { encoding, objectMode }
mp.write('foo')
mp.pipe(someOtherStream)
mp.end('bar')
```

### OPTIONS

* `encoding` How would you like the data coming _out_ of the stream to be
  encoded? Accepts any values that can be passed to `Buffer.toString()`.
* `objectMode` Emit data exactly as it comes in. This will be flipped on
  by default if you `write()` something other than a string or Buffer at
  any point. Setting `objectMode: true` will prevent setting any encoding
  value.

### API

Implements the user-facing portions of Node.js's `Readable` and `Writable`
streams.

### Methods

* `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the
  base Minipass class, the same data will come out.) Returns `false` if
  the stream will buffer the next write, or `true` if it's still in
  "flowing" mode.
* `end([chunk, [encoding]], [callback])` - Signal that you have no more
  data to write. This will queue an `end` event to be fired when all the
  data has been consumed.
* `setEncoding(encoding)` - Set the encoding for data coming out of the
  stream. This can only be done once.
* `pause()` - No more data for a while, please. This also prevents `end`
  from being emitted for empty streams until the stream is resumed.
* `resume()` - Resume the stream. If there's data in the buffer, it is
  flushed out immediately, so it is effectively discarded if nothing is
  listening. Any buffered events are immediately emitted.
* `pipe(dest)` - Send all output to the stream provided. There is no way
  to unpipe. When data is emitted, it is immediately written to any and
  all pipe destinations.
* `on(ev, fn)`, `emit(ev, ...data)` - Minipass streams are EventEmitters.
  Some events are given special treatment, however. (See below under
  "Events".)
* `promise()` - Returns a Promise that resolves when the stream emits
  `end`, or rejects if the stream emits `error`.
* `collect()` - Returns a Promise that resolves on `end` with an array
  containing each chunk of data that was emitted, or rejects if the stream
  emits `error`. Note that this consumes the stream data.
* `concat()` - Same as `collect()`, but concatenates the data into a single
  Buffer object (or a string, if an encoding is set). Will reject the
  returned promise if the stream is in objectMode, or if it goes into
  objectMode by the end of the data.
* `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not
  provided, then consume all of it. If `n` bytes are not available, then
  it returns `null`. **Note:** consuming streams in this way is less
  efficient, and can lead to unnecessary Buffer copying.
* `destroy([er])` - Destroy the stream. If an error is provided, then an
  `'error'` event is emitted. If the stream has a `close()` method, and
  has not emitted a `'close'` event yet, then `stream.close()` will be
  called. Any Promises returned by `.promise()`, `.collect()` or
  `.concat()` will be rejected. After being destroyed, writing to the
  stream will emit an error. No more data will be emitted if the stream is
  destroyed, even if it was previously buffered.

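Node-core streams have no direct equivalent of `collect()` and `concat()`. A rough sketch of the same idea as standalone helpers follows; `collect` and `concat` here are hypothetical functions written for illustration, not part of Minipass or Node.js.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: resolve with every chunk on 'end', reject on 'error'.
// Like the collect() method described above, it consumes the stream's data.
function collect (stream) {
  return new Promise((resolve, reject) => {
    const chunks = []
    stream.on('data', chunk => chunks.push(chunk))
    stream.on('end', () => resolve(chunks))
    stream.on('error', reject)
  })
}

// Hypothetical helper: like collect(), but joins the chunks into one value.
// Strings are joined as a string; Buffers with Buffer.concat().
async function concat (stream) {
  const chunks = await collect(stream)
  if (chunks.every(c => typeof c === 'string')) return chunks.join('')
  if (chunks.every(c => Buffer.isBuffer(c))) return Buffer.concat(chunks)
  throw new Error('cannot concat object-mode data')
}

const demo = Promise.all([
  collect(Readable.from(['a', 'b', 'c'])),
  concat(Readable.from([Buffer.from('ab'), Buffer.from('cd')]))
])
demo.then(([all, blob]) => console.log(all, blob.toString()))
```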
### Properties

* `bufferLength` Read-only. Total number of bytes buffered, or in the case
  of objectMode, the total number of objects.
* `encoding` The encoding that has been set. (Setting this is equivalent
  to calling `setEncoding(enc)` and has the same prohibition against
  setting multiple times.)
* `flowing` Read-only. Boolean indicating whether a chunk written to the
  stream will be immediately emitted.
* `emittedEnd` Read-only. Boolean indicating whether the end-ish events
  (i.e., `end`, `prefinish`, `finish`) have been emitted. Note that
  listening on any end-ish event will immediately re-emit it if it has
  already been emitted.
* `writable` Whether the stream is writable. Default `true`. Set to
  `false` when `end()` is called.
* `readable` Whether the stream is readable. Default `true`.
* `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written
  to the stream that have not yet been emitted. (It's probably a bad idea
  to mess with this.)
* `pipes` A [yallist](http://npm.im/yallist) linked list of streams that
  this stream is piping into. (It's probably a bad idea to mess with
  this.)
* `destroyed` A getter that indicates whether the stream was destroyed.
* `paused` `true` if the stream has been explicitly paused, otherwise
  `false`.
* `objectMode` Indicates whether the stream is in `objectMode`. Once set
  to `true`, it cannot be set to `false`.

### Events

* `data` Emitted when there's data to read. Argument is the data to read.
  This is never emitted while not flowing. If a listener is attached, that
  will resume the stream.
* `end` Emitted when there's no more data to read. This will be emitted
  immediately for empty streams when `end()` is called. If a listener is
  attached, and `end` was already emitted, then it will be emitted again.
  All listeners are removed when `end` is emitted.
* `prefinish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'end'`.
* `finish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'prefinish'`.
* `close` An indication that an underlying resource has been released.
  Minipass does not emit this event, but will defer it until after `end`
  has been emitted, since it throws off some stream libraries otherwise.
* `drain` Emitted when the internal buffer empties, and it is again
  suitable to `write()` into the stream.
* `readable` Emitted when data is buffered and ready to be read by a
  consumer.
* `resume` Emitted when the stream changes state from buffering to flowing
  mode. (That is, when `resume` is called, `pipe` is called, or a `data`
  event listener is added.)

### Static Methods

* `Minipass.isStream(stream)` Returns `true` if the argument is a stream,
  and `false` otherwise. To be considered a stream, the object must be
  either an instance of Minipass, or an EventEmitter that has either a
  `pipe()` method, or both `write()` and `end()` methods. (Pretty much any
  stream in node-land will return `true` for this.)

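The rule described for `Minipass.isStream()` amounts to a short duck-typing check. The function below, `isStreamLike`, is a hypothetical sketch that follows the description, not the library's source; because it does not load Minipass, the "instance of Minipass" case is represented by an optional class argument.

```javascript
const { EventEmitter } = require('events')
const { PassThrough } = require('stream')

// Hypothetical sketch of the isStream() rule described above.
// `MinipassClass` is optional and stands in for `instanceof Minipass`.
function isStreamLike (s, MinipassClass) {
  if (!s) return false
  if (MinipassClass && s instanceof MinipassClass) return true
  return s instanceof EventEmitter && (
    typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function')
  )
}

const writerOnly = Object.assign(new EventEmitter(), {
  write () {},
  end () {}
})

console.log(isStreamLike(new PassThrough())) // true: has pipe()
console.log(isStreamLike(writerOnly)) // true: write() and end()
console.log(isStreamLike(new EventEmitter())) // false: no stream methods
console.log(isStreamLike({ pipe () {} })) // false: not an EventEmitter
```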
## EXAMPLES

Here are some examples of things you can do with Minipass streams.

### simple "are you done yet" promise

```js
mp.promise().then(() => {
  // stream is finished
}, er => {
  // stream emitted an error
})
```

### collecting

```js
mp.collect().then(all => {
  // all is an array of all the data emitted
  // encoding is supported in this case, so
  // the result will be a collection of strings if
  // an encoding is specified, or buffers/objects if not.
  //
  // In an async function, you may do
  // const data = await mp.collect()
})
```

### collecting into a single blob

This is a bit slower because it concatenates the data into one chunk for
you, but if you're going to do it yourself anyway, it's convenient this
way:

```js
mp.concat().then(onebigchunk => {
  // onebigchunk is a string if the stream
  // had an encoding set, or a buffer otherwise.
})
```

### iteration

You can iterate over streams synchronously or asynchronously on platforms
that support it.

Synchronous iteration will end when the currently available data is
consumed, even if the `end` event has not been reached. In string and
buffer mode, the data is concatenated, so unless multiple writes are
occurring in the same tick as the `read()`, sync iteration loops will
generally only have a single iteration.

To consume chunks in this way exactly as they have been written, with no
flattening, create the stream with the `{ objectMode: true }` option.

```js
const mp = new Minipass({ objectMode: true })
mp.write('a')
mp.write('b')
for (let letter of mp) {
  console.log(letter) // a, b
}
mp.write('c')
mp.write('d')
for (let letter of mp) {
  console.log(letter) // c, d
}
mp.write('e')
mp.end()
for (let letter of mp) {
  console.log(letter) // e
}
for (let letter of mp) {
  console.log(letter) // nothing
}
```

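The stop-when-drained behavior of synchronous iteration can be modeled with a plain generator over an array. This is a hypothetical illustration of the semantics (`drainBuffered` is not a Minipass function), not how Minipass implements `Symbol.iterator`:

```javascript
// Hypothetical model: a sync iterator that yields only what is buffered
// right now, and finishes as soon as the buffer is empty.
function * drainBuffered (buffer) {
  while (buffer.length > 0) {
    yield buffer.shift()
  }
}

const buffer = ['a', 'b']
const firstPass = [...drainBuffered(buffer)] // ['a', 'b']
buffer.push('c') // more data arrives later
const secondPass = [...drainBuffered(buffer)] // ['c']
const thirdPass = [...drainBuffered(buffer)] // [] (nothing buffered)
console.log(firstPass, secondPass, thirdPass)
```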
Asynchronous iteration will continue until the `end` event is reached,
consuming all of the data.

```js
const mp = new Minipass({ encoding: 'utf8' })

// some source of some data
let i = 5
const inter = setInterval(() => {
  if (i-- > 0)
    mp.write(Buffer.from('foo\n', 'utf8'))
  else {
    mp.end()
    clearInterval(inter)
  }
}, 100)

// consume the data with asynchronous iteration
async function consume () {
  for await (let chunk of mp) {
    console.log(chunk)
  }
  return 'ok'
}

consume().then(res => console.log(res))
// logs `foo\n` 5 times, and then `ok`
```

### subclass that `console.log()`s everything written into it

```js
class Logger extends Minipass {
  write (chunk, encoding, callback) {
    console.log('WRITE', chunk, encoding)
    return super.write(chunk, encoding, callback)
  }
  end (chunk, encoding, callback) {
    console.log('END', chunk, encoding)
    return super.end(chunk, encoding, callback)
  }
}

someSource.pipe(new Logger()).pipe(someDest)
```

### same thing, but using an inline anonymous class

```js
// js classes are fun
someSource
  .pipe(new (class extends Minipass {
    emit (ev, ...data) {
      // let's also log events, because debugging some weird thing
      console.log('EMIT', ev)
      return super.emit(ev, ...data)
    }
    write (chunk, encoding, callback) {
      console.log('WRITE', chunk, encoding)
      return super.write(chunk, encoding, callback)
    }
    end (chunk, encoding, callback) {
      console.log('END', chunk, encoding)
      return super.end(chunk, encoding, callback)
    }
  }))
  .pipe(someDest)
```

### subclass that defers 'end' for some reason

```js
class SlowEnd extends Minipass {
  emit (ev, ...args) {
    if (ev === 'end') {
      console.log('going to end, hold on a sec')
      setTimeout(() => {
        console.log('ok, ready to end now')
        super.emit('end', ...args)
      }, 100)
    } else {
      return super.emit(ev, ...args)
    }
  }
}
```

### transform that creates newline-delimited JSON

```js
class NDJSONEncode extends Minipass {
  write (obj, cb) {
    try {
      // JSON.stringify can throw, emit an error on that
      return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
    } catch (er) {
      this.emit('error', er)
    }
  }
  end (obj, cb) {
    if (typeof obj === 'function') {
      cb = obj
      obj = undefined
    }
    if (obj !== undefined) {
      this.write(obj)
    }
    return super.end(cb)
  }
}
```

### transform that parses newline-delimited JSON

```js
class NDJSONDecode extends Minipass {
  constructor (options) {
    // always be in object mode, as far as Minipass is concerned
    super({ objectMode: true })
    this._jsonBuffer = ''
  }
  write (chunk, encoding, cb) {
    // normalize the optional encoding argument first
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = undefined
    }
    if (typeof chunk === 'string' &&
        typeof encoding === 'string' &&
        encoding !== 'utf8') {
      chunk = Buffer.from(chunk, encoding).toString()
    } else if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    // keep the last (possibly incomplete) line for the next write
    const jsonData = (this._jsonBuffer + chunk).split('\n')
    this._jsonBuffer = jsonData.pop()
    for (let i = 0; i < jsonData.length; i++) {
      let obj
      try {
        // JSON.parse can throw, emit an error on that
        obj = JSON.parse(jsonData[i])
      } catch (er) {
        this.emit('error', er)
        continue
      }
      super.write(obj)
    }
    if (cb)
      cb()
  }
}
```
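The heart of `NDJSONDecode` is carrying a partial last line over from one write to the next. Pulled out as a pure function (the hypothetical `splitLines`, not part of Minipass), that logic can be checked without any stream at all:

```javascript
// Hypothetical helper mirroring the decoder's buffering logic: join the
// leftover text with the new chunk, split on newlines, and keep the last
// (possibly incomplete) piece as the new leftover.
function splitLines (leftover, chunk) {
  const parts = (leftover + chunk).split('\n')
  const rest = parts.pop()
  return { lines: parts, rest }
}

// A JSON record split across two chunks is only parsed once complete.
let state = splitLines('', '{"a":1}\n{"b":')
const first = state.lines.map(line => JSON.parse(line)) // [{ a: 1 }]
state = splitLines(state.rest, '2}\n')
const second = state.lines.map(line => JSON.parse(line)) // [{ b: 2 }]
console.log(first, second, JSON.stringify(state.rest)) // rest is ""
```

One consequence of this design, visible in the sketch: a final record with no trailing newline stays in the leftover, so a decoder built this way would need to handle it in `end()` if such input is possible.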