1 | import { Operator } from '../Operator';
|
---|
2 | import { Subscriber } from '../Subscriber';
|
---|
3 | import { Observable } from '../Observable';
|
---|
4 | import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types';
|
---|
5 | import { Subscription } from '../Subscription';
|
---|
6 | import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
|
---|
7 |
|
---|
8 | /**
|
---|
9 | * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
|
---|
10 | *
|
---|
11 | * The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable emits the first value.
|
---|
12 | * This can be particularly useful in combination with user interactions, responses of http requests or waiting for specific times to pass by.
|
---|
13 | *
|
---|
14 | * ![](skipUntil.png)
|
---|
15 | *
|
---|
16 | * Internally the `skipUntil` operator subscribes to the passed in observable (in the following called *notifier*) in order to recognize the emission
|
---|
17 | * of its first value. When this happens, the operator unsubscribes from the *notifier* and starts emitting the values of the *source*
|
---|
18 | * observable. It will never let the *source* observable emit any values if the *notifier* completes or throws an error without emitting
|
---|
19 | * a value before.
|
---|
20 | *
|
---|
21 | * ## Example
|
---|
22 | *
|
---|
23 | * In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere within the page.
|
---|
24 | *
|
---|
25 | * ```ts
|
---|
26 | * import { interval, fromEvent } from 'rxjs';
|
---|
27 | * import { skipUntil } from 'rxjs/operators';
|
---|
28 | *
|
---|
29 | * const intervalObservable = interval(1000);
|
---|
30 | * const click = fromEvent(document, 'click');
|
---|
31 | *
|
---|
32 | * const emitAfterClick = intervalObservable.pipe(
|
---|
33 | * skipUntil(click)
|
---|
34 | * );
|
---|
35 | * // clicked at 4.6s. output: 5...6...7...8........ or
|
---|
36 | * // clicked at 7.3s. output: 8...9...10..11.......
|
---|
37 | * const subscribe = emitAfterClick.subscribe(value => console.log(value));
|
---|
38 | * ```
|
---|
39 | *
|
---|
40 | * @param {Observable} notifier - The second Observable that has to emit an item before the source Observable's elements begin to
|
---|
41 | * be mirrored by the resulting Observable.
|
---|
42 | * @return {Observable<T>} An Observable that skips items from the source Observable until the second Observable emits
|
---|
43 | * an item, then emits the remaining items.
|
---|
44 | * @method skipUntil
|
---|
45 | * @owner Observable
|
---|
46 | */
|
---|
47 | export function skipUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> {
|
---|
48 | return (source: Observable<T>) => source.lift(new SkipUntilOperator(notifier));
|
---|
49 | }
|
---|
50 |
|
---|
51 | class SkipUntilOperator<T> implements Operator<T, T> {
|
---|
52 | constructor(private notifier: Observable<any>) {
|
---|
53 | }
|
---|
54 |
|
---|
55 | call(destination: Subscriber<T>, source: any): TeardownLogic {
|
---|
56 | return source.subscribe(new SkipUntilSubscriber(destination, this.notifier));
|
---|
57 | }
|
---|
58 | }
|
---|
59 |
|
---|
60 | /**
|
---|
61 | * We need this JSDoc comment for affecting ESDoc.
|
---|
62 | * @ignore
|
---|
63 | * @extends {Ignored}
|
---|
64 | */
|
---|
65 | class SkipUntilSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
|
---|
66 |
|
---|
67 | private hasValue: boolean = false;
|
---|
68 | private innerSubscription?: Subscription;
|
---|
69 |
|
---|
70 | constructor(destination: Subscriber<R>, notifier: ObservableInput<any>) {
|
---|
71 | super(destination);
|
---|
72 | const innerSubscriber = new SimpleInnerSubscriber(this);
|
---|
73 | this.add(innerSubscriber);
|
---|
74 | this.innerSubscription = innerSubscriber;
|
---|
75 | const innerSubscription = innerSubscribe(notifier, innerSubscriber);
|
---|
76 | // The returned subscription will usually be the subscriber that was
|
---|
77 | // passed. However, interop subscribers will be wrapped and for
|
---|
78 | // unsubscriptions to chain correctly, the wrapper needs to be added, too.
|
---|
79 | if (innerSubscription !== innerSubscriber) {
|
---|
80 | this.add(innerSubscription);
|
---|
81 | this.innerSubscription = innerSubscription;
|
---|
82 | }
|
---|
83 | }
|
---|
84 |
|
---|
85 | protected _next(value: T) {
|
---|
86 | if (this.hasValue) {
|
---|
87 | super._next(value);
|
---|
88 | }
|
---|
89 | }
|
---|
90 |
|
---|
91 | notifyNext(): void {
|
---|
92 | this.hasValue = true;
|
---|
93 | if (this.innerSubscription) {
|
---|
94 | this.innerSubscription.unsubscribe();
|
---|
95 | }
|
---|
96 | }
|
---|
97 |
|
---|
98 | notifyComplete() {
|
---|
99 | /* do nothing */
|
---|
100 | }
|
---|
101 | }
|
---|