source: trip-planner-front/node_modules/rxjs/src/internal/operators/multicast.ts@ 6a80231

Last change on this file since 6a80231 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 3.5 KB
Line 
1import { Subject } from '../Subject';
2import { Operator } from '../Operator';
3import { Subscriber } from '../Subscriber';
4import { Observable } from '../Observable';
5import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
6import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
7
8/* tslint:disable:max-line-length */
9export function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
10export function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): UnaryFunction<Observable<T>, ConnectableObservable<ObservedValueOf<O>>>;
11export function multicast<T>(subjectFactory: (this: Observable<T>) => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
12export function multicast<T, O extends ObservableInput<any>>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
13/* tslint:enable:max-line-length */
14
15/**
16 * Returns an Observable that emits the results of invoking a specified selector on items
17 * emitted by a ConnectableObservable that shares a single subscription to the underlying stream.
18 *
19 * ![](multicast.png)
20 *
21 * @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through
22 * which the source sequence's elements will be multicast to the selector function
23 * or Subject to push source elements into.
24 * @param {Function} [selector] - Optional selector function that can use the multicasted source stream
25 * as many times as needed, without causing multiple subscriptions to the source stream.
26 * Subscribers to the given source will receive all notifications of the source from the
27 * time of the subscription forward.
28 * @return {Observable} An Observable that emits the results of invoking the selector
29 * on the items emitted by a `ConnectableObservable` that shares a single subscription to
30 * the underlying stream.
31 * @method multicast
32 * @owner Observable
33 */
34export function multicast<T, R>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
35 selector?: (source: Observable<T>) => Observable<R>): OperatorFunction<T, R> {
36 return function multicastOperatorFunction(source: Observable<T>): Observable<R> {
37 let subjectFactory: () => Subject<T>;
38 if (typeof subjectOrSubjectFactory === 'function') {
39 subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
40 } else {
41 subjectFactory = function subjectFactory() {
42 return <Subject<T>>subjectOrSubjectFactory;
43 };
44 }
45
46 if (typeof selector === 'function') {
47 return source.lift(new MulticastOperator(subjectFactory, selector));
48 }
49
50 const connectable: any = Object.create(source, connectableObservableDescriptor);
51 connectable.source = source;
52 connectable.subjectFactory = subjectFactory;
53
54 return <ConnectableObservable<R>> connectable;
55 };
56}
57
58export class MulticastOperator<T, R> implements Operator<T, R> {
59 constructor(private subjectFactory: () => Subject<T>,
60 private selector: (source: Observable<T>) => Observable<R>) {
61 }
62 call(subscriber: Subscriber<R>, source: any): any {
63 const { selector } = this;
64 const subject = this.subjectFactory();
65 const subscription = selector(subject).subscribe(subscriber);
66 subscription.add(source.subscribe(subject));
67 return subscription;
68 }
69}
Note: See TracBrowser for help on using the repository browser.