1 | import { Operator } from '../Operator';
|
---|
2 | import { Subscriber } from '../Subscriber';
|
---|
3 | import { Observable } from '../Observable';
|
---|
4 |
|
---|
5 | import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
|
---|
6 |
|
---|
/**
 * Returns an Observable that mirrors the source Observable, except when the source signals an `error`.
 * Instead of propagating the `error`, this operator resubscribes to the source Observable, up to a
 * maximum of `count` resubscriptions (given as a number parameter).
 *
 * ![](retry.png)
 *
 * Every item emitted by the source Observable is emitted by the resulting Observable, including items
 * emitted during subscriptions that later failed. For example, if the source emits [1, 2] and then
 * fails, and on the second subscription emits [1, 2, 3, 4, 5] and completes, the full sequence of
 * emissions and notifications is: [1, 2, 1, 2, 3, 4, 5, `complete`].
 *
 * ## Example
 * ```ts
 * import { interval, of, throwError } from 'rxjs';
 * import { mergeMap, retry } from 'rxjs/operators';
 *
 * const source = interval(1000);
 * const example = source.pipe(
 *   mergeMap(val => {
 *     if(val > 5){
 *       return throwError('Error!');
 *     }
 *     return of(val);
 *   }),
 *   //retry 2 times on error
 *   retry(2)
 * );
 *
 * const subscribe = example.subscribe({
 *   next: val => console.log(val),
 *   error: val => console.log(`${val}: Retried 2 times then quit!`)
 * });
 *
 * // Output:
 * // 0..1..2..3..4..5..
 * // 0..1..2..3..4..5..
 * // 0..1..2..3..4..5..
 * // "Error!: Retried 2 times then quit!"
 * ```
 *
 * @param {number} count - Number of retry attempts before failing. With the default of `-1` the
 * remaining-attempts counter is never decremented, so the source is resubscribed on every error.
 * @return {Observable} The source Observable modified with the retry logic.
 * @method retry
 * @owner Observable
 */
export function retry<T>(count: number = -1): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    // The operator keeps its own reference to `source` so it can be subscribed to again on error.
    const operator = new RetryOperator<T>(count, source);
    return source.lift(operator);
  };
}
|
---|
56 |
|
---|
/**
 * Operator passed to `lift` by `retry`. It carries the retry budget and the original source
 * Observable, and hands both to a new `RetrySubscriber` each time `call` is invoked.
 */
class RetryOperator<T> implements Operator<T, T> {
  private count: number;
  private source: Observable<T>;

  constructor(count: number, source: Observable<T>) {
    this.count = count;
    this.source = source;
  }

  /**
   * Wraps `subscriber` in a `RetrySubscriber` and subscribes it to the `source` argument.
   *
   * @param subscriber The downstream subscriber that receives the mirrored notifications.
   * @param source The object to subscribe to; typed `any`, only its `subscribe` method is used here.
   * @return Whatever `source.subscribe` returns, used as the teardown logic.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const retrySubscriber = new RetrySubscriber(subscriber, this.count, this.source);
    return source.subscribe(retrySubscriber);
  }
}
|
---|
66 |
|
---|
/**
 * Subscriber that intercepts `error` notifications and resubscribes to the stored source
 * instead of forwarding them, until its retry budget is used up.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class RetrySubscriber<T> extends Subscriber<T> {
  constructor(destination: Subscriber<any>,
              private count: number,
              private source: Observable<T>) {
    super(destination);
  }

  /**
   * Handles an error from the current subscription.
   *
   * - Does nothing when this subscriber is already stopped (`isStopped`).
   * - When the remaining count is exactly `0`, forwards the error through `super.error`.
   * - When the remaining count is greater than `-1`, decrements it by one before retrying.
   * - Otherwise (count of `-1` or lower) leaves the count untouched, so retries are not limited.
   *
   * @param err The error raised by the current subscription.
   */
  error(err: any) {
    if (this.isStopped) {
      return;
    }

    const remaining = this.count;
    if (remaining === 0) {
      // Budget exhausted: let the base class deliver the error.
      return super.error(err);
    }
    if (remaining > -1) {
      this.count = remaining - 1;
    }

    // Resubscribe to the original source with the subscriber returned by `_unsubscribeAndRecycle()`.
    this.source.subscribe(this._unsubscribeAndRecycle());
  }
}
|
---|