source: node_modules/undici/lib/api/readable.js

main
Last change on this file was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 7.1 KB
Line 
1// Ported from https://github.com/nodejs/undici/pull/907
2
3'use strict'
4
5const assert = require('assert')
6const { Readable } = require('stream')
7const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors')
8const util = require('../core/util')
9const { ReadableStreamFrom, toUSVString } = require('../core/util')
10
11let Blob
12
13const kConsume = Symbol('kConsume')
14const kReading = Symbol('kReading')
15const kBody = Symbol('kBody')
16const kAbort = Symbol('abort')
17const kContentType = Symbol('kContentType')
18
19const noop = () => {}
20
21module.exports = class BodyReadable extends Readable {
22 constructor ({
23 resume,
24 abort,
25 contentType = '',
26 highWaterMark = 64 * 1024 // Same as nodejs fs streams.
27 }) {
28 super({
29 autoDestroy: true,
30 read: resume,
31 highWaterMark
32 })
33
34 this._readableState.dataEmitted = false
35
36 this[kAbort] = abort
37 this[kConsume] = null
38 this[kBody] = null
39 this[kContentType] = contentType
40
41 // Is stream being consumed through Readable API?
42 // This is an optimization so that we avoid checking
43 // for 'data' and 'readable' listeners in the hot path
44 // inside push().
45 this[kReading] = false
46 }
47
48 destroy (err) {
49 if (this.destroyed) {
50 // Node < 16
51 return this
52 }
53
54 if (!err && !this._readableState.endEmitted) {
55 err = new RequestAbortedError()
56 }
57
58 if (err) {
59 this[kAbort]()
60 }
61
62 return super.destroy(err)
63 }
64
65 emit (ev, ...args) {
66 if (ev === 'data') {
67 // Node < 16.7
68 this._readableState.dataEmitted = true
69 } else if (ev === 'error') {
70 // Node < 16
71 this._readableState.errorEmitted = true
72 }
73 return super.emit(ev, ...args)
74 }
75
76 on (ev, ...args) {
77 if (ev === 'data' || ev === 'readable') {
78 this[kReading] = true
79 }
80 return super.on(ev, ...args)
81 }
82
83 addListener (ev, ...args) {
84 return this.on(ev, ...args)
85 }
86
87 off (ev, ...args) {
88 const ret = super.off(ev, ...args)
89 if (ev === 'data' || ev === 'readable') {
90 this[kReading] = (
91 this.listenerCount('data') > 0 ||
92 this.listenerCount('readable') > 0
93 )
94 }
95 return ret
96 }
97
98 removeListener (ev, ...args) {
99 return this.off(ev, ...args)
100 }
101
102 push (chunk) {
103 if (this[kConsume] && chunk !== null && this.readableLength === 0) {
104 consumePush(this[kConsume], chunk)
105 return this[kReading] ? super.push(chunk) : true
106 }
107 return super.push(chunk)
108 }
109
110 // https://fetch.spec.whatwg.org/#dom-body-text
111 async text () {
112 return consume(this, 'text')
113 }
114
115 // https://fetch.spec.whatwg.org/#dom-body-json
116 async json () {
117 return consume(this, 'json')
118 }
119
120 // https://fetch.spec.whatwg.org/#dom-body-blob
121 async blob () {
122 return consume(this, 'blob')
123 }
124
125 // https://fetch.spec.whatwg.org/#dom-body-arraybuffer
126 async arrayBuffer () {
127 return consume(this, 'arrayBuffer')
128 }
129
130 // https://fetch.spec.whatwg.org/#dom-body-formdata
131 async formData () {
132 // TODO: Implement.
133 throw new NotSupportedError()
134 }
135
136 // https://fetch.spec.whatwg.org/#dom-body-bodyused
137 get bodyUsed () {
138 return util.isDisturbed(this)
139 }
140
141 // https://fetch.spec.whatwg.org/#dom-body-body
142 get body () {
143 if (!this[kBody]) {
144 this[kBody] = ReadableStreamFrom(this)
145 if (this[kConsume]) {
146 // TODO: Is this the best way to force a lock?
147 this[kBody].getReader() // Ensure stream is locked.
148 assert(this[kBody].locked)
149 }
150 }
151 return this[kBody]
152 }
153
154 dump (opts) {
155 let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144
156 const signal = opts && opts.signal
157
158 if (signal) {
159 try {
160 if (typeof signal !== 'object' || !('aborted' in signal)) {
161 throw new InvalidArgumentError('signal must be an AbortSignal')
162 }
163 util.throwIfAborted(signal)
164 } catch (err) {
165 return Promise.reject(err)
166 }
167 }
168
169 if (this.closed) {
170 return Promise.resolve(null)
171 }
172
173 return new Promise((resolve, reject) => {
174 const signalListenerCleanup = signal
175 ? util.addAbortListener(signal, () => {
176 this.destroy()
177 })
178 : noop
179
180 this
181 .on('close', function () {
182 signalListenerCleanup()
183 if (signal && signal.aborted) {
184 reject(signal.reason || Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }))
185 } else {
186 resolve(null)
187 }
188 })
189 .on('error', noop)
190 .on('data', function (chunk) {
191 limit -= chunk.length
192 if (limit <= 0) {
193 this.destroy()
194 }
195 })
196 .resume()
197 })
198 }
199}
200
201// https://streams.spec.whatwg.org/#readablestream-locked
202function isLocked (self) {
203 // Consume is an implicit lock.
204 return (self[kBody] && self[kBody].locked === true) || self[kConsume]
205}
206
207// https://fetch.spec.whatwg.org/#body-unusable
208function isUnusable (self) {
209 return util.isDisturbed(self) || isLocked(self)
210}
211
212async function consume (stream, type) {
213 if (isUnusable(stream)) {
214 throw new TypeError('unusable')
215 }
216
217 assert(!stream[kConsume])
218
219 return new Promise((resolve, reject) => {
220 stream[kConsume] = {
221 type,
222 stream,
223 resolve,
224 reject,
225 length: 0,
226 body: []
227 }
228
229 stream
230 .on('error', function (err) {
231 consumeFinish(this[kConsume], err)
232 })
233 .on('close', function () {
234 if (this[kConsume].body !== null) {
235 consumeFinish(this[kConsume], new RequestAbortedError())
236 }
237 })
238
239 process.nextTick(consumeStart, stream[kConsume])
240 })
241}
242
243function consumeStart (consume) {
244 if (consume.body === null) {
245 return
246 }
247
248 const { _readableState: state } = consume.stream
249
250 for (const chunk of state.buffer) {
251 consumePush(consume, chunk)
252 }
253
254 if (state.endEmitted) {
255 consumeEnd(this[kConsume])
256 } else {
257 consume.stream.on('end', function () {
258 consumeEnd(this[kConsume])
259 })
260 }
261
262 consume.stream.resume()
263
264 while (consume.stream.read() != null) {
265 // Loop
266 }
267}
268
269function consumeEnd (consume) {
270 const { type, body, resolve, stream, length } = consume
271
272 try {
273 if (type === 'text') {
274 resolve(toUSVString(Buffer.concat(body)))
275 } else if (type === 'json') {
276 resolve(JSON.parse(Buffer.concat(body)))
277 } else if (type === 'arrayBuffer') {
278 const dst = new Uint8Array(length)
279
280 let pos = 0
281 for (const buf of body) {
282 dst.set(buf, pos)
283 pos += buf.byteLength
284 }
285
286 resolve(dst.buffer)
287 } else if (type === 'blob') {
288 if (!Blob) {
289 Blob = require('buffer').Blob
290 }
291 resolve(new Blob(body, { type: stream[kContentType] }))
292 }
293
294 consumeFinish(consume)
295 } catch (err) {
296 stream.destroy(err)
297 }
298}
299
300function consumePush (consume, chunk) {
301 consume.length += chunk.length
302 consume.body.push(chunk)
303}
304
305function consumeFinish (consume, err) {
306 if (consume.body === null) {
307 return
308 }
309
310 if (err) {
311 consume.reject(err)
312 } else {
313 consume.resolve()
314 }
315
316 consume.type = null
317 consume.stream = null
318 consume.resolve = null
319 consume.reject = null
320 consume.length = 0
321 consume.body = null
322}
Note: See TracBrowser for help on using the repository browser.