1 | import { isFunction } from './util/isFunction';
|
---|
2 | import { empty as emptyObserver } from './Observer';
|
---|
3 | import { Observer, PartialObserver, TeardownLogic } from './types';
|
---|
4 | import { Subscription } from './Subscription';
|
---|
5 | import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
|
---|
6 | import { config } from './config';
|
---|
7 | import { hostReportError } from './util/hostReportError';
|
---|
8 |
|
---|
9 | /**
|
---|
10 | * Implements the {@link Observer} interface and extends the
|
---|
11 | * {@link Subscription} class. While the {@link Observer} is the public API for
|
---|
12 | * consuming the values of an {@link Observable}, all Observers get converted to
|
---|
13 | * a Subscriber, in order to provide Subscription-like capabilities such as
|
---|
14 | * `unsubscribe`. Subscriber is a common type in RxJS, and crucial for
|
---|
15 | * implementing operators, but it is rarely used as a public API.
|
---|
16 | *
|
---|
17 | * @class Subscriber<T>
|
---|
18 | */
|
---|
19 | export class Subscriber<T> extends Subscription implements Observer<T> {
|
---|
20 |
|
---|
21 | [rxSubscriberSymbol]() { return this; }
|
---|
22 |
|
---|
23 | /**
|
---|
24 | * A static factory for a Subscriber, given a (potentially partial) definition
|
---|
25 | * of an Observer.
|
---|
26 | * @param {function(x: ?T): void} [next] The `next` callback of an Observer.
|
---|
27 | * @param {function(e: ?any): void} [error] The `error` callback of an
|
---|
28 | * Observer.
|
---|
29 | * @param {function(): void} [complete] The `complete` callback of an
|
---|
30 | * Observer.
|
---|
31 | * @return {Subscriber<T>} A Subscriber wrapping the (partially defined)
|
---|
32 | * Observer represented by the given arguments.
|
---|
33 | * @nocollapse
|
---|
34 | */
|
---|
35 | static create<T>(next?: (x?: T) => void,
|
---|
36 | error?: (e?: any) => void,
|
---|
37 | complete?: () => void): Subscriber<T> {
|
---|
38 | const subscriber = new Subscriber(next, error, complete);
|
---|
39 | subscriber.syncErrorThrowable = false;
|
---|
40 | return subscriber;
|
---|
41 | }
|
---|
42 |
|
---|
43 | /** @internal */ syncErrorValue: any = null;
|
---|
44 | /** @internal */ syncErrorThrown: boolean = false;
|
---|
45 | /** @internal */ syncErrorThrowable: boolean = false;
|
---|
46 |
|
---|
47 | protected isStopped: boolean = false;
|
---|
48 | protected destination: PartialObserver<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
|
---|
49 |
|
---|
50 | /**
|
---|
51 | * @param {Observer|function(value: T): void} [destinationOrNext] A partially
|
---|
52 | * defined Observer or a `next` callback function.
|
---|
53 | * @param {function(e: ?any): void} [error] The `error` callback of an
|
---|
54 | * Observer.
|
---|
55 | * @param {function(): void} [complete] The `complete` callback of an
|
---|
56 | * Observer.
|
---|
57 | */
|
---|
58 | constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
|
---|
59 | error?: (e?: any) => void,
|
---|
60 | complete?: () => void) {
|
---|
61 | super();
|
---|
62 |
|
---|
63 | switch (arguments.length) {
|
---|
64 | case 0:
|
---|
65 | this.destination = emptyObserver;
|
---|
66 | break;
|
---|
67 | case 1:
|
---|
68 | if (!destinationOrNext) {
|
---|
69 | this.destination = emptyObserver;
|
---|
70 | break;
|
---|
71 | }
|
---|
72 | if (typeof destinationOrNext === 'object') {
|
---|
73 | if (destinationOrNext instanceof Subscriber) {
|
---|
74 | this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
|
---|
75 | this.destination = destinationOrNext;
|
---|
76 | destinationOrNext.add(this);
|
---|
77 | } else {
|
---|
78 | this.syncErrorThrowable = true;
|
---|
79 | this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
|
---|
80 | }
|
---|
81 | break;
|
---|
82 | }
|
---|
83 | default:
|
---|
84 | this.syncErrorThrowable = true;
|
---|
85 | this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
|
---|
86 | break;
|
---|
87 | }
|
---|
88 | }
|
---|
89 |
|
---|
90 | /**
|
---|
91 | * The {@link Observer} callback to receive notifications of type `next` from
|
---|
92 | * the Observable, with a value. The Observable may call this method 0 or more
|
---|
93 | * times.
|
---|
94 | * @param {T} [value] The `next` value.
|
---|
95 | * @return {void}
|
---|
96 | */
|
---|
97 | next(value?: T): void {
|
---|
98 | if (!this.isStopped) {
|
---|
99 | this._next(value);
|
---|
100 | }
|
---|
101 | }
|
---|
102 |
|
---|
103 | /**
|
---|
104 | * The {@link Observer} callback to receive notifications of type `error` from
|
---|
105 | * the Observable, with an attached `Error`. Notifies the Observer that
|
---|
106 | * the Observable has experienced an error condition.
|
---|
107 | * @param {any} [err] The `error` exception.
|
---|
108 | * @return {void}
|
---|
109 | */
|
---|
110 | error(err?: any): void {
|
---|
111 | if (!this.isStopped) {
|
---|
112 | this.isStopped = true;
|
---|
113 | this._error(err);
|
---|
114 | }
|
---|
115 | }
|
---|
116 |
|
---|
117 | /**
|
---|
118 | * The {@link Observer} callback to receive a valueless notification of type
|
---|
119 | * `complete` from the Observable. Notifies the Observer that the Observable
|
---|
120 | * has finished sending push-based notifications.
|
---|
121 | * @return {void}
|
---|
122 | */
|
---|
123 | complete(): void {
|
---|
124 | if (!this.isStopped) {
|
---|
125 | this.isStopped = true;
|
---|
126 | this._complete();
|
---|
127 | }
|
---|
128 | }
|
---|
129 |
|
---|
130 | unsubscribe(): void {
|
---|
131 | if (this.closed) {
|
---|
132 | return;
|
---|
133 | }
|
---|
134 | this.isStopped = true;
|
---|
135 | super.unsubscribe();
|
---|
136 | }
|
---|
137 |
|
---|
138 | protected _next(value: T): void {
|
---|
139 | this.destination.next(value);
|
---|
140 | }
|
---|
141 |
|
---|
142 | protected _error(err: any): void {
|
---|
143 | this.destination.error(err);
|
---|
144 | this.unsubscribe();
|
---|
145 | }
|
---|
146 |
|
---|
147 | protected _complete(): void {
|
---|
148 | this.destination.complete();
|
---|
149 | this.unsubscribe();
|
---|
150 | }
|
---|
151 |
|
---|
152 | /** @deprecated This is an internal implementation detail, do not use. */
|
---|
153 | _unsubscribeAndRecycle(): Subscriber<T> {
|
---|
154 | const { _parentOrParents } = this;
|
---|
155 | this._parentOrParents = null;
|
---|
156 | this.unsubscribe();
|
---|
157 | this.closed = false;
|
---|
158 | this.isStopped = false;
|
---|
159 | this._parentOrParents = _parentOrParents;
|
---|
160 | return this;
|
---|
161 | }
|
---|
162 | }
|
---|
163 |
|
---|
164 | /**
|
---|
165 | * We need this JSDoc comment for affecting ESDoc.
|
---|
166 | * @ignore
|
---|
167 | * @extends {Ignored}
|
---|
168 | */
|
---|
169 | export class SafeSubscriber<T> extends Subscriber<T> {
|
---|
170 |
|
---|
171 | private _context: any;
|
---|
172 |
|
---|
173 | constructor(private _parentSubscriber: Subscriber<T>,
|
---|
174 | observerOrNext?: PartialObserver<T> | ((value: T) => void),
|
---|
175 | error?: (e?: any) => void,
|
---|
176 | complete?: () => void) {
|
---|
177 | super();
|
---|
178 |
|
---|
179 | let next: ((value: T) => void);
|
---|
180 | let context: any = this;
|
---|
181 |
|
---|
182 | if (isFunction(observerOrNext)) {
|
---|
183 | next = (<((value: T) => void)> observerOrNext);
|
---|
184 | } else if (observerOrNext) {
|
---|
185 | next = (<PartialObserver<T>> observerOrNext).next;
|
---|
186 | error = (<PartialObserver<T>> observerOrNext).error;
|
---|
187 | complete = (<PartialObserver<T>> observerOrNext).complete;
|
---|
188 | if (observerOrNext !== emptyObserver) {
|
---|
189 | context = Object.create(observerOrNext);
|
---|
190 | if (isFunction(context.unsubscribe)) {
|
---|
191 | this.add(<() => void> context.unsubscribe.bind(context));
|
---|
192 | }
|
---|
193 | context.unsubscribe = this.unsubscribe.bind(this);
|
---|
194 | }
|
---|
195 | }
|
---|
196 |
|
---|
197 | this._context = context;
|
---|
198 | this._next = next;
|
---|
199 | this._error = error;
|
---|
200 | this._complete = complete;
|
---|
201 | }
|
---|
202 |
|
---|
203 | next(value?: T): void {
|
---|
204 | if (!this.isStopped && this._next) {
|
---|
205 | const { _parentSubscriber } = this;
|
---|
206 | if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
|
---|
207 | this.__tryOrUnsub(this._next, value);
|
---|
208 | } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
|
---|
209 | this.unsubscribe();
|
---|
210 | }
|
---|
211 | }
|
---|
212 | }
|
---|
213 |
|
---|
214 | error(err?: any): void {
|
---|
215 | if (!this.isStopped) {
|
---|
216 | const { _parentSubscriber } = this;
|
---|
217 | const { useDeprecatedSynchronousErrorHandling } = config;
|
---|
218 | if (this._error) {
|
---|
219 | if (!useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
|
---|
220 | this.__tryOrUnsub(this._error, err);
|
---|
221 | this.unsubscribe();
|
---|
222 | } else {
|
---|
223 | this.__tryOrSetError(_parentSubscriber, this._error, err);
|
---|
224 | this.unsubscribe();
|
---|
225 | }
|
---|
226 | } else if (!_parentSubscriber.syncErrorThrowable) {
|
---|
227 | this.unsubscribe();
|
---|
228 | if (useDeprecatedSynchronousErrorHandling) {
|
---|
229 | throw err;
|
---|
230 | }
|
---|
231 | hostReportError(err);
|
---|
232 | } else {
|
---|
233 | if (useDeprecatedSynchronousErrorHandling) {
|
---|
234 | _parentSubscriber.syncErrorValue = err;
|
---|
235 | _parentSubscriber.syncErrorThrown = true;
|
---|
236 | } else {
|
---|
237 | hostReportError(err);
|
---|
238 | }
|
---|
239 | this.unsubscribe();
|
---|
240 | }
|
---|
241 | }
|
---|
242 | }
|
---|
243 |
|
---|
244 | complete(): void {
|
---|
245 | if (!this.isStopped) {
|
---|
246 | const { _parentSubscriber } = this;
|
---|
247 | if (this._complete) {
|
---|
248 | const wrappedComplete = () => this._complete.call(this._context);
|
---|
249 |
|
---|
250 | if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
|
---|
251 | this.__tryOrUnsub(wrappedComplete);
|
---|
252 | this.unsubscribe();
|
---|
253 | } else {
|
---|
254 | this.__tryOrSetError(_parentSubscriber, wrappedComplete);
|
---|
255 | this.unsubscribe();
|
---|
256 | }
|
---|
257 | } else {
|
---|
258 | this.unsubscribe();
|
---|
259 | }
|
---|
260 | }
|
---|
261 | }
|
---|
262 |
|
---|
263 | private __tryOrUnsub(fn: Function, value?: any): void {
|
---|
264 | try {
|
---|
265 | fn.call(this._context, value);
|
---|
266 | } catch (err) {
|
---|
267 | this.unsubscribe();
|
---|
268 | if (config.useDeprecatedSynchronousErrorHandling) {
|
---|
269 | throw err;
|
---|
270 | } else {
|
---|
271 | hostReportError(err);
|
---|
272 | }
|
---|
273 | }
|
---|
274 | }
|
---|
275 |
|
---|
276 | private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
|
---|
277 | if (!config.useDeprecatedSynchronousErrorHandling) {
|
---|
278 | throw new Error('bad call');
|
---|
279 | }
|
---|
280 | try {
|
---|
281 | fn.call(this._context, value);
|
---|
282 | } catch (err) {
|
---|
283 | if (config.useDeprecatedSynchronousErrorHandling) {
|
---|
284 | parent.syncErrorValue = err;
|
---|
285 | parent.syncErrorThrown = true;
|
---|
286 | return true;
|
---|
287 | } else {
|
---|
288 | hostReportError(err);
|
---|
289 | return true;
|
---|
290 | }
|
---|
291 | }
|
---|
292 | return false;
|
---|
293 | }
|
---|
294 |
|
---|
295 | /** @internal This is an internal implementation detail, do not use. */
|
---|
296 | _unsubscribe(): void {
|
---|
297 | const { _parentSubscriber } = this;
|
---|
298 | this._context = null;
|
---|
299 | this._parentSubscriber = null;
|
---|
300 | _parentSubscriber.unsubscribe();
|
---|
301 | }
|
---|
302 | }
|
---|