source: trip-planner-front/node_modules/rxjs/src/internal/operators/bufferTime.ts@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 8.5 KB
Line 
1import { Operator } from '../Operator';
2import { async } from '../scheduler/async';
3import { Observable } from '../Observable';
4import { Subscriber } from '../Subscriber';
5import { Subscription } from '../Subscription';
6import { isScheduler } from '../util/isScheduler';
7import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types';
8
/* tslint:disable:max-line-length */
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
/* tslint:enable:max-line-length */

/**
 * Groups the values of the source Observable into arrays based on elapsed
 * time and emits each array when its time window ends.
 *
 * <span class="informal">Gathers recent values into an array and delivers
 * those arrays on a timer.</span>
 *
 * ![](bufferTime.png)
 *
 * With only `bufferTimeSpan`, a single buffer is kept: it is emitted and
 * replaced by a fresh one every `bufferTimeSpan` milliseconds. When
 * `bufferCreationInterval` is also supplied, a new buffer is opened every
 * `bufferCreationInterval` milliseconds and each buffer is closed (emitted)
 * `bufferTimeSpan` milliseconds after it was opened, so buffers may overlap
 * or leave gaps. A `bufferCreationInterval` of `null`, `undefined` or a
 * negative number is handled like the single-buffer case. If
 * `maxBufferSize` is supplied, a buffer is additionally closed as soon as it
 * holds `maxBufferSize` values. Buffers that are still open when the source
 * completes are emitted before the completion is forwarded.
 *
 * The arguments are read positionally: when the last argument passed is a
 * scheduler it is used for all timers and is not counted as one of the
 * numeric arguments.
 *
 * ## Examples
 *
 * Once per second, emit the clicks collected during that second
 *
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { bufferTime } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferTime(1000));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * Start a new buffer every 5 seconds and keep each one open for 2 seconds
 *
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { bufferTime } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferTime(2000, 5000));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link buffer}
 * @see {@link bufferCount}
 * @see {@link bufferToggle}
 * @see {@link bufferWhen}
 * @see {@link windowTime}
 *
 * @param {number} bufferTimeSpan How long each buffer stays open before it
 * is emitted.
 * @param {number} [bufferCreationInterval] How often a new buffer is opened.
 * @param {number} [maxBufferSize] Number of values after which a buffer is
 * emitted early.
 * @param {SchedulerLike} [scheduler=async] Scheduler used for the timers
 * that open and close buffers.
 * @return {Observable<T[]>} An observable that emits the collected arrays.
 * @method bufferTime
 * @owner Observable
 */
export function bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]> {
  // The overloads accept a variable argument list, so everything after the
  // first parameter is read from `arguments`.
  const args = arguments;
  const trailingArg = args[args.length - 1];

  // A trailing scheduler is optional and does not count as a numeric argument.
  const endsWithScheduler = isScheduler(trailingArg);
  const scheduler: SchedulerLike = endsWithScheduler ? trailingArg : async;
  const valueArgCount = endsWithScheduler ? args.length - 1 : args.length;

  const bufferCreationInterval: number = valueArgCount >= 2 ? args[1] : null;
  const maxBufferSize: number = valueArgCount >= 3 ? args[2] : Number.POSITIVE_INFINITY;

  return function bufferTimeOperatorFunction(source: Observable<T>) {
    const operator = new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler);
    return source.lift(operator);
  };
}
95
/**
 * Operator object passed to `lift` by `bufferTime`. It stores the buffering
 * configuration and, when called, subscribes a new `BufferTimeSubscriber`
 * built from that configuration to the source.
 */
class BufferTimeOperator<T> implements Operator<T, T[]> {
  constructor(private bufferTimeSpan: number,
              private bufferCreationInterval: number,
              private maxBufferSize: number,
              private scheduler: SchedulerLike) {
  }

