source: trip-planner-front/node_modules/engine.io/lib/socket.js@ ceaed42

Last change on this file since ceaed42 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 13.1 KB
Line 
1const EventEmitter = require("events");
2const debug = require("debug")("engine:socket");
3
4class Socket extends EventEmitter {
5 /**
6 * Client class (abstract).
7 *
8 * @api private
9 */
10 constructor(id, server, transport, req, protocol) {
11 super();
12 this.id = id;
13 this.server = server;
14 this.upgrading = false;
15 this.upgraded = false;
16 this.readyState = "opening";
17 this.writeBuffer = [];
18 this.packetsFn = [];
19 this.sentCallbackFn = [];
20 this.cleanupFn = [];
21 this.request = req;
22 this.protocol = protocol;
23
24 // Cache IP since it might not be in the req later
25 if (req.websocket && req.websocket._socket) {
26 this.remoteAddress = req.websocket._socket.remoteAddress;
27 } else {
28 this.remoteAddress = req.connection.remoteAddress;
29 }
30
31 this.checkIntervalTimer = null;
32 this.upgradeTimeoutTimer = null;
33 this.pingTimeoutTimer = null;
34 this.pingIntervalTimer = null;
35
36 this.setTransport(transport);
37 this.onOpen();
38 }
39
40 /**
41 * Called upon transport considered open.
42 *
43 * @api private
44 */
45 onOpen() {
46 this.readyState = "open";
47
48 // sends an `open` packet
49 this.transport.sid = this.id;
50 this.sendPacket(
51 "open",
52 JSON.stringify({
53 sid: this.id,
54 upgrades: this.getAvailableUpgrades(),
55 pingInterval: this.server.opts.pingInterval,
56 pingTimeout: this.server.opts.pingTimeout
57 })
58 );
59
60 if (this.server.opts.initialPacket) {
61 this.sendPacket("message", this.server.opts.initialPacket);
62 }
63
64 this.emit("open");
65
66 if (this.protocol === 3) {
67 // in protocol v3, the client sends a ping, and the server answers with a pong
68 this.resetPingTimeout(
69 this.server.opts.pingInterval + this.server.opts.pingTimeout
70 );
71 } else {
72 // in protocol v4, the server sends a ping, and the client answers with a pong
73 this.schedulePing();
74 }
75 }
76
77 /**
78 * Called upon transport packet.
79 *
80 * @param {Object} packet
81 * @api private
82 */
83 onPacket(packet) {
84 if ("open" === this.readyState) {
85 // export packet event
86 debug("packet");
87 this.emit("packet", packet);
88
89 // Reset ping timeout on any packet, incoming data is a good sign of
90 // other side's liveness
91 this.resetPingTimeout(
92 this.server.opts.pingInterval + this.server.opts.pingTimeout
93 );
94
95 switch (packet.type) {
96 case "ping":
97 if (this.transport.protocol !== 3) {
98 this.onError("invalid heartbeat direction");
99 return;
100 }
101 debug("got ping");
102 this.sendPacket("pong");
103 this.emit("heartbeat");
104 break;
105
106 case "pong":
107 if (this.transport.protocol === 3) {
108 this.onError("invalid heartbeat direction");
109 return;
110 }
111 debug("got pong");
112 this.schedulePing();
113 this.emit("heartbeat");
114 break;
115
116 case "error":
117 this.onClose("parse error");
118 break;
119
120 case "message":
121 this.emit("data", packet.data);
122 this.emit("message", packet.data);
123 break;
124 }
125 } else {
126 debug("packet received with closed socket");
127 }
128 }
129
130 /**
131 * Called upon transport error.
132 *
133 * @param {Error} error object
134 * @api private
135 */
136 onError(err) {
137 debug("transport error");
138 this.onClose("transport error", err);
139 }
140
141 /**
142 * Pings client every `this.pingInterval` and expects response
143 * within `this.pingTimeout` or closes connection.
144 *
145 * @api private
146 */
147 schedulePing() {
148 clearTimeout(this.pingIntervalTimer);
149 this.pingIntervalTimer = setTimeout(() => {
150 debug(
151 "writing ping packet - expecting pong within %sms",
152 this.server.opts.pingTimeout
153 );
154 this.sendPacket("ping");
155 this.resetPingTimeout(this.server.opts.pingTimeout);
156 }, this.server.opts.pingInterval);
157 }
158
159 /**
160 * Resets ping timeout.
161 *
162 * @api private
163 */
164 resetPingTimeout(timeout) {
165 clearTimeout(this.pingTimeoutTimer);
166 this.pingTimeoutTimer = setTimeout(() => {
167 if (this.readyState === "closed") return;
168 this.onClose("ping timeout");
169 }, timeout);
170 }
171
172 /**
173 * Attaches handlers for the given transport.
174 *
175 * @param {Transport} transport
176 * @api private
177 */
178 setTransport(transport) {
179 const onError = this.onError.bind(this);
180 const onPacket = this.onPacket.bind(this);
181 const flush = this.flush.bind(this);
182 const onClose = this.onClose.bind(this, "transport close");
183
184 this.transport = transport;
185 this.transport.once("error", onError);
186 this.transport.on("packet", onPacket);
187 this.transport.on("drain", flush);
188 this.transport.once("close", onClose);
189 // this function will manage packet events (also message callbacks)
190 this.setupSendCallback();
191
192 this.cleanupFn.push(function() {
193 transport.removeListener("error", onError);
194 transport.removeListener("packet", onPacket);
195 transport.removeListener("drain", flush);
196 transport.removeListener("close", onClose);
197 });
198 }
199
200 /**
201 * Upgrades socket to the given transport
202 *
203 * @param {Transport} transport
204 * @api private
205 */
206 maybeUpgrade(transport) {
207 debug(
208 'might upgrade socket transport from "%s" to "%s"',
209 this.transport.name,
210 transport.name
211 );
212
213 this.upgrading = true;
214
215 const self = this;
216
217 // set transport upgrade timer
218 self.upgradeTimeoutTimer = setTimeout(function() {
219 debug("client did not complete upgrade - closing transport");
220 cleanup();
221 if ("open" === transport.readyState) {
222 transport.close();
223 }
224 }, this.server.opts.upgradeTimeout);
225
226 function onPacket(packet) {
227 if ("ping" === packet.type && "probe" === packet.data) {
228 transport.send([{ type: "pong", data: "probe" }]);
229 self.emit("upgrading", transport);
230 clearInterval(self.checkIntervalTimer);
231 self.checkIntervalTimer = setInterval(check, 100);
232 } else if ("upgrade" === packet.type && self.readyState !== "closed") {
233 debug("got upgrade packet - upgrading");
234 cleanup();
235 self.transport.discard();
236 self.upgraded = true;
237 self.clearTransport();
238 self.setTransport(transport);
239 self.emit("upgrade", transport);
240 self.flush();
241 if (self.readyState === "closing") {
242 transport.close(function() {
243 self.onClose("forced close");
244 });
245 }
246 } else {
247 cleanup();
248 transport.close();
249 }
250 }
251
252 // we force a polling cycle to ensure a fast upgrade
253 function check() {
254 if ("polling" === self.transport.name && self.transport.writable) {
255 debug("writing a noop packet to polling for fast upgrade");
256 self.transport.send([{ type: "noop" }]);
257 }
258 }
259
260 function cleanup() {
261 self.upgrading = false;
262
263 clearInterval(self.checkIntervalTimer);
264 self.checkIntervalTimer = null;
265
266 clearTimeout(self.upgradeTimeoutTimer);
267 self.upgradeTimeoutTimer = null;
268
269 transport.removeListener("packet", onPacket);
270 transport.removeListener("close", onTransportClose);
271 transport.removeListener("error", onError);
272 self.removeListener("close", onClose);
273 }
274
275 function onError(err) {
276 debug("client did not complete upgrade - %s", err);
277 cleanup();
278 transport.close();
279 transport = null;
280 }
281
282 function onTransportClose() {
283 onError("transport closed");
284 }
285
286 function onClose() {
287 onError("socket closed");
288 }
289
290 transport.on("packet", onPacket);
291 transport.once("close", onTransportClose);
292 transport.once("error", onError);
293
294 self.once("close", onClose);
295 }
296
297 /**
298 * Clears listeners and timers associated with current transport.
299 *
300 * @api private
301 */
302 clearTransport() {
303 let cleanup;
304
305 const toCleanUp = this.cleanupFn.length;
306
307 for (let i = 0; i < toCleanUp; i++) {
308 cleanup = this.cleanupFn.shift();
309 cleanup();
310 }
311
312 // silence further transport errors and prevent uncaught exceptions
313 this.transport.on("error", function() {
314 debug("error triggered by discarded transport");
315 });
316
317 // ensure transport won't stay open
318 this.transport.close();
319
320 clearTimeout(this.pingTimeoutTimer);
321 }
322
323 /**
324 * Called upon transport considered closed.
325 * Possible reasons: `ping timeout`, `client error`, `parse error`,
326 * `transport error`, `server close`, `transport close`
327 */
328 onClose(reason, description) {
329 if ("closed" !== this.readyState) {
330 this.readyState = "closed";
331
332 // clear timers
333 clearTimeout(this.pingIntervalTimer);
334 clearTimeout(this.pingTimeoutTimer);
335
336 clearInterval(this.checkIntervalTimer);
337 this.checkIntervalTimer = null;
338 clearTimeout(this.upgradeTimeoutTimer);
339 const self = this;
340 // clean writeBuffer in next tick, so developers can still
341 // grab the writeBuffer on 'close' event
342 process.nextTick(function() {
343 self.writeBuffer = [];
344 });
345 this.packetsFn = [];
346 this.sentCallbackFn = [];
347 this.clearTransport();
348 this.emit("close", reason, description);
349 }
350 }
351
352 /**
353 * Setup and manage send callback
354 *
355 * @api private
356 */
357 setupSendCallback() {
358 const self = this;
359 this.transport.on("drain", onDrain);
360
361 this.cleanupFn.push(function() {
362 self.transport.removeListener("drain", onDrain);
363 });
364
365 // the message was sent successfully, execute the callback
366 function onDrain() {
367 if (self.sentCallbackFn.length > 0) {
368 const seqFn = self.sentCallbackFn.splice(0, 1)[0];
369 if ("function" === typeof seqFn) {
370 debug("executing send callback");
371 seqFn(self.transport);
372 } else if (Array.isArray(seqFn)) {
373 debug("executing batch send callback");
374 const l = seqFn.length;
375 let i = 0;
376 for (; i < l; i++) {
377 if ("function" === typeof seqFn[i]) {
378 seqFn[i](self.transport);
379 }
380 }
381 }
382 }
383 }
384 }
385
386 /**
387 * Sends a message packet.
388 *
389 * @param {String} message
390 * @param {Object} options
391 * @param {Function} callback
392 * @return {Socket} for chaining
393 * @api public
394 */
395 send(data, options, callback) {
396 this.sendPacket("message", data, options, callback);
397 return this;
398 }
399
400 write(data, options, callback) {
401 this.sendPacket("message", data, options, callback);
402 return this;
403 }
404
405 /**
406 * Sends a packet.
407 *
408 * @param {String} packet type
409 * @param {String} optional, data
410 * @param {Object} options
411 * @api private
412 */
413 sendPacket(type, data, options, callback) {
414 if ("function" === typeof options) {
415 callback = options;
416 options = null;
417 }
418
419 options = options || {};
420 options.compress = false !== options.compress;
421
422 if ("closing" !== this.readyState && "closed" !== this.readyState) {
423 debug('sending packet "%s" (%s)', type, data);
424
425 const packet = {
426 type: type,
427 options: options
428 };
429 if (data) packet.data = data;
430
431 // exports packetCreate event
432 this.emit("packetCreate", packet);
433
434 this.writeBuffer.push(packet);
435
436 // add send callback to object, if defined
437 if (callback) this.packetsFn.push(callback);
438
439 this.flush();
440 }
441 }
442
443 /**
444 * Attempts to flush the packets buffer.
445 *
446 * @api private
447 */
448 flush() {
449 if (
450 "closed" !== this.readyState &&
451 this.transport.writable &&
452 this.writeBuffer.length
453 ) {
454 debug("flushing buffer to transport");
455 this.emit("flush", this.writeBuffer);
456 this.server.emit("flush", this, this.writeBuffer);
457 const wbuf = this.writeBuffer;
458 this.writeBuffer = [];
459 if (!this.transport.supportsFraming) {
460 this.sentCallbackFn.push(this.packetsFn);
461 } else {
462 this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
463 }
464 this.packetsFn = [];
465 this.transport.send(wbuf);
466 this.emit("drain");
467 this.server.emit("drain", this);
468 }
469 }
470
471 /**
472 * Get available upgrades for this socket.
473 *
474 * @api private
475 */
476 getAvailableUpgrades() {
477 const availableUpgrades = [];
478 const allUpgrades = this.server.upgrades(this.transport.name);
479 let i = 0;
480 const l = allUpgrades.length;
481 for (; i < l; ++i) {
482 const upg = allUpgrades[i];
483 if (this.server.opts.transports.indexOf(upg) !== -1) {
484 availableUpgrades.push(upg);
485 }
486 }
487 return availableUpgrades;
488 }
489
490 /**
491 * Closes the socket and underlying transport.
492 *
493 * @param {Boolean} optional, discard
494 * @return {Socket} for chaining
495 * @api public
496 */
497 close(discard) {
498 if ("open" !== this.readyState) return;
499
500 this.readyState = "closing";
501
502 if (this.writeBuffer.length) {
503 this.once("drain", this.closeTransport.bind(this, discard));
504 return;
505 }
506
507 this.closeTransport(discard);
508 }
509
510 /**
511 * Closes the underlying transport.
512 *
513 * @param {Boolean} discard
514 * @api private
515 */
516 closeTransport(discard) {
517 if (discard) this.transport.discard();
518 this.transport.close(this.onClose.bind(this, "forced close"));
519 }
520}
521
522module.exports = Socket;
Note: See TracBrowser for help on using the repository browser.