1 | 'use strict'
|
---|
2 |
|
---|
3 | const {
|
---|
4 | Readable,
|
---|
5 | Duplex,
|
---|
6 | PassThrough
|
---|
7 | } = require('stream')
|
---|
8 | const {
|
---|
9 | InvalidArgumentError,
|
---|
10 | InvalidReturnValueError,
|
---|
11 | RequestAbortedError
|
---|
12 | } = require('../core/errors')
|
---|
13 | const util = require('../core/util')
|
---|
14 | const { AsyncResource } = require('async_hooks')
|
---|
15 | const { addSignal, removeSignal } = require('./abort-signal')
|
---|
16 | const assert = require('assert')
|
---|
17 |
|
---|
18 | const kResume = Symbol('resume')
|
---|
19 |
|
---|
20 | class PipelineRequest extends Readable {
|
---|
21 | constructor () {
|
---|
22 | super({ autoDestroy: true })
|
---|
23 |
|
---|
24 | this[kResume] = null
|
---|
25 | }
|
---|
26 |
|
---|
27 | _read () {
|
---|
28 | const { [kResume]: resume } = this
|
---|
29 |
|
---|
30 | if (resume) {
|
---|
31 | this[kResume] = null
|
---|
32 | resume()
|
---|
33 | }
|
---|
34 | }
|
---|
35 |
|
---|
36 | _destroy (err, callback) {
|
---|
37 | this._read()
|
---|
38 |
|
---|
39 | callback(err)
|
---|
40 | }
|
---|
41 | }
|
---|
42 |
|
---|
43 | class PipelineResponse extends Readable {
|
---|
44 | constructor (resume) {
|
---|
45 | super({ autoDestroy: true })
|
---|
46 | this[kResume] = resume
|
---|
47 | }
|
---|
48 |
|
---|
49 | _read () {
|
---|
50 | this[kResume]()
|
---|
51 | }
|
---|
52 |
|
---|
53 | _destroy (err, callback) {
|
---|
54 | if (!err && !this._readableState.endEmitted) {
|
---|
55 | err = new RequestAbortedError()
|
---|
56 | }
|
---|
57 |
|
---|
58 | callback(err)
|
---|
59 | }
|
---|
60 | }
|
---|
61 |
|
---|
62 | class PipelineHandler extends AsyncResource {
|
---|
63 | constructor (opts, handler) {
|
---|
64 | if (!opts || typeof opts !== 'object') {
|
---|
65 | throw new InvalidArgumentError('invalid opts')
|
---|
66 | }
|
---|
67 |
|
---|
68 | if (typeof handler !== 'function') {
|
---|
69 | throw new InvalidArgumentError('invalid handler')
|
---|
70 | }
|
---|
71 |
|
---|
72 | const { signal, method, opaque, onInfo, responseHeaders } = opts
|
---|
73 |
|
---|
74 | if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
|
---|
75 | throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
|
---|
76 | }
|
---|
77 |
|
---|
78 | if (method === 'CONNECT') {
|
---|
79 | throw new InvalidArgumentError('invalid method')
|
---|
80 | }
|
---|
81 |
|
---|
82 | if (onInfo && typeof onInfo !== 'function') {
|
---|
83 | throw new InvalidArgumentError('invalid onInfo callback')
|
---|
84 | }
|
---|
85 |
|
---|
86 | super('UNDICI_PIPELINE')
|
---|
87 |
|
---|
88 | this.opaque = opaque || null
|
---|
89 | this.responseHeaders = responseHeaders || null
|
---|
90 | this.handler = handler
|
---|
91 | this.abort = null
|
---|
92 | this.context = null
|
---|
93 | this.onInfo = onInfo || null
|
---|
94 |
|
---|
95 | this.req = new PipelineRequest().on('error', util.nop)
|
---|
96 |
|
---|
97 | this.ret = new Duplex({
|
---|
98 | readableObjectMode: opts.objectMode,
|
---|
99 | autoDestroy: true,
|
---|
100 | read: () => {
|
---|
101 | const { body } = this
|
---|
102 |
|
---|
103 | if (body && body.resume) {
|
---|
104 | body.resume()
|
---|
105 | }
|
---|
106 | },
|
---|
107 | write: (chunk, encoding, callback) => {
|
---|
108 | const { req } = this
|
---|
109 |
|
---|
110 | if (req.push(chunk, encoding) || req._readableState.destroyed) {
|
---|
111 | callback()
|
---|
112 | } else {
|
---|
113 | req[kResume] = callback
|
---|
114 | }
|
---|
115 | },
|
---|
116 | destroy: (err, callback) => {
|
---|
117 | const { body, req, res, ret, abort } = this
|
---|
118 |
|
---|
119 | if (!err && !ret._readableState.endEmitted) {
|
---|
120 | err = new RequestAbortedError()
|
---|
121 | }
|
---|
122 |
|
---|
123 | if (abort && err) {
|
---|
124 | abort()
|
---|
125 | }
|
---|
126 |
|
---|
127 | util.destroy(body, err)
|
---|
128 | util.destroy(req, err)
|
---|
129 | util.destroy(res, err)
|
---|
130 |
|
---|
131 | removeSignal(this)
|
---|
132 |
|
---|
133 | callback(err)
|
---|
134 | }
|
---|
135 | }).on('prefinish', () => {
|
---|
136 | const { req } = this
|
---|
137 |
|
---|
138 | // Node < 15 does not call _final in same tick.
|
---|
139 | req.push(null)
|
---|
140 | })
|
---|
141 |
|
---|
142 | this.res = null
|
---|
143 |
|
---|
144 | addSignal(this, signal)
|
---|
145 | }
|
---|
146 |
|
---|
147 | onConnect (abort, context) {
|
---|
148 | const { ret, res } = this
|
---|
149 |
|
---|
150 | assert(!res, 'pipeline cannot be retried')
|
---|
151 |
|
---|
152 | if (ret.destroyed) {
|
---|
153 | throw new RequestAbortedError()
|
---|
154 | }
|
---|
155 |
|
---|
156 | this.abort = abort
|
---|
157 | this.context = context
|
---|
158 | }
|
---|
159 |
|
---|
160 | onHeaders (statusCode, rawHeaders, resume) {
|
---|
161 | const { opaque, handler, context } = this
|
---|
162 |
|
---|
163 | if (statusCode < 200) {
|
---|
164 | if (this.onInfo) {
|
---|
165 | const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
|
---|
166 | this.onInfo({ statusCode, headers })
|
---|
167 | }
|
---|
168 | return
|
---|
169 | }
|
---|
170 |
|
---|
171 | this.res = new PipelineResponse(resume)
|
---|
172 |
|
---|
173 | let body
|
---|
174 | try {
|
---|
175 | this.handler = null
|
---|
176 | const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
|
---|
177 | body = this.runInAsyncScope(handler, null, {
|
---|
178 | statusCode,
|
---|
179 | headers,
|
---|
180 | opaque,
|
---|
181 | body: this.res,
|
---|
182 | context
|
---|
183 | })
|
---|
184 | } catch (err) {
|
---|
185 | this.res.on('error', util.nop)
|
---|
186 | throw err
|
---|
187 | }
|
---|
188 |
|
---|
189 | if (!body || typeof body.on !== 'function') {
|
---|
190 | throw new InvalidReturnValueError('expected Readable')
|
---|
191 | }
|
---|
192 |
|
---|
193 | body
|
---|
194 | .on('data', (chunk) => {
|
---|
195 | const { ret, body } = this
|
---|
196 |
|
---|
197 | if (!ret.push(chunk) && body.pause) {
|
---|
198 | body.pause()
|
---|
199 | }
|
---|
200 | })
|
---|
201 | .on('error', (err) => {
|
---|
202 | const { ret } = this
|
---|
203 |
|
---|
204 | util.destroy(ret, err)
|
---|
205 | })
|
---|
206 | .on('end', () => {
|
---|
207 | const { ret } = this
|
---|
208 |
|
---|
209 | ret.push(null)
|
---|
210 | })
|
---|
211 | .on('close', () => {
|
---|
212 | const { ret } = this
|
---|
213 |
|
---|
214 | if (!ret._readableState.ended) {
|
---|
215 | util.destroy(ret, new RequestAbortedError())
|
---|
216 | }
|
---|
217 | })
|
---|
218 |
|
---|
219 | this.body = body
|
---|
220 | }
|
---|
221 |
|
---|
222 | onData (chunk) {
|
---|
223 | const { res } = this
|
---|
224 | return res.push(chunk)
|
---|
225 | }
|
---|
226 |
|
---|
227 | onComplete (trailers) {
|
---|
228 | const { res } = this
|
---|
229 | res.push(null)
|
---|
230 | }
|
---|
231 |
|
---|
232 | onError (err) {
|
---|
233 | const { ret } = this
|
---|
234 | this.handler = null
|
---|
235 | util.destroy(ret, err)
|
---|
236 | }
|
---|
237 | }
|
---|
238 |
|
---|
239 | function pipeline (opts, handler) {
|
---|
240 | try {
|
---|
241 | const pipelineHandler = new PipelineHandler(opts, handler)
|
---|
242 | this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
|
---|
243 | return pipelineHandler.ret
|
---|
244 | } catch (err) {
|
---|
245 | return new PassThrough().destroy(err)
|
---|
246 | }
|
---|
247 | }
|
---|
248 |
|
---|
249 | module.exports = pipeline
|
---|