source: node_modules/undici/lib/client.js

main
Last change on this file was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 60.6 KB
Line 
1// @ts-check
2
3'use strict'
4
5/* global WebAssembly */
6
7const assert = require('assert')
8const net = require('net')
9const http = require('http')
10const { pipeline } = require('stream')
11const util = require('./core/util')
12const timers = require('./timers')
13const Request = require('./core/request')
14const DispatcherBase = require('./dispatcher-base')
15const {
16 RequestContentLengthMismatchError,
17 ResponseContentLengthMismatchError,
18 InvalidArgumentError,
19 RequestAbortedError,
20 HeadersTimeoutError,
21 HeadersOverflowError,
22 SocketError,
23 InformationalError,
24 BodyTimeoutError,
25 HTTPParserError,
26 ResponseExceededMaxSizeError,
27 ClientDestroyedError
28} = require('./core/errors')
29const buildConnector = require('./core/connect')
30const {
31 kUrl,
32 kReset,
33 kServerName,
34 kClient,
35 kBusy,
36 kParser,
37 kConnect,
38 kBlocking,
39 kResuming,
40 kRunning,
41 kPending,
42 kSize,
43 kWriting,
44 kQueue,
45 kConnected,
46 kConnecting,
47 kNeedDrain,
48 kNoRef,
49 kKeepAliveDefaultTimeout,
50 kHostHeader,
51 kPendingIdx,
52 kRunningIdx,
53 kError,
54 kPipelining,
55 kSocket,
56 kKeepAliveTimeoutValue,
57 kMaxHeadersSize,
58 kKeepAliveMaxTimeout,
59 kKeepAliveTimeoutThreshold,
60 kHeadersTimeout,
61 kBodyTimeout,
62 kStrictContentLength,
63 kConnector,
64 kMaxRedirections,
65 kMaxRequests,
66 kCounter,
67 kClose,
68 kDestroy,
69 kDispatch,
70 kInterceptors,
71 kLocalAddress,
72 kMaxResponseSize,
73 kHTTPConnVersion,
74 // HTTP2
75 kHost,
76 kHTTP2Session,
77 kHTTP2SessionState,
78 kHTTP2BuildRequest,
79 kHTTP2CopyHeaders,
80 kHTTP1BuildRequest
81} = require('./core/symbols')
82
83/** @type {import('http2')} */
84let http2
85try {
86 http2 = require('http2')
87} catch {
88 // @ts-ignore
89 http2 = { constants: {} }
90}
91
92const {
93 constants: {
94 HTTP2_HEADER_AUTHORITY,
95 HTTP2_HEADER_METHOD,
96 HTTP2_HEADER_PATH,
97 HTTP2_HEADER_SCHEME,
98 HTTP2_HEADER_CONTENT_LENGTH,
99 HTTP2_HEADER_EXPECT,
100 HTTP2_HEADER_STATUS
101 }
102} = http2
103
// Experimental: one-shot flag, starts out false.
let h2ExperimentalWarned = false

// Buffer constructor used to create views over existing memory.
const FastBuffer = Buffer[Symbol.species]

// Private slot holding the resolver of a pending close() promise.
const kClosedResolve = Symbol('kClosedResolve')

// Diagnostics channels keyed by the short name used throughout this file.
const channels = {}

