source: trip-planner-front/node_modules/rxjs/src/internal/operators/onErrorResumeNext.ts@ 6a3a178

Last change on this file since 6a3a178 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 8.2 KB
Line 
1import { Observable } from '../Observable';
2import { from } from '../observable/from';
3import { Operator } from '../Operator';
4import { Subscriber } from '../Subscriber';
5import { Subscription } from '../Subscription';
6import { isArray } from '../util/isArray';
7import { ObservableInput, OperatorFunction } from '../types';
8import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
9
10/* tslint:disable:max-line-length */
11export function onErrorResumeNext<T>(): OperatorFunction<T, T>;
12export function onErrorResumeNext<T, T2>(v: ObservableInput<T2>): OperatorFunction<T, T | T2>;
13export function onErrorResumeNext<T, T2, T3>(v: ObservableInput<T2>, v2: ObservableInput<T3>): OperatorFunction<T, T | T2 | T3>;
14export function onErrorResumeNext<T, T2, T3, T4>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>): OperatorFunction<T, T | T2 | T3 | T4>;
15export function onErrorResumeNext<T, T2, T3, T4, T5>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>): OperatorFunction<T, T | T2 | T3 | T4 | T5>;
16export function onErrorResumeNext<T, T2, T3, T4, T5, T6>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>, v5: ObservableInput<T6>): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>;
17export function onErrorResumeNext<T, T2, T3, T4, T5, T6, T7>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>, v5: ObservableInput<T6>, v6: ObservableInput<T7>): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6 | T7>;
18export function onErrorResumeNext<T, R>(...observables: Array<ObservableInput<any>>): OperatorFunction<T, T | R>;
19export function onErrorResumeNext<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, T | R>;
20/* tslint:enable:max-line-length */
21
22/**
23 * When any of the provided Observable emits an complete or error notification, it immediately subscribes to the next one
24 * that was passed.
25 *
26 * <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
27 *
28 * ![](onErrorResumeNext.png)
29 *
30 * `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
31 * arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
32 * as the source.
33 *
34 * `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
35 * When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
36 * will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
37 * its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
38 * Observable in provided series, no matter if previous Observable completed or ended with an error. This will
39 * be happening until there is no more Observables left in the series, at which point returned Observable will
40 * complete - even if the last subscribed stream ended with an error.
41 *
42 * `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
43 * when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
44 * in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
45 * an error.
46 *
47 * Note that you do not get any access to errors emitted by the Observables. In particular do not
48 * expect these errors to appear in error callback passed to {@link Observable#subscribe}. If you want to take
49 * specific actions based on what error was emitted by an Observable, you should try out {@link catchError} instead.
50 *
51 *
52 * ## Example
53 * Subscribe to the next Observable after map fails
54 * ```ts
55 * import { of } from 'rxjs';
56 * import { onErrorResumeNext, map } from 'rxjs/operators';
57 *
58 * of(1, 2, 3, 0).pipe(
59 * map(x => {
60 * if (x === 0) { throw Error(); }
61 * return 10 / x;
62 * }),
63 * onErrorResumeNext(of(1, 2, 3)),
64 * )
65 * .subscribe(
66 * val => console.log(val),
67 * err => console.log(err), // Will never be called.
68 * () => console.log('that\'s it!')
69 * );
70 *
71 * // Logs:
72 * // 10
73 * // 5
74 * // 3.3333333333333335
75 * // 1
76 * // 2
77 * // 3
78 * // "that's it!"
79 * ```
80 *
81 * @see {@link concat}
82 * @see {@link catchError}
83 *
84 * @param {...ObservableInput} observables Observables passed either directly or as an array.
85 * @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
86 * to the next passed Observable and so on, until it completes or runs out of Observables.
87 * @method onErrorResumeNext
88 * @owner Observable
89 */
90
91export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
92 Array<ObservableInput<any>>>): OperatorFunction<T, R> {
93 if (nextSources.length === 1 && isArray(nextSources[0])) {
94 nextSources = <Array<Observable<any>>>nextSources[0];
95 }
96
97 return (source: Observable<T>) => source.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
98}
99
100/* tslint:disable:max-line-length */
101export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
102export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
103export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
104export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
105export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
106
107export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
108export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
109/* tslint:enable:max-line-length */
110
111export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
112 Array<ObservableInput<any>> |
113 ((...values: Array<any>) => R)>): Observable<R> {
114 let source: ObservableInput<any>|undefined = undefined;
115
116 if (nextSources.length === 1 && isArray(nextSources[0])) {
117 nextSources = nextSources[0] as ObservableInput<any>[];
118 }
119 // TODO: resolve issue with passing no arguments.
120 source = nextSources.shift()!;
121
122 return from(source).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
123}
124
125class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
126 constructor(private nextSources: Array<ObservableInput<any>>) {
127 }
128
129 call(subscriber: Subscriber<R>, source: any): any {
130 return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
131 }
132}
133
134class OnErrorResumeNextSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
135 constructor(protected destination: Subscriber<T>,
136 private nextSources: Array<ObservableInput<any>>) {
137 super(destination);
138 }
139
140 notifyError(): void {
141 this.subscribeToNextSource();
142 }
143
144 notifyComplete(): void {
145 this.subscribeToNextSource();
146 }
147
148 protected _error(err: any): void {
149 this.subscribeToNextSource();
150 this.unsubscribe();
151 }
152
153 protected _complete(): void {
154 this.subscribeToNextSource();
155 this.unsubscribe();
156 }
157
158 private subscribeToNextSource(): void {
159 const next = this.nextSources.shift();
160 if (!!next) {
161 const innerSubscriber = new SimpleInnerSubscriber(this);
162 const destination = this.destination as Subscription;
163 destination.add(innerSubscriber);
164 const innerSubscription = innerSubscribe(next, innerSubscriber);
165 // The returned subscription will usually be the subscriber that was
166 // passed. However, interop subscribers will be wrapped and for
167 // unsubscriptions to chain correctly, the wrapper needs to be added, too.
168 if (innerSubscription !== innerSubscriber) {
169 destination.add(innerSubscription);
170 }
171 } else {
172 this.destination.complete();
173 }
174 }
175}
Note: See TracBrowser for help on using the repository browser.