source: trip-planner-front/node_modules/rxjs/src/internal/Subject.ts@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 4.8 KB
Line 
1import { Operator } from './Operator';
2import { Observable } from './Observable';
3import { Subscriber } from './Subscriber';
4import { Subscription } from './Subscription';
5import { Observer, SubscriptionLike, TeardownLogic } from './types';
6import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
7import { SubjectSubscription } from './SubjectSubscription';
8import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
9
10/**
11 * @class SubjectSubscriber<T>
12 */
13export class SubjectSubscriber<T> extends Subscriber<T> {
14 constructor(protected destination: Subject<T>) {
15 super(destination);
16 }
17}
18
19/**
20 * A Subject is a special type of Observable that allows values to be
21 * multicasted to many Observers. Subjects are like EventEmitters.
22 *
23 * Every Subject is an Observable and an Observer. You can subscribe to a
24 * Subject, and you can call next to feed values as well as error and complete.
25 *
26 * @class Subject<T>
27 */
28export class Subject<T> extends Observable<T> implements SubscriptionLike {
29
30 [rxSubscriberSymbol]() {
31 return new SubjectSubscriber(this);
32 }
33
34 observers: Observer<T>[] = [];
35
36 closed = false;
37
38 isStopped = false;
39
40 hasError = false;
41
42 thrownError: any = null;
43
44 constructor() {
45 super();
46 }
47
48 /**@nocollapse
49 * @deprecated use new Subject() instead
50 */
51 static create: Function = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
52 return new AnonymousSubject<T>(destination, source);
53 }
54
55 lift<R>(operator: Operator<T, R>): Observable<R> {
56 const subject = new AnonymousSubject(this, this);
57 subject.operator = <any>operator;
58 return <any>subject;
59 }
60
61 next(value?: T) {
62 if (this.closed) {
63 throw new ObjectUnsubscribedError();
64 }
65 if (!this.isStopped) {
66 const { observers } = this;
67 const len = observers.length;
68 const copy = observers.slice();
69 for (let i = 0; i < len; i++) {
70 copy[i].next(value);
71 }
72 }
73 }
74
75 error(err: any) {
76 if (this.closed) {
77 throw new ObjectUnsubscribedError();
78 }
79 this.hasError = true;
80 this.thrownError = err;
81 this.isStopped = true;
82 const { observers } = this;
83 const len = observers.length;
84 const copy = observers.slice();
85 for (let i = 0; i < len; i++) {
86 copy[i].error(err);
87 }
88 this.observers.length = 0;
89 }
90
91 complete() {
92 if (this.closed) {
93 throw new ObjectUnsubscribedError();
94 }
95 this.isStopped = true;
96 const { observers } = this;
97 const len = observers.length;
98 const copy = observers.slice();
99 for (let i = 0; i < len; i++) {
100 copy[i].complete();
101 }
102 this.observers.length = 0;
103 }
104
105 unsubscribe() {
106 this.isStopped = true;
107 this.closed = true;
108 this.observers = null;
109 }
110
111 /** @deprecated This is an internal implementation detail, do not use. */
112 _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
113 if (this.closed) {
114 throw new ObjectUnsubscribedError();
115 } else {
116 return super._trySubscribe(subscriber);
117 }
118 }
119
120 /** @deprecated This is an internal implementation detail, do not use. */
121 _subscribe(subscriber: Subscriber<T>): Subscription {
122 if (this.closed) {
123 throw new ObjectUnsubscribedError();
124 } else if (this.hasError) {
125 subscriber.error(this.thrownError);
126 return Subscription.EMPTY;
127 } else if (this.isStopped) {
128 subscriber.complete();
129 return Subscription.EMPTY;
130 } else {
131 this.observers.push(subscriber);
132 return new SubjectSubscription(this, subscriber);
133 }
134 }
135
136 /**
137 * Creates a new Observable with this Subject as the source. You can do this
138 * to create customize Observer-side logic of the Subject and conceal it from
139 * code that uses the Observable.
140 * @return {Observable} Observable that the Subject casts to
141 */
142 asObservable(): Observable<T> {
143 const observable = new Observable<T>();
144 (<any>observable).source = this;
145 return observable;
146 }
147}
148
149/**
150 * @class AnonymousSubject<T>
151 */
152export class AnonymousSubject<T> extends Subject<T> {
153 constructor(protected destination?: Observer<T>, source?: Observable<T>) {
154 super();
155 this.source = source;
156 }
157
158 next(value: T) {
159 const { destination } = this;
160 if (destination && destination.next) {
161 destination.next(value);
162 }
163 }
164
165 error(err: any) {
166 const { destination } = this;
167 if (destination && destination.error) {
168 this.destination.error(err);
169 }
170 }
171
172 complete() {
173 const { destination } = this;
174 if (destination && destination.complete) {
175 this.destination.complete();
176 }
177 }
178
179 /** @deprecated This is an internal implementation detail, do not use. */
180 _subscribe(subscriber: Subscriber<T>): Subscription {
181 const { source } = this;
182 if (source) {
183 return this.source.subscribe(subscriber);
184 } else {
185 return Subscription.EMPTY;
186 }
187 }
188}
Note: See TracBrowser for help on using the repository browser.