source: trip-planner-front/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts@ 1ad8e64

Last change on this file since 1ad8e64 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 11.8 KB
Line 
1import { Subject, AnonymousSubject } from '../../Subject';
2import { Subscriber } from '../../Subscriber';
3import { Observable } from '../../Observable';
4import { Subscription } from '../../Subscription';
5import { Operator } from '../../Operator';
6import { ReplaySubject } from '../../ReplaySubject';
7import { Observer, NextObserver } from '../../types';
8
9/**
10 * WebSocketSubjectConfig is a plain Object that allows us to make our
11 * webSocket configurable.
12 *
13 * <span class="informal">Provides flexibility to {@link webSocket}</span>
14 *
15 * It defines a set of properties to provide custom behavior in specific
16 * moments of the socket's lifecycle. When the connection opens we can
17 * use `openObserver`, when the connection is closed `closeObserver`, if we
18 * are interested in listening for data coming from the server: `deserializer`,
19 * which allows us to customize the deserialization strategy of data before passing it
20 * to the socket client. By default `deserializer` is going to apply `JSON.parse` to each message coming
21 * from the server.
22 *
23 * ## Example
24 * **deserializer**: the default for this property is `JSON.parse`, but since incoming data
25 * can be either text or binary data, we can apply a custom deserialization strategy
26 * or simply skip the default behaviour.
27 * ```ts
28 * import { webSocket } from 'rxjs/webSocket';
29 *
30 * const wsSubject = webSocket({
31 * url: 'ws://localhost:8081',
32 * //Apply any transformation of your choice.
33 * deserializer: ({data}) => data
34 * });
35 *
36 * wsSubject.subscribe(console.log);
37 *
38 * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
39 * //output
40 * //
41 * // This is a msg from the server
42 * ```
43 *
44 * **serializer** allows us to apply a custom serialization strategy, but for the outgoing messages
45 * ```ts
46 * import { webSocket } from 'rxjs/webSocket';
47 *
48 * const wsSubject = webSocket({
49 * url: 'ws://localhost:8081',
50 * //Apply any transformation of your choice.
51 * serializer: msg => JSON.stringify({channel: "webDevelopment", msg: msg})
52 * });
53 *
54 * wsSubject.subscribe(() => wsSubject.next("msg to the server"));
55 *
56 * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
57 * //output
58 * //
59 * // {"channel":"webDevelopment","msg":"msg to the server"}
60 * ```
61 *
62 * **closeObserver** allows us to set a custom error when an error is raised.
63 * ```ts
64 * import { webSocket } from 'rxjs/webSocket';
65 *
66 * const wsSubject = webSocket({
67 * url: 'ws://localhost:8081',
68 * closeObserver: {
69 * next(closeEvent) {
70 * const customError = { code: 6666, reason: "Custom evil reason" }
71 * console.log(`code: ${customError.code}, reason: ${customError.reason}`);
72 * }
73 * }
74 * });
75 *
76 * //output
77 * // code: 6666, reason: Custom evil reason
78 * ```
79 *
80 * **openObserver**: let's say we need to run some kind of init task before sending/receiving msgs to the
81 * webSocket, or to send a notification that the connection was successful; this is what
82 * openObserver is useful for.
83 * ```ts
84 * import { webSocket } from 'rxjs/webSocket';
85 *
86 * const wsSubject = webSocket({
87 * url: 'ws://localhost:8081',
88 * openObserver: {
89 * next: () => {
90 * console.log('connection ok');
91 * }
92 * },
93 * });
94 *
95 * //output
96 * // connection ok
97 * ```
98 * */
99
export interface WebSocketSubjectConfig<T> {
  /** URL of the socket server to connect to. */
  url: string;

  /** Sub-protocol (or list of sub-protocols) handed to the WebSocket constructor. */
  protocol?: string | string[];

  /** @deprecated use {@link deserializer} */
  resultSelector?: (e: MessageEvent) => T;

  /**
   * Turns a value passed to the subject into the message that is sent to
   * the server. Defaults to JSON.stringify.
   */
  serializer?: (value: T) => WebSocketMessage;

  /**
   * Turns a message event arriving from the server into the value that is
   * emitted to subscribers. Defaults to JSON.parse.
   */
  deserializer?: (e: MessageEvent) => T;

  /**
   * An Observer that is notified with the open event each time the
   * underlying web socket opens.
   */
  openObserver?: NextObserver<Event>;

  /**
   * An Observer that is notified with the close event each time the
   * underlying web socket closes.
   */
  closeObserver?: NextObserver<CloseEvent>;