{
  const channelNames = {
    sendHeaders: 'undici:client:sendHeaders',
    beforeConnect: 'undici:client:beforeConnect',
    connectError: 'undici:client:connectError',
    connected: 'undici:client:connected'
  }
  const keys = Object.keys(channelNames)

  try {
    const diagnosticsChannel = require('diagnostics_channel')
    for (const key of keys) {
      channels[key] = diagnosticsChannel.channel(channelNames[key])
    }
  } catch {
    // diagnostics_channel is unavailable: install inert stand-ins so that
    // `channels.x.hasSubscribers` checks keep working.
    for (const key of keys) {
      channels[key] = { hasSubscribers: false }
    }
  }
}
125
/**
 * Dispatcher bound to the origin parsed from `url`. Requests are kept in a
 * single queue (`kQueue`) split into completed, running and pending sections
 * by `kRunningIdx` and `kPendingIdx`, and are sent over the socket stored in
 * `kSocket`.
 *
 * @type {import('../types/client').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds the connector (unless a function was
   * supplied through `connect`) and initialises the queue and timeout state.
   *
   * @param {string|URL} url
   * @param {import('../types/client').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or has an
   *   invalid value.
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout,
    // h2
    allowH2,
    maxConcurrentStreams
  } = {}) {
    super()

    // Options that were renamed or removed: fail loudly and point at the
    // replacement instead of silently ignoring them.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
    }

    // Negative sizes are rejected here: they would otherwise only surface
    // later as a failed `> 0` assertion when a Parser is constructed.
    // Zero stays accepted and falls back to http.maxHeaderSize below.
    if (maxHeaderSize != null && (!Number.isFinite(maxHeaderSize) || maxHeaderSize < 0)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    // -1 is accepted and means "no limit" (see the kMaxResponseSize default).
    if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value')
    }

    if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
      throw new InvalidArgumentError('maxConcurrentStreams must be a possitive integer, greater than 0')
    }

    // A connect *function* is used as-is; an object (or nothing) is merged
    // into the options of the default connector, overriding earlier keys.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })
    }

    // Use the caller's Client interceptors when given as an array, otherwise
    // install the redirect interceptor.
    this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
      ? interceptors.Client
      : [createRedirectInterceptor({ maxRedirections })]
    this[kUrl] = util.parseOrigin(url)
    this[kConnector] = connect
    this[kSocket] = null
    this[kPipelining] = pipelining != null ? pipelining : 1
    this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress != null ? localAddress : null
    this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
    this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
    this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
    this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
    this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
    this[kHTTPConnVersion] = 'h1'

    // HTTP/2
    this[kHTTP2Session] = null
    this[kHTTP2SessionState] = !allowH2
      ? null
      : {
        // streams: null, // Fixed queue of streams - For future support of `push`
          openStreams: 0, // Keep track of them to decide whether or not to unref the session
          maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
        }
    this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0
  }

  /** Current pipelining factor. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Updates the pipelining factor and synchronously resumes the client. */
  set pipelining (value) {
    this[kPipelining] = value
    resume(this, true)
  }

  /** Number of queued requests that have not started running. */
  get [kPending] () {
    return this[kQueue].length - this[kPendingIdx]
  }

  /** Number of requests between kRunningIdx and kPendingIdx. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Running plus pending requests. */
  get [kSize] () {
    return this[kQueue].length - this[kRunningIdx]
  }

  /** True when a socket exists, is not being connected and is not destroyed. */
  get [kConnected] () {
    return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
  }

  /**
   * Truthy when the client should not be handed more work: the socket is
   * resetting, writing or blocking, the pipeline is full, or requests are
   * still pending.
   */
  get [kBusy] () {
    const socket = this[kSocket]
    return (
      (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
      (this[kSize] >= (this[kPipelining] || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Builds a request for the current connection version, appends it to the
   * queue and resumes the client.
   *
   * @param {object} opts Dispatch options; `opts.origin` overrides the
   *   client origin when set.
   * @param {object} handler Handler passed through to the request builder.
   * @returns {boolean} `false` when `kNeedDrain` is 2, i.e. the caller
   *   should wait before dispatching more.
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin

    const request = this[kHTTPConnVersion] === 'h2'
      ? Request[kHTTP2BuildRequest](origin, opts, handler)
      : Request[kHTTP1BuildRequest](origin, opts, handler)

    this[kQueue].push(request)
    if (this[kResuming]) {
      // Do nothing.
    } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1
      process.nextTick(resume, this)
    } else {
      resume(this, true)
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves with `null` right away when nothing is queued; otherwise the
   * resolver is parked in `kClosedResolve` to be called later.
   *
   * @returns {Promise<null|undefined>}
   */
  async [kClose] () {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (!this[kSize]) {
        resolve(null)
      } else {
        this[kClosedResolve] = resolve
      }
    })
  }

  /**
   * Fails every pending request with `err`, destroys the HTTP/2 session and
   * the socket, and resolves once the socket has closed (or on the next
   * microtask when there is no socket).
   *
   * @param {Error} err Error handed to the failed requests and destroyed
   *   resources.
   * @returns {Promise<void>}
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      const requests = this[kQueue].splice(this[kPendingIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        errorRequest(this, request, err)
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]()
          this[kClosedResolve] = null
        }
        resolve()
      }

      if (this[kHTTP2Session] != null) {
        util.destroy(this[kHTTP2Session], err)
        this[kHTTP2Session] = null
        this[kHTTP2SessionState] = null
      }

      if (!this[kSocket]) {
        queueMicrotask(callback)
      } else {
        util.destroy(this[kSocket].on('close', callback), err)
      }

      resume(this)
    })
  }
}
428
/**
 * Listener for HTTP/2 session errors: records the error on the socket held
 * by `this` and forwards it to the owning client.
 *
 * @param {Error} err
 */
function onHttp2SessionError (err) {
  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  const socket = this[kSocket]
  socket[kError] = err

  const client = this[kClient]
  onError(client, err)
}
436
/**
 * Listener for HTTP/2 "frameError" events. Only errors with `id === 0` are
 * recorded on the socket and forwarded to the client; others are ignored.
 *
 * @param {number} type
 * @param {number} code
 * @param {number} id
 */
function onHttp2FrameError (type, code, id) {
  const frameError = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

  if (id !== 0) {
    return
  }

  this[kSocket][kError] = frameError
  onError(this[kClient], frameError)
}
445
/**
 * Listener for the end of an HTTP/2 session: destroys `this` and then the
 * socket stored on it, each with its own "other side closed" error.
 */
function onHttp2SessionEnd () {
  const sessionError = new SocketError('other side closed')
  util.destroy(this, sessionError)

  const socketError = new SocketError('other side closed')
  util.destroy(this[kSocket], socketError)
}
450
/**
 * Listener for an HTTP/2 GOAWAY frame. Detaches the socket and session from
 * the client, fails the affected requests, emits `disconnect` and resumes
 * the client.
 *
 * The client is read from `this[kClient]`. All queue bookkeeping (`kQueue`,
 * `kPending`, `kRunning`, the index slots) lives on that client, not on
 * `this`, so every queue access below goes through `client`.
 *
 * @param {number} code Code carried by the GOAWAY frame; used in the error
 *   message only.
 */
function onHTTP2GoAway (code) {
  const client = this[kClient]
  const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
  client[kSocket] = null
  client[kHTTP2Session] = null

  if (client.destroyed) {
    // Previously read from `this`, where kPending is undefined, so the
    // assertion could never hold.
    assert(client[kPending] === 0)

    // Fail entire queue.
    const requests = client[kQueue].splice(client[kRunningIdx])
    for (let i = 0; i < requests.length; i++) {
      const request = requests[i]
      // Requests are errored against the client, as in onSocketClose.
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0) {
    // Fail head of pipeline.
    const request = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, request, err)
  }

  // Whatever was running is gone; the pending section now starts at the
  // running index.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect',
    client[kUrl],
    [client],
    err
  )

  resume(client)
}
486
487const constants = require('./llhttp/constants')
488const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
489const EMPTY_BUF = Buffer.alloc(0)
490
/**
 * Compiles and instantiates the llhttp WebAssembly module, preferring the
 * SIMD build and falling back to the plain build when that fails. The
 * imported `env` callbacks relay parser events to `currentParser`.
 *
 * @returns {Promise<WebAssembly.Instance>}
 */
async function lazyllhttp () {
  const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined

  const compile = (base64) => WebAssembly.compile(Buffer.from(base64, 'base64'))

  let mod
  try {
    mod = await compile(require('./llhttp/llhttp_simd-wasm.js'))
  } catch {
    /* istanbul ignore next */

    // We could check if the error was caused by the simd option not
    // being enabled, but the occurring of this other error
    // * https://github.com/emscripten-core/emscripten/issues/11495
    // got me to remove that check to avoid breaking Node 12.
    mod = await compile(llhttpWasmData || require('./llhttp/llhttp-wasm.js'))
  }

  // Every callback must come from the parser that is currently executing.
  const assertCurrent = (p) => assert.strictEqual(currentParser.ptr, p)

  // Maps a wasm-side (pointer, length) pair back onto the chunk being parsed.
  const view = (at, len) => {
    const start = at - currentBufferPtr + currentBufferRef.byteOffset
    return new FastBuffer(currentBufferRef.buffer, start, len)
  }

  const instance = await WebAssembly.instantiate(mod, {
    env: {
      /* eslint-disable camelcase */

      wasm_on_url: (p, at, len) => {
        /* istanbul ignore next */
        return 0
      },
      wasm_on_status: (p, at, len) => {
        assertCurrent(p)
        return currentParser.onStatus(view(at, len)) || 0
      },
      wasm_on_message_begin: (p) => {
        assertCurrent(p)
        return currentParser.onMessageBegin() || 0
      },
      wasm_on_header_field: (p, at, len) => {
        assertCurrent(p)
        return currentParser.onHeaderField(view(at, len)) || 0
      },
      wasm_on_header_value: (p, at, len) => {
        assertCurrent(p)
        return currentParser.onHeaderValue(view(at, len)) || 0
      },
      wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
        assertCurrent(p)
        return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
      },
      wasm_on_body: (p, at, len) => {
        assertCurrent(p)
        return currentParser.onBody(view(at, len)) || 0
      },
      wasm_on_message_complete: (p) => {
        assertCurrent(p)
        return currentParser.onMessageComplete() || 0
      }

      /* eslint-enable camelcase */
    }
  })

  return instance
}
552
// Cached llhttp instance; starts out null (assigned outside this section).
let llhttpInstance = null
// Start loading the wasm parser as soon as the module is evaluated.
let llhttpPromise = lazyllhttp()
// Attach a real no-op handler. A bare `.catch()` registers no rejection
// handler, so a failed load would still surface as an unhandled rejection
// on the derived promise. The failure itself is not lost: anything that
// later awaits `llhttpPromise` still observes the rejection.
llhttpPromise.catch(() => {})

// State shared between Parser.execute() and the wasm callbacks: the parser
// currently executing, the chunk it is parsing, and the wasm-side buffer
// (size in bytes and pointer) the chunk is copied into.
let currentParser = null
let currentBufferRef = null
let currentBufferSize = 0
let currentBufferPtr = null

// Values stored in Parser#timeoutType.
const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
565
/**
 * Response parser for one socket. Owns an llhttp parser handle (`ptr`),
 * feeds it socket data and turns its callbacks into calls on the request at
 * the head of the client's running queue.
 */
class Parser {
  /**
   * @param {object} client Owning client; supplies header and response size limits.
   * @param {object} socket Socket the parser reads from.
   * @param {{ exports: object }} instance llhttp wasm instance.
   */
  constructor (client, socket, { exports }) {
    assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)

    this.llhttp = exports
    this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
    this.client = client
    this.socket = socket
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null
    this.statusCode = null
    this.statusText = ''
    this.upgrade = false
    this.headers = []
    this.headersSize = 0
    this.headersMaxSize = client[kMaxHeadersSize]
    this.shouldKeepAlive = false
    this.paused = false
    this.resume = this.resume.bind(this)

    this.bytesRead = 0

    this.keepAlive = ''
    this.contentLength = ''
    this.connection = ''
    this.maxResponseSize = client[kMaxResponseSize]
  }

  /**
   * Arms, refreshes or clears the parser timer.
   *
   * @param {number} value Timeout duration; a falsy value clears the timer.
   * @param {number} type One of the TIMEOUT_* constants.
   */
  setTimeout (value, type) {
    this.timeoutType = type

    if (value === this.timeoutValue) {
      // Same duration as before: just restart the existing timer.
      // istanbul ignore else: only for jest
      if (this.timeout && this.timeout.refresh) {
        this.timeout.refresh()
      }
      return
    }

    timers.clearTimeout(this.timeout)
    if (value) {
      this.timeout = timers.setTimeout(onParserTimeout, value, this)
      // istanbul ignore else: only for jest
      if (this.timeout.unref) {
        this.timeout.unref()
      }
    } else {
      this.timeout = null
    }
    this.timeoutValue = value
  }

  /** Resumes a paused parser and drains whatever the socket has buffered. */
  resume () {
    if (this.socket.destroyed || !this.paused) {
      return
    }

    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_resume(this.ptr)

    assert(this.timeoutType === TIMEOUT_BODY)
    // istanbul ignore else: only for jest
    if (this.timeout && this.timeout.refresh) {
      this.timeout.refresh()
    }

    this.paused = false
    this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
    this.readMore()
  }

  /** Feeds socket chunks to the parser until it pauses, is destroyed or the socket runs dry. */
  readMore () {
    let chunk
    while (!this.paused && this.ptr && (chunk = this.socket.read()) !== null) {
      this.execute(chunk)
    }
  }

  /**
   * Copies `data` into the shared wasm buffer and runs llhttp over it. Any
   * error raised while parsing destroys the socket.
   *
   * @param {Buffer} data
   */
  execute (data) {
    assert(this.ptr != null)
    assert(currentParser == null)
    assert(!this.paused)

    const { socket, llhttp } = this

    // Grow the shared buffer (rounded up to 4096 bytes) when the chunk does not fit.
    if (data.length > currentBufferSize) {
      if (currentBufferPtr) {
        llhttp.free(currentBufferPtr)
      }
      currentBufferSize = Math.ceil(data.length / 4096) * 4096
      currentBufferPtr = llhttp.malloc(currentBufferSize)
    }

    new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)

    // Call `execute` on the wasm parser.
    // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
    // and finally the length of bytes to parse.
    // The return value is an error code or `constants.ERROR.OK`.
    try {
      let status

      try {
        currentBufferRef = data
        currentParser = this
        status = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
      } finally {
        // Always release the shared state, whether or not llhttp threw.
        currentParser = null
        currentBufferRef = null
      }

      const consumed = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr

      switch (status) {
        case constants.ERROR.PAUSED_UPGRADE:
          this.onUpgrade(data.slice(consumed))
          break
        case constants.ERROR.PAUSED:
          this.paused = true
          socket.unshift(data.slice(consumed))
          break
        case constants.ERROR.OK:
          break
        default: {
          const reasonPtr = llhttp.llhttp_get_error_reason(this.ptr)
          let message = ''
          /* istanbul ignore else: difficult to make a test case for */
          if (reasonPtr) {
            // The reason is a NUL-terminated string in wasm memory.
            const reasonLength = new Uint8Array(llhttp.memory.buffer, reasonPtr).indexOf(0)
            message =
              'Response does not match the HTTP/1.1 protocol (' +
              Buffer.from(llhttp.memory.buffer, reasonPtr, reasonLength).toString() +
              ')'
          }
          throw new HTTPParserError(message, constants.ERROR[status], data.slice(consumed))
        }
      }
    } catch (err) {
      util.destroy(socket, err)
    }
  }

  /** Frees the llhttp handle and clears all timer state. */
  destroy () {
    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_free(this.ptr)
    this.ptr = null

    timers.clearTimeout(this.timeout)
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null

    this.paused = false
  }

  /** Stores the status text of the response being parsed. */
  onStatus (buf) {
    this.statusText = buf.toString()
  }

  /** Rejects the message (-1) when the socket is gone or no request is running. */
  onMessageBegin () {
    const { socket, client } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed || !client[kQueue][client[kRunningIdx]]) {
      return -1
    }
  }

  /**
   * Collects a header name. `headers` alternates name/value entries, so an
   * even length starts a new name and an odd length continues the last one.
   */
  onHeaderField (buf) {
    const count = this.headers.length

    if (count % 2 === 0) {
      this.headers.push(buf)
    } else {
      this.headers[count - 1] = Buffer.concat([this.headers[count - 1], buf])
    }

    this.trackHeader(buf.length)
  }

  /**
   * Collects a header value and accumulates the keep-alive, connection and
   * content-length values used later by the parser itself.
   */
  onHeaderValue (buf) {
    const count = this.headers.length

    if (count % 2 === 1) {
      this.headers.push(buf)
    } else {
      this.headers[count - 1] = Buffer.concat([this.headers[count - 1], buf])
    }

    // The name belonging to this value sits right before it.
    const key = this.headers[this.headers.length - 2]
    if (key.length === 10) {
      const name = key.toString().toLowerCase()
      if (name === 'keep-alive') {
        this.keepAlive += buf.toString()
      } else if (name === 'connection') {
        this.connection += buf.toString()
      }
    } else if (key.length === 14) {
      if (key.toString().toLowerCase() === 'content-length') {
        this.contentLength += buf.toString()
      }
    }

    this.trackHeader(buf.length)
  }

  /** Adds `len` to the running header size and destroys the socket on overflow. */
  trackHeader (len) {
    const total = this.headersSize + len
    this.headersSize = total
    if (total >= this.headersMaxSize) {
      util.destroy(this.socket, new HeadersOverflowError())
    }
  }

  /**
   * Hands the socket over to the request after an upgrade / CONNECT: the
   * parser and its listeners are detached and the client is left without a
   * socket.
   *
   * @param {Buffer} head Bytes received after the response head.
   */
  onUpgrade (head) {
    const { upgrade, client, socket, headers, statusCode } = this

    assert(upgrade)

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(!socket.destroyed)
    assert(socket === client[kSocket])
    assert(!this.paused)
    assert(request.upgrade || request.method === 'CONNECT')

    this.statusCode = null
    this.statusText = ''
    this.shouldKeepAlive = null

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    // Give the unparsed bytes back so the new owner of the socket sees them.
    socket.unshift(head)

    socket[kParser].destroy()
    socket[kParser] = null

    socket[kClient] = null
    socket[kError] = null
    socket.removeListener('error', onSocketError)
    socket.removeListener('readable', onSocketReadable)
    socket.removeListener('end', onSocketEnd)
    socket.removeListener('close', onSocketClose)

    client[kSocket] = null
    client[kQueue][client[kRunningIdx]++] = null
    client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

    try {
      request.onUpgrade(statusCode, headers, socket)
    } catch (err) {
      util.destroy(socket, err)
    }

    resume(client)
  }

  /**
   * Called once the response head is complete. Validates the response,
   * switches to the body timeout, works out keep-alive handling and passes
   * the headers to the request.
   *
   * @returns {number} Value handed back to llhttp.
   */
  onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
    const { client, socket, headers, statusText } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]

    /* istanbul ignore next: difficult to make a test case for */
    if (!request) {
      return -1
    }

    assert(!this.upgrade)
    assert(this.statusCode < 200)

    if (statusCode === 100) {
      util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
      return -1
    }

    /* this can only happen if server is misbehaving */
    if (upgrade && !request.upgrade) {
      util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
      return -1
    }

    assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

    this.statusCode = statusCode
    this.shouldKeepAlive = (
      shouldKeepAlive ||
      // Override llhttp value which does not allow keepAlive for HEAD.
      (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
    )

    if (this.statusCode >= 200) {
      const bodyTimeout = request.bodyTimeout != null
        ? request.bodyTimeout
        : client[kBodyTimeout]
      this.setTimeout(bodyTimeout, TIMEOUT_BODY)
    } else if (this.timeout && this.timeout.refresh) {
      // istanbul ignore else: only for jest
      this.timeout.refresh()
    }

    // CONNECT and upgrade responses are both handed over via onUpgrade().
    if (request.method === 'CONNECT' || upgrade) {
      assert(client[kRunning] === 1)
      this.upgrade = true
      return 2
    }

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    if (this.shouldKeepAlive && client[kPipelining]) {
      const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null

      if (keepAliveTimeout == null) {
        client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
      } else {
        const timeout = Math.min(
          keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
          client[kKeepAliveMaxTimeout]
        )
        if (timeout <= 0) {
          socket[kReset] = true
        } else {
          client[kKeepAliveTimeoutValue] = timeout
        }
      }
    } else {
      // Stop more requests from being dispatched.
      socket[kReset] = true
    }

    const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false

    if (request.aborted) {
      return -1
    }

    if (request.method === 'HEAD' || statusCode < 200) {
      return 1
    }

    if (socket[kBlocking]) {
      socket[kBlocking] = false
      resume(client)
    }

    return pause ? constants.ERROR.PAUSED : 0
  }

  /**
   * Forwards a body chunk to the running request, enforcing the response
   * size limit and refreshing the body timer.
   */
  onBody (buf) {
    const { client, socket, statusCode, maxResponseSize } = this

    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
    // istanbul ignore else: only for jest
    if (this.timeout && this.timeout.refresh) {
      this.timeout.refresh()
    }

    assert(statusCode >= 200)

    const limited = maxResponseSize > -1
    if (limited && this.bytesRead + buf.length > maxResponseSize) {
      util.destroy(socket, new ResponseExceededMaxSizeError())
      return -1
    }

    this.bytesRead += buf.length

    if (request.onData(buf) === false) {
      return constants.ERROR.PAUSED
    }
  }

  /**
   * Finishes the current response: resets per-message state, completes the
   * request and either resets the socket or resumes the client.
   */
  onMessageComplete () {
    const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this

    if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
      return -1
    }

    if (upgrade) {
      return
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(statusCode >= 100)

    // Per-message state is cleared up front; the values needed below were
    // captured by the destructuring above.
    this.statusCode = null
    this.statusText = ''
    this.bytesRead = 0
    this.contentLength = ''
    this.keepAlive = ''
    this.connection = ''

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    if (statusCode < 200) {
      return
    }

    /* istanbul ignore next: should be handled by llhttp? */
    if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
      util.destroy(socket, new ResponseContentLengthMismatchError())
      return -1
    }

    request.onComplete(headers)

    client[kQueue][client[kRunningIdx]++] = null

    if (socket[kWriting]) {
      assert.strictEqual(client[kRunning], 0)
      // Response completed before request.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    }

    if (!shouldKeepAlive) {
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    }

    if (socket[kReset] && client[kRunning] === 0) {
      // Destroy socket once all requests have completed.
      // The request at the tail of the pipeline is the one
      // that requested reset and no further requests should
      // have been queued since then.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    }

    if (client[kPipelining] === 1) {
      // We must wait a full event loop cycle to reuse this socket to make sure
      // that non-spec compliant servers are not closing the connection even if they
      // said they won't.
      setImmediate(resume, client)
    } else {
      resume(client)
    }
  }
}
1040
/**
 * Timer callback for a parser: destroys the socket with the error matching
 * the kind of timeout that was armed.
 *
 * @param {Parser} parser
 */
