[6a3a178] | 1 | import { Subject } from '../Subject';
|
---|
| 2 | import { Operator } from '../Operator';
|
---|
| 3 | import { async } from '../scheduler/async';
|
---|
| 4 | import { Subscriber } from '../Subscriber';
|
---|
| 5 | import { Observable } from '../Observable';
|
---|
| 6 | import { Subscription } from '../Subscription';
|
---|
| 7 | import { isNumeric } from '../util/isNumeric';
|
---|
| 8 | import { isScheduler } from '../util/isScheduler';
|
---|
| 9 | import { OperatorFunction, SchedulerLike, SchedulerAction } from '../types';
|
---|
| 10 |
|
---|
/**
 * Branch out the source Observable values as a nested Observable periodically
 * in time.
 *
 * <span class="informal">It's like {@link bufferTime}, but emits a nested
 * Observable instead of an array.</span>
 *
 * ![](windowTime.png)
 *
 * Returns an Observable that emits windows of items it collects from the source
 * Observable. The output Observable starts a new window periodically, as
 * determined by the `windowCreationInterval` argument. It emits each window
 * after a fixed timespan, specified by the `windowTimeSpan` argument. When the
 * source Observable completes or encounters an error, the output Observable
 * emits the current window and propagates the notification from the source
 * Observable. If `windowCreationInterval` is not provided, the output
 * Observable starts a new window when the previous window of duration
 * `windowTimeSpan` completes. If `maxWindowSize` is provided, each window
 * emits at most that many values. A window completes immediately after
 * emitting its last allowed value, and the next window still opens as
 * specified by the `windowTimeSpan` and `windowCreationInterval` arguments.
 *
 * ## Examples
 * In every window of 1 second each, emit at most 2 click events
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { windowTime, map, mergeAll, take } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const result = clicks.pipe(
 *   windowTime(1000),
 *   map(win => win.pipe(take(2))), // each window has at most 2 emissions
 *   mergeAll(),                    // flatten the Observable-of-Observables
 * );
 * result.subscribe(x => console.log(x));
 * ```
 *
 * Every 5 seconds start a window 1 second long, and emit at most 2 click events per window
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { windowTime, map, mergeAll, take } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const result = clicks.pipe(
 *   windowTime(1000, 5000),
 *   map(win => win.pipe(take(2))), // each window has at most 2 emissions
 *   mergeAll(),                    // flatten the Observable-of-Observables
 * );
 * result.subscribe(x => console.log(x));
 * ```
 *
 * Same as the example above, but using `maxWindowSize` instead of `take`
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { windowTime, mergeAll } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const result = clicks.pipe(
 *   windowTime(1000, 5000, 2), // each window still has at most 2 emissions
 *   mergeAll(),                // flatten the Observable-of-Observables
 * );
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link window}
 * @see {@link windowCount}
 * @see {@link windowToggle}
 * @see {@link windowWhen}
 * @see {@link bufferTime}
 *
 * @param {number} windowTimeSpan The amount of time to fill each window.
 * @param {number} [windowCreationInterval] The interval at which to start new
 * windows.
 * @param {number} [maxWindowSize=Number.POSITIVE_INFINITY] Max number of
 * values each window can emit before completion.
 * @param {SchedulerLike} [scheduler=async] The scheduler on which to schedule the
 * intervals that determine window boundaries.
 * @return {Observable<Observable<T>>} An observable of windows, which in turn
 * are Observables.
 * @method windowTime
 * @owner Observable
 */
export function windowTime<T>(windowTimeSpan: number,
                              scheduler?: SchedulerLike): OperatorFunction<T, Observable<T>>;
export function windowTime<T>(windowTimeSpan: number,
                              windowCreationInterval: number,
                              scheduler?: SchedulerLike): OperatorFunction<T, Observable<T>>;
export function windowTime<T>(windowTimeSpan: number,
                              windowCreationInterval: number,
                              maxWindowSize: number,
                              scheduler?: SchedulerLike): OperatorFunction<T, Observable<T>>;

export function windowTime<T>(windowTimeSpan: number): OperatorFunction<T, Observable<T>> {
  // Defaults used when the corresponding optional argument is absent.
  let scheduler: SchedulerLike = async;
  // `null` means "no creation interval": windows are then opened back to back.
  let windowCreationInterval: number | null = null;
  let maxWindowSize: number = Number.POSITIVE_INFINITY;

  // The optional arguments are positional and the scheduler may appear in any
  // of slots 1..3, so each slot is inspected through `arguments`.
  if (isScheduler(arguments[3])) {
    scheduler = arguments[3];
  }

  if (isScheduler(arguments[2])) {
    scheduler = arguments[2];
  } else if (isNumeric(arguments[2])) {
    maxWindowSize = Number(arguments[2]);
  }

  if (isScheduler(arguments[1])) {
    scheduler = arguments[1];
  } else if (isNumeric(arguments[1])) {
    windowCreationInterval = Number(arguments[1]);
  }

  return function windowTimeOperatorFunction(source: Observable<T>) {
    return source.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler));
  };
}
|
---|
| 128 |
|
---|
/**
 * Operator that holds the `windowTime` configuration and, when called,
 * subscribes a new WindowTimeSubscriber to the source.
 */
