source: trip-planner-front/node_modules/rxjs/src/internal/operators/windowToggle.ts

Last change on this file was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 6.2 KB
RevLine 
[6a3a178]1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { Observable } from '../Observable';
4import { Subject } from '../Subject';
5import { Subscription } from '../Subscription';
6import { OuterSubscriber } from '../OuterSubscriber';
7import { InnerSubscriber } from '../InnerSubscriber';
8import { subscribeToResult } from '../util/subscribeToResult';
9import { OperatorFunction } from '../types';
10
/**
 * Splits the source Observable into nested "window" Observables. A new window
 * is started each time `openings` emits, and that window is ended when the
 * Observable produced by `closingSelector` for that opening value emits.
 *
 * <span class="informal">It's like {@link bufferToggle}, but emits a nested
 * Observable instead of an array.</span>
 *
 * ![](windowToggle.png)
 *
 * The resulting Observable emits one window per `openings` notification. Each
 * window carries the source values that arrive between that notification and
 * the first notification (`next` or `complete`) of the matching closing
 * Observable. Several windows may be open at once, in which case each source
 * value is delivered to every open window.
 *
 * ## Example
 * Every other second, emit the click events from the next 500ms
 * ```ts
 * import { fromEvent, interval, EMPTY } from 'rxjs';
 * import { windowToggle, mergeAll } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const openings = interval(1000);
 * const result = clicks.pipe(
 *   windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
 *   mergeAll()
 * );
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link window}
 * @see {@link windowCount}
 * @see {@link windowTime}
 * @see {@link windowWhen}
 * @see {@link bufferToggle}
 *
 * @param {Observable<O>} openings An observable of notifications to start new
 * windows.
 * @param {function(value: O): Observable} closingSelector A function that takes
 * the value emitted by the `openings` observable and returns an Observable,
 * which, when it emits (either `next` or `complete`), signals that the
 * associated window should complete.
 * @return {Observable<Observable<T>>} An observable of windows, which in turn
 * are Observables.
 * @method windowToggle
 * @owner Observable
 */
export function windowToggle<T, O>(
  openings: Observable<O>,
  closingSelector: (openValue: O) => Observable<any>
): OperatorFunction<T, Observable<T>> {
  return (source: Observable<T>) => {
    // Build the operator per subscription pipeline and let `lift` wire it in.
    const operator = new WindowToggleOperator<T, O>(openings, closingSelector);
    return source.lift(operator);
  };
}
63
/**
 * Operator handed to `Observable.lift` by `windowToggle`. It only stores the
 * two user-supplied arguments and creates a `WindowToggleSubscriber` when the
 * lifted Observable is subscribed to.
 */
class WindowToggleOperator<T, O> implements Operator<T, Observable<T>> {
  private openings: Observable<O>;
  private closingSelector: (openValue: O) => Observable<any>;

  constructor(openings: Observable<O>,
              closingSelector: (openValue: O) => Observable<any>) {
    this.openings = openings;
    this.closingSelector = closingSelector;
  }

  /**
   * Subscribes to `source` through a new `WindowToggleSubscriber` that
   * forwards windows to `subscriber`, and returns whatever
   * `source.subscribe` returns.
   */
  call(subscriber: Subscriber<Observable<T>>, source: any): any {
    const toggleSubscriber = new WindowToggleSubscriber(
      subscriber, this.openings, this.closingSelector
    );
    return source.subscribe(toggleSubscriber);
  }
}
76
/**
 * Bookkeeping record for one open window.
 */
