source: trip-planner-front/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts@ e29cc2e

Last change on this file since e29cc2e was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.4 KB
Line 
1import { Subject, SubjectSubscriber } from '../Subject';
2import { Operator } from '../Operator';
3import { Observable } from '../Observable';
4import { Subscriber } from '../Subscriber';
5import { Subscription } from '../Subscription';
6import { TeardownLogic } from '../types';
7import { refCount as higherOrderRefCount } from '../operators/refCount';
8
9/**
10 * @class ConnectableObservable<T>
11 */
export class ConnectableObservable<T> extends Observable<T> {
  /** Subject that fans source notifications out to subscribers; created lazily by `getSubject()`. */
  protected _subject: Subject<T>;
  /** Subscriber counter; reset to 0 by `ConnectableSubscriber` on teardown. */
  protected _refCount: number = 0;
  /** Subscription created by `connect()`; falsy while there is no live connection. */
  protected _connection: Subscription;
  /** @internal */
  _isComplete = false;

  /**
   * @param source Observable that `connect()` subscribes to.
   * @param subjectFactory Called whenever a new subject is needed.
   */
  constructor(
    public source: Observable<T>,
    protected subjectFactory: () => Subject<T>
  ) {
    super();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T>) {
    const subject = this.getSubject();
    return subject.subscribe(subscriber);
  }

  /**
   * Returns the current subject, first replacing it with a new one from
   * `subjectFactory` when there is none or the current one is stopped.
   */
  protected getSubject(): Subject<T> {
    const current = this._subject;
    if (current && !current.isStopped) {
      return this._subject;
    }
    this._subject = this.subjectFactory();
    return this._subject;
  }

  /**
   * Subscribes the subject to `source` unless a connection already exists.
   *
   * @returns The existing connection when there is one; otherwise the newly
   * created connection, or `Subscription.EMPTY` when that connection is
   * already closed by the time the source subscription call returns.
   */
  connect(): Subscription {
    const existing = this._connection;
    if (existing) {
      return existing;
    }

    this._isComplete = false;
    const created = new Subscription();
    // Publish the connection before subscribing so teardown code that runs
    // during the subscribe call can see it.
    this._connection = created;
    created.add(
      this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this))
    );

    if (!created.closed) {
      return created;
    }

    // The connection was torn down while subscribing: forget it.
    this._connection = null;
    return Subscription.EMPTY;
  }

  /**
   * Applies the `refCount` operator imported from `../operators/refCount`
   * to this observable and returns the result.
   */
  refCount(): Observable<T> {
    const applyRefCount = higherOrderRefCount();
    return applyRefCount(this) as Observable<T>;
  }
}
57
/**
 * Property descriptor map mirroring the members of `ConnectableObservable`.
 * Methods are taken from `ConnectableObservable.prototype`; the state slots
 * (`_refCount`, `_subject`, `_connection`, `_isComplete`) are writable.
 */
export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
  const proto = ConnectableObservable.prototype as any;

  const descriptor: PropertyDescriptorMap = {
    operator: { value: null },
    _refCount: { value: 0, writable: true },
    _subject: { value: null, writable: true },
    _connection: { value: null, writable: true },
    _subscribe: { value: proto._subscribe },
    // NOTE(review): `_isComplete` is declared as an instance field on the
    // class, so this prototype lookup presumably yields `undefined` (still
    // falsy) rather than `false` — confirm before relying on the exact value.
    _isComplete: { value: proto._isComplete, writable: true },
    getSubject: { value: proto.getSubject },
    connect: { value: proto.connect },
    refCount: { value: proto.refCount }
  };

  return descriptor;
})();
72
/**
 * Subscriber placed between the source and the connectable's subject.
 * On error, completion or unsubscription it resets the owning
 * `ConnectableObservable`'s connection state.
 */
