1 | import { Observable } from '../Observable';
|
---|
2 | import { from } from '../observable/from';
|
---|
3 | import { Operator } from '../Operator';
|
---|
4 | import { Subscriber } from '../Subscriber';
|
---|
5 | import { Subscription } from '../Subscription';
|
---|
6 | import { isArray } from '../util/isArray';
|
---|
7 | import { ObservableInput, OperatorFunction } from '../types';
|
---|
8 | import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
|
---|
9 |
|
---|
10 | /* tslint:disable:max-line-length */
|
---|
11 | export function onErrorResumeNext<T>(): OperatorFunction<T, T>;
|
---|
12 | export function onErrorResumeNext<T, T2>(v: ObservableInput<T2>): OperatorFunction<T, T | T2>;
|
---|
13 | export function onErrorResumeNext<T, T2, T3>(v: ObservableInput<T2>, v2: ObservableInput<T3>): OperatorFunction<T, T | T2 | T3>;
|
---|
14 | export function onErrorResumeNext<T, T2, T3, T4>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>): OperatorFunction<T, T | T2 | T3 | T4>;
|
---|
15 | export function onErrorResumeNext<T, T2, T3, T4, T5>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>): OperatorFunction<T, T | T2 | T3 | T4 | T5>;
|
---|
16 | export function onErrorResumeNext<T, T2, T3, T4, T5, T6>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>, v5: ObservableInput<T6>): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>;
|
---|
17 | export function onErrorResumeNext<T, T2, T3, T4, T5, T6, T7>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>, v5: ObservableInput<T6>, v6: ObservableInput<T7>): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6 | T7>;
|
---|
18 | export function onErrorResumeNext<T, R>(...observables: Array<ObservableInput<any>>): OperatorFunction<T, T | R>;
|
---|
19 | export function onErrorResumeNext<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, T | R>;
|
---|
20 | /* tslint:enable:max-line-length */
|
---|
21 |
|
---|
22 | /**
|
---|
23 | * When any of the provided Observable emits an complete or error notification, it immediately subscribes to the next one
|
---|
24 | * that was passed.
|
---|
25 | *
|
---|
26 | * <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
|
---|
27 | *
|
---|
28 | * ![](onErrorResumeNext.png)
|
---|
29 | *
|
---|
30 | * `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
|
---|
31 | * arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
|
---|
32 | * as the source.
|
---|
33 | *
|
---|
34 | * `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
|
---|
35 | * When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
|
---|
36 | * will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
|
---|
37 | * its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
|
---|
38 | * Observable in provided series, no matter if previous Observable completed or ended with an error. This will
|
---|
39 | * be happening until there is no more Observables left in the series, at which point returned Observable will
|
---|
40 | * complete - even if the last subscribed stream ended with an error.
|
---|
41 | *
|
---|
42 | * `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
|
---|
43 | * when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
|
---|
44 | * in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
|
---|
45 | * an error.
|
---|
46 | *
|
---|
47 | * Note that you do not get any access to errors emitted by the Observables. In particular do not
|
---|
48 | * expect these errors to appear in error callback passed to {@link Observable#subscribe}. If you want to take
|
---|
49 | * specific actions based on what error was emitted by an Observable, you should try out {@link catchError} instead.
|
---|
50 | *
|
---|
51 | *
|
---|
52 | * ## Example
|
---|
53 | * Subscribe to the next Observable after map fails
|
---|
54 | * ```ts
|
---|
55 | * import { of } from 'rxjs';
|
---|
56 | * import { onErrorResumeNext, map } from 'rxjs/operators';
|
---|
57 | *
|
---|
58 | * of(1, 2, 3, 0).pipe(
|
---|
59 | * map(x => {
|
---|
60 | * if (x === 0) { throw Error(); }
|
---|
61 | * return 10 / x;
|
---|
62 | * }),
|
---|
63 | * onErrorResumeNext(of(1, 2, 3)),
|
---|
64 | * )
|
---|
65 | * .subscribe(
|
---|
66 | * val => console.log(val),
|
---|
67 | * err => console.log(err), // Will never be called.
|
---|
68 | * () => console.log('that\'s it!')
|
---|
69 | * );
|
---|
70 | *
|
---|
71 | * // Logs:
|
---|
72 | * // 10
|
---|
73 | * // 5
|
---|
74 | * // 3.3333333333333335
|
---|
75 | * // 1
|
---|
76 | * // 2
|
---|
77 | * // 3
|
---|
78 | * // "that's it!"
|
---|
79 | * ```
|
---|
80 | *
|
---|
81 | * @see {@link concat}
|
---|
82 | * @see {@link catchError}
|
---|
83 | *
|
---|
84 | * @param {...ObservableInput} observables Observables passed either directly or as an array.
|
---|
85 | * @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
|
---|
86 | * to the next passed Observable and so on, until it completes or runs out of Observables.
|
---|
87 | * @method onErrorResumeNext
|
---|
88 | * @owner Observable
|
---|
89 | */
|
---|
90 |
|
---|
91 | export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
|
---|
92 | Array<ObservableInput<any>>>): OperatorFunction<T, R> {
|
---|
93 | if (nextSources.length === 1 && isArray(nextSources[0])) {
|
---|
94 | nextSources = <Array<Observable<any>>>nextSources[0];
|
---|
95 | }
|
---|
96 |
|
---|
97 | return (source: Observable<T>) => source.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
|
---|
98 | }
|
---|
99 |
|
---|
100 | /* tslint:disable:max-line-length */
|
---|
101 | export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
|
---|
102 | export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
|
---|
103 | export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
|
---|
104 | export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
|
---|
105 | export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
|
---|
106 |
|
---|
107 | export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
|
---|
108 | export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
|
---|
109 | /* tslint:enable:max-line-length */
|
---|
110 |
|
---|
111 | export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
|
---|
112 | Array<ObservableInput<any>> |
|
---|
113 | ((...values: Array<any>) => R)>): Observable<R> {
|
---|
114 | let source: ObservableInput<any>|undefined = undefined;
|
---|
115 |
|
---|
116 | if (nextSources.length === 1 && isArray(nextSources[0])) {
|
---|
117 | nextSources = nextSources[0] as ObservableInput<any>[];
|
---|
118 | }
|
---|
119 | // TODO: resolve issue with passing no arguments.
|
---|
120 | source = nextSources.shift()!;
|
---|
121 |
|
---|
122 | return from(source).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
|
---|
123 | }
|
---|
124 |
|
---|
125 | class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
|
---|
126 | constructor(private nextSources: Array<ObservableInput<any>>) {
|
---|
127 | }
|
---|
128 |
|
---|
129 | call(subscriber: Subscriber<R>, source: any): any {
|
---|
130 | return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
|
---|
131 | }
|
---|
132 | }
|
---|
133 |
|
---|
134 | class OnErrorResumeNextSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
|
---|
135 | constructor(protected destination: Subscriber<T>,
|
---|
136 | private nextSources: Array<ObservableInput<any>>) {
|
---|
137 | super(destination);
|
---|
138 | }
|
---|
139 |
|
---|
140 | notifyError(): void {
|
---|
141 | this.subscribeToNextSource();
|
---|
142 | }
|
---|
143 |
|
---|
144 | notifyComplete(): void {
|
---|
145 | this.subscribeToNextSource();
|
---|
146 | }
|
---|
147 |
|
---|
148 | protected _error(err: any): void {
|
---|
149 | this.subscribeToNextSource();
|
---|
150 | this.unsubscribe();
|
---|
151 | }
|
---|
152 |
|
---|
153 | protected _complete(): void {
|
---|
154 | this.subscribeToNextSource();
|
---|
155 | this.unsubscribe();
|
---|
156 | }
|
---|
157 |
|
---|
158 | private subscribeToNextSource(): void {
|
---|
159 | const next = this.nextSources.shift();
|
---|
160 | if (!!next) {
|
---|
161 | const innerSubscriber = new SimpleInnerSubscriber(this);
|
---|
162 | const destination = this.destination as Subscription;
|
---|
163 | destination.add(innerSubscriber);
|
---|
164 | const innerSubscription = innerSubscribe(next, innerSubscriber);
|
---|
165 | // The returned subscription will usually be the subscriber that was
|
---|
166 | // passed. However, interop subscribers will be wrapped and for
|
---|
167 | // unsubscriptions to chain correctly, the wrapper needs to be added, too.
|
---|
168 | if (innerSubscription !== innerSubscriber) {
|
---|
169 | destination.add(innerSubscription);
|
---|
170 | }
|
---|
171 | } else {
|
---|
172 | this.destination.complete();
|
---|
173 | }
|
---|
174 | }
|
---|
175 | }
|
---|