1 | // @ts-check
|
---|
2 |
|
---|
3 | 'use strict'
|
---|
4 |
|
---|
5 | /* global WebAssembly */
|
---|
6 |
|
---|
7 | const assert = require('assert')
|
---|
8 | const net = require('net')
|
---|
9 | const http = require('http')
|
---|
10 | const { pipeline } = require('stream')
|
---|
11 | const util = require('./core/util')
|
---|
12 | const timers = require('./timers')
|
---|
13 | const Request = require('./core/request')
|
---|
14 | const DispatcherBase = require('./dispatcher-base')
|
---|
15 | const {
|
---|
16 | RequestContentLengthMismatchError,
|
---|
17 | ResponseContentLengthMismatchError,
|
---|
18 | InvalidArgumentError,
|
---|
19 | RequestAbortedError,
|
---|
20 | HeadersTimeoutError,
|
---|
21 | HeadersOverflowError,
|
---|
22 | SocketError,
|
---|
23 | InformationalError,
|
---|
24 | BodyTimeoutError,
|
---|
25 | HTTPParserError,
|
---|
26 | ResponseExceededMaxSizeError,
|
---|
27 | ClientDestroyedError
|
---|
28 | } = require('./core/errors')
|
---|
29 | const buildConnector = require('./core/connect')
|
---|
30 | const {
|
---|
31 | kUrl,
|
---|
32 | kReset,
|
---|
33 | kServerName,
|
---|
34 | kClient,
|
---|
35 | kBusy,
|
---|
36 | kParser,
|
---|
37 | kConnect,
|
---|
38 | kBlocking,
|
---|
39 | kResuming,
|
---|
40 | kRunning,
|
---|
41 | kPending,
|
---|
42 | kSize,
|
---|
43 | kWriting,
|
---|
44 | kQueue,
|
---|
45 | kConnected,
|
---|
46 | kConnecting,
|
---|
47 | kNeedDrain,
|
---|
48 | kNoRef,
|
---|
49 | kKeepAliveDefaultTimeout,
|
---|
50 | kHostHeader,
|
---|
51 | kPendingIdx,
|
---|
52 | kRunningIdx,
|
---|
53 | kError,
|
---|
54 | kPipelining,
|
---|
55 | kSocket,
|
---|
56 | kKeepAliveTimeoutValue,
|
---|
57 | kMaxHeadersSize,
|
---|
58 | kKeepAliveMaxTimeout,
|
---|
59 | kKeepAliveTimeoutThreshold,
|
---|
60 | kHeadersTimeout,
|
---|
61 | kBodyTimeout,
|
---|
62 | kStrictContentLength,
|
---|
63 | kConnector,
|
---|
64 | kMaxRedirections,
|
---|
65 | kMaxRequests,
|
---|
66 | kCounter,
|
---|
67 | kClose,
|
---|
68 | kDestroy,
|
---|
69 | kDispatch,
|
---|
70 | kInterceptors,
|
---|
71 | kLocalAddress,
|
---|
72 | kMaxResponseSize,
|
---|
73 | kHTTPConnVersion,
|
---|
74 | // HTTP2
|
---|
75 | kHost,
|
---|
76 | kHTTP2Session,
|
---|
77 | kHTTP2SessionState,
|
---|
78 | kHTTP2BuildRequest,
|
---|
79 | kHTTP2CopyHeaders,
|
---|
80 | kHTTP1BuildRequest
|
---|
81 | } = require('./core/symbols')
|
---|
82 |
|
---|
83 | /** @type {import('http2')} */
|
---|
84 | let http2
|
---|
85 | try {
|
---|
86 | http2 = require('http2')
|
---|
87 | } catch {
|
---|
88 | // @ts-ignore
|
---|
89 | http2 = { constants: {} }
|
---|
90 | }
|
---|
91 |
|
---|
92 | const {
|
---|
93 | constants: {
|
---|
94 | HTTP2_HEADER_AUTHORITY,
|
---|
95 | HTTP2_HEADER_METHOD,
|
---|
96 | HTTP2_HEADER_PATH,
|
---|
97 | HTTP2_HEADER_SCHEME,
|
---|
98 | HTTP2_HEADER_CONTENT_LENGTH,
|
---|
99 | HTTP2_HEADER_EXPECT,
|
---|
100 | HTTP2_HEADER_STATUS
|
---|
101 | }
|
---|
102 | } = http2
|
---|
103 |
|
---|
104 | // Experimental
|
---|
105 | let h2ExperimentalWarned = false
|
---|
106 |
|
---|
107 | const FastBuffer = Buffer[Symbol.species]
|
---|
108 |
|
---|
109 | const kClosedResolve = Symbol('kClosedResolve')
|
---|
110 |
|
---|
111 | const channels = {}
|
---|
112 |
|
---|
113 | try {
|
---|
114 | const diagnosticsChannel = require('diagnostics_channel')
|
---|
115 | channels.sendHeaders = diagnosticsChannel.channel('undici:client:sendHeaders')
|
---|
116 | channels.beforeConnect = diagnosticsChannel.channel('undici:client:beforeConnect')
|
---|
117 | channels.connectError = diagnosticsChannel.channel('undici:client:connectError')
|
---|
118 | channels.connected = diagnosticsChannel.channel('undici:client:connected')
|
---|
119 | } catch {
|
---|
120 | channels.sendHeaders = { hasSubscribers: false }
|
---|
121 | channels.beforeConnect = { hasSubscribers: false }
|
---|
122 | channels.connectError = { hasSubscribers: false }
|
---|
123 | channels.connected = { hasSubscribers: false }
|
---|
124 | }
|
---|
125 |
|
---|
126 | /**
|
---|
127 | * @type {import('../types/client').default}
|
---|
128 | */
|
---|
129 | class Client extends DispatcherBase {
|
---|
130 | /**
|
---|
131 | *
|
---|
132 | * @param {string|URL} url
|
---|
133 | * @param {import('../types/client').Client.Options} options
|
---|
134 | */
|
---|
135 | constructor (url, {
|
---|
136 | interceptors,
|
---|
137 | maxHeaderSize,
|
---|
138 | headersTimeout,
|
---|
139 | socketTimeout,
|
---|
140 | requestTimeout,
|
---|
141 | connectTimeout,
|
---|
142 | bodyTimeout,
|
---|
143 | idleTimeout,
|
---|
144 | keepAlive,
|
---|
145 | keepAliveTimeout,
|
---|
146 | maxKeepAliveTimeout,
|
---|
147 | keepAliveMaxTimeout,
|
---|
148 | keepAliveTimeoutThreshold,
|
---|
149 | socketPath,
|
---|
150 | pipelining,
|
---|
151 | tls,
|
---|
152 | strictContentLength,
|
---|
153 | maxCachedSessions,
|
---|
154 | maxRedirections,
|
---|
155 | connect,
|
---|
156 | maxRequestsPerClient,
|
---|
157 | localAddress,
|
---|
158 | maxResponseSize,
|
---|
159 | autoSelectFamily,
|
---|
160 | autoSelectFamilyAttemptTimeout,
|
---|
161 | // h2
|
---|
162 | allowH2,
|
---|
163 | maxConcurrentStreams
|
---|
164 | } = {}) {
|
---|
165 | super()
|
---|
166 |
|
---|
167 | if (keepAlive !== undefined) {
|
---|
168 | throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
|
---|
169 | }
|
---|
170 |
|
---|
171 | if (socketTimeout !== undefined) {
|
---|
172 | throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
|
---|
173 | }
|
---|
174 |
|
---|
175 | if (requestTimeout !== undefined) {
|
---|
176 | throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
|
---|
177 | }
|
---|
178 |
|
---|
179 | if (idleTimeout !== undefined) {
|
---|
180 | throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
|
---|
181 | }
|
---|
182 |
|
---|
183 | if (maxKeepAliveTimeout !== undefined) {
|
---|
184 | throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
|
---|
185 | }
|
---|
186 |
|
---|
187 | if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
|
---|
188 | throw new InvalidArgumentError('invalid maxHeaderSize')
|
---|
189 | }
|
---|
190 |
|
---|
191 | if (socketPath != null && typeof socketPath !== 'string') {
|
---|
192 | throw new InvalidArgumentError('invalid socketPath')
|
---|
193 | }
|
---|
194 |
|
---|
195 | if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
|
---|
196 | throw new InvalidArgumentError('invalid connectTimeout')
|
---|
197 | }
|
---|
198 |
|
---|
199 | if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
|
---|
200 | throw new InvalidArgumentError('invalid keepAliveTimeout')
|
---|
201 | }
|
---|
202 |
|
---|
203 | if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
|
---|
204 | throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
|
---|
205 | }
|
---|
206 |
|
---|
207 | if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
|
---|
208 | throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
|
---|
209 | }
|
---|
210 |
|
---|
211 | if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
|
---|
212 | throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
|
---|
213 | }
|
---|
214 |
|
---|
215 | if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
|
---|
216 | throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
|
---|
217 | }
|
---|
218 |
|
---|
219 | if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
|
---|
220 | throw new InvalidArgumentError('connect must be a function or an object')
|
---|
221 | }
|
---|
222 |
|
---|
223 | if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
|
---|
224 | throw new InvalidArgumentError('maxRedirections must be a positive number')
|
---|
225 | }
|
---|
226 |
|
---|
227 | if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
|
---|
228 | throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
|
---|
229 | }
|
---|
230 |
|
---|
231 | if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
|
---|
232 | throw new InvalidArgumentError('localAddress must be valid string IP address')
|
---|
233 | }
|
---|
234 |
|
---|
235 | if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
|
---|
236 | throw new InvalidArgumentError('maxResponseSize must be a positive number')
|
---|
237 | }
|
---|
238 |
|
---|
239 | if (
|
---|
240 | autoSelectFamilyAttemptTimeout != null &&
|
---|
241 | (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
|
---|
242 | ) {
|
---|
243 | throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
|
---|
244 | }
|
---|
245 |
|
---|
246 | // h2
|
---|
247 | if (allowH2 != null && typeof allowH2 !== 'boolean') {
|
---|
248 | throw new InvalidArgumentError('allowH2 must be a valid boolean value')
|
---|
249 | }
|
---|
250 |
|
---|
251 | if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
|
---|
252 | throw new InvalidArgumentError('maxConcurrentStreams must be a possitive integer, greater than 0')
|
---|
253 | }
|
---|
254 |
|
---|
255 | if (typeof connect !== 'function') {
|
---|
256 | connect = buildConnector({
|
---|
257 | ...tls,
|
---|
258 | maxCachedSessions,
|
---|
259 | allowH2,
|
---|
260 | socketPath,
|
---|
261 | timeout: connectTimeout,
|
---|
262 | ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
|
---|
263 | ...connect
|
---|
264 | })
|
---|
265 | }
|
---|
266 |
|
---|
267 | this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
|
---|
268 | ? interceptors.Client
|
---|
269 | : [createRedirectInterceptor({ maxRedirections })]
|
---|
270 | this[kUrl] = util.parseOrigin(url)
|
---|
271 | this[kConnector] = connect
|
---|
272 | this[kSocket] = null
|
---|
273 | this[kPipelining] = pipelining != null ? pipelining : 1
|
---|
274 | this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
|
---|
275 | this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
|
---|
276 | this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
|
---|
277 | this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
|
---|
278 | this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
|
---|
279 | this[kServerName] = null
|
---|
280 | this[kLocalAddress] = localAddress != null ? localAddress : null
|
---|
281 | this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
|
---|
282 | this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
|
---|
283 | this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
|
---|
284 | this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
|
---|
285 | this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
|
---|
286 | this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
|
---|
287 | this[kMaxRedirections] = maxRedirections
|
---|
288 | this[kMaxRequests] = maxRequestsPerClient
|
---|
289 | this[kClosedResolve] = null
|
---|
290 | this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
|
---|
291 | this[kHTTPConnVersion] = 'h1'
|
---|
292 |
|
---|
293 | // HTTP/2
|
---|
294 | this[kHTTP2Session] = null
|
---|
295 | this[kHTTP2SessionState] = !allowH2
|
---|
296 | ? null
|
---|
297 | : {
|
---|
298 | // streams: null, // Fixed queue of streams - For future support of `push`
|
---|
299 | openStreams: 0, // Keep track of them to decide wether or not unref the session
|
---|
300 | maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
|
---|
301 | }
|
---|
302 | this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`
|
---|
303 |
|
---|
304 | // kQueue is built up of 3 sections separated by
|
---|
305 | // the kRunningIdx and kPendingIdx indices.
|
---|
306 | // | complete | running | pending |
|
---|
307 | // ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
|
---|
308 | // kRunningIdx points to the first running element.
|
---|
309 | // kPendingIdx points to the first pending element.
|
---|
310 | // This implements a fast queue with an amortized
|
---|
311 | // time of O(1).
|
---|
312 |
|
---|
313 | this[kQueue] = []
|
---|
314 | this[kRunningIdx] = 0
|
---|
315 | this[kPendingIdx] = 0
|
---|
316 | }
|
---|
317 |
|
---|
318 | get pipelining () {
|
---|
319 | return this[kPipelining]
|
---|
320 | }
|
---|
321 |
|
---|
322 | set pipelining (value) {
|
---|
323 | this[kPipelining] = value
|
---|
324 | resume(this, true)
|
---|
325 | }
|
---|
326 |
|
---|
327 | get [kPending] () {
|
---|
328 | return this[kQueue].length - this[kPendingIdx]
|
---|
329 | }
|
---|
330 |
|
---|
331 | get [kRunning] () {
|
---|
332 | return this[kPendingIdx] - this[kRunningIdx]
|
---|
333 | }
|
---|
334 |
|
---|
335 | get [kSize] () {
|
---|
336 | return this[kQueue].length - this[kRunningIdx]
|
---|
337 | }
|
---|
338 |
|
---|
339 | get [kConnected] () {
|
---|
340 | return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
|
---|
341 | }
|
---|
342 |
|
---|
343 | get [kBusy] () {
|
---|
344 | const socket = this[kSocket]
|
---|
345 | return (
|
---|
346 | (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
|
---|
347 | (this[kSize] >= (this[kPipelining] || 1)) ||
|
---|
348 | this[kPending] > 0
|
---|
349 | )
|
---|
350 | }
|
---|
351 |
|
---|
352 | /* istanbul ignore: only used for test */
|
---|
353 | [kConnect] (cb) {
|
---|
354 | connect(this)
|
---|
355 | this.once('connect', cb)
|
---|
356 | }
|
---|
357 |
|
---|
358 | [kDispatch] (opts, handler) {
|
---|
359 | const origin = opts.origin || this[kUrl].origin
|
---|
360 |
|
---|
361 | const request = this[kHTTPConnVersion] === 'h2'
|
---|
362 | ? Request[kHTTP2BuildRequest](origin, opts, handler)
|
---|
363 | : Request[kHTTP1BuildRequest](origin, opts, handler)
|
---|
364 |
|
---|
365 | this[kQueue].push(request)
|
---|
366 | if (this[kResuming]) {
|
---|
367 | // Do nothing.
|
---|
368 | } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
|
---|
369 | // Wait a tick in case stream/iterator is ended in the same tick.
|
---|
370 | this[kResuming] = 1
|
---|
371 | process.nextTick(resume, this)
|
---|
372 | } else {
|
---|
373 | resume(this, true)
|
---|
374 | }
|
---|
375 |
|
---|
376 | if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
|
---|
377 | this[kNeedDrain] = 2
|
---|
378 | }
|
---|
379 |
|
---|
380 | return this[kNeedDrain] < 2
|
---|
381 | }
|
---|
382 |
|
---|
383 | async [kClose] () {
|
---|
384 | // TODO: for H2 we need to gracefully flush the remaining enqueued
|
---|
385 | // request and close each stream.
|
---|
386 | return new Promise((resolve) => {
|
---|
387 | if (!this[kSize]) {
|
---|
388 | resolve(null)
|
---|
389 | } else {
|
---|
390 | this[kClosedResolve] = resolve
|
---|
391 | }
|
---|
392 | })
|
---|
393 | }
|
---|
394 |
|
---|
395 | async [kDestroy] (err) {
|
---|
396 | return new Promise((resolve) => {
|
---|
397 | const requests = this[kQueue].splice(this[kPendingIdx])
|
---|
398 | for (let i = 0; i < requests.length; i++) {
|
---|
399 | const request = requests[i]
|
---|
400 | errorRequest(this, request, err)
|
---|
401 | }
|
---|
402 |
|
---|
403 | const callback = () => {
|
---|
404 | if (this[kClosedResolve]) {
|
---|
405 | // TODO (fix): Should we error here with ClientDestroyedError?
|
---|
406 | this[kClosedResolve]()
|
---|
407 | this[kClosedResolve] = null
|
---|
408 | }
|
---|
409 | resolve()
|
---|
410 | }
|
---|
411 |
|
---|
412 | if (this[kHTTP2Session] != null) {
|
---|
413 | util.destroy(this[kHTTP2Session], err)
|
---|
414 | this[kHTTP2Session] = null
|
---|
415 | this[kHTTP2SessionState] = null
|
---|
416 | }
|
---|
417 |
|
---|
418 | if (!this[kSocket]) {
|
---|
419 | queueMicrotask(callback)
|
---|
420 | } else {
|
---|
421 | util.destroy(this[kSocket].on('close', callback), err)
|
---|
422 | }
|
---|
423 |
|
---|
424 | resume(this)
|
---|
425 | })
|
---|
426 | }
|
---|
427 | }
|
---|
428 |
|
---|
429 | function onHttp2SessionError (err) {
|
---|
430 | assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
|
---|
431 |
|
---|
432 | this[kSocket][kError] = err
|
---|
433 |
|
---|
434 | onError(this[kClient], err)
|
---|
435 | }
|
---|
436 |
|
---|
437 | function onHttp2FrameError (type, code, id) {
|
---|
438 | const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
|
---|
439 |
|
---|
440 | if (id === 0) {
|
---|
441 | this[kSocket][kError] = err
|
---|
442 | onError(this[kClient], err)
|
---|
443 | }
|
---|
444 | }
|
---|
445 |
|
---|
446 | function onHttp2SessionEnd () {
|
---|
447 | util.destroy(this, new SocketError('other side closed'))
|
---|
448 | util.destroy(this[kSocket], new SocketError('other side closed'))
|
---|
449 | }
|
---|
450 |
|
---|
451 | function onHTTP2GoAway (code) {
|
---|
452 | const client = this[kClient]
|
---|
453 | const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
|
---|
454 | client[kSocket] = null
|
---|
455 | client[kHTTP2Session] = null
|
---|
456 |
|
---|
457 | if (client.destroyed) {
|
---|
458 | assert(this[kPending] === 0)
|
---|
459 |
|
---|
460 | // Fail entire queue.
|
---|
461 | const requests = client[kQueue].splice(client[kRunningIdx])
|
---|
462 | for (let i = 0; i < requests.length; i++) {
|
---|
463 | const request = requests[i]
|
---|
464 | errorRequest(this, request, err)
|
---|
465 | }
|
---|
466 | } else if (client[kRunning] > 0) {
|
---|
467 | // Fail head of pipeline.
|
---|
468 | const request = client[kQueue][client[kRunningIdx]]
|
---|
469 | client[kQueue][client[kRunningIdx]++] = null
|
---|
470 |
|
---|
471 | errorRequest(client, request, err)
|
---|
472 | }
|
---|
473 |
|
---|
474 | client[kPendingIdx] = client[kRunningIdx]
|
---|
475 |
|
---|
476 | assert(client[kRunning] === 0)
|
---|
477 |
|
---|
478 | client.emit('disconnect',
|
---|
479 | client[kUrl],
|
---|
480 | [client],
|
---|
481 | err
|
---|
482 | )
|
---|
483 |
|
---|
484 | resume(client)
|
---|
485 | }
|
---|
486 |
|
---|
487 | const constants = require('./llhttp/constants')
|
---|
488 | const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
|
---|
489 | const EMPTY_BUF = Buffer.alloc(0)
|
---|
490 |
|
---|
491 | async function lazyllhttp () {
|
---|
492 | const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined
|
---|
493 |
|
---|
494 | let mod
|
---|
495 | try {
|
---|
496 | mod = await WebAssembly.compile(Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64'))
|
---|
497 | } catch (e) {
|
---|
498 | /* istanbul ignore next */
|
---|
499 |
|
---|
500 | // We could check if the error was caused by the simd option not
|
---|
501 | // being enabled, but the occurring of this other error
|
---|
502 | // * https://github.com/emscripten-core/emscripten/issues/11495
|
---|
503 | // got me to remove that check to avoid breaking Node 12.
|
---|
504 | mod = await WebAssembly.compile(Buffer.from(llhttpWasmData || require('./llhttp/llhttp-wasm.js'), 'base64'))
|
---|
505 | }
|
---|
506 |
|
---|
507 | return await WebAssembly.instantiate(mod, {
|
---|
508 | env: {
|
---|
509 | /* eslint-disable camelcase */
|
---|
510 |
|
---|
511 | wasm_on_url: (p, at, len) => {
|
---|
512 | /* istanbul ignore next */
|
---|
513 | return 0
|
---|
514 | },
|
---|
515 | wasm_on_status: (p, at, len) => {
|
---|
516 | assert.strictEqual(currentParser.ptr, p)
|
---|
517 | const start = at - currentBufferPtr + currentBufferRef.byteOffset
|
---|
518 | return currentParser.onStatus(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
|
---|
519 | },
|
---|
520 | wasm_on_message_begin: (p) => {
|
---|
521 | assert.strictEqual(currentParser.ptr, p)
|
---|
522 | return currentParser.onMessageBegin() || 0
|
---|
523 | },
|
---|
524 | wasm_on_header_field: (p, at, len) => {
|
---|
525 | assert.strictEqual(currentParser.ptr, p)
|
---|
526 | const start = at - currentBufferPtr + currentBufferRef.byteOffset
|
---|
527 | return currentParser.onHeaderField(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
|
---|
528 | },
|
---|
529 | wasm_on_header_value: (p, at, len) => {
|
---|
530 | assert.strictEqual(currentParser.ptr, p)
|
---|
531 | const start = at - currentBufferPtr + currentBufferRef.byteOffset
|
---|
532 | return currentParser.onHeaderValue(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
|
---|
533 | },
|
---|
534 | wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
|
---|
535 | assert.strictEqual(currentParser.ptr, p)
|
---|
536 | return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
|
---|
537 | },
|
---|
538 | wasm_on_body: (p, at, len) => {
|
---|
539 | assert.strictEqual(currentParser.ptr, p)
|
---|
540 | const start = at - currentBufferPtr + currentBufferRef.byteOffset
|
---|
541 | return currentParser.onBody(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
|
---|
542 | },
|
---|
543 | wasm_on_message_complete: (p) => {
|
---|
544 | assert.strictEqual(currentParser.ptr, p)
|
---|
545 | return currentParser.onMessageComplete() || 0
|
---|
546 | }
|
---|
547 |
|
---|
548 | /* eslint-enable camelcase */
|
---|
549 | }
|
---|
550 | })
|
---|
551 | }
|
---|
552 |
|
---|
553 | let llhttpInstance = null
|
---|
554 | let llhttpPromise = lazyllhttp()
|
---|
555 | llhttpPromise.catch()
|
---|
556 |
|
---|
557 | let currentParser = null
|
---|
558 | let currentBufferRef = null
|
---|
559 | let currentBufferSize = 0
|
---|
560 | let currentBufferPtr = null
|
---|
561 |
|
---|
562 | const TIMEOUT_HEADERS = 1
|
---|
563 | const TIMEOUT_BODY = 2
|
---|
564 | const TIMEOUT_IDLE = 3
|
---|
565 |
|
---|
566 | class Parser {
|
---|
567 | constructor (client, socket, { exports }) {
|
---|
568 | assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)
|
---|
569 |
|
---|
570 | this.llhttp = exports
|
---|
571 | this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
|
---|
572 | this.client = client
|
---|
573 | this.socket = socket
|
---|
574 | this.timeout = null
|
---|
575 | this.timeoutValue = null
|
---|
576 | this.timeoutType = null
|
---|
577 | this.statusCode = null
|
---|
578 | this.statusText = ''
|
---|
579 | this.upgrade = false
|
---|
580 | this.headers = []
|
---|
581 | this.headersSize = 0
|
---|
582 | this.headersMaxSize = client[kMaxHeadersSize]
|
---|
583 | this.shouldKeepAlive = false
|
---|
584 | this.paused = false
|
---|
585 | this.resume = this.resume.bind(this)
|
---|
586 |
|
---|
587 | this.bytesRead = 0
|
---|
588 |
|
---|
589 | this.keepAlive = ''
|
---|
590 | this.contentLength = ''
|
---|
591 | this.connection = ''
|
---|
592 | this.maxResponseSize = client[kMaxResponseSize]
|
---|
593 | }
|
---|
594 |
|
---|
595 | setTimeout (value, type) {
|
---|
596 | this.timeoutType = type
|
---|
597 | if (value !== this.timeoutValue) {
|
---|
598 | timers.clearTimeout(this.timeout)
|
---|
599 | if (value) {
|
---|
600 | this.timeout = timers.setTimeout(onParserTimeout, value, this)
|
---|
601 | // istanbul ignore else: only for jest
|
---|
602 | if (this.timeout.unref) {
|
---|
603 | this.timeout.unref()
|
---|
604 | }
|
---|
605 | } else {
|
---|
606 | this.timeout = null
|
---|
607 | }
|
---|
608 | this.timeoutValue = value
|
---|
609 | } else if (this.timeout) {
|
---|
610 | // istanbul ignore else: only for jest
|
---|
611 | if (this.timeout.refresh) {
|
---|
612 | this.timeout.refresh()
|
---|
613 | }
|
---|
614 | }
|
---|
615 | }
|
---|
616 |
|
---|
617 | resume () {
|
---|
618 | if (this.socket.destroyed || !this.paused) {
|
---|
619 | return
|
---|
620 | }
|
---|
621 |
|
---|
622 | assert(this.ptr != null)
|
---|
623 | assert(currentParser == null)
|
---|
624 |
|
---|
625 | this.llhttp.llhttp_resume(this.ptr)
|
---|
626 |
|
---|
627 | assert(this.timeoutType === TIMEOUT_BODY)
|
---|
628 | if (this.timeout) {
|
---|
629 | // istanbul ignore else: only for jest
|
---|
630 | if (this.timeout.refresh) {
|
---|
631 | this.timeout.refresh()
|
---|
632 | }
|
---|
633 | }
|
---|
634 |
|
---|
635 | this.paused = false
|
---|
636 | this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
|
---|
637 | this.readMore()
|
---|
638 | }
|
---|
639 |
|
---|
640 | readMore () {
|
---|
641 | while (!this.paused && this.ptr) {
|
---|
642 | const chunk = this.socket.read()
|
---|
643 | if (chunk === null) {
|
---|
644 | break
|
---|
645 | }
|
---|
646 | this.execute(chunk)
|
---|
647 | }
|
---|
648 | }
|
---|
649 |
|
---|
650 | execute (data) {
|
---|
651 | assert(this.ptr != null)
|
---|
652 | assert(currentParser == null)
|
---|
653 | assert(!this.paused)
|
---|
654 |
|
---|
655 | const { socket, llhttp } = this
|
---|
656 |
|
---|
657 | if (data.length > currentBufferSize) {
|
---|
658 | if (currentBufferPtr) {
|
---|
659 | llhttp.free(currentBufferPtr)
|
---|
660 | }
|
---|
661 | currentBufferSize = Math.ceil(data.length / 4096) * 4096
|
---|
662 | currentBufferPtr = llhttp.malloc(currentBufferSize)
|
---|
663 | }
|
---|
664 |
|
---|
665 | new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)
|
---|
666 |
|
---|
667 | // Call `execute` on the wasm parser.
|
---|
668 | // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
|
---|
669 | // and finally the length of bytes to parse.
|
---|
670 | // The return value is an error code or `constants.ERROR.OK`.
|
---|
671 | try {
|
---|
672 | let ret
|
---|
673 |
|
---|
674 | try {
|
---|
675 | currentBufferRef = data
|
---|
676 | currentParser = this
|
---|
677 | ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
|
---|
678 | /* eslint-disable-next-line no-useless-catch */
|
---|
679 | } catch (err) {
|
---|
680 | /* istanbul ignore next: difficult to make a test case for */
|
---|
681 | throw err
|
---|
682 | } finally {
|
---|
683 | currentParser = null
|
---|
684 | currentBufferRef = null
|
---|
685 | }
|
---|
686 |
|
---|
687 | const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr
|
---|
688 |
|
---|
689 | if (ret === constants.ERROR.PAUSED_UPGRADE) {
|
---|
690 | this.onUpgrade(data.slice(offset))
|
---|
691 | } else if (ret === constants.ERROR.PAUSED) {
|
---|
692 | this.paused = true
|
---|
693 | socket.unshift(data.slice(offset))
|
---|
694 | } else if (ret !== constants.ERROR.OK) {
|
---|
695 | const ptr = llhttp.llhttp_get_error_reason(this.ptr)
|
---|
696 | let message = ''
|
---|
697 | /* istanbul ignore else: difficult to make a test case for */
|
---|
698 | if (ptr) {
|
---|
699 | const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
|
---|
700 | message =
|
---|
701 | 'Response does not match the HTTP/1.1 protocol (' +
|
---|
702 | Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
|
---|
703 | ')'
|
---|
704 | }
|
---|
705 | throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset))
|
---|
706 | }
|
---|
707 | } catch (err) {
|
---|
708 | util.destroy(socket, err)
|
---|
709 | }
|
---|
710 | }
|
---|
711 |
|
---|
712 | destroy () {
|
---|
713 | assert(this.ptr != null)
|
---|
714 | assert(currentParser == null)
|
---|
715 |
|
---|
716 | this.llhttp.llhttp_free(this.ptr)
|
---|
717 | this.ptr = null
|
---|
718 |
|
---|
719 | timers.clearTimeout(this.timeout)
|
---|
720 | this.timeout = null
|
---|
721 | this.timeoutValue = null
|
---|
722 | this.timeoutType = null
|
---|
723 |
|
---|
724 | this.paused = false
|
---|
725 | }
|
---|
726 |
|
---|
727 | onStatus (buf) {
|
---|
728 | this.statusText = buf.toString()
|
---|
729 | }
|
---|
730 |
|
---|
731 | onMessageBegin () {
|
---|
732 | const { socket, client } = this
|
---|
733 |
|
---|
734 | /* istanbul ignore next: difficult to make a test case for */
|
---|
735 | if (socket.destroyed) {
|
---|
736 | return -1
|
---|
737 | }
|
---|
738 |
|
---|
739 | const request = client[kQueue][client[kRunningIdx]]
|
---|
740 | if (!request) {
|
---|
741 | return -1
|
---|
742 | }
|
---|
743 | }
|
---|
744 |
|
---|
745 | onHeaderField (buf) {
|
---|
746 | const len = this.headers.length
|
---|
747 |
|
---|
748 | if ((len & 1) === 0) {
|
---|
749 | this.headers.push(buf)
|
---|
750 | } else {
|
---|
751 | this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
|
---|
752 | }
|
---|
753 |
|
---|
754 | this.trackHeader(buf.length)
|
---|
755 | }
|
---|
756 |
|
---|
757 | onHeaderValue (buf) {
|
---|
758 | let len = this.headers.length
|
---|
759 |
|
---|
760 | if ((len & 1) === 1) {
|
---|
761 | this.headers.push(buf)
|
---|
762 | len += 1
|
---|
763 | } else {
|
---|
764 | this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
|
---|
765 | }
|
---|
766 |
|
---|
767 | const key = this.headers[len - 2]
|
---|
768 | if (key.length === 10 && key.toString().toLowerCase() === 'keep-alive') {
|
---|
769 | this.keepAlive += buf.toString()
|
---|
770 | } else if (key.length === 10 && key.toString().toLowerCase() === 'connection') {
|
---|
771 | this.connection += buf.toString()
|
---|
772 | } else if (key.length === 14 && key.toString().toLowerCase() === 'content-length') {
|
---|
773 | this.contentLength += buf.toString()
|
---|
774 | }
|
---|
775 |
|
---|
776 | this.trackHeader(buf.length)
|
---|
777 | }
|
---|
778 |
|
---|
779 | trackHeader (len) {
|
---|
780 | this.headersSize += len
|
---|
781 | if (this.headersSize >= this.headersMaxSize) {
|
---|
782 | util.destroy(this.socket, new HeadersOverflowError())
|
---|
783 | }
|
---|
784 | }
|
---|
785 |
|
---|
786 | onUpgrade (head) {
|
---|
787 | const { upgrade, client, socket, headers, statusCode } = this
|
---|
788 |
|
---|
789 | assert(upgrade)
|
---|
790 |
|
---|
791 | const request = client[kQueue][client[kRunningIdx]]
|
---|
792 | assert(request)
|
---|
793 |
|
---|
794 | assert(!socket.destroyed)
|
---|
795 | assert(socket === client[kSocket])
|
---|
796 | assert(!this.paused)
|
---|
797 | assert(request.upgrade || request.method === 'CONNECT')
|
---|
798 |
|
---|
799 | this.statusCode = null
|
---|
800 | this.statusText = ''
|
---|
801 | this.shouldKeepAlive = null
|
---|
802 |
|
---|
803 | assert(this.headers.length % 2 === 0)
|
---|
804 | this.headers = []
|
---|
805 | this.headersSize = 0
|
---|
806 |
|
---|
807 | socket.unshift(head)
|
---|
808 |
|
---|
809 | socket[kParser].destroy()
|
---|
810 | socket[kParser] = null
|
---|
811 |
|
---|
812 | socket[kClient] = null
|
---|
813 | socket[kError] = null
|
---|
814 | socket
|
---|
815 | .removeListener('error', onSocketError)
|
---|
816 | .removeListener('readable', onSocketReadable)
|
---|
817 | .removeListener('end', onSocketEnd)
|
---|
818 | .removeListener('close', onSocketClose)
|
---|
819 |
|
---|
820 | client[kSocket] = null
|
---|
821 | client[kQueue][client[kRunningIdx]++] = null
|
---|
822 | client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))
|
---|
823 |
|
---|
824 | try {
|
---|
825 | request.onUpgrade(statusCode, headers, socket)
|
---|
826 | } catch (err) {
|
---|
827 | util.destroy(socket, err)
|
---|
828 | }
|
---|
829 |
|
---|
830 | resume(client)
|
---|
831 | }
|
---|
832 |
|
---|
833 | onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
|
---|
834 | const { client, socket, headers, statusText } = this
|
---|
835 |
|
---|
836 | /* istanbul ignore next: difficult to make a test case for */
|
---|
837 | if (socket.destroyed) {
|
---|
838 | return -1
|
---|
839 | }
|
---|
840 |
|
---|
841 | const request = client[kQueue][client[kRunningIdx]]
|
---|
842 |
|
---|
843 | /* istanbul ignore next: difficult to make a test case for */
|
---|
844 | if (!request) {
|
---|
845 | return -1
|
---|
846 | }
|
---|
847 |
|
---|
848 | assert(!this.upgrade)
|
---|
849 | assert(this.statusCode < 200)
|
---|
850 |
|
---|
851 | if (statusCode === 100) {
|
---|
852 | util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
|
---|
853 | return -1
|
---|
854 | }
|
---|
855 |
|
---|
856 | /* this can only happen if server is misbehaving */
|
---|
857 | if (upgrade && !request.upgrade) {
|
---|
858 | util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
|
---|
859 | return -1
|
---|
860 | }
|
---|
861 |
|
---|
862 | assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)
|
---|
863 |
|
---|
864 | this.statusCode = statusCode
|
---|
865 | this.shouldKeepAlive = (
|
---|
866 | shouldKeepAlive ||
|
---|
867 | // Override llhttp value which does not allow keepAlive for HEAD.
|
---|
868 | (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
|
---|
869 | )
|
---|
870 |
|
---|
871 | if (this.statusCode >= 200) {
|
---|
872 | const bodyTimeout = request.bodyTimeout != null
|
---|
873 | ? request.bodyTimeout
|
---|
874 | : client[kBodyTimeout]
|
---|
875 | this.setTimeout(bodyTimeout, TIMEOUT_BODY)
|
---|
876 | } else if (this.timeout) {
|
---|
877 | // istanbul ignore else: only for jest
|
---|
878 | if (this.timeout.refresh) {
|
---|
879 | this.timeout.refresh()
|
---|
880 | }
|
---|
881 | }
|
---|
882 |
|
---|
883 | if (request.method === 'CONNECT') {
|
---|
884 | assert(client[kRunning] === 1)
|
---|
885 | this.upgrade = true
|
---|
886 | return 2
|
---|
887 | }
|
---|
888 |
|
---|
889 | if (upgrade) {
|
---|
890 | assert(client[kRunning] === 1)
|
---|
891 | this.upgrade = true
|
---|
892 | return 2
|
---|
893 | }
|
---|
894 |
|
---|
895 | assert(this.headers.length % 2 === 0)
|
---|
896 | this.headers = []
|
---|
897 | this.headersSize = 0
|
---|
898 |
|
---|
899 | if (this.shouldKeepAlive && client[kPipelining]) {
|
---|
900 | const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null
|
---|
901 |
|
---|
902 | if (keepAliveTimeout != null) {
|
---|
903 | const timeout = Math.min(
|
---|
904 | keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
|
---|
905 | client[kKeepAliveMaxTimeout]
|
---|
906 | )
|
---|
907 | if (timeout <= 0) {
|
---|
908 | socket[kReset] = true
|
---|
909 | } else {
|
---|
910 | client[kKeepAliveTimeoutValue] = timeout
|
---|
911 | }
|
---|
912 | } else {
|
---|
913 | client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
|
---|
914 | }
|
---|
915 | } else {
|
---|
916 | // Stop more requests from being dispatched.
|
---|
917 | socket[kReset] = true
|
---|
918 | }
|
---|
919 |
|
---|
920 | const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
|
---|
921 |
|
---|
922 | if (request.aborted) {
|
---|
923 | return -1
|
---|
924 | }
|
---|
925 |
|
---|
926 | if (request.method === 'HEAD') {
|
---|
927 | return 1
|
---|
928 | }
|
---|
929 |
|
---|
930 | if (statusCode < 200) {
|
---|
931 | return 1
|
---|
932 | }
|
---|
933 |
|
---|
934 | if (socket[kBlocking]) {
|
---|
935 | socket[kBlocking] = false
|
---|
936 | resume(client)
|
---|
937 | }
|
---|
938 |
|
---|
939 | return pause ? constants.ERROR.PAUSED : 0
|
---|
940 | }
|
---|
941 |
|
---|
942 | onBody (buf) {
|
---|
943 | const { client, socket, statusCode, maxResponseSize } = this
|
---|
944 |
|
---|
945 | if (socket.destroyed) {
|
---|
946 | return -1
|
---|
947 | }
|
---|
948 |
|
---|
949 | const request = client[kQueue][client[kRunningIdx]]
|
---|
950 | assert(request)
|
---|
951 |
|
---|
952 | assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
|
---|
953 | if (this.timeout) {
|
---|
954 | // istanbul ignore else: only for jest
|
---|
955 | if (this.timeout.refresh) {
|
---|
956 | this.timeout.refresh()
|
---|
957 | }
|
---|
958 | }
|
---|
959 |
|
---|
960 | assert(statusCode >= 200)
|
---|
961 |
|
---|
962 | if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) {
|
---|
963 | util.destroy(socket, new ResponseExceededMaxSizeError())
|
---|
964 | return -1
|
---|
965 | }
|
---|
966 |
|
---|
967 | this.bytesRead += buf.length
|
---|
968 |
|
---|
969 | if (request.onData(buf) === false) {
|
---|
970 | return constants.ERROR.PAUSED
|
---|
971 | }
|
---|
972 | }
|
---|
973 |
|
---|
974 | onMessageComplete () {
|
---|
975 | const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this
|
---|
976 |
|
---|
977 | if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
|
---|
978 | return -1
|
---|
979 | }
|
---|
980 |
|
---|
981 | if (upgrade) {
|
---|
982 | return
|
---|
983 | }
|
---|
984 |
|
---|
985 | const request = client[kQueue][client[kRunningIdx]]
|
---|
986 | assert(request)
|
---|
987 |
|
---|
988 | assert(statusCode >= 100)
|
---|
989 |
|
---|
990 | this.statusCode = null
|
---|
991 | this.statusText = ''
|
---|
992 | this.bytesRead = 0
|
---|
993 | this.contentLength = ''
|
---|
994 | this.keepAlive = ''
|
---|
995 | this.connection = ''
|
---|
996 |
|
---|
997 | assert(this.headers.length % 2 === 0)
|
---|
998 | this.headers = []
|
---|
999 | this.headersSize = 0
|
---|
1000 |
|
---|
1001 | if (statusCode < 200) {
|
---|
1002 | return
|
---|
1003 | }
|
---|
1004 |
|
---|
1005 | /* istanbul ignore next: should be handled by llhttp? */
|
---|
1006 | if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
|
---|
1007 | util.destroy(socket, new ResponseContentLengthMismatchError())
|
---|
1008 | return -1
|
---|
1009 | }
|
---|
1010 |
|
---|
1011 | request.onComplete(headers)
|
---|
1012 |
|
---|
1013 | client[kQueue][client[kRunningIdx]++] = null
|
---|
1014 |
|
---|
1015 | if (socket[kWriting]) {
|
---|
1016 | assert.strictEqual(client[kRunning], 0)
|
---|
1017 | // Response completed before request.
|
---|
1018 | util.destroy(socket, new InformationalError('reset'))
|
---|
1019 | return constants.ERROR.PAUSED
|
---|
1020 | } else if (!shouldKeepAlive) {
|
---|
1021 | util.destroy(socket, new InformationalError('reset'))
|
---|
1022 | return constants.ERROR.PAUSED
|
---|
1023 | } else if (socket[kReset] && client[kRunning] === 0) {
|
---|
1024 | // Destroy socket once all requests have completed.
|
---|
1025 | // The request at the tail of the pipeline is the one
|
---|
1026 | // that requested reset and no further requests should
|
---|
1027 | // have been queued since then.
|
---|
1028 | util.destroy(socket, new InformationalError('reset'))
|
---|
1029 | return constants.ERROR.PAUSED
|
---|
1030 | } else if (client[kPipelining] === 1) {
|
---|
1031 | // We must wait a full event loop cycle to reuse this socket to make sure
|
---|
1032 | // that non-spec compliant servers are not closing the connection even if they
|
---|
1033 | // said they won't.
|
---|
1034 | setImmediate(resume, client)
|
---|
1035 | } else {
|
---|
1036 | resume(client)
|
---|
1037 | }
|
---|
1038 | }
|
---|
1039 | }
|
---|
1040 |
|
---|
1041 | function onParserTimeout (parser) {
|
---|
1042 | const { socket, timeoutType, client } = parser
|
---|
1043 |
|
---|
1044 | /* istanbul ignore else */
|
---|
1045 | if (timeoutType === TIMEOUT_HEADERS) {
|
---|
1046 | if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) {
|
---|
1047 | assert(!parser.paused, 'cannot be paused while waiting for headers')
|
---|
1048 | util.destroy(socket, new HeadersTimeoutError())
|
---|
1049 | }
|
---|
1050 | } else if (timeoutType === TIMEOUT_BODY) {
|
---|
1051 | if (!parser.paused) {
|
---|
1052 | util.destroy(socket, new BodyTimeoutError())
|
---|
1053 | }
|
---|
1054 | } else if (timeoutType === TIMEOUT_IDLE) {
|
---|
1055 | assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
|
---|
1056 | util.destroy(socket, new InformationalError('socket idle timeout'))
|
---|
1057 | }
|
---|
1058 | }
|
---|
1059 |
|
---|
1060 | function onSocketReadable () {
|
---|
1061 | const { [kParser]: parser } = this
|
---|
1062 | if (parser) {
|
---|
1063 | parser.readMore()
|
---|
1064 | }
|
---|
1065 | }
|
---|
1066 |
|
---|
1067 | function onSocketError (err) {
|
---|
1068 | const { [kClient]: client, [kParser]: parser } = this
|
---|
1069 |
|
---|
1070 | assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
|
---|
1071 |
|
---|
1072 | if (client[kHTTPConnVersion] !== 'h2') {
|
---|
1073 | // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
|
---|
1074 | // to the user.
|
---|
1075 | if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
|
---|
1076 | // We treat all incoming data so for as a valid response.
|
---|
1077 | parser.onMessageComplete()
|
---|
1078 | return
|
---|
1079 | }
|
---|
1080 | }
|
---|
1081 |
|
---|
1082 | this[kError] = err
|
---|
1083 |
|
---|
1084 | onError(this[kClient], err)
|
---|
1085 | }
|
---|
1086 |
|
---|
1087 | function onError (client, err) {
|
---|
1088 | if (
|
---|
1089 | client[kRunning] === 0 &&
|
---|
1090 | err.code !== 'UND_ERR_INFO' &&
|
---|
1091 | err.code !== 'UND_ERR_SOCKET'
|
---|
1092 | ) {
|
---|
1093 | // Error is not caused by running request and not a recoverable
|
---|
1094 | // socket error.
|
---|
1095 |
|
---|
1096 | assert(client[kPendingIdx] === client[kRunningIdx])
|
---|
1097 |
|
---|
1098 | const requests = client[kQueue].splice(client[kRunningIdx])
|
---|
1099 | for (let i = 0; i < requests.length; i++) {
|
---|
1100 | const request = requests[i]
|
---|
1101 | errorRequest(client, request, err)
|
---|
1102 | }
|
---|
1103 | assert(client[kSize] === 0)
|
---|
1104 | }
|
---|
1105 | }
|
---|
1106 |
|
---|
1107 | function onSocketEnd () {
|
---|
1108 | const { [kParser]: parser, [kClient]: client } = this
|
---|
1109 |
|
---|
1110 | if (client[kHTTPConnVersion] !== 'h2') {
|
---|
1111 | if (parser.statusCode && !parser.shouldKeepAlive) {
|
---|
1112 | // We treat all incoming data so far as a valid response.
|
---|
1113 | parser.onMessageComplete()
|
---|
1114 | return
|
---|
1115 | }
|
---|
1116 | }
|
---|
1117 |
|
---|
1118 | util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
|
---|
1119 | }
|
---|
1120 |
|
---|
1121 | function onSocketClose () {
|
---|
1122 | const { [kClient]: client, [kParser]: parser } = this
|
---|
1123 |
|
---|
1124 | if (client[kHTTPConnVersion] === 'h1' && parser) {
|
---|
1125 | if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
|
---|
1126 | // We treat all incoming data so far as a valid response.
|
---|
1127 | parser.onMessageComplete()
|
---|
1128 | }
|
---|
1129 |
|
---|
1130 | this[kParser].destroy()
|
---|
1131 | this[kParser] = null
|
---|
1132 | }
|
---|
1133 |
|
---|
1134 | const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
|
---|
1135 |
|
---|
1136 | client[kSocket] = null
|
---|
1137 |
|
---|
1138 | if (client.destroyed) {
|
---|
1139 | assert(client[kPending] === 0)
|
---|
1140 |
|
---|
1141 | // Fail entire queue.
|
---|
1142 | const requests = client[kQueue].splice(client[kRunningIdx])
|
---|
1143 | for (let i = 0; i < requests.length; i++) {
|
---|
1144 | const request = requests[i]
|
---|
1145 | errorRequest(client, request, err)
|
---|
1146 | }
|
---|
1147 | } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
|
---|
1148 | // Fail head of pipeline.
|
---|
1149 | const request = client[kQueue][client[kRunningIdx]]
|
---|
1150 | client[kQueue][client[kRunningIdx]++] = null
|
---|
1151 |
|
---|
1152 | errorRequest(client, request, err)
|
---|
1153 | }
|
---|
1154 |
|
---|
1155 | client[kPendingIdx] = client[kRunningIdx]
|
---|
1156 |
|
---|
1157 | assert(client[kRunning] === 0)
|
---|
1158 |
|
---|
1159 | client.emit('disconnect', client[kUrl], [client], err)
|
---|
1160 |
|
---|
1161 | resume(client)
|
---|
1162 | }
|
---|
1163 |
|
---|
1164 | async function connect (client) {
|
---|
1165 | assert(!client[kConnecting])
|
---|
1166 | assert(!client[kSocket])
|
---|
1167 |
|
---|
1168 | let { host, hostname, protocol, port } = client[kUrl]
|
---|
1169 |
|
---|
1170 | // Resolve ipv6
|
---|
1171 | if (hostname[0] === '[') {
|
---|
1172 | const idx = hostname.indexOf(']')
|
---|
1173 |
|
---|
1174 | assert(idx !== -1)
|
---|
1175 | const ip = hostname.substring(1, idx)
|
---|
1176 |
|
---|
1177 | assert(net.isIP(ip))
|
---|
1178 | hostname = ip
|
---|
1179 | }
|
---|
1180 |
|
---|
1181 | client[kConnecting] = true
|
---|
1182 |
|
---|
1183 | if (channels.beforeConnect.hasSubscribers) {
|
---|
1184 | channels.beforeConnect.publish({
|
---|
1185 | connectParams: {
|
---|
1186 | host,
|
---|
1187 | hostname,
|
---|
1188 | protocol,
|
---|
1189 | port,
|
---|
1190 | servername: client[kServerName],
|
---|
1191 | localAddress: client[kLocalAddress]
|
---|
1192 | },
|
---|
1193 | connector: client[kConnector]
|
---|
1194 | })
|
---|
1195 | }
|
---|
1196 |
|
---|
1197 | try {
|
---|
1198 | const socket = await new Promise((resolve, reject) => {
|
---|
1199 | client[kConnector]({
|
---|
1200 | host,
|
---|
1201 | hostname,
|
---|
1202 | protocol,
|
---|
1203 | port,
|
---|
1204 | servername: client[kServerName],
|
---|
1205 | localAddress: client[kLocalAddress]
|
---|
1206 | }, (err, socket) => {
|
---|
1207 | if (err) {
|
---|
1208 | reject(err)
|
---|
1209 | } else {
|
---|
1210 | resolve(socket)
|
---|
1211 | }
|
---|
1212 | })
|
---|
1213 | })
|
---|
1214 |
|
---|
1215 | if (client.destroyed) {
|
---|
1216 | util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
|
---|
1217 | return
|
---|
1218 | }
|
---|
1219 |
|
---|
1220 | client[kConnecting] = false
|
---|
1221 |
|
---|
1222 | assert(socket)
|
---|
1223 |
|
---|
1224 | const isH2 = socket.alpnProtocol === 'h2'
|
---|
1225 | if (isH2) {
|
---|
1226 | if (!h2ExperimentalWarned) {
|
---|
1227 | h2ExperimentalWarned = true
|
---|
1228 | process.emitWarning('H2 support is experimental, expect them to change at any time.', {
|
---|
1229 | code: 'UNDICI-H2'
|
---|
1230 | })
|
---|
1231 | }
|
---|
1232 |
|
---|
1233 | const session = http2.connect(client[kUrl], {
|
---|
1234 | createConnection: () => socket,
|
---|
1235 | peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
|
---|
1236 | })
|
---|
1237 |
|
---|
1238 | client[kHTTPConnVersion] = 'h2'
|
---|
1239 | session[kClient] = client
|
---|
1240 | session[kSocket] = socket
|
---|
1241 | session.on('error', onHttp2SessionError)
|
---|
1242 | session.on('frameError', onHttp2FrameError)
|
---|
1243 | session.on('end', onHttp2SessionEnd)
|
---|
1244 | session.on('goaway', onHTTP2GoAway)
|
---|
1245 | session.on('close', onSocketClose)
|
---|
1246 | session.unref()
|
---|
1247 |
|
---|
1248 | client[kHTTP2Session] = session
|
---|
1249 | socket[kHTTP2Session] = session
|
---|
1250 | } else {
|
---|
1251 | if (!llhttpInstance) {
|
---|
1252 | llhttpInstance = await llhttpPromise
|
---|
1253 | llhttpPromise = null
|
---|
1254 | }
|
---|
1255 |
|
---|
1256 | socket[kNoRef] = false
|
---|
1257 | socket[kWriting] = false
|
---|
1258 | socket[kReset] = false
|
---|
1259 | socket[kBlocking] = false
|
---|
1260 | socket[kParser] = new Parser(client, socket, llhttpInstance)
|
---|
1261 | }
|
---|
1262 |
|
---|
1263 | socket[kCounter] = 0
|
---|
1264 | socket[kMaxRequests] = client[kMaxRequests]
|
---|
1265 | socket[kClient] = client
|
---|
1266 | socket[kError] = null
|
---|
1267 |
|
---|
1268 | socket
|
---|
1269 | .on('error', onSocketError)
|
---|
1270 | .on('readable', onSocketReadable)
|
---|
1271 | .on('end', onSocketEnd)
|
---|
1272 | .on('close', onSocketClose)
|
---|
1273 |
|
---|
1274 | client[kSocket] = socket
|
---|
1275 |
|
---|
1276 | if (channels.connected.hasSubscribers) {
|
---|
1277 | channels.connected.publish({
|
---|
1278 | connectParams: {
|
---|
1279 | host,
|
---|
1280 | hostname,
|
---|
1281 | protocol,
|
---|
1282 | port,
|
---|
1283 | servername: client[kServerName],
|
---|
1284 | localAddress: client[kLocalAddress]
|
---|
1285 | },
|
---|
1286 | connector: client[kConnector],
|
---|
1287 | socket
|
---|
1288 | })
|
---|
1289 | }
|
---|
1290 | client.emit('connect', client[kUrl], [client])
|
---|
1291 | } catch (err) {
|
---|
1292 | if (client.destroyed) {
|
---|
1293 | return
|
---|
1294 | }
|
---|
1295 |
|
---|
1296 | client[kConnecting] = false
|
---|
1297 |
|
---|
1298 | if (channels.connectError.hasSubscribers) {
|
---|
1299 | channels.connectError.publish({
|
---|
1300 | connectParams: {
|
---|
1301 | host,
|
---|
1302 | hostname,
|
---|
1303 | protocol,
|
---|
1304 | port,
|
---|
1305 | servername: client[kServerName],
|
---|
1306 | localAddress: client[kLocalAddress]
|
---|
1307 | },
|
---|
1308 | connector: client[kConnector],
|
---|
1309 | error: err
|
---|
1310 | })
|
---|
1311 | }
|
---|
1312 |
|
---|
1313 | if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
|
---|
1314 | assert(client[kRunning] === 0)
|
---|
1315 | while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
|
---|
1316 | const request = client[kQueue][client[kPendingIdx]++]
|
---|
1317 | errorRequest(client, request, err)
|
---|
1318 | }
|
---|
1319 | } else {
|
---|
1320 | onError(client, err)
|
---|
1321 | }
|
---|
1322 |
|
---|
1323 | client.emit('connectionError', client[kUrl], [client], err)
|
---|
1324 | }
|
---|
1325 |
|
---|
1326 | resume(client)
|
---|
1327 | }
|
---|
1328 |
|
---|
1329 | function emitDrain (client) {
|
---|
1330 | client[kNeedDrain] = 0
|
---|
1331 | client.emit('drain', client[kUrl], [client])
|
---|
1332 | }
|
---|
1333 |
|
---|
1334 | function resume (client, sync) {
|
---|
1335 | if (client[kResuming] === 2) {
|
---|
1336 | return
|
---|
1337 | }
|
---|
1338 |
|
---|
1339 | client[kResuming] = 2
|
---|
1340 |
|
---|
1341 | _resume(client, sync)
|
---|
1342 | client[kResuming] = 0
|
---|
1343 |
|
---|
1344 | if (client[kRunningIdx] > 256) {
|
---|
1345 | client[kQueue].splice(0, client[kRunningIdx])
|
---|
1346 | client[kPendingIdx] -= client[kRunningIdx]
|
---|
1347 | client[kRunningIdx] = 0
|
---|
1348 | }
|
---|
1349 | }
|
---|
1350 |
|
---|
1351 | function _resume (client, sync) {
|
---|
1352 | while (true) {
|
---|
1353 | if (client.destroyed) {
|
---|
1354 | assert(client[kPending] === 0)
|
---|
1355 | return
|
---|
1356 | }
|
---|
1357 |
|
---|
1358 | if (client[kClosedResolve] && !client[kSize]) {
|
---|
1359 | client[kClosedResolve]()
|
---|
1360 | client[kClosedResolve] = null
|
---|
1361 | return
|
---|
1362 | }
|
---|
1363 |
|
---|
1364 | const socket = client[kSocket]
|
---|
1365 |
|
---|
1366 | if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') {
|
---|
1367 | if (client[kSize] === 0) {
|
---|
1368 | if (!socket[kNoRef] && socket.unref) {
|
---|
1369 | socket.unref()
|
---|
1370 | socket[kNoRef] = true
|
---|
1371 | }
|
---|
1372 | } else if (socket[kNoRef] && socket.ref) {
|
---|
1373 | socket.ref()
|
---|
1374 | socket[kNoRef] = false
|
---|
1375 | }
|
---|
1376 |
|
---|
1377 | if (client[kSize] === 0) {
|
---|
1378 | if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
|
---|
1379 | socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
|
---|
1380 | }
|
---|
1381 | } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
|
---|
1382 | if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
|
---|
1383 | const request = client[kQueue][client[kRunningIdx]]
|
---|
1384 | const headersTimeout = request.headersTimeout != null
|
---|
1385 | ? request.headersTimeout
|
---|
1386 | : client[kHeadersTimeout]
|
---|
1387 | socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
|
---|
1388 | }
|
---|
1389 | }
|
---|
1390 | }
|
---|
1391 |
|
---|
1392 | if (client[kBusy]) {
|
---|
1393 | client[kNeedDrain] = 2
|
---|
1394 | } else if (client[kNeedDrain] === 2) {
|
---|
1395 | if (sync) {
|
---|
1396 | client[kNeedDrain] = 1
|
---|
1397 | process.nextTick(emitDrain, client)
|
---|
1398 | } else {
|
---|
1399 | emitDrain(client)
|
---|
1400 | }
|
---|
1401 | continue
|
---|
1402 | }
|
---|
1403 |
|
---|
1404 | if (client[kPending] === 0) {
|
---|
1405 | return
|
---|
1406 | }
|
---|
1407 |
|
---|
1408 | if (client[kRunning] >= (client[kPipelining] || 1)) {
|
---|
1409 | return
|
---|
1410 | }
|
---|
1411 |
|
---|
1412 | const request = client[kQueue][client[kPendingIdx]]
|
---|
1413 |
|
---|
1414 | if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
|
---|
1415 | if (client[kRunning] > 0) {
|
---|
1416 | return
|
---|
1417 | }
|
---|
1418 |
|
---|
1419 | client[kServerName] = request.servername
|
---|
1420 |
|
---|
1421 | if (socket && socket.servername !== request.servername) {
|
---|
1422 | util.destroy(socket, new InformationalError('servername changed'))
|
---|
1423 | return
|
---|
1424 | }
|
---|
1425 | }
|
---|
1426 |
|
---|
1427 | if (client[kConnecting]) {
|
---|
1428 | return
|
---|
1429 | }
|
---|
1430 |
|
---|
1431 | if (!socket && !client[kHTTP2Session]) {
|
---|
1432 | connect(client)
|
---|
1433 | return
|
---|
1434 | }
|
---|
1435 |
|
---|
1436 | if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
|
---|
1437 | return
|
---|
1438 | }
|
---|
1439 |
|
---|
1440 | if (client[kRunning] > 0 && !request.idempotent) {
|
---|
1441 | // Non-idempotent request cannot be retried.
|
---|
1442 | // Ensure that no other requests are inflight and
|
---|
1443 | // could cause failure.
|
---|
1444 | return
|
---|
1445 | }
|
---|
1446 |
|
---|
1447 | if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
|
---|
1448 | // Don't dispatch an upgrade until all preceding requests have completed.
|
---|
1449 | // A misbehaving server might upgrade the connection before all pipelined
|
---|
1450 | // request has completed.
|
---|
1451 | return
|
---|
1452 | }
|
---|
1453 |
|
---|
1454 | if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
|
---|
1455 | (util.isStream(request.body) || util.isAsyncIterable(request.body))) {
|
---|
1456 | // Request with stream or iterator body can error while other requests
|
---|
1457 | // are inflight and indirectly error those as well.
|
---|
1458 | // Ensure this doesn't happen by waiting for inflight
|
---|
1459 | // to complete before dispatching.
|
---|
1460 |
|
---|
1461 | // Request with stream or iterator body cannot be retried.
|
---|
1462 | // Ensure that no other requests are inflight and
|
---|
1463 | // could cause failure.
|
---|
1464 | return
|
---|
1465 | }
|
---|
1466 |
|
---|
1467 | if (!request.aborted && write(client, request)) {
|
---|
1468 | client[kPendingIdx]++
|
---|
1469 | } else {
|
---|
1470 | client[kQueue].splice(client[kPendingIdx], 1)
|
---|
1471 | }
|
---|
1472 | }
|
---|
1473 | }
|
---|
1474 |
|
---|
1475 | // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
|
---|
1476 | function shouldSendContentLength (method) {
|
---|
1477 | return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
|
---|
1478 | }
|
---|
1479 |
|
---|
1480 | function write (client, request) {
|
---|
1481 | if (client[kHTTPConnVersion] === 'h2') {
|
---|
1482 | writeH2(client, client[kHTTP2Session], request)
|
---|
1483 | return
|
---|
1484 | }
|
---|
1485 |
|
---|
1486 | const { body, method, path, host, upgrade, headers, blocking, reset } = request
|
---|
1487 |
|
---|
1488 | // https://tools.ietf.org/html/rfc7231#section-4.3.1
|
---|
1489 | // https://tools.ietf.org/html/rfc7231#section-4.3.2
|
---|
1490 | // https://tools.ietf.org/html/rfc7231#section-4.3.5
|
---|
1491 |
|
---|
1492 | // Sending a payload body on a request that does not
|
---|
1493 | // expect it can cause undefined behavior on some
|
---|
1494 | // servers and corrupt connection state. Do not
|
---|
1495 | // re-use the connection for further requests.
|
---|
1496 |
|
---|
1497 | const expectsPayload = (
|
---|
1498 | method === 'PUT' ||
|
---|
1499 | method === 'POST' ||
|
---|
1500 | method === 'PATCH'
|
---|
1501 | )
|
---|
1502 |
|
---|
1503 | if (body && typeof body.read === 'function') {
|
---|
1504 | // Try to read EOF in order to get length.
|
---|
1505 | body.read(0)
|
---|
1506 | }
|
---|
1507 |
|
---|
1508 | const bodyLength = util.bodyLength(body)
|
---|
1509 |
|
---|
1510 | let contentLength = bodyLength
|
---|
1511 |
|
---|
1512 | if (contentLength === null) {
|
---|
1513 | contentLength = request.contentLength
|
---|
1514 | }
|
---|
1515 |
|
---|
1516 | if (contentLength === 0 && !expectsPayload) {
|
---|
1517 | // https://tools.ietf.org/html/rfc7230#section-3.3.2
|
---|
1518 | // A user agent SHOULD NOT send a Content-Length header field when
|
---|
1519 | // the request message does not contain a payload body and the method
|
---|
1520 | // semantics do not anticipate such a body.
|
---|
1521 |
|
---|
1522 | contentLength = null
|
---|
1523 | }
|
---|
1524 |
|
---|
1525 | // https://github.com/nodejs/undici/issues/2046
|
---|
1526 | // A user agent may send a Content-Length header with 0 value, this should be allowed.
|
---|
1527 | if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) {
|
---|
1528 | if (client[kStrictContentLength]) {
|
---|
1529 | errorRequest(client, request, new RequestContentLengthMismatchError())
|
---|
1530 | return false
|
---|
1531 | }
|
---|
1532 |
|
---|
1533 | process.emitWarning(new RequestContentLengthMismatchError())
|
---|
1534 | }
|
---|
1535 |
|
---|
1536 | const socket = client[kSocket]
|
---|
1537 |
|
---|
1538 | try {
|
---|
1539 | request.onConnect((err) => {
|
---|
1540 | if (request.aborted || request.completed) {
|
---|
1541 | return
|
---|
1542 | }
|
---|
1543 |
|
---|
1544 | errorRequest(client, request, err || new RequestAbortedError())
|
---|
1545 |
|
---|
1546 | util.destroy(socket, new InformationalError('aborted'))
|
---|
1547 | })
|
---|
1548 | } catch (err) {
|
---|
1549 | errorRequest(client, request, err)
|
---|
1550 | }
|
---|
1551 |
|
---|
1552 | if (request.aborted) {
|
---|
1553 | return false
|
---|
1554 | }
|
---|
1555 |
|
---|
1556 | if (method === 'HEAD') {
|
---|
1557 | // https://github.com/mcollina/undici/issues/258
|
---|
1558 | // Close after a HEAD request to interop with misbehaving servers
|
---|
1559 | // that may send a body in the response.
|
---|
1560 |
|
---|
1561 | socket[kReset] = true
|
---|
1562 | }
|
---|
1563 |
|
---|
1564 | if (upgrade || method === 'CONNECT') {
|
---|
1565 | // On CONNECT or upgrade, block pipeline from dispatching further
|
---|
1566 | // requests on this connection.
|
---|
1567 |
|
---|
1568 | socket[kReset] = true
|
---|
1569 | }
|
---|
1570 |
|
---|
1571 | if (reset != null) {
|
---|
1572 | socket[kReset] = reset
|
---|
1573 | }
|
---|
1574 |
|
---|
1575 | if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
|
---|
1576 | socket[kReset] = true
|
---|
1577 | }
|
---|
1578 |
|
---|
1579 | if (blocking) {
|
---|
1580 | socket[kBlocking] = true
|
---|
1581 | }
|
---|
1582 |
|
---|
1583 | let header = `${method} ${path} HTTP/1.1\r\n`
|
---|
1584 |
|
---|
1585 | if (typeof host === 'string') {
|
---|
1586 | header += `host: ${host}\r\n`
|
---|
1587 | } else {
|
---|
1588 | header += client[kHostHeader]
|
---|
1589 | }
|
---|
1590 |
|
---|
1591 | if (upgrade) {
|
---|
1592 | header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
|
---|
1593 | } else if (client[kPipelining] && !socket[kReset]) {
|
---|
1594 | header += 'connection: keep-alive\r\n'
|
---|
1595 | } else {
|
---|
1596 | header += 'connection: close\r\n'
|
---|
1597 | }
|
---|
1598 |
|
---|
1599 | if (headers) {
|
---|
1600 | header += headers
|
---|
1601 | }
|
---|
1602 |
|
---|
1603 | if (channels.sendHeaders.hasSubscribers) {
|
---|
1604 | channels.sendHeaders.publish({ request, headers: header, socket })
|
---|
1605 | }
|
---|
1606 |
|
---|
1607 | /* istanbul ignore else: assertion */
|
---|
1608 | if (!body || bodyLength === 0) {
|
---|
1609 | if (contentLength === 0) {
|
---|
1610 | socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
|
---|
1611 | } else {
|
---|
1612 | assert(contentLength === null, 'no body must not have content length')
|
---|
1613 | socket.write(`${header}\r\n`, 'latin1')
|
---|
1614 | }
|
---|
1615 | request.onRequestSent()
|
---|
1616 | } else if (util.isBuffer(body)) {
|
---|
1617 | assert(contentLength === body.byteLength, 'buffer body must have content length')
|
---|
1618 |
|
---|
1619 | socket.cork()
|
---|
1620 | socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
|
---|
1621 | socket.write(body)
|
---|
1622 | socket.uncork()
|
---|
1623 | request.onBodySent(body)
|
---|
1624 | request.onRequestSent()
|
---|
1625 | if (!expectsPayload) {
|
---|
1626 | socket[kReset] = true
|
---|
1627 | }
|
---|
1628 | } else if (util.isBlobLike(body)) {
|
---|
1629 | if (typeof body.stream === 'function') {
|
---|
1630 | writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
|
---|
1631 | } else {
|
---|
1632 | writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
|
---|
1633 | }
|
---|
1634 | } else if (util.isStream(body)) {
|
---|
1635 | writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
|
---|
1636 | } else if (util.isIterable(body)) {
|
---|
1637 | writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
|
---|
1638 | } else {
|
---|
1639 | assert(false)
|
---|
1640 | }
|
---|
1641 |
|
---|
1642 | return true
|
---|
1643 | }
|
---|
1644 |
|
---|
1645 | function writeH2 (client, session, request) {
|
---|
1646 | const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request
|
---|
1647 |
|
---|
1648 | let headers
|
---|
1649 | if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim())
|
---|
1650 | else headers = reqHeaders
|
---|
1651 |
|
---|
1652 | if (upgrade) {
|
---|
1653 | errorRequest(client, request, new Error('Upgrade not supported for H2'))
|
---|
1654 | return false
|
---|
1655 | }
|
---|
1656 |
|
---|
1657 | try {
|
---|
1658 | // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event?
|
---|
1659 | request.onConnect((err) => {
|
---|
1660 | if (request.aborted || request.completed) {
|
---|
1661 | return
|
---|
1662 | }
|
---|
1663 |
|
---|
1664 | errorRequest(client, request, err || new RequestAbortedError())
|
---|
1665 | })
|
---|
1666 | } catch (err) {
|
---|
1667 | errorRequest(client, request, err)
|
---|
1668 | }
|
---|
1669 |
|
---|
1670 | if (request.aborted) {
|
---|
1671 | return false
|
---|
1672 | }
|
---|
1673 |
|
---|
1674 | /** @type {import('node:http2').ClientHttp2Stream} */
|
---|
1675 | let stream
|
---|
1676 | const h2State = client[kHTTP2SessionState]
|
---|
1677 |
|
---|
1678 | headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
|
---|
1679 | headers[HTTP2_HEADER_METHOD] = method
|
---|
1680 |
|
---|
1681 | if (method === 'CONNECT') {
|
---|
1682 | session.ref()
|
---|
1683 | // we are already connected, streams are pending, first request
|
---|
1684 | // will create a new stream. We trigger a request to create the stream and wait until
|
---|
1685 | // `ready` event is triggered
|
---|
1686 | // We disabled endStream to allow the user to write to the stream
|
---|
1687 | stream = session.request(headers, { endStream: false, signal })
|
---|
1688 |
|
---|
1689 | if (stream.id && !stream.pending) {
|
---|
1690 | request.onUpgrade(null, null, stream)
|
---|
1691 | ++h2State.openStreams
|
---|
1692 | } else {
|
---|
1693 | stream.once('ready', () => {
|
---|
1694 | request.onUpgrade(null, null, stream)
|
---|
1695 | ++h2State.openStreams
|
---|
1696 | })
|
---|
1697 | }
|
---|
1698 |
|
---|
1699 | stream.once('close', () => {
|
---|
1700 | h2State.openStreams -= 1
|
---|
1701 | // TODO(HTTP/2): unref only if current streams count is 0
|
---|
1702 | if (h2State.openStreams === 0) session.unref()
|
---|
1703 | })
|
---|
1704 |
|
---|
1705 | return true
|
---|
1706 | }
|
---|
1707 |
|
---|
1708 | // https://tools.ietf.org/html/rfc7540#section-8.3
|
---|
1709 | // :path and :scheme headers must be omited when sending CONNECT
|
---|
1710 |
|
---|
1711 | headers[HTTP2_HEADER_PATH] = path
|
---|
1712 | headers[HTTP2_HEADER_SCHEME] = 'https'
|
---|
1713 |
|
---|
1714 | // https://tools.ietf.org/html/rfc7231#section-4.3.1
|
---|
1715 | // https://tools.ietf.org/html/rfc7231#section-4.3.2
|
---|
1716 | // https://tools.ietf.org/html/rfc7231#section-4.3.5
|
---|
1717 |
|
---|
1718 | // Sending a payload body on a request that does not
|
---|
1719 | // expect it can cause undefined behavior on some
|
---|
1720 | // servers and corrupt connection state. Do not
|
---|
1721 | // re-use the connection for further requests.
|
---|
1722 |
|
---|
1723 | const expectsPayload = (
|
---|
1724 | method === 'PUT' ||
|
---|
1725 | method === 'POST' ||
|
---|
1726 | method === 'PATCH'
|
---|
1727 | )
|
---|
1728 |
|
---|
1729 | if (body && typeof body.read === 'function') {
|
---|
1730 | // Try to read EOF in order to get length.
|
---|
1731 | body.read(0)
|
---|
1732 | }
|
---|
1733 |
|
---|
1734 | let contentLength = util.bodyLength(body)
|
---|
1735 |
|
---|
1736 | if (contentLength == null) {
|
---|
1737 | contentLength = request.contentLength
|
---|
1738 | }
|
---|
1739 |
|
---|
1740 | if (contentLength === 0 || !expectsPayload) {
|
---|
1741 | // https://tools.ietf.org/html/rfc7230#section-3.3.2
|
---|
1742 | // A user agent SHOULD NOT send a Content-Length header field when
|
---|
1743 | // the request message does not contain a payload body and the method
|
---|
1744 | // semantics do not anticipate such a body.
|
---|
1745 |
|
---|
1746 | contentLength = null
|
---|
1747 | }
|
---|
1748 |
|
---|
1749 | // https://github.com/nodejs/undici/issues/2046
|
---|
1750 | // A user agent may send a Content-Length header with 0 value, this should be allowed.
|
---|
1751 | if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
|
---|
1752 | if (client[kStrictContentLength]) {
|
---|
1753 | errorRequest(client, request, new RequestContentLengthMismatchError())
|
---|
1754 | return false
|
---|
1755 | }
|
---|
1756 |
|
---|
1757 | process.emitWarning(new RequestContentLengthMismatchError())
|
---|
1758 | }
|
---|
1759 |
|
---|
1760 | if (contentLength != null) {
|
---|
1761 | assert(body, 'no body must not have content length')
|
---|
1762 | headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
|
---|
1763 | }
|
---|
1764 |
|
---|
1765 | session.ref()
|
---|
1766 |
|
---|
1767 | const shouldEndStream = method === 'GET' || method === 'HEAD'
|
---|
1768 | if (expectContinue) {
|
---|
1769 | headers[HTTP2_HEADER_EXPECT] = '100-continue'
|
---|
1770 | stream = session.request(headers, { endStream: shouldEndStream, signal })
|
---|
1771 |
|
---|
1772 | stream.once('continue', writeBodyH2)
|
---|
1773 | } else {
|
---|
1774 | stream = session.request(headers, {
|
---|
1775 | endStream: shouldEndStream,
|
---|
1776 | signal
|
---|
1777 | })
|
---|
1778 | writeBodyH2()
|
---|
1779 | }
|
---|
1780 |
|
---|
1781 | // Increment counter as we have new several streams open
|
---|
1782 | ++h2State.openStreams
|
---|
1783 |
|
---|
1784 | stream.once('response', headers => {
|
---|
1785 | const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
|
---|
1786 |
|
---|
1787 | if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) {
|
---|
1788 | stream.pause()
|
---|
1789 | }
|
---|
1790 | })
|
---|
1791 |
|
---|
1792 | stream.once('end', () => {
|
---|
1793 | request.onComplete([])
|
---|
1794 | })
|
---|
1795 |
|
---|
1796 | stream.on('data', (chunk) => {
|
---|
1797 | if (request.onData(chunk) === false) {
|
---|
1798 | stream.pause()
|
---|
1799 | }
|
---|
1800 | })
|
---|
1801 |
|
---|
1802 | stream.once('close', () => {
|
---|
1803 | h2State.openStreams -= 1
|
---|
1804 | // TODO(HTTP/2): unref only if current streams count is 0
|
---|
1805 | if (h2State.openStreams === 0) {
|
---|
1806 | session.unref()
|
---|
1807 | }
|
---|
1808 | })
|
---|
1809 |
|
---|
1810 | stream.once('error', function (err) {
|
---|
1811 | if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
|
---|
1812 | h2State.streams -= 1
|
---|
1813 | util.destroy(stream, err)
|
---|
1814 | }
|
---|
1815 | })
|
---|
1816 |
|
---|
1817 | stream.once('frameError', (type, code) => {
|
---|
1818 | const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
|
---|
1819 | errorRequest(client, request, err)
|
---|
1820 |
|
---|
1821 | if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
|
---|
1822 | h2State.streams -= 1
|
---|
1823 | util.destroy(stream, err)
|
---|
1824 | }
|
---|
1825 | })
|
---|
1826 |
|
---|
1827 | // stream.on('aborted', () => {
|
---|
1828 | // // TODO(HTTP/2): Support aborted
|
---|
1829 | // })
|
---|
1830 |
|
---|
1831 | // stream.on('timeout', () => {
|
---|
1832 | // // TODO(HTTP/2): Support timeout
|
---|
1833 | // })
|
---|
1834 |
|
---|
1835 | // stream.on('push', headers => {
|
---|
1836 | // // TODO(HTTP/2): Suppor push
|
---|
1837 | // })
|
---|
1838 |
|
---|
1839 | // stream.on('trailers', headers => {
|
---|
1840 | // // TODO(HTTP/2): Support trailers
|
---|
1841 | // })
|
---|
1842 |
|
---|
1843 | return true
|
---|
1844 |
|
---|
1845 | function writeBodyH2 () {
|
---|
1846 | /* istanbul ignore else: assertion */
|
---|
1847 | if (!body) {
|
---|
1848 | request.onRequestSent()
|
---|
1849 | } else if (util.isBuffer(body)) {
|
---|
1850 | assert(contentLength === body.byteLength, 'buffer body must have content length')
|
---|
1851 | stream.cork()
|
---|
1852 | stream.write(body)
|
---|
1853 | stream.uncork()
|
---|
1854 | stream.end()
|
---|
1855 | request.onBodySent(body)
|
---|
1856 | request.onRequestSent()
|
---|
1857 | } else if (util.isBlobLike(body)) {
|
---|
1858 | if (typeof body.stream === 'function') {
|
---|
1859 | writeIterable({
|
---|
1860 | client,
|
---|
1861 | request,
|
---|
1862 | contentLength,
|
---|
1863 | h2stream: stream,
|
---|
1864 | expectsPayload,
|
---|
1865 | body: body.stream(),
|
---|
1866 | socket: client[kSocket],
|
---|
1867 | header: ''
|
---|
1868 | })
|
---|
1869 | } else {
|
---|
1870 | writeBlob({
|
---|
1871 | body,
|
---|
1872 | client,
|
---|
1873 | request,
|
---|
1874 | contentLength,
|
---|
1875 | expectsPayload,
|
---|
1876 | h2stream: stream,
|
---|
1877 | header: '',
|
---|
1878 | socket: client[kSocket]
|
---|
1879 | })
|
---|
1880 | }
|
---|
1881 | } else if (util.isStream(body)) {
|
---|
1882 | writeStream({
|
---|
1883 | body,
|
---|
1884 | client,
|
---|
1885 | request,
|
---|
1886 | contentLength,
|
---|
1887 | expectsPayload,
|
---|
1888 | socket: client[kSocket],
|
---|
1889 | h2stream: stream,
|
---|
1890 | header: ''
|
---|
1891 | })
|
---|
1892 | } else if (util.isIterable(body)) {
|
---|
1893 | writeIterable({
|
---|
1894 | body,
|
---|
1895 | client,
|
---|
1896 | request,
|
---|
1897 | contentLength,
|
---|
1898 | expectsPayload,
|
---|
1899 | header: '',
|
---|
1900 | h2stream: stream,
|
---|
1901 | socket: client[kSocket]
|
---|
1902 | })
|
---|
1903 | } else {
|
---|
1904 | assert(false)
|
---|
1905 | }
|
---|
1906 | }
|
---|
1907 | }
|
---|
1908 |
|
---|
1909 | function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
|
---|
1910 | assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
|
---|
1911 |
|
---|
1912 | if (client[kHTTPConnVersion] === 'h2') {
|
---|
1913 | // For HTTP/2, is enough to pipe the stream
|
---|
1914 | const pipe = pipeline(
|
---|
1915 | body,
|
---|
1916 | h2stream,
|
---|
1917 | (err) => {
|
---|
1918 | if (err) {
|
---|
1919 | util.destroy(body, err)
|
---|
1920 | util.destroy(h2stream, err)
|
---|
1921 | } else {
|
---|
1922 | request.onRequestSent()
|
---|
1923 | }
|
---|
1924 | }
|
---|
1925 | )
|
---|
1926 |
|
---|
1927 | pipe.on('data', onPipeData)
|
---|
1928 | pipe.once('end', () => {
|
---|
1929 | pipe.removeListener('data', onPipeData)
|
---|
1930 | util.destroy(pipe)
|
---|
1931 | })
|
---|
1932 |
|
---|
1933 | function onPipeData (chunk) {
|
---|
1934 | request.onBodySent(chunk)
|
---|
1935 | }
|
---|
1936 |
|
---|
1937 | return
|
---|
1938 | }
|
---|
1939 |
|
---|
1940 | let finished = false
|
---|
1941 |
|
---|
1942 | const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
|
---|
1943 |
|
---|
1944 | const onData = function (chunk) {
|
---|
1945 | if (finished) {
|
---|
1946 | return
|
---|
1947 | }
|
---|
1948 |
|
---|
1949 | try {
|
---|
1950 | if (!writer.write(chunk) && this.pause) {
|
---|
1951 | this.pause()
|
---|
1952 | }
|
---|
1953 | } catch (err) {
|
---|
1954 | util.destroy(this, err)
|
---|
1955 | }
|
---|
1956 | }
|
---|
1957 | const onDrain = function () {
|
---|
1958 | if (finished) {
|
---|
1959 | return
|
---|
1960 | }
|
---|
1961 |
|
---|
1962 | if (body.resume) {
|
---|
1963 | body.resume()
|
---|
1964 | }
|
---|
1965 | }
|
---|
1966 | const onAbort = function () {
|
---|
1967 | if (finished) {
|
---|
1968 | return
|
---|
1969 | }
|
---|
1970 | const err = new RequestAbortedError()
|
---|
1971 | queueMicrotask(() => onFinished(err))
|
---|
1972 | }
|
---|
1973 | const onFinished = function (err) {
|
---|
1974 | if (finished) {
|
---|
1975 | return
|
---|
1976 | }
|
---|
1977 |
|
---|
1978 | finished = true
|
---|
1979 |
|
---|
1980 | assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))
|
---|
1981 |
|
---|
1982 | socket
|
---|
1983 | .off('drain', onDrain)
|
---|
1984 | .off('error', onFinished)
|
---|
1985 |
|
---|
1986 | body
|
---|
1987 | .removeListener('data', onData)
|
---|
1988 | .removeListener('end', onFinished)
|
---|
1989 | .removeListener('error', onFinished)
|
---|
1990 | .removeListener('close', onAbort)
|
---|
1991 |
|
---|
1992 | if (!err) {
|
---|
1993 | try {
|
---|
1994 | writer.end()
|
---|
1995 | } catch (er) {
|
---|
1996 | err = er
|
---|
1997 | }
|
---|
1998 | }
|
---|
1999 |
|
---|
2000 | writer.destroy(err)
|
---|
2001 |
|
---|
2002 | if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) {
|
---|
2003 | util.destroy(body, err)
|
---|
2004 | } else {
|
---|
2005 | util.destroy(body)
|
---|
2006 | }
|
---|
2007 | }
|
---|
2008 |
|
---|
2009 | body
|
---|
2010 | .on('data', onData)
|
---|
2011 | .on('end', onFinished)
|
---|
2012 | .on('error', onFinished)
|
---|
2013 | .on('close', onAbort)
|
---|
2014 |
|
---|
2015 | if (body.resume) {
|
---|
2016 | body.resume()
|
---|
2017 | }
|
---|
2018 |
|
---|
2019 | socket
|
---|
2020 | .on('drain', onDrain)
|
---|
2021 | .on('error', onFinished)
|
---|
2022 | }
|
---|
2023 |
|
---|
2024 | async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
|
---|
2025 | assert(contentLength === body.size, 'blob body must have content length')
|
---|
2026 |
|
---|
2027 | const isH2 = client[kHTTPConnVersion] === 'h2'
|
---|
2028 | try {
|
---|
2029 | if (contentLength != null && contentLength !== body.size) {
|
---|
2030 | throw new RequestContentLengthMismatchError()
|
---|
2031 | }
|
---|
2032 |
|
---|
2033 | const buffer = Buffer.from(await body.arrayBuffer())
|
---|
2034 |
|
---|
2035 | if (isH2) {
|
---|
2036 | h2stream.cork()
|
---|
2037 | h2stream.write(buffer)
|
---|
2038 | h2stream.uncork()
|
---|
2039 | } else {
|
---|
2040 | socket.cork()
|
---|
2041 | socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
|
---|
2042 | socket.write(buffer)
|
---|
2043 | socket.uncork()
|
---|
2044 | }
|
---|
2045 |
|
---|
2046 | request.onBodySent(buffer)
|
---|
2047 | request.onRequestSent()
|
---|
2048 |
|
---|
2049 | if (!expectsPayload) {
|
---|
2050 | socket[kReset] = true
|
---|
2051 | }
|
---|
2052 |
|
---|
2053 | resume(client)
|
---|
2054 | } catch (err) {
|
---|
2055 | util.destroy(isH2 ? h2stream : socket, err)
|
---|
2056 | }
|
---|
2057 | }
|
---|
2058 |
|
---|
2059 | async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
|
---|
2060 | assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
|
---|
2061 |
|
---|
2062 | let callback = null
|
---|
2063 | function onDrain () {
|
---|
2064 | if (callback) {
|
---|
2065 | const cb = callback
|
---|
2066 | callback = null
|
---|
2067 | cb()
|
---|
2068 | }
|
---|
2069 | }
|
---|
2070 |
|
---|
2071 | const waitForDrain = () => new Promise((resolve, reject) => {
|
---|
2072 | assert(callback === null)
|
---|
2073 |
|
---|
2074 | if (socket[kError]) {
|
---|
2075 | reject(socket[kError])
|
---|
2076 | } else {
|
---|
2077 | callback = resolve
|
---|
2078 | }
|
---|
2079 | })
|
---|
2080 |
|
---|
2081 | if (client[kHTTPConnVersion] === 'h2') {
|
---|
2082 | h2stream
|
---|
2083 | .on('close', onDrain)
|
---|
2084 | .on('drain', onDrain)
|
---|
2085 |
|
---|
2086 | try {
|
---|
2087 | // It's up to the user to somehow abort the async iterable.
|
---|
2088 | for await (const chunk of body) {
|
---|
2089 | if (socket[kError]) {
|
---|
2090 | throw socket[kError]
|
---|
2091 | }
|
---|
2092 |
|
---|
2093 | const res = h2stream.write(chunk)
|
---|
2094 | request.onBodySent(chunk)
|
---|
2095 | if (!res) {
|
---|
2096 | await waitForDrain()
|
---|
2097 | }
|
---|
2098 | }
|
---|
2099 | } catch (err) {
|
---|
2100 | h2stream.destroy(err)
|
---|
2101 | } finally {
|
---|
2102 | request.onRequestSent()
|
---|
2103 | h2stream.end()
|
---|
2104 | h2stream
|
---|
2105 | .off('close', onDrain)
|
---|
2106 | .off('drain', onDrain)
|
---|
2107 | }
|
---|
2108 |
|
---|
2109 | return
|
---|
2110 | }
|
---|
2111 |
|
---|
2112 | socket
|
---|
2113 | .on('close', onDrain)
|
---|
2114 | .on('drain', onDrain)
|
---|
2115 |
|
---|
2116 | const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
|
---|
2117 | try {
|
---|
2118 | // It's up to the user to somehow abort the async iterable.
|
---|
2119 | for await (const chunk of body) {
|
---|
2120 | if (socket[kError]) {
|
---|
2121 | throw socket[kError]
|
---|
2122 | }
|
---|
2123 |
|
---|
2124 | if (!writer.write(chunk)) {
|
---|
2125 | await waitForDrain()
|
---|
2126 | }
|
---|
2127 | }
|
---|
2128 |
|
---|
2129 | writer.end()
|
---|
2130 | } catch (err) {
|
---|
2131 | writer.destroy(err)
|
---|
2132 | } finally {
|
---|
2133 | socket
|
---|
2134 | .off('close', onDrain)
|
---|
2135 | .off('drain', onDrain)
|
---|
2136 | }
|
---|
2137 | }
|
---|
2138 |
|
---|
2139 | class AsyncWriter {
|
---|
2140 | constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
|
---|
2141 | this.socket = socket
|
---|
2142 | this.request = request
|
---|
2143 | this.contentLength = contentLength
|
---|
2144 | this.client = client
|
---|
2145 | this.bytesWritten = 0
|
---|
2146 | this.expectsPayload = expectsPayload
|
---|
2147 | this.header = header
|
---|
2148 |
|
---|
2149 | socket[kWriting] = true
|
---|
2150 | }
|
---|
2151 |
|
---|
2152 | write (chunk) {
|
---|
2153 | const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this
|
---|
2154 |
|
---|
2155 | if (socket[kError]) {
|
---|
2156 | throw socket[kError]
|
---|
2157 | }
|
---|
2158 |
|
---|
2159 | if (socket.destroyed) {
|
---|
2160 | return false
|
---|
2161 | }
|
---|
2162 |
|
---|
2163 | const len = Buffer.byteLength(chunk)
|
---|
2164 | if (!len) {
|
---|
2165 | return true
|
---|
2166 | }
|
---|
2167 |
|
---|
2168 | // We should defer writing chunks.
|
---|
2169 | if (contentLength !== null && bytesWritten + len > contentLength) {
|
---|
2170 | if (client[kStrictContentLength]) {
|
---|
2171 | throw new RequestContentLengthMismatchError()
|
---|
2172 | }
|
---|
2173 |
|
---|
2174 | process.emitWarning(new RequestContentLengthMismatchError())
|
---|
2175 | }
|
---|
2176 |
|
---|
2177 | socket.cork()
|
---|
2178 |
|
---|
2179 | if (bytesWritten === 0) {
|
---|
2180 | if (!expectsPayload) {
|
---|
2181 | socket[kReset] = true
|
---|
2182 | }
|
---|
2183 |
|
---|
2184 | if (contentLength === null) {
|
---|
2185 | socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1')
|
---|
2186 | } else {
|
---|
2187 | socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
|
---|
2188 | }
|
---|
2189 | }
|
---|
2190 |
|
---|
2191 | if (contentLength === null) {
|
---|
2192 | socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
|
---|
2193 | }
|
---|
2194 |
|
---|
2195 | this.bytesWritten += len
|
---|
2196 |
|
---|
2197 | const ret = socket.write(chunk)
|
---|
2198 |
|
---|
2199 | socket.uncork()
|
---|
2200 |
|
---|
2201 | request.onBodySent(chunk)
|
---|
2202 |
|
---|
2203 | if (!ret) {
|
---|
2204 | if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
|
---|
2205 | // istanbul ignore else: only for jest
|
---|
2206 | if (socket[kParser].timeout.refresh) {
|
---|
2207 | socket[kParser].timeout.refresh()
|
---|
2208 | }
|
---|
2209 | }
|
---|
2210 | }
|
---|
2211 |
|
---|
2212 | return ret
|
---|
2213 | }
|
---|
2214 |
|
---|
2215 | end () {
|
---|
2216 | const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
|
---|
2217 | request.onRequestSent()
|
---|
2218 |
|
---|
2219 | socket[kWriting] = false
|
---|
2220 |
|
---|
2221 | if (socket[kError]) {
|
---|
2222 | throw socket[kError]
|
---|
2223 | }
|
---|
2224 |
|
---|
2225 | if (socket.destroyed) {
|
---|
2226 | return
|
---|
2227 | }
|
---|
2228 |
|
---|
2229 | if (bytesWritten === 0) {
|
---|
2230 | if (expectsPayload) {
|
---|
2231 | // https://tools.ietf.org/html/rfc7230#section-3.3.2
|
---|
2232 | // A user agent SHOULD send a Content-Length in a request message when
|
---|
2233 | // no Transfer-Encoding is sent and the request method defines a meaning
|
---|
2234 | // for an enclosed payload body.
|
---|
2235 |
|
---|
2236 | socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
|
---|
2237 | } else {
|
---|
2238 | socket.write(`${header}\r\n`, 'latin1')
|
---|
2239 | }
|
---|
2240 | } else if (contentLength === null) {
|
---|
2241 | socket.write('\r\n0\r\n\r\n', 'latin1')
|
---|
2242 | }
|
---|
2243 |
|
---|
2244 | if (contentLength !== null && bytesWritten !== contentLength) {
|
---|
2245 | if (client[kStrictContentLength]) {
|
---|
2246 | throw new RequestContentLengthMismatchError()
|
---|
2247 | } else {
|
---|
2248 | process.emitWarning(new RequestContentLengthMismatchError())
|
---|
2249 | }
|
---|
2250 | }
|
---|
2251 |
|
---|
2252 | if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
|
---|
2253 | // istanbul ignore else: only for jest
|
---|
2254 | if (socket[kParser].timeout.refresh) {
|
---|
2255 | socket[kParser].timeout.refresh()
|
---|
2256 | }
|
---|
2257 | }
|
---|
2258 |
|
---|
2259 | resume(client)
|
---|
2260 | }
|
---|
2261 |
|
---|
2262 | destroy (err) {
|
---|
2263 | const { socket, client } = this
|
---|
2264 |
|
---|
2265 | socket[kWriting] = false
|
---|
2266 |
|
---|
2267 | if (err) {
|
---|
2268 | assert(client[kRunning] <= 1, 'pipeline should only contain this request')
|
---|
2269 | util.destroy(socket, err)
|
---|
2270 | }
|
---|
2271 | }
|
---|
2272 | }
|
---|
2273 |
|
---|
2274 | function errorRequest (client, request, err) {
|
---|
2275 | try {
|
---|
2276 | request.onError(err)
|
---|
2277 | assert(request.aborted)
|
---|
2278 | } catch (err) {
|
---|
2279 | client.emit('error', err)
|
---|
2280 | }
|
---|
2281 | }
|
---|
2282 |
|
---|
2283 | module.exports = Client
|
---|