[6a3a178] | 1 | import { isFunction } from './util/isFunction';
|
---|
| 2 | import { empty as emptyObserver } from './Observer';
|
---|
| 3 | import { Observer, PartialObserver, TeardownLogic } from './types';
|
---|
| 4 | import { Subscription } from './Subscription';
|
---|
| 5 | import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
|
---|
| 6 | import { config } from './config';
|
---|
| 7 | import { hostReportError } from './util/hostReportError';
|
---|
| 8 |
|
---|
| 9 | /**
|
---|
| 10 | * Implements the {@link Observer} interface and extends the
|
---|
| 11 | * {@link Subscription} class. While the {@link Observer} is the public API for
|
---|
| 12 | * consuming the values of an {@link Observable}, all Observers get converted to
|
---|
| 13 | * a Subscriber, in order to provide Subscription-like capabilities such as
|
---|
| 14 | * `unsubscribe`. Subscriber is a common type in RxJS, and crucial for
|
---|
| 15 | * implementing operators, but it is rarely used as a public API.
|
---|
| 16 | *
|
---|
| 17 | * @class Subscriber<T>
|
---|
| 18 | */
|
---|
| 19 | export class Subscriber<T> extends Subscription implements Observer<T> {
|
---|
| 20 |
|
---|
| 21 | [rxSubscriberSymbol]() { return this; }
|
---|
| 22 |
|
---|
| 23 | /**
|
---|
| 24 | * A static factory for a Subscriber, given a (potentially partial) definition
|
---|
| 25 | * of an Observer.
|
---|
| 26 | * @param {function(x: ?T): void} [next] The `next` callback of an Observer.
|
---|
| 27 | * @param {function(e: ?any): void} [error] The `error` callback of an
|
---|
| 28 | * Observer.
|
---|
| 29 | * @param {function(): void} [complete] The `complete` callback of an
|
---|
| 30 | * Observer.
|
---|
| 31 | * @return {Subscriber<T>} A Subscriber wrapping the (partially defined)
|
---|
| 32 | * Observer represented by the given arguments.
|
---|
| 33 | * @nocollapse
|
---|
| 34 | */
|
---|
| 35 | static create<T>(next?: (x?: T) => void,
|
---|
| 36 | error?: (e?: any) => void,
|
---|
| 37 | complete?: () => void): Subscriber<T> {
|
---|
| 38 | const subscriber = new Subscriber(next, error, complete);
|
---|
| 39 | subscriber.syncErrorThrowable = false;
|
---|
| 40 | return subscriber;
|
---|
| 41 | }
|
---|
| 42 |
|
---|
| 43 | /** @internal */ syncErrorValue: any = null;
|
---|
| 44 | /** @internal */ syncErrorThrown: boolean = false;
|
---|
| 45 | /** @internal */ syncErrorThrowable: boolean = false;
|
---|
| 46 |
|
---|
| 47 | protected isStopped: boolean = false;
|
---|
| 48 | protected destination: PartialObserver<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
|
---|
| 49 |
|
---|
| 50 | /**
|
---|
| 51 | * @param {Observer|function(value: T): void} [destinationOrNext] A partially
|
---|
| 52 | * defined Observer or a `next` callback function.
|
---|
| 53 | * @param {function(e: ?any): void} [error] The `error` callback of an
|
---|
| 54 | * Observer.
|
---|
| 55 | * @param {function(): void} [complete] The `complete` callback of an
|
---|
| 56 | * Observer.
|
---|
| 57 | */
|
---|
| 58 | constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
|
---|
| 59 | error?: (e?: any) => void,
|
---|
| 60 | complete?: () => void) {
|
---|
| 61 | super();
|
---|
| 62 |
|
---|
| 63 | switch (arguments.length) {
|
---|
| 64 | case 0:
|
---|
| 65 | this.destination = emptyObserver;
|
---|
| 66 | break;
|
---|
| 67 | case 1:
|
---|
| 68 | if (!destinationOrNext) {
|
---|
| 69 | this.destination = emptyObserver;
|
---|
| 70 | break;
|
---|
| 71 | }
|
---|
| 72 | if (typeof destinationOrNext === 'object') {
|
---|
| 73 | if (destinationOrNext instanceof Subscriber) {
|
---|
| 74 | this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
|
---|
| 75 | this.destination = destinationOrNext;
|
---|
| 76 | destinationOrNext.add(this);
|
---|
| 77 | } else {
|
---|
| 78 | this.syncErrorThrowable = true;
|
---|
| 79 | this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
|
---|
| 80 | }
|
---|
| 81 | break;
|
---|
| 82 | }
|
---|
| 83 | default:
|
---|
| 84 | this.syncErrorThrowable = true;
|
---|
| 85 | this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
|
---|
| 86 | break;
|
---|
| 87 | }
|
---|
| 88 | }
|
---|
| 89 |
|
---|
| 90 | /**
|
---|
| 91 | * The {@link Observer} callback to receive notifications of type `next` from
|
---|
| 92 | * the Observable, with a value. The Observable may call this method 0 or more
|
---|
| 93 | * times.
|
---|
| 94 | * @param {T} [value] The `next` value.
|
---|
| 95 | * @return {void}
|
---|
| 96 | */
|
---|
| 97 | next(value?: T): void {
|
---|
| 98 | if (!this.isStopped) {
|
---|
| 99 | this._next(value);
|
---|
| 100 | }
|
---|
| 101 | }
|
---|
| 102 |
|
---|
| 103 | /**
|
---|
| 104 | * The {@link Observer} callback to receive notifications of type `error` from
|
---|
| 105 | * the Observable, with an attached `Error`. Notifies the Observer that
|
---|
| 106 | * the Observable has experienced an error condition.
|
---|
| 107 | * @param {any} [err] The `error` exception.
|
---|
| 108 | * @return {void}
|
---|
| 109 | */
|
---|
| 110 | error(err?: any): void {
|
---|
| 111 | if (!this.isStopped) {
|
---|
| 112 | this.isStopped = true;
|
---|
| 113 | this._error(err);
|
---|
| 114 | }
|
---|
| 115 | }
|
---|
| 116 |
|
---|
| 117 | /**
|
---|
| 118 | * The {@link Observer} callback to receive a valueless notification of type
|
---|
| 119 | * `complete` from the Observable. Notifies the Observer that the Observable
|
---|
| 120 | * has finished sending push-based notifications.
|
---|
| 121 | * @return {void}
|
---|
| 122 | */
|
---|
| 123 | complete(): void {
|
---|
| 124 | if (!this.isStopped) {
|
---|
| 125 | this.isStopped = true;
|
---|
| 126 | this._complete();
|
---|
| 127 | }
|
---|
| 128 | }
|
---|
| 129 |
|
---|
| 130 | unsubscribe(): void {
|
---|
| 131 | if (this.closed) {
|
---|
| 132 | return;
|
---|
| 133 | }
|
---|
| 134 | this.isStopped = true;
|
---|
| 135 | super.unsubscribe();
|
---|
| 136 | }
|
---|
| 137 |
|
---|
| 138 | protected _next(value: T): void {
|
---|
| 139 | this.destination.next(value);
|
---|
| 140 | }
|
---|
| 141 |
|
---|
| 142 | protected _error(err: any): void {
|
---|
| 143 | this.destination.error(err);
|
---|
| 144 | this.unsubscribe();
|
---|
| 145 | }
|
---|
| 146 |
|
---|
| 147 | protected _complete(): void {
|
---|
| 148 | this.destination.complete();
|
---|
| 149 | this.unsubscribe();
|
---|
| 150 | }
|
---|
| 151 |
|
---|
| 152 | /** @deprecated This is an internal implementation detail, do not use. */
|
---|
| 153 | _unsubscribeAndRecycle(): Subscriber<T> {
|
---|
| 154 | const { _parentOrParents } = this;
|
---|
| 155 | this._parentOrParents = null;
|
---|
| 156 | this.unsubscribe();
|
---|
| 157 | this.closed = false;
|
---|
| 158 | this.isStopped = false;
|
---|
| 159 | this._parentOrParents = _parentOrParents;
|
---|
| 160 | return this;
|
---|
| 161 | }
|
---|
| 162 | }
|
---|
| 163 |
|
---|
| 164 | /**
|
---|
| 165 | * We need this JSDoc comment for affecting ESDoc.
|
---|
| 166 | * @ignore
|
---|
| 167 | * @extends {Ignored}
|
---|
| 168 | */
|
---|
| 169 | export class SafeSubscriber<T> extends Subscriber<T> {
|
---|
| 170 |
|
---|
| 171 | private _context: any;
|
---|
| 172 |
|
---|
| 173 | constructor(private _parentSubscriber: Subscriber<T>,
|
---|
| 174 | observerOrNext?: PartialObserver<T> | ((value: T) => void),
|
---|
| 175 | error?: (e?: any) => void,
|
---|
| 176 | complete?: () => void) {
|
---|
| 177 | super();
|
---|
| 178 |
|
---|
| 179 | let next: ((value: T) => void);
|
---|
| 180 | let context: any = this;
|
---|
| 181 |
|
---|
| 182 | if (isFunction(observerOrNext)) {
|
---|
| 183 | next = (<((value: T) => void)> observerOrNext);
|
---|
| 184 | } else if (observerOrNext) {
|
---|
| 185 | next = (<PartialObserver<T>> observerOrNext).next;
|
---|
| 186 | error = (<PartialObserver<T>> observerOrNext).error;
|
---|
| 187 | complete = (<PartialObserver<T>> observerOrNext).complete;
|
---|
| 188 | if (observerOrNext !== emptyObserver) {
|
---|
| 189 | context = Object.create(observerOrNext);
|
---|
| 190 | if (isFunction(context.unsubscribe)) {
|
---|
| 191 | this.add(<() => void> context.unsubscribe.bind(context));
|
---|
| 192 | }
|
---|
| 193 | context.unsubscribe = this.unsubscribe.bind(this);
|
---|
| 194 | }
|
---|
| 195 | }
|
---|
| 196 |
|
---|
| 197 | this._context = context;
|
---|
| 198 | this._next = next;
|
---|
| 199 | this._error = error;
|
---|
| 200 | this._complete = complete;
|
---|
| 201 | }
|
---|
| 202 |
|
---|
| 203 | next(value?: T): void {
|
---|
| 204 | if (!this.isStopped && this._next) {
|
---|
| 205 | const { _parentSubscriber } = this;
|
---|
| 206 | if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
|
---|
| 207 | this.__tryOrUnsub(this._next, value);
|
---|
| 208 | } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
|
---|
| 209 | this.unsubscribe();
|
---|
| 210 | }
|
---|
| 211 | }
|
---|
| 212 | }
|
---|
| 213 |
|
---|
| 214 | error(err?: any): void {
|
---|
| 215 | if (!this.isStopped) {
|
---|
| 216 | const { _parentSubscriber } = this;
|
---|
| 217 | const { useDeprecatedSynchronousErrorHandling } = config;
|
---|
| 218 | if (this._error) {
|
---|
| 219 | if (!useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
|
---|
| 220 | this.__tryOrUnsub(this._error, err);
|
---|
| 221 | this.unsubscribe();
|
---|
| 222 | } else {
|
---|
| 223 | this.__tryOrSetError(_parentSubscriber, this._error, err);
|
---|
| 224 | this.unsubscribe();
|
---|
| 225 | }
|
---|
| 226 | } else if (!_parentSubscriber.syncErrorThrowable) {
|
---|
| 227 | this.unsubscribe();
|
---|
| 228 | if (useDeprecatedSynchronousErrorHandling) {
|
---|
| 229 | throw err;
|
---|
| 230 | }
|
---|
| 231 | hostReportError(err);
|
---|
| 232 | } else {
|
---|
| 233 | if (useDeprecatedSynchronousErrorHandling) {
|
---|
| 234 | _parentSubscriber.syncErrorValue = err;
|
---|
| 235 | _parentSubscriber.syncErrorThrown = true;
|
---|
| 236 | } else {
|
---|
| 237 | hostReportError(err);
|
---|
| 238 | }
|
---|
| 239 | this.unsubscribe();
|
---|
| 240 | }
|
---|
| 241 | }
|
---|
| 242 | }
|
---|
| 243 |
|
---|
| 244 | complete(): void {
|
---|
| 245 | if (!this.isStopped) {
|
---|
| 246 | const { _parentSubscriber } = this;
|
---|
| 247 | if (this._complete) {
|
---|
| 248 | const wrappedComplete = () => this._complete.call(this._context);
|
---|
| 249 |
|
---|
| 250 | if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
|
---|
| 251 | this.__tryOrUnsub(wrappedComplete);
|
---|
| 252 | this.unsubscribe();
|
---|
| 253 | } else {
|
---|
| 254 | this.__tryOrSetError(_parentSubscriber, wrappedComplete);
|
---|
| 255 | this.unsubscribe();
|
---|
| 256 | }
|
---|
| 257 | } else {
|
---|
| 258 | this.unsubscribe();
|
---|
| 259 | }
|
---|
| 260 | }
|
---|
| 261 | }
|
---|
| 262 |
|
---|
| 263 | private __tryOrUnsub(fn: Function, value?: any): void {
|
---|
| 264 | try {
|
---|
| 265 | fn.call(this._context, value);
|
---|
| 266 | } catch (err) {
|
---|
| 267 | this.unsubscribe();
|
---|
| 268 | if (config.useDeprecatedSynchronousErrorHandling) {
|
---|
| 269 | throw err;
|
---|
| 270 | } else {
|
---|
| 271 | hostReportError(err);
|
---|
| 272 | }
|
---|
| 273 | }
|
---|
| 274 | }
|
---|
| 275 |
|
---|
| 276 | private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
|
---|
| 277 | if (!config.useDeprecatedSynchronousErrorHandling) {
|
---|
| 278 | throw new Error('bad call');
|
---|
| 279 | }
|
---|
| 280 | try {
|
---|
| 281 | fn.call(this._context, value);
|
---|
| 282 | } catch (err) {
|
---|
| 283 | if (config.useDeprecatedSynchronousErrorHandling) {
|
---|
| 284 | parent.syncErrorValue = err;
|
---|
| 285 | parent.syncErrorThrown = true;
|
---|
| 286 | return true;
|
---|
| 287 | } else {
|
---|
| 288 | hostReportError(err);
|
---|
| 289 | return true;
|
---|
| 290 | }
|
---|
| 291 | }
|
---|
| 292 | return false;
|
---|
| 293 | }
|
---|
| 294 |
|
---|
| 295 | /** @internal This is an internal implementation detail, do not use. */
|
---|
| 296 | _unsubscribe(): void {
|
---|
| 297 | const { _parentSubscriber } = this;
|
---|
| 298 | this._context = null;
|
---|
| 299 | this._parentSubscriber = null;
|
---|
| 300 | _parentSubscriber.unsubscribe();
|
---|
| 301 | }
|
---|
| 302 | }
|
---|