[6a3a178] | 1 | import { Operator } from '../Operator';
|
---|
| 2 | import { Observable } from '../Observable';
|
---|
| 3 | import { Subscriber } from '../Subscriber';
|
---|
| 4 | import { Subscription } from '../Subscription';
|
---|
| 5 |
|
---|
| 6 | import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
|
---|
| 7 | import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
|
---|
| 8 |
|
---|
| 9 | export interface ThrottleConfig {
|
---|
| 10 | leading?: boolean;
|
---|
| 11 | trailing?: boolean;
|
---|
| 12 | }
|
---|
| 13 |
|
---|
| 14 | export const defaultThrottleConfig: ThrottleConfig = {
|
---|
| 15 | leading: true,
|
---|
| 16 | trailing: false
|
---|
| 17 | };
|
---|
| 18 |
|
---|
| 19 | /**
|
---|
| 20 | * Emits a value from the source Observable, then ignores subsequent source
|
---|
| 21 | * values for a duration determined by another Observable, then repeats this
|
---|
| 22 | * process.
|
---|
| 23 | *
|
---|
| 24 | * <span class="informal">It's like {@link throttleTime}, but the silencing
|
---|
| 25 | * duration is determined by a second Observable.</span>
|
---|
| 26 | *
|
---|
| 27 | * ![](throttle.png)
|
---|
| 28 | *
|
---|
| 29 | * `throttle` emits the source Observable values on the output Observable
|
---|
| 30 | * when its internal timer is disabled, and ignores source values when the timer
|
---|
| 31 | * is enabled. Initially, the timer is disabled. As soon as the first source
|
---|
| 32 | * value arrives, it is forwarded to the output Observable, and then the timer
|
---|
| 33 | * is enabled by calling the `durationSelector` function with the source value,
|
---|
| 34 | * which returns the "duration" Observable. When the duration Observable emits a
|
---|
| 35 | * value or completes, the timer is disabled, and this process repeats for the
|
---|
| 36 | * next source value.
|
---|
| 37 | *
|
---|
| 38 | * ## Example
|
---|
| 39 | * Emit clicks at a rate of at most one click per second
|
---|
| 40 | * ```ts
|
---|
| 41 | * import { fromEvent } from 'rxjs';
|
---|
| 42 | * import { throttle } from 'rxjs/operators';
|
---|
| 43 | *
|
---|
| 44 | * const clicks = fromEvent(document, 'click');
|
---|
| 45 | * const result = clicks.pipe(throttle(ev => interval(1000)));
|
---|
| 46 | * result.subscribe(x => console.log(x));
|
---|
| 47 | * ```
|
---|
| 48 | *
|
---|
| 49 | * @see {@link audit}
|
---|
| 50 | * @see {@link debounce}
|
---|
| 51 | * @see {@link delayWhen}
|
---|
| 52 | * @see {@link sample}
|
---|
| 53 | * @see {@link throttleTime}
|
---|
| 54 | *
|
---|
| 55 | * @param {function(value: T): SubscribableOrPromise} durationSelector A function
|
---|
| 56 | * that receives a value from the source Observable, for computing the silencing
|
---|
| 57 | * duration for each source value, returned as an Observable or a Promise.
|
---|
| 58 | * @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults
|
---|
| 59 | * to `{ leading: true, trailing: false }`.
|
---|
| 60 | * @return {Observable<T>} An Observable that performs the throttle operation to
|
---|
| 61 | * limit the rate of emissions from the source.
|
---|
| 62 | * @method throttle
|
---|
| 63 | * @owner Observable
|
---|
| 64 | */
|
---|
| 65 | export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<any>,
|
---|
| 66 | config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T> {
|
---|
| 67 | return (source: Observable<T>) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing));
|
---|
| 68 | }
|
---|
| 69 |
|
---|
| 70 | class ThrottleOperator<T> implements Operator<T, T> {
|
---|
| 71 | constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>,
|
---|
| 72 | private leading: boolean,
|
---|
| 73 | private trailing: boolean) {
|
---|
| 74 | }
|
---|
| 75 |
|
---|
| 76 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
| 77 | return source.subscribe(
|
---|
| 78 | new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing)
|
---|
| 79 | );
|
---|
| 80 | }
|
---|
| 81 | }
|
---|
| 82 |
|
---|
| 83 | /**
|
---|
| 84 | * We need this JSDoc comment for affecting ESDoc
|
---|
| 85 | * @ignore
|
---|
| 86 | * @extends {Ignored}
|
---|
| 87 | */
|
---|
| 88 | class ThrottleSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
|
---|
| 89 | private _throttled?: Subscription;
|
---|
| 90 | private _sendValue?: T;
|
---|
| 91 | private _hasValue = false;
|
---|
| 92 |
|
---|
| 93 | constructor(protected destination: Subscriber<T>,
|
---|
| 94 | private durationSelector: (value: T) => SubscribableOrPromise<number>,
|
---|
| 95 | private _leading: boolean,
|
---|
| 96 | private _trailing: boolean) {
|
---|
| 97 | super(destination);
|
---|
| 98 | }
|
---|
| 99 |
|
---|
| 100 | protected _next(value: T): void {
|
---|
| 101 | this._hasValue = true;
|
---|
| 102 | this._sendValue = value;
|
---|
| 103 |
|
---|
| 104 | if (!this._throttled) {
|
---|
| 105 | if (this._leading) {
|
---|
| 106 | this.send();
|
---|
| 107 | } else {
|
---|
| 108 | this.throttle(value);
|
---|
| 109 | }
|
---|
| 110 | }
|
---|
| 111 | }
|
---|
| 112 |
|
---|
| 113 | private send() {
|
---|
| 114 | const { _hasValue, _sendValue } = this;
|
---|
| 115 | if (_hasValue) {
|
---|
| 116 | this.destination.next(_sendValue);
|
---|
| 117 | this.throttle(_sendValue!);
|
---|
| 118 | }
|
---|
| 119 | this._hasValue = false;
|
---|
| 120 | this._sendValue = undefined;
|
---|
| 121 | }
|
---|
| 122 |
|
---|
| 123 | private throttle(value: T): void {
|
---|
| 124 | const duration = this.tryDurationSelector(value);
|
---|
| 125 | if (!!duration) {
|
---|
| 126 | this.add(this._throttled = innerSubscribe(duration, new SimpleInnerSubscriber(this)));
|
---|
| 127 | }
|
---|
| 128 | }
|
---|
| 129 |
|
---|
| 130 | private tryDurationSelector(value: T): SubscribableOrPromise<any> | null {
|
---|
| 131 | try {
|
---|
| 132 | return this.durationSelector(value);
|
---|
| 133 | } catch (err) {
|
---|
| 134 | this.destination.error(err);
|
---|
| 135 | return null;
|
---|
| 136 | }
|
---|
| 137 | }
|
---|
| 138 |
|
---|
| 139 | private throttlingDone() {
|
---|
| 140 | const { _throttled, _trailing } = this;
|
---|
| 141 | if (_throttled) {
|
---|
| 142 | _throttled.unsubscribe();
|
---|
| 143 | }
|
---|
| 144 | this._throttled = undefined;
|
---|
| 145 |
|
---|
| 146 | if (_trailing) {
|
---|
| 147 | this.send();
|
---|
| 148 | }
|
---|
| 149 | }
|
---|
| 150 |
|
---|
| 151 | notifyNext(): void {
|
---|
| 152 | this.throttlingDone();
|
---|
| 153 | }
|
---|
| 154 |
|
---|
| 155 | notifyComplete(): void {
|
---|
| 156 | this.throttlingDone();
|
---|
| 157 | }
|
---|
| 158 | }
|
---|