1 | 'use strict'
|
---|
2 | const MiniPass = require('minipass')
|
---|
3 | const EE = require('events').EventEmitter
|
---|
4 | const fs = require('fs')
|
---|
5 |
|
---|
// Vectored write, used further down to write several queued buffers with
// a single call.  Taken from fs when available, otherwise built on top of
// the internal fs binding.
let writev = fs.writev
/* istanbul ignore next */
if (!writev) {
  // This entire fallback can be removed if support for earlier than
  // Node.js 12.9.0 is not needed.
  const fsBinding = process.binding('fs')
  const RequestWrap = fsBinding.FSReqWrap || fsBinding.FSReqCallback

  // cb is invoked as cb(er, bw, iovec) once the request completes.
  writev = (fd, iovec, pos, cb) => {
    const request = new RequestWrap()
    request.oncomplete = (er, bw) => cb(er, bw, iovec)
    fsBinding.writeBuffers(fd, iovec, pos, request)
  }
}
|
---|
21 |
|
---|
// Private property and method keys.  Symbols keep the streams' internal
// state and hooks off the string-keyed public surface.

// State kept by both the read and the write streams.
const _autoClose = Symbol('_autoClose')
const _errored = Symbol('_errored')
const _fd = Symbol('_fd')
const _path = Symbol('_path')

// Hooks implemented by both the read and the write streams.
const _close = Symbol('_close')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _open = Symbol('_open')

// Read stream state.
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')

// Read stream hooks.
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _onread = Symbol('_onread')
const _read = Symbol('_read')

// Write stream state.
const _defaultFlag = Symbol('_defaultFlag')
const _ended = Symbol('_ended')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _writing = Symbol('_writing')

// Write stream hooks.
const _flush = Symbol('_flush')
const _onwrite = Symbol('_onwrite')
const _write = Symbol('_write')
|
---|
50 |
|
---|
/**
 * Readable file stream layered on MiniPass.
 *
 * Reads from `path` (or from an already-open descriptor given as
 * `opt.fd`) and hands every chunk to the MiniPass side via super.write().
 *
 * Options consulted here (the same object is also passed to MiniPass):
 *   - fd: numeric descriptor to read from instead of opening `path`
 *   - readSize: upper bound for one read buffer, default 16 MiB
 *   - size: number of bytes to read in total, default Infinity
 *   - autoClose: only a literal `false` disables closing the descriptor
 */
class ReadStream extends MiniPass {
  /**
   * @param {string} path file to read; must be a string even when opt.fd
   *   is supplied
   * @param {object} [opt] see the class description
   * @throws {TypeError} when path is not a string
   */
  constructor (path, opt) {
    const options = opt || {}
    super(options)

    this.readable = true
    this.writable = false

    if (typeof path !== 'string') {
      throw new TypeError('path must be a string')
    }

    const fdGiven = typeof options.fd === 'number'

    this[_errored] = false
    this[_fd] = fdGiven ? options.fd : null
    this[_path] = path
    this[_readSize] = options.readSize || 16 * 1024 * 1024
    this[_reading] = false
    this[_size] = typeof options.size === 'number' ? options.size : Infinity
    this[_remain] = this[_size]
    // any non-boolean value falls back to the default of true
    this[_autoClose] = options.autoClose !== false

    // with a descriptor in hand reading starts right away, otherwise
    // the file has to be opened first
    if (fdGiven) {
      this[_read]()
    } else {
      this[_open]()
    }
  }

  /** Current file descriptor, or null when none is held. */
  get fd () {
    return this[_fd]
  }

  /** The path given to the constructor. */
  get path () {
    return this[_path]
  }

  /** Not supported on this class; always throws. */
  write () {
    throw new TypeError('this is a readable stream')
  }

  /** Not supported on this class; always throws. */
  end () {
    throw new TypeError('this is a readable stream')
  }

  // open the file read-only and pass the outcome to _onopen
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // record the descriptor, announce it with 'open', and start reading
  [_onopen] (er, fd) {
    if (er) {
      this[_onerror](er)
      return
    }
    this[_fd] = fd
    this.emit('open', fd)
    this[_read]()
  }

  // uninitialized buffer for one read, never larger than what remains
  [_makeBuf] () {
    const length = Math.min(this[_readSize], this[_remain])
    return Buffer.allocUnsafe(length)
  }

  // start one read unless one is already in flight
  [_read] () {
    if (this[_reading]) {
      return
    }
    this[_reading] = true

    const target = this[_makeBuf]()
    /* istanbul ignore if */
    if (target.length === 0) {
      // nothing left to ask for: report a zero-byte read on the next
      // tick instead of calling fs.read
      process.nextTick(() => this[_onread](null, 0, target))
      return
    }

    const onDone = (er, bytesRead, filled) =>
      this[_onread](er, bytesRead, filled)
    fs.read(this[_fd], target, 0, target.length, null, onDone)
  }

