// source: node_modules/undici/lib/api/api-stream.js (branch: main)
//
// Last change on this file was d24f17c, checked in by
// Aleksandar Panovski <apano77@…>, 15 months ago: "Initial commit"
//
// Property mode set to 100644. File size: 5.2 KB.
'use strict'

// Node.js core modules and undici-internal helpers used by the stream API.
const { finished, PassThrough } = require('stream')
const {
  InvalidArgumentError,
  InvalidReturnValueError,
  RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
13
/**
 * Dispatch handler behind `stream()`.
 *
 * It validates the request options, asks the user-supplied `factory` for a
 * Writable once the final response headers arrive, writes every body chunk
 * into that Writable, and reports the outcome through `callback`.
 * Extending AsyncResource lets user callbacks run via `runInAsyncScope`.
 */
class StreamHandler extends AsyncResource {
  /**
   * @param {object} opts Request options. The fields read here are `signal`,
   *   `method`, `opaque`, `body`, `onInfo`, `responseHeaders` and `throwOnError`.
   * @param {Function} factory Called with `{ statusCode, headers, opaque, context }`;
   *   must return an object exposing `write`, `end` and `on`.
   * @param {Function} callback Called with `(err, { opaque, trailers })` when the
   *   Writable finishes, or with `(err, { opaque })` when the request fails
   *   before a Writable exists.
   * @throws {InvalidArgumentError} When `opts`, `callback`, `factory`, `signal`,
   *   `method` or `onInfo` is invalid.
   */
  constructor (opts, factory, callback) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts

    try {
      if (typeof callback !== 'function') {
        throw new InvalidArgumentError('invalid callback')
      }

      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('invalid factory')
      }

      if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
        throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
      }

      if (method === 'CONNECT') {
        throw new InvalidArgumentError('invalid method')
      }

      if (onInfo && typeof onInfo !== 'function') {
        throw new InvalidArgumentError('invalid onInfo callback')
      }

      super('UNDICI_STREAM')
    } catch (err) {
      // Validation failed: a request body stream is destroyed with the same
      // error before rethrowing. `util.nop` is registered as an 'error'
      // listener first (presumably so the destroy does not surface as an
      // unhandled 'error' event).
      if (util.isStream(body)) {
        util.destroy(body.on('error', util.nop), err)
      }
      throw err
    }

    this.responseHeaders = responseHeaders || null
    this.opaque = opaque || null
    this.factory = factory
    this.callback = callback
    this.res = null
    this.abort = null
    this.context = null
    this.trailers = null
    this.body = body
    this.onInfo = onInfo || null
    this.throwOnError = throwOnError || false

    // An error on the request body stream fails the whole request.
    if (util.isStream(body)) {
      body.on('error', (err) => {
        this.onError(err)
      })
    }

    addSignal(this, signal)
  }

  /**
   * Stores the abort function and context for later use.
   *
   * @param {Function} abort Invoked later if the Writable fails.
   * @param {*} context Forwarded unchanged to `factory`.
   * @throws {RequestAbortedError} When the callback has already been consumed
   *   (for example because `onError` ran first).
   */
  onConnect (abort, context) {
    if (!this.callback) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles response headers and sets up the destination Writable.
   *
   * @param {number} statusCode Response status code.
   * @param {Array} rawHeaders Raw header list, parsed via `util`.
   * @param {Function} resume Registered as the Writable's 'drain' listener.
   * @param {string} statusMessage Response status message.
   * @returns {boolean|undefined} `undefined` for informational (< 200)
   *   responses or when the factory was already cleared; otherwise `false`
   *   when the Writable reports that it needs a drain, `true` if not.
   * @throws {InvalidReturnValueError} When `factory` does not return a
   *   Writable-like object.
   */
  onHeaders (statusCode, rawHeaders, resume, statusMessage) {
    const { factory, opaque, context, callback, responseHeaders } = this

    const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

    // Informational responses are only reported through onInfo.
    if (statusCode < 200) {
      if (this.onInfo) {
        this.onInfo({ statusCode, headers })
      }
      return
    }

    // The factory is single-use: clear it before it can be called again.
    this.factory = null

    let res

    if (this.throwOnError && statusCode >= 400) {
      // The content-type lookup needs the object form of the headers even
      // when the caller asked for raw headers.
      const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
      const contentType = parsedHeaders['content-type']
      res = new PassThrough()

      // Ownership of the callback moves to getResolveErrorBodyCallback, which
      // receives the PassThrough as the error body.
      this.callback = null
      this.runInAsyncScope(getResolveErrorBodyCallback, null,
        { callback, body: res, contentType, statusCode, statusMessage, headers }
      )
    } else {
      if (factory === null) {
        return
      }

      res = this.runInAsyncScope(factory, null, {
        statusCode,
        headers,
        opaque,
        context
      })

      if (
        !res ||
        typeof res.write !== 'function' ||
        typeof res.end !== 'function' ||
        typeof res.on !== 'function'
      ) {
        throw new InvalidReturnValueError('expected Writable')
      }

      // TODO: Avoid finished. It registers an unnecessary amount of listeners.
      finished(res, { readable: false }, (err) => {
        const { callback, res, opaque, trailers, abort } = this

        this.res = null
        if (err || !res.readable) {
          util.destroy(res, err)
        }

        this.callback = null
        this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

        // A failed Writable aborts the request.
        if (err) {
          abort()
        }
      })
    }

    res.on('drain', resume)

    this.res = res

    // Prefer the public `writableNeedDrain` property; fall back to the
    // internal writable state when the property is not defined.
    const needDrain = res.writableNeedDrain !== undefined
      ? res.writableNeedDrain
      : res._writableState && res._writableState.needDrain

    return needDrain !== true
  }

  /**
   * Writes one body chunk to the destination.
   *
   * @param {*} chunk Response body chunk.
   * @returns {boolean} The result of `res.write(chunk)`, or `true` when no
   *   Writable is attached.
   */
  onData (chunk) {
    const { res } = this

    return res ? res.write(chunk) : true
  }

  /**
   * Ends the destination once the response is complete.
   *
   * @param {Array} trailers Raw trailer list; parsed and stored on
   *   `this.trailers` before the Writable is ended.
   */
  onComplete (trailers) {
    const { res } = this

    removeSignal(this)

    if (!res) {
      return
    }

    this.trailers = util.parseHeaders(trailers)

    res.end()
  }

  /**
   * Fails the request.
   *
   * With a Writable attached, the Writable is destroyed with `err`. Without
   * one, the callback (if still pending) is invoked from a microtask with
   * `(err, { opaque })`. A request body, if any, is destroyed as well.
   *
   * @param {Error} err The failure reason.
   */
  onError (err) {
    const { res, callback, opaque, body } = this

    removeSignal(this)

    this.factory = null

    if (res) {
      this.res = null
      util.destroy(res, err)
    } else if (callback) {
      this.callback = null
      queueMicrotask(() => {
        this.runInAsyncScope(callback, null, err, { opaque })
      })
    }

    if (body) {
      this.body = null
      util.destroy(body, err)
    }
  }
}
199
/**
 * Dispatches a request whose response body is written into a Writable
 * returned by `factory`.
 *
 * Must be called with `this` bound to an object exposing `dispatch(opts, handler)`.
 *
 * @param {object} opts Request options, forwarded to `this.dispatch` and to
 *   the StreamHandler constructor.
 * @param {Function} factory Produces the destination Writable.
 * @param {Function} [callback] Completion callback `(err, data)`. When it is
 *   `undefined`, a Promise is returned instead.
 * @returns {Promise|undefined} A Promise settled with the callback's
 *   `(err, data)` when no callback is given; otherwise `undefined`.
 * @throws Rethrows a synchronous setup/dispatch error only when `callback` is
 *   not a function; otherwise the error is delivered to `callback`.
 */
function stream (opts, factory, callback) {
  // Promise flavour: re-enter with a callback that settles the promise.
  if (callback === undefined) {
    return new Promise((resolve, reject) => {
      stream.call(this, opts, factory, (err, data) => {
        return err ? reject(err) : resolve(data)
      })
    })
  }

  try {
    this.dispatch(opts, new StreamHandler(opts, factory, callback))
  } catch (err) {
    // Without a usable callback there is nowhere to report the error.
    if (typeof callback !== 'function') {
      throw err
    }
    const opaque = opts && opts.opaque
    // Deliver the error from a microtask so the callback never runs
    // synchronously inside the `stream()` call.
    queueMicrotask(() => callback(err, { opaque }))
  }
}
219
module.exports = stream
// Note: See TracBrowser for help on using the repository browser.