import { Observable } from '../Observable';
import { isArray } from '../util/isArray';
import { fromArray } from './fromArray';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { TeardownLogic, ObservableInput } from '../types';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';

// tslint:disable:max-line-length
export function race<A>(arg: [ObservableInput<A>]): Observable<A>;
export function race<A, B>(arg: [ObservableInput<A>, ObservableInput<B>]): Observable<A | B>;
export function race<A, B, C>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>]): Observable<A | B | C>;
export function race<A, B, C, D>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>]): Observable<A | B | C | D>;
export function race<A, B, C, D, E>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>]): Observable<A | B | C | D | E>;
export function race<T>(arg: ObservableInput<T>[]): Observable<T>;
export function race(arg: ObservableInput<any>[]): Observable<{}>;

export function race<A>(a: ObservableInput<A>): Observable<A>;
export function race<A, B>(a: ObservableInput<A>, b: ObservableInput<B>): Observable<A | B>;
export function race<A, B, C>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>): Observable<A | B | C>;
export function race<A, B, C, D>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>): Observable<A | B | C | D>;
export function race<A, B, C, D, E>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>, e: ObservableInput<E>): Observable<A | B | C | D | E>;
// tslint:enable:max-line-length

export function race<T>(observables: ObservableInput<T>[]): Observable<T>;
export function race(observables: ObservableInput<any>[]): Observable<{}>;
export function race<T>(...observables: ObservableInput<T>[]): Observable<T>;
export function race(...observables: ObservableInput<any>[]): Observable<{}>;

/**
 * Returns an Observable that mirrors the first source Observable to emit an item.
 *
 * The sources may be passed either as separate arguments or as a single array.
 * When exactly one non-array source is passed and it is already an
 * {@link Observable}, that same instance is returned unchanged.
 *
 * ## Example
 * ### Subscribes to the observable that was the first to start emitting.
 *
 * ```ts
 * import { race, interval } from 'rxjs';
 * import { mapTo } from 'rxjs/operators';
 *
 * const obs1 = interval(1000).pipe(mapTo('fast one'));
 * const obs2 = interval(3000).pipe(mapTo('medium one'));
 * const obs3 = interval(5000).pipe(mapTo('slow one'));
 *
 * race(obs3, obs1, obs2)
 * .subscribe(
 *   winner => console.log(winner)
 * );
 *
 * // result:
 * // a series of 'fast one'
 * ```
 *
 * @param {...Observables} ...observables sources used to race for which Observable emits first.
 * @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
 * @static true
 * @name race
 * @owner Observable
 */
export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
  // if the only argument is an array, it was most likely called with
  // `race([obs1, obs2, ...])`
  if (observables.length === 1) {
    if (isArray(observables[0])) {
      observables = observables[0] as Observable<any>[];
    } else if (observables[0] instanceof Observable) {
      // A lone Observable has nothing to race against; hand it back untouched.
      return observables[0] as Observable<T>;
    }
    // Any other lone input (Promise, iterable, ...) is not an Observable, so it
    // must not be returned as one. It falls through to the regular path below,
    // where RaceSubscriber subscribes to it via `subscribeToResult`.
  }

  return fromArray(observables, undefined).lift(new RaceOperator<T>());
}

/**
 * Operator that wires a {@link RaceSubscriber} between the lifted source
 * (a stream of the candidate inputs) and the downstream subscriber.
 */
export class RaceOperator<T> implements Operator<T, T> {
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    return source.subscribe(new RaceSubscriber(subscriber));
  }
}

/**
 * Collects every candidate emitted by the outer source, subscribes to all of
 * them once the outer source completes, and then forwards values only from the
 * first candidate that emits, unsubscribing from the others.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
  /** Set once some candidate has produced its first value. */
  private hasFirst: boolean = false;
  /** Candidates received from the outer source; nulled after they are subscribed. */
  private observables: Observable<any>[] = [];
  /** Inner subscriptions, indexed like `observables`; nulled once a winner is chosen. */
  private subscriptions: Subscription[] = [];

  constructor(destination: Subscriber<T>) {
    super(destination);
  }

  /** Buffers each candidate; nothing is subscribed until the outer source completes. */
  protected _next(observable: any): void {
    this.observables.push(observable);
  }

  /**
   * Called when the outer source has delivered every candidate. Completes the
   * destination when there are none; otherwise subscribes to the candidates in
   * order, stopping early as soon as one of them has emitted.
   */
  protected _complete() {
    const observables = this.observables;
    const len = observables.length;

    if (len === 0) {
      this.destination.complete!();
    } else {
      for (let i = 0; i < len && !this.hasFirst; i++) {
        const observable = observables[i];
        const subscription = subscribeToResult(this, observable, undefined, i)!;

        // `notifyNext` nulls `subscriptions` when it picks a winner, which may
        // already have happened if this candidate emitted while being subscribed.
        if (this.subscriptions) {
          this.subscriptions.push(subscription);
        }
        this.add(subscription);
      }
      this.observables = null!;
    }
  }

  /**
   * Receives a value from one of the candidates. The first call picks the
   * winner: every other recorded subscription is unsubscribed and removed.
   * The value is then forwarded to the destination.
   *
   * @param _outerValue unused.
   * @param innerValue value emitted by the candidate.
   * @param outerIndex index of the emitting candidate, as passed to `subscribeToResult`.
   */
  notifyNext(_outerValue: T, innerValue: T, outerIndex: number): void {
    if (!this.hasFirst) {
      this.hasFirst = true;

      for (let i = 0; i < this.subscriptions.length; i++) {
        if (i !== outerIndex) {
          const subscription = this.subscriptions[i];

          subscription.unsubscribe();
          this.remove(subscription);
        }
      }

      this.subscriptions = null!;
    }

    this.destination.next!(innerValue);
  }
}