source: trip-planner-front/node_modules/rxjs/src/internal/operators/shareReplay.ts@ 8d391a1

Last change on this file since 8d391a1 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 4.6 KB
Line 
1import { Observable } from '../Observable';
2import { ReplaySubject } from '../ReplaySubject';
3import { Subscription } from '../Subscription';
4import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
5import { Subscriber } from '../Subscriber';
6
/**
 * Options accepted by the configuration-object form of {@link shareReplay}.
 */
export interface ShareReplayConfig {
  /**
   * Maximum number of values kept in the replay buffer.
   * Falls back to `Number.POSITIVE_INFINITY` when omitted.
   */
  bufferSize?: number;
  /**
   * Maximum age, in milliseconds, of values kept in the replay buffer.
   * Falls back to `Number.POSITIVE_INFINITY` when omitted.
   */
  windowTime?: number;
  /**
   * When `true`, the subscription to the source is released (and the replay
   * buffer discarded) once the number of subscribers drops to zero, provided
   * the source has not completed. When `false`, the source subscription is
   * kept even after every subscriber has left.
   */
  refCount: boolean;
  /** Scheduler handed to the underlying `ReplaySubject`. */
  scheduler?: SchedulerLike;
}
13
/**
 * Share source and replay specified number of emissions on subscription.
 *
 * This operator connects to the source observable and multicasts it through a
 * `ReplaySubject` constructed with the specified arguments.
 * A successfully completed source will stay cached in the shared observable forever,
 * but an errored source can be retried: the next subscription after an error
 * creates a fresh `ReplaySubject` and subscribes to the source again.
 *
 * ## Why use shareReplay?
 * You generally want to use `shareReplay` when you have side-effects or taxing computations
 * that you do not wish to be executed amongst multiple subscribers.
 * It may also be valuable in situations where you know you will have late subscribers to
 * a stream that need access to previously emitted values.
 * This ability to replay values on subscription is what differentiates {@link share} and `shareReplay`.
 *
 * ![](shareReplay.png)
 *
 * ## Example
 * ```ts
 * import { interval } from 'rxjs';
 * import { shareReplay, take } from 'rxjs/operators';
 *
 * const obs$ = interval(1000);
 * const shared$ = obs$.pipe(
 *   take(4),
 *   shareReplay(3)
 * );
 * // Both subscribers are fed by a single subscription to `obs$`.
 * shared$.subscribe(x => console.log('source A: ', x));
 * shared$.subscribe(y => console.log('source B: ', y));
 * ```
 *
 * ## Call forms
 * - `shareReplay(config)` takes a {@link ShareReplayConfig} object and uses it as given.
 * - `shareReplay(bufferSize?, windowTime?, scheduler?)` takes positional arguments; this
 *   form always behaves as if `refCount: false` had been specified.
 *
 * @see {@link publish}
 * @see {@link share}
 * @see {@link publishReplay}
 *
 * @param configOrBufferSize Either a {@link ShareReplayConfig} object, or the maximum element
 * count of the replay buffer (defaults to `Number.POSITIVE_INFINITY`).
 * @param windowTime Maximum time length of the replay buffer in milliseconds (defaults to
 * `Number.POSITIVE_INFINITY`). Only read in the positional form.
 * @param scheduler Scheduler passed to the underlying `ReplaySubject`. Only read in the
 * positional form.
 * @returns An operator function producing an observable that shares a single subscription
 * to the source and replays buffered values to each new subscriber.
 */
export function shareReplay<T>(
  config: ShareReplayConfig
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
  bufferSize?: number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
  configOrBufferSize?: ShareReplayConfig | number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  let config: ShareReplayConfig;
  // The truthiness check keeps a `null` passed from untyped callers out of the
  // object branch (`typeof null === 'object'`).
  if (configOrBufferSize && typeof configOrBufferSize === 'object') {
    config = configOrBufferSize;
  } else {
    // Positional form: normalise into a config object; ref counting is off.
    config = {
      bufferSize: configOrBufferSize as number | undefined,
      windowTime,
      refCount: false,
      scheduler,
    };
  }
  return (source: Observable<T>) => source.lift(shareReplayOperator(config));
}
85
/**
 * Builds the subscribe-time function used by `shareReplay`.
 *
 * The returned function keeps its sharing state (current `ReplaySubject`,
 * source subscription, subscriber count and error/completion flags) in this
 * closure, so every subscription made through the same returned function
 * sees the same state.
 *
 * @param config Replay configuration. `bufferSize` and `windowTime` default to
 * `Number.POSITIVE_INFINITY` when omitted.
 * @returns A function that must be invoked with the subscriber being attached
 * as `this` and the source observable as its only argument.
 */
function shareReplayOperator<T>({
  bufferSize = Number.POSITIVE_INFINITY,
  windowTime = Number.POSITIVE_INFINITY,
  refCount: useRefCount,
  scheduler,
}: ShareReplayConfig) {
  // Subject currently multicasting the source; undefined before the first
  // subscription and after a ref-counted teardown.
  let subject: ReplaySubject<T> | undefined;
  // Number of currently attached subscribers.
  let refCount = 0;
  // Subscription to the source; cleared on completion or ref-counted teardown.
  let subscription: Subscription | undefined;
  // Set when the source errors, so the next subscriber triggers a fresh connection.
  let hasError = false;
  // Set once the source completes; never reset, so a completed result stays cached.
  let isComplete = false;

  return function shareReplayOperation(
    this: Subscriber<T>,
    source: Observable<T>
  ) {
    refCount++;
    let innerSub: Subscription | undefined;
    if (!subject || hasError) {
      hasError = false;
      // Capture the new subject in a local so the observer callbacks below
      // never dereference the mutable (possibly undefined) outer variable.
      const activeSubject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
      subject = activeSubject;
      // Attach the subscriber before connecting so that it also receives
      // anything the source emits synchronously.
      innerSub = activeSubject.subscribe(this);
      subscription = source.subscribe({
        next(value) {
          activeSubject.next(value);
        },
        error(err) {
          hasError = true;
          activeSubject.error(err);
        },
        complete() {
          isComplete = true;
          subscription = undefined;
          activeSubject.complete();
        },
      });

      // Here we need to check to see if the source synchronously completed. Although
      // we're setting `subscription = undefined` in the completion handler, if the source
      // is synchronous, that will happen *before* subscription is set by the return of
      // the `subscribe` call.
      if (isComplete) {
        subscription = undefined;
      }
    } else {
      // A live (or completed and cached) subject already exists: just join it.
      innerSub = subject.subscribe(this);
    }

    // Teardown executed when this subscriber is unsubscribed.
    this.add(() => {
      refCount--;
      if (innerSub) {
        innerSub.unsubscribe();
        innerSub = undefined;
      }
      // Disconnect from the source only when ref counting is enabled, nobody
      // is listening any more, and the source has not completed.
      if (subscription && !isComplete && useRefCount && refCount === 0) {
        subscription.unsubscribe();
        subscription = undefined;
        subject = undefined;
      }
    });
  };
}
Note: See TracBrowser for help on using the repository browser.