source: trip-planner-front/node_modules/rxjs/src/internal/operators/timeoutWith.ts@ e29cc2e

Last change on this file since e29cc2e was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 6.1 KB
Line 
1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { async } from '../scheduler/async';
4import { Observable } from '../Observable';
5import { isDate } from '../util/isDate';
6import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
7import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
8
9/* tslint:disable:max-line-length */
10export function timeoutWith<T, R>(due: number | Date, withObservable: ObservableInput<R>, scheduler?: SchedulerLike): OperatorFunction<T, T | R>;
11/* tslint:enable:max-line-length */
12
13/**
14 *
15 * Errors if Observable does not emit a value in given time span, in case of which
16 * subscribes to the second Observable.
17 *
18 * <span class="informal">It's a version of `timeout` operator that let's you specify fallback Observable.</span>
19 *
20 * ![](timeoutWith.png)
21 *
22 * `timeoutWith` is a variation of `timeout` operator. It behaves exactly the same,
23 * still accepting as a first argument either a number or a Date, which control - respectively -
24 * when values of source Observable should be emitted or when it should complete.
25 *
26 * The only difference is that it accepts a second, required parameter. This parameter
27 * should be an Observable which will be subscribed when source Observable fails any timeout check.
28 * So whenever regular `timeout` would emit an error, `timeoutWith` will instead start re-emitting
29 * values from second Observable. Note that this fallback Observable is not checked for timeouts
30 * itself, so it can emit values and complete at arbitrary points in time. From the moment of a second
31 * subscription, Observable returned from `timeoutWith` simply mirrors fallback stream. When that
32 * stream completes, it completes as well.
33 *
34 * Scheduler, which in case of `timeout` is provided as as second argument, can be still provided
35 * here - as a third, optional parameter. It still is used to schedule timeout checks and -
36 * as a consequence - when second Observable will be subscribed, since subscription happens
37 * immediately after failing check.
38 *
39 * ## Example
40 * Add fallback observable
41 * ```ts
42 * import { interval } from 'rxjs';
43 * import { timeoutWith } from 'rxjs/operators';
44 *
45 * const seconds = interval(1000);
46 * const minutes = interval(60 * 1000);
47 *
48 * seconds.pipe(timeoutWith(900, minutes))
49 * .subscribe(
50 * value => console.log(value), // After 900ms, will start emitting `minutes`,
51 * // since first value of `seconds` will not arrive fast enough.
52 * err => console.log(err), // Would be called after 900ms in case of `timeout`,
53 * // but here will never be called.
54 * );
55 * ```
56 *
57 * @param {number|Date} due Number specifying period within which Observable must emit values
58 * or Date specifying before when Observable should complete
59 * @param {Observable<T>} withObservable Observable which will be subscribed if source fails timeout check.
60 * @param {SchedulerLike} [scheduler] Scheduler controlling when timeout checks occur.
61 * @return {Observable<T>} Observable that mirrors behaviour of source or, when timeout check fails, of an Observable
62 * passed as a second parameter.
63 * @method timeoutWith
64 * @owner Observable
65 */
66export function timeoutWith<T, R>(due: number | Date,
67 withObservable: ObservableInput<R>,
68 scheduler: SchedulerLike = async): OperatorFunction<T, T | R> {
69 return (source: Observable<T>) => {
70 let absoluteTimeout = isDate(due);
71 let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
72 return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
73 };
74}
75
76class TimeoutWithOperator<T> implements Operator<T, T> {
77 constructor(private waitFor: number,
78 private absoluteTimeout: boolean,
79 private withObservable: ObservableInput<any>,
80 private scheduler: SchedulerLike) {
81 }
82
83 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
84 return source.subscribe(new TimeoutWithSubscriber(
85 subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler
86 ));
87 }
88}
89
90/**
91 * We need this JSDoc comment for affecting ESDoc.
92 * @ignore
93 * @extends {Ignored}
94 */
95class TimeoutWithSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
96
97 private action?: SchedulerAction<TimeoutWithSubscriber<T, R>>;
98
99 constructor(destination: Subscriber<T>,
100 private absoluteTimeout: boolean,
101 private waitFor: number,
102 private withObservable: ObservableInput<any>,
103 private scheduler: SchedulerLike) {
104 super(destination);
105 this.scheduleTimeout();
106 }
107
108 private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
109 const { withObservable } = subscriber;
110 subscriber._unsubscribeAndRecycle();
111 subscriber.add(innerSubscribe(withObservable, new SimpleInnerSubscriber(subscriber)));
112 }
113
114 private scheduleTimeout(): void {
115 const { action } = this;
116 if (action) {
117 // Recycle the action if we've already scheduled one. All the production
118 // Scheduler Actions mutate their state/delay time and return themeselves.
119 // VirtualActions are immutable, so they create and return a clone. In this
120 // case, we need to set the action reference to the most recent VirtualAction,
121 // to ensure that's the one we clone from next time.
122 this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
123 } else {
124 this.add(this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule<TimeoutWithSubscriber<T, R>>(
125 TimeoutWithSubscriber.dispatchTimeout as any, this.waitFor, this
126 )));
127 }
128 }
129
130 protected _next(value: T): void {
131 if (!this.absoluteTimeout) {
132 this.scheduleTimeout();
133 }
134 super._next(value);
135 }
136
137 /** @deprecated This is an internal implementation detail, do not use. */
138 _unsubscribe() {
139 this.action = undefined;
140 this.scheduler = null!;
141 this.withObservable = null!;
142 }
143}
Note: See TracBrowser for help on using the repository browser.