[6a3a178] | 1 | import { Observable } from '../Observable';
|
---|
| 2 | import { ReplaySubject } from '../ReplaySubject';
|
---|
| 3 | import { Subscription } from '../Subscription';
|
---|
| 4 | import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
|
---|
| 5 | import { Subscriber } from '../Subscriber';
|
---|
| 6 |
|
---|
| 7 | export interface ShareReplayConfig {
|
---|
| 8 | bufferSize?: number;
|
---|
| 9 | windowTime?: number;
|
---|
| 10 | refCount: boolean;
|
---|
| 11 | scheduler?: SchedulerLike;
|
---|
| 12 | }
|
---|
| 13 |
|
---|
| 14 | /**
|
---|
| 15 | * Share source and replay specified number of emissions on subscription.
|
---|
| 16 | *
|
---|
| 17 | * This operator is a specialization of `replay` that connects to a source observable
|
---|
| 18 | * and multicasts through a `ReplaySubject` constructed with the specified arguments.
|
---|
| 19 | * A successfully completed source will stay cached in the `shareReplayed observable` forever,
|
---|
| 20 | * but an errored source can be retried.
|
---|
| 21 | *
|
---|
| 22 | * ## Why use shareReplay?
|
---|
| 23 | * You generally want to use `shareReplay` when you have side-effects or taxing computations
|
---|
| 24 | * that you do not wish to be executed amongst multiple subscribers.
|
---|
| 25 | * It may also be valuable in situations where you know you will have late subscribers to
|
---|
| 26 | * a stream that need access to previously emitted values.
|
---|
| 27 | * This ability to replay values on subscription is what differentiates {@link share} and `shareReplay`.
|
---|
| 28 | *
|
---|
| 29 | * ![](shareReplay.png)
|
---|
| 30 | *
|
---|
| 31 | * ## Example
|
---|
| 32 | * ```ts
|
---|
| 33 | * import { interval } from 'rxjs';
|
---|
| 34 | * import { shareReplay, take } from 'rxjs/operators';
|
---|
| 35 | *
|
---|
| 36 | * const obs$ = interval(1000);
|
---|
| 37 | * const shared$ = obs$.pipe(
|
---|
| 38 | * take(4),
|
---|
| 39 | * shareReplay(3)
|
---|
| 40 | * );
|
---|
| 41 | * shared$.subscribe(x => console.log('source A: ', x));
|
---|
| 42 | * shared$.subscribe(y => console.log('source B: ', y));
|
---|
| 43 | *
|
---|
| 44 | * ```
|
---|
| 45 | *
|
---|
| 46 | * @see {@link publish}
|
---|
| 47 | * @see {@link share}
|
---|
| 48 | * @see {@link publishReplay}
|
---|
| 49 | *
|
---|
| 50 | * @param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
|
---|
| 51 | * @param {Number} [windowTime=Number.POSITIVE_INFINITY] Maximum time length of the replay buffer in milliseconds.
|
---|
| 52 | * @param {Scheduler} [scheduler] Scheduler where connected observers within the selector function
|
---|
| 53 | * will be invoked on.
|
---|
| 54 | * @return {Observable} An observable sequence that contains the elements of a sequence produced
|
---|
| 55 | * by multicasting the source sequence within a selector function.
|
---|
| 56 | * @method shareReplay
|
---|
| 57 | * @owner Observable
|
---|
| 58 | */
|
---|
| 59 | export function shareReplay<T>(
|
---|
| 60 | config: ShareReplayConfig
|
---|
| 61 | ): MonoTypeOperatorFunction<T>;
|
---|
| 62 | export function shareReplay<T>(
|
---|
| 63 | bufferSize?: number,
|
---|
| 64 | windowTime?: number,
|
---|
| 65 | scheduler?: SchedulerLike
|
---|
| 66 | ): MonoTypeOperatorFunction<T>;
|
---|
| 67 | export function shareReplay<T>(
|
---|
| 68 | configOrBufferSize?: ShareReplayConfig | number,
|
---|
| 69 | windowTime?: number,
|
---|
| 70 | scheduler?: SchedulerLike
|
---|
| 71 | ): MonoTypeOperatorFunction<T> {
|
---|
| 72 | let config: ShareReplayConfig;
|
---|
| 73 | if (configOrBufferSize && typeof configOrBufferSize === 'object') {
|
---|
| 74 | config = configOrBufferSize as ShareReplayConfig;
|
---|
| 75 | } else {
|
---|
| 76 | config = {
|
---|
| 77 | bufferSize: configOrBufferSize as number | undefined,
|
---|
| 78 | windowTime,
|
---|
| 79 | refCount: false,
|
---|
| 80 | scheduler,
|
---|
| 81 | };
|
---|
| 82 | }
|
---|
| 83 | return (source: Observable<T>) => source.lift(shareReplayOperator(config));
|
---|
| 84 | }
|
---|
| 85 |
|
---|
| 86 | function shareReplayOperator<T>({
|
---|
| 87 | bufferSize = Number.POSITIVE_INFINITY,
|
---|
| 88 | windowTime = Number.POSITIVE_INFINITY,
|
---|
| 89 | refCount: useRefCount,
|
---|
| 90 | scheduler,
|
---|
| 91 | }: ShareReplayConfig) {
|
---|
| 92 | let subject: ReplaySubject<T> | undefined;
|
---|
| 93 | let refCount = 0;
|
---|
| 94 | let subscription: Subscription | undefined;
|
---|
| 95 | let hasError = false;
|
---|
| 96 | let isComplete = false;
|
---|
| 97 |
|
---|
| 98 | return function shareReplayOperation(
|
---|
| 99 | this: Subscriber<T>,
|
---|
| 100 | source: Observable<T>
|
---|
| 101 | ) {
|
---|
| 102 | refCount++;
|
---|
| 103 | let innerSub: Subscription;
|
---|
| 104 | if (!subject || hasError) {
|
---|
| 105 | hasError = false;
|
---|
| 106 | subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
|
---|
| 107 | innerSub = subject.subscribe(this);
|
---|
| 108 | subscription = source.subscribe({
|
---|
| 109 | next(value) {
|
---|
| 110 | subject.next(value);
|
---|
| 111 | },
|
---|
| 112 | error(err) {
|
---|
| 113 | hasError = true;
|
---|
| 114 | subject.error(err);
|
---|
| 115 | },
|
---|
| 116 | complete() {
|
---|
| 117 | isComplete = true;
|
---|
| 118 | subscription = undefined;
|
---|
| 119 | subject.complete();
|
---|
| 120 | },
|
---|
| 121 | });
|
---|
| 122 |
|
---|
| 123 | // Here we need to check to see if the source synchronously completed. Although
|
---|
| 124 | // we're setting `subscription = undefined` in the completion handler, if the source
|
---|
| 125 | // is synchronous, that will happen *before* subscription is set by the return of
|
---|
| 126 | // the `subscribe` call.
|
---|
| 127 | if (isComplete) {
|
---|
| 128 | subscription = undefined;
|
---|
| 129 | }
|
---|
| 130 | } else {
|
---|
| 131 | innerSub = subject.subscribe(this);
|
---|
| 132 | }
|
---|
| 133 |
|
---|
| 134 | this.add(() => {
|
---|
| 135 | refCount--;
|
---|
| 136 | innerSub.unsubscribe();
|
---|
| 137 | innerSub = undefined;
|
---|
| 138 | if (subscription && !isComplete && useRefCount && refCount === 0) {
|
---|
| 139 | subscription.unsubscribe();
|
---|
| 140 | subscription = undefined;
|
---|
| 141 | subject = undefined;
|
---|
| 142 | }
|
---|
| 143 | });
|
---|
| 144 | };
|
---|
| 145 | }
|
---|