source: node_modules/undici/lib/api/api-pipeline.js

main
Last change on this file was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 5.3 KB
Line 
1'use strict'
2
3const {
4 Readable,
5 Duplex,
6 PassThrough
7} = require('stream')
8const {
9 InvalidArgumentError,
10 InvalidReturnValueError,
11 RequestAbortedError
12} = require('../core/errors')
13const util = require('../core/util')
14const { AsyncResource } = require('async_hooks')
15const { addSignal, removeSignal } = require('./abort-signal')
16const assert = require('assert')
17
18const kResume = Symbol('resume')
19
20class PipelineRequest extends Readable {
21 constructor () {
22 super({ autoDestroy: true })
23
24 this[kResume] = null
25 }
26
27 _read () {
28 const { [kResume]: resume } = this
29
30 if (resume) {
31 this[kResume] = null
32 resume()
33 }
34 }
35
36 _destroy (err, callback) {
37 this._read()
38
39 callback(err)
40 }
41}
42
43class PipelineResponse extends Readable {
44 constructor (resume) {
45 super({ autoDestroy: true })
46 this[kResume] = resume
47 }
48
49 _read () {
50 this[kResume]()
51 }
52
53 _destroy (err, callback) {
54 if (!err && !this._readableState.endEmitted) {
55 err = new RequestAbortedError()
56 }
57
58 callback(err)
59 }
60}
61
62class PipelineHandler extends AsyncResource {
63 constructor (opts, handler) {
64 if (!opts || typeof opts !== 'object') {
65 throw new InvalidArgumentError('invalid opts')
66 }
67
68 if (typeof handler !== 'function') {
69 throw new InvalidArgumentError('invalid handler')
70 }
71
72 const { signal, method, opaque, onInfo, responseHeaders } = opts
73
74 if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
75 throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
76 }
77
78 if (method === 'CONNECT') {
79 throw new InvalidArgumentError('invalid method')
80 }
81
82 if (onInfo && typeof onInfo !== 'function') {
83 throw new InvalidArgumentError('invalid onInfo callback')
84 }
85
86 super('UNDICI_PIPELINE')
87
88 this.opaque = opaque || null
89 this.responseHeaders = responseHeaders || null
90 this.handler = handler
91 this.abort = null
92 this.context = null
93 this.onInfo = onInfo || null
94
95 this.req = new PipelineRequest().on('error', util.nop)
96
97 this.ret = new Duplex({
98 readableObjectMode: opts.objectMode,
99 autoDestroy: true,
100 read: () => {
101 const { body } = this
102
103 if (body && body.resume) {
104 body.resume()
105 }
106 },
107 write: (chunk, encoding, callback) => {
108 const { req } = this
109
110 if (req.push(chunk, encoding) || req._readableState.destroyed) {
111 callback()
112 } else {
113 req[kResume] = callback
114 }
115 },
116 destroy: (err, callback) => {
117 const { body, req, res, ret, abort } = this
118
119 if (!err && !ret._readableState.endEmitted) {
120 err = new RequestAbortedError()
121 }
122
123 if (abort && err) {
124 abort()
125 }
126
127 util.destroy(body, err)
128 util.destroy(req, err)
129 util.destroy(res, err)
130
131 removeSignal(this)
132
133 callback(err)
134 }
135 }).on('prefinish', () => {
136 const { req } = this
137
138 // Node < 15 does not call _final in same tick.
139 req.push(null)
140 })
141
142 this.res = null
143
144 addSignal(this, signal)
145 }
146
147 onConnect (abort, context) {
148 const { ret, res } = this
149
150 assert(!res, 'pipeline cannot be retried')
151
152 if (ret.destroyed) {
153 throw new RequestAbortedError()
154 }
155
156 this.abort = abort
157 this.context = context
158 }
159
160 onHeaders (statusCode, rawHeaders, resume) {
161 const { opaque, handler, context } = this
162
163 if (statusCode < 200) {
164 if (this.onInfo) {
165 const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
166 this.onInfo({ statusCode, headers })
167 }
168 return
169 }
170
171 this.res = new PipelineResponse(resume)
172
173 let body
174 try {
175 this.handler = null
176 const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
177 body = this.runInAsyncScope(handler, null, {
178 statusCode,
179 headers,
180 opaque,
181 body: this.res,
182 context
183 })
184 } catch (err) {
185 this.res.on('error', util.nop)
186 throw err
187 }
188
189 if (!body || typeof body.on !== 'function') {
190 throw new InvalidReturnValueError('expected Readable')
191 }
192
193 body
194 .on('data', (chunk) => {
195 const { ret, body } = this
196
197 if (!ret.push(chunk) && body.pause) {
198 body.pause()
199 }
200 })
201 .on('error', (err) => {
202 const { ret } = this
203
204 util.destroy(ret, err)
205 })
206 .on('end', () => {
207 const { ret } = this
208
209 ret.push(null)
210 })
211 .on('close', () => {
212 const { ret } = this
213
214 if (!ret._readableState.ended) {
215 util.destroy(ret, new RequestAbortedError())
216 }
217 })
218
219 this.body = body
220 }
221
222 onData (chunk) {
223 const { res } = this
224 return res.push(chunk)
225 }
226
227 onComplete (trailers) {
228 const { res } = this
229 res.push(null)
230 }
231
232 onError (err) {
233 const { ret } = this
234 this.handler = null
235 util.destroy(ret, err)
236 }
237}
238
239function pipeline (opts, handler) {
240 try {
241 const pipelineHandler = new PipelineHandler(opts, handler)
242 this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
243 return pipelineHandler.ret
244 } catch (err) {
245 return new PassThrough().destroy(err)
246 }
247}
248
249module.exports = pipeline
Note: See TracBrowser for help on using the repository browser.