[d24f17c] | 1 | // @ts-check
|
---|
| 2 |
|
---|
| 3 | 'use strict'
|
---|
| 4 |
|
---|
| 5 | /* global WebAssembly */
|
---|
| 6 |
|
---|
| 7 | const assert = require('assert')
|
---|
| 8 | const net = require('net')
|
---|
| 9 | const http = require('http')
|
---|
| 10 | const { pipeline } = require('stream')
|
---|
| 11 | const util = require('./core/util')
|
---|
| 12 | const timers = require('./timers')
|
---|
| 13 | const Request = require('./core/request')
|
---|
| 14 | const DispatcherBase = require('./dispatcher-base')
|
---|
| 15 | const {
|
---|
| 16 | RequestContentLengthMismatchError,
|
---|
| 17 | ResponseContentLengthMismatchError,
|
---|
| 18 | InvalidArgumentError,
|
---|
| 19 | RequestAbortedError,
|
---|
| 20 | HeadersTimeoutError,
|
---|
| 21 | HeadersOverflowError,
|
---|
| 22 | SocketError,
|
---|
| 23 | InformationalError,
|
---|
| 24 | BodyTimeoutError,
|
---|
| 25 | HTTPParserError,
|
---|
| 26 | ResponseExceededMaxSizeError,
|
---|
| 27 | ClientDestroyedError
|
---|
| 28 | } = require('./core/errors')
|
---|
| 29 | const buildConnector = require('./core/connect')
|
---|
| 30 | const {
|
---|
| 31 | kUrl,
|
---|
| 32 | kReset,
|
---|
| 33 | kServerName,
|
---|
| 34 | kClient,
|
---|
| 35 | kBusy,
|
---|
| 36 | kParser,
|
---|
| 37 | kConnect,
|
---|
| 38 | kBlocking,
|
---|
| 39 | kResuming,
|
---|
| 40 | kRunning,
|
---|
| 41 | kPending,
|
---|
| 42 | kSize,
|
---|
| 43 | kWriting,
|
---|
| 44 | kQueue,
|
---|
| 45 | kConnected,
|
---|
| 46 | kConnecting,
|
---|
| 47 | kNeedDrain,
|
---|
| 48 | kNoRef,
|
---|
| 49 | kKeepAliveDefaultTimeout,
|
---|
| 50 | kHostHeader,
|
---|
| 51 | kPendingIdx,
|
---|
| 52 | kRunningIdx,
|
---|
| 53 | kError,
|
---|
| 54 | kPipelining,
|
---|
| 55 | kSocket,
|
---|
| 56 | kKeepAliveTimeoutValue,
|
---|
| 57 | kMaxHeadersSize,
|
---|
| 58 | kKeepAliveMaxTimeout,
|
---|
| 59 | kKeepAliveTimeoutThreshold,
|
---|
| 60 | kHeadersTimeout,
|
---|
| 61 | kBodyTimeout,
|
---|
| 62 | kStrictContentLength,
|
---|
| 63 | kConnector,
|
---|
| 64 | kMaxRedirections,
|
---|
| 65 | kMaxRequests,
|
---|
| 66 | kCounter,
|
---|
| 67 | kClose,
|
---|
| 68 | kDestroy,
|
---|
| 69 | kDispatch,
|
---|
| 70 | kInterceptors,
|
---|
| 71 | kLocalAddress,
|
---|
| 72 | kMaxResponseSize,
|
---|
| 73 | kHTTPConnVersion,
|
---|
| 74 | // HTTP2
|
---|
| 75 | kHost,
|
---|
| 76 | kHTTP2Session,
|
---|
| 77 | kHTTP2SessionState,
|
---|
| 78 | kHTTP2BuildRequest,
|
---|
| 79 | kHTTP2CopyHeaders,
|
---|
| 80 | kHTTP1BuildRequest
|
---|
| 81 | } = require('./core/symbols')
|
---|
| 82 |
|
---|
| 83 | /** @type {import('http2')} */
|
---|
| 84 | let http2
|
---|
| 85 | try {
|
---|
| 86 | http2 = require('http2')
|
---|
| 87 | } catch {
|
---|
| 88 | // @ts-ignore
|
---|
| 89 | http2 = { constants: {} }
|
---|
| 90 | }
|
---|
| 91 |
|
---|
| 92 | const {
|
---|
| 93 | constants: {
|
---|
| 94 | HTTP2_HEADER_AUTHORITY,
|
---|
| 95 | HTTP2_HEADER_METHOD,
|
---|
| 96 | HTTP2_HEADER_PATH,
|
---|
| 97 | HTTP2_HEADER_SCHEME,
|
---|
| 98 | HTTP2_HEADER_CONTENT_LENGTH,
|
---|
| 99 | HTTP2_HEADER_EXPECT,
|
---|
| 100 | HTTP2_HEADER_STATUS
|
---|
| 101 | }
|
---|
| 102 | } = http2
|
---|
| 103 |
|
---|
| 104 | // Experimental
|
---|
| 105 | let h2ExperimentalWarned = false
|
---|
| 106 |
|
---|
| 107 | const FastBuffer = Buffer[Symbol.species]
|
---|
| 108 |
|
---|
| 109 | const kClosedResolve = Symbol('kClosedResolve')
|
---|
| 110 |
|
---|
// Diagnostics channels the client publishes to. When `diagnostics_channel`
// cannot be loaded, every entry is replaced by an inert stand-in that only
// reports `hasSubscribers: false`.
const channels = {}

{
  // [property on `channels`, diagnostics channel name]
  const channelTable = [
    ['sendHeaders', 'undici:client:sendHeaders'],
    ['beforeConnect', 'undici:client:beforeConnect'],
    ['connectError', 'undici:client:connectError'],
    ['connected', 'undici:client:connected']
  ]

  try {
    const diagnosticsChannel = require('diagnostics_channel')
    for (const [property, channelName] of channelTable) {
      channels[property] = diagnosticsChannel.channel(channelName)
    }
  } catch {
    // Overwrite all entries, including any that were registered before the failure.
    for (const [property] of channelTable) {
      channels[property] = { hasSubscribers: false }
    }
  }
}
|
---|
| 125 |
|
---|
| 126 | /**
|
---|
| 127 | * @type {import('../types/client').default}
|
---|
| 128 | */
|
---|
class Client extends DispatcherBase {
  /**
   * Validates the options and initialises the client state for one origin.
   * No connection is opened here.
   *
   * @param {string|URL} url
   * @param {import('../types/client').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or malformed
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout,
    // h2
    allowH2,
    maxConcurrentStreams
  } = {}) {
    super()

    // Options that are rejected outright. They are checked in this order and
    // the message points the caller at the replacement option.
    const rejectedOptions = [
      [keepAlive, 'unsupported keepAlive, use pipelining=0 instead'],
      [socketTimeout, 'unsupported socketTimeout, use headersTimeout & bodyTimeout instead'],
      [requestTimeout, 'unsupported requestTimeout, use headersTimeout & bodyTimeout instead'],
      [idleTimeout, 'unsupported idleTimeout, use keepAliveTimeout instead'],
      [maxKeepAliveTimeout, 'unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead']
    ]
    for (const [value, message] of rejectedOptions) {
      if (value !== undefined) {
        throw new InvalidArgumentError(message)
      }
    }

    // Small predicates shared by the range checks below.
    const isFiniteAtLeastZero = (value) => Number.isFinite(value) && value >= 0
    const isFiniteAboveZero = (value) => Number.isFinite(value) && value > 0
    const isIntegerAtLeast = (value, min) => Number.isInteger(value) && value >= min

    if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && !isFiniteAtLeastZero(connectTimeout)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && !isFiniteAboveZero(keepAliveTimeout)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && !isFiniteAboveZero(keepAliveMaxTimeout)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && !isIntegerAtLeast(headersTimeout, 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && !isIntegerAtLeast(bodyTimeout, 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && !isIntegerAtLeast(maxRedirections, 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && !isIntegerAtLeast(maxRequestsPerClient, 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    // -1 is accepted here; it is the value stored below when no limit applies.
    if (maxResponseSize != null && !isIntegerAtLeast(maxResponseSize, -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (autoSelectFamilyAttemptTimeout != null && !isIntegerAtLeast(autoSelectFamilyAttemptTimeout, -1)) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value')
    }

    if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
      throw new InvalidArgumentError('maxConcurrentStreams must be a possitive integer, greater than 0')
    }

    // A `connect` function is used as-is. Anything else (including an options
    // object) is merged last into the settings of a freshly built connector.
    const connector = typeof connect === 'function'
      ? connect
      : buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })

    if (interceptors && interceptors.Client && Array.isArray(interceptors.Client)) {
      this[kInterceptors] = interceptors.Client
    } else {
      this[kInterceptors] = [createRedirectInterceptor({ maxRedirections })]
    }

    this[kUrl] = util.parseOrigin(url)

    // "hostname" or "hostname:port", shared by the h1 host header and kHost.
    const { hostname, port } = this[kUrl]
    const authority = `${hostname}${port ? `:${port}` : ''}`

    this[kConnector] = connector
    this[kSocket] = null
    this[kPipelining] = pipelining == null ? 1 : pipelining
    this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout != null ? keepAliveTimeout : 4e3
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout != null ? keepAliveMaxTimeout : 600e3
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold != null ? keepAliveTimeoutThreshold : 1e3
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress == null ? null : localAddress
    this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
    this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
    this[kHostHeader] = `host: ${authority}\r\n`
    this[kBodyTimeout] = bodyTimeout == null ? 300e3 : bodyTimeout
    this[kHeadersTimeout] = headersTimeout == null ? 300e3 : headersTimeout
    this[kStrictContentLength] = strictContentLength != null ? strictContentLength : true
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
    this[kHTTPConnVersion] = 'h1'

    // HTTP/2
    this[kHTTP2Session] = null
    this[kHTTP2SessionState] = allowH2
      ? {
          // streams: null, // Fixed queue of streams - For future support of `push`
          openStreams: 0, // Keep track of them to decide whether or not to unref the session
          maxConcurrentStreams: maxConcurrentStreams == null ? 100 : maxConcurrentStreams // Max peerConcurrentStreams for a Node h2 server
        }
      : null
    this[kHost] = authority

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0
  }

  /** Current pipelining setting. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Stores a new pipelining setting and immediately resumes the client. */
  set pipelining (value) {
    this[kPipelining] = value
    resume(this, true)
  }

