source: trip-planner-front/node_modules/rxjs/src/internal/operators/bufferWhen.ts

Last change on this file was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 3.9 KB
Line 
1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { Observable } from '../Observable';
4import { Subscription } from '../Subscription';
5import { OperatorFunction } from '../types';
6import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
7
/**
 * Collects values from the source Observable into arrays, asking a factory
 * function for a fresh "closing" Observable each time a buffer is opened.
 *
 * <span class="informal">Gathers past values into an array. Whenever it starts
 * gathering, it calls a function that returns an Observable signalling when
 * the array should be emitted and a new one started.</span>
 *
 * ![](bufferWhen.png)
 *
 * A buffer is opened right away. As soon as the Observable produced by
 * `closingSelector` emits, the current buffer is delivered downstream and a
 * new buffer is opened, which calls `closingSelector` again.
 *
 * ## Example
 *
 * Emit an array of the last clicks every [1-5] random seconds
 *
 * ```ts
 * import { fromEvent, interval } from 'rxjs';
 * import { bufferWhen } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferWhen(() =>
 *   interval(1000 + Math.random() * 4000)
 * ));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link buffer}
 * @see {@link bufferCount}
 * @see {@link bufferTime}
 * @see {@link bufferToggle}
 * @see {@link windowWhen}
 *
 * @param {function(): Observable} closingSelector A zero-argument function
 * returning an Observable whose emission closes the current buffer.
 * @return {Observable<T[]>} An observable of arrays of buffered values.
 * @method bufferWhen
 * @owner Observable
 */
export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => {
    // One operator instance per application of the returned function.
    const operator = new BufferWhenOperator<T>(closingSelector);
    return source.lift(operator);
  };
}
55
/**
 * Operator object handed to `lift`; it carries the closing-selector factory
 * and wires a `BufferWhenSubscriber` between the source and the downstream
 * subscriber.
 */
class BufferWhenOperator<T> implements Operator<T, T[]> {
  private readonly closingSelector: () => Observable<any>;

  constructor(closingSelector: () => Observable<any>) {
    this.closingSelector = closingSelector;
  }

  /**
   * Subscribes to `source` through a new `BufferWhenSubscriber` that forwards
   * buffers to `subscriber`, and returns whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<T[]>, source: any): any {
    const bufferingSubscriber = new BufferWhenSubscriber(subscriber, this.closingSelector);
    return source.subscribe(bufferingSubscriber);
  }
}
65
/**
 * Subscriber that accumulates source values in an array and swaps that array
 * for a new one each time the current closing notifier emits.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferWhenSubscriber<T> extends SimpleOuterSubscriber<T, any> {
  /** Values collected for the currently open buffer; `undefined` until the first buffer opens and again after teardown. */
  private buffer?: T[];
  /** `true` only while `openBuffer` is inside the call that subscribes to the closing notifier. */
  private subscribing: boolean = false;
  /** Wrapper subscription holding the subscription to the current closing notifier. */
  private closingSubscription?: Subscription;

  /**
   * @param destination Downstream subscriber that receives the buffers.
   * @param closingSelector Factory called once per opened buffer to obtain the closing notifier.
   */
  constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
    super(destination);
    // The first buffer is opened immediately, before any source value arrives.
    this.openBuffer();
  }

  /** Appends a source value to the open buffer. */
  protected _next(value: T) {
    this.buffer!.push(value);
  }

  /** On source completion, delivers the open buffer (if there is one) and then completes downstream. */
  protected _complete() {
    const pending = this.buffer;
    if (pending) {
      this.destination.next!(pending);
    }
    super._complete();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _unsubscribe() {
    this.buffer = undefined;
    this.subscribing = false;
  }

  /** The closing notifier emitted: close the current buffer and open the next one. */
  notifyNext(): void {
    this.openBuffer();
  }

  /**
   * The closing notifier completed. If that happened synchronously while it
   * was still being subscribed to, the whole subscriber completes; otherwise
   * the current buffer is closed and a new one opened.
   */
  notifyComplete(): void {
    if (this.subscribing) {
      this.complete();
    } else {
      this.openBuffer();
    }
  }

  /**
   * Closes the current buffer (emitting it downstream when one exists), then
   * opens a new buffer and subscribes to a fresh closing notifier obtained
   * from `closingSelector`. An exception thrown by `closingSelector` is
   * routed to `this.error`.
   */
  openBuffer() {
    // Drop the subscription to the previous closing notifier, if any.
    const previousClosing = this.closingSubscription;
    if (previousClosing) {
      this.remove(previousClosing);
      previousClosing.unsubscribe();
    }

    const filled = this.buffer;
    if (filled) {
      this.destination.next!(filled);
    }

    // Emitting the buffer may have torn this subscriber down (for example the
    // consumer unsubscribed in reaction to the value), or the destination may
    // already have been closed on construction. In that case do not allocate
    // a buffer, call the user's factory, or subscribe to a new notifier.
    if (this.closed) {
      return;
    }

    this.buffer = [];

    let closingNotifier;
    try {
      // Called as a detached function, not as a method of this subscriber.
      const { closingSelector } = this;
      closingNotifier = closingSelector();
    } catch (err) {
      return this.error(err);
    }

    const nextClosing = new Subscription();
    this.closingSubscription = nextClosing;
    this.add(nextClosing);

    // Flag the synchronous subscription window so `notifyComplete` can tell a
    // notifier that completes during subscription from one that completes later.
    this.subscribing = true;
    nextClosing.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this)));
    this.subscribing = false;
  }
}
Note: See TracBrowser for help on using the repository browser.