source: trip-planner-front/node_modules/engine.io/build/socket.js@ 1ad8e64

Last change on this file since 1ad8e64 was e29cc2e, checked in by Ema <ema_spirova@…>, 3 years ago

primeNG components

  • Property mode set to 100644
File size: 15.2 KB
RevLine 
[e29cc2e]1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.Socket = void 0;
4const events_1 = require("events");
5const debug_1 = require("debug");
6const debug = (0, debug_1.default)("engine:socket");
/**
 * Server-side representation of one Engine.IO client connection.
 *
 * A Socket owns exactly one active transport at a time (`this.transport`),
 * buffers outgoing packets in `writeBuffer` until the transport is writable,
 * runs the heartbeat timers, and can switch to another transport through
 * `maybeUpgrade()`.
 *
 * Events emitted by this class (see the individual methods): "open",
 * "packet", "heartbeat", "data", "message", "packetCreate", "flush",
 * "drain", "upgrading", "upgrade" and "close".
 */
class Socket extends events_1.EventEmitter {
    /**
     * Client class (abstract).
     *
     * @param {String} id - session id, sent to the client in the `open` packet
     * @param {Object} server - owning server; `server.opts`, `server.upgrades()`
     *   and `server.emit()` are used by this class
     * @param {Transport} transport - initial transport
     * @param {Object} req - request that created the session
     * @param {Number} protocol - protocol revision; 3 selects the
     *   client-initiated heartbeat, anything else the server-initiated one
     * @api private
     */
    constructor(id, server, transport, req, protocol) {
        super();
        this.id = id;
        this.server = server;
        this.upgrading = false;
        this.upgraded = false;
        this.readyState = "opening";
        // packets waiting for the transport to become writable
        this.writeBuffer = [];
        // send callbacks belonging to the packets currently in writeBuffer
        this.packetsFn = [];
        // send callbacks of packets already handed to the transport,
        // consumed one entry per transport "drain" event
        this.sentCallbackFn = [];
        // functions detaching this socket's listeners from the current transport
        this.cleanupFn = [];
        this.request = req;
        this.protocol = protocol;
        // Cache IP since it might not be in the req later
        if (req.websocket && req.websocket._socket) {
            this.remoteAddress = req.websocket._socket.remoteAddress;
        }
        else {
            this.remoteAddress = req.connection.remoteAddress;
        }
        this.checkIntervalTimer = null;
        this.upgradeTimeoutTimer = null;
        this.pingTimeoutTimer = null;
        this.pingIntervalTimer = null;
        this.setTransport(transport);
        this.onOpen();
    }
    /**
     * Current state: "opening", "open", "closing" or "closed".
     */
    get readyState() {
        return this._readyState;
    }
    set readyState(state) {
        debug("readyState updated from %s to %s", this._readyState, state);
        this._readyState = state;
    }
    /**
     * Called upon transport considered open.
     *
     * Sends the `open` handshake packet (plus the optional
     * `server.opts.initialPacket`), emits "open" and starts the heartbeat
     * that matches `this.protocol`.
     *
     * @api private
     */
    onOpen() {
        this.readyState = "open";
        // sends an `open` packet
        this.transport.sid = this.id;
        this.sendPacket("open", JSON.stringify({
            sid: this.id,
            upgrades: this.getAvailableUpgrades(),
            pingInterval: this.server.opts.pingInterval,
            pingTimeout: this.server.opts.pingTimeout
        }));
        if (this.server.opts.initialPacket) {
            this.sendPacket("message", this.server.opts.initialPacket);
        }
        this.emit("open");
        if (this.protocol === 3) {
            // in protocol v3, the client sends a ping, and the server answers with a pong
            this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout);
        }
        else {
            // in protocol v4, the server sends a ping, and the client answers with a pong
            this.schedulePing();
        }
    }
    /**
     * Called upon transport packet.
     *
     * Packets are ignored unless the socket is "open". Every accepted packet
     * is re-emitted as "packet" and restarts the ping timeout.
     *
     * @param {Object} packet
     * @api private
     */
    onPacket(packet) {
        if ("open" !== this.readyState) {
            return debug("packet received with closed socket");
        }
        // export packet event
        debug(`received packet ${packet.type}`);
        this.emit("packet", packet);
        // Reset ping timeout on any packet, incoming data is a good sign of
        // other side's liveness
        this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout);
        switch (packet.type) {
            case "ping":
                if (this.transport.protocol !== 3) {
                    this.onError("invalid heartbeat direction");
                    return;
                }
                debug("got ping");
                this.sendPacket("pong");
                this.emit("heartbeat");
                break;
            case "pong":
                if (this.transport.protocol === 3) {
                    this.onError("invalid heartbeat direction");
                    return;
                }
                debug("got pong");
                // pingIntervalTimer only exists once schedulePing() has run,
                // which onOpen() skips when this.protocol === 3. The check
                // above looks at the transport's protocol instead, so guard
                // against the two disagreeing rather than dereferencing null.
                if (this.pingIntervalTimer) {
                    this.pingIntervalTimer.refresh();
                }
                this.emit("heartbeat");
                break;
            case "error":
                this.onClose("parse error");
                break;
            case "message":
                this.emit("data", packet.data);
                this.emit("message", packet.data);
                break;
        }
    }
    /**
     * Called upon transport error. Closes the socket with reason
     * "transport error" and forwards `err` as the close description.
     *
     * @param {Error} error object
     * @api private
     */
    onError(err) {
        debug("transport error");
        this.onClose("transport error", err);
    }
    /**
     * Pings client every `this.pingInterval` and expects response
     * within `this.pingTimeout` or closes connection.
     *
     * The timer is a single-shot timeout; it is re-armed from `onPacket()`
     * when the matching pong arrives.
     *
     * @api private
     */
    schedulePing() {
        this.pingIntervalTimer = setTimeout(() => {
            debug("writing ping packet - expecting pong within %sms", this.server.opts.pingTimeout);
            this.sendPacket("ping");
            this.resetPingTimeout(this.server.opts.pingTimeout);
        }, this.server.opts.pingInterval);
    }
    /**
     * Resets ping timeout.
     *
     * @param {Number} timeout - delay in ms after which the socket is closed
     *   with reason "ping timeout" unless the timer is reset again
     * @api private
     */
    resetPingTimeout(timeout) {
        clearTimeout(this.pingTimeoutTimer);
        this.pingTimeoutTimer = setTimeout(() => {
            if (this.readyState === "closed")
                return;
            this.onClose("ping timeout");
        }, timeout);
    }
    /**
     * Attaches handlers for the given transport.
     *
     * Makes `transport` the active one and registers a matching cleanup
     * function in `cleanupFn` so `clearTransport()` can detach the handlers.
     *
     * @param {Transport} transport
     * @api private
     */
    setTransport(transport) {
        const onError = this.onError.bind(this);
        const onPacket = this.onPacket.bind(this);
        const flush = this.flush.bind(this);
        const onClose = this.onClose.bind(this, "transport close");
        this.transport = transport;
        this.transport.once("error", onError);
        this.transport.on("packet", onPacket);
        this.transport.on("drain", flush);
        this.transport.once("close", onClose);
        // this function will manage packet events (also message callbacks)
        this.setupSendCallback();
        this.cleanupFn.push(function () {
            transport.removeListener("error", onError);
            transport.removeListener("packet", onPacket);
            transport.removeListener("drain", flush);
            transport.removeListener("close", onClose);
        });
    }
    /**
     * Upgrades socket to the given transport
     *
     * The candidate transport is probed first: a `ping`/"probe" packet is
     * answered with `pong`/"probe", and only an `upgrade` packet makes the
     * candidate the active transport. Any other packet, an error, a close of
     * either side, or `server.opts.upgradeTimeout` elapsing aborts the
     * upgrade and closes the candidate.
     *
     * @param {Transport} transport
     * @api private
     */
    maybeUpgrade(transport) {
        debug('might upgrade socket transport from "%s" to "%s"', this.transport.name, transport.name);
        this.upgrading = true;
        // we force a polling cycle to ensure a fast upgrade
        const check = () => {
            if ("polling" === this.transport.name && this.transport.writable) {
                debug("writing a noop packet to polling for fast upgrade");
                this.transport.send([{ type: "noop" }]);
            }
        };
        // undoes everything this method set up: timers and listeners
        const cleanup = () => {
            this.upgrading = false;
            clearInterval(this.checkIntervalTimer);
            this.checkIntervalTimer = null;
            clearTimeout(this.upgradeTimeoutTimer);
            this.upgradeTimeoutTimer = null;
            transport.removeListener("packet", onPacket);
            transport.removeListener("close", onTransportClose);
            transport.removeListener("error", onError);
            this.removeListener("close", onClose);
        };
        const onError = err => {
            debug("client did not complete upgrade - %s", err);
            cleanup();
            transport.close();
            // drop the reference; all listeners that used it are gone by now
            transport = null;
        };
        const onTransportClose = () => {
            onError("transport closed");
        };
        const onClose = () => {
            onError("socket closed");
        };
        const onPacket = packet => {
            if ("ping" === packet.type && "probe" === packet.data) {
                debug("got probe ping packet, sending pong");
                transport.send([{ type: "pong", data: "probe" }]);
                this.emit("upgrading", transport);
                clearInterval(this.checkIntervalTimer);
                this.checkIntervalTimer = setInterval(check, 100);
            }
            else if ("upgrade" === packet.type && this.readyState !== "closed") {
                debug("got upgrade packet - upgrading");
                cleanup();
                this.transport.discard();
                this.upgraded = true;
                this.clearTransport();
                this.setTransport(transport);
                this.emit("upgrade", transport);
                this.flush();
                // close() was requested while the upgrade was in flight
                if (this.readyState === "closing") {
                    transport.close(() => {
                        this.onClose("forced close");
                    });
                }
            }
            else {
                cleanup();
                transport.close();
            }
        };
        // set transport upgrade timer
        this.upgradeTimeoutTimer = setTimeout(() => {
            debug("client did not complete upgrade - closing transport");
            cleanup();
            if ("open" === transport.readyState) {
                transport.close();
            }
        }, this.server.opts.upgradeTimeout);
        transport.on("packet", onPacket);
        transport.once("close", onTransportClose);
        transport.once("error", onError);
        this.once("close", onClose);
    }
    /**
     * Clears listeners and timers associated with current transport.
     *
     * Runs and empties `cleanupFn`, installs a no-op "error" listener on the
     * transport, closes it and cancels the ping timeout.
     *
     * @api private
     */
    clearTransport() {
        // take the registered cleanups out of the queue before running them
        const pending = this.cleanupFn.splice(0, this.cleanupFn.length);
        for (const cleanup of pending) {
            cleanup();
        }
        // silence further transport errors and prevent uncaught exceptions
        this.transport.on("error", function () {
            debug("error triggered by discarded transport");
        });
        // ensure transport won't stay open
        this.transport.close();
        clearTimeout(this.pingTimeoutTimer);
    }
    /**
     * Called upon transport considered closed.
     * Possible reasons: `ping timeout`, `client error`, `parse error`,
     * `transport error`, `server close`, `transport close`
     *
     * Does nothing when the socket is already "closed"; otherwise stops all
     * timers, detaches and closes the transport and emits "close".
     *
     * @param {String} reason
     * @param {*} description - optional detail forwarded to "close" listeners
     */
    onClose(reason, description) {
        if ("closed" === this.readyState) {
            return;
        }
        this.readyState = "closed";
        // clear timers
        clearTimeout(this.pingIntervalTimer);
        clearTimeout(this.pingTimeoutTimer);
        clearInterval(this.checkIntervalTimer);
        this.checkIntervalTimer = null;
        clearTimeout(this.upgradeTimeoutTimer);
        // clean writeBuffer in next tick, so developers can still
        // grab the writeBuffer on 'close' event
        process.nextTick(() => {
            this.writeBuffer = [];
        });
        this.packetsFn = [];
        this.sentCallbackFn = [];
        this.clearTransport();
        this.emit("close", reason, description);
    }
    /**
     * Setup and manage send callback
     *
     * On every "drain" of the current transport the oldest entry of
     * `sentCallbackFn` is removed and invoked with the transport. An entry is
     * either a single function or an array of functions (see `flush()`).
     *
     * @api private
     */
    setupSendCallback() {
        // remember which transport the listener is attached to, so the cleanup
        // detaches from that same transport even if this.transport has changed
        const transport = this.transport;
        // the message was sent successfully, execute the callback
        const onDrain = () => {
            if (this.sentCallbackFn.length === 0) {
                return;
            }
            const seqFn = this.sentCallbackFn.shift();
            if ("function" === typeof seqFn) {
                debug("executing send callback");
                seqFn(this.transport);
            }
            else if (Array.isArray(seqFn)) {
                debug("executing batch send callback");
                // snapshot the length: only callbacks present now are run
                const l = seqFn.length;
                for (let i = 0; i < l; i++) {
                    if ("function" === typeof seqFn[i]) {
                        seqFn[i](this.transport);
                    }
                }
            }
        };
        transport.on("drain", onDrain);
        this.cleanupFn.push(() => {
            transport.removeListener("drain", onDrain);
        });
    }
    /**
     * Sends a message packet.
     *
     * @param {String} message
     * @param {Object} options
     * @param {Function} callback
     * @return {Socket} for chaining
     * @api public
     */
    send(data, options, callback) {
        this.sendPacket("message", data, options, callback);
        return this;
    }
    /**
     * Alias of `send()`.
     *
     * @param {String} message
     * @param {Object} options
     * @param {Function} callback
     * @return {Socket} for chaining
     * @api public
     */
    write(data, options, callback) {
        this.sendPacket("message", data, options, callback);
        return this;
    }
    /**
     * Sends a packet.
     *
     * The packet is dropped silently when the socket is "closing" or
     * "closed". `options.compress` defaults to `true`; the caller's options
     * object is copied, never modified. Falsy `data` is not attached to the
     * packet.
     *
     * @param {String} packet type
     * @param {String} optional, data
     * @param {Object} options - may be omitted, in which case the third
     *   argument is taken as the callback
     * @param {Function} callback - invoked after the transport drained
     * @api private
     */
    sendPacket(type, data, options, callback) {
        if ("function" === typeof options) {
            callback = options;
            options = null;
        }
        const given = options || {};
        // normalise on a copy so a shared or frozen options object is left alone
        const packetOptions = Object.assign({}, given, {
            compress: false !== given.compress
        });
        if ("closing" === this.readyState || "closed" === this.readyState) {
            return;
        }
        debug('sending packet "%s" (%s)', type, data);
        const packet = {
            type: type,
            options: packetOptions
        };
        if (data)
            packet.data = data;
        // exports packetCreate event
        this.emit("packetCreate", packet);
        this.writeBuffer.push(packet);
        // add send callback to object, if defined
        if (callback)
            this.packetsFn.push(callback);
        this.flush();
    }
    /**
     * Attempts to flush the packets buffer.
     *
     * Nothing happens unless the socket is not "closed", the transport is
     * writable and there is something buffered. The pending callbacks move
     * to `sentCallbackFn`: as one array entry when the transport does not
     * support framing (the whole batch goes out as one payload), otherwise
     * as one entry per callback.
     *
     * @api private
     */
    flush() {
        if ("closed" === this.readyState ||
            !this.transport.writable ||
            !this.writeBuffer.length) {
            return;
        }
        debug("flushing buffer to transport");
        this.emit("flush", this.writeBuffer);
        this.server.emit("flush", this, this.writeBuffer);
        const wbuf = this.writeBuffer;
        this.writeBuffer = [];
        if (!this.transport.supportsFraming) {
            this.sentCallbackFn.push(this.packetsFn);
        }
        else {
            this.sentCallbackFn.push(...this.packetsFn);
        }
        this.packetsFn = [];
        this.transport.send(wbuf);
        this.emit("drain");
        this.server.emit("drain", this);
    }
    /**
     * Get available upgrades for this socket.
     *
     * @return {Array} the upgrades `server.upgrades()` reports for the
     *   current transport name that are also listed in
     *   `server.opts.transports`
     * @api private
     */
    getAvailableUpgrades() {
        const candidates = this.server.upgrades(this.transport.name);
        return candidates.filter(name => this.server.opts.transports.includes(name));
    }
    /**
     * Closes the socket and underlying transport.
     *
     * Only acts when the socket is "open". If packets are still buffered the
     * transport is closed after the next "drain" event of this socket.
     *
     * @param {Boolean} discard - optional, discard the transport
     * @return {Socket} for chaining
     * @api public
     */
    close(discard) {
        if ("open" !== this.readyState)
            return this;
        this.readyState = "closing";
        if (this.writeBuffer.length) {
            this.once("drain", this.closeTransport.bind(this, discard));
            return this;
        }
        this.closeTransport(discard);
        return this;
    }
    /**
     * Closes the underlying transport and, once it reports closed, finishes
     * the socket with reason "forced close".
     *
     * @param {Boolean} discard - when truthy, `transport.discard()` is called first
     * @api private
     */
    closeTransport(discard) {
        if (discard)
            this.transport.discard();
        this.transport.close(this.onClose.bind(this, "forced close"));
    }
}
458exports.Socket = Socket;
Note: See TracBrowser for help on using the repository browser.