source: trip-planner-front/node_modules/rxjs/src/internal/observable/race.ts@ e29cc2e

Last change on this file since e29cc2e was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.0 KB
Line 
1import { Observable } from '../Observable';
2import { isArray } from '../util/isArray';
3import { fromArray } from './fromArray';
4import { Operator } from '../Operator';
5import { Subscriber } from '../Subscriber';
6import { Subscription } from '../Subscription';
7import { TeardownLogic, ObservableInput } from '../types';
8import { OuterSubscriber } from '../OuterSubscriber';
9import { InnerSubscriber } from '../InnerSubscriber';
10import { subscribeToResult } from '../util/subscribeToResult';
11
// tslint:disable:max-line-length
export function race<A>(arg: [ObservableInput<A>]): Observable<A>;
export function race<A, B>(arg: [ObservableInput<A>, ObservableInput<B>]): Observable<A | B>;
export function race<A, B, C>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>]): Observable<A | B | C>;
export function race<A, B, C, D>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>]): Observable<A | B | C | D>;
export function race<A, B, C, D, E>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>]): Observable<A | B | C | D | E>;
export function race<T>(arg: ObservableInput<T>[]): Observable<T>;
export function race(arg: ObservableInput<any>[]): Observable<{}>;

export function race<A>(a: ObservableInput<A>): Observable<A>;
export function race<A, B>(a: ObservableInput<A>, b: ObservableInput<B>): Observable<A | B>;
export function race<A, B, C>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>): Observable<A | B | C>;
export function race<A, B, C, D>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>): Observable<A | B | C | D>;
export function race<A, B, C, D, E>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>, e: ObservableInput<E>): Observable<A | B | C | D | E>;
// tslint:enable:max-line-length

export function race<T>(observables: ObservableInput<T>[]): Observable<T>;
export function race(observables: ObservableInput<any>[]): Observable<{}>;
export function race<T>(...observables: ObservableInput<T>[]): Observable<T>;
export function race(...observables: ObservableInput<any>[]): Observable<{}>;

/**
 * Returns an Observable that mirrors the first source Observable to emit an item.
 *
 * The sources may be passed either as individual arguments or as a single
 * array. A lone argument that is already an `Observable` is returned as-is;
 * any other lone input (for example a Promise or an iterable) is routed
 * through the regular race machinery so that the caller always receives an
 * `Observable`, as the overload signatures promise.
 *
 * ## Example
 * ### Subscribes to the observable that was the first to start emitting.
 *
 * ```ts
 * import { race, interval } from 'rxjs';
 * import { mapTo } from 'rxjs/operators';
 *
 * const obs1 = interval(1000).pipe(mapTo('fast one'));
 * const obs2 = interval(3000).pipe(mapTo('medium one'));
 * const obs3 = interval(5000).pipe(mapTo('slow one'));
 *
 * race(obs3, obs1, obs2)
 *   .subscribe(
 *     winner => console.log(winner)
 *   );
 *
 * // result:
 * // a series of 'fast one'
 * ```
 *
 * @param observables sources used to race for which Observable emits first.
 * @return an Observable that mirrors the output of the first Observable to emit an item.
 * @static true
 * @name race
 * @owner Observable
 */
export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
  if (observables.length === 1) {
    const only = observables[0];

    if (isArray(only)) {
      // If the only argument is an array, it was most likely called with
      // `race([obs1, obs2, ...])`, so race over the array's elements.
      observables = observables[0] as Observable<any>[];
    } else if (only instanceof Observable) {
      // A single Observable trivially wins its own race; skip the wrapper.
      return only as Observable<T>;
    }
    // Otherwise the lone input is not an Observable (Promise, iterable, ...).
    // Fall through so it is subscribed via the RaceSubscriber below instead
    // of being returned raw and mislabelled as an Observable.
  }

  return fromArray(observables, undefined).lift(new RaceOperator<T>());
}
75
/**
 * Operator installed by `race` through `lift`. When invoked it wraps the
 * downstream subscriber in a {@link RaceSubscriber} and subscribes that
 * wrapper to the source.
 */
export class RaceOperator<T> implements Operator<T, T> {
  /**
   * @param subscriber the downstream subscriber that should receive the winner's values.
   * @param source the upstream object to subscribe to; only its `subscribe` method is used.
   * @return whatever `source.subscribe` returns, used as teardown logic.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const raceSubscriber = new RaceSubscriber(subscriber);
    return source.subscribe(raceSubscriber);
  }
}
81
/**
 * Subscriber backing `race`. It buffers every value pushed through `_next`
 * (each one is a candidate source), and once the outer stream completes it
 * subscribes to the candidates in order. The first candidate to emit a value
 * becomes the winner: all other candidate subscriptions are cancelled and the
 * winner's values are forwarded to the destination.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
  /** True once some candidate has emitted its first value. */
  private hasFirst: boolean = false;
  /** Candidates collected by `_next`; set to null after `_complete` has subscribed to them. */
  private observables: Observable<any>[] = [];
  /** Candidate subscriptions, indexed by candidate position; set to null once a winner exists. */
  private subscriptions: Subscription[] = [];

  constructor(destination: Subscriber<T>) {
    super(destination);
  }

  /** Records a candidate source; nothing is subscribed until `_complete` runs. */
  protected _next(observable: any): void {
    this.observables.push(observable);
  }

  /**
   * Runs when the outer stream of candidates has completed. With no
   * candidates the destination is completed immediately; otherwise each
   * candidate is subscribed to in order.
   */
  protected _complete() {
    const observables = this.observables;
    const len = observables.length;

    if (len === 0) {
      this.destination.complete!();
    } else {
      // Stop early when a candidate has already emitted synchronously
      // (`hasFirst`) or when this subscriber has already been torn down
      // (`closed`), e.g. because an earlier candidate synchronously completed
      // or errored. Subscribing to the remaining candidates in that state
      // would only trigger their side effects to be cancelled right away.
      for (let i = 0; i < len && !this.hasFirst && !this.closed; i++) {
        const observable = observables[i];
        const subscription = subscribeToResult(this, observable, undefined, i)!;

        // `subscriptions` is nulled by `notifyNext` when the candidate that
        // was just subscribed emitted synchronously; in that case there is
        // no bookkeeping left to update.
        if (this.subscriptions) {
          this.subscriptions.push(subscription);
        }
        this.add(subscription);
      }
      this.observables = null!;
    }
  }

  /**
   * Receives a value from one of the candidates. The first call decides the
   * winner: every tracked subscription except the one at `outerIndex` is
   * unsubscribed and removed. Every call forwards `innerValue` downstream.
   *
   * @param _outerValue unused.
   * @param innerValue the value emitted by the candidate.
   * @param outerIndex position of the emitting candidate, as passed to `subscribeToResult`.
   */
  notifyNext(_outerValue: T, innerValue: T,
             outerIndex: number): void {
    if (!this.hasFirst) {
      this.hasFirst = true;

      for (let i = 0; i < this.subscriptions.length; i++) {
        if (i !== outerIndex) {
          const subscription = this.subscriptions[i];

          subscription.unsubscribe();
          this.remove(subscription);
        }
      }

      // Losers are gone; drop the list so `_complete` stops tracking.
      this.subscriptions = null!;
    }

    this.destination.next!(innerValue);
  }
}
Note: See TracBrowser for help on using the repository browser.