1 | import { Operator } from '../Operator';
|
---|
2 | import { Subscriber } from '../Subscriber';
|
---|
3 | import { Observable } from '../Observable';
|
---|
4 | import { Subject } from '../Subject';
|
---|
5 | import { Subscription } from '../Subscription';
|
---|
6 |
|
---|
7 | import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
|
---|
8 | import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
|
---|
9 |
|
---|
10 | /**
|
---|
11 | * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
|
---|
12 | * calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`.
|
---|
13 | * If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child
|
---|
14 | * subscription. Otherwise this method will resubscribe to the source Observable.
|
---|
15 | *
|
---|
16 | * ![](retryWhen.png)
|
---|
17 | *
|
---|
18 | * @param {function(errors: Observable): Observable} notifier - Receives an Observable of notifications with which a
|
---|
19 | * user can `complete` or `error`, aborting the retry.
|
---|
20 | * @return {Observable} The source Observable modified with retry logic.
|
---|
21 | * @method retryWhen
|
---|
22 | * @owner Observable
|
---|
23 | */
|
---|
24 | export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> {
|
---|
25 | return (source: Observable<T>) => source.lift(new RetryWhenOperator(notifier, source));
|
---|
26 | }
|
---|
27 |
|
---|
28 | class RetryWhenOperator<T> implements Operator<T, T> {
|
---|
29 | constructor(protected notifier: (errors: Observable<any>) => Observable<any>,
|
---|
30 | protected source: Observable<T>) {
|
---|
31 | }
|
---|
32 |
|
---|
33 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
34 | return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source));
|
---|
35 | }
|
---|
36 | }
|
---|
37 |
|
---|
38 | /**
|
---|
39 | * We need this JSDoc comment for affecting ESDoc.
|
---|
40 | * @ignore
|
---|
41 | * @extends {Ignored}
|
---|
42 | */
|
---|
43 | class RetryWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
|
---|
44 |
|
---|
45 | private errors?: Subject<any>;
|
---|
46 | private retries?: Observable<any>;
|
---|
47 | private retriesSubscription?: Subscription;
|
---|
48 |
|
---|
49 | constructor(destination: Subscriber<R>,
|
---|
50 | private notifier: (errors: Observable<any>) => Observable<any>,
|
---|
51 | private source: Observable<T>) {
|
---|
52 | super(destination);
|
---|
53 | }
|
---|
54 |
|
---|
55 | error(err: any) {
|
---|
56 | if (!this.isStopped) {
|
---|
57 |
|
---|
58 | let errors = this.errors;
|
---|
59 | let retries: any = this.retries;
|
---|
60 | let retriesSubscription = this.retriesSubscription;
|
---|
61 |
|
---|
62 | if (!retries) {
|
---|
63 | errors = new Subject();
|
---|
64 | try {
|
---|
65 | const { notifier } = this;
|
---|
66 | retries = notifier(errors);
|
---|
67 | } catch (e) {
|
---|
68 | return super.error(e);
|
---|
69 | }
|
---|
70 | retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this));
|
---|
71 | } else {
|
---|
72 | this.errors = undefined;
|
---|
73 | this.retriesSubscription = undefined;
|
---|
74 | }
|
---|
75 |
|
---|
76 | this._unsubscribeAndRecycle();
|
---|
77 |
|
---|
78 | this.errors = errors;
|
---|
79 | this.retries = retries;
|
---|
80 | this.retriesSubscription = retriesSubscription;
|
---|
81 |
|
---|
82 | errors!.next(err);
|
---|
83 | }
|
---|
84 | }
|
---|
85 |
|
---|
86 | /** @deprecated This is an internal implementation detail, do not use. */
|
---|
87 | _unsubscribe() {
|
---|
88 | const { errors, retriesSubscription } = this;
|
---|
89 | if (errors) {
|
---|
90 | errors.unsubscribe();
|
---|
91 | this.errors = undefined;
|
---|
92 | }
|
---|
93 | if (retriesSubscription) {
|
---|
94 | retriesSubscription.unsubscribe();
|
---|
95 | this.retriesSubscription = undefined;
|
---|
96 | }
|
---|
97 | this.retries = undefined;
|
---|
98 | }
|
---|
99 |
|
---|
100 | notifyNext(): void {
|
---|
101 | const { _unsubscribe } = this;
|
---|
102 |
|
---|
103 | this._unsubscribe = null!;
|
---|
104 | this._unsubscribeAndRecycle();
|
---|
105 | this._unsubscribe = _unsubscribe;
|
---|
106 |
|
---|
107 | this.source.subscribe(this);
|
---|
108 | }
|
---|
109 | }
|
---|