[6a3a178] | 1 | import { Operator } from '../Operator';
|
---|
| 2 | import { Subscriber } from '../Subscriber';
|
---|
| 3 | import { Subscription } from '../Subscription';
|
---|
| 4 | import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
|
---|
| 5 | import { ConnectableObservable } from '../observable/ConnectableObservable';
|
---|
| 6 | import { Observable } from '../Observable';
|
---|
| 7 |
|
---|
| 8 | /**
|
---|
| 9 | * Make a {@link ConnectableObservable} behave like a ordinary observable and automates the way
|
---|
| 10 | * you can connect to it.
|
---|
| 11 | *
|
---|
| 12 | * Internally it counts the subscriptions to the observable and subscribes (only once) to the source if
|
---|
| 13 | * the number of subscriptions is larger than 0. If the number of subscriptions is smaller than 1, it
|
---|
| 14 | * unsubscribes from the source. This way you can make sure that everything before the *published*
|
---|
| 15 | * refCount has only a single subscription independently of the number of subscribers to the target
|
---|
| 16 | * observable.
|
---|
| 17 | *
|
---|
| 18 | * Note that using the {@link share} operator is exactly the same as using the *publish* operator
|
---|
| 19 | * (making the observable hot) and the *refCount* operator in a sequence.
|
---|
| 20 | *
|
---|
| 21 | * ![](refCount.png)
|
---|
| 22 | *
|
---|
| 23 | * ## Example
|
---|
| 24 | *
|
---|
| 25 | * In the following example there are two intervals turned into connectable observables
|
---|
| 26 | * by using the *publish* operator. The first one uses the *refCount* operator, the
|
---|
| 27 | * second one does not use it. You will notice that a connectable observable does nothing
|
---|
| 28 | * until you call its connect function.
|
---|
| 29 | *
|
---|
| 30 | * ```ts
|
---|
| 31 | * import { interval } from 'rxjs';
|
---|
| 32 | * import { tap, publish, refCount } from 'rxjs/operators';
|
---|
| 33 | *
|
---|
| 34 | * // Turn the interval observable into a ConnectableObservable (hot)
|
---|
| 35 | * const refCountInterval = interval(400).pipe(
|
---|
| 36 | * tap((num) => console.log(`refCount ${num}`)),
|
---|
| 37 | * publish(),
|
---|
| 38 | * refCount()
|
---|
| 39 | * );
|
---|
| 40 | *
|
---|
| 41 | * const publishedInterval = interval(400).pipe(
|
---|
| 42 | * tap((num) => console.log(`publish ${num}`)),
|
---|
| 43 | * publish()
|
---|
| 44 | * );
|
---|
| 45 | *
|
---|
| 46 | * refCountInterval.subscribe();
|
---|
| 47 | * refCountInterval.subscribe();
|
---|
| 48 | * // 'refCount 0' -----> 'refCount 1' -----> etc
|
---|
| 49 | * // All subscriptions will receive the same value and the tap (and
|
---|
| 50 | * // every other operator) before the publish operator will be executed
|
---|
| 51 | * // only once per event independently of the number of subscriptions.
|
---|
| 52 | *
|
---|
| 53 | * publishedInterval.subscribe();
|
---|
| 54 | * // Nothing happens until you call .connect() on the observable.
|
---|
| 55 | * ```
|
---|
| 56 | *
|
---|
| 57 | * @see {@link ConnectableObservable}
|
---|
| 58 | * @see {@link share}
|
---|
| 59 | * @see {@link publish}
|
---|
| 60 | */
|
---|
| 61 | export function refCount<T>(): MonoTypeOperatorFunction<T> {
|
---|
| 62 | return function refCountOperatorFunction(source: ConnectableObservable<T>): Observable<T> {
|
---|
| 63 | return source.lift(new RefCountOperator(source));
|
---|
| 64 | } as MonoTypeOperatorFunction<T>;
|
---|
| 65 | }
|
---|
| 66 |
|
---|
| 67 | class RefCountOperator<T> implements Operator<T, T> {
|
---|
| 68 | constructor(private connectable: ConnectableObservable<T>) {
|
---|
| 69 | }
|
---|
| 70 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
| 71 |
|
---|
| 72 | const { connectable } = this;
|
---|
| 73 | (<any> connectable)._refCount++;
|
---|
| 74 |
|
---|
| 75 | const refCounter = new RefCountSubscriber(subscriber, connectable);
|
---|
| 76 | const subscription = source.subscribe(refCounter);
|
---|
| 77 |
|
---|
| 78 | if (!refCounter.closed) {
|
---|
| 79 | (<any> refCounter).connection = connectable.connect();
|
---|
| 80 | }
|
---|
| 81 |
|
---|
| 82 | return subscription;
|
---|
| 83 | }
|
---|
| 84 | }
|
---|
| 85 |
|
---|
| 86 | class RefCountSubscriber<T> extends Subscriber<T> {
|
---|
| 87 |
|
---|
| 88 | private connection: Subscription;
|
---|
| 89 |
|
---|
| 90 | constructor(destination: Subscriber<T>,
|
---|
| 91 | private connectable: ConnectableObservable<T>) {
|
---|
| 92 | super(destination);
|
---|
| 93 | }
|
---|
| 94 |
|
---|
| 95 | protected _unsubscribe() {
|
---|
| 96 |
|
---|
| 97 | const { connectable } = this;
|
---|
| 98 | if (!connectable) {
|
---|
| 99 | this.connection = null;
|
---|
| 100 | return;
|
---|
| 101 | }
|
---|
| 102 |
|
---|
| 103 | this.connectable = null;
|
---|
| 104 | const refCount = (<any> connectable)._refCount;
|
---|
| 105 | if (refCount <= 0) {
|
---|
| 106 | this.connection = null;
|
---|
| 107 | return;
|
---|
| 108 | }
|
---|
| 109 |
|
---|
| 110 | (<any> connectable)._refCount = refCount - 1;
|
---|
| 111 | if (refCount > 1) {
|
---|
| 112 | this.connection = null;
|
---|
| 113 | return;
|
---|
| 114 | }
|
---|
| 115 |
|
---|
| 116 | ///
|
---|
| 117 | // Compare the local RefCountSubscriber's connection Subscription to the
|
---|
| 118 | // connection Subscription on the shared ConnectableObservable. In cases
|
---|
| 119 | // where the ConnectableObservable source synchronously emits values, and
|
---|
| 120 | // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
|
---|
| 121 | // execution continues to here before the RefCountOperator has a chance to
|
---|
| 122 | // supply the RefCountSubscriber with the shared connection Subscription.
|
---|
| 123 | // For example:
|
---|
| 124 | // ```
|
---|
| 125 | // range(0, 10).pipe(
|
---|
| 126 | // publish(),
|
---|
| 127 | // refCount(),
|
---|
| 128 | // take(5),
|
---|
| 129 | // )
|
---|
| 130 | // .subscribe();
|
---|
| 131 | // ```
|
---|
| 132 | // In order to account for this case, RefCountSubscriber should only dispose
|
---|
| 133 | // the ConnectableObservable's shared connection Subscription if the
|
---|
| 134 | // connection Subscription exists, *and* either:
|
---|
| 135 | // a. RefCountSubscriber doesn't have a reference to the shared connection
|
---|
| 136 | // Subscription yet, or,
|
---|
| 137 | // b. RefCountSubscriber's connection Subscription reference is identical
|
---|
| 138 | // to the shared connection Subscription
|
---|
| 139 | ///
|
---|
| 140 | const { connection } = this;
|
---|
| 141 | const sharedConnection = (<any> connectable)._connection;
|
---|
| 142 | this.connection = null;
|
---|
| 143 |
|
---|
| 144 | if (sharedConnection && (!connection || sharedConnection === connection)) {
|
---|
| 145 | sharedConnection.unsubscribe();
|
---|
| 146 | }
|
---|
| 147 | }
|
---|
| 148 | }
|
---|