1 | import { Operator } from '../Operator';
|
---|
2 | import { Observable } from '../Observable';
|
---|
3 | import { Subscriber } from '../Subscriber';
|
---|
4 | import { Subscription } from '../Subscription';
|
---|
5 |
|
---|
6 | import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
|
---|
7 | import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
|
---|
8 |
|
---|
9 | export interface ThrottleConfig {
|
---|
10 | leading?: boolean;
|
---|
11 | trailing?: boolean;
|
---|
12 | }
|
---|
13 |
|
---|
14 | export const defaultThrottleConfig: ThrottleConfig = {
|
---|
15 | leading: true,
|
---|
16 | trailing: false
|
---|
17 | };
|
---|
18 |
|
---|
19 | /**
|
---|
20 | * Emits a value from the source Observable, then ignores subsequent source
|
---|
21 | * values for a duration determined by another Observable, then repeats this
|
---|
22 | * process.
|
---|
23 | *
|
---|
24 | * <span class="informal">It's like {@link throttleTime}, but the silencing
|
---|
25 | * duration is determined by a second Observable.</span>
|
---|
26 | *
|
---|
27 | * ![](throttle.png)
|
---|
28 | *
|
---|
29 | * `throttle` emits the source Observable values on the output Observable
|
---|
30 | * when its internal timer is disabled, and ignores source values when the timer
|
---|
31 | * is enabled. Initially, the timer is disabled. As soon as the first source
|
---|
32 | * value arrives, it is forwarded to the output Observable, and then the timer
|
---|
33 | * is enabled by calling the `durationSelector` function with the source value,
|
---|
34 | * which returns the "duration" Observable. When the duration Observable emits a
|
---|
35 | * value or completes, the timer is disabled, and this process repeats for the
|
---|
36 | * next source value.
|
---|
37 | *
|
---|
38 | * ## Example
|
---|
39 | * Emit clicks at a rate of at most one click per second
|
---|
40 | * ```ts
|
---|
41 | * import { fromEvent } from 'rxjs';
|
---|
42 | * import { throttle } from 'rxjs/operators';
|
---|
43 | *
|
---|
44 | * const clicks = fromEvent(document, 'click');
|
---|
45 | * const result = clicks.pipe(throttle(ev => interval(1000)));
|
---|
46 | * result.subscribe(x => console.log(x));
|
---|
47 | * ```
|
---|
48 | *
|
---|
49 | * @see {@link audit}
|
---|
50 | * @see {@link debounce}
|
---|
51 | * @see {@link delayWhen}
|
---|
52 | * @see {@link sample}
|
---|
53 | * @see {@link throttleTime}
|
---|
54 | *
|
---|
55 | * @param {function(value: T): SubscribableOrPromise} durationSelector A function
|
---|
56 | * that receives a value from the source Observable, for computing the silencing
|
---|
57 | * duration for each source value, returned as an Observable or a Promise.
|
---|
58 | * @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults
|
---|
59 | * to `{ leading: true, trailing: false }`.
|
---|
60 | * @return {Observable<T>} An Observable that performs the throttle operation to
|
---|
61 | * limit the rate of emissions from the source.
|
---|
62 | * @method throttle
|
---|
63 | * @owner Observable
|
---|
64 | */
|
---|
65 | export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<any>,
|
---|
66 | config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T> {
|
---|
67 | return (source: Observable<T>) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing));
|
---|
68 | }
|
---|
69 |
|
---|
70 | class ThrottleOperator<T> implements Operator<T, T> {
|
---|
71 | constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>,
|
---|
72 | private leading: boolean,
|
---|
73 | private trailing: boolean) {
|
---|
74 | }
|
---|
75 |
|
---|
76 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
77 | return source.subscribe(
|
---|
78 | new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing)
|
---|
79 | );
|
---|
80 | }
|
---|
81 | }
|
---|
82 |
|
---|
83 | /**
|
---|
84 | * We need this JSDoc comment for affecting ESDoc
|
---|
85 | * @ignore
|
---|
86 | * @extends {Ignored}
|
---|
87 | */
|
---|
88 | class ThrottleSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
|
---|
89 | private _throttled?: Subscription;
|
---|
90 | private _sendValue?: T;
|
---|
91 | private _hasValue = false;
|
---|
92 |
|
---|
93 | constructor(protected destination: Subscriber<T>,
|
---|
94 | private durationSelector: (value: T) => SubscribableOrPromise<number>,
|
---|
95 | private _leading: boolean,
|
---|
96 | private _trailing: boolean) {
|
---|
97 | super(destination);
|
---|
98 | }
|
---|
99 |
|
---|
100 | protected _next(value: T): void {
|
---|
101 | this._hasValue = true;
|
---|
102 | this._sendValue = value;
|
---|
103 |
|
---|
104 | if (!this._throttled) {
|
---|
105 | if (this._leading) {
|
---|
106 | this.send();
|
---|
107 | } else {
|
---|
108 | this.throttle(value);
|
---|
109 | }
|
---|
110 | }
|
---|
111 | }
|
---|
112 |
|
---|
113 | private send() {
|
---|
114 | const { _hasValue, _sendValue } = this;
|
---|
115 | if (_hasValue) {
|
---|
116 | this.destination.next(_sendValue);
|
---|
117 | this.throttle(_sendValue!);
|
---|
118 | }
|
---|
119 | this._hasValue = false;
|
---|
120 | this._sendValue = undefined;
|
---|
121 | }
|
---|
122 |
|
---|
123 | private throttle(value: T): void {
|
---|
124 | const duration = this.tryDurationSelector(value);
|
---|
125 | if (!!duration) {
|
---|
126 | this.add(this._throttled = innerSubscribe(duration, new SimpleInnerSubscriber(this)));
|
---|
127 | }
|
---|
128 | }
|
---|
129 |
|
---|
130 | private tryDurationSelector(value: T): SubscribableOrPromise<any> | null {
|
---|
131 | try {
|
---|
132 | return this.durationSelector(value);
|
---|
133 | } catch (err) {
|
---|
134 | this.destination.error(err);
|
---|
135 | return null;
|
---|
136 | }
|
---|
137 | }
|
---|
138 |
|
---|
139 | private throttlingDone() {
|
---|
140 | const { _throttled, _trailing } = this;
|
---|
141 | if (_throttled) {
|
---|
142 | _throttled.unsubscribe();
|
---|
143 | }
|
---|
144 | this._throttled = undefined;
|
---|
145 |
|
---|
146 | if (_trailing) {
|
---|
147 | this.send();
|
---|
148 | }
|
---|
149 | }
|
---|
150 |
|
---|
151 | notifyNext(): void {
|
---|
152 | this.throttlingDone();
|
---|
153 | }
|
---|
154 |
|
---|
155 | notifyComplete(): void {
|
---|
156 | this.throttlingDone();
|
---|
157 | }
|
---|
158 | }
|
---|