  /** Number of queued requests at or after kPendingIdx. */
  get [kPending] () {
    const queue = this[kQueue]
    return queue.length - this[kPendingIdx]
  }

  /** Number of queued requests between kRunningIdx and kPendingIdx. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Number of queued requests at or after kRunningIdx (running + pending). */
  get [kSize] () {
    const queue = this[kQueue]
    return queue.length - this[kRunningIdx]
  }

  /** True when a socket is set, it is not destroyed, and no connect is in progress. */
  get [kConnected] () {
    const socket = this[kSocket]
    return !!socket && !this[kConnecting] && !socket.destroyed
  }

  /**
   * Truthy when the socket is flagged as reset/writing/blocking, when the
   * queue already holds at least `pipelining` (minimum 1) requests, or when
   * requests are still pending.
   */
  get [kBusy] () {
    const socket = this[kSocket]
    const socketOccupied = socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])
    return (
      socketOccupied ||
      (this[kSize] >= (this[kPipelining] || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Builds a request for the negotiated protocol, queues it and schedules
   * a resume.
   *
   * @returns {boolean} false when the client needs to drain first
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin

    const buildRequest = this[kHTTPConnVersion] === 'h2'
      ? kHTTP2BuildRequest
      : kHTTP1BuildRequest
    const queued = Request[buildRequest](origin, opts, handler)

    this[kQueue].push(queued)

    // Nothing to do when a resume is already scheduled or running.
    if (!this[kResuming]) {
      if (util.bodyLength(queued.body) == null && util.isIterable(queued.body)) {
        // Wait a tick in case stream/iterator is ended in the same tick.
        this[kResuming] = 1
        process.nextTick(resume, this)
      } else {
        resume(this, true)
      }
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves immediately when the queue is empty. Otherwise it parks the
   * resolver in kClosedResolve to be called later.
   */
  async [kClose] () {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (this[kSize]) {
        this[kClosedResolve] = resolve
      } else {
        resolve(null)
      }
    })
  }

  /**
   * Errors every pending request with `err`, tears down the HTTP/2 session
   * and the socket, and resolves once the socket has closed (or on the next
   * microtask when there is no socket).
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      const pending = this[kQueue].splice(this[kPendingIdx])
      for (const request of pending) {
        errorRequest(this, request, err)
      }

      const finish = () => {
        const closedResolve = this[kClosedResolve]
        if (closedResolve) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          closedResolve()
          this[kClosedResolve] = null
        }
        resolve()
      }

      if (this[kHTTP2Session] != null) {
        util.destroy(this[kHTTP2Session], err)
        this[kHTTP2Session] = null
        this[kHTTP2SessionState] = null
      }

      if (this[kSocket]) {
        util.destroy(this[kSocket].on('close', finish), err)
      } else {
        queueMicrotask(finish)
      }

      resume(this)
    })
  }
}
|
---|
| 428 |
|
---|
/**
 * Error listener; `this` is the object carrying `kSocket` and `kClient`
 * (presumably the HTTP/2 session; confirm where the listener is attached).
 * Records the error on the socket and forwards it to the client's error path.
 */
function onHttp2SessionError (err) {
  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  const socket = this[kSocket]
  socket[kError] = err

  const client = this[kClient]
  onError(client, err)
}
|
---|
| 436 |
|
---|
/**
 * "frameError" listener. Only a frame error with id 0 is escalated to the
 * socket and client; any other id is ignored here.
 */
function onHttp2FrameError (type, code, id) {
  const frameError = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

  if (id !== 0) {
    return
  }

  this[kSocket][kError] = frameError
  onError(this[kClient], frameError)
}
|
---|
| 445 |
|
---|
/**
 * "end" listener: destroys `this` first and then its socket, each with its
 * own SocketError instance.
 */
function onHttp2SessionEnd () {
  const session = this
  util.destroy(session, new SocketError('other side closed'))
  util.destroy(session[kSocket], new SocketError('other side closed'))
}
|
---|
| 450 |
|
---|
/**
 * GOAWAY listener. `this` is the object that carries the owning client under
 * `kClient` (presumably the HTTP/2 session; confirm where the listener is
 * attached). All queue bookkeeping is done on `client`, not on `this`.
 *
 * Detaches the socket and session from the client, fails either the whole
 * queue (destroyed client) or only the head running request, emits
 * 'disconnect' and resumes the client.
 *
 * @param {number} code error code carried by the GOAWAY frame
 */
function onHTTP2GoAway (code) {
  const client = this[kClient]
  const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
  client[kSocket] = null
  client[kHTTP2Session] = null

  if (client.destroyed) {
    // The pending count lives on the client. Reading it from `this` gave
    // `undefined`, so the assertion could never pass.
    assert(client[kPending] === 0)

    // Fail entire queue.
    const requests = client[kQueue].splice(client[kRunningIdx])
    for (let i = 0; i < requests.length; i++) {
      const request = requests[i]
      // Pass the client, matching the non-destroyed branch below.
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0) {
    // Fail head of pipeline.
    const request = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, request, err)
  }

  // Whatever was running is demoted back to pending.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect',
    client[kUrl],
    [client],
    err
  )

  resume(client)
}
|
---|
| 486 |
|
---|
| 487 | const constants = require('./llhttp/constants')
|
---|
| 488 | const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
|
---|
| 489 | const EMPTY_BUF = Buffer.alloc(0)
|
---|
| 490 |
|
---|
/**
 * Compiles and instantiates the llhttp WebAssembly module. The SIMD build is
 * tried first; if compiling it throws, the plain build is used instead.
 * The wasm callbacks forward to the module-level `currentParser`.
 *
 * @returns {Promise<WebAssembly.Instance>}
 */
async function lazyllhttp () {
  const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined

  const compileBase64 = (source) => WebAssembly.compile(Buffer.from(source, 'base64'))

  let mod
  try {
    mod = await compileBase64(require('./llhttp/llhttp_simd-wasm.js'))
  } catch (e) {
    /* istanbul ignore next */

    // We could check if the error was caused by the simd option not
    // being enabled, but the occurring of this other error
    // * https://github.com/emscripten-core/emscripten/issues/11495
    // got me to remove that check to avoid breaking Node 12.
    mod = await compileBase64(llhttpWasmData || require('./llhttp/llhttp-wasm.js'))
  }

  // Every callback must be called for the parser that is currently executing.
  const checkParser = (p) => assert.strictEqual(currentParser.ptr, p)

  // Translates a wasm address range into a view over the original chunk:
  // `at` is relative to the wasm scratch buffer, the view is over
  // `currentBufferRef`'s backing store.
  const viewOf = (at, len) => {
    const start = at - currentBufferPtr + currentBufferRef.byteOffset
    return new FastBuffer(currentBufferRef.buffer, start, len)
  }

  return await WebAssembly.instantiate(mod, {
    env: {
      /* eslint-disable camelcase */

      wasm_on_url: (p, at, len) => {
        /* istanbul ignore next */
        return 0
      },
      wasm_on_status: (p, at, len) => {
        checkParser(p)
        return currentParser.onStatus(viewOf(at, len)) || 0
      },
      wasm_on_message_begin: (p) => {
        checkParser(p)
        return currentParser.onMessageBegin() || 0
      },
      wasm_on_header_field: (p, at, len) => {
        checkParser(p)
        return currentParser.onHeaderField(viewOf(at, len)) || 0
      },
      wasm_on_header_value: (p, at, len) => {
        checkParser(p)
        return currentParser.onHeaderValue(viewOf(at, len)) || 0
      },
      wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
        checkParser(p)
        return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
      },
      wasm_on_body: (p, at, len) => {
        checkParser(p)
        return currentParser.onBody(viewOf(at, len)) || 0
      },
      wasm_on_message_complete: (p) => {
        checkParser(p)
        return currentParser.onMessageComplete() || 0
      }

      /* eslint-enable camelcase */
    }
  })
}
|
---|
| 552 |
|
---|
// The llhttp instance is requested as soon as this module is loaded.
// NOTE(review): `llhttpInstance` is presumably filled in by whoever first
// awaits `llhttpPromise`; that code is outside this section, so confirm.
let llhttpInstance = null
let llhttpPromise = lazyllhttp()
// A bare `.catch()` installs no handler: it returns a derived promise that
// rejects with the same reason and is itself unhandled. Attach a real no-op
// handler so a failed compile does not surface as an unhandled rejection at
// load time; awaiting `llhttpPromise` still observes the failure.
llhttpPromise.catch(() => {})

