import { Observable } from '../Observable';
import { isArray } from '../util/isArray';
import { fromArray } from './fromArray';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { TeardownLogic, ObservableInput } from '../types';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
// tslint:disable:max-line-length
export function race(arg: [ObservableInput]): Observable;
export function race(arg: [ObservableInput, ObservableInput]): Observable;
export function race(arg: [ObservableInput, ObservableInput, ObservableInput]): Observable;
export function race(arg: [ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable;
export function race(arg: [ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable;
export function race(arg: ObservableInput[]): Observable;
export function race(arg: ObservableInput[]): Observable<{}>;
export function race(a: ObservableInput): Observable;
export function race(a: ObservableInput, b: ObservableInput): Observable;
export function race(a: ObservableInput, b: ObservableInput, c: ObservableInput): Observable;
export function race(a: ObservableInput, b: ObservableInput, c: ObservableInput, d: ObservableInput): Observable;
export function race(a: ObservableInput, b: ObservableInput, c: ObservableInput, d: ObservableInput, e: ObservableInput): Observable;
// tslint:enable:max-line-length
export function race(observables: ObservableInput[]): Observable;
export function race(observables: ObservableInput[]): Observable<{}>;
export function race(...observables: ObservableInput[]): Observable;
export function race(...observables: ObservableInput[]): Observable<{}>;
/**
* Returns an Observable that mirrors the first source Observable to emit an item.
*
* ## Example
* ### Subscribes to the observable that was the first to start emitting.
*
* ```ts
* import { race, interval } from 'rxjs';
* import { mapTo } from 'rxjs/operators';
*
* const obs1 = interval(1000).pipe(mapTo('fast one'));
* const obs2 = interval(3000).pipe(mapTo('medium one'));
* const obs3 = interval(5000).pipe(mapTo('slow one'));
*
* race(obs3, obs1, obs2)
* .subscribe(
* winner => console.log(winner)
* );
*
* // result:
* // a series of 'fast one'
* ```
*
* @param {...Observables} ...observables sources used to race for which Observable emits first.
* @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
* @static true
* @name race
* @owner Observable
*/
export function race(...observables: ObservableInput[]): Observable {
// if the only argument is an array, it was most likely called with
// `race([obs1, obs2, ...])`
if (observables.length === 1) {
if (isArray(observables[0])) {
observables = observables[0] as Observable[];
} else {
return observables[0] as Observable;
}
}
return fromArray(observables, undefined).lift(new RaceOperator());
}
export class RaceOperator implements Operator {
call(subscriber: Subscriber, source: any): TeardownLogic {
return source.subscribe(new RaceSubscriber(subscriber));
}
}
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class RaceSubscriber extends OuterSubscriber {
private hasFirst: boolean = false;
private observables: Observable[] = [];
private subscriptions: Subscription[] = [];
constructor(destination: Subscriber) {
super(destination);
}
protected _next(observable: any): void {
this.observables.push(observable);
}
protected _complete() {
const observables = this.observables;
const len = observables.length;
if (len === 0) {
this.destination.complete!();
} else {
for (let i = 0; i < len && !this.hasFirst; i++) {
const observable = observables[i];
const subscription = subscribeToResult(this, observable, undefined, i)!;
if (this.subscriptions) {
this.subscriptions.push(subscription);
}
this.add(subscription);
}
this.observables = null!;
}
}
notifyNext(_outerValue: T, innerValue: T,
outerIndex: number): void {
if (!this.hasFirst) {
this.hasFirst = true;
for (let i = 0; i < this.subscriptions.length; i++) {
if (i !== outerIndex) {
let subscription = this.subscriptions[i];
subscription.unsubscribe();
this.remove(subscription);
}
}
this.subscriptions = null!;
}
this.destination.next!(innerValue);
}
}