  // a read finished: fail, or hand the chunk on and read again if the
  // chunk handler asks for more
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er) {
      this[_onerror](er)
      return
    }
    if (this[_handleChunk](br, buf)) {
      this[_read]()
    }
  }

  // close the descriptor when we are allowed to and still hold one
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const closing = this[_fd]
    // drop our reference before closing so a second call does nothing
    this[_fd] = null
    fs.close(closing, er => {
      if (er) {
        this.emit('error', er)
      } else {
        this.emit('close')
      }
    })
  }

  [_onerror] (er) {
    // leaving the flag set makes every later _read() a no-op
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  // Push `br` bytes of `buf` downstream.  Returns what super.write()
  // returned, or false when this was the last chunk (or none was written).
  [_handleChunk] (br, buf) {
    // no effect if infinite
    this[_remain] -= br

    let more = false
    if (br > 0) {
      // a short read only filled the front of the buffer
      const chunk = br < buf.length ? buf.slice(0, br) : buf
      more = super.write(chunk)
    }

    const finished = br === 0 || this[_remain] <= 0
    if (!finished) {
      return more
    }

    this[_close]()
    super.end()
    return false
  }

  /**
   * Event filter in front of the parent emit():
   *   - 'prefinish' / 'finish' are dropped
   *   - 'drain' is not forwarded; it starts the next read while a
   *     descriptor is held
   *   - 'error' is forwarded only the first time
   *   - everything else is forwarded with (ev, data)
   */
  emit (ev, data) {
    if (ev === 'prefinish' || ev === 'finish') {
      return
    }

    if (ev === 'drain') {
      if (typeof this[_fd] === 'number') {
        this[_read]()
      }
      return
    }

    if (ev === 'error') {
      if (this[_errored]) {
        return
      }
      this[_errored] = true
    }

    return super.emit(ev, data)
  }
}
|
---|
179 |
|
---|
/**
 * ReadStream variant that opens, reads and closes with the synchronous
 * fs calls.  Errors from those calls are thrown to the caller rather than
 * routed through _onerror; the descriptor is closed on the way out.
 */
class ReadStreamSync extends ReadStream {
  [_open] () {
    let opened = false
    try {
      const fd = fs.openSync(this[_path], 'r')
      this[_onopen](null, fd)
      opened = true
    } finally {
      // only reached with opened === false when something above threw
      if (!opened) {
        this[_close]()
      }
    }
  }

  // keep reading until the chunk handler stops asking for more
  [_read] () {
    let completed = false
    try {
      if (!this[_reading]) {
        this[_reading] = true
        let wantMore = true
        while (wantMore) {
          const target = this[_makeBuf]()
          /* istanbul ignore next */
          const bytesRead = target.length === 0 ? 0
            : fs.readSync(this[_fd], target, 0, target.length, null)
          wantMore = this[_handleChunk](bytesRead, target)
        }
        this[_reading] = false
      }
      completed = true
    } finally {
      // only reached with completed === false when something above threw
      if (!completed) {
        this[_close]()
      }
    }
  }

  // synchronous close; 'close' is emitted right after closeSync returns
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const closing = this[_fd]
    this[_fd] = null
    fs.closeSync(closing)
    this.emit('close')
  }
}
|
---|
223 |
|
---|
/**
 * Writable file stream on top of EventEmitter.
 *
 * Chunks that arrive while the file is still opening, or while an earlier
 * write is in flight, are queued and flushed later (several at once via
 * writev).
 *
 * Options consulted here:
 *   - fd: numeric descriptor to write to instead of opening `path`
 *   - mode: mode passed to open, default 0o666
 *   - start: numeric position to start writing at
 *   - autoClose: only a literal `false` disables closing the descriptor
 *   - flags: open flags; when left undefined, 'r+' is used if `start`
 *     is a number and 'w' otherwise
 */
class WriteStream extends EE {
  constructor (path, opt) {
    const options = opt || {}
    super(options)

    const startGiven = typeof options.start === 'number'

    this.readable = false
    this.writable = true
    this[_errored] = false
    this[_writing] = false
    this[_ended] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof options.fd === 'number' ? options.fd : null
    this[_mode] = options.mode === undefined ? 0o666 : options.mode
    this[_pos] = startGiven ? options.start : null
    // any non-boolean value falls back to the default of true
    this[_autoClose] = options.autoClose !== false

    this[_defaultFlag] = options.flags === undefined
    if (this[_defaultFlag]) {
      // truncating makes no sense when writing into the middle
      this[_flags] = startGiven ? 'r+' : 'w'
    } else {
      this[_flags] = options.flags
    }

    if (this[_fd] === null) {
      this[_open]()
    }
  }

  /** Forwards to the parent emit(), but lets 'error' through only once. */
  emit (ev, data) {
    const isError = ev === 'error'
    if (isError && this[_errored]) {
      return
    }
    if (isError) {
      this[_errored] = true
    }
    return super.emit(ev, data)
  }

  /** Current file descriptor, or null when none is held. */
  get fd () {
    return this[_fd]
  }

  /** The path given to the constructor. */
  get path () {
    return this[_path]
  }

  [_onerror] (er) {
    this[_close]()
    // leaving the flag set makes write() queue instead of writing
    this[_writing] = true
    this.emit('error', er)
  }