// State shared between Parser#execute and the wasm callbacks. Only one
// parser executes at a time: `currentParser` and `currentBufferRef` are set
// for the duration of an llhttp_execute call and cleared afterwards.
let currentParser = null
let currentBufferRef = null
// Size and wasm address of the scratch buffer that chunks are copied into.
let currentBufferSize = 0
let currentBufferPtr = null

// Values stored in Parser#timeoutType.
const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
|
---|
| 565 |
|
---|
| 566 | class Parser {
|
---|
| 567 | constructor (client, socket, { exports }) {
|
---|
| 568 | assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)
|
---|
| 569 |
|
---|
| 570 | this.llhttp = exports
|
---|
| 571 | this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
|
---|
| 572 | this.client = client
|
---|
| 573 | this.socket = socket
|
---|
| 574 | this.timeout = null
|
---|
| 575 | this.timeoutValue = null
|
---|
| 576 | this.timeoutType = null
|
---|
| 577 | this.statusCode = null
|
---|
| 578 | this.statusText = ''
|
---|
| 579 | this.upgrade = false
|
---|
| 580 | this.headers = []
|
---|
| 581 | this.headersSize = 0
|
---|
| 582 | this.headersMaxSize = client[kMaxHeadersSize]
|
---|
| 583 | this.shouldKeepAlive = false
|
---|
| 584 | this.paused = false
|
---|
| 585 | this.resume = this.resume.bind(this)
|
---|
| 586 |
|
---|
| 587 | this.bytesRead = 0
|
---|
| 588 |
|
---|
| 589 | this.keepAlive = ''
|
---|
| 590 | this.contentLength = ''
|
---|
| 591 | this.connection = ''
|
---|
| 592 | this.maxResponseSize = client[kMaxResponseSize]
|
---|
| 593 | }
|
---|
| 594 |
|
---|
| 595 | setTimeout (value, type) {
|
---|
| 596 | this.timeoutType = type
|
---|
| 597 | if (value !== this.timeoutValue) {
|
---|
| 598 | timers.clearTimeout(this.timeout)
|
---|
| 599 | if (value) {
|
---|
| 600 | this.timeout = timers.setTimeout(onParserTimeout, value, this)
|
---|
| 601 | // istanbul ignore else: only for jest
|
---|
| 602 | if (this.timeout.unref) {
|
---|
| 603 | this.timeout.unref()
|
---|
| 604 | }
|
---|
| 605 | } else {
|
---|
| 606 | this.timeout = null
|
---|
| 607 | }
|
---|
| 608 | this.timeoutValue = value
|
---|
| 609 | } else if (this.timeout) {
|
---|
| 610 | // istanbul ignore else: only for jest
|
---|
| 611 | if (this.timeout.refresh) {
|
---|
| 612 | this.timeout.refresh()
|
---|
| 613 | }
|
---|
| 614 | }
|
---|
| 615 | }
|
---|
| 616 |
|
---|
| 617 | resume () {
|
---|
| 618 | if (this.socket.destroyed || !this.paused) {
|
---|
| 619 | return
|
---|
| 620 | }
|
---|
| 621 |
|
---|
| 622 | assert(this.ptr != null)
|
---|
| 623 | assert(currentParser == null)
|
---|
| 624 |
|
---|
| 625 | this.llhttp.llhttp_resume(this.ptr)
|
---|
| 626 |
|
---|
| 627 | assert(this.timeoutType === TIMEOUT_BODY)
|
---|
| 628 | if (this.timeout) {
|
---|
| 629 | // istanbul ignore else: only for jest
|
---|
| 630 | if (this.timeout.refresh) {
|
---|
| 631 | this.timeout.refresh()
|
---|
| 632 | }
|
---|
| 633 | }
|
---|
| 634 |
|
---|
| 635 | this.paused = false
|
---|
| 636 | this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
|
---|
| 637 | this.readMore()
|
---|
| 638 | }
|
---|
| 639 |
|
---|
| 640 | readMore () {
|
---|
| 641 | while (!this.paused && this.ptr) {
|
---|
| 642 | const chunk = this.socket.read()
|
---|
| 643 | if (chunk === null) {
|
---|
| 644 | break
|
---|
| 645 | }
|
---|
| 646 | this.execute(chunk)
|
---|
| 647 | }
|
---|
| 648 | }
|
---|
| 649 |
|
---|
| 650 | execute (data) {
|
---|
| 651 | assert(this.ptr != null)
|
---|
| 652 | assert(currentParser == null)
|
---|
| 653 | assert(!this.paused)
|
---|
| 654 |
|
---|
| 655 | const { socket, llhttp } = this
|
---|
| 656 |
|
---|
| 657 | if (data.length > currentBufferSize) {
|
---|
| 658 | if (currentBufferPtr) {
|
---|
| 659 | llhttp.free(currentBufferPtr)
|
---|
| 660 | }
|
---|
| 661 | currentBufferSize = Math.ceil(data.length / 4096) * 4096
|
---|
| 662 | currentBufferPtr = llhttp.malloc(currentBufferSize)
|
---|
| 663 | }
|
---|
| 664 |
|
---|
| 665 | new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)
|
---|
| 666 |
|
---|
| 667 | // Call `execute` on the wasm parser.
|
---|
| 668 | // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
|
---|
| 669 | // and finally the length of bytes to parse.
|
---|
| 670 | // The return value is an error code or `constants.ERROR.OK`.
|
---|
| 671 | try {
|
---|
| 672 | let ret
|
---|
| 673 |
|
---|
| 674 | try {
|
---|
| 675 | currentBufferRef = data
|
---|
| 676 | currentParser = this
|
---|
| 677 | ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
|
---|
| 678 | /* eslint-disable-next-line no-useless-catch */
|
---|
| 679 | } catch (err) {
|
---|
| 680 | /* istanbul ignore next: difficult to make a test case for */
|
---|
| 681 | throw err
|
---|
| 682 | } finally {
|
---|
| 683 | currentParser = null
|
---|
| 684 | currentBufferRef = null
|
---|
| 685 | }
|
---|
| 686 |
|
---|
| 687 | const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr
|
---|
| 688 |
|
---|
| 689 | if (ret === constants.ERROR.PAUSED_UPGRADE) {
|
---|
| 690 | this.onUpgrade(data.slice(offset))
|
---|
| 691 | } else if (ret === constants.ERROR.PAUSED) {
|
---|
| 692 | this.paused = true
|
---|
| 693 | socket.unshift(data.slice(offset))
|
---|
| 694 | } else if (ret !== constants.ERROR.OK) {
|
---|
| 695 | const ptr = llhttp.llhttp_get_error_reason(this.ptr)
|
---|
| 696 | let message = ''
|
---|
| 697 | /* istanbul ignore else: difficult to make a test case for */
|
---|
| 698 | if (ptr) {
|
---|
| 699 | const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
|
---|
| 700 | message =
|
---|
| 701 | 'Response does not match the HTTP/1.1 protocol (' +
|
---|
| 702 | Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
|
---|
| 703 | ')'
|
---|
| 704 | }
|
---|
| 705 | throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset))
|
---|
| 706 | }
|
---|
| 707 | } catch (err) {
|
---|
| 708 | util.destroy(socket, err)
|
---|
| 709 | }
|
---|
| 710 | }
|
---|
| 711 |
|
---|
| 712 | destroy () {
|
---|
| 713 | assert(this.ptr != null)
|
---|
| 714 | assert(currentParser == null)
|
---|
| 715 |
|
---|
| 716 | this.llhttp.llhttp_free(this.ptr)
|
---|
| 717 | this.ptr = null
|
---|
| 718 |
|
---|
| 719 | timers.clearTimeout(this.timeout)
|
---|
| 720 | this.timeout = null
|
---|
| 721 | this.timeoutValue = null
|
---|
| 722 | this.timeoutType = null
|
---|
| 723 |
|
---|
| 724 | this.paused = false
|
---|
| 725 | }
|
---|
| 726 |
|
---|
| 727 | onStatus (buf) {
|
---|
| 728 | this.statusText = buf.toString()
|
---|
| 729 | }
|
---|
| 730 |
|
---|
| 731 | onMessageBegin () {
|
---|
| 732 | const { socket, client } = this
|
---|
| 733 |
|
---|
| 734 | /* istanbul ignore next: difficult to make a test case for */
|
---|
| 735 | if (socket.destroyed) {
|
---|
| 736 | return -1
|
---|
| 737 | }
|
---|
| 738 |
|
---|
| 739 | const request = client[kQueue][client[kRunningIdx]]
|
---|
| 740 | if (!request) {
|
---|
| 741 | return -1
|
---|
| 742 | }
|
---|
| 743 | }
|
---|
| 744 |
|
---|
| 745 | onHeaderField (buf) {
|
---|
| 746 | const len = this.headers.length
|
---|
| 747 |
|
---|
| 748 | if ((len & 1) === 0) {
|
---|
| 749 | this.headers.push(buf)
|
---|
| 750 | } else {
|
---|
| 751 | this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
|
---|
| 752 | }
|
---|
| 753 |
|
---|
| 754 | this.trackHeader(buf.length)
|
---|
| 755 | }
|
---|
| 756 |
|
---|
| 757 | onHeaderValue (buf) {
|
---|
| 758 | let len = this.headers.length
|
---|
| 759 |
|
---|
| 760 | if ((len & 1) === 1) {
|
---|
| 761 | this.headers.push(buf)
|
---|
| 762 | len += 1
|
---|
| 763 | } else {
|
---|
| 764 | this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
|
---|
| 765 | }
|
---|
| 766 |
|
---|
| 767 | const key = this.headers[len - 2]
|
---|
| 768 | if (key.length === 10 && key.toString().toLowerCase() === 'keep-alive') {
|
---|
| 769 | this.keepAlive += buf.toString()
|
---|
| 770 | } else if (key.length === 10 && key.toString().toLowerCase() === 'connection') {
|
---|
| 771 | this.connection += buf.toString()
|
---|
| 772 | } else if (key.length === 14 && key.toString().toLowerCase() === 'content-length') {
|
---|
| 773 | this.contentLength += buf.toString()
|
---|
| 774 | }
|
---|
| 775 |
|
---|
| 776 | this.trackHeader(buf.length)
|
---|
| 777 | }
|
---|
| 778 |
|
---|
| 779 | trackHeader (len) {
|
---|
| 780 | this.headersSize += len
|
---|
| 781 | if (this.headersSize >= this.headersMaxSize) {
|
---|
| 782 | util.destroy(this.socket, new HeadersOverflowError())
|
---|
| 783 | }
|
---|
| 784 | }
|
---|
| 785 |
|
---|
/**
 * Detaches the socket from this client and hands it to the running request
 * after the parser has flagged the response as an upgrade (or CONNECT).
 *
 * Steps, in order: reset per-message parser state, push `head` back onto the
 * socket, destroy the parser, remove the client's socket listeners, drop the
 * request from the queue, emit 'disconnect', then call `request.onUpgrade`.
 *
 * @param {Buffer} head Data passed back to the socket with `socket.unshift`
 *   (presumably bytes received after the response head — confirm at caller).
 */