  /**
   * An Observer that is notified right before the subject itself closes an
   * open socket.
   */
  closingObserver?: NextObserver<void>;

  /**
   * The WebSocket constructor to use. Useful when running where WebSocket is
   * not available as a global (it is a DOM API, so e.g. Node needs an
   * implementation to be supplied), or to inject a mock WebSocket in tests.
   */
  WebSocketCtor?: new (url: string, protocols?: string | string[]) => WebSocket;

  /** Sets the `binaryType` property of the underlying WebSocket. */
  binaryType?: 'blob' | 'arraybuffer';
}
139
/**
 * Baseline configuration that every WebSocketSubject starts from: an empty
 * url and JSON (de)serialization of messages.
 */
const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
  url: '',
  // Incoming: parse the event's `data` payload as JSON.
  deserializer(event: MessageEvent) {
    return JSON.parse(event.data);
  },
  // Outgoing: stringify the value as JSON.
  serializer(payload: any) {
    return JSON.stringify(payload);
  },
};
145
/** Message of the TypeError raised when `error()` is given a value without a `code`. */
const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
148
/** Every payload shape a serializer may produce for sending over the socket. */
export type WebSocketMessage =
  | string
  | ArrayBuffer
  | Blob
  | ArrayBufferView;
150
/**
 * A Subject backed by a WebSocket connection.
 *
 * Values pushed into the subject (`next`) are serialized and sent over the
 * socket; messages received from the socket are deserialized and emitted to
 * subscribers. The socket is opened on the first subscription and closed
 * when the last subscriber goes away, when the subject is completed/errored,
 * or when `unsubscribe()` is called on the subject.
 */
export class WebSocketSubject<T> extends AnonymousSubject<T> {

  /** Effective configuration: defaults overlaid with the caller's options. */
  private _config: WebSocketSubjectConfig<T>;

  /** @deprecated This is an internal implementation detail, do not use. */
  _output: Subject<T>;

  /** The live socket, or null/undefined while disconnected. */
  private _socket: WebSocket;

  /**
   * @param urlConfigOrSource Either the socket url, a full config object, or
   * (for lifted instances) the source Observable to delegate subscriptions to.
   * @param destination Observer that receives outgoing values; only read
   * when `urlConfigOrSource` is an Observable.
   * @throws Error when no `WebSocketCtor` is configured and there is no
   * usable global `WebSocket`.
   */
  constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
    super();
    if (urlConfigOrSource instanceof Observable) {
      this.destination = destination;
      this.source = urlConfigOrSource as Observable<T>;
    } else {
      const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG };
      this._output = new Subject<T>();
      if (typeof urlConfigOrSource === 'string') {
        config.url = urlConfigOrSource;
      } else {
        for (const key in urlConfigOrSource) {
          // Go through Object.prototype so that config objects without a
          // prototype (or with a shadowed `hasOwnProperty`) are still copied.
          if (Object.prototype.hasOwnProperty.call(urlConfigOrSource, key)) {
            config[key] = urlConfigOrSource[key];
          }
        }
      }

