[6a3a178] | 1 | 'use strict';
|
---|
| 2 |
|
---|
| 3 | const { Duplex } = require('stream');
|
---|
| 4 |
|
---|
| 5 | /**
|
---|
| 6 | * Emits the `'close'` event on a stream.
|
---|
| 7 | *
|
---|
| 8 | * @param {stream.Duplex} The stream.
|
---|
| 9 | * @private
|
---|
| 10 | */
|
---|
| 11 | function emitClose(stream) {
|
---|
| 12 | stream.emit('close');
|
---|
| 13 | }
|
---|
| 14 |
|
---|
| 15 | /**
|
---|
| 16 | * The listener of the `'end'` event.
|
---|
| 17 | *
|
---|
| 18 | * @private
|
---|
| 19 | */
|
---|
| 20 | function duplexOnEnd() {
|
---|
| 21 | if (!this.destroyed && this._writableState.finished) {
|
---|
| 22 | this.destroy();
|
---|
| 23 | }
|
---|
| 24 | }
|
---|
| 25 |
|
---|
| 26 | /**
|
---|
| 27 | * The listener of the `'error'` event.
|
---|
| 28 | *
|
---|
| 29 | * @param {Error} err The error
|
---|
| 30 | * @private
|
---|
| 31 | */
|
---|
| 32 | function duplexOnError(err) {
|
---|
| 33 | this.removeListener('error', duplexOnError);
|
---|
| 34 | this.destroy();
|
---|
| 35 | if (this.listenerCount('error') === 0) {
|
---|
| 36 | // Do not suppress the throwing behavior.
|
---|
| 37 | this.emit('error', err);
|
---|
| 38 | }
|
---|
| 39 | }
|
---|
| 40 |
|
---|
| 41 | /**
|
---|
| 42 | * Wraps a `WebSocket` in a duplex stream.
|
---|
| 43 | *
|
---|
| 44 | * @param {WebSocket} ws The `WebSocket` to wrap
|
---|
| 45 | * @param {Object} [options] The options for the `Duplex` constructor
|
---|
| 46 | * @return {stream.Duplex} The duplex stream
|
---|
| 47 | * @public
|
---|
| 48 | */
|
---|
| 49 | function createWebSocketStream(ws, options) {
|
---|
| 50 | let resumeOnReceiverDrain = true;
|
---|
| 51 |
|
---|
| 52 | function receiverOnDrain() {
|
---|
| 53 | if (resumeOnReceiverDrain) ws._socket.resume();
|
---|
| 54 | }
|
---|
| 55 |
|
---|
| 56 | if (ws.readyState === ws.CONNECTING) {
|
---|
| 57 | ws.once('open', function open() {
|
---|
| 58 | ws._receiver.removeAllListeners('drain');
|
---|
| 59 | ws._receiver.on('drain', receiverOnDrain);
|
---|
| 60 | });
|
---|
| 61 | } else {
|
---|
| 62 | ws._receiver.removeAllListeners('drain');
|
---|
| 63 | ws._receiver.on('drain', receiverOnDrain);
|
---|
| 64 | }
|
---|
| 65 |
|
---|
| 66 | const duplex = new Duplex({
|
---|
| 67 | ...options,
|
---|
| 68 | autoDestroy: false,
|
---|
| 69 | emitClose: false,
|
---|
| 70 | objectMode: false,
|
---|
| 71 | writableObjectMode: false
|
---|
| 72 | });
|
---|
| 73 |
|
---|
| 74 | ws.on('message', function message(msg) {
|
---|
| 75 | if (!duplex.push(msg)) {
|
---|
| 76 | resumeOnReceiverDrain = false;
|
---|
| 77 | ws._socket.pause();
|
---|
| 78 | }
|
---|
| 79 | });
|
---|
| 80 |
|
---|
| 81 | ws.once('error', function error(err) {
|
---|
| 82 | if (duplex.destroyed) return;
|
---|
| 83 |
|
---|
| 84 | duplex.destroy(err);
|
---|
| 85 | });
|
---|
| 86 |
|
---|
| 87 | ws.once('close', function close() {
|
---|
| 88 | if (duplex.destroyed) return;
|
---|
| 89 |
|
---|
| 90 | duplex.push(null);
|
---|
| 91 | });
|
---|
| 92 |
|
---|
| 93 | duplex._destroy = function (err, callback) {
|
---|
| 94 | if (ws.readyState === ws.CLOSED) {
|
---|
| 95 | callback(err);
|
---|
| 96 | process.nextTick(emitClose, duplex);
|
---|
| 97 | return;
|
---|
| 98 | }
|
---|
| 99 |
|
---|
| 100 | let called = false;
|
---|
| 101 |
|
---|
| 102 | ws.once('error', function error(err) {
|
---|
| 103 | called = true;
|
---|
| 104 | callback(err);
|
---|
| 105 | });
|
---|
| 106 |
|
---|
| 107 | ws.once('close', function close() {
|
---|
| 108 | if (!called) callback(err);
|
---|
| 109 | process.nextTick(emitClose, duplex);
|
---|
| 110 | });
|
---|
| 111 | ws.terminate();
|
---|
| 112 | };
|
---|
| 113 |
|
---|
| 114 | duplex._final = function (callback) {
|
---|
| 115 | if (ws.readyState === ws.CONNECTING) {
|
---|
| 116 | ws.once('open', function open() {
|
---|
| 117 | duplex._final(callback);
|
---|
| 118 | });
|
---|
| 119 | return;
|
---|
| 120 | }
|
---|
| 121 |
|
---|
| 122 | // If the value of the `_socket` property is `null` it means that `ws` is a
|
---|
| 123 | // client websocket and the handshake failed. In fact, when this happens, a
|
---|
| 124 | // socket is never assigned to the websocket. Wait for the `'error'` event
|
---|
| 125 | // that will be emitted by the websocket.
|
---|
| 126 | if (ws._socket === null) return;
|
---|
| 127 |
|
---|
| 128 | if (ws._socket._writableState.finished) {
|
---|
| 129 | callback();
|
---|
| 130 | if (duplex._readableState.endEmitted) duplex.destroy();
|
---|
| 131 | } else {
|
---|
| 132 | ws._socket.once('finish', function finish() {
|
---|
| 133 | // `duplex` is not destroyed here because the `'end'` event will be
|
---|
| 134 | // emitted on `duplex` after this `'finish'` event. The EOF signaling
|
---|
| 135 | // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
|
---|
| 136 | callback();
|
---|
| 137 | });
|
---|
| 138 | ws.close();
|
---|
| 139 | }
|
---|
| 140 | };
|
---|
| 141 |
|
---|
| 142 | duplex._read = function () {
|
---|
| 143 | if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
|
---|
| 144 | resumeOnReceiverDrain = true;
|
---|
| 145 | if (!ws._receiver._writableState.needDrain) ws._socket.resume();
|
---|
| 146 | }
|
---|
| 147 | };
|
---|
| 148 |
|
---|
| 149 | duplex._write = function (chunk, encoding, callback) {
|
---|
| 150 | if (ws.readyState === ws.CONNECTING) {
|
---|
| 151 | ws.once('open', function open() {
|
---|
| 152 | duplex._write(chunk, encoding, callback);
|
---|
| 153 | });
|
---|
| 154 | return;
|
---|
| 155 | }
|
---|
| 156 |
|
---|
| 157 | ws.send(chunk, callback);
|
---|
| 158 | };
|
---|
| 159 |
|
---|
| 160 | duplex.on('end', duplexOnEnd);
|
---|
| 161 | duplex.on('error', duplexOnError);
|
---|
| 162 | return duplex;
|
---|
| 163 | }
|
---|
| 164 |
|
---|
| 165 | module.exports = createWebSocketStream;
|
---|