onUpgrade (head) {
  // Capture headers and statusCode before they are reset below; the captured
  // values are what the request handler receives.
  const { upgrade, client, socket, headers, statusCode } = this

  assert(upgrade)

  const request = client[kQueue][client[kRunningIdx]]
  assert(request)

  assert(!socket.destroyed)
  assert(socket === client[kSocket])
  assert(!this.paused)
  assert(request.upgrade || request.method === 'CONNECT')

  this.statusCode = null
  this.statusText = ''
  this.shouldKeepAlive = null

  assert(this.headers.length % 2 === 0)
  this.headers = []
  this.headersSize = 0

  socket.unshift(head)

  socket[kParser].destroy()
  socket[kParser] = null

  // From here on the socket no longer belongs to the client.
  socket[kClient] = null
  socket[kError] = null
  socket
    .removeListener('error', onSocketError)
    .removeListener('readable', onSocketReadable)
    .removeListener('end', onSocketEnd)
    .removeListener('close', onSocketClose)

  client[kSocket] = null
  // Clear the queue slot and advance past the upgraded request.
  client[kQueue][client[kRunningIdx]++] = null
  client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

  try {
    request.onUpgrade(statusCode, headers, socket)
  } catch (err) {
    // A throwing handler must not leave the detached socket open.
    util.destroy(socket, err)
  }

  resume(client)
}
| 832 |
|
---|
/**
 * Parser callback invoked once a full response head has been parsed.
 *
 * Validates the response against the running request, switches from the
 * headers timeout to the body timeout for final (>= 200) responses, decides
 * whether the connection may be kept alive, and forwards the headers to
 * `request.onHeaders`.
 *
 * @param {number} statusCode Response status code.
 * @param {boolean} upgrade Truthy when the parser reports an upgrade.
 * @param {boolean} shouldKeepAlive The parser's own keep-alive verdict.
 * @returns {number} -1 on error/abort; 2 for CONNECT or upgrade responses;
 *   1 for HEAD requests and for status codes below 200;
 *   `constants.ERROR.PAUSED` when `request.onHeaders` returned false;
 *   otherwise 0. How each code is interpreted is up to the caller, which is
 *   not visible in this block.
 */
onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
  // `headers` keeps a reference to the current list; `this.headers` is
  // replaced further down before the list is handed to the request.
  const { client, socket, headers, statusText } = this

  /* istanbul ignore next: difficult to make a test case for */
  if (socket.destroyed) {
    return -1
  }

  const request = client[kQueue][client[kRunningIdx]]

  /* istanbul ignore next: difficult to make a test case for */
  if (!request) {
    return -1
  }

  assert(!this.upgrade)
  assert(this.statusCode < 200)

  // A 100 status is rejected outright as a bad response.
  if (statusCode === 100) {
    util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
    return -1
  }

  /* this can only happen if server is misbehaving */
  if (upgrade && !request.upgrade) {
    util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
    return -1
  }

  assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

  this.statusCode = statusCode
  this.shouldKeepAlive = (
    shouldKeepAlive ||
    // Override llhttp value which does not allow keepAlive for HEAD.
    (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
  )

  if (this.statusCode >= 200) {
    // Final response: a per-request body timeout wins over the client default.
    const bodyTimeout = request.bodyTimeout != null
      ? request.bodyTimeout
      : client[kBodyTimeout]
    this.setTimeout(bodyTimeout, TIMEOUT_BODY)
  } else if (this.timeout) {
    // Status below 200: keep waiting for headers, restarting the timer.
    // istanbul ignore else: only for jest
    if (this.timeout.refresh) {
      this.timeout.refresh()
    }
  }

  if (request.method === 'CONNECT') {
    assert(client[kRunning] === 1)
    this.upgrade = true
    return 2
  }

  if (upgrade) {
    assert(client[kRunning] === 1)
    this.upgrade = true
    return 2
  }

  assert(this.headers.length % 2 === 0)
  this.headers = []
  this.headersSize = 0

  if (this.shouldKeepAlive && client[kPipelining]) {
    const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null

    if (keepAliveTimeout != null) {
      // Use the server's hint minus the threshold, capped by the client max.
      const timeout = Math.min(
        keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
        client[kKeepAliveMaxTimeout]
      )
      if (timeout <= 0) {
        socket[kReset] = true
      } else {
        client[kKeepAliveTimeoutValue] = timeout
      }
    } else {
      client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
    }
  } else {
    // Stop more requests from being dispatched.
    socket[kReset] = true
  }

  // The handler asks for a pause by returning exactly `false`.
  const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false

  if (request.aborted) {
    return -1
  }

  if (request.method === 'HEAD') {
    return 1
  }

  if (statusCode < 200) {
    return 1
  }

  // Headers have arrived, so a blocking request no longer holds the pipeline.
  if (socket[kBlocking]) {
    socket[kBlocking] = false
    resume(client)
  }

  return pause ? constants.ERROR.PAUSED : 0
}
| 941 |
|
---|
| 942 | onBody (buf) {
|
---|
| 943 | const { client, socket, statusCode, maxResponseSize } = this
|
---|
| 944 |
|
---|
| 945 | if (socket.destroyed) {
|
---|
| 946 | return -1
|
---|
| 947 | }
|
---|
| 948 |
|
---|
| 949 | const request = client[kQueue][client[kRunningIdx]]
|
---|
| 950 | assert(request)
|
---|
| 951 |
|
---|
| 952 | assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
|
---|
| 953 | if (this.timeout) {
|
---|
| 954 | // istanbul ignore else: only for jest
|
---|
| 955 | if (this.timeout.refresh) {
|
---|
| 956 | this.timeout.refresh()
|
---|
| 957 | }
|
---|
| 958 | }
|
---|
| 959 |
|
---|
| 960 | assert(statusCode >= 200)
|
---|
| 961 |
|
---|
| 962 | if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) {
|
---|
| 963 | util.destroy(socket, new ResponseExceededMaxSizeError())
|
---|
| 964 | return -1
|
---|
| 965 | }
|
---|
| 966 |
|
---|
| 967 | this.bytesRead += buf.length
|
---|
| 968 |
|
---|
| 969 | if (request.onData(buf) === false) {
|
---|
| 970 | return constants.ERROR.PAUSED
|
---|
| 971 | }
|
---|
| 972 | }
|
---|
| 973 |
|
---|
/**
 * Parser callback invoked when a response message has been fully parsed.
 * Also called directly by the socket 'error' / 'end' / 'close' handlers to
 * finish a non-keep-alive response with the data received so far.
 *
 * Resets per-message parser state, verifies the body length against the
 * content-length header, completes the running request, and then either
 * destroys the socket or resumes dispatching.
 *
 * @returns {number|undefined} -1 on error, `constants.ERROR.PAUSED` when the
 *   socket was destroyed on purpose, otherwise undefined.
 */