function onParserTimeout (parser) {
  const { socket, timeoutType, client } = parser

  /* istanbul ignore else */
  switch (timeoutType) {
    case TIMEOUT_HEADERS:
      if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) {
        assert(!parser.paused, 'cannot be paused while waiting for headers')
        util.destroy(socket, new HeadersTimeoutError())
      }
      break
    case TIMEOUT_BODY:
      // A paused parser is waiting on the consumer, not on the peer.
      if (!parser.paused) {
        util.destroy(socket, new BodyTimeoutError())
      }
      break
    case TIMEOUT_IDLE:
      assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
      util.destroy(socket, new InformationalError('socket idle timeout'))
      break
  }
}
1059
/** Socket 'readable' listener: lets the attached parser, if any, pull more data. */
function onSocketReadable () {
  const parser = this[kParser]
  if (!parser) {
    return
  }
  parser.readMore()
}
1066
/**
 * Socket 'error' listener. On non-h2 connections an ECONNRESET that arrives
 * after a non-keep-alive response has started is treated as the end of that
 * response; every other error is recorded on the socket and forwarded to
 * the client.
 *
 * @param {Error} err
 */
function onSocketError (err) {
  const client = this[kClient]
  const parser = this[kParser]

  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
  // to the user.
  const isHttp1 = client[kHTTPConnVersion] !== 'h2'
  if (isHttp1 && err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
    return
  }

  this[kError] = err

  onError(this[kClient], err)
}
1086
/**
 * Fails every queued request when an error occurs while nothing is running
 * and the error is neither informational nor a socket error.
 *
 * @param {Client} client
 * @param {Error} err
 */
