source: trip-planner-front/node_modules/rxjs/src/internal/operators/exhaust.ts@ 571e0df

Last change on this file since 571e0df was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 3.2 KB
Line 
1import { Operator } from '../Operator';
2import { Observable } from '../Observable';
3import { Subscriber } from '../Subscriber';
4import { ObservableInput, OperatorFunction, TeardownLogic } from '../types';
5import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
6
7export function exhaust<T>(): OperatorFunction<ObservableInput<T>, T>;
8export function exhaust<R>(): OperatorFunction<any, R>;
9
10/**
11 * Converts a higher-order Observable into a first-order Observable by dropping
12 * inner Observables while the previous inner Observable has not yet completed.
13 *
14 * <span class="informal">Flattens an Observable-of-Observables by dropping the
15 * next inner Observables while the current inner is still executing.</span>
16 *
17 * ![](exhaust.png)
18 *
19 * `exhaust` subscribes to an Observable that emits Observables, also known as a
20 * higher-order Observable. Each time it observes one of these emitted inner
21 * Observables, the output Observable begins emitting the items emitted by that
22 * inner Observable. So far, it behaves like {@link mergeAll}. However,
23 * `exhaust` ignores every new inner Observable if the previous Observable has
24 * not yet completed. Once that one completes, it will accept and flatten the
25 * next inner Observable and repeat this process.
26 *
27 * ## Example
28 * Run a finite timer for each click, only if there is no currently active timer
29 * ```ts
30 * import { fromEvent, interval } from 'rxjs';
31 * import { exhaust, map, take } from 'rxjs/operators';
32 *
33 * const clicks = fromEvent(document, 'click');
34 * const higherOrder = clicks.pipe(
35 * map((ev) => interval(1000).pipe(take(5))),
36 * );
37 * const result = higherOrder.pipe(exhaust());
38 * result.subscribe(x => console.log(x));
39 * ```
40 *
41 * @see {@link combineAll}
42 * @see {@link concatAll}
43 * @see {@link switchAll}
44 * @see {@link switchMap}
45 * @see {@link mergeAll}
46 * @see {@link exhaustMap}
47 * @see {@link zipAll}
48 *
49 * @return {Observable} An Observable that takes a source of Observables and propagates the first observable
50 * exclusively until it completes before subscribing to the next.
51 * @method exhaust
52 * @owner Observable
53 */
54export function exhaust<T>(): OperatorFunction<any, T> {
55 return (source: Observable<T>) => source.lift(new SwitchFirstOperator<T>());
56}
57
58class SwitchFirstOperator<T> implements Operator<T, T> {
59 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
60 return source.subscribe(new SwitchFirstSubscriber(subscriber));
61 }
62}
63
64/**
65 * We need this JSDoc comment for affecting ESDoc.
66 * @ignore
67 * @extends {Ignored}
68 */
69class SwitchFirstSubscriber<T> extends SimpleOuterSubscriber<T, T> {
70 private hasCompleted: boolean = false;
71 private hasSubscription: boolean = false;
72
73 constructor(destination: Subscriber<T>) {
74 super(destination);
75 }
76
77 protected _next(value: T): void {
78 if (!this.hasSubscription) {
79 this.hasSubscription = true;
80 this.add(innerSubscribe(value, new SimpleInnerSubscriber(this)));
81 }
82 }
83
84 protected _complete(): void {
85 this.hasCompleted = true;
86 if (!this.hasSubscription) {
87 this.destination.complete!();
88 }
89 }
90
91 notifyComplete(): void {
92 this.hasSubscription = false;
93 if (this.hasCompleted) {
94 this.destination.complete!();
95 }
96 }
97}
Note: See TracBrowser for help on using the repository browser.