source: trip-planner-front/node_modules/rxjs/src/internal/Subscriber.ts@ 6a3a178

Last change on this file since 6a3a178 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 9.2 KB
Line 
1import { isFunction } from './util/isFunction';
2import { empty as emptyObserver } from './Observer';
3import { Observer, PartialObserver, TeardownLogic } from './types';
4import { Subscription } from './Subscription';
5import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
6import { config } from './config';
7import { hostReportError } from './util/hostReportError';
8
9/**
10 * Implements the {@link Observer} interface and extends the
11 * {@link Subscription} class. While the {@link Observer} is the public API for
12 * consuming the values of an {@link Observable}, all Observers get converted to
13 * a Subscriber, in order to provide Subscription-like capabilities such as
14 * `unsubscribe`. Subscriber is a common type in RxJS, and crucial for
15 * implementing operators, but it is rarely used as a public API.
16 *
17 * @class Subscriber<T>
18 */
19export class Subscriber<T> extends Subscription implements Observer<T> {
20
21 [rxSubscriberSymbol]() { return this; }
22
23 /**
24 * A static factory for a Subscriber, given a (potentially partial) definition
25 * of an Observer.
26 * @param {function(x: ?T): void} [next] The `next` callback of an Observer.
27 * @param {function(e: ?any): void} [error] The `error` callback of an
28 * Observer.
29 * @param {function(): void} [complete] The `complete` callback of an
30 * Observer.
31 * @return {Subscriber<T>} A Subscriber wrapping the (partially defined)
32 * Observer represented by the given arguments.
33 * @nocollapse
34 */
35 static create<T>(next?: (x?: T) => void,
36 error?: (e?: any) => void,
37 complete?: () => void): Subscriber<T> {
38 const subscriber = new Subscriber(next, error, complete);
39 subscriber.syncErrorThrowable = false;
40 return subscriber;
41 }
42
43 /** @internal */ syncErrorValue: any = null;
44 /** @internal */ syncErrorThrown: boolean = false;
45 /** @internal */ syncErrorThrowable: boolean = false;
46
47 protected isStopped: boolean = false;
48 protected destination: PartialObserver<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
49
50 /**
51 * @param {Observer|function(value: T): void} [destinationOrNext] A partially
52 * defined Observer or a `next` callback function.
53 * @param {function(e: ?any): void} [error] The `error` callback of an
54 * Observer.
55 * @param {function(): void} [complete] The `complete` callback of an
56 * Observer.
57 */
58 constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
59 error?: (e?: any) => void,
60 complete?: () => void) {
61 super();
62
63 switch (arguments.length) {
64 case 0:
65 this.destination = emptyObserver;
66 break;
67 case 1:
68 if (!destinationOrNext) {
69 this.destination = emptyObserver;
70 break;
71 }
72 if (typeof destinationOrNext === 'object') {
73 if (destinationOrNext instanceof Subscriber) {
74 this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
75 this.destination = destinationOrNext;
76 destinationOrNext.add(this);
77 } else {
78 this.syncErrorThrowable = true;
79 this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
80 }
81 break;
82 }
83 default:
84 this.syncErrorThrowable = true;
85 this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
86 break;
87 }
88 }
89
90 /**
91 * The {@link Observer} callback to receive notifications of type `next` from
92 * the Observable, with a value. The Observable may call this method 0 or more
93 * times.
94 * @param {T} [value] The `next` value.
95 * @return {void}
96 */
97 next(value?: T): void {
98 if (!this.isStopped) {
99 this._next(value);
100 }
101 }
102
103 /**
104 * The {@link Observer} callback to receive notifications of type `error` from
105 * the Observable, with an attached `Error`. Notifies the Observer that
106 * the Observable has experienced an error condition.
107 * @param {any} [err] The `error` exception.
108 * @return {void}
109 */
110 error(err?: any): void {
111 if (!this.isStopped) {
112 this.isStopped = true;
113 this._error(err);
114 }
115 }
116
117 /**
118 * The {@link Observer} callback to receive a valueless notification of type
119 * `complete` from the Observable. Notifies the Observer that the Observable
120 * has finished sending push-based notifications.
121 * @return {void}
122 */
123 complete(): void {
124 if (!this.isStopped) {
125 this.isStopped = true;
126 this._complete();
127 }
128 }
129
130 unsubscribe(): void {
131 if (this.closed) {
132 return;
133 }
134 this.isStopped = true;
135 super.unsubscribe();
136 }
137
138 protected _next(value: T): void {
139 this.destination.next(value);
140 }
141
142 protected _error(err: any): void {
143 this.destination.error(err);
144 this.unsubscribe();
145 }
146
147 protected _complete(): void {
148 this.destination.complete();
149 this.unsubscribe();
150 }
151
152 /** @deprecated This is an internal implementation detail, do not use. */
153 _unsubscribeAndRecycle(): Subscriber<T> {
154 const { _parentOrParents } = this;
155 this._parentOrParents = null;
156 this.unsubscribe();
157 this.closed = false;
158 this.isStopped = false;
159 this._parentOrParents = _parentOrParents;
160 return this;
161 }
162}
163
164/**
165 * We need this JSDoc comment for affecting ESDoc.
166 * @ignore
167 * @extends {Ignored}
168 */
169export class SafeSubscriber<T> extends Subscriber<T> {
170
171 private _context: any;
172
173 constructor(private _parentSubscriber: Subscriber<T>,
174 observerOrNext?: PartialObserver<T> | ((value: T) => void),
175 error?: (e?: any) => void,
176 complete?: () => void) {
177 super();
178
179 let next: ((value: T) => void);
180 let context: any = this;
181
182 if (isFunction(observerOrNext)) {
183 next = (<((value: T) => void)> observerOrNext);
184 } else if (observerOrNext) {
185 next = (<PartialObserver<T>> observerOrNext).next;
186 error = (<PartialObserver<T>> observerOrNext).error;
187 complete = (<PartialObserver<T>> observerOrNext).complete;
188 if (observerOrNext !== emptyObserver) {
189 context = Object.create(observerOrNext);
190 if (isFunction(context.unsubscribe)) {
191 this.add(<() => void> context.unsubscribe.bind(context));
192 }
193 context.unsubscribe = this.unsubscribe.bind(this);
194 }
195 }
196
197 this._context = context;
198 this._next = next;
199 this._error = error;
200 this._complete = complete;
201 }
202
203 next(value?: T): void {
204 if (!this.isStopped && this._next) {
205 const { _parentSubscriber } = this;
206 if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
207 this.__tryOrUnsub(this._next, value);
208 } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
209 this.unsubscribe();
210 }
211 }
212 }
213
214 error(err?: any): void {
215 if (!this.isStopped) {
216 const { _parentSubscriber } = this;
217 const { useDeprecatedSynchronousErrorHandling } = config;
218 if (this._error) {
219 if (!useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
220 this.__tryOrUnsub(this._error, err);
221 this.unsubscribe();
222 } else {
223 this.__tryOrSetError(_parentSubscriber, this._error, err);
224 this.unsubscribe();
225 }
226 } else if (!_parentSubscriber.syncErrorThrowable) {
227 this.unsubscribe();
228 if (useDeprecatedSynchronousErrorHandling) {
229 throw err;
230 }
231 hostReportError(err);
232 } else {
233 if (useDeprecatedSynchronousErrorHandling) {
234 _parentSubscriber.syncErrorValue = err;
235 _parentSubscriber.syncErrorThrown = true;
236 } else {
237 hostReportError(err);
238 }
239 this.unsubscribe();
240 }
241 }
242 }
243
244 complete(): void {
245 if (!this.isStopped) {
246 const { _parentSubscriber } = this;
247 if (this._complete) {
248 const wrappedComplete = () => this._complete.call(this._context);
249
250 if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
251 this.__tryOrUnsub(wrappedComplete);
252 this.unsubscribe();
253 } else {
254 this.__tryOrSetError(_parentSubscriber, wrappedComplete);
255 this.unsubscribe();
256 }
257 } else {
258 this.unsubscribe();
259 }
260 }
261 }
262
263 private __tryOrUnsub(fn: Function, value?: any): void {
264 try {
265 fn.call(this._context, value);
266 } catch (err) {
267 this.unsubscribe();
268 if (config.useDeprecatedSynchronousErrorHandling) {
269 throw err;
270 } else {
271 hostReportError(err);
272 }
273 }
274 }
275
276 private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
277 if (!config.useDeprecatedSynchronousErrorHandling) {
278 throw new Error('bad call');
279 }
280 try {
281 fn.call(this._context, value);
282 } catch (err) {
283 if (config.useDeprecatedSynchronousErrorHandling) {
284 parent.syncErrorValue = err;
285 parent.syncErrorThrown = true;
286 return true;
287 } else {
288 hostReportError(err);
289 return true;
290 }
291 }
292 return false;
293 }
294
295 /** @internal This is an internal implementation detail, do not use. */
296 _unsubscribe(): void {
297 const { _parentSubscriber } = this;
298 this._context = null;
299 this._parentSubscriber = null;
300 _parentSubscriber.unsubscribe();
301 }
302}
Note: See TracBrowser for help on using the repository browser.