  /**
   * Subscribes to `source` with a `BufferTimeSubscriber` that forwards its
   * buffers to `subscriber`, and returns whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<T[]>, source: any): any {
    const { bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler } = this;
    return source.subscribe(
      new BufferTimeSubscriber(subscriber, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)
    );
  }
}
109
/**
 * State of one open buffer: the values collected so far and the scheduled
 * action that will close it.
 */
class Context<T> {
  /** Values collected by this buffer; starts out empty. */
  buffer: T[];
  /** Subscription of the scheduled close; assigned by the code that opens the context. */
  closeAction: Subscription;

  constructor() {
    this.buffer = [];
  }
}
114
/**
 * State handed to `dispatchBufferCreation` each time it runs.
 */
interface DispatchCreateArg<T> {
  /** Subscriber on which new buffers are opened. */
  subscriber: BufferTimeSubscriber<T>;
  /** Scheduler used to schedule the close of each newly opened buffer. */
  scheduler: SchedulerLike;
  /** Delay passed when scheduling the close of a newly opened buffer. */
  bufferTimeSpan: number;
  /** Delay passed when rescheduling the creation action itself. */
  bufferCreationInterval: number;
}
121
/**
 * State handed to `dispatchBufferClose`.
 */
interface DispatchCloseArg<T> {
  /** Buffer context that should be closed. */
  context: Context<T>;
  /** Subscriber that owns `context` and performs the close. */
  subscriber: BufferTimeSubscriber<T>;
}
126
/**
 * Subscriber that implements `bufferTime`: it keeps a list of open buffer
 * contexts, appends every source value to each of them, and emits a buffer
 * to the destination when its scheduled close fires, when it reaches
 * `maxBufferSize`, or when the source completes.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferTimeSubscriber<T> extends Subscriber<T> {
  /** Buffers that are currently open. Set to `null` by `_unsubscribe`. */
  private contexts: Array<Context<T>> = [];
  /** True when `bufferCreationInterval` is `null`, `undefined` or negative. */
  private timespanOnly: boolean;

