[6a3a178] | 1 | import { Operator } from '../Operator';
|
---|
| 2 | import { Subscriber } from '../Subscriber';
|
---|
| 3 | import { Observable } from '../Observable';
|
---|
| 4 | import { Subject } from '../Subject';
|
---|
| 5 | import { Subscription } from '../Subscription';
|
---|
| 6 |
|
---|
| 7 | import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
|
---|
| 8 | import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
|
---|
| 9 |
|
---|
| 10 | /**
|
---|
| 11 | * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
|
---|
| 12 | * calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`.
|
---|
| 13 | * If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child
|
---|
| 14 | * subscription. Otherwise this method will resubscribe to the source Observable.
|
---|
| 15 | *
|
---|
| 16 | * ![](retryWhen.png)
|
---|
| 17 | *
|
---|
| 18 | * @param {function(errors: Observable): Observable} notifier - Receives an Observable of notifications with which a
|
---|
| 19 | * user can `complete` or `error`, aborting the retry.
|
---|
| 20 | * @return {Observable} The source Observable modified with retry logic.
|
---|
| 21 | * @method retryWhen
|
---|
| 22 | * @owner Observable
|
---|
| 23 | */
|
---|
| 24 | export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> {
|
---|
| 25 | return (source: Observable<T>) => source.lift(new RetryWhenOperator(notifier, source));
|
---|
| 26 | }
|
---|
| 27 |
|
---|
| 28 | class RetryWhenOperator<T> implements Operator<T, T> {
|
---|
| 29 | constructor(protected notifier: (errors: Observable<any>) => Observable<any>,
|
---|
| 30 | protected source: Observable<T>) {
|
---|
| 31 | }
|
---|
| 32 |
|
---|
| 33 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
| 34 | return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source));
|
---|
| 35 | }
|
---|
| 36 | }
|
---|
| 37 |
|
---|
| 38 | /**
|
---|
| 39 | * We need this JSDoc comment for affecting ESDoc.
|
---|
| 40 | * @ignore
|
---|
| 41 | * @extends {Ignored}
|
---|
| 42 | */
|
---|
| 43 | class RetryWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
|
---|
| 44 |
|
---|
| 45 | private errors?: Subject<any>;
|
---|
| 46 | private retries?: Observable<any>;
|
---|
| 47 | private retriesSubscription?: Subscription;
|
---|
| 48 |
|
---|
| 49 | constructor(destination: Subscriber<R>,
|
---|
| 50 | private notifier: (errors: Observable<any>) => Observable<any>,
|
---|
| 51 | private source: Observable<T>) {
|
---|
| 52 | super(destination);
|
---|
| 53 | }
|
---|
| 54 |
|
---|
| 55 | error(err: any) {
|
---|
| 56 | if (!this.isStopped) {
|
---|
| 57 |
|
---|
| 58 | let errors = this.errors;
|
---|
| 59 | let retries: any = this.retries;
|
---|
| 60 | let retriesSubscription = this.retriesSubscription;
|
---|
| 61 |
|
---|
| 62 | if (!retries) {
|
---|
| 63 | errors = new Subject();
|
---|
| 64 | try {
|
---|
| 65 | const { notifier } = this;
|
---|
| 66 | retries = notifier(errors);
|
---|
| 67 | } catch (e) {
|
---|
| 68 | return super.error(e);
|
---|
| 69 | }
|
---|
| 70 | retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this));
|
---|
| 71 | } else {
|
---|
| 72 | this.errors = undefined;
|
---|
| 73 | this.retriesSubscription = undefined;
|
---|
| 74 | }
|
---|
| 75 |
|
---|
| 76 | this._unsubscribeAndRecycle();
|
---|
| 77 |
|
---|
| 78 | this.errors = errors;
|
---|
| 79 | this.retries = retries;
|
---|
| 80 | this.retriesSubscription = retriesSubscription;
|
---|
| 81 |
|
---|
| 82 | errors!.next(err);
|
---|
| 83 | }
|
---|
| 84 | }
|
---|
| 85 |
|
---|
| 86 | /** @deprecated This is an internal implementation detail, do not use. */
|
---|
| 87 | _unsubscribe() {
|
---|
| 88 | const { errors, retriesSubscription } = this;
|
---|
| 89 | if (errors) {
|
---|
| 90 | errors.unsubscribe();
|
---|
| 91 | this.errors = undefined;
|
---|
| 92 | }
|
---|
| 93 | if (retriesSubscription) {
|
---|
| 94 | retriesSubscription.unsubscribe();
|
---|
| 95 | this.retriesSubscription = undefined;
|
---|
| 96 | }
|
---|
| 97 | this.retries = undefined;
|
---|
| 98 | }
|
---|
| 99 |
|
---|
| 100 | notifyNext(): void {
|
---|
| 101 | const { _unsubscribe } = this;
|
---|
| 102 |
|
---|
| 103 | this._unsubscribe = null!;
|
---|
| 104 | this._unsubscribeAndRecycle();
|
---|
| 105 | this._unsubscribe = _unsubscribe;
|
---|
| 106 |
|
---|
| 107 | this.source.subscribe(this);
|
---|
| 108 | }
|
---|
| 109 | }
|
---|