interface WindowContext<T> {
  /** Subject handed downstream as the window; source values are pushed into it. */
  window: Subject<T>;
  /** Holds the subscription to this window's closing notifier. */
  subscription: Subscription;
}
81
/**
 * Subscriber that implements `windowToggle`.
 *
 * It keeps a list of open window contexts. Every source value is pushed into
 * each open window; an emission from `openings` opens a new window, and a
 * `next` or `complete` from a window's closing notifier closes that window.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
  /** Currently open windows; set to `null` once this subscriber has been torn down. */
  private contexts: WindowContext<T>[] | null = [];
  /** Subscription to `openings`, used to tell it apart from closing notifiers. */
  private openSubscription: Subscription;

  constructor(destination: Subscriber<Observable<T>>,
              private openings: Observable<O>,
              private closingSelector: (openValue: O) => Observable<any>) {
    super(destination);
    // `openings` itself is passed as the outer value so that `notifyNext`
    // can recognise notifications coming from it.
    this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
  }

  /** Delivers a source value to every window that is open at this moment. */
  protected _next(value: T) {
    const { contexts } = this;
    if (contexts) {
      // Iterate over a snapshot: a window's observer may synchronously cause
      // a window to close (splicing `contexts`), which would otherwise skip
      // a window or read past the end of the live array.
      const open = contexts.slice();
      for (let i = 0; i < open.length; i++) {
        open[i].window.next(value);
      }
    }
  }

  /** Errors every open window, releases their notifiers, then errors downstream. */
  protected _error(err: any) {
    this.releaseContexts(win => win.error(err));
    super._error(err);
  }

  /** Completes every open window, releases their notifiers, then completes downstream. */
  protected _complete() {
    this.releaseContexts(win => win.complete());
    super._complete();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _unsubscribe() {
    this.releaseContexts(win => win.unsubscribe());
  }

  /**
   * Handles a `next` from either `openings` (opens a window) or from a
   * closing notifier (closes the window whose context is `outerValue`).
   */
  notifyNext(outerValue: any, innerValue: any,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, any>): void {

    if (outerValue === this.openings) {
      let closingNotifier;
      try {
        const { closingSelector } = this;
        closingNotifier = closingSelector(innerValue);
      } catch (e) {
        return this.error(e);
      }

      const { contexts } = this;
      if (!contexts) {
        // Already torn down (re-entrant notification); nothing to open.
        return;
      }

      const window = new Subject<T>();
      const subscription = new Subscription();
      const context = { window, subscription };
      contexts.push(context);
      const innerSubscription = subscribeToResult(this, closingNotifier, context as any);

      if (innerSubscription.closed) {
        // The notifier finished synchronously. Close *this* context by
        // identity: if the notifier also emitted, the context has already
        // been removed and the last array slot belongs to another window.
        this.closeWindow(contexts.indexOf(context));
      } else {
        // Tag the inner subscription so `notifyComplete` can find its window.
        (innerSubscription as any).context = context;
        subscription.add(innerSubscription);
      }

      this.destination.next(window);
    } else {
      const { contexts } = this;
      if (contexts) {
        this.closeWindow(contexts.indexOf(outerValue));
      }
    }
  }

  /** Any error from `openings` or a closing notifier errors this subscriber. */
  notifyError(err: any): void {
    this.error(err);
  }

  /**
   * Completion of a closing notifier closes its window; completion of
   * `openings` is ignored.
   */
  notifyComplete(inner: Subscription): void {
    if (inner !== this.openSubscription) {
      const { contexts } = this;
      if (contexts) {
        this.closeWindow(contexts.indexOf((inner as any).context));
      }
    }
  }

  /**
   * Removes the context at `index` from the open list, completes its window
   * and unsubscribes its closing notifier. Does nothing when `index` is -1
   * or when the subscriber has already been torn down.
   */
  private closeWindow(index: number): void {
    const { contexts } = this;
    if (index === -1 || !contexts) {
      return;
    }

    const context = contexts[index];
    const { window, subscription } = context;
    contexts.splice(index, 1);
    window.complete();
    subscription.unsubscribe();
  }

  /**
   * Detaches the open-window list (so re-entrant notifications see the
   * subscriber as torn down), then applies `finalize` to each window and
   * unsubscribes that window's closing notifier.
   */
  private releaseContexts(finalize: (win: Subject<T>) => void): void {
    const { contexts } = this;
    this.contexts = null;
    if (contexts) {
      const len = contexts.length;
      for (let i = 0; i < len; i++) {
        const context = contexts[i];
        finalize(context.window);
        context.subscription.unsubscribe();
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.