  /**
   * Opens the first buffer and schedules the timers.
   *
   * @param destination Subscriber that receives the emitted buffers.
   * @param bufferTimeSpan Delay after which an open buffer is closed.
   * @param bufferCreationInterval Delay between the openings of new buffers.
   * @param maxBufferSize Length at which a buffer is closed early.
   * @param scheduler Scheduler used for all scheduled actions.
   */
  constructor(destination: Subscriber<T[]>,
              private bufferTimeSpan: number,
              private bufferCreationInterval: number,
              private maxBufferSize: number,
              private scheduler: SchedulerLike) {
    super(destination);
    const context = this.openContext();
    this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
    if (this.timespanOnly) {
      // Single-buffer mode: one action closes the current buffer, opens the
      // next one and reschedules itself.
      const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
      this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
    } else {
      // Interval mode: one action closes the initial buffer, a second one
      // opens further buffers every `bufferCreationInterval`.
      const closeState = { subscriber: this, context };
      const creationState: DispatchCreateArg<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
      this.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, closeState));
      this.add(scheduler.schedule<DispatchCreateArg<T>>(dispatchBufferCreation, bufferCreationInterval, creationState));
    }
  }

  /**
   * Appends `value` to every open buffer and closes every buffer that has
   * just reached `maxBufferSize`.
   */
  protected _next(value: T) {
    const contexts = this.contexts;
    const len = contexts.length;
    // Allocated lazily so the common "nothing filled" path creates no array.
    let filledContexts: Array<Context<T>> | undefined;
    for (let i = 0; i < len; i++) {
      const context = contexts[i];
      const buffer = context.buffer;
      buffer.push(value);
      // Loose equality is kept as in the original comparison.
      if (buffer.length == this.maxBufferSize) {
        // Several overlapping buffers can fill on the same value; remember
        // all of them, otherwise the earlier ones would grow past the limit
        // and never match the equality check again.
        if (!filledContexts) {
          filledContexts = [];
        }
        filledContexts.push(context);
      }
    }

    if (filledContexts) {
      // Closing happens after the loop because it mutates `this.contexts`.
      for (let i = 0; i < filledContexts.length; i++) {
        if (this.closed) {
          // Emitting a buffer may have unsubscribed this subscriber.
          break;
        }
        this.onBufferFull(filledContexts[i]);
      }
    }
  }

  /** Discards all open buffers without emitting them, then forwards the error. */
  protected _error(err: any) {
    this.contexts.length = 0;
    super._error(err);
  }

  /** Emits every open buffer in opening order, then forwards the completion. */
  protected _complete() {
    const { contexts, destination } = this;
    while (contexts.length > 0) {
      const context = contexts.shift();
      destination.next(context.buffer);
    }
    super._complete();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _unsubscribe() {
    this.contexts = null;
  }

  /**
   * Closes a buffer that reached `maxBufferSize` and cancels its scheduled
   * close. In single-buffer mode a replacement buffer is opened and a new
   * close is scheduled for it.
   */
  protected onBufferFull(context: Context<T>) {
    this.closeContext(context);
    const closeAction = context.closeAction;
    closeAction.unsubscribe();
    this.remove(closeAction);

    if (!this.closed && this.timespanOnly) {
      context = this.openContext();
      const bufferTimeSpan = this.bufferTimeSpan;
      const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
      this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
    }
  }

  /** Creates a new empty buffer context, registers it as open and returns it. */
  openContext(): Context<T> {
    const context: Context<T> = new Context<T>();
    this.contexts.push(context);
    return context;
  }

  /**
   * Emits the context's buffer to the destination and removes the context
   * from the list of open buffers, if it is still listed.
   */
  closeContext(context: Context<T>) {
    this.destination.next(context.buffer);
    // Read after emitting: the emission may have triggered `_unsubscribe`,
    // which sets `contexts` to null.
    const contexts = this.contexts;

    const spliceIndex = contexts ? contexts.indexOf(context) : -1;
    if (spliceIndex >= 0) {
      contexts.splice(spliceIndex, 1);
    }
  }
}
222
/**
 * Scheduled work for the single-buffer mode. Closes the buffer recorded in
 * `state.context` (if any) and, unless the subscriber is closed, opens a
 * replacement buffer, stores it in `state.context` and reschedules this
 * action after `state.bufferTimeSpan`.
 *
 * @param state Mutable state object carrying `subscriber`, `context` and
 * `bufferTimeSpan`.
 */
function dispatchBufferTimeSpanOnly(this: SchedulerAction<any>, state: any) {
  const owner: BufferTimeSubscriber<any> = state.subscriber;
  const expiring = state.context;

  if (expiring) {
    owner.closeContext(expiring);
  }

  // Closing may have emitted to a consumer that unsubscribed in response.
  if (owner.closed) {
    return;
  }

  const replacement = owner.openContext();
  state.context = replacement;
  replacement.closeAction = this.schedule(state, state.bufferTimeSpan);
}
236
/**
 * Scheduled work for the interval mode. Opens a new buffer on the
 * subscriber, schedules that buffer's close after `bufferTimeSpan`, and
 * reschedules itself after `bufferCreationInterval`.
 *
 * Does nothing when the subscriber is already closed. The check comes before
 * `openContext()` because unsubscribing sets the subscriber's context list
 * to `null`, so opening a context on a closed subscriber would throw.
 *
 * @param state Timing configuration plus the subscriber and scheduler to use.
 */
function dispatchBufferCreation<T>(this: SchedulerAction<DispatchCreateArg<T>>, state: DispatchCreateArg<T>) {
  const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
  if (subscriber.closed) {
    return;
  }

  const context = subscriber.openContext();
  // Register the close action on the subscriber so it is cancelled on unsubscribe.
  subscriber.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
  this.schedule(state, bufferCreationInterval);
}
246
/**
 * Scheduled work that closes a single buffer: asks the subscriber in `arg`
 * to close the context in `arg`.
 */
function dispatchBufferClose<T>(arg: DispatchCloseArg<T>) {
  arg.subscriber.closeContext(arg.context);
}
Note: See TracBrowser for help on using the repository browser.