function onError (client, err) {
  if (
    client[kRunning] !== 0 ||
    err.code === 'UND_ERR_INFO' ||
    err.code === 'UND_ERR_SOCKET'
  ) {
    return
  }

  // Error is not caused by running request and not a recoverable
  // socket error.

  assert(client[kPendingIdx] === client[kRunningIdx])

  const failed = client[kQueue].splice(client[kRunningIdx])
  for (const request of failed) {
    errorRequest(client, request, err)
  }
  assert(client[kSize] === 0)
}
1106
/**
 * 'end' listener for the socket (`this`).
 *
 * On a non-HTTP/2 connection where a status code has been parsed and the
 * response is not keep-alive, the end of the stream completes the message.
 * Otherwise the socket is destroyed with an 'other side closed' SocketError.
 */
function onSocketEnd () {
  const parser = this[kParser]
  const client = this[kClient]

  const isH1 = client[kHTTPConnVersion] !== 'h2'
  if (isH1 && parser.statusCode && !parser.shouldKeepAlive) {
    // Everything received so far is accepted as a valid response.
    parser.onMessageComplete()
    return
  }

  const reason = new SocketError('other side closed', util.getSocketInfo(this))
  util.destroy(this, reason)
}
1120
/**
 * 'close' listener, bound to the closing socket or HTTP/2 session (`this`).
 *
 * Releases the HTTP/1 parser, detaches the socket from the client, fails the
 * affected request(s), emits 'disconnect' and resumes the client.
 */
