source: trip-planner-front/node_modules/rxjs/src/internal/operators/bufferCount.ts@ 6a3a178

Last change on this file since 6a3a178 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 4.5 KB
Line 
1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { Observable } from '../Observable';
4import { OperatorFunction, TeardownLogic } from '../types';
5
/**
 * Groups values from the source Observable into arrays of at most
 * `bufferSize` items and emits each array once it is full.
 *
 * <span class="informal">Gathers past values into an array and emits that
 * array as soon as it holds `bufferSize` values.</span>
 *
 * ![](bufferCount.png)
 *
 * Source values are accumulated until `bufferSize` of them have been
 * collected; the resulting array is then emitted. A fresh buffer is opened
 * every `startBufferEvery` source values. When `startBufferEvery` is omitted,
 * `null`, otherwise falsy, or equal to `bufferSize`, buffers do not overlap:
 * one is opened when the source starts and the next one right after the
 * previous buffer has been emitted. When the source completes, any buffer
 * that still holds values is emitted before the completion is forwarded.
 *
 * ## Examples
 *
 * Emit the last two click events as an array
 *
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { bufferCount } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferCount(2));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * On every click, emit the last two click events as an array
 *
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { bufferCount } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferCount(2, 1));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link buffer}
 * @see {@link bufferTime}
 * @see {@link bufferToggle}
 * @see {@link bufferWhen}
 * @see {@link pairwise}
 * @see {@link windowCount}
 *
 * @param {number} bufferSize The maximum number of values in an emitted
 * buffer.
 * @param {number} [startBufferEvery] How many source values to wait between
 * opening new buffers. For example, with `startBufferEvery` set to `2` a new
 * buffer is opened on every other source value. By default a buffer is opened
 * at the beginning of the source.
 * @return {Observable<T[]>} An Observable of arrays of buffered values.
 * @method bufferCount
 * @owner Observable
 */
export function bufferCount<T>(bufferSize: number, startBufferEvery: number = null): OperatorFunction<T, T[]> {
  return function bufferCountOperatorFunction(source: Observable<T>) {
    // A new operator instance is created for every source this function is applied to.
    const operator = new BufferCountOperator<T>(bufferSize, startBufferEvery);
    return source.lift(operator);
  };
}
66
/**
 * Operator behind `bufferCount`. At construction time it picks which
 * subscriber implementation will do the buffering, and on `call` it
 * subscribes an instance of that implementation to the source.
 */
class BufferCountOperator<T> implements Operator<T, T[]> {
  private subscriberClass: any;

  constructor(private bufferSize: number, private startBufferEvery: number) {
    // Overlapping/skipping buffers are only needed when a truthy
    // `startBufferEvery` differs from `bufferSize`; every other combination
    // is handled by the simpler single-buffer subscriber.
    const needsSkipLogic = !!startBufferEvery && bufferSize !== startBufferEvery;
    this.subscriberClass = needsSkipLogic ? BufferSkipCountSubscriber : BufferCountSubscriber;
  }

  call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
    const SubscriberCtor = this.subscriberClass;
    const bufferingSubscriber = new SubscriberCtor(subscriber, this.bufferSize, this.startBufferEvery);
    return source.subscribe(bufferingSubscriber);
  }
}
82
/**
 * Subscriber that keeps a single buffer: values are appended until the
 * buffer holds `bufferSize` items, at which point the buffer is emitted and
 * replaced with an empty one. On completion a non-empty buffer is emitted
 * before the completion is forwarded.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferCountSubscriber<T> extends Subscriber<T> {
  private buffer: T[] = [];

  constructor(destination: Subscriber<T[]>, private bufferSize: number) {
    super(destination);
  }

  /**
   * Appends `value` to the current buffer and emits the buffer once it is full.
   */
  protected _next(value: T): void {
    const buffer = this.buffer;

    buffer.push(value);

    if (buffer.length === this.bufferSize) {
      // Swap in a fresh buffer BEFORE emitting: if the destination pushes a
      // value back into this subscriber synchronously, that value must land
      // in the new buffer rather than in the array already handed downstream.
      this.buffer = [];
      this.destination.next(buffer);
    }
  }

  /**
   * Emits whatever is left in the buffer (if anything), then completes.
   */
  protected _complete(): void {
    const buffer = this.buffer;
    if (buffer.length > 0) {
      this.destination.next(buffer);
    }
    super._complete();
  }
}
114
/**
 * Subscriber that maintains several buffers at once: a new buffer is opened
 * every `startBufferEvery` source values, every open buffer receives each
 * value, and a buffer is emitted and dropped once it holds `bufferSize`
 * items. On completion the remaining non-empty buffers are emitted in the
 * order they were opened.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferSkipCountSubscriber<T> extends Subscriber<T> {
  private buffers: Array<T[]> = [];
  private count: number = 0;

  constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
    super(destination);
  }

  /**
   * Opens a new buffer when due, then feeds `value` to every open buffer,
   * emitting those that became full.
   */
  protected _next(value: T): void {
    const capacity = this.bufferSize;
    const stride = this.startBufferEvery;
    const open = this.buffers;
    // Post-increment: the check below uses the number of values seen BEFORE
    // this one, so the very first value (0) always opens a buffer.
    const seenBefore = this.count++;

    if (seenBefore % stride === 0) {
      open.push([]);
    }

    // Walk from the newest buffer to the oldest so that removing an entry
    // does not shift the positions still to be visited.
    let index = open.length;
    while (index-- > 0) {
      const current = open[index];
      current.push(value);
      if (current.length === capacity) {
        open.splice(index, 1);
        this.destination.next(current);
      }
    }
  }

  /**
   * Drains the remaining buffers, emitting the non-empty ones, then completes.
   */
  protected _complete(): void {
    const open = this.buffers;
    const downstream = this.destination;

    for (; open.length > 0; ) {
      const leftover = open.shift();
      if (leftover.length > 0) {
        downstream.next(leftover);
      }
    }
    super._complete();
  }

}
Note: See TracBrowser for help on using the repository browser.