onMessageComplete () {
  // Snapshot the state first; several of these fields are reset below.
  const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this

  if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
    return -1
  }

  if (upgrade) {
    return
  }

  const request = client[kQueue][client[kRunningIdx]]
  assert(request)

  assert(statusCode >= 100)

  this.statusCode = null
  this.statusText = ''
  this.bytesRead = 0
  this.contentLength = ''
  this.keepAlive = ''
  this.connection = ''

  assert(this.headers.length % 2 === 0)
  this.headers = []
  this.headersSize = 0

  // Status codes below 200 do not complete the request.
  if (statusCode < 200) {
    return
  }

  /* istanbul ignore next: should be handled by llhttp? */
  if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
    util.destroy(socket, new ResponseContentLengthMismatchError())
    return -1
  }

  // `headers` is the list captured above, i.e. whatever was collected since
  // the previous reset (presumably trailers — confirm against the handler).
  request.onComplete(headers)

  client[kQueue][client[kRunningIdx]++] = null

  if (socket[kWriting]) {
    assert.strictEqual(client[kRunning], 0)
    // Response completed before request.
    util.destroy(socket, new InformationalError('reset'))
    return constants.ERROR.PAUSED
  } else if (!shouldKeepAlive) {
    util.destroy(socket, new InformationalError('reset'))
    return constants.ERROR.PAUSED
  } else if (socket[kReset] && client[kRunning] === 0) {
    // Destroy socket once all requests have completed.
    // The request at the tail of the pipeline is the one
    // that requested reset and no further requests should
    // have been queued since then.
    util.destroy(socket, new InformationalError('reset'))
    return constants.ERROR.PAUSED
  } else if (client[kPipelining] === 1) {
    // We must wait a full event loop cycle to reuse this socket to make sure
    // that non-spec compliant servers are not closing the connection even if they
    // said they won't.
    setImmediate(resume, client)
  } else {
    resume(client)
  }
}
| 1039 | }
|
---|
| 1040 |
|
---|
/**
 * Timer callback for a parser timeout; destroys the socket with an error
 * that matches the kind of timeout that was armed.
 *
 * @param {object} parser Parser whose `timeoutType` selects the action.
 */
function onParserTimeout (parser) {
  const { socket, timeoutType, client } = parser

  switch (timeoutType) {
    case TIMEOUT_HEADERS:
      // Skipped only while the request is still being written without
      // backpressure and at most one request is running.
      if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) {
        assert(!parser.paused, 'cannot be paused while waiting for headers')
        util.destroy(socket, new HeadersTimeoutError())
      }
      break
    case TIMEOUT_BODY:
      // A paused parser is not considered to be timing out.
      if (!parser.paused) {
        util.destroy(socket, new BodyTimeoutError())
      }
      break
    case TIMEOUT_IDLE:
      assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
      util.destroy(socket, new InformationalError('socket idle timeout'))
      break
  }
}
| 1059 |
|
---|
/**
 * Socket 'readable' listener (`this` is the socket): lets the attached
 * parser, if any, read more data.
 */
function onSocketReadable () {
  const parser = this[kParser]
  if (!parser) {
    return
  }
  parser.readMore()
}
| 1066 |
|
---|
/**
 * Socket 'error' listener (`this` is the socket).
 *
 * On non-h2 connections an ECONNRESET that arrives after a response has
 * started on a non-keep-alive connection completes that response instead of
 * failing it. Any other error is stored on the socket and passed to
 * `onError`.
 *
 * @param {Error} err Error emitted by the socket.
 */
function onSocketError (err) {
  const client = this[kClient]
  const parser = this[kParser]

  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  // On Mac OS, we get an ECONNRESET even if there is a full body to be
  // forwarded to the user.
  const responseIsUsable =
    client[kHTTPConnVersion] !== 'h2' &&
    err.code === 'ECONNRESET' &&
    parser.statusCode &&
    !parser.shouldKeepAlive

  if (responseIsUsable) {
    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
    return
  }

  this[kError] = err

  onError(client, err)
}
| 1086 |
|
---|
/**
 * Fails every queued request with `err` when nothing is running and the
 * error code is neither 'UND_ERR_INFO' nor 'UND_ERR_SOCKET'.
 *
 * @param {object} client Client whose queue is inspected.
 * @param {Error} err Error to report to the queued requests.
 */
function onError (client, err) {
  if (
    client[kRunning] !== 0 ||
    err.code === 'UND_ERR_INFO' ||
    err.code === 'UND_ERR_SOCKET'
  ) {
    return
  }

  // Error is not caused by running request and not a recoverable
  // socket error.

  assert(client[kPendingIdx] === client[kRunningIdx])

  for (const request of client[kQueue].splice(client[kRunningIdx])) {
    errorRequest(client, request, err)
  }
  assert(client[kSize] === 0)
}
| 1106 |
|
---|
/**
 * Socket 'end' listener (`this` is the socket).
 *
 * On non-h2 connections, a started response on a non-keep-alive connection
 * is completed with the data received so far. In every other case the
 * socket is destroyed with an 'other side closed' SocketError.
 */
function onSocketEnd () {
  const parser = this[kParser]
  const client = this[kClient]

  if (client[kHTTPConnVersion] !== 'h2' && parser.statusCode && !parser.shouldKeepAlive) {
    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
    return
  }

  util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}
| 1120 |
|
---|
/**
 * 'close' listener (`this` is the emitter the listener was attached to).
 *
 * Finishes a non-keep-alive h1 response if possible, destroys the parser,
 * detaches the socket from the client, fails the affected requests, emits
 * 'disconnect' and resumes dispatching.
 */
function onSocketClose () {
  const { [kClient]: client, [kParser]: parser } = this

  if (client[kHTTPConnVersion] === 'h1' && parser) {
    if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
      // We treat all incoming data so far as a valid response.
      parser.onMessageComplete()
    }

    this[kParser].destroy()
    this[kParser] = null
  }

  // Fall back to a generic 'closed' error when no error was recorded.
  const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

  client[kSocket] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // Fail entire queue.
    const requests = client[kQueue].splice(client[kRunningIdx])
    for (let i = 0; i < requests.length; i++) {
      const request = requests[i]
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
    // Fail head of pipeline.
    const request = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, request, err)
  }

  // Move the pending index back to the running index so the remaining
  // entries count as pending again.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect', client[kUrl], [client], err)

  resume(client)
}
| 1163 |
|
---|
/**
 * Opens a connection for `client` through its connector and wires the
 * resulting socket up for either HTTP/2 (ALPN 'h2') or HTTP/1.1.
 *
 * Publishes the beforeConnect / connected / connectError diagnostics
 * channels when they have subscribers, emits 'connect' or 'connectionError'
 * on the client, and finally calls `resume(client)`.
 *
 * @param {object} client Client that must not be connecting or connected.
 * @returns {Promise<void>}
 */
async function connect (client) {
  assert(!client[kConnecting])
  assert(!client[kSocket])

  let { host, hostname, protocol, port } = client[kUrl]

  // Resolve ipv6
  if (hostname[0] === '[') {
    const idx = hostname.indexOf(']')

    assert(idx !== -1)
    // Strip the surrounding brackets from the literal address.
    const ip = hostname.substring(1, idx)

    assert(net.isIP(ip))
    hostname = ip
  }

  client[kConnecting] = true

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: {
        host,
        hostname,
        protocol,
        port,
        servername: client[kServerName],
        localAddress: client[kLocalAddress]
      },
      connector: client[kConnector]
    })
  }

  try {
    // Adapt the callback-style connector to a promise.
    const socket = await new Promise((resolve, reject) => {
      client[kConnector]({
        host,
        hostname,
        protocol,
        port,
        servername: client[kServerName],
        localAddress: client[kLocalAddress]
      }, (err, socket) => {
        if (err) {
          reject(err)
        } else {
          resolve(socket)
        }
      })
    })

    // The client was destroyed while connecting: discard the new socket. The
    // no-op 'error' listener keeps the destroy error from going unhandled.
    if (client.destroyed) {
      util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
      return
    }

    client[kConnecting] = false

    assert(socket)

    const isH2 = socket.alpnProtocol === 'h2'
    if (isH2) {
      // Warn once per process.
      if (!h2ExperimentalWarned) {
        h2ExperimentalWarned = true
        process.emitWarning('H2 support is experimental, expect them to change at any time.', {
          code: 'UNDICI-H2'
        })
      }

      // Run the HTTP/2 session over the socket that was just established.
      const session = http2.connect(client[kUrl], {
        createConnection: () => socket,
        peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
      })

      client[kHTTPConnVersion] = 'h2'
      session[kClient] = client
      session[kSocket] = socket
      session.on('error', onHttp2SessionError)
      session.on('frameError', onHttp2FrameError)
      session.on('end', onHttp2SessionEnd)
      session.on('goaway', onHTTP2GoAway)
      session.on('close', onSocketClose)
      session.unref()

      client[kHTTP2Session] = session
      socket[kHTTP2Session] = session
    } else {
      // Wait for the llhttp instance the first time it is needed.
      if (!llhttpInstance) {
        llhttpInstance = await llhttpPromise
        llhttpPromise = null
      }

      socket[kNoRef] = false
      socket[kWriting] = false
      socket[kReset] = false
      socket[kBlocking] = false
      socket[kParser] = new Parser(client, socket, llhttpInstance)
    }

    socket[kCounter] = 0
    socket[kMaxRequests] = client[kMaxRequests]
    socket[kClient] = client
    socket[kError] = null

    socket
      .on('error', onSocketError)
      .on('readable', onSocketReadable)
      .on('end', onSocketEnd)
      .on('close', onSocketClose)

    client[kSocket] = socket

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: {
          host,
          hostname,
          protocol,
          port,
          servername: client[kServerName],
          localAddress: client[kLocalAddress]
        },
        connector: client[kConnector],
        socket
      })
    }
    client.emit('connect', client[kUrl], [client])
  } catch (err) {
    if (client.destroyed) {
      return
    }

    client[kConnecting] = false

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: {
          host,
          hostname,
          protocol,
          port,
          servername: client[kServerName],
          localAddress: client[kLocalAddress]
        },
        connector: client[kConnector],
        error: err
      })
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      assert(client[kRunning] === 0)
      // Fail only the leading pending requests that asked for the servername
      // that was just rejected.
      while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
        const request = client[kQueue][client[kPendingIdx]++]
        errorRequest(client, request, err)
      }
    } else {
      onError(client, err)
    }

    client.emit('connectionError', client[kUrl], [client], err)
  }

  resume(client)
}
| 1328 |
|
---|
/**
 * Clears the client's need-drain flag and emits 'drain' with the client's
 * URL and a one-element list holding the client.
 *
 * @param {object} client Client to notify.
 */