function onSocketClose () {
  const client = this[kClient]
  const parser = this[kParser]

  if (client[kHTTPConnVersion] === 'h1' && parser) {
    const responseFinishedByClose = !this[kError] && parser.statusCode && !parser.shouldKeepAlive
    if (responseFinishedByClose) {
      // Everything received so far is accepted as a valid response.
      parser.onMessageComplete()
    }

    this[kParser].destroy()
    this[kParser] = null
  }

  // Prefer the error recorded on the socket; otherwise synthesise one.
  const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

  client[kSocket] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // The client is gone: fail everything from the running index onwards.
    for (const request of client[kQueue].splice(client[kRunningIdx])) {
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
    // Only the head of the pipeline is failed; its slot is cleared and the
    // running index moves past it.
    const headIdx = client[kRunningIdx]
    const request = client[kQueue][headIdx]
    client[kQueue][headIdx] = null
    client[kRunningIdx] = headIdx + 1

    errorRequest(client, request, err)
  }

  // Whatever was dispatched but not completed becomes pending again.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect', client[kUrl], [client], err)

  resume(client)
}
1163
/**
 * Opens a new connection for `client` through its connector and wires the
 * resulting socket (or HTTP/2 session, when ALPN selected 'h2') into the
 * client. Publishes to the diagnostics channels, emits 'connect' or
 * 'connectionError', and finally resumes the client.
 *
 * @param {object} client Client that currently has neither a socket nor a
 *   connection attempt in progress.
 * @returns {Promise<void>}
 */
async function connect (client) {
  assert(!client[kConnecting])
  assert(!client[kSocket])

  const origin = client[kUrl]
  const { host, protocol, port } = origin
  let hostname = origin.hostname

  // Resolve ipv6: strip the surrounding brackets from the literal.
  if (hostname[0] === '[') {
    const closingBracket = hostname.indexOf(']')
    assert(closingBracket !== -1)

    const literal = hostname.substring(1, closingBracket)
    assert(net.isIP(literal))
    hostname = literal
  }

  client[kConnecting] = true

  // Built on demand so that servername/localAddress are read from the client
  // at the moment each consumer needs them.
  const currentConnectParams = () => ({
    host,
    hostname,
    protocol,
    port,
    servername: client[kServerName],
    localAddress: client[kLocalAddress]
  })

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: currentConnectParams(),
      connector: client[kConnector]
    })
  }

  try {
    const socket = await new Promise((resolve, reject) => {
      client[kConnector](currentConnectParams(), (err, connected) => {
        if (err) {
          reject(err)
          return
        }
        resolve(connected)
      })
    })

    if (client.destroyed) {
      // The client went away while connecting: discard the socket quietly.
      util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
      return
    }

    client[kConnecting] = false

    assert(socket)

    if (socket.alpnProtocol === 'h2') {
      if (!h2ExperimentalWarned) {
        h2ExperimentalWarned = true
        process.emitWarning('H2 support is experimental, expect them to change at any time.', {
          code: 'UNDICI-H2'
        })
      }

      const session = http2.connect(client[kUrl], {
        createConnection: () => socket,
        peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
      })

      client[kHTTPConnVersion] = 'h2'
      session[kClient] = client
      session[kSocket] = socket
      session.on('error', onHttp2SessionError)
      session.on('frameError', onHttp2FrameError)
      session.on('end', onHttp2SessionEnd)
      session.on('goaway', onHTTP2GoAway)
      session.on('close', onSocketClose)
      session.unref()

      client[kHTTP2Session] = session
      socket[kHTTP2Session] = session
    } else {
      // HTTP/1: make sure the llhttp instance is available before creating
      // the parser.
      if (!llhttpInstance) {
        llhttpInstance = await llhttpPromise
        llhttpPromise = null
      }

      socket[kNoRef] = false
      socket[kWriting] = false
      socket[kReset] = false
      socket[kBlocking] = false
      socket[kParser] = new Parser(client, socket, llhttpInstance)
    }

    socket[kCounter] = 0
    socket[kMaxRequests] = client[kMaxRequests]
    socket[kClient] = client
    socket[kError] = null

    socket
      .on('error', onSocketError)
      .on('readable', onSocketReadable)
      .on('end', onSocketEnd)
      .on('close', onSocketClose)

    client[kSocket] = socket

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: currentConnectParams(),
        connector: client[kConnector],
        socket
      })
    }
    client.emit('connect', client[kUrl], [client])
  } catch (err) {
    if (client.destroyed) {
      return
    }

    client[kConnecting] = false

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: currentConnectParams(),
        connector: client[kConnector],
        error: err
      })
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      assert(client[kRunning] === 0)
      // Fail only the leading pending requests that asked for this servername.
      while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
        const request = client[kQueue][client[kPendingIdx]++]
        errorRequest(client, request, err)
      }
    } else {
      onError(client, err)
    }

    client.emit('connectionError', client[kUrl], [client], err)
  }

  resume(client)
}
1328
/**
 * Clears the client's need-drain marker and emits 'drain' with the client's
 * URL and the client itself as the only target.
 *
 * @param {object} client
 */
function emitDrain (client) {
  client[kNeedDrain] = 0

  const origin = client[kUrl]
  client.emit('drain', origin, [client])
}
1333
/**
 * Runs the dispatch loop (`_resume`) unless one is already running for this
 * client, then compacts the queue once more than 256 completed slots have
 * accumulated at its front.
 *
 * @param {object} client
 * @param {boolean} [sync] Forwarded unchanged to `_resume`.
 */
function resume (client, sync) {
  const alreadyRunning = client[kResuming] === 2
  if (alreadyRunning) {
    return
  }

  client[kResuming] = 2

  _resume(client, sync)
  client[kResuming] = 0

  const consumed = client[kRunningIdx]
  if (consumed > 256) {
    // Drop the consumed prefix and shift both indices back accordingly.
    client[kQueue].splice(0, consumed)
    client[kPendingIdx] -= consumed
    client[kRunningIdx] = 0
  }
}
1350
/**
 * Dispatch loop of the client. Each iteration refreshes socket ref/timeout
 * state, handles drain signalling, and then either returns (nothing can be
 * dispatched right now) or writes the next pending request and loops again.
 *
 * @param {object} client
 * @param {boolean} [sync] When truthy, a pending 'drain' is emitted on the
 *   next tick instead of synchronously.
 */
