source: trip-planner-front/node_modules/rxjs/_esm2015/internal/operators/mergeMap.js@ 6a3a178

Last change on this file since 6a3a178 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 2.7 KB
Line 
1import { map } from './map';
2import { from } from '../observable/from';
3import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
/**
 * Builds an operator function that lifts a `MergeMapOperator` onto a source.
 *
 * Three call shapes are handled:
 *  - `mergeMap(project)` / `mergeMap(project, undefined, concurrent)`
 *  - `mergeMap(project, concurrent)` — a numeric second argument is taken as
 *    the concurrency limit and overrides the third argument.
 *  - `mergeMap(project, resultSelector, concurrent)` — when the second
 *    argument is a function, every value produced by the projected inner
 *    source is passed through
 *    `resultSelector(outerValue, innerValue, outerIndex, innerIndex)`.
 *
 * @param {Function} project Called with `(value, index)` for each source value.
 * @param {Function|number} [resultSelector] Result selector, or the
 *   concurrency limit when a number is given.
 * @param {number} [concurrent=Number.POSITIVE_INFINITY] Concurrency limit
 *   forwarded to `MergeMapOperator`.
 * @returns {Function} A function taking `source` and returning the result of
 *   `source.lift(...)` (or `source.pipe(...)` in the result-selector form).
 */
export function mergeMap(project, resultSelector, concurrent = Number.POSITIVE_INFINITY) {
  const selectorKind = typeof resultSelector;

  if (selectorKind === 'function') {
    // Result-selector form: re-enter mergeMap with a projection that maps
    // each inner value through the selector.
    return (source) => {
      const projectAndSelect = (outerValue, outerIndex) => {
        const selectInner = (innerValue, innerIndex) =>
          resultSelector(outerValue, innerValue, outerIndex, innerIndex);
        return from(project(outerValue, outerIndex)).pipe(map(selectInner));
      };
      return source.pipe(mergeMap(projectAndSelect, concurrent));
    };
  }

  // A numeric second argument is the concurrency limit.
  const limit = selectorKind === 'number' ? resultSelector : concurrent;
  return (source) => source.lift(new MergeMapOperator(project, limit));
}
/**
 * Operator object handed to `source.lift(...)` by `mergeMap`.
 *
 * It only stores the projection function and the concurrency limit; the
 * actual work happens in the `MergeMapSubscriber` it creates in `call`.
 */
export class MergeMapOperator {
  /**
   * @param {Function} project Projection forwarded to the subscriber.
   * @param {number} [concurrent=Number.POSITIVE_INFINITY] Concurrency limit
   *   forwarded to the subscriber.
   */
  constructor(project, concurrent = Number.POSITIVE_INFINITY) {
    Object.assign(this, { project, concurrent });
  }

  /**
   * Subscribes a new `MergeMapSubscriber` (wrapping `observer`) to `source`.
   *
   * @param {object} observer Downstream destination for the subscriber.
   * @param {object} source Object exposing `subscribe(subscriber)`.
   * @returns {*} Whatever `source.subscribe` returns.
   */
  call(observer, source) {
    const subscriber = new MergeMapSubscriber(observer, this.project, this.concurrent);
    return source.subscribe(subscriber);
  }
}
/**
 * Subscriber that projects each outer value to an inner source and forwards
 * the inner values to `destination`, keeping at most `concurrent` inner
 * subscriptions active. Outer values that arrive while the limit is reached
 * are queued in `buffer` and started as earlier inner sources complete.
 */
export class MergeMapSubscriber extends SimpleOuterSubscriber {
  /**
   * @param {object} destination Downstream subscriber.
   * @param {Function} project Called with `(value, index)` per outer value.
   * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of
   *   inner subscriptions counted in `active` at once.
   */
  constructor(destination, project, concurrent = Number.POSITIVE_INFINITY) {
    super(destination);
    this.project = project;
    this.concurrent = concurrent;
    this.hasCompleted = false; // set once the outer source has completed
    this.buffer = []; // outer values waiting for a free slot
    this.active = 0; // inner subscriptions started and not yet completed
    this.index = 0; // running index passed to `project`
  }

  /** Starts projecting `value` right away, or queues it when at the limit. */
  _next(value) {
    const hasFreeSlot = this.active < this.concurrent;
    if (!hasFreeSlot) {
      this.buffer.push(value);
      return;
    }
    this._tryNext(value);
  }

  /**
   * Runs `project` for `value`; a thrown error is sent to the destination's
   * `error` and nothing is subscribed.
   */
  _tryNext(value) {
    const position = this.index++;
    let projected;
    try {
      projected = this.project(value, position);
    }
    catch (err) {
      this.destination.error(err);
      return;
    }
    // Count the slot before subscribing.
    this.active++;
    this._innerSub(projected);
  }

  /**
   * Subscribes to the projected inner source and registers the resulting
   * subscription(s) on the destination.
   */
  _innerSub(ish) {
    const innerSubscriber = new SimpleInnerSubscriber(this);
    const { destination } = this;
    destination.add(innerSubscriber);
    const innerSubscription = innerSubscribe(ish, innerSubscriber);
    if (innerSubscription === innerSubscriber) {
      return;
    }
    // `innerSubscribe` handed back a different subscription; track it too.
    destination.add(innerSubscription);
  }

  /**
   * Marks the outer source as completed; completes the destination only when
   * nothing is active or queued, then unsubscribes this subscriber.
   */
  _complete() {
    this.hasCompleted = true;
    const nothingPending = this.active === 0 && this.buffer.length === 0;
    if (nothingPending) {
      this.destination.complete();
    }
    this.unsubscribe();
  }

  /** Forwards an inner value to the destination. */
  notifyNext(innerValue) {
    this.destination.next(innerValue);
  }

  /**
   * Frees one slot; starts the next queued outer value if there is one,
   * otherwise completes the destination once the outer source is done and no
   * inner subscription remains active.
   */
  notifyComplete() {
    const queued = this.buffer;
    this.active--;
    if (queued.length > 0) {
      this._next(queued.shift());
      return;
    }
    if (this.active === 0 && this.hasCompleted) {
      this.destination.complete();
    }
  }
}
/** `flatMap` is exported as a second name bound to the same function as `mergeMap`. */
83export const flatMap = mergeMap;
84//# sourceMappingURL=mergeMap.js.map
Note: See TracBrowser for help on using the repository browser.