[6a3a178] | 1 | import { Subscriber } from '../Subscriber';
|
---|
| 2 | import { Subscription } from '../Subscription';
|
---|
| 3 | import { Observable } from '../Observable';
|
---|
| 4 | import { Operator } from '../Operator';
|
---|
| 5 | import { Subject } from '../Subject';
|
---|
| 6 | import { OperatorFunction } from '../types';
|
---|
| 7 |
|
---|
| 8 | /* tslint:disable:max-line-length */
|
---|
| 9 | export function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
|
---|
| 10 | export function groupBy<T, K>(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, T>>;
|
---|
| 11 | export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, R>>;
|
---|
| 12 | export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>>;
|
---|
| 13 | /* tslint:enable:max-line-length */
|
---|
| 14 |
|
---|
| 15 | /**
|
---|
| 16 | * Groups the items emitted by an Observable according to a specified criterion,
|
---|
| 17 | * and emits these grouped items as `GroupedObservables`, one
|
---|
| 18 | * {@link GroupedObservable} per group.
|
---|
| 19 | *
|
---|
| 20 | * ![](groupBy.png)
|
---|
| 21 | *
|
---|
| 22 | * When the Observable emits an item, a key is computed for this item with the keySelector function.
|
---|
| 23 | *
|
---|
| 24 | * If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Elsewhere, a new
|
---|
| 25 | * {@link GroupedObservable} for this key is created and emits.
|
---|
| 26 | *
|
---|
| 27 | * A {@link GroupedObservable} represents values belonging to the same group represented by a common key. The common
|
---|
| 28 | * key is available as the key field of a {@link GroupedObservable} instance.
|
---|
| 29 | *
|
---|
| 30 | * The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
|
---|
| 31 | * returned by the elementSelector function.
|
---|
| 32 | *
|
---|
| 33 | * ## Examples
|
---|
| 34 | *
|
---|
| 35 | * ### Group objects by id and return as array
|
---|
| 36 | *
|
---|
| 37 | * ```ts
|
---|
| 38 | * import { of } from 'rxjs';
|
---|
| 39 | * import { mergeMap, groupBy, reduce } from 'rxjs/operators';
|
---|
| 40 | *
|
---|
| 41 | * of(
|
---|
| 42 | * {id: 1, name: 'JavaScript'},
|
---|
| 43 | * {id: 2, name: 'Parcel'},
|
---|
| 44 | * {id: 2, name: 'webpack'},
|
---|
| 45 | * {id: 1, name: 'TypeScript'},
|
---|
| 46 | * {id: 3, name: 'TSLint'}
|
---|
| 47 | * ).pipe(
|
---|
| 48 | * groupBy(p => p.id),
|
---|
| 49 | * mergeMap((group$) => group$.pipe(reduce((acc, cur) => [...acc, cur], []))),
|
---|
| 50 | * )
|
---|
| 51 | * .subscribe(p => console.log(p));
|
---|
| 52 | *
|
---|
| 53 | * // displays:
|
---|
| 54 | * // [ { id: 1, name: 'JavaScript'},
|
---|
| 55 | * // { id: 1, name: 'TypeScript'} ]
|
---|
| 56 | * //
|
---|
| 57 | * // [ { id: 2, name: 'Parcel'},
|
---|
| 58 | * // { id: 2, name: 'webpack'} ]
|
---|
| 59 | * //
|
---|
| 60 | * // [ { id: 3, name: 'TSLint'} ]
|
---|
| 61 | * ```
|
---|
| 62 | *
|
---|
| 63 | * ### Pivot data on the id field
|
---|
| 64 | *
|
---|
| 65 | * ```ts
|
---|
| 66 | * import { of } from 'rxjs';
|
---|
| 67 | * import { groupBy, map, mergeMap, reduce } from 'rxjs/operators';
|
---|
| 68 | *
|
---|
| 69 | * of(
|
---|
| 70 | * { id: 1, name: 'JavaScript' },
|
---|
| 71 | * { id: 2, name: 'Parcel' },
|
---|
| 72 | * { id: 2, name: 'webpack' },
|
---|
| 73 | * { id: 1, name: 'TypeScript' },
|
---|
| 74 | * { id: 3, name: 'TSLint' }
|
---|
| 75 | * )
|
---|
| 76 | * .pipe(
|
---|
| 77 | * groupBy(p => p.id, p => p.name),
|
---|
| 78 | * mergeMap(group$ =>
|
---|
| 79 | * group$.pipe(reduce((acc, cur) => [...acc, cur], [`${group$.key}`]))
|
---|
| 80 | * ),
|
---|
| 81 | * map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
|
---|
| 82 | * )
|
---|
| 83 | * .subscribe(p => console.log(p));
|
---|
| 84 | *
|
---|
| 85 | * // displays:
|
---|
| 86 | * // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
|
---|
| 87 | * // { id: 2, values: [ 'Parcel', 'webpack' ] }
|
---|
| 88 | * // { id: 3, values: [ 'TSLint' ] }
|
---|
| 89 | * ```
|
---|
| 90 | *
|
---|
| 91 | * @param {function(value: T): K} keySelector A function that extracts the key
|
---|
| 92 | * for each item.
|
---|
| 93 | * @param {function(value: T): R} [elementSelector] A function that extracts the
|
---|
| 94 | * return element for each item.
|
---|
| 95 | * @param {function(grouped: GroupedObservable<K,R>): Observable<any>} [durationSelector]
|
---|
| 96 | * A function that returns an Observable to determine how long each group should
|
---|
| 97 | * exist.
|
---|
| 98 | * @return {Observable<GroupedObservable<K,R>>} An Observable that emits
|
---|
| 99 | * GroupedObservables, each of which corresponds to a unique key value and each
|
---|
| 100 | * of which emits those items from the source Observable that share that key
|
---|
| 101 | * value.
|
---|
| 102 | * @method groupBy
|
---|
| 103 | * @owner Observable
|
---|
| 104 | */
|
---|
| 105 | export function groupBy<T, K, R>(keySelector: (value: T) => K,
|
---|
| 106 | elementSelector?: ((value: T) => R) | void,
|
---|
| 107 | durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
|
---|
| 108 | subjectSelector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>> {
|
---|
| 109 | return (source: Observable<T>) =>
|
---|
| 110 | source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
|
---|
| 111 | }
|
---|
| 112 |
|
---|
| 113 | export interface RefCountSubscription {
|
---|
| 114 | count: number;
|
---|
| 115 | unsubscribe: () => void;
|
---|
| 116 | closed: boolean;
|
---|
| 117 | attemptedToUnsubscribe: boolean;
|
---|
| 118 | }
|
---|
| 119 |
|
---|
| 120 | class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
|
---|
| 121 | constructor(private keySelector: (value: T) => K,
|
---|
| 122 | private elementSelector?: ((value: T) => R) | void,
|
---|
| 123 | private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
|
---|
| 124 | private subjectSelector?: () => Subject<R>) {
|
---|
| 125 | }
|
---|
| 126 |
|
---|
| 127 | call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
|
---|
| 128 | return source.subscribe(new GroupBySubscriber(
|
---|
| 129 | subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector
|
---|
| 130 | ));
|
---|
| 131 | }
|
---|
| 132 | }
|
---|
| 133 |
|
---|
| 134 | /**
|
---|
| 135 | * We need this JSDoc comment for affecting ESDoc.
|
---|
| 136 | * @ignore
|
---|
| 137 | * @extends {Ignored}
|
---|
| 138 | */
|
---|
| 139 | class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription {
|
---|
| 140 | private groups: Map<K, Subject<T | R>> = null;
|
---|
| 141 | public attemptedToUnsubscribe: boolean = false;
|
---|
| 142 | public count: number = 0;
|
---|
| 143 |
|
---|
| 144 | constructor(destination: Subscriber<GroupedObservable<K, R>>,
|
---|
| 145 | private keySelector: (value: T) => K,
|
---|
| 146 | private elementSelector?: ((value: T) => R) | void,
|
---|
| 147 | private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
|
---|
| 148 | private subjectSelector?: () => Subject<R>) {
|
---|
| 149 | super(destination);
|
---|
| 150 | }
|
---|
| 151 |
|
---|
| 152 | protected _next(value: T): void {
|
---|
| 153 | let key: K;
|
---|
| 154 | try {
|
---|
| 155 | key = this.keySelector(value);
|
---|
| 156 | } catch (err) {
|
---|
| 157 | this.error(err);
|
---|
| 158 | return;
|
---|
| 159 | }
|
---|
| 160 |
|
---|
| 161 | this._group(value, key);
|
---|
| 162 | }
|
---|
| 163 |
|
---|
| 164 | private _group(value: T, key: K) {
|
---|
| 165 | let groups = this.groups;
|
---|
| 166 |
|
---|
| 167 | if (!groups) {
|
---|
| 168 | groups = this.groups = new Map<K, Subject<T | R>>();
|
---|
| 169 | }
|
---|
| 170 |
|
---|
| 171 | let group = groups.get(key);
|
---|
| 172 |
|
---|
| 173 | let element: R;
|
---|
| 174 | if (this.elementSelector) {
|
---|
| 175 | try {
|
---|
| 176 | element = this.elementSelector(value);
|
---|
| 177 | } catch (err) {
|
---|
| 178 | this.error(err);
|
---|
| 179 | }
|
---|
| 180 | } else {
|
---|
| 181 | element = <any>value;
|
---|
| 182 | }
|
---|
| 183 |
|
---|
| 184 | if (!group) {
|
---|
| 185 | group = (this.subjectSelector ? this.subjectSelector() : new Subject<R>()) as Subject<T | R>;
|
---|
| 186 | groups.set(key, group);
|
---|
| 187 | const groupedObservable = new GroupedObservable(key, group, this);
|
---|
| 188 | this.destination.next(groupedObservable);
|
---|
| 189 | if (this.durationSelector) {
|
---|
| 190 | let duration: any;
|
---|
| 191 | try {
|
---|
| 192 | duration = this.durationSelector(new GroupedObservable<K, R>(key, <Subject<R>>group));
|
---|
| 193 | } catch (err) {
|
---|
| 194 | this.error(err);
|
---|
| 195 | return;
|
---|
| 196 | }
|
---|
| 197 | this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
|
---|
| 198 | }
|
---|
| 199 | }
|
---|
| 200 |
|
---|
| 201 | if (!group.closed) {
|
---|
| 202 | group.next(element);
|
---|
| 203 | }
|
---|
| 204 | }
|
---|
| 205 |
|
---|
| 206 | protected _error(err: any): void {
|
---|
| 207 | const groups = this.groups;
|
---|
| 208 | if (groups) {
|
---|
| 209 | groups.forEach((group, key) => {
|
---|
| 210 | group.error(err);
|
---|
| 211 | });
|
---|
| 212 |
|
---|
| 213 | groups.clear();
|
---|
| 214 | }
|
---|
| 215 | this.destination.error(err);
|
---|
| 216 | }
|
---|
| 217 |
|
---|
| 218 | protected _complete(): void {
|
---|
| 219 | const groups = this.groups;
|
---|
| 220 | if (groups) {
|
---|
| 221 | groups.forEach((group, key) => {
|
---|
| 222 | group.complete();
|
---|
| 223 | });
|
---|
| 224 |
|
---|
| 225 | groups.clear();
|
---|
| 226 | }
|
---|
| 227 | this.destination.complete();
|
---|
| 228 | }
|
---|
| 229 |
|
---|
| 230 | removeGroup(key: K): void {
|
---|
| 231 | this.groups.delete(key);
|
---|
| 232 | }
|
---|
| 233 |
|
---|
| 234 | unsubscribe() {
|
---|
| 235 | if (!this.closed) {
|
---|
| 236 | this.attemptedToUnsubscribe = true;
|
---|
| 237 | if (this.count === 0) {
|
---|
| 238 | super.unsubscribe();
|
---|
| 239 | }
|
---|
| 240 | }
|
---|
| 241 | }
|
---|
| 242 | }
|
---|
| 243 |
|
---|
| 244 | /**
|
---|
| 245 | * We need this JSDoc comment for affecting ESDoc.
|
---|
| 246 | * @ignore
|
---|
| 247 | * @extends {Ignored}
|
---|
| 248 | */
|
---|
| 249 | class GroupDurationSubscriber<K, T> extends Subscriber<T> {
|
---|
| 250 | constructor(private key: K,
|
---|
| 251 | private group: Subject<T>,
|
---|
| 252 | private parent: GroupBySubscriber<any, K, T | any>) {
|
---|
| 253 | super(group);
|
---|
| 254 | }
|
---|
| 255 |
|
---|
| 256 | protected _next(value: T): void {
|
---|
| 257 | this.complete();
|
---|
| 258 | }
|
---|
| 259 |
|
---|
| 260 | /** @deprecated This is an internal implementation detail, do not use. */
|
---|
| 261 | _unsubscribe() {
|
---|
| 262 | const { parent, key } = this;
|
---|
| 263 | this.key = this.parent = null;
|
---|
| 264 | if (parent) {
|
---|
| 265 | parent.removeGroup(key);
|
---|
| 266 | }
|
---|
| 267 | }
|
---|
| 268 | }
|
---|
| 269 |
|
---|
| 270 | /**
|
---|
| 271 | * An Observable representing values belonging to the same group represented by
|
---|
| 272 | * a common key. The values emitted by a GroupedObservable come from the source
|
---|
| 273 | * Observable. The common key is available as the field `key` on a
|
---|
| 274 | * GroupedObservable instance.
|
---|
| 275 | *
|
---|
| 276 | * @class GroupedObservable<K, T>
|
---|
| 277 | */
|
---|
| 278 | export class GroupedObservable<K, T> extends Observable<T> {
|
---|
| 279 | /** @deprecated Do not construct this type. Internal use only */
|
---|
| 280 | constructor(public key: K,
|
---|
| 281 | private groupSubject: Subject<T>,
|
---|
| 282 | private refCountSubscription?: RefCountSubscription) {
|
---|
| 283 | super();
|
---|
| 284 | }
|
---|
| 285 |
|
---|
| 286 | /** @deprecated This is an internal implementation detail, do not use. */
|
---|
| 287 | _subscribe(subscriber: Subscriber<T>) {
|
---|
| 288 | const subscription = new Subscription();
|
---|
| 289 | const { refCountSubscription, groupSubject } = this;
|
---|
| 290 | if (refCountSubscription && !refCountSubscription.closed) {
|
---|
| 291 | subscription.add(new InnerRefCountSubscription(refCountSubscription));
|
---|
| 292 | }
|
---|
| 293 | subscription.add(groupSubject.subscribe(subscriber));
|
---|
| 294 | return subscription;
|
---|
| 295 | }
|
---|
| 296 | }
|
---|
| 297 |
|
---|
| 298 | /**
|
---|
| 299 | * We need this JSDoc comment for affecting ESDoc.
|
---|
| 300 | * @ignore
|
---|
| 301 | * @extends {Ignored}
|
---|
| 302 | */
|
---|
| 303 | class InnerRefCountSubscription extends Subscription {
|
---|
| 304 | constructor(private parent: RefCountSubscription) {
|
---|
| 305 | super();
|
---|
| 306 | parent.count++;
|
---|
| 307 | }
|
---|
| 308 |
|
---|
| 309 | unsubscribe() {
|
---|
| 310 | const parent = this.parent;
|
---|
| 311 | if (!parent.closed && !this.closed) {
|
---|
| 312 | super.unsubscribe();
|
---|
| 313 | parent.count -= 1;
|
---|
| 314 | if (parent.count === 0 && parent.attemptedToUnsubscribe) {
|
---|
| 315 | parent.unsubscribe();
|
---|
| 316 | }
|
---|
| 317 | }
|
---|
| 318 | }
|
---|
| 319 | }
|
---|