source: trip-planner-front/node_modules/rxjs/src/internal/operators/refCount.ts@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.0 KB
Line 
1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { Subscription } from '../Subscription';
4import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
5import { ConnectableObservable } from '../observable/ConnectableObservable';
6import { Observable } from '../Observable';
7
/**
 * Makes a {@link ConnectableObservable} behave like an ordinary observable by
 * automating the moment at which it is connected.
 *
 * The operator keeps a count of the subscriptions made to the resulting
 * observable. While that count is larger than 0 the source is connected (only
 * once); as soon as the count drops below 1 the connection to the source is
 * torn down again. As a result, everything upstream of the *published*
 * refCount is subscribed a single time, no matter how many subscribers the
 * resulting observable has.
 *
 * Note that the {@link share} operator behaves like applying the *publish*
 * operator (making the observable hot) followed by the *refCount* operator.
 *
 * ![](refCount.png)
 *
 * ## Example
 *
 * Two intervals are turned into connectable observables with the *publish*
 * operator. Only the first one is followed by *refCount*. The second one shows
 * that a connectable observable stays idle until its connect function is
 * called.
 *
 * ```ts
 * import { interval } from 'rxjs';
 * import { tap, publish, refCount } from 'rxjs/operators';
 *
 * // Turn the interval observable into a ConnectableObservable (hot)
 * const refCountInterval = interval(400).pipe(
 *   tap((num) => console.log(`refCount ${num}`)),
 *   publish(),
 *   refCount()
 * );
 *
 * const publishedInterval = interval(400).pipe(
 *   tap((num) => console.log(`publish ${num}`)),
 *   publish()
 * );
 *
 * refCountInterval.subscribe();
 * refCountInterval.subscribe();
 * // 'refCount 0' -----> 'refCount 1' -----> etc
 * // Both subscriptions receive the same values, and the tap (like every
 * // other operator placed before publish) runs only once per event,
 * // regardless of how many subscriptions exist.
 *
 * publishedInterval.subscribe();
 * // Nothing happens until .connect() is called on the observable.
 * ```
 *
 * @see {@link ConnectableObservable}
 * @see {@link share}
 * @see {@link publish}
 */
export function refCount<T>(): MonoTypeOperatorFunction<T> {
  // The function keeps its own name so stack traces and `.name` stay stable.
  const operatorFunction = function refCountOperatorFunction(source: ConnectableObservable<T>): Observable<T> {
    const operator = new RefCountOperator(source);
    return source.lift(operator);
  };
  return operatorFunction as MonoTypeOperatorFunction<T>;
}
66
/**
 * Operator installed by `refCount()`. Each time it is invoked for a new
 * subscriber it bumps the connectable's `_refCount`, subscribes through a
 * `RefCountSubscriber`, and then connects the connectable.
 */
class RefCountOperator<T> implements Operator<T, T> {
  constructor(private connectable: ConnectableObservable<T>) {}

  /**
   * Wires `subscriber` to `source` and makes sure the connectable is connected.
   *
   * @param subscriber The downstream subscriber.
   * @param source The observable to subscribe to.
   * @returns The subscription produced by subscribing to `source`.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const target = this.connectable;
    (target as any)._refCount++;

    const counter = new RefCountSubscriber(subscriber, target);
    const subscription = source.subscribe(counter);

    // If the subscriber was already closed during the subscribe call there
    // is nothing left to connect for.
    if (!counter.closed) {
      (counter as any).connection = target.connect();
    }

    return subscription;
  }
}
85
/**
 * Subscriber used by `RefCountOperator`. When it is unsubscribed it decrements
 * the connectable's `_refCount` and, if it held the last reference, disposes
 * the connectable's shared connection.
 */
class RefCountSubscriber<T> extends Subscriber<T> {

  private connection: Subscription;

  constructor(destination: Subscriber<T>,
              private connectable: ConnectableObservable<T>) {
    super(destination);
  }

  protected _unsubscribe() {
    const target = this.connectable;
    const ownConnection = this.connection;

    // Whatever happens below, this subscriber no longer holds a connection.
    this.connection = null;

    // Already torn down once: nothing more to release.
    if (!target) {
      return;
    }
    this.connectable = null;

    const previousCount = (target as any)._refCount;
    if (previousCount <= 0) {
      return;
    }

    (target as any)._refCount = previousCount - 1;

    // Other subscribers still rely on the shared connection.
    if (previousCount > 1) {
      return;
    }

    ///
    // This was the last reference, so the shared connection on the
    // ConnectableObservable may have to be disposed. The local connection
    // reference is compared with the shared one because, when the source
    // emits synchronously and downstream unsubscribes synchronously, this
    // method runs before RefCountOperator had the chance to hand over the
    // shared connection Subscription. For example:
    // ```
    // range(0, 10).pipe(
    //   publish(),
    //   refCount(),
    //   take(5),
    // )
    // .subscribe();
    // ```
    // The shared connection is therefore only disposed when it exists *and*
    // either:
    //   a. this subscriber has not received a connection reference yet, or
    //   b. this subscriber's connection reference is identical to the shared
    //      connection.
    ///
    const shared = (target as any)._connection;
    const mayDispose = !ownConnection || shared === ownConnection;

    if (shared && mayDispose) {
      shared.unsubscribe();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.