[6a3a178] | 1 | import { Observable } from '../Observable';
|
---|
| 2 | import { isArray } from '../util/isArray';
|
---|
| 3 | import { fromArray } from './fromArray';
|
---|
| 4 | import { Operator } from '../Operator';
|
---|
| 5 | import { Subscriber } from '../Subscriber';
|
---|
| 6 | import { Subscription } from '../Subscription';
|
---|
| 7 | import { TeardownLogic, ObservableInput } from '../types';
|
---|
| 8 | import { OuterSubscriber } from '../OuterSubscriber';
|
---|
| 9 | import { InnerSubscriber } from '../InnerSubscriber';
|
---|
| 10 | import { subscribeToResult } from '../util/subscribeToResult';
|
---|
| 11 |
|
---|
// tslint:disable:max-line-length
export function race<A>(arg: [ObservableInput<A>]): Observable<A>;
export function race<A, B>(arg: [ObservableInput<A>, ObservableInput<B>]): Observable<A | B>;
export function race<A, B, C>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>]): Observable<A | B | C>;
export function race<A, B, C, D>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>]): Observable<A | B | C | D>;
export function race<A, B, C, D, E>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>]): Observable<A | B | C | D | E>;
export function race<T>(arg: ObservableInput<T>[]): Observable<T>;
export function race(arg: ObservableInput<any>[]): Observable<{}>;

export function race<A>(a: ObservableInput<A>): Observable<A>;
export function race<A, B>(a: ObservableInput<A>, b: ObservableInput<B>): Observable<A | B>;
export function race<A, B, C>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>): Observable<A | B | C>;
export function race<A, B, C, D>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>): Observable<A | B | C | D>;
export function race<A, B, C, D, E>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>, e: ObservableInput<E>): Observable<A | B | C | D | E>;
// tslint:enable:max-line-length

export function race<T>(observables: ObservableInput<T>[]): Observable<T>;
export function race(observables: ObservableInput<any>[]): Observable<{}>;
export function race<T>(...observables: ObservableInput<T>[]): Observable<T>;
export function race(...observables: ObservableInput<any>[]): Observable<{}>;

/**
 * Returns an Observable that mirrors the first source Observable to emit an item.
 *
 * The sources may be passed either as separate arguments or as a single array.
 * When exactly one source is given and it already is an Observable, that same
 * Observable is returned; any other lone input (for example a Promise or an
 * iterable) is still wrapped so that the result is always an Observable.
 *
 * ## Example
 * ### Subscribes to the observable that was the first to start emitting.
 *
 * ```ts
 * import { race, interval } from 'rxjs';
 * import { mapTo } from 'rxjs/operators';
 *
 * const obs1 = interval(1000).pipe(mapTo('fast one'));
 * const obs2 = interval(3000).pipe(mapTo('medium one'));
 * const obs3 = interval(5000).pipe(mapTo('slow one'));
 *
 * race(obs3, obs1, obs2)
 *   .subscribe(
 *     winner => console.log(winner)
 *   );
 *
 * // result:
 * // a series of 'fast one'
 * ```
 *
 * @param {...Observables} ...observables sources used to race for which Observable emits first.
 * @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
 * @static true
 * @name race
 * @owner Observable
 */
export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
  // if the only argument is an array, it was most likely called with
  // `race([obs1, obs2, ...])`
  if (observables.length === 1) {
    const only = observables[0];

    if (isArray(only)) {
      observables = only as ObservableInput<any>[];
    } else if (only instanceof Observable) {
      // A lone Observable has nothing to race against; hand it back untouched.
      return only as Observable<T>;
    }
    // Any other lone ObservableInput (Promise, iterable, ...) deliberately
    // falls through: returning it as-is would give the caller a value that is
    // not an Observable despite the declared return type.
  }

  return fromArray(observables, undefined).lift(new RaceOperator<T>());
}
|
---|
| 75 |
|
---|
/**
 * Operator that `race` lifts onto its source of inputs. On subscription it
 * wraps the downstream subscriber in a RaceSubscriber and subscribes that
 * wrapper to the source.
 */
export class RaceOperator<T> implements Operator<T, T> {
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const raceSubscriber = new RaceSubscriber(subscriber);
    return source.subscribe(raceSubscriber);
  }
}
|
---|
| 81 |
|
---|
/**
 * We need this JSDoc comment for affecting ESDoc.
 *
 * Buffers every value delivered by the outer source, subscribes to all of them
 * once the outer source completes, and forwards values from whichever inner
 * subscription notifies first while unsubscribing the others.
 * @ignore
 * @extends {Ignored}
 */
export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
  // Flipped to true by the first inner notification; stops further subscribing.
  private hasFirst: boolean = false;
  // Inputs collected from the outer source; set to null after they are subscribed.
  private observables: Observable<any>[] = [];
  // Inner subscriptions by outer index; set to null once a winner is chosen.
  private subscriptions: Subscription[] = [];

  constructor(destination: Subscriber<T>) {
    super(destination);
  }

  /** Stores each incoming input; nothing is subscribed until `_complete`. */
  protected _next(observable: any): void {
    this.observables.push(observable);
  }

  /**
   * Runs when the outer source completes. With no inputs the destination is
   * completed; otherwise the inputs are subscribed in order until one of them
   * has produced the first notification.
   */
  protected _complete() {
    const candidates = this.observables;
    const total = candidates.length;

    if (total === 0) {
      this.destination.complete!();
      return;
    }

    let index = 0;
    while (index < total && !this.hasFirst) {
      const inner = subscribeToResult(this, candidates[index], undefined, index)!;

      // `notifyNext` nulls out `subscriptions` once a winner exists, which
      // presumably can happen while `subscribeToResult` is still running.
      if (this.subscriptions) {
        this.subscriptions.push(inner);
      }
      this.add(inner);
      index++;
    }

    this.observables = null!;
  }

  /**
   * Receives a value from an inner subscription. The first call marks the
   * winner and unsubscribes every other recorded subscription; every call
   * forwards `innerValue` to the destination.
   */
  notifyNext(_outerValue: T, innerValue: T,
             outerIndex: number): void {
    if (!this.hasFirst) {
      this.hasFirst = true;

      const recorded = this.subscriptions;
      for (let i = 0; i < recorded.length; i++) {
        if (i === outerIndex) {
          continue;
        }
        const loser = recorded[i];
        loser.unsubscribe();
        this.remove(loser);
      }

      this.subscriptions = null!;
    }

    this.destination.next!(innerValue);
  }
}
|
---|