function emitDrain (client) {
  client[kNeedDrain] = 0

  const targets = [client]
  client.emit('drain', client[kUrl], targets)
}
| 1333 |
|
---|
/**
 * Runs the dispatch loop for `client` unless one is already in progress
 * (`kResuming === 2`), then trims the consumed prefix of the queue once the
 * running index has grown past 256.
 *
 * @param {object} client Client to resume.
 * @param {boolean} [sync] Forwarded unchanged to `_resume`.
 */
function resume (client, sync) {
  if (client[kResuming] !== 2) {
    client[kResuming] = 2

    _resume(client, sync)
    client[kResuming] = 0

    const consumed = client[kRunningIdx]
    if (consumed > 256) {
      // Drop the entries before the running index and shift both indices
      // back by the same amount.
      client[kQueue].splice(0, consumed)
      client[kPendingIdx] -= consumed
      client[kRunningIdx] = 0
    }
  }
}
| 1350 |
|
---|
/**
 * Dispatch loop: keeps socket ref state and parser timeouts in sync with the
 * queue, emits 'drain' when the client stops being busy, connects when there
 * is no socket, and writes pending requests until something forces a stop.
 *
 * @param {object} client Client whose queue is processed.
 * @param {boolean} [sync] When truthy, 'drain' is deferred with
 *   `process.nextTick` instead of being emitted inline.
 */
function _resume (client, sync) {
  while (true) {
    if (client.destroyed) {
      assert(client[kPending] === 0)
      return
    }

    // A close is waiting and the queue is empty: resolve it.
    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]()
      client[kClosedResolve] = null
      return
    }

    const socket = client[kSocket]

    if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') {
      // Unref the socket while the queue is empty and ref it again as soon
      // as there is work.
      if (client[kSize] === 0) {
        if (!socket[kNoRef] && socket.unref) {
          socket.unref()
          socket[kNoRef] = true
        }
      } else if (socket[kNoRef] && socket.ref) {
        socket.ref()
        socket[kNoRef] = false
      }

      // Arm the idle timeout when the queue is empty, or the headers timeout
      // while a running request has no final status yet.
      if (client[kSize] === 0) {
        if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
          socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
        }
      } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
        if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
          const request = client[kQueue][client[kRunningIdx]]
          const headersTimeout = request.headersTimeout != null
            ? request.headersTimeout
            : client[kHeadersTimeout]
          socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
        }
      }
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        client[kNeedDrain] = 1
        process.nextTick(emitDrain, client)
      } else {
        emitDrain(client)
      }
      // Re-evaluate from the top after the drain notification.
      continue
    }

    if (client[kPending] === 0) {
      return
    }

    // Pipelining limit reached (a falsy kPipelining is treated as 1).
    if (client[kRunning] >= (client[kPipelining] || 1)) {
      return
    }

    const request = client[kQueue][client[kPendingIdx]]

    if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
      // Wait for running requests to finish before switching servername.
      if (client[kRunning] > 0) {
        return
      }

      client[kServerName] = request.servername

      if (socket && socket.servername !== request.servername) {
        util.destroy(socket, new InformationalError('servername changed'))
        return
      }
    }

    if (client[kConnecting]) {
      return
    }

    if (!socket && !client[kHTTP2Session]) {
      connect(client)
      return
    }

    if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
      return
    }

    if (client[kRunning] > 0 && !request.idempotent) {
      // Non-idempotent request cannot be retried.
      // Ensure that no other requests are inflight and
      // could cause failure.
      return
    }

    if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
      // Don't dispatch an upgrade until all preceding requests have completed.
      // A misbehaving server might upgrade the connection before all pipelined
      // requests have completed.
      return
    }

    if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
      (util.isStream(request.body) || util.isAsyncIterable(request.body))) {
      // Request with stream or iterator body can error while other requests
      // are inflight and indirectly error those as well.
      // Ensure this doesn't happen by waiting for inflight
      // to complete before dispatching.

      // Request with stream or iterator body cannot be retried.
      // Ensure that no other requests are inflight and
      // could cause failure.
      return
    }

    // A truthy result from write() turns the request into a running one;
    // otherwise (aborted, or write() returned a falsy value) the entry is
    // removed from the queue.
    if (!request.aborted && write(client, request)) {
      client[kPendingIdx]++
    } else {
      client[kQueue].splice(client[kPendingIdx], 1)
    }
  }
}
| 1474 |
|
---|
// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
/**
 * Tells whether `method` is anything other than GET, HEAD, OPTIONS, TRACE or
 * CONNECT. The comparison is exact and case-sensitive.
 *
 * @param {string} method HTTP method name.
 * @returns {boolean} false for the five methods above, true otherwise.
 */
function shouldSendContentLength (method) {
  switch (method) {
    case 'GET':
    case 'HEAD':
    case 'OPTIONS':
    case 'TRACE':
    case 'CONNECT':
      return false
    default:
      return true
  }
}
| 1479 |
|
---|
/**
 * Writes `request` to the client's connection.
 *
 * For h2 connections the work is delegated to `writeH2`. Otherwise the
 * HTTP/1.1 request head is built as a string, the socket flags (reset,
 * blocking) are updated from the request, and the body is written with the
 * strategy that matches its type.
 *
 * @param {object} client Client that owns the socket.
 * @param {object} request Request to serialize.
 * @returns {boolean|undefined} true once the h1 request was handed to the
 *   socket or a body writer; false when the request was errored or aborted
 *   before anything was written; undefined on the h2 path.
 */
function write (client, request) {
  if (client[kHTTPConnVersion] === 'h2') {
    // NOTE(review): the result of writeH2 is discarded and undefined is
    // returned, so the truthiness check on write() in _resume takes its else
    // branch for h2 requests — confirm this is intended.
    writeH2(client, client[kHTTP2Session], request)
    return
  }

  const { body, method, path, host, upgrade, headers, blocking, reset } = request

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.

  const expectsPayload = (
    method === 'PUT' ||
    method === 'POST' ||
    method === 'PATCH'
  )

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  const bodyLength = util.bodyLength(body)

  let contentLength = bodyLength

  // Unknown body length: fall back to the length declared on the request.
  if (contentLength === null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 && !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.

    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) {
    // Strict mode fails the request; otherwise the mismatch is only warned about.
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  const socket = client[kSocket]

  try {
    // The callback handed to onConnect aborts the request: it errors the
    // request and destroys the socket, unless the request has already been
    // aborted or completed.
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())

      util.destroy(socket, new InformationalError('aborted'))
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  if (method === 'HEAD') {
    // https://github.com/mcollina/undici/issues/258
    // Close after a HEAD request to interop with misbehaving servers
    // that may send a body in the response.

    socket[kReset] = true
  }

  if (upgrade || method === 'CONNECT') {
    // On CONNECT or upgrade, block pipeline from dispatching further
    // requests on this connection.

    socket[kReset] = true
  }

  // An explicit per-request `reset` overrides the defaults chosen above.
  if (reset != null) {
    socket[kReset] = reset
  }

  // Count this request against the socket's request limit, when one is set.
  if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
    socket[kReset] = true
  }

  if (blocking) {
    socket[kBlocking] = true
  }

  let header = `${method} ${path} HTTP/1.1\r\n`

  if (typeof host === 'string') {
    header += `host: ${host}\r\n`
  } else {
    header += client[kHostHeader]
  }

  if (upgrade) {
    header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  } else if (client[kPipelining] && !socket[kReset]) {
    header += 'connection: keep-alive\r\n'
  } else {
    header += 'connection: close\r\n'
  }

  if (headers) {
    header += headers
  }

  if (channels.sendHeaders.hasSubscribers) {
    channels.sendHeaders.publish({ request, headers: header, socket })
  }

  /* istanbul ignore else: assertion */
  if (!body || bodyLength === 0) {
    if (contentLength === 0) {
      socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
    } else {
      assert(contentLength === null, 'no body must not have content length')
      socket.write(`${header}\r\n`, 'latin1')
    }
    request.onRequestSent()
  } else if (util.isBuffer(body)) {
    assert(contentLength === body.byteLength, 'buffer body must have content length')

    // Cork so that the head and the body are flushed together.
    socket.cork()
    socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
    socket.write(body)
    socket.uncork()
    request.onBodySent(body)
    request.onRequestSent()
    // A body was sent with a method that does not expect one: do not reuse
    // the connection.
    if (!expectsPayload) {
      socket[kReset] = true
    }
  } else if (util.isBlobLike(body)) {
    if (typeof body.stream === 'function') {
      writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
    } else {
      writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
    }
  } else if (util.isStream(body)) {
    writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
  } else if (util.isIterable(body)) {
    writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
  } else {
    assert(false)
  }

  return true
}
| 1644 |
|
---|
/**
 * Dispatches `request` as a new stream on an established HTTP/2 session.
 *
 * Fills in the HTTP/2 pseudo-headers, opens the stream with
 * `session.request()`, writes the request body (immediately, or once the
 * server answers `100-continue`) and forwards the stream's events to the
 * request handlers.
 *
 * @param {object} client - Client that owns the session; supplies the HTTP/2
 *   session state, the socket and the strict content-length setting.
 * @param {import('node:http2').ClientHttp2Session} session - Session the
 *   stream is opened on.
 * @param {object} request - Request being dispatched.
 * @returns {boolean} `true` once a stream has been requested, `false` when the
 *   request was errored or aborted before any stream was created.
 */