  // open with the current flags and mode, pass the outcome to _onopen
  [_open] () {
    const onOpened = (er, fd) => this[_onopen](er, fd)
    fs.open(this[_path], this[_flags], this[_mode], onOpened)
  }

  [_onopen] (er, fd) {
    if (er) {
      // 'r+' was only our own default: if the file does not exist yet,
      // try again with 'w'
      const retryWithW = this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er.code === 'ENOENT'
      if (retryWithW) {
        this[_flags] = 'w'
        this[_open]()
      } else {
        this[_onerror](er)
      }
      return
    }

    this[_fd] = fd
    this.emit('open', fd)
    // write out whatever was queued while the file was opening
    this[_flush]()
  }

  /**
   * Mark the stream as ended, writing `buf` first when it is truthy.
   * @returns {WriteStream} this
   */
  end (buf, enc) {
    if (buf) {
      this.write(buf, enc)
    }

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    const idle = !this[_writing] &&
      this[_queue].length === 0 &&
      typeof this[_fd] === 'number'
    if (idle) {
      this[_onwrite](null, 0)
    }
    return this
  }

  /**
   * Write a chunk; strings are converted with Buffer.from(buf, enc).
   * @returns {boolean} true when the chunk was passed straight to the
   *   write call; false when it was queued, or when the stream had
   *   already ended (an 'error' is emitted in that case)
   */
  write (buf, enc) {
    const chunk = typeof buf === 'string' ? Buffer.from(buf, enc) : buf

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    const mustQueue = this[_fd] === null ||
      this[_writing] ||
      this[_queue].length
    if (mustQueue) {
      this[_queue].push(chunk)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](chunk)
    return true
  }

  // write one buffer at the current position
  [_write] (buf) {
    const onWritten = (er, bw) => this[_onwrite](er, bw)
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], onWritten)
  }

  // a write finished: advance the position, then flush more, finish, or
  // emit 'drain'
  [_onwrite] (er, bw) {
    if (er) {
      this[_onerror](er)
      return
    }

    if (this[_pos] !== null) {
      this[_pos] += bw
    }

    if (this[_queue].length) {
      this[_flush]()
      return
    }

    this[_writing] = false

    if (this[_ended] && !this[_finished]) {
      this[_finished] = true
      this[_close]()
      this.emit('finish')
      return
    }

    if (this[_needDrain]) {
      this[_needDrain] = false
      this.emit('drain')
    }
  }

  // write out the queue: nothing, a single buffer, or all of it at once
  [_flush] () {
    const pending = this[_queue]

    if (pending.length === 0) {
      // nothing queued; still run the after-write logic if end() was
      // already called
      if (this[_ended]) {
        this[_onwrite](null, 0)
      }
      return
    }

    if (pending.length === 1) {
      this[_write](pending.pop())
      return
    }

    // swap in a fresh queue before handing the old one to writev
    this[_queue] = []
    writev(this[_fd], pending, this[_pos],
      (er, bw) => this[_onwrite](er, bw))
  }

  // close the descriptor when we are allowed to and still hold one
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const closing = this[_fd]
    // drop our reference before closing so a second call does nothing
    this[_fd] = null
    fs.close(closing, er => {
      if (er) {
        this.emit('error', er)
      } else {
        this.emit('close')
      }
    })
  }
}
|
---|
373 |
|
---|
/**
 * WriteStream variant that opens, writes and closes with the synchronous
 * fs calls.  Errors from those calls are thrown to the caller.
 */
class WriteStreamSync extends WriteStream {
  [_open] () {
    const mayRetry = this[_defaultFlag] && this[_flags] === 'r+'

    // only wrap in a try{} block if we know we'll retry, to avoid
    // the rethrow obscuring the error's source frame in most cases.
    if (!mayRetry) {
      this[_onopen](null, fs.openSync(this[_path], this[_flags], this[_mode]))
      return
    }

    let fd
    try {
      fd = fs.openSync(this[_path], this[_flags], this[_mode])
    } catch (er) {
      if (er.code !== 'ENOENT') {
        throw er
      }
      // the file does not exist yet: switch to 'w' and open again
      this[_flags] = 'w'
      return this[_open]()
    }

    this[_onopen](null, fd)
  }

  // synchronous close; 'close' is emitted right after closeSync returns
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const closing = this[_fd]
    this[_fd] = null
    fs.closeSync(closing)
    this.emit('close')
  }

  [_write] (buf) {
    // throw the original, but try to close if it fails
    let succeeded = false
    try {
      const written = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      this[_onwrite](null, written)
      succeeded = true
    } finally {
      if (!succeeded) {
        // best effort: an error from closing must not replace the one
        // already being thrown
        try {
          this[_close]()
        } catch (_) {}
      }
    }
  }
}
|
---|
417 |
|
---|
// Public API: the asynchronous and synchronous flavours of both streams.
exports.ReadStream = ReadStream
exports.ReadStreamSync = ReadStreamSync
exports.WriteStream = WriteStream
exports.WriteStreamSync = WriteStreamSync
|
---|