function _resume (client, sync) {
  while (true) {
    if (client.destroyed) {
      assert(client[kPending] === 0)
      return
    }

    // A close is waiting and the queue is empty: settle it and stop.
    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]()
      client[kClosedResolve] = null
      return
    }

    const socket = client[kSocket]

    // Socket bookkeeping below only applies to live, non-h2 sockets.
    if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') {
      // Unref the socket while the client has no work, ref it again
      // as soon as there is some.
      if (client[kSize] === 0) {
        if (!socket[kNoRef] && socket.unref) {
          socket.unref()
          socket[kNoRef] = true
        }
      } else if (socket[kNoRef] && socket.ref) {
        socket.ref()
        socket[kNoRef] = false
      }

      if (client[kSize] === 0) {
        // Nothing queued: arm the idle (keep-alive) timeout if not armed yet.
        if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
          socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
        }
      } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
        // A request is running and no final status has been parsed yet: arm
        // the headers timeout, preferring the request's own value.
        if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
          const request = client[kQueue][client[kRunningIdx]]
          const headersTimeout = request.headersTimeout != null
            ? request.headersTimeout
            : client[kHeadersTimeout]
          socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
        }
      }
    }

    if (client[kBusy]) {
      // Remember that a 'drain' must be emitted once the client frees up.
      client[kNeedDrain] = 2
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        // Defer the event to the next tick; 1 marks it as scheduled.
        client[kNeedDrain] = 1
        process.nextTick(emitDrain, client)
      } else {
        emitDrain(client)
      }
      // Re-evaluate everything from the top after signalling drain.
      continue
    }

    if (client[kPending] === 0) {
      return
    }

    // Respect the pipelining limit (at least one request at a time).
    if (client[kRunning] >= (client[kPipelining] || 1)) {
      return
    }

    const request = client[kQueue][client[kPendingIdx]]

    if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
      // The servername can only change once nothing is in flight.
      if (client[kRunning] > 0) {
        return
      }

      client[kServerName] = request.servername

      if (socket && socket.servername !== request.servername) {
        util.destroy(socket, new InformationalError('servername changed'))
        return
      }
    }

    if (client[kConnecting]) {
      return
    }

    if (!socket && !client[kHTTP2Session]) {
      // No connection at all: start one; connect() resumes the client itself.
      connect(client)
      return
    }

    if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
      return
    }

    if (client[kRunning] > 0 && !request.idempotent) {
      // Non-idempotent request cannot be retried.
      // Ensure that no other requests are inflight and
      // could cause failure.
      return
    }

    if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
      // Don't dispatch an upgrade until all preceding requests have completed.
      // A misbehaving server might upgrade the connection before all pipelined
      // request has completed.
      return
    }

    if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
      (util.isStream(request.body) || util.isAsyncIterable(request.body))) {
      // Request with stream or iterator body can error while other requests
      // are inflight and indirectly error those as well.
      // Ensure this doesn't happen by waiting for inflight
      // to complete before dispatching.

      // Request with stream or iterator body cannot be retried.
      // Ensure that no other requests are inflight and
      // could cause failure.
      return
    }

    // A truthy result from write() moves the request into the running range;
    // an aborted request or a falsy result removes it from the queue instead.
    if (!request.aborted && write(client, request)) {
      client[kPendingIdx]++
    } else {
      client[kQueue].splice(client[kPendingIdx], 1)
    }
  }
}
1474
/**
 * Tells whether a Content-Length mismatch should be checked for `method`.
 * See https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
 *
 * @param {string} method HTTP method, compared case-sensitively.
 * @returns {boolean} `false` for GET, HEAD, OPTIONS, TRACE and CONNECT,
 *   `true` for anything else.
 */
function shouldSendContentLength (method) {
  switch (method) {
    case 'GET':
    case 'HEAD':
    case 'OPTIONS':
    case 'TRACE':
    case 'CONNECT':
      return false
    default:
      return true
  }
}
1479
/**
 * Serialises `request` onto the client's current connection. HTTP/2
 * connections are delegated to `writeH2`; otherwise the HTTP/1.1 request
 * head is built here and the body is handed to the writer matching its type.
 *
 * @param {object} client
 * @param {object} request
 * @returns {boolean|undefined} `true` once the request was handed to the
 *   socket, `false` when it was failed or aborted before anything was
 *   written, and `undefined` on the HTTP/2 path.
 */