class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
  /**
   * @param destination Subject that receives the source notifications.
   * @param connectable Owner whose state is reset on teardown.
   */
  constructor(
    destination: Subject<T>,
    private connectable: ConnectableObservable<T>
  ) {
    super(destination);
  }

  /** Resets the owner's state first, then forwards the error. */
  protected _error(err: any): void {
    this._unsubscribe();
    super._error(err);
  }

  /** Flags the owner as complete, resets its state, then forwards completion. */
  protected _complete(): void {
    this.connectable._isComplete = true;
    this._unsubscribe();
    super._complete();
  }

  /**
   * Detaches from the owner and clears its `_refCount`, `_subject` and
   * `_connection`, then unsubscribes the connection that was active.
   * Does nothing when the owner reference has already been dropped.
   */
  protected _unsubscribe() {
    const owner = this.connectable as any;
    if (!owner) {
      return;
    }

    this.connectable = null;
    // Capture the live connection before the owner's slot is cleared.
    const activeConnection = owner._connection;
    owner._refCount = 0;
    owner._subject = null;
    owner._connection = null;

    if (activeConnection) {
      activeConnection.unsubscribe();
    }
  }
}
101
/**
 * Operator that counts subscriptions against a `ConnectableObservable`
 * and connects it once a subscriber has been attached.
 */
class RefCountOperator<T> implements Operator<T, T> {
  /** @param connectable Observable whose `_refCount` is incremented per call. */
  constructor(private connectable: ConnectableObservable<T>) {}

  /**
   * Increments the reference count, subscribes `source` through a
   * `RefCountSubscriber`, and — unless that subscriber is already closed —
   * connects the connectable and stores the connection on the subscriber.
   *
   * @param subscriber Downstream subscriber.
   * @param source Object whose `subscribe` method is called with the counter.
   * @returns The value returned by `source.subscribe`.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const target = this.connectable;
    (target as any)._refCount++;

    const counter = new RefCountSubscriber(subscriber, target);
    const teardown = source.subscribe(counter);

    const stillOpen = !counter.closed;
    if (stillOpen) {
      (counter as any).connection = target.connect();
    }

    return teardown;
  }
}
120
/**
 * Subscriber that decrements the connectable's reference count when it is
 * unsubscribed and disposes the shared connection when the count it read
 * was exactly 1.
 */
class RefCountSubscriber<T> extends Subscriber<T> {
  /** Shared connection assigned by `RefCountOperator` after subscribing. */
  private connection: Subscription;

  /**
   * @param destination Downstream subscriber.
   * @param connectable Observable whose reference count is maintained.
   */
  constructor(
    destination: Subscriber<T>,
    private connectable: ConnectableObservable<T>
  ) {
    super(destination);
  }

  /**
   * Releases this subscriber's share of the connectable. The local
   * `connection` reference is cleared on every path.
   */
  protected _unsubscribe() {
    const owner = this.connectable as any;
    if (!owner) {
      // Already released.
      this.connection = null;
      return;
    }
    this.connectable = null;

    const countBefore = owner._refCount;
    if (countBefore <= 0) {
      // Nothing to release; leave the counter untouched.
      this.connection = null;
      return;
    }

    owner._refCount = countBefore - 1;
    if (countBefore > 1) {
      // Other subscribers remain, so the shared connection stays alive.
      this.connection = null;
      return;
    }

    // This was the last subscriber. The source may emit synchronously and
    // downstream may unsubscribe synchronously, in which case we get here
    // before RefCountOperator has handed over the shared connection, e.g.:
    //
    //   range(0, 10).pipe(
    //     publish(),
    //     refCount(),
    //     take(5),
    //   ).subscribe();
    //
    // So the shared connection is disposed only when it exists AND either
    //   a. no local reference to it has been assigned yet, or
    //   b. the local reference is that very same connection.
    const ownConnection = this.connection;
    const shared = owner._connection;
    this.connection = null;

    const mayDispose = !ownConnection || shared === ownConnection;
    if (shared && mayDispose) {
      shared.unsubscribe();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.