      if (!config.WebSocketCtor) {
        // `typeof` is required here: evaluating an undeclared global
        // identifier directly throws a ReferenceError, which would hide the
        // descriptive error below in environments without a global WebSocket.
        if (typeof WebSocket === 'undefined' || !WebSocket) {
          throw new Error('no WebSocket constructor can be found');
        }
        config.WebSocketCtor = WebSocket;
      }
      // Buffers values sent before the socket is open; they are replayed to
      // the real destination in the `onopen` handler.
      this.destination = new ReplaySubject();
    }
  }

  /**
   * Creates a new WebSocketSubject from this one's config, with this subject
   * as its source and `operator` attached.
   *
   * NOTE(review): the second constructor argument is only read when the
   * first one is an Observable, so `this.destination` passed here is not
   * used by the constructor.
   */
  lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
    const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination);
    sock.operator = operator;
    sock.source = this;
    return sock;
  }

  /**
   * Drops the socket reference and installs a fresh output Subject. For
   * non-lifted instances (no `source`) the outgoing buffer is replaced too.
   */
  private _resetState() {
    this._socket = null;
    if (!this.source) {
      this.destination = new ReplaySubject();
    }
    this._output = new Subject<T>();
  }

  /**
   * Creates an {@link Observable}, that when subscribed to, sends a message,
   * defined by the `subMsg` function, to the server over the socket to begin a
   * subscription to data over that socket. Once data arrives, the
   * `messageFilter` argument will be used to select the appropriate data for
   * the resulting Observable. When teardown occurs, either due to
   * unsubscription, completion or error, a message defined by the `unsubMsg`
   * argument will be sent to the server over the WebSocketSubject.
   *
   * @param subMsg A function to generate the subscription message to be sent to
   * the server. This will still be processed by the serializer in the
   * WebSocketSubject's config. (Which defaults to JSON serialization)
   * @param unsubMsg A function to generate the unsubscription message to be
   * sent to the server at teardown. This will still be processed by the
   * serializer in the WebSocketSubject's config.
   * @param messageFilter A predicate for selecting the appropriate messages
   * from the server for the output stream.
   */
  multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
    return new Observable((observer: Observer<any>) => {
      try {
        this.next(subMsg());
      } catch (err) {
        observer.error(err);
      }

      const subscription = this.subscribe(
        x => {
          try {
            if (messageFilter(x)) {
              observer.next(x);
            }
          } catch (err) {
            observer.error(err);
          }
        },
        err => observer.error(err),
        () => observer.complete()
      );

      return () => {
        try {
          this.next(unsubMsg());
        } catch (err) {
          observer.error(err);
        }
        subscription.unsubscribe();
      };
    });
  }

  /**
   * Opens the socket described by the config and wires its events to the
   * output Subject. A failure while constructing the socket is reported
   * through the output Subject's `error` channel.
   */
  private _connectSocket() {
    const { WebSocketCtor, protocol, url, binaryType } = this._config;
    // Captured once: the handlers below keep reporting to this Subject even
    // after `_resetState()` has swapped `this._output` for a new one.
    const observer = this._output;

    let socket: WebSocket = null;
    try {
      socket = protocol ?
        new WebSocketCtor(url, protocol) :
        new WebSocketCtor(url);
      this._socket = socket;
      if (binaryType) {
        this._socket.binaryType = binaryType;
      }
    } catch (e) {
      observer.error(e);
      return;
    }

    const subscription = new Subscription(() => {
      this._socket = null;
      // readyState 1 is OPEN.
      if (socket && socket.readyState === 1) {
        socket.close();
      }
    });

    socket.onopen = (e: Event) => {
      const { _socket } = this;
      if (!_socket) {
        // State was reset while the socket was still connecting; the
        // connection is no longer wanted.
        socket.close();
        this._resetState();
        return;
      }
      const { openObserver } = this._config;
      if (openObserver) {
        openObserver.next(e);
      }

      // Values that were sent before the socket opened.
      const queue = this.destination;

      this.destination = Subscriber.create<T>(
        (x) => {
          // Values arriving while the socket is not OPEN are dropped.
          if (socket.readyState === 1) {
            try {
              const { serializer } = this._config;
              socket.send(serializer(x));
            } catch (e) {
              this.destination.error(e);
            }
          }
        },
        (e) => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          if (e && e.code) {
            socket.close(e.code, e.reason);
          } else {
            observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
          }
          this._resetState();
        },
        () => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          socket.close();
          this._resetState();
        }
      ) as Subscriber<any>;

      // Flush the buffered values into the now-live destination.
      if (queue && queue instanceof ReplaySubject) {
        subscription.add((<ReplaySubject<T>>queue).subscribe(this.destination));
      }
    };

    socket.onerror = (e: Event) => {
      this._resetState();
      observer.error(e);
    };

    socket.onclose = (e: CloseEvent) => {
      this._resetState();
      const { closeObserver } = this._config;
      if (closeObserver) {
        closeObserver.next(e);
      }
      // A clean close completes the stream; anything else is an error.
      if (e.wasClean) {
        observer.complete();
      } else {
        observer.error(e);
      }
    };

    socket.onmessage = (e: MessageEvent) => {
      try {
        const { deserializer } = this._config;
        observer.next(deserializer(e));
      } catch (err) {
        observer.error(err);
      }
    };
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T>): Subscription {
    const { source } = this;
    if (source) {
      // Lifted instance: delegate to the source.
      return source.subscribe(subscriber);
    }
    if (!this._socket) {
      this._connectSocket();
    }
    this._output.subscribe(subscriber);
    subscriber.add(() => {
      const { _socket } = this;
      // Last subscriber gone: close an open socket and reset.
      if (this._output.observers.length === 0) {
        if (_socket && _socket.readyState === 1) {
          _socket.close();
        }
        this._resetState();
      }
    });
    return subscriber;
  }

  /**
   * Closes the socket if it is open, resets internal state, then performs
   * the base class unsubscription.
   */
  unsubscribe() {
    const { _socket } = this;
    if (_socket && _socket.readyState === 1) {
      _socket.close();
    }
    this._resetState();
    super.unsubscribe();
  }
}
Note: See TracBrowser for help on using the repository browser.