function write (client, request) {
  if (client[kHTTPConnVersion] === 'h2') {
    writeH2(client, client[kHTTP2Session], request)
    return
  }

  const { body, method, path, host, upgrade, headers, blocking, reset } = request

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // A payload on a method that does not expect one can confuse servers and
  // corrupt connection state, so such connections are not reused afterwards.
  const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH'

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  const bodyLength = util.bodyLength(body)

  // Fall back to the length declared on the request when the body's own
  // length is unknown.
  let contentLength = bodyLength === null ? request.contentLength : bodyLength

  if (!expectsPayload && contentLength === 0) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // No Content-Length when there is no payload and the method does not
    // anticipate one.
    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  const declaredLengthDiffers =
    shouldSendContentLength(method) &&
    contentLength > 0 &&
    request.contentLength !== null &&
    request.contentLength !== contentLength

  if (declaredLengthDiffers) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  const socket = client[kSocket]

  try {
    // Hand the request its abort callback: aborting fails the request and
    // tears down the socket it was written to.
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())

      util.destroy(socket, new InformationalError('aborted'))
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  // HEAD: close afterwards to interop with servers that wrongly send a body
  // (https://github.com/mcollina/undici/issues/258).
  // CONNECT / upgrade: nothing else may be dispatched on this connection.
  if (method === 'HEAD' || upgrade || method === 'CONNECT') {
    socket[kReset] = true
  }

  // An explicit per-request choice overrides the defaults above.
  if (reset != null) {
    socket[kReset] = reset
  }

  if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
    socket[kReset] = true
  }

  if (blocking) {
    socket[kBlocking] = true
  }

  let connectionLine
  if (upgrade) {
    connectionLine = `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  } else if (client[kPipelining] && !socket[kReset]) {
    connectionLine = 'connection: keep-alive\r\n'
  } else {
    connectionLine = 'connection: close\r\n'
  }

  let header = `${method} ${path} HTTP/1.1\r\n`
  header += typeof host === 'string' ? `host: ${host}\r\n` : client[kHostHeader]
  header += connectionLine

  if (headers) {
    header += headers
  }

  if (channels.sendHeaders.hasSubscribers) {
    channels.sendHeaders.publish({ request, headers: header, socket })
  }

  /* istanbul ignore else: assertion */
  if (!body || bodyLength === 0) {
    if (contentLength === 0) {
      socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
    } else {
      assert(contentLength === null, 'no body must not have content length')
      socket.write(`${header}\r\n`, 'latin1')
    }
    request.onRequestSent()
  } else if (util.isBuffer(body)) {
    assert(contentLength === body.byteLength, 'buffer body must have content length')

    // Head and body are flushed together.
    socket.cork()
    socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
    socket.write(body)
    socket.uncork()
    request.onBodySent(body)
    request.onRequestSent()
    if (!expectsPayload) {
      socket[kReset] = true
    }
  } else {
    // Remaining body kinds are written asynchronously by dedicated writers.
    const writerArgs = { body, client, request, socket, contentLength, header, expectsPayload }

    if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable({ ...writerArgs, body: body.stream() })
      } else {
        writeBlob(writerArgs)
      }
    } else if (util.isStream(body)) {
      writeStream(writerArgs)
    } else if (util.isIterable(body)) {
      writeIterable(writerArgs)
    } else {
      assert(false)
    }
  }

  return true
}
1644
/**
 * Dispatches `request` as a new stream on an established HTTP/2 session and
 * forwards the stream's events to the request handlers.
 *
 * @param {object} client Owning client; supplies the shared HTTP/2 state
 *   (open stream counter) and the default authority.
 * @param {import('http2').ClientHttp2Session} session
 * @param {object} request
 * @returns {boolean} `false` when the request was failed or aborted before
 *   a stream was opened, `true` otherwise.
 */
function writeH2 (client, session, request) {
  const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

  let headers
  if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim())
  else headers = reqHeaders

  if (upgrade) {
    errorRequest(client, request, new Error('Upgrade not supported for H2'))
    return false
  }

  try {
    // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event?
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  /** @type {import('node:http2').ClientHttp2Stream} */
  let stream
  const h2State = client[kHTTP2SessionState]

  headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
  headers[HTTP2_HEADER_METHOD] = method

  if (method === 'CONNECT') {
    session.ref()
    // we are already connected, streams are pending, first request
    // will create a new stream. We trigger a request to create the stream and wait until
    // `ready` event is triggered
    // We disabled endStream to allow the user to write to the stream
    stream = session.request(headers, { endStream: false, signal })

    if (stream.id && !stream.pending) {
      request.onUpgrade(null, null, stream)
      ++h2State.openStreams
    } else {
      stream.once('ready', () => {
        request.onUpgrade(null, null, stream)
        ++h2State.openStreams
      })
    }

    stream.once('close', () => {
      h2State.openStreams -= 1
      // Let the session stop holding the process open once no stream is left.
      if (h2State.openStreams === 0) session.unref()
    })

    return true
  }

  // https://tools.ietf.org/html/rfc7540#section-8.3
  // :path and :scheme headers must be omitted when sending CONNECT

  headers[HTTP2_HEADER_PATH] = path
  headers[HTTP2_HEADER_SCHEME] = 'https'

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.

  const expectsPayload = (
    method === 'PUT' ||
    method === 'POST' ||
    method === 'PATCH'
  )

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  let contentLength = util.bodyLength(body)

  if (contentLength == null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 || !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.

    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  if (contentLength != null) {
    assert(body, 'no body must not have content length')
    headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  }

  session.ref()

  // GET and HEAD streams are closed for writing as soon as they are opened.
  const shouldEndStream = method === 'GET' || method === 'HEAD'
  if (expectContinue) {
    headers[HTTP2_HEADER_EXPECT] = '100-continue'
    stream = session.request(headers, { endStream: shouldEndStream, signal })

    // The body is only written once the peer has sent its 'continue'.
    stream.once('continue', writeBodyH2)
  } else {
    stream = session.request(headers, {
      endStream: shouldEndStream,
      signal
    })
    writeBodyH2()
  }

  // One more stream is now open on the session.
  ++h2State.openStreams

  stream.once('response', headers => {
    const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers

    if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) {
      stream.pause()
    }
  })

  stream.once('end', () => {
    request.onComplete([])
  })

  stream.on('data', (chunk) => {
    if (request.onData(chunk) === false) {
      stream.pause()
    }
  })

  stream.once('close', () => {
    h2State.openStreams -= 1
    // Let the session stop holding the process open once no stream is left.
    if (h2State.openStreams === 0) {
      session.unref()
    }
  })

  // Destroys the stream unless the session or the stream itself is already
  // gone. The open-stream counter is not touched here: the 'close' listener
  // above is the single place where it is decremented.
  const destroyStreamIfOpen = (err) => {
    const currentSession = client[kHTTP2Session]
    if (currentSession && !currentSession.destroyed && !stream.closed && !stream.destroyed) {
      util.destroy(stream, err)
    }
  }

  stream.once('error', (err) => {
    destroyStreamIfOpen(err)
  })

  stream.once('frameError', (type, code) => {
    const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
    errorRequest(client, request, err)

    // Note: `stream` is referenced explicitly; `this` is not the stream
    // inside an arrow function.
    destroyStreamIfOpen(err)
  })

  // stream.on('aborted', () => {
  //   // TODO(HTTP/2): Support aborted
  // })

  // stream.on('timeout', () => {
  //   // TODO(HTTP/2): Support timeout
  // })

  // stream.on('push', headers => {
  //   // TODO(HTTP/2): Support push
  // })

  // stream.on('trailers', headers => {
  //   // TODO(HTTP/2): Support trailers
  // })

  return true

  // Writes the request body to `stream` using the writer matching its type.
  function writeBodyH2 () {
    /* istanbul ignore else: assertion */
    if (!body) {
      request.onRequestSent()
    } else if (util.isBuffer(body)) {
      assert(contentLength === body.byteLength, 'buffer body must have content length')
      stream.cork()
      stream.write(body)
      stream.uncork()
      stream.end()
      request.onBodySent(body)
      request.onRequestSent()
    } else if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable({
          client,
          request,
          contentLength,
          h2stream: stream,
          expectsPayload,
          body: body.stream(),
          socket: client[kSocket],
          header: ''
        })
      } else {
        writeBlob({
          body,
          client,
          request,
          contentLength,
          expectsPayload,
          h2stream: stream,
          header: '',
          socket: client[kSocket]
        })
      }
    } else if (util.isStream(body)) {
      writeStream({
        body,
        client,
        request,
        contentLength,
        expectsPayload,
        socket: client[kSocket],
        h2stream: stream,
        header: ''
      })
    } else if (util.isIterable(body)) {
      writeIterable({
        body,
        client,
        request,
        contentLength,
        expectsPayload,
        header: '',
        h2stream: stream,
        socket: client[kSocket]
      })
    } else {
      assert(false)
    }
  }
}
1908
/**
 * Sends a stream request body. On HTTP/2 the body is piped into the h2
 * stream; on HTTP/1 every chunk goes through an `AsyncWriter` with
 * back-pressure driven by the socket's 'drain' event.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] Target stream, only used for HTTP/2.
 * @param {object} opts.body Stream providing the request body.
 * @param {object} opts.client
 * @param {object} opts.request
 * @param {object} opts.socket
 * @param {number|null} opts.contentLength
 * @param {string} opts.header Serialised request head (HTTP/1 only).
 * @param {boolean} opts.expectsPayload
 */
function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

  if (client[kHTTPConnVersion] === 'h2') {
    // For HTTP/2, piping the body into the stream is enough.
    const reportChunk = (chunk) => {
      request.onBodySent(chunk)
    }

    const pipe = pipeline(body, h2stream, (err) => {
      if (!err) {
        request.onRequestSent()
        return
      }
      util.destroy(body, err)
      util.destroy(h2stream, err)
    })

    pipe.on('data', reportChunk)
    pipe.once('end', () => {
      pipe.removeListener('data', reportChunk)
      util.destroy(pipe)
    })

    return
  }

  let finished = false

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

  // 'data' listener on the body; `this` is the body stream.
  function onData (chunk) {
    if (finished) {
      return
    }

    try {
      const accepted = writer.write(chunk)
      if (!accepted && this.pause) {
        this.pause()
      }
    } catch (err) {
      util.destroy(this, err)
    }
  }

  // 'drain' listener on the socket: lets the body flow again.
  function onDrain () {
    if (finished) {
      return
    }

    if (body.resume) {
      body.resume()
    }
  }

  // 'close' listener on the body: a close before completion is an abort,
  // reported on a microtask.
  function onAbort () {
    if (finished) {
      return
    }
    const err = new RequestAbortedError()
    queueMicrotask(() => onFinished(err))
  }

  // Single exit point for success ('end') and failure ('error' on either side).
  function onFinished (err) {
    if (finished) {
      return
    }

    finished = true

    assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))

    socket
      .off('drain', onDrain)
      .off('error', onFinished)

    body
      .removeListener('data', onData)
      .removeListener('end', onFinished)
      .removeListener('error', onFinished)
      .removeListener('close', onAbort)

    let failure = err
    if (!failure) {
      try {
        writer.end()
      } catch (er) {
        failure = er
      }
    }

    writer.destroy(failure)

    // An informational 'reset' is not propagated to the body as an error.
    const propagate = failure && (failure.code !== 'UND_ERR_INFO' || failure.message !== 'reset')
    if (propagate) {
      util.destroy(body, failure)
    } else {
      util.destroy(body)
    }
  }

  body
    .on('data', onData)
    .on('end', onFinished)
    .on('error', onFinished)
    .on('close', onAbort)

  if (body.resume) {
    body.resume()
  }

  socket
    .on('drain', onDrain)
    .on('error', onFinished)
}
2023
/**
 * Sends a Blob-like request body by reading it fully into a Buffer and
 * writing it in one go, either to the HTTP/2 stream or, together with the
 * request head, to the HTTP/1 socket.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] Target stream, only used for HTTP/2.
 * @param {object} opts.body Blob-like with `size` and `arrayBuffer()`.
 * @param {object} opts.client
 * @param {object} opts.request
 * @param {object} opts.socket
 * @param {number} opts.contentLength Must equal `body.size`.
 * @param {string} opts.header Serialised request head (HTTP/1 only).
 * @param {boolean} opts.expectsPayload
 * @returns {Promise<void>} Failures after the initial assertion are reported
 *   by destroying the stream/socket, not by rejecting.
 */
async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength === body.size, 'blob body must have content length')

  const isH2 = client[kHTTPConnVersion] === 'h2'
  try {
    if (contentLength != null && contentLength !== body.size) {
      throw new RequestContentLengthMismatchError()
    }

    const buffer = Buffer.from(await body.arrayBuffer())

    if (isH2) {
      h2stream.cork()
      h2stream.write(buffer)
      h2stream.uncork()
      // The blob is the whole request body: finish the writable side so the
      // request is actually completed, like the Buffer path does.
      h2stream.end()
    } else {
      socket.cork()
      socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      socket.write(buffer)
      socket.uncork()
    }

    request.onBodySent(buffer)
    request.onRequestSent()

    if (!expectsPayload) {
      // A payload on a method that does not expect one: do not reuse the
      // connection afterwards.
      socket[kReset] = true
    }

    resume(client)
  } catch (err) {
    util.destroy(isH2 ? h2stream : socket, err)
  }
}
2058
/**
 * Sends a request body that is a sync or async iterable, chunk by chunk,
 * waiting for 'drain' (or 'close') whenever the target reports back-pressure.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] Target stream, only used for HTTP/2.
 * @param {Iterable|AsyncIterable} opts.body
 * @param {object} opts.client
 * @param {object} opts.request
 * @param {object} opts.socket
 * @param {number|null} opts.contentLength
 * @param {string} opts.header Serialised request head (HTTP/1 only).
 * @param {boolean} opts.expectsPayload
 * @returns {Promise<void>}
 */
async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

  // Resolver of the promise currently waiting for back-pressure to clear.
  let pendingWake = null

  const onDrain = () => {
    if (!pendingWake) {
      return
    }
    const wake = pendingWake
    pendingWake = null
    wake()
  }

  const waitForDrain = () => new Promise((resolve, reject) => {
    assert(pendingWake === null)

    // Do not wait on a socket that has already failed.
    if (socket[kError]) {
      reject(socket[kError])
    } else {
      pendingWake = resolve
    }
  })

  if (client[kHTTPConnVersion] === 'h2') {
    h2stream
      .on('close', onDrain)
      .on('drain', onDrain)

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const flushed = h2stream.write(chunk)
        request.onBodySent(chunk)
        if (!flushed) {
          await waitForDrain()
        }
      }
    } catch (err) {
      h2stream.destroy(err)
    } finally {
      request.onRequestSent()
      h2stream.end()
      h2stream
        .off('close', onDrain)
        .off('drain', onDrain)
    }

    return
  }

  socket
    .on('close', onDrain)
    .on('drain', onDrain)

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
  try {
    // It's up to the user to somehow abort the async iterable.
    for await (const chunk of body) {
      if (socket[kError]) {
        throw socket[kError]
      }

      const flushed = writer.write(chunk)
      if (!flushed) {
        await waitForDrain()
      }
    }

    writer.end()
  } catch (err) {
    writer.destroy(err)
  } finally {
    socket
      .off('close', onDrain)
      .off('drain', onDrain)
  }
}
2138
/**
 * Incremental HTTP/1.1 request writer used for bodies that arrive in chunks.
 * The request head is emitted together with the first non-empty chunk (or by
 * `end()` when there was none). Without a known content length the body is
 * sent with chunked transfer encoding.
 */
class AsyncWriter {
  /**
   * Marks the socket as being written to for the lifetime of the writer.
   *
   * @param {object} opts
   * @param {object} opts.socket
   * @param {object} opts.request
   * @param {number|null} opts.contentLength `null` selects chunked encoding.
   * @param {object} opts.client
   * @param {boolean} opts.expectsPayload
   * @param {string} opts.header Serialised request head.
   */
  constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
    this.socket = socket
    this.request = request
    this.contentLength = contentLength
    this.client = client
    this.bytesWritten = 0
    this.expectsPayload = expectsPayload
    this.header = header

    socket[kWriting] = true
  }

  /**
   * Writes one body chunk.
   *
   * @param {string|Buffer|Uint8Array} chunk
   * @returns {boolean} Result of the socket write, `true` for an empty chunk
   *   and `false` when the socket is already destroyed.
   * @throws The socket's stored error, or RequestContentLengthMismatchError
   *   in strict mode when the chunk exceeds the declared length.
   */
  write (chunk) {
    const { socket, contentLength } = this
    const sentBefore = this.bytesWritten

    if (socket[kError]) {
      throw socket[kError]
    }

    if (socket.destroyed) {
      return false
    }

    const len = Buffer.byteLength(chunk)
    if (!len) {
      return true
    }

    const chunked = contentLength === null

    // We should defer writing chunks.
    if (!chunked && sentBefore + len > contentLength) {
      if (this.client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    socket.cork()

    if (sentBefore === 0) {
      // First payload bytes: the head goes out now.
      if (!this.expectsPayload) {
        socket[kReset] = true
      }

      const framing = chunked
        ? 'transfer-encoding: chunked\r\n'
        : `content-length: ${contentLength}\r\n\r\n`
      socket.write(`${this.header}${framing}`, 'latin1')
    }

    if (chunked) {
      // Chunk size line; the leading CRLF terminates the previous line.
      socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
    }

    this.bytesWritten += len

    const ret = socket.write(chunk)

    socket.uncork()

    this.request.onBodySent(chunk)

    if (!ret) {
      // Back-pressure: give the server more time before the headers timeout.
      const parser = socket[kParser]
      if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS && parser.timeout.refresh) {
        parser.timeout.refresh()
      }
    }

    return ret
  }

  /**
   * Finishes the request: writes whatever is still missing (the head, or the
   * terminating chunk), validates the declared length and resumes the client.
   *
   * @throws The socket's stored error, or RequestContentLengthMismatchError
   *   in strict mode when fewer/more bytes than declared were written.
   */
  end () {
    const { socket, contentLength, bytesWritten } = this
    this.request.onRequestSent()

    socket[kWriting] = false

    if (socket[kError]) {
      throw socket[kError]
    }

    if (socket.destroyed) {
      return
    }

    if (bytesWritten === 0) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A Content-Length of zero is sent when the method defines a meaning
      // for a payload body; otherwise the head is simply terminated.
      const tail = this.expectsPayload ? 'content-length: 0\r\n\r\n' : '\r\n'
      socket.write(`${this.header}${tail}`, 'latin1')
    } else if (contentLength === null) {
      socket.write('\r\n0\r\n\r\n', 'latin1')
    }

    if (contentLength !== null && bytesWritten !== contentLength) {
      if (this.client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }
      process.emitWarning(new RequestContentLengthMismatchError())
    }

    const parser = socket[kParser]
    if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS && parser.timeout.refresh) {
      parser.timeout.refresh()
    }

    resume(this.client)
  }

  /**
   * Releases the writing marker and, when given an error, destroys the socket.
   *
   * @param {Error} [err]
   */
  destroy (err) {
    const { socket, client } = this

    socket[kWriting] = false

    if (!err) {
      return
    }

    assert(client[kRunning] <= 1, 'pipeline should only contain this request')
    util.destroy(socket, err)
  }
}
2273
/**
 * Fails `request` with `err`. The request must report itself as aborted
 * afterwards; if its handler throws or that expectation is violated, the
 * resulting error is emitted on the client as 'error'.
 *
 * @param {object} client
 * @param {object} request
 * @param {Error} err
 */
function errorRequest (client, request, err) {
  try {
    request.onError(err)
    assert(request.aborted)
  } catch (failure) {
    client.emit('error', failure)
  }
}
2282
// Expose `Client` as the value of this module.
module.exports = Client
Note: See TracBrowser for help on using the repository browser.