function writeH2 (client, session, request) {
  const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

  // String headers go through Request[kHTTP2CopyHeaders]; any other value is
  // used as-is, which means the assignments below write into that same object.
  let headers
  if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim())
  else headers = reqHeaders

  if (upgrade) {
    errorRequest(client, request, new Error('Upgrade not supported for H2'))
    return false
  }

  try {
    // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event?
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  // onConnect (or the error path above) may already have aborted the request.
  if (request.aborted) {
    return false
  }

  /** @type {import('node:http2').ClientHttp2Stream} */
  let stream
  const h2State = client[kHTTP2SessionState]

  headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
  headers[HTTP2_HEADER_METHOD] = method

  if (method === 'CONNECT') {
    session.ref()
    // we are already connected, streams are pending, first request
    // will create a new stream. We trigger a request to create the stream and wait until
    // `ready` event is triggered
    // We disabled endStream to allow the user to write to the stream
    stream = session.request(headers, { endStream: false, signal })

    // NOTE(review): openStreams is incremented only once the stream is ready
    // but is always decremented on 'close' - confirm a stream cannot close
    // before it becomes ready.
    if (stream.id && !stream.pending) {
      request.onUpgrade(null, null, stream)
      ++h2State.openStreams
    } else {
      stream.once('ready', () => {
        request.onUpgrade(null, null, stream)
        ++h2State.openStreams
      })
    }

    stream.once('close', () => {
      h2State.openStreams -= 1
      // TODO(HTTP/2): unref only if current streams count is 0
      if (h2State.openStreams === 0) session.unref()
    })

    return true
  }

  // https://tools.ietf.org/html/rfc7540#section-8.3
  // :path and :scheme headers must be omitted when sending CONNECT

  headers[HTTP2_HEADER_PATH] = path
  headers[HTTP2_HEADER_SCHEME] = 'https'

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.

  const expectsPayload = (
    method === 'PUT' ||
    method === 'POST' ||
    method === 'PATCH'
  )

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  let contentLength = util.bodyLength(body)

  if (contentLength == null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 || !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.

    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  if (contentLength != null) {
    assert(body, 'no body must not have content length')
    headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  }

  session.ref()

  // GET and HEAD close the writable side right away; every other method
  // leaves it open for writeBodyH2.
  const shouldEndStream = method === 'GET' || method === 'HEAD'
  if (expectContinue) {
    headers[HTTP2_HEADER_EXPECT] = '100-continue'
  }

  stream = session.request(headers, { endStream: shouldEndStream, signal })

  if (expectContinue) {
    // Hold the body back until the server emits 'continue'.
    stream.once('continue', writeBodyH2)
  } else {
    writeBodyH2()
  }

  // Increment the counter, as a new stream is now open
  ++h2State.openStreams

  stream.once('response', headers => {
    const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers

    if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) {
      stream.pause()
    }
  })

  stream.once('end', () => {
    request.onComplete([])
  })

  stream.on('data', (chunk) => {
    if (request.onData(chunk) === false) {
      stream.pause()
    }
  })

  stream.once('close', () => {
    h2State.openStreams -= 1
    // TODO(HTTP/2): unref only if current streams count is 0
    if (h2State.openStreams === 0) {
      session.unref()
    }
  })

  stream.once('error', destroyStream)

  stream.once('frameError', (type, code) => {
    const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
    errorRequest(client, request, err)
    destroyStream(err)
  })

  // stream.on('aborted', () => {
  //   // TODO(HTTP/2): Support aborted
  // })

  // stream.on('timeout', () => {
  //   // TODO(HTTP/2): Support timeout
  // })

  // stream.on('push', headers => {
  //   // TODO(HTTP/2): Support push
  // })

  // stream.on('trailers', headers => {
  //   // TODO(HTTP/2): Support trailers
  // })

  return true

  // Destroys the stream with `err` unless the session or the stream is
  // already closed/destroyed. It refers to `stream` explicitly instead of
  // `this`, so it behaves the same from plain and arrow-function listeners.
  function destroyStream (err) {
    if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !stream.closed && !stream.destroyed) {
      // NOTE(review): the rest of this function tracks `openStreams`;
      // confirm `streams` is a field the session state actually carries.
      h2State.streams -= 1
      util.destroy(stream, err)
    }
  }

  // Arguments shared by the asynchronous body writers on the HTTP/2 path.
  function bodyWriterArgs (payload) {
    return {
      body: payload,
      client,
      request,
      contentLength,
      expectsPayload,
      h2stream: stream,
      socket: client[kSocket],
      header: ''
    }
  }

  // Writes the request body to the stream, picking the writer by body type.
  function writeBodyH2 () {
    /* istanbul ignore else: assertion */
    if (!body) {
      request.onRequestSent()
    } else if (util.isBuffer(body)) {
      assert(contentLength === body.byteLength, 'buffer body must have content length')
      stream.cork()
      stream.write(body)
      stream.uncork()
      stream.end()
      request.onBodySent(body)
      request.onRequestSent()
    } else if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable(bodyWriterArgs(body.stream()))
      } else {
        writeBlob(bodyWriterArgs(body))
      }
    } else if (util.isStream(body)) {
      writeStream(bodyWriterArgs(body))
    } else if (util.isIterable(body)) {
      writeIterable(bodyWriterArgs(body))
    } else {
      assert(false)
    }
  }
}
| 1908 |
|
---|
/**
 * Sends a stream `body` as the payload of `request`.
 *
 * On an HTTP/2 connection the body is piped into `h2stream`. Otherwise the
 * chunks are pushed through an AsyncWriter onto `socket`, pausing the body
 * when a write reports back-pressure and resuming it on the socket's 'drain'.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] - Target stream, used on the HTTP/2 path only.
 * @param {object} opts.body - Stream providing the request body.
 * @param {object} opts.client - Client that owns the connection.
 * @param {object} opts.request - Request whose body is being sent.
 * @param {object} opts.socket - Socket written to on the non-HTTP/2 path.
 * @param {number|null} opts.contentLength - Declared body length, or `null`.
 * @param {string} opts.header - Serialized request head, handed to the writer.
 * @param {boolean} opts.expectsPayload - Whether the method anticipates a body.
 */
function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

  if (client[kHTTPConnVersion] === 'h2') {
    // For HTTP/2 it is enough to pipe the body into the stream.
    const reportChunk = (chunk) => {
      request.onBodySent(chunk)
    }

    const piped = pipeline(body, h2stream, (err) => {
      if (!err) {
        request.onRequestSent()
        return
      }

      util.destroy(body, err)
      util.destroy(h2stream, err)
    })

    piped.on('data', reportChunk)
    piped.once('end', () => {
      piped.removeListener('data', reportChunk)
      util.destroy(piped)
    })

    return
  }

  // Set by onFinished; every handler below becomes a no-op afterwards.
  let done = false

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

  // 'data' listener: `this` is whatever object the listener is invoked on.
  function onData (chunk) {
    if (done) {
      return
    }

    try {
      const accepted = writer.write(chunk)
      if (!accepted && this.pause) {
        this.pause()
      }
    } catch (err) {
      util.destroy(this, err)
    }
  }

  // The socket drained: let the body flow again.
  function onDrain () {
    if (!done && body.resume) {
      body.resume()
    }
  }

  // The body closed before finishing: report an abort on a later microtask.
  function onAbort () {
    if (done) {
      return
    }

    const abortError = new RequestAbortedError()
    queueMicrotask(() => onFinished(abortError))
  }

  // Single exit point: detaches the listeners, finalizes the writer and
  // destroys the body.
  function onFinished (err) {
    if (done) {
      return
    }

    done = true

    assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))

    socket
      .off('drain', onDrain)
      .off('error', onFinished)

    body
      .removeListener('data', onData)
      .removeListener('end', onFinished)
      .removeListener('error', onFinished)
      .removeListener('close', onAbort)

    let failure = err
    if (!failure) {
      try {
        writer.end()
      } catch (endError) {
        failure = endError
      }
    }

    writer.destroy(failure)

    // An informational 'reset' is not passed on to the body as an error.
    const isReset = failure && failure.code === 'UND_ERR_INFO' && failure.message === 'reset'
    if (failure && !isReset) {
      util.destroy(body, failure)
    } else {
      util.destroy(body)
    }
  }

  body
    .on('data', onData)
    .on('end', onFinished)
    .on('error', onFinished)
    .on('close', onAbort)

  if (body.resume) {
    body.resume()
  }

  socket
    .on('drain', onDrain)
    .on('error', onFinished)
}
| 2023 |
|
---|
/**
 * Sends a blob-like `body` as the payload of `request`.
 *
 * The blob is read fully into a Buffer and written in one corked batch,
 * either to `h2stream` (HTTP/2) or to `socket` preceded by the serialized
 * head and a `content-length` line. Any failure destroys the target that was
 * being written to.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] - Target stream, used on the HTTP/2 path only.
 * @param {object} opts.body - Blob-like object exposing `size` and `arrayBuffer()`.
 * @param {object} opts.client - Client that owns the connection.
 * @param {object} opts.request - Request whose body is being sent.
 * @param {object} opts.socket - Socket written to on the non-HTTP/2 path.
 * @param {number} opts.contentLength - Declared length; must equal `body.size`.
 * @param {string} opts.header - Serialized request head (non-HTTP/2 path).
 * @param {boolean} opts.expectsPayload - Whether the method anticipates a body.
 * @returns {Promise<void>}
 */
