1 | 'use strict';
|
---|
2 |
|
---|
3 | const { Duplex } = require('stream');
|
---|
4 |
|
---|
5 | /**
|
---|
6 | * Emits the `'close'` event on a stream.
|
---|
7 | *
|
---|
8 | * @param {stream.Duplex} The stream.
|
---|
9 | * @private
|
---|
10 | */
|
---|
11 | function emitClose(stream) {
|
---|
12 | stream.emit('close');
|
---|
13 | }
|
---|
14 |
|
---|
15 | /**
|
---|
16 | * The listener of the `'end'` event.
|
---|
17 | *
|
---|
18 | * @private
|
---|
19 | */
|
---|
20 | function duplexOnEnd() {
|
---|
21 | if (!this.destroyed && this._writableState.finished) {
|
---|
22 | this.destroy();
|
---|
23 | }
|
---|
24 | }
|
---|
25 |
|
---|
26 | /**
|
---|
27 | * The listener of the `'error'` event.
|
---|
28 | *
|
---|
29 | * @param {Error} err The error
|
---|
30 | * @private
|
---|
31 | */
|
---|
32 | function duplexOnError(err) {
|
---|
33 | this.removeListener('error', duplexOnError);
|
---|
34 | this.destroy();
|
---|
35 | if (this.listenerCount('error') === 0) {
|
---|
36 | // Do not suppress the throwing behavior.
|
---|
37 | this.emit('error', err);
|
---|
38 | }
|
---|
39 | }
|
---|
40 |
|
---|
41 | /**
|
---|
42 | * Wraps a `WebSocket` in a duplex stream.
|
---|
43 | *
|
---|
44 | * @param {WebSocket} ws The `WebSocket` to wrap
|
---|
45 | * @param {Object} [options] The options for the `Duplex` constructor
|
---|
46 | * @return {stream.Duplex} The duplex stream
|
---|
47 | * @public
|
---|
48 | */
|
---|
49 | function createWebSocketStream(ws, options) {
|
---|
50 | let resumeOnReceiverDrain = true;
|
---|
51 |
|
---|
52 | function receiverOnDrain() {
|
---|
53 | if (resumeOnReceiverDrain) ws._socket.resume();
|
---|
54 | }
|
---|
55 |
|
---|
56 | if (ws.readyState === ws.CONNECTING) {
|
---|
57 | ws.once('open', function open() {
|
---|
58 | ws._receiver.removeAllListeners('drain');
|
---|
59 | ws._receiver.on('drain', receiverOnDrain);
|
---|
60 | });
|
---|
61 | } else {
|
---|
62 | ws._receiver.removeAllListeners('drain');
|
---|
63 | ws._receiver.on('drain', receiverOnDrain);
|
---|
64 | }
|
---|
65 |
|
---|
66 | const duplex = new Duplex({
|
---|
67 | ...options,
|
---|
68 | autoDestroy: false,
|
---|
69 | emitClose: false,
|
---|
70 | objectMode: false,
|
---|
71 | writableObjectMode: false
|
---|
72 | });
|
---|
73 |
|
---|
74 | ws.on('message', function message(msg) {
|
---|
75 | if (!duplex.push(msg)) {
|
---|
76 | resumeOnReceiverDrain = false;
|
---|
77 | ws._socket.pause();
|
---|
78 | }
|
---|
79 | });
|
---|
80 |
|
---|
81 | ws.once('error', function error(err) {
|
---|
82 | if (duplex.destroyed) return;
|
---|
83 |
|
---|
84 | duplex.destroy(err);
|
---|
85 | });
|
---|
86 |
|
---|
87 | ws.once('close', function close() {
|
---|
88 | if (duplex.destroyed) return;
|
---|
89 |
|
---|
90 | duplex.push(null);
|
---|
91 | });
|
---|
92 |
|
---|
93 | duplex._destroy = function (err, callback) {
|
---|
94 | if (ws.readyState === ws.CLOSED) {
|
---|
95 | callback(err);
|
---|
96 | process.nextTick(emitClose, duplex);
|
---|
97 | return;
|
---|
98 | }
|
---|
99 |
|
---|
100 | let called = false;
|
---|
101 |
|
---|
102 | ws.once('error', function error(err) {
|
---|
103 | called = true;
|
---|
104 | callback(err);
|
---|
105 | });
|
---|
106 |
|
---|
107 | ws.once('close', function close() {
|
---|
108 | if (!called) callback(err);
|
---|
109 | process.nextTick(emitClose, duplex);
|
---|
110 | });
|
---|
111 | ws.terminate();
|
---|
112 | };
|
---|
113 |
|
---|
114 | duplex._final = function (callback) {
|
---|
115 | if (ws.readyState === ws.CONNECTING) {
|
---|
116 | ws.once('open', function open() {
|
---|
117 | duplex._final(callback);
|
---|
118 | });
|
---|
119 | return;
|
---|
120 | }
|
---|
121 |
|
---|
122 | // If the value of the `_socket` property is `null` it means that `ws` is a
|
---|
123 | // client websocket and the handshake failed. In fact, when this happens, a
|
---|
124 | // socket is never assigned to the websocket. Wait for the `'error'` event
|
---|
125 | // that will be emitted by the websocket.
|
---|
126 | if (ws._socket === null) return;
|
---|
127 |
|
---|
128 | if (ws._socket._writableState.finished) {
|
---|
129 | callback();
|
---|
130 | if (duplex._readableState.endEmitted) duplex.destroy();
|
---|
131 | } else {
|
---|
132 | ws._socket.once('finish', function finish() {
|
---|
133 | // `duplex` is not destroyed here because the `'end'` event will be
|
---|
134 | // emitted on `duplex` after this `'finish'` event. The EOF signaling
|
---|
135 | // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
|
---|
136 | callback();
|
---|
137 | });
|
---|
138 | ws.close();
|
---|
139 | }
|
---|
140 | };
|
---|
141 |
|
---|
142 | duplex._read = function () {
|
---|
143 | if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
|
---|
144 | resumeOnReceiverDrain = true;
|
---|
145 | if (!ws._receiver._writableState.needDrain) ws._socket.resume();
|
---|
146 | }
|
---|
147 | };
|
---|
148 |
|
---|
149 | duplex._write = function (chunk, encoding, callback) {
|
---|
150 | if (ws.readyState === ws.CONNECTING) {
|
---|
151 | ws.once('open', function open() {
|
---|
152 | duplex._write(chunk, encoding, callback);
|
---|
153 | });
|
---|
154 | return;
|
---|
155 | }
|
---|
156 |
|
---|
157 | ws.send(chunk, callback);
|
---|
158 | };
|
---|
159 |
|
---|
160 | duplex.on('end', duplexOnEnd);
|
---|
161 | duplex.on('error', duplexOnError);
|
---|
162 | return duplex;
|
---|
163 | }
|
---|
164 |
|
---|
165 | module.exports = createWebSocketStream;
|
---|