source: trip-planner-front/node_modules/rxjs/_esm2015/internal/observable/dom/WebSocketSubject.js

Last change on this file was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 6.8 KB
Line 
1import { Subject, AnonymousSubject } from '../../Subject';
2import { Subscriber } from '../../Subscriber';
3import { Observable } from '../../Observable';
4import { Subscription } from '../../Subscription';
5import { ReplaySubject } from '../../ReplaySubject';
6const DEFAULT_WEBSOCKET_CONFIG = {
7 url: '',
8 deserializer: (e) => JSON.parse(e.data),
9 serializer: (value) => JSON.stringify(value),
10};
11const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
12export class WebSocketSubject extends AnonymousSubject {
13 constructor(urlConfigOrSource, destination) {
14 super();
15 if (urlConfigOrSource instanceof Observable) {
16 this.destination = destination;
17 this.source = urlConfigOrSource;
18 }
19 else {
20 const config = this._config = Object.assign({}, DEFAULT_WEBSOCKET_CONFIG);
21 this._output = new Subject();
22 if (typeof urlConfigOrSource === 'string') {
23 config.url = urlConfigOrSource;
24 }
25 else {
26 for (let key in urlConfigOrSource) {
27 if (urlConfigOrSource.hasOwnProperty(key)) {
28 config[key] = urlConfigOrSource[key];
29 }
30 }
31 }
32 if (!config.WebSocketCtor && WebSocket) {
33 config.WebSocketCtor = WebSocket;
34 }
35 else if (!config.WebSocketCtor) {
36 throw new Error('no WebSocket constructor can be found');
37 }
38 this.destination = new ReplaySubject();
39 }
40 }
41 lift(operator) {
42 const sock = new WebSocketSubject(this._config, this.destination);
43 sock.operator = operator;
44 sock.source = this;
45 return sock;
46 }
47 _resetState() {
48 this._socket = null;
49 if (!this.source) {
50 this.destination = new ReplaySubject();
51 }
52 this._output = new Subject();
53 }
54 multiplex(subMsg, unsubMsg, messageFilter) {
55 const self = this;
56 return new Observable((observer) => {
57 try {
58 self.next(subMsg());
59 }
60 catch (err) {
61 observer.error(err);
62 }
63 const subscription = self.subscribe(x => {
64 try {
65 if (messageFilter(x)) {
66 observer.next(x);
67 }
68 }
69 catch (err) {
70 observer.error(err);
71 }
72 }, err => observer.error(err), () => observer.complete());
73 return () => {
74 try {
75 self.next(unsubMsg());
76 }
77 catch (err) {
78 observer.error(err);
79 }
80 subscription.unsubscribe();
81 };
82 });
83 }
84 _connectSocket() {
85 const { WebSocketCtor, protocol, url, binaryType } = this._config;
86 const observer = this._output;
87 let socket = null;
88 try {
89 socket = protocol ?
90 new WebSocketCtor(url, protocol) :
91 new WebSocketCtor(url);
92 this._socket = socket;
93 if (binaryType) {
94 this._socket.binaryType = binaryType;
95 }
96 }
97 catch (e) {
98 observer.error(e);
99 return;
100 }
101 const subscription = new Subscription(() => {
102 this._socket = null;
103 if (socket && socket.readyState === 1) {
104 socket.close();
105 }
106 });
107 socket.onopen = (e) => {
108 const { _socket } = this;
109 if (!_socket) {
110 socket.close();
111 this._resetState();
112 return;
113 }
114 const { openObserver } = this._config;
115 if (openObserver) {
116 openObserver.next(e);
117 }
118 const queue = this.destination;
119 this.destination = Subscriber.create((x) => {
120 if (socket.readyState === 1) {
121 try {
122 const { serializer } = this._config;
123 socket.send(serializer(x));
124 }
125 catch (e) {
126 this.destination.error(e);
127 }
128 }
129 }, (e) => {
130 const { closingObserver } = this._config;
131 if (closingObserver) {
132 closingObserver.next(undefined);
133 }
134 if (e && e.code) {
135 socket.close(e.code, e.reason);
136 }
137 else {
138 observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
139 }
140 this._resetState();
141 }, () => {
142 const { closingObserver } = this._config;
143 if (closingObserver) {
144 closingObserver.next(undefined);
145 }
146 socket.close();
147 this._resetState();
148 });
149 if (queue && queue instanceof ReplaySubject) {
150 subscription.add(queue.subscribe(this.destination));
151 }
152 };
153 socket.onerror = (e) => {
154 this._resetState();
155 observer.error(e);
156 };
157 socket.onclose = (e) => {
158 this._resetState();
159 const { closeObserver } = this._config;
160 if (closeObserver) {
161 closeObserver.next(e);
162 }
163 if (e.wasClean) {
164 observer.complete();
165 }
166 else {
167 observer.error(e);
168 }
169 };
170 socket.onmessage = (e) => {
171 try {
172 const { deserializer } = this._config;
173 observer.next(deserializer(e));
174 }
175 catch (err) {
176 observer.error(err);
177 }
178 };
179 }
180 _subscribe(subscriber) {
181 const { source } = this;
182 if (source) {
183 return source.subscribe(subscriber);
184 }
185 if (!this._socket) {
186 this._connectSocket();
187 }
188 this._output.subscribe(subscriber);
189 subscriber.add(() => {
190 const { _socket } = this;
191 if (this._output.observers.length === 0) {
192 if (_socket && _socket.readyState === 1) {
193 _socket.close();
194 }
195 this._resetState();
196 }
197 });
198 return subscriber;
199 }
200 unsubscribe() {
201 const { _socket } = this;
202 if (_socket && _socket.readyState === 1) {
203 _socket.close();
204 }
205 this._resetState();
206 super.unsubscribe();
207 }
208}
209//# sourceMappingURL=WebSocketSubject.js.map
Note: See TracBrowser for help on using the repository browser.