async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength === body.size, 'blob body must have content length')

  const isH2 = client[kHTTPConnVersion] === 'h2'
  try {
    if (contentLength != null && contentLength !== body.size) {
      throw new RequestContentLengthMismatchError()
    }

    const buffer = Buffer.from(await body.arrayBuffer())

    if (isH2) {
      h2stream.cork()
      h2stream.write(buffer)
      h2stream.uncork()
      // The buffer is the whole body: close the writable side so the stream
      // is not left half-open, as done for Buffer bodies on the HTTP/2 path.
      h2stream.end()
    } else {
      socket.cork()
      socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      socket.write(buffer)
      socket.uncork()
    }

    request.onBodySent(buffer)
    request.onRequestSent()

    if (!expectsPayload) {
      // A body was sent with a method that does not anticipate one: flag the
      // socket for reset.
      socket[kReset] = true
    }

    resume(client)
  } catch (err) {
    util.destroy(isH2 ? h2stream : socket, err)
  }
}
| 2058 |
|
---|
/**
 * Sends an (async) iterable `body` as the payload of `request`.
 *
 * Chunks are written one at a time; when a write reports back-pressure the
 * loop waits until the target emits 'drain' or 'close'. On HTTP/2 the chunks
 * go straight to `h2stream`, otherwise they go through an AsyncWriter onto
 * `socket`.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] - Target stream, used on the HTTP/2 path only.
 * @param {object} opts.body - Iterable or async iterable of body chunks.
 * @param {object} opts.client - Client that owns the connection.
 * @param {object} opts.request - Request whose body is being sent.
 * @param {object} opts.socket - Connection socket; its error state is checked before each chunk.
 * @param {number|null} opts.contentLength - Declared body length, or `null`.
 * @param {string} opts.header - Serialized request head (non-HTTP/2 path).
 * @param {boolean} opts.expectsPayload - Whether the method anticipates a body.
 * @returns {Promise<void>}
 */
async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

  // Resolver of the promise currently awaited by the write loop, if any.
  let callback = null
  function onDrain () {
    if (callback) {
      const cb = callback
      callback = null
      cb()
    }
  }

  const waitForDrain = () => new Promise((resolve, reject) => {
    assert(callback === null)

    if (socket[kError]) {
      reject(socket[kError])
    } else {
      callback = resolve
    }
  })

  if (client[kHTTPConnVersion] === 'h2') {
    h2stream
      .on('close', onDrain)
      .on('drain', onDrain)

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const res = h2stream.write(chunk)
        request.onBodySent(chunk)
        if (!res) {
          await waitForDrain()
        }
      }

      // Only a fully consumed body counts as sent; on failure the stream is
      // destroyed below and must not be reported as a completed request.
      request.onRequestSent()
      h2stream.end()
    } catch (err) {
      h2stream.destroy(err)
    } finally {
      h2stream
        .off('close', onDrain)
        .off('drain', onDrain)
    }

    return
  }

  socket
    .on('close', onDrain)
    .on('drain', onDrain)

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
  try {
    // It's up to the user to somehow abort the async iterable.
    for await (const chunk of body) {
      if (socket[kError]) {
        throw socket[kError]
      }

      if (!writer.write(chunk)) {
        await waitForDrain()
      }
    }

    writer.end()
  } catch (err) {
    writer.destroy(err)
  } finally {
    socket
      .off('close', onDrain)
      .off('drain', onDrain)
  }
}
| 2138 |
|
---|
/**
 * Incrementally writes a request head and body to a socket.
 *
 * The head is emitted together with the first body chunk (or from `end()`
 * when no chunk was written). With a `null` content length the body is
 * framed with chunked transfer-encoding, otherwise a `content-length` line
 * is sent and the number of bytes written is checked against it.
 */
class AsyncWriter {
  /**
   * Stores the write context and marks the socket as being written to.
   */
  constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
    this.socket = socket
    this.request = request
    this.contentLength = contentLength
    this.client = client
    this.bytesWritten = 0
    this.expectsPayload = expectsPayload
    this.header = header

    socket[kWriting] = true
  }

  /**
   * Writes one body chunk, preceded by the head on the first non-empty chunk.
   *
   * @param {string|Buffer|Uint8Array} chunk
   * @returns {boolean} Result of the underlying `socket.write(chunk)`;
   *   `false` when the socket is destroyed, `true` for an empty chunk.
   * @throws The socket's stored error, or RequestContentLengthMismatchError
   *   when strict content-length is on and the chunk exceeds the declared length.
   */
  write (chunk) {
    const { socket, request, contentLength, client, expectsPayload, header } = this
    const alreadyWritten = this.bytesWritten

    const socketError = socket[kError]
    if (socketError) {
      throw socketError
    }

    if (socket.destroyed) {
      return false
    }

    const len = Buffer.byteLength(chunk)
    if (!len) {
      return true
    }

    const isChunked = contentLength === null

    // Writing this chunk would exceed the declared content length.
    if (!isChunked && alreadyWritten + len > contentLength) {
      if (client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    socket.cork()

    if (alreadyWritten === 0) {
      if (!expectsPayload) {
        socket[kReset] = true
      }

      // First chunk: emit the head. In chunked mode the blank line that ends
      // the head comes from the chunk-size line written below.
      socket.write(
        isChunked
          ? `${header}transfer-encoding: chunked\r\n`
          : `${header}content-length: ${contentLength}\r\n\r\n`,
        'latin1'
      )
    }

    if (isChunked) {
      socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
    }

    this.bytesWritten += len

    const ret = socket.write(chunk)

    socket.uncork()

    request.onBodySent(chunk)

    if (!ret) {
      // Back-pressure: refresh a pending headers timeout.
      const parser = socket[kParser]
      if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
        // istanbul ignore else: only for jest
        if (parser.timeout.refresh) {
          parser.timeout.refresh()
        }
      }
    }

    return ret
  }

  /**
   * Finishes the request: emits the head if nothing was written yet, or the
   * terminating chunk in chunked mode, then resumes the client.
   *
   * @throws The socket's stored error, or RequestContentLengthMismatchError
   *   when strict content-length is on and the byte count does not match.
   */
  end () {
    const { socket, contentLength, client, bytesWritten: sent, expectsPayload, header, request } = this
    request.onRequestSent()

    socket[kWriting] = false

    const socketError = socket[kError]
    if (socketError) {
      throw socketError
    }

    if (socket.destroyed) {
      return
    }

    if (sent === 0) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD send a Content-Length in a request message when
      // no Transfer-Encoding is sent and the request method defines a meaning
      // for an enclosed payload body.
      socket.write(
        expectsPayload
          ? `${header}content-length: 0\r\n\r\n`
          : `${header}\r\n`,
        'latin1'
      )
    } else if (contentLength === null) {
      socket.write('\r\n0\r\n\r\n', 'latin1')
    }

    if (contentLength !== null && sent !== contentLength) {
      if (client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    const parser = socket[kParser]
    if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
      // istanbul ignore else: only for jest
      if (parser.timeout.refresh) {
        parser.timeout.refresh()
      }
    }

    resume(client)
  }

  /**
   * Clears the writing flag and, when `err` is given, destroys the socket
   * with it.
   *
   * @param {Error} [err]
   */
  destroy (err) {
    const { socket, client } = this

    socket[kWriting] = false

    if (!err) {
      return
    }

    assert(client[kRunning] <= 1, 'pipeline should only contain this request')
    util.destroy(socket, err)
  }
}
| 2273 |
|
---|
/**
 * Fails `request` with `err`.
 *
 * The error is handed to `request.onError`, after which the request is
 * asserted to be aborted. If the handler throws, or the request is not
 * aborted afterwards, that failure is emitted as an 'error' event on `client`.
 *
 * @param {object} client - Emitter that receives failures raised while erroring.
 * @param {object} request - Request to fail.
 * @param {Error} err - Reason the request failed.
 */
function errorRequest (client, request, err) {
  try {
    request.onError(err)
    assert(request.aborted)
  } catch (handlerFailure) {
    // Report the handler's own failure, not the original `err`.
    client.emit('error', handlerFailure)
  }
}
| 2282 |
|
---|
| 2283 | module.exports = Client
|
---|