[6a3a178] | 1 | import { Operator } from '../Operator';
|
---|
| 2 | import { Subscriber } from '../Subscriber';
|
---|
| 3 | import { Observable } from '../Observable';
|
---|
| 4 |
|
---|
| 5 | import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
|
---|
| 6 |
|
---|
| 7 | /**
|
---|
| 8 | * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
|
---|
| 9 | * calls `error`, this method will resubscribe to the source Observable for a maximum of `count` resubscriptions (given
|
---|
| 10 | * as a number parameter) rather than propagating the `error` call.
|
---|
| 11 | *
|
---|
| 12 | * ![](retry.png)
|
---|
| 13 | *
|
---|
| 14 | * Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted
|
---|
| 15 | * during failed subscriptions. For example, if an Observable fails at first but emits [1, 2] then succeeds the second
|
---|
| 16 | * time and emits: [1, 2, 3, 4, 5] then the complete stream of emissions and notifications
|
---|
| 17 | * would be: [1, 2, 1, 2, 3, 4, 5, `complete`].
|
---|
| 18 | *
|
---|
| 19 | * ## Example
|
---|
| 20 | * ```ts
|
---|
| 21 | * import { interval, of, throwError } from 'rxjs';
|
---|
| 22 | * import { mergeMap, retry } from 'rxjs/operators';
|
---|
| 23 | *
|
---|
| 24 | * const source = interval(1000);
|
---|
| 25 | * const example = source.pipe(
|
---|
| 26 | * mergeMap(val => {
|
---|
| 27 | * if(val > 5){
|
---|
| 28 | * return throwError('Error!');
|
---|
| 29 | * }
|
---|
| 30 | * return of(val);
|
---|
| 31 | * }),
|
---|
| 32 | * //retry 2 times on error
|
---|
| 33 | * retry(2)
|
---|
| 34 | * );
|
---|
| 35 | *
|
---|
| 36 | * const subscribe = example.subscribe({
|
---|
| 37 | * next: val => console.log(val),
|
---|
| 38 | * error: val => console.log(`${val}: Retried 2 times then quit!`)
|
---|
| 39 | * });
|
---|
| 40 | *
|
---|
| 41 | * // Output:
|
---|
| 42 | * // 0..1..2..3..4..5..
|
---|
| 43 | * // 0..1..2..3..4..5..
|
---|
| 44 | * // 0..1..2..3..4..5..
|
---|
| 45 | * // "Error!: Retried 2 times then quit!"
|
---|
| 46 | * ```
|
---|
| 47 | *
|
---|
| 48 | * @param {number} count - Number of retry attempts before failing.
|
---|
| 49 | * @return {Observable} The source Observable modified with the retry logic.
|
---|
| 50 | * @method retry
|
---|
| 51 | * @owner Observable
|
---|
| 52 | */
|
---|
| 53 | export function retry<T>(count: number = -1): MonoTypeOperatorFunction<T> {
|
---|
| 54 | return (source: Observable<T>) => source.lift(new RetryOperator(count, source));
|
---|
| 55 | }
|
---|
| 56 |
|
---|
| 57 | class RetryOperator<T> implements Operator<T, T> {
|
---|
| 58 | constructor(private count: number,
|
---|
| 59 | private source: Observable<T>) {
|
---|
| 60 | }
|
---|
| 61 |
|
---|
| 62 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
| 63 | return source.subscribe(new RetrySubscriber(subscriber, this.count, this.source));
|
---|
| 64 | }
|
---|
| 65 | }
|
---|
| 66 |
|
---|
| 67 | /**
|
---|
| 68 | * We need this JSDoc comment for affecting ESDoc.
|
---|
| 69 | * @ignore
|
---|
| 70 | * @extends {Ignored}
|
---|
| 71 | */
|
---|
| 72 | class RetrySubscriber<T> extends Subscriber<T> {
|
---|
| 73 | constructor(destination: Subscriber<any>,
|
---|
| 74 | private count: number,
|
---|
| 75 | private source: Observable<T>) {
|
---|
| 76 | super(destination);
|
---|
| 77 | }
|
---|
| 78 | error(err: any) {
|
---|
| 79 | if (!this.isStopped) {
|
---|
| 80 | const { source, count } = this;
|
---|
| 81 | if (count === 0) {
|
---|
| 82 | return super.error(err);
|
---|
| 83 | } else if (count > -1) {
|
---|
| 84 | this.count = count - 1;
|
---|
| 85 | }
|
---|
| 86 | source.subscribe(this._unsubscribeAndRecycle());
|
---|
| 87 | }
|
---|
| 88 | }
|
---|
| 89 | }
|
---|