1 | import { Observable } from '../Observable';
|
---|
2 | import { Operator } from '../Operator';
|
---|
3 | import { Subscriber } from '../Subscriber';
|
---|
4 | import { async } from '../scheduler/async';
|
---|
5 | import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
|
---|
6 |
|
---|
/**
 * Periodically samples the source Observable and emits the latest value it
 * produced during each time window.
 *
 * <span class="informal">Looks at the source Observable on a fixed timer and
 * emits whatever value it most recently produced.</span>
 *
 * ![](sampleTime.png)
 *
 * On every tick, `sampleTime` checks whether the source has emitted since the
 * previous tick. If it has, the most recent of those values is emitted; if it
 * has not, the tick produces nothing. Ticks are spaced `period` milliseconds
 * apart (or `period` units of whatever time unit the optional `scheduler`
 * uses). The timer is started when the output Observable is subscribed, so the
 * first sample is taken one `period` after subscription.
 *
 * ## Example
 * Emit at most one click per second, always the most recent one
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { sampleTime } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const result = clicks.pipe(sampleTime(1000));
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link auditTime}
 * @see {@link debounceTime}
 * @see {@link delay}
 * @see {@link sample}
 * @see {@link throttleTime}
 *
 * @param {number} period The sampling interval, in milliseconds or in the time
 * unit determined internally by the optional `scheduler`.
 * @param {SchedulerLike} [scheduler=async] The {@link SchedulerLike} used to
 * run the timer that drives the sampling.
 * @return {Observable<T>} An Observable that emits the values sampled from the
 * source Observable at the specified time interval.
 * @method sampleTime
 * @owner Observable
 */
export function sampleTime<T>(period: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    // `lift` wires the operator into a new Observable; nothing is scheduled
    // until that Observable is subscribed.
    return source.lift(
      new SampleTimeOperator<T>(period, scheduler)
    );
  };
}
|
---|
52 |
|
---|
/**
 * Operator handed to `lift` by `sampleTime`. It stores the sampling period and
 * scheduler, and on `call` subscribes a new `SampleTimeSubscriber` to the
 * source.
 */
class SampleTimeOperator<T> implements Operator<T, T> {
  constructor(private period: number, private scheduler: SchedulerLike) {}

  /**
   * Subscribes to `source` through a `SampleTimeSubscriber` wrapping
   * `subscriber`, and returns whatever `source.subscribe` returns as teardown.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const { period, scheduler } = this;
    return source.subscribe(
      new SampleTimeSubscriber<T>(subscriber, period, scheduler)
    );
  }
}
|
---|
62 |
|
---|
/**
 * Subscriber that remembers only the latest source value and hands it to the
 * destination when the sampling timer calls `notifyNext`.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class SampleTimeSubscriber<T> extends Subscriber<T> {
  /** Most recent value received from the source. */
  lastValue: T;
  /** True while `lastValue` holds a value that has not been emitted yet. */
  hasValue = false;

  constructor(destination: Subscriber<T>, private period: number, private scheduler: SchedulerLike) {
    super(destination);
    // Start the recurring sampling action and register it on this subscriber
    // via `add`.
    const tickState = { subscriber: this, period };
    const ticker = scheduler.schedule(dispatchNotification, period, tickState);
    this.add(ticker);
  }

  /** Records the incoming value; nothing is forwarded until the next tick. */
  protected _next(value: T) {
    this.lastValue = value;
    this.hasValue = true;
  }

  /**
   * Called on each sampling tick. Emits the stored value if one arrived since
   * the previous tick; otherwise does nothing.
   */
  notifyNext() {
    if (!this.hasValue) {
      return;
    }
    // Clear the flag before emitting, so the flag is already reset while the
    // destination handles the value.
    this.hasValue = false;
    this.destination.next(this.lastValue);
  }
}
|
---|
91 |
|
---|
/**
 * Scheduler work function for `sampleTime`. Runs one sampling tick on the
 * subscriber carried in `state`, then re-schedules the same action with the
 * same state after `state.period`.
 *
 * @param state Object holding `subscriber` (exposing `notifyNext()`) and
 * `period` (the delay passed to the next `schedule` call).
 */
function dispatchNotification<T>(this: SchedulerAction<any>, state: any) {
  // Read both fields up front, before the subscriber gets a chance to run.
  const sampler = state.subscriber;
  const delay = state.period;
  sampler.notifyNext();
  this.schedule(state, delay);
}
|
---|