source: trip-planner-front/node_modules/rxjs/_esm2015/internal/operators/expand.js@ 6a3a178

Last change on this file since 6a3a178 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 3.0 KB
Line 
1import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
2export function expand(project, concurrent = Number.POSITIVE_INFINITY, scheduler) {
3 concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;
4 return (source) => source.lift(new ExpandOperator(project, concurrent, scheduler));
5}
6export class ExpandOperator {
7 constructor(project, concurrent, scheduler) {
8 this.project = project;
9 this.concurrent = concurrent;
10 this.scheduler = scheduler;
11 }
12 call(subscriber, source) {
13 return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
14 }
15}
16export class ExpandSubscriber extends SimpleOuterSubscriber {
17 constructor(destination, project, concurrent, scheduler) {
18 super(destination);
19 this.project = project;
20 this.concurrent = concurrent;
21 this.scheduler = scheduler;
22 this.index = 0;
23 this.active = 0;
24 this.hasCompleted = false;
25 if (concurrent < Number.POSITIVE_INFINITY) {
26 this.buffer = [];
27 }
28 }
29 static dispatch(arg) {
30 const { subscriber, result, value, index } = arg;
31 subscriber.subscribeToProjection(result, value, index);
32 }
33 _next(value) {
34 const destination = this.destination;
35 if (destination.closed) {
36 this._complete();
37 return;
38 }
39 const index = this.index++;
40 if (this.active < this.concurrent) {
41 destination.next(value);
42 try {
43 const { project } = this;
44 const result = project(value, index);
45 if (!this.scheduler) {
46 this.subscribeToProjection(result, value, index);
47 }
48 else {
49 const state = { subscriber: this, result, value, index };
50 const destination = this.destination;
51 destination.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
52 }
53 }
54 catch (e) {
55 destination.error(e);
56 }
57 }
58 else {
59 this.buffer.push(value);
60 }
61 }
62 subscribeToProjection(result, value, index) {
63 this.active++;
64 const destination = this.destination;
65 destination.add(innerSubscribe(result, new SimpleInnerSubscriber(this)));
66 }
67 _complete() {
68 this.hasCompleted = true;
69 if (this.hasCompleted && this.active === 0) {
70 this.destination.complete();
71 }
72 this.unsubscribe();
73 }
74 notifyNext(innerValue) {
75 this._next(innerValue);
76 }
77 notifyComplete() {
78 const buffer = this.buffer;
79 this.active--;
80 if (buffer && buffer.length > 0) {
81 this._next(buffer.shift());
82 }
83 if (this.hasCompleted && this.active === 0) {
84 this.destination.complete();
85 }
86 }
87}
88//# sourceMappingURL=expand.js.map
Note: See TracBrowser for help on using the repository browser.