1 | "use strict";
|
---|
2 | /**
|
---|
3 | * @license
|
---|
4 | * Copyright Google LLC All Rights Reserved.
|
---|
5 | *
|
---|
6 | * Use of this source code is governed by an MIT-style license that can be
|
---|
7 | * found in the LICENSE file at https://angular.io/license
|
---|
8 | */
|
---|
9 | Object.defineProperty(exports, "__esModule", { value: true });
|
---|
10 | exports.createLoggerJob = exports.createJobFactory = exports.createJobHandler = exports.ChannelAlreadyExistException = void 0;
|
---|
11 | const rxjs_1 = require("rxjs");
|
---|
12 | const operators_1 = require("rxjs/operators");
|
---|
13 | const index_1 = require("../../exception/index");
|
---|
14 | const index_2 = require("../../utils/index");
|
---|
15 | const api_1 = require("./api");
|
---|
16 | class ChannelAlreadyExistException extends index_1.BaseException {
|
---|
17 | constructor(name) {
|
---|
18 | super(`Channel ${JSON.stringify(name)} already exist.`);
|
---|
19 | }
|
---|
20 | }
|
---|
21 | exports.ChannelAlreadyExistException = ChannelAlreadyExistException;
|
---|
22 | /**
|
---|
23 | * Make a simple job handler that sets start and end from a function that's synchronous.
|
---|
24 | *
|
---|
25 | * @param fn The function to create a handler for.
|
---|
26 | * @param options An optional set of properties to set on the handler. Some fields might be
|
---|
27 | * required by registry or schedulers.
|
---|
28 | */
|
---|
29 | function createJobHandler(fn, options = {}) {
|
---|
30 | const handler = (argument, context) => {
|
---|
31 | const description = context.description;
|
---|
32 | const inboundBus = context.inboundBus;
|
---|
33 | const inputChannel = new rxjs_1.Subject();
|
---|
34 | let subscription;
|
---|
35 | return new rxjs_1.Observable((subject) => {
|
---|
36 | function complete() {
|
---|
37 | if (subscription) {
|
---|
38 | subscription.unsubscribe();
|
---|
39 | }
|
---|
40 | subject.next({ kind: api_1.JobOutboundMessageKind.End, description });
|
---|
41 | subject.complete();
|
---|
42 | inputChannel.complete();
|
---|
43 | }
|
---|
44 | // Handle input.
|
---|
45 | const inboundSub = inboundBus.subscribe((message) => {
|
---|
46 | switch (message.kind) {
|
---|
47 | case api_1.JobInboundMessageKind.Ping:
|
---|
48 | subject.next({ kind: api_1.JobOutboundMessageKind.Pong, description, id: message.id });
|
---|
49 | break;
|
---|
50 | case api_1.JobInboundMessageKind.Stop:
|
---|
51 | // There's no way to cancel a promise or a synchronous function, but we do cancel
|
---|
52 | // observables where possible.
|
---|
53 | complete();
|
---|
54 | break;
|
---|
55 | case api_1.JobInboundMessageKind.Input:
|
---|
56 | inputChannel.next(message.value);
|
---|
57 | break;
|
---|
58 | }
|
---|
59 | });
|
---|
60 | // Execute the function with the additional context.
|
---|
61 | const channels = new Map();
|
---|
62 | const newContext = {
|
---|
63 | ...context,
|
---|
64 | input: inputChannel.asObservable(),
|
---|
65 | createChannel(name) {
|
---|
66 | if (channels.has(name)) {
|
---|
67 | throw new ChannelAlreadyExistException(name);
|
---|
68 | }
|
---|
69 | const channelSubject = new rxjs_1.Subject();
|
---|
70 | const channelSub = channelSubject.subscribe((message) => {
|
---|
71 | subject.next({
|
---|
72 | kind: api_1.JobOutboundMessageKind.ChannelMessage,
|
---|
73 | description,
|
---|
74 | name,
|
---|
75 | message,
|
---|
76 | });
|
---|
77 | }, (error) => {
|
---|
78 | subject.next({ kind: api_1.JobOutboundMessageKind.ChannelError, description, name, error });
|
---|
79 | // This can be reopened.
|
---|
80 | channels.delete(name);
|
---|
81 | }, () => {
|
---|
82 | subject.next({ kind: api_1.JobOutboundMessageKind.ChannelComplete, description, name });
|
---|
83 | // This can be reopened.
|
---|
84 | channels.delete(name);
|
---|
85 | });
|
---|
86 | channels.set(name, channelSubject);
|
---|
87 | if (subscription) {
|
---|
88 | subscription.add(channelSub);
|
---|
89 | }
|
---|
90 | return channelSubject;
|
---|
91 | },
|
---|
92 | };
|
---|
93 | subject.next({ kind: api_1.JobOutboundMessageKind.Start, description });
|
---|
94 | let result = fn(argument, newContext);
|
---|
95 | // If the result is a promise, simply wait for it to complete before reporting the result.
|
---|
96 | if (index_2.isPromise(result)) {
|
---|
97 | result = rxjs_1.from(result);
|
---|
98 | }
|
---|
99 | else if (!rxjs_1.isObservable(result)) {
|
---|
100 | result = rxjs_1.of(result);
|
---|
101 | }
|
---|
102 | subscription = result.subscribe((value) => subject.next({ kind: api_1.JobOutboundMessageKind.Output, description, value }), (error) => subject.error(error), () => complete());
|
---|
103 | subscription.add(inboundSub);
|
---|
104 | return subscription;
|
---|
105 | });
|
---|
106 | };
|
---|
107 | return Object.assign(handler, { jobDescription: options });
|
---|
108 | }
|
---|
109 | exports.createJobHandler = createJobHandler;
|
---|
110 | /**
|
---|
111 | * Lazily create a job using a function.
|
---|
112 | * @param loader A factory function that returns a promise/observable of a JobHandler.
|
---|
113 | * @param options Same options as createJob.
|
---|
114 | */
|
---|
115 | function createJobFactory(loader, options = {}) {
|
---|
116 | const handler = (argument, context) => {
|
---|
117 | return rxjs_1.from(loader()).pipe(operators_1.switchMap((fn) => fn(argument, context)));
|
---|
118 | };
|
---|
119 | return Object.assign(handler, { jobDescription: options });
|
---|
120 | }
|
---|
121 | exports.createJobFactory = createJobFactory;
|
---|
122 | /**
|
---|
123 | * Creates a job that logs out input/output messages of another Job. The messages are still
|
---|
124 | * propagated to the other job.
|
---|
125 | */
|
---|
126 | function createLoggerJob(job, logger) {
|
---|
127 | const handler = (argument, context) => {
|
---|
128 | context.inboundBus
|
---|
129 | .pipe(operators_1.tap((message) => logger.info(`Input: ${JSON.stringify(message)}`)))
|
---|
130 | .subscribe();
|
---|
131 | return job(argument, context).pipe(operators_1.tap((message) => logger.info(`Message: ${JSON.stringify(message)}`), (error) => logger.warn(`Error: ${JSON.stringify(error)}`), () => logger.info(`Completed`)));
|
---|
132 | };
|
---|
133 | return Object.assign(handler, job);
|
---|
134 | }
|
---|
135 | exports.createLoggerJob = createLoggerJob;
|
---|