class WindowTimeOperator<T> implements Operator<T, Observable<T>> {

  constructor(private windowTimeSpan: number,
              private windowCreationInterval: number | null,
              private maxWindowSize: number,
              private scheduler: SchedulerLike) {
  }

  /**
   * Subscribes to `source` with a WindowTimeSubscriber that forwards windows
   * to `subscriber`, and returns whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<Observable<T>>, source: any): any {
    const { windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler } = this;
    return source.subscribe(
      new WindowTimeSubscriber(subscriber, windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)
    );
  }
}
|
---|
| 143 |
|
---|
/** State handed to `dispatchWindowCreation` each time it runs. */
interface CreationState<T> {
  /** Subscriber on which each new window is opened. */
  subscriber: WindowTimeSubscriber<T>;
  /** Scheduler used to schedule the closing of each newly opened window. */
  scheduler: SchedulerLike;
  /** Delay after which a newly opened window is closed. */
  windowTimeSpan: number;
  /** Delay with which the creation action reschedules itself. */
  windowCreationInterval: number;
}
|
---|
| 150 |
|
---|
/** State handed to `dispatchWindowTimeSpanOnly` (no creation interval given). */
interface TimeSpanOnlyState<T> {
  /** Subscriber that owns the windows. */
  subscriber: WindowTimeSubscriber<T>;
  /** Delay between successive runs of the action. */
  windowTimeSpan: number;
  /** The currently open window; replaced on every run of the action. */
  window: CountedSubject<T>;
}
|
---|
| 156 |
|
---|
/**
 * Links a scheduled window-close to the creation action it was added to, so
 * the close can remove its own subscription from that action when it fires.
 */
interface CloseWindowContext<T> {
  /** Subscription returned when the window-close was scheduled. */
  subscription: Subscription;
  /** The creation action the subscription was added to. */
  action: SchedulerAction<CreationState<T>>;
}
|
---|
| 161 |
|
---|
/** State handed to `dispatchWindowClose`. */
interface CloseState<T> {
  /** The window to close. */
  window: CountedSubject<T>;
  /** Subscriber whose `closeWindow` is invoked. */
  subscriber: WindowTimeSubscriber<T>;
  /**
   * Bookkeeping for removing the close subscription from its creation action.
   * The subscriber's constructor passes a null value here for the first window.
   */
  context: CloseWindowContext<T>;
}
|
---|
| 167 |
|
---|
/** A Subject that keeps a running count of how many times `next` was called. */
class CountedSubject<T> extends Subject<T> {
  private _numberOfNextedValues: number = 0;

  /** Number of `next` calls made on this subject so far. */
  get numberOfNextedValues(): number {
    return this._numberOfNextedValues;
  }

  /** Bumps the counter, then delegates to the base `next`. */
  next(value?: T): void {
    this._numberOfNextedValues += 1;
    super.next(value);
  }
}
|
---|
| 180 |
|
---|
/**
 * We need this JSDoc comment for affecting ESDoc.
 *
 * Subscriber that fans source values out to every currently open window and
 * opens/closes windows on scheduler timers.
 * @ignore
 * @extends {Ignored}
 */
class WindowTimeSubscriber<T> extends Subscriber<T> {
  /** Windows that are currently open, in the order they were opened. */
  private windows: CountedSubject<T>[] = [];

  /**
   * Opens the first window immediately and schedules the timer actions.
   *
   * With a non-null, non-negative `windowCreationInterval`, two actions are
   * scheduled: one closing the first window after `windowTimeSpan`, and one
   * that opens further windows every `windowCreationInterval`. Otherwise a
   * single action is scheduled that swaps windows every `windowTimeSpan`.
   */
  constructor(protected destination: Subscriber<Observable<T>>,
              private windowTimeSpan: number,
              private windowCreationInterval: number | null,
              private maxWindowSize: number,
              private scheduler: SchedulerLike) {
    super(destination);

    const window = this.openWindow();
    if (windowCreationInterval !== null && windowCreationInterval >= 0) {
      // The first window has no creation action to detach from, hence no context.
      const closeState: CloseState<T> = { subscriber: this, window, context: <any>null };
      const creationState: CreationState<T> = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
      this.add(scheduler.schedule<CloseState<T>>(dispatchWindowClose, windowTimeSpan, closeState));
      this.add(scheduler.schedule<CreationState<T>>(dispatchWindowCreation, windowCreationInterval, creationState));
    } else {
      const timeSpanOnlyState: TimeSpanOnlyState<T> = { subscriber: this, window, windowTimeSpan };
      this.add(scheduler.schedule<TimeSpanOnlyState<T>>(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
    }
  }

  /**
   * Forwards `value` to every open window and closes any window that has
   * reached `maxWindowSize` values.
   */
  protected _next(value: T): void {
    // closeWindow() splices this.windows. When size-based closing is possible,
    // iterate over a snapshot so that removing a window mid-loop neither skips
    // the following window nor reads past the end of the shortened array.
    const windows = this.maxWindowSize < Number.POSITIVE_INFINITY ? this.windows.slice() : this.windows;
    const len = windows.length;
    for (let i = 0; i < len; i++) {
      const window = windows[i];
      if (!window.closed) {
        window.next(value);
        if (window.numberOfNextedValues >= this.maxWindowSize) {
          this.closeWindow(window);
        }
      }
    }
  }

  /** Errors every open window, then forwards the error downstream. */
  protected _error(err: any): void {
    const windows = this.windows;
    while (windows.length > 0) {
      windows.shift().error(err);
    }
    this.destination.error(err);
  }

  /** Completes every open window, then completes downstream. */
  protected _complete(): void {
    const windows = this.windows;
    while (windows.length > 0) {
      const window = windows.shift();
      if (!window.closed) {
        window.complete();
      }
    }
    this.destination.complete();
  }

  /**
   * Creates a new window, records it as open and emits it downstream.
   * @return The newly opened window.
   */
  public openWindow(): CountedSubject<T> {
    const window = new CountedSubject<T>();
    this.windows.push(window);
    const destination = this.destination;
    destination.next(window);
    return window;
  }

  /**
   * Completes `window` and removes it from the list of open windows.
   *
   * May be called for a window that is no longer tracked (for example, the
   * timer firing for a window already closed because it hit `maxWindowSize`).
   */
  public closeWindow(window: CountedSubject<T>): void {
    window.complete();
    const windows = this.windows;
    const index = windows.indexOf(window);
    // Without this guard, splice(-1, 1) would drop the LAST open window when
    // `window` had already been removed.
    if (index !== -1) {
      windows.splice(index, 1);
    }
  }
}
|
---|
| 255 |
|
---|
/**
 * Scheduler work for the "time span only" mode: closes the current window (if
 * any), opens a fresh one, stores it on `state`, and reschedules this action
 * after `windowTimeSpan`.
 */
function dispatchWindowTimeSpanOnly<T>(this: SchedulerAction<TimeSpanOnlyState<T>>, state: TimeSpanOnlyState<T>): void {
  const owner = state.subscriber;
  const delay = state.windowTimeSpan;
  const previous = state.window;

  if (previous) {
    owner.closeWindow(previous);
  }

  state.window = owner.openWindow();
  this.schedule(state, delay);
}
|
---|
| 264 |
|
---|
/**
 * Scheduler work for the "creation interval" mode: opens a new window,
 * schedules its closing after `windowTimeSpan`, adds that close subscription
 * to this action, and reschedules this action after `windowCreationInterval`.
 */
function dispatchWindowCreation<T>(this: SchedulerAction<CreationState<T>>, state: CreationState<T>): void {
  const subscriber = state.subscriber;
  const scheduler = state.scheduler;
  const span = state.windowTimeSpan;
  const interval = state.windowCreationInterval;

  const opened = subscriber.openWindow();

  // The subscription is filled in once the close has been scheduled; the
  // context lets the close remove that subscription from this action later.
  const context: CloseWindowContext<T> = { action: this, subscription: <any>null };
  const closeState: CloseState<T> = { subscriber, window: opened, context };
  context.subscription = scheduler.schedule<CloseState<T>>(dispatchWindowClose, span, closeState);

  this.add(context.subscription);
  this.schedule(state, interval);
}
|
---|
| 275 |
|
---|
/**
 * Scheduler work that closes a single window. When a context with both an
 * action and a subscription is present, the subscription is first removed
 * from that action.
 */
function dispatchWindowClose<T>(state: CloseState<T>): void {
  const owner = state.subscriber;
  const target = state.window;
  const ctx = state.context;

  if (ctx && ctx.action && ctx.subscription) {
    ctx.action.remove(ctx.subscription);
  }

  owner.closeWindow(target);
}
|
---|