1 | import { Observable } from '../Observable';
|
---|
2 | import { Operator } from '../Operator';
|
---|
3 | import { Subscriber } from '../Subscriber';
|
---|
4 | import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
|
---|
5 | import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
|
---|
6 |
|
---|
7 | /**
|
---|
8 | * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
|
---|
9 | *
|
---|
10 | * If a keySelector function is provided, then it will project each value from the source observable into a new value that it will
|
---|
11 | * check for equality with previously projected values. If a keySelector function is not provided, it will use each value from the
|
---|
12 | * source observable directly with an equality check against previous values.
|
---|
13 | *
|
---|
14 | * In JavaScript runtimes that support `Set`, this operator will use a `Set` to improve performance of the distinct value checking.
|
---|
15 | *
|
---|
16 | * In other runtimes, this operator will use a minimal implementation of `Set` that relies on an `Array` and `indexOf` under the
|
---|
17 | * hood, so performance will degrade as more values are checked for distinction. Even in newer browsers, a long-running `distinct`
|
---|
18 | * use might result in memory leaks. To help alleviate this in some scenarios, an optional `flushes` parameter is also provided so
|
---|
19 | * that the internal `Set` can be "flushed", basically clearing it of values.
|
---|
20 | *
|
---|
21 | * ## Examples
|
---|
22 | * A simple example with numbers
|
---|
23 | * ```ts
|
---|
24 | * import { of } from 'rxjs';
|
---|
25 | * import { distinct } from 'rxjs/operators';
|
---|
26 | *
|
---|
27 | * of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe(
|
---|
28 | * distinct(),
|
---|
29 | * )
|
---|
30 | * .subscribe(x => console.log(x)); // 1, 2, 3, 4
|
---|
31 | * ```
|
---|
32 | *
|
---|
33 | * An example using a keySelector function
|
---|
34 | * ```typescript
|
---|
35 | * import { of } from 'rxjs';
|
---|
36 | * import { distinct } from 'rxjs/operators';
|
---|
37 | *
|
---|
38 | * interface Person {
|
---|
39 | * age: number,
|
---|
40 | * name: string
|
---|
41 | * }
|
---|
42 | *
|
---|
43 | * of<Person>(
|
---|
44 | * { age: 4, name: 'Foo'},
|
---|
45 | * { age: 7, name: 'Bar'},
|
---|
46 | * { age: 5, name: 'Foo'},
|
---|
47 | * ).pipe(
|
---|
48 | * distinct((p: Person) => p.name),
|
---|
49 | * )
|
---|
50 | * .subscribe(x => console.log(x));
|
---|
51 | *
|
---|
52 | * // displays:
|
---|
53 | * // { age: 4, name: 'Foo' }
|
---|
54 | * // { age: 7, name: 'Bar' }
|
---|
55 | * ```
|
---|
56 | * @see {@link distinctUntilChanged}
|
---|
57 | * @see {@link distinctUntilKeyChanged}
|
---|
58 | *
|
---|
59 | * @param {function} [keySelector] Optional function to select which value you want to check as distinct.
|
---|
60 | * @param {Observable} [flushes] Optional Observable for flushing the internal HashSet of the operator.
|
---|
61 | * @return {Observable} An Observable that emits items from the source Observable with distinct values.
|
---|
62 | * @method distinct
|
---|
63 | * @owner Observable
|
---|
64 | */
|
---|
65 | export function distinct<T, K>(keySelector?: (value: T) => K,
|
---|
66 | flushes?: Observable<any>): MonoTypeOperatorFunction<T> {
|
---|
67 | return (source: Observable<T>) => source.lift(new DistinctOperator(keySelector, flushes));
|
---|
68 | }
|
---|
69 |
|
---|
70 | class DistinctOperator<T, K> implements Operator<T, T> {
|
---|
71 | constructor(private keySelector?: (value: T) => K, private flushes?: Observable<any>) {
|
---|
72 | }
|
---|
73 |
|
---|
74 | call(subscriber: Subscriber<T>, source: any): TeardownLogic {
|
---|
75 | return source.subscribe(new DistinctSubscriber(subscriber, this.keySelector, this.flushes));
|
---|
76 | }
|
---|
77 | }
|
---|
78 |
|
---|
79 | /**
|
---|
80 | * We need this JSDoc comment for affecting ESDoc.
|
---|
81 | * @ignore
|
---|
82 | * @extends {Ignored}
|
---|
83 | */
|
---|
84 | export class DistinctSubscriber<T, K> extends SimpleOuterSubscriber<T, T> {
|
---|
85 | private values = new Set<K>();
|
---|
86 |
|
---|
87 | constructor(destination: Subscriber<T>, private keySelector?: (value: T) => K, flushes?: Observable<any>) {
|
---|
88 | super(destination);
|
---|
89 |
|
---|
90 | if (flushes) {
|
---|
91 | this.add(innerSubscribe(flushes, new SimpleInnerSubscriber(this)));
|
---|
92 | }
|
---|
93 | }
|
---|
94 |
|
---|
95 | notifyNext(): void {
|
---|
96 | this.values.clear();
|
---|
97 | }
|
---|
98 |
|
---|
99 | notifyError(error: any): void {
|
---|
100 | this._error(error);
|
---|
101 | }
|
---|
102 |
|
---|
103 | protected _next(value: T): void {
|
---|
104 | if (this.keySelector) {
|
---|
105 | this._useKeySelector(value);
|
---|
106 | } else {
|
---|
107 | this._finalizeNext(value, value);
|
---|
108 | }
|
---|
109 | }
|
---|
110 |
|
---|
111 | private _useKeySelector(value: T): void {
|
---|
112 | let key: K;
|
---|
113 | const { destination } = this;
|
---|
114 | try {
|
---|
115 | key = this.keySelector!(value);
|
---|
116 | } catch (err) {
|
---|
117 | destination.error!(err);
|
---|
118 | return;
|
---|
119 | }
|
---|
120 | this._finalizeNext(key, value);
|
---|
121 | }
|
---|
122 |
|
---|
123 | private _finalizeNext(key: K|T, value: T) {
|
---|
124 | const { values } = this;
|
---|
125 | if (!values.has(<K>key)) {
|
---|
126 | values.add(<K>key);
|
---|
127 | this.destination.next!(value);
|
---|
128 | }
|
---|
129 | }
|
---|
130 |
|
---|
131 | }
|
---|