1 | import { Observable } from '../Observable';
|
---|
2 | import { isArray } from '../util/isArray';
|
---|
3 | import { fromArray } from './fromArray';
|
---|
4 | import { Operator } from '../Operator';
|
---|
5 | import { Subscriber } from '../Subscriber';
|
---|
6 | import { Subscription } from '../Subscription';
|
---|
7 | import { TeardownLogic, ObservableInput } from '../types';
|
---|
8 | import { OuterSubscriber } from '../OuterSubscriber';
|
---|
9 | import { InnerSubscriber } from '../InnerSubscriber';
|
---|
10 | import { subscribeToResult } from '../util/subscribeToResult';
|
---|
11 |
|
---|
12 | // tslint:disable:max-line-length
|
---|
13 | export function race<A>(arg: [ObservableInput<A>]): Observable<A>;
|
---|
14 | export function race<A, B>(arg: [ObservableInput<A>, ObservableInput<B>]): Observable<A | B>;
|
---|
15 | export function race<A, B, C>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>]): Observable<A | B | C>;
|
---|
16 | export function race<A, B, C, D>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>]): Observable<A | B | C | D>;
|
---|
17 | export function race<A, B, C, D, E>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>]): Observable<A | B | C | D | E>;
|
---|
18 | export function race<T>(arg: ObservableInput<T>[]): Observable<T>;
|
---|
19 | export function race(arg: ObservableInput<any>[]): Observable<{}>;
|
---|
20 |
|
---|
21 | export function race<A>(a: ObservableInput<A>): Observable<A>;
|
---|
22 | export function race<A, B>(a: ObservableInput<A>, b: ObservableInput<B>): Observable<A | B>;
|
---|
23 | export function race<A, B, C>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>): Observable<A | B | C>;
|
---|
24 | export function race<A, B, C, D>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>): Observable<A | B | C | D>;
|
---|
25 | export function race<A, B, C, D, E>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>, e: ObservableInput<E>): Observable<A | B | C | D | E>;
|
---|
26 | // tslint:enable:max-line-length
|
---|
27 |
|
---|
28 | export function race<T>(observables: ObservableInput<T>[]): Observable<T>;
|
---|
29 | export function race(observables: ObservableInput<any>[]): Observable<{}>;
|
---|
30 | export function race<T>(...observables: ObservableInput<T>[]): Observable<T>;
|
---|
31 | export function race(...observables: ObservableInput<any>[]): Observable<{}>;
|
---|
32 |
|
---|
33 | /**
|
---|
34 | * Returns an Observable that mirrors the first source Observable to emit an item.
|
---|
35 | *
|
---|
36 | * ## Example
|
---|
37 | * ### Subscribes to the observable that was the first to start emitting.
|
---|
38 | *
|
---|
39 | * ```ts
|
---|
40 | * import { race, interval } from 'rxjs';
|
---|
41 | * import { mapTo } from 'rxjs/operators';
|
---|
42 | *
|
---|
43 | * const obs1 = interval(1000).pipe(mapTo('fast one'));
|
---|
44 | * const obs2 = interval(3000).pipe(mapTo('medium one'));
|
---|
45 | * const obs3 = interval(5000).pipe(mapTo('slow one'));
|
---|
46 | *
|
---|
47 | * race(obs3, obs1, obs2)
|
---|
48 | * .subscribe(
|
---|
49 | * winner => console.log(winner)
|
---|
50 | * );
|
---|
51 | *
|
---|
52 | * // result:
|
---|
53 | * // a series of 'fast one'
|
---|
54 | * ```
|
---|
55 | *
|
---|
56 | * @param {...Observables} ...observables sources used to race for which Observable emits first.
|
---|
57 | * @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
|
---|
58 | * @static true
|
---|
59 | * @name race
|
---|
60 | * @owner Observable
|
---|
61 | */
|
---|
62 | export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
|
---|
63 | // if the only argument is an array, it was most likely called with
|
---|
64 | // `race([obs1, obs2, ...])`
|
---|
65 | if (observables.length === 1) {
|
---|
66 | if (isArray(observables[0])) {
|
---|
67 | observables = observables[0] as Observable<any>[];
|
---|
68 | } else {
|
---|
69 | return observables[0] as Observable<T>;
|
---|
70 | }
|
---|
71 | }
|
---|
72 |
|
---|
73 | return fromArray(observables, undefined).lift(new RaceOperator<T>());
|
---|
74 | }
|
---|
75 |
|
---|
76 | export class RaceOperator<T> implements Operator<T, T> {
|
---|
77 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
78 | return source.subscribe(new RaceSubscriber(subscriber));
|
---|
79 | }
|
---|
80 | }
|
---|
81 |
|
---|
82 | /**
|
---|
83 | * We need this JSDoc comment for affecting ESDoc.
|
---|
84 | * @ignore
|
---|
85 | * @extends {Ignored}
|
---|
86 | */
|
---|
87 | export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
|
---|
88 | private hasFirst: boolean = false;
|
---|
89 | private observables: Observable<any>[] = [];
|
---|
90 | private subscriptions: Subscription[] = [];
|
---|
91 |
|
---|
92 | constructor(destination: Subscriber<T>) {
|
---|
93 | super(destination);
|
---|
94 | }
|
---|
95 |
|
---|
96 | protected _next(observable: any): void {
|
---|
97 | this.observables.push(observable);
|
---|
98 | }
|
---|
99 |
|
---|
100 | protected _complete() {
|
---|
101 | const observables = this.observables;
|
---|
102 | const len = observables.length;
|
---|
103 |
|
---|
104 | if (len === 0) {
|
---|
105 | this.destination.complete!();
|
---|
106 | } else {
|
---|
107 | for (let i = 0; i < len && !this.hasFirst; i++) {
|
---|
108 | const observable = observables[i];
|
---|
109 | const subscription = subscribeToResult(this, observable, undefined, i)!;
|
---|
110 |
|
---|
111 | if (this.subscriptions) {
|
---|
112 | this.subscriptions.push(subscription);
|
---|
113 | }
|
---|
114 | this.add(subscription);
|
---|
115 | }
|
---|
116 | this.observables = null!;
|
---|
117 | }
|
---|
118 | }
|
---|
119 |
|
---|
120 | notifyNext(_outerValue: T, innerValue: T,
|
---|
121 | outerIndex: number): void {
|
---|
122 | if (!this.hasFirst) {
|
---|
123 | this.hasFirst = true;
|
---|
124 |
|
---|
125 | for (let i = 0; i < this.subscriptions.length; i++) {
|
---|
126 | if (i !== outerIndex) {
|
---|
127 | let subscription = this.subscriptions[i];
|
---|
128 |
|
---|
129 | subscription.unsubscribe();
|
---|
130 | this.remove(subscription);
|
---|
131 | }
|
---|
132 | }
|
---|
133 |
|
---|
134 | this.subscriptions = null!;
|
---|
135 | }
|
---|
136 |
|
---|
137 | this.destination.next!(innerValue);
|
---|
138 | }
|
---|
139 | }
|
---|