source: trip-planner-front/node_modules/fs-minipass/index.js@ 76712b2

Last change on this file since 76712b2 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 9.8 KB
RevLine 
[6a3a178]1'use strict'
2const MiniPass = require('minipass')
3const EE = require('events').EventEmitter
4const fs = require('fs')
5
6let writev = fs.writev
7/* istanbul ignore next */
8if (!writev) {
9 // This entire block can be removed if support for earlier than Node.js
10 // 12.9.0 is not needed.
11 const binding = process.binding('fs')
12 const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback
13
14 writev = (fd, iovec, pos, cb) => {
15 const done = (er, bw) => cb(er, bw, iovec)
16 const req = new FSReqWrap()
17 req.oncomplete = done
18 binding.writeBuffers(fd, iovec, pos, req)
19 }
20}
21
22const _autoClose = Symbol('_autoClose')
23const _close = Symbol('_close')
24const _ended = Symbol('_ended')
25const _fd = Symbol('_fd')
26const _finished = Symbol('_finished')
27const _flags = Symbol('_flags')
28const _flush = Symbol('_flush')
29const _handleChunk = Symbol('_handleChunk')
30const _makeBuf = Symbol('_makeBuf')
31const _mode = Symbol('_mode')
32const _needDrain = Symbol('_needDrain')
33const _onerror = Symbol('_onerror')
34const _onopen = Symbol('_onopen')
35const _onread = Symbol('_onread')
36const _onwrite = Symbol('_onwrite')
37const _open = Symbol('_open')
38const _path = Symbol('_path')
39const _pos = Symbol('_pos')
40const _queue = Symbol('_queue')
41const _read = Symbol('_read')
42const _readSize = Symbol('_readSize')
43const _reading = Symbol('_reading')
44const _remain = Symbol('_remain')
45const _size = Symbol('_size')
46const _write = Symbol('_write')
47const _writing = Symbol('_writing')
48const _defaultFlag = Symbol('_defaultFlag')
49const _errored = Symbol('_errored')
50
51class ReadStream extends MiniPass {
52 constructor (path, opt) {
53 opt = opt || {}
54 super(opt)
55
56 this.readable = true
57 this.writable = false
58
59 if (typeof path !== 'string')
60 throw new TypeError('path must be a string')
61
62 this[_errored] = false
63 this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
64 this[_path] = path
65 this[_readSize] = opt.readSize || 16*1024*1024
66 this[_reading] = false
67 this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
68 this[_remain] = this[_size]
69 this[_autoClose] = typeof opt.autoClose === 'boolean' ?
70 opt.autoClose : true
71
72 if (typeof this[_fd] === 'number')
73 this[_read]()
74 else
75 this[_open]()
76 }
77
78 get fd () { return this[_fd] }
79 get path () { return this[_path] }
80
81 write () {
82 throw new TypeError('this is a readable stream')
83 }
84
85 end () {
86 throw new TypeError('this is a readable stream')
87 }
88
89 [_open] () {
90 fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
91 }
92
93 [_onopen] (er, fd) {
94 if (er)
95 this[_onerror](er)
96 else {
97 this[_fd] = fd
98 this.emit('open', fd)
99 this[_read]()
100 }
101 }
102
103 [_makeBuf] () {
104 return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
105 }
106
107 [_read] () {
108 if (!this[_reading]) {
109 this[_reading] = true
110 const buf = this[_makeBuf]()
111 /* istanbul ignore if */
112 if (buf.length === 0)
113 return process.nextTick(() => this[_onread](null, 0, buf))
114 fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) =>
115 this[_onread](er, br, buf))
116 }
117 }
118
119 [_onread] (er, br, buf) {
120 this[_reading] = false
121 if (er)
122 this[_onerror](er)
123 else if (this[_handleChunk](br, buf))
124 this[_read]()
125 }
126
127 [_close] () {
128 if (this[_autoClose] && typeof this[_fd] === 'number') {
129 const fd = this[_fd]
130 this[_fd] = null
131 fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
132 }
133 }
134
135 [_onerror] (er) {
136 this[_reading] = true
137 this[_close]()
138 this.emit('error', er)
139 }
140
141 [_handleChunk] (br, buf) {
142 let ret = false
143 // no effect if infinite
144 this[_remain] -= br
145 if (br > 0)
146 ret = super.write(br < buf.length ? buf.slice(0, br) : buf)
147
148 if (br === 0 || this[_remain] <= 0) {
149 ret = false
150 this[_close]()
151 super.end()
152 }
153
154 return ret
155 }
156
157 emit (ev, data) {
158 switch (ev) {
159 case 'prefinish':
160 case 'finish':
161 break
162
163 case 'drain':
164 if (typeof this[_fd] === 'number')
165 this[_read]()
166 break
167
168 case 'error':
169 if (this[_errored])
170 return
171 this[_errored] = true
172 return super.emit(ev, data)
173
174 default:
175 return super.emit(ev, data)
176 }
177 }
178}
179
180class ReadStreamSync extends ReadStream {
181 [_open] () {
182 let threw = true
183 try {
184 this[_onopen](null, fs.openSync(this[_path], 'r'))
185 threw = false
186 } finally {
187 if (threw)
188 this[_close]()
189 }
190 }
191
192 [_read] () {
193 let threw = true
194 try {
195 if (!this[_reading]) {
196 this[_reading] = true
197 do {
198 const buf = this[_makeBuf]()
199 /* istanbul ignore next */
200 const br = buf.length === 0 ? 0
201 : fs.readSync(this[_fd], buf, 0, buf.length, null)
202 if (!this[_handleChunk](br, buf))
203 break
204 } while (true)
205 this[_reading] = false
206 }
207 threw = false
208 } finally {
209 if (threw)
210 this[_close]()
211 }
212 }
213
214 [_close] () {
215 if (this[_autoClose] && typeof this[_fd] === 'number') {
216 const fd = this[_fd]
217 this[_fd] = null
218 fs.closeSync(fd)
219 this.emit('close')
220 }
221 }
222}
223
224class WriteStream extends EE {
225 constructor (path, opt) {
226 opt = opt || {}
227 super(opt)
228 this.readable = false
229 this.writable = true
230 this[_errored] = false
231 this[_writing] = false
232 this[_ended] = false
233 this[_needDrain] = false
234 this[_queue] = []
235 this[_path] = path
236 this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
237 this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
238 this[_pos] = typeof opt.start === 'number' ? opt.start : null
239 this[_autoClose] = typeof opt.autoClose === 'boolean' ?
240 opt.autoClose : true
241
242 // truncating makes no sense when writing into the middle
243 const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
244 this[_defaultFlag] = opt.flags === undefined
245 this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags
246
247 if (this[_fd] === null)
248 this[_open]()
249 }
250
251 emit (ev, data) {
252 if (ev === 'error') {
253 if (this[_errored])
254 return
255 this[_errored] = true
256 }
257 return super.emit(ev, data)
258 }
259
260
261 get fd () { return this[_fd] }
262 get path () { return this[_path] }
263
264 [_onerror] (er) {
265 this[_close]()
266 this[_writing] = true
267 this.emit('error', er)
268 }
269
270 [_open] () {
271 fs.open(this[_path], this[_flags], this[_mode],
272 (er, fd) => this[_onopen](er, fd))
273 }
274
275 [_onopen] (er, fd) {
276 if (this[_defaultFlag] &&
277 this[_flags] === 'r+' &&
278 er && er.code === 'ENOENT') {
279 this[_flags] = 'w'
280 this[_open]()
281 } else if (er)
282 this[_onerror](er)
283 else {
284 this[_fd] = fd
285 this.emit('open', fd)
286 this[_flush]()
287 }
288 }
289
290 end (buf, enc) {
291 if (buf)
292 this.write(buf, enc)
293
294 this[_ended] = true
295
296 // synthetic after-write logic, where drain/finish live
297 if (!this[_writing] && !this[_queue].length &&
298 typeof this[_fd] === 'number')
299 this[_onwrite](null, 0)
300 return this
301 }
302
303 write (buf, enc) {
304 if (typeof buf === 'string')
305 buf = Buffer.from(buf, enc)
306
307 if (this[_ended]) {
308 this.emit('error', new Error('write() after end()'))
309 return false
310 }
311
312 if (this[_fd] === null || this[_writing] || this[_queue].length) {
313 this[_queue].push(buf)
314 this[_needDrain] = true
315 return false
316 }
317
318 this[_writing] = true
319 this[_write](buf)
320 return true
321 }
322
323 [_write] (buf) {
324 fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
325 this[_onwrite](er, bw))
326 }
327
328 [_onwrite] (er, bw) {
329 if (er)
330 this[_onerror](er)
331 else {
332 if (this[_pos] !== null)
333 this[_pos] += bw
334 if (this[_queue].length)
335 this[_flush]()
336 else {
337 this[_writing] = false
338
339 if (this[_ended] && !this[_finished]) {
340 this[_finished] = true
341 this[_close]()
342 this.emit('finish')
343 } else if (this[_needDrain]) {
344 this[_needDrain] = false
345 this.emit('drain')
346 }
347 }
348 }
349 }
350
351 [_flush] () {
352 if (this[_queue].length === 0) {
353 if (this[_ended])
354 this[_onwrite](null, 0)
355 } else if (this[_queue].length === 1)
356 this[_write](this[_queue].pop())
357 else {
358 const iovec = this[_queue]
359 this[_queue] = []
360 writev(this[_fd], iovec, this[_pos],
361 (er, bw) => this[_onwrite](er, bw))
362 }
363 }
364
365 [_close] () {
366 if (this[_autoClose] && typeof this[_fd] === 'number') {
367 const fd = this[_fd]
368 this[_fd] = null
369 fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
370 }
371 }
372}
373
374class WriteStreamSync extends WriteStream {
375 [_open] () {
376 let fd
377 // only wrap in a try{} block if we know we'll retry, to avoid
378 // the rethrow obscuring the error's source frame in most cases.
379 if (this[_defaultFlag] && this[_flags] === 'r+') {
380 try {
381 fd = fs.openSync(this[_path], this[_flags], this[_mode])
382 } catch (er) {
383 if (er.code === 'ENOENT') {
384 this[_flags] = 'w'
385 return this[_open]()
386 } else
387 throw er
388 }
389 } else
390 fd = fs.openSync(this[_path], this[_flags], this[_mode])
391
392 this[_onopen](null, fd)
393 }
394
395 [_close] () {
396 if (this[_autoClose] && typeof this[_fd] === 'number') {
397 const fd = this[_fd]
398 this[_fd] = null
399 fs.closeSync(fd)
400 this.emit('close')
401 }
402 }
403
404 [_write] (buf) {
405 // throw the original, but try to close if it fails
406 let threw = true
407 try {
408 this[_onwrite](null,
409 fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos]))
410 threw = false
411 } finally {
412 if (threw)
413 try { this[_close]() } catch (_) {}
414 }
415 }
416}
417
418exports.ReadStream = ReadStream
419exports.ReadStreamSync = ReadStreamSync
420
421exports.WriteStream = WriteStream
422exports.WriteStreamSync = WriteStreamSync
Note: See TracBrowser for help on using the repository browser.