source: node_modules/undici/lib/websocket/receiver.js

main
Last change on this file was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 10.5 KB
Line 
1'use strict'
2
3const { Writable } = require('stream')
4const diagnosticsChannel = require('diagnostics_channel')
5const { parserStates, opcodes, states, emptyBuffer } = require('./constants')
6const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
7const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
8const { WebsocketFrameSend } = require('./frame')
9
10// This code was influenced by ws released under the MIT license.
11// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
12// Copyright (c) 2013 Arnout Kazemier and contributors
13// Copyright (c) 2016 Luigi Pinca and contributors
14
15const channels = {}
16channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
17channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
18
19class ByteParser extends Writable {
20 #buffers = []
21 #byteOffset = 0
22
23 #state = parserStates.INFO
24
25 #info = {}
26 #fragments = []
27
28 constructor (ws) {
29 super()
30
31 this.ws = ws
32 }
33
34 /**
35 * @param {Buffer} chunk
36 * @param {() => void} callback
37 */
38 _write (chunk, _, callback) {
39 this.#buffers.push(chunk)
40 this.#byteOffset += chunk.length
41
42 this.run(callback)
43 }
44
45 /**
46 * Runs whenever a new chunk is received.
47 * Callback is called whenever there are no more chunks buffering,
48 * or not enough bytes are buffered to parse.
49 */
50 run (callback) {
51 while (true) {
52 if (this.#state === parserStates.INFO) {
53 // If there aren't enough bytes to parse the payload length, etc.
54 if (this.#byteOffset < 2) {
55 return callback()
56 }
57
58 const buffer = this.consume(2)
59
60 this.#info.fin = (buffer[0] & 0x80) !== 0
61 this.#info.opcode = buffer[0] & 0x0F
62
63 // If we receive a fragmented message, we use the type of the first
64 // frame to parse the full message as binary/text, when it's terminated
65 this.#info.originalOpcode ??= this.#info.opcode
66
67 this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
68
69 if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
70 // Only text and binary frames can be fragmented
71 failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
72 return
73 }
74
75 const payloadLength = buffer[1] & 0x7F
76
77 if (payloadLength <= 125) {
78 this.#info.payloadLength = payloadLength
79 this.#state = parserStates.READ_DATA
80 } else if (payloadLength === 126) {
81 this.#state = parserStates.PAYLOADLENGTH_16
82 } else if (payloadLength === 127) {
83 this.#state = parserStates.PAYLOADLENGTH_64
84 }
85
86 if (this.#info.fragmented && payloadLength > 125) {
87 // A fragmented frame can't be fragmented itself
88 failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
89 return
90 } else if (
91 (this.#info.opcode === opcodes.PING ||
92 this.#info.opcode === opcodes.PONG ||
93 this.#info.opcode === opcodes.CLOSE) &&
94 payloadLength > 125
95 ) {
96 // Control frames can have a payload length of 125 bytes MAX
97 failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
98 return
99 } else if (this.#info.opcode === opcodes.CLOSE) {
100 if (payloadLength === 1) {
101 failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
102 return
103 }
104
105 const body = this.consume(payloadLength)
106
107 this.#info.closeInfo = this.parseCloseBody(false, body)
108
109 if (!this.ws[kSentClose]) {
110 // If an endpoint receives a Close frame and did not previously send a
111 // Close frame, the endpoint MUST send a Close frame in response. (When
112 // sending a Close frame in response, the endpoint typically echos the
113 // status code it received.)
114 const body = Buffer.allocUnsafe(2)
115 body.writeUInt16BE(this.#info.closeInfo.code, 0)
116 const closeFrame = new WebsocketFrameSend(body)
117
118 this.ws[kResponse].socket.write(
119 closeFrame.createFrame(opcodes.CLOSE),
120 (err) => {
121 if (!err) {
122 this.ws[kSentClose] = true
123 }
124 }
125 )
126 }
127
128 // Upon either sending or receiving a Close control frame, it is said
129 // that _The WebSocket Closing Handshake is Started_ and that the
130 // WebSocket connection is in the CLOSING state.
131 this.ws[kReadyState] = states.CLOSING
132 this.ws[kReceivedClose] = true
133
134 this.end()
135
136 return
137 } else if (this.#info.opcode === opcodes.PING) {
138 // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
139 // response, unless it already received a Close frame.
140 // A Pong frame sent in response to a Ping frame must have identical
141 // "Application data"
142
143 const body = this.consume(payloadLength)
144
145 if (!this.ws[kReceivedClose]) {
146 const frame = new WebsocketFrameSend(body)
147
148 this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
149
150 if (channels.ping.hasSubscribers) {
151 channels.ping.publish({
152 payload: body
153 })
154 }
155 }
156
157 this.#state = parserStates.INFO
158
159 if (this.#byteOffset > 0) {
160 continue
161 } else {
162 callback()
163 return
164 }
165 } else if (this.#info.opcode === opcodes.PONG) {
166 // A Pong frame MAY be sent unsolicited. This serves as a
167 // unidirectional heartbeat. A response to an unsolicited Pong frame is
168 // not expected.
169
170 const body = this.consume(payloadLength)
171
172 if (channels.pong.hasSubscribers) {
173 channels.pong.publish({
174 payload: body
175 })
176 }
177
178 if (this.#byteOffset > 0) {
179 continue
180 } else {
181 callback()
182 return
183 }
184 }
185 } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
186 if (this.#byteOffset < 2) {
187 return callback()
188 }
189
190 const buffer = this.consume(2)
191
192 this.#info.payloadLength = buffer.readUInt16BE(0)
193 this.#state = parserStates.READ_DATA
194 } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
195 if (this.#byteOffset < 8) {
196 return callback()
197 }
198
199 const buffer = this.consume(8)
200 const upper = buffer.readUInt32BE(0)
201
202 // 2^31 is the maxinimum bytes an arraybuffer can contain
203 // on 32-bit systems. Although, on 64-bit systems, this is
204 // 2^53-1 bytes.
205 // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
206 // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
207 // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
208 if (upper > 2 ** 31 - 1) {
209 failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.')
210 return
211 }
212
213 const lower = buffer.readUInt32BE(4)
214
215 this.#info.payloadLength = (upper << 8) + lower
216 this.#state = parserStates.READ_DATA
217 } else if (this.#state === parserStates.READ_DATA) {
218 if (this.#byteOffset < this.#info.payloadLength) {
219 // If there is still more data in this chunk that needs to be read
220 return callback()
221 } else if (this.#byteOffset >= this.#info.payloadLength) {
222 // If the server sent multiple frames in a single chunk
223
224 const body = this.consume(this.#info.payloadLength)
225
226 this.#fragments.push(body)
227
228 // If the frame is unfragmented, or a fragmented frame was terminated,
229 // a message was received
230 if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
231 const fullMessage = Buffer.concat(this.#fragments)
232
233 websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)
234
235 this.#info = {}
236 this.#fragments.length = 0
237 }
238
239 this.#state = parserStates.INFO
240 }
241 }
242
243 if (this.#byteOffset > 0) {
244 continue
245 } else {
246 callback()
247 break
248 }
249 }
250 }
251
252 /**
253 * Take n bytes from the buffered Buffers
254 * @param {number} n
255 * @returns {Buffer|null}
256 */
257 consume (n) {
258 if (n > this.#byteOffset) {
259 return null
260 } else if (n === 0) {
261 return emptyBuffer
262 }
263
264 if (this.#buffers[0].length === n) {
265 this.#byteOffset -= this.#buffers[0].length
266 return this.#buffers.shift()
267 }
268
269 const buffer = Buffer.allocUnsafe(n)
270 let offset = 0
271
272 while (offset !== n) {
273 const next = this.#buffers[0]
274 const { length } = next
275
276 if (length + offset === n) {
277 buffer.set(this.#buffers.shift(), offset)
278 break
279 } else if (length + offset > n) {
280 buffer.set(next.subarray(0, n - offset), offset)
281 this.#buffers[0] = next.subarray(n - offset)
282 break
283 } else {
284 buffer.set(this.#buffers.shift(), offset)
285 offset += next.length
286 }
287 }
288
289 this.#byteOffset -= n
290
291 return buffer
292 }
293
294 parseCloseBody (onlyCode, data) {
295 // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
296 /** @type {number|undefined} */
297 let code
298
299 if (data.length >= 2) {
300 // _The WebSocket Connection Close Code_ is
301 // defined as the status code (Section 7.4) contained in the first Close
302 // control frame received by the application
303 code = data.readUInt16BE(0)
304 }
305
306 if (onlyCode) {
307 if (!isValidStatusCode(code)) {
308 return null
309 }
310
311 return { code }
312 }
313
314 // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
315 /** @type {Buffer} */
316 let reason = data.subarray(2)
317
318 // Remove BOM
319 if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
320 reason = reason.subarray(3)
321 }
322
323 if (code !== undefined && !isValidStatusCode(code)) {
324 return null
325 }
326
327 try {
328 // TODO: optimize this
329 reason = new TextDecoder('utf-8', { fatal: true }).decode(reason)
330 } catch {
331 return null
332 }
333
334 return { code, reason }
335 }
336
337 get closingInfo () {
338 return this.#info.closeInfo
339 }
340}
341
342module.exports = {
343 ByteParser
344}
Note: See TracBrowser for help on using the repository browser.