1 | "use strict";
|
---|
2 | /**
|
---|
3 | * @license
|
---|
4 | * Copyright Google LLC All Rights Reserved.
|
---|
5 | *
|
---|
6 | * Use of this source code is governed by an MIT-style license that can be
|
---|
7 | * found in the LICENSE file at https://angular.io/license
|
---|
8 | */
|
---|
9 | Object.defineProperty(exports, "__esModule", { value: true });
|
---|
10 | exports.SimpleScheduler = exports.JobOutputSchemaValidationError = exports.JobInboundMessageSchemaValidationError = exports.JobArgumentSchemaValidationError = void 0;
|
---|
11 | const rxjs_1 = require("rxjs");
|
---|
12 | const operators_1 = require("rxjs/operators");
|
---|
13 | const json_1 = require("../../json");
|
---|
14 | const api_1 = require("./api");
|
---|
15 | const exception_1 = require("./exception");
|
---|
/**
 * Schema validation exception for a job argument. The message of the base exception is
 * prefixed so the failing part (the argument) is identifiable.
 */
class JobArgumentSchemaValidationError extends json_1.schema.SchemaValidationException {
    /**
     * @param errors Validation errors, forwarded untouched to the base exception.
     */
    constructor(errors) {
        const messagePrefix = 'Job Argument failed to validate. Errors: ';
        super(errors, messagePrefix);
    }
}
|
---|
21 | exports.JobArgumentSchemaValidationError = JobArgumentSchemaValidationError;
|
---|
/**
 * Schema validation exception for a job inbound message. The message of the base exception
 * is prefixed so the failing part (the inbound message) is identifiable.
 */
class JobInboundMessageSchemaValidationError extends json_1.schema.SchemaValidationException {
    /**
     * @param errors Validation errors, forwarded untouched to the base exception.
     */
    constructor(errors) {
        const messagePrefix = 'Job Inbound Message failed to validate. Errors: ';
        super(errors, messagePrefix);
    }
}
|
---|
27 | exports.JobInboundMessageSchemaValidationError = JobInboundMessageSchemaValidationError;
|
---|
/**
 * Schema validation exception for a job output. The message of the base exception is
 * prefixed so the failing part (the output) is identifiable.
 */
class JobOutputSchemaValidationError extends json_1.schema.SchemaValidationException {
    /**
     * @param errors Validation errors, forwarded untouched to the base exception.
     */
    constructor(errors) {
        const messagePrefix = 'Job Output failed to validate. Errors: ';
        super(errors, messagePrefix);
    }
}
|
---|
33 | exports.JobOutputSchemaValidationError = JobOutputSchemaValidationError;
|
---|
/**
 * Operator factory that multicasts a source through a single plain Subject.
 *
 * This is the same code as a `shareReplay()` operator, but uses a dumber Subject rather than
 * a ReplaySubject: late subscribers do not receive previously emitted values.
 *
 * The source is subscribed once, on the first subscription to the result. The subscription to
 * the source is only released once the last subscriber has left AND the source has already
 * completed or errored.
 *
 * @returns A function mapping a source observable to the shared observable.
 */
function _jobShare() {
    return (source) => {
        let subscriberCount = 0;
        let sharedSubject;
        let sourceSubscription;
        // True once the source has either completed or errored.
        let terminated = false;
        return new rxjs_1.Observable((subscriber) => {
            subscriberCount += 1;
            const isFirstSubscriber = !sharedSubject;
            if (isFirstSubscriber) {
                sharedSubject = new rxjs_1.Subject();
            }
            // Attach the subscriber before connecting to the source so that values emitted
            // synchronously on connection are not missed.
            const innerSubscription = sharedSubject.subscribe(subscriber);
            if (isFirstSubscriber) {
                sourceSubscription = source.subscribe({
                    next: (value) => {
                        sharedSubject.next(value);
                    },
                    error: (err) => {
                        terminated = true;
                        sharedSubject.error(err);
                    },
                    complete: () => {
                        terminated = true;
                        sharedSubject.complete();
                    },
                });
            }
            return () => {
                subscriberCount -= 1;
                innerSubscription.unsubscribe();
                if (subscriberCount !== 0 || !terminated) {
                    return;
                }
                // May still be unset if teardown runs while the source is being subscribed.
                if (sourceSubscription) {
                    sourceSubscription.unsubscribe();
                }
            };
        });
    };
}
|
---|
/**
 * Simple scheduler. Should be the base of all registries and schedulers.
 *
 * Handlers are looked up through the job registry given at construction. Job arguments,
 * inputs and outputs are validated with validators compiled from the handler's job
 * description, and the scheduler can be paused so that newly scheduled jobs wait until it is
 * resumed.
 */
class SimpleScheduler {
    /**
     * @param _jobRegistry Registry whose `get(name)` returns an observable of a handler or null.
     * @param _schemaRegistry Registry used to compile schemas into validators. Defaults to a
     *     new `CoreSchemaRegistry`.
     */
    constructor(_jobRegistry, _schemaRegistry = new json_1.schema.CoreSchemaRegistry()) {
        this._jobRegistry = _jobRegistry;
        this._schemaRegistry = _schemaRegistry;
        // Cache of decorated handlers, keyed by the name they were requested with.
        this._internalJobDescriptionMap = new Map();
        // Callbacks to run when the scheduler is resumed.
        this._queue = [];
        // Number of outstanding pause() calls that have not been resumed yet.
        this._pauseCounter = 0;
    }
    /**
     * Resolve a handler by name and decorate it with a normalized job description and the
     * compiled argument/input/output validators. Decorated handlers are cached by name.
     *
     * @param name The name of the job.
     * @returns An observable of the decorated handler, or of null if the registry yields null.
     * @private
     */
    _getInternalDescription(name) {
        const maybeHandler = this._internalJobDescriptionMap.get(name);
        if (maybeHandler !== undefined) {
            return rxjs_1.of(maybeHandler);
        }
        const handler = this._jobRegistry.get(name);
        return handler.pipe(operators_1.switchMap((handler) => {
            if (handler === null) {
                return rxjs_1.of(null);
            }
            const description = {
                // Make a copy of it to be sure it's proper JSON.
                ...JSON.parse(JSON.stringify(handler.jobDescription)),
                // NOTE(review): `||` also replaces falsy values such as a `false` schema with
                // the permissive default - confirm this is intended.
                name: handler.jobDescription.name || name,
                argument: handler.jobDescription.argument || true,
                input: handler.jobDescription.input || true,
                output: handler.jobDescription.output || true,
                channels: handler.jobDescription.channels || {},
            };
            // Bind so the extra properties land on a fresh function, not on the registered one.
            const handlerWithExtra = Object.assign(handler.bind(undefined), {
                jobDescription: description,
                argumentV: this._schemaRegistry.compile(description.argument).pipe(operators_1.shareReplay(1)),
                inputV: this._schemaRegistry.compile(description.input).pipe(operators_1.shareReplay(1)),
                outputV: this._schemaRegistry.compile(description.output).pipe(operators_1.shareReplay(1)),
            });
            this._internalJobDescriptionMap.set(name, handlerWithExtra);
            return rxjs_1.of(handlerWithExtra);
        }));
    }
    /**
     * Get a job description for a named job.
     *
     * @param name The name of the job.
     * @returns A description, or null if the job is not registered.
     */
    getDescription(name) {
        const description = this._getInternalDescription(name).pipe(operators_1.map((x) => x && x.jobDescription));
        // The trailing `of(null)` guarantees a value even if the lookup completes empty.
        return rxjs_1.concat(description, rxjs_1.of(null)).pipe(operators_1.first());
    }
    /**
     * Returns true if the job name has been registered.
     * @param name The name of the job.
     * @returns True if the job exists, false otherwise.
     */
    has(name) {
        return this.getDescription(name).pipe(operators_1.map((x) => x !== null));
    }
    /**
     * Pause the scheduler, temporarily queueing _new_ jobs. Returns a resume function that
     * should be used to resume execution. If multiple `pause()` were called, all their resume
     * functions must be called before the Scheduler actually starts new jobs. Additional calls
     * to the same resume function will have no effect.
     *
     * Jobs already running are NOT paused. This is pausing the scheduler only.
     *
     * @returns The resume function for this particular pause.
     */
    pause() {
        let called = false;
        this._pauseCounter++;
        return () => {
            if (called) {
                return;
            }
            called = true;
            if (--this._pauseCounter === 0) {
                // Resume the queue. Swap it out first so callbacks that queue new work do
                // not modify the list being iterated.
                const q = this._queue;
                this._queue = [];
                q.forEach((fn) => fn());
            }
        };
    }
    /**
     * Schedule a job to be run, using its name.
     * @param name The name of job to be run.
     * @param argument The argument to send to the job when starting it.
     * @param options Scheduling options.
     * @returns The Job being run.
     */
    schedule(name, argument, options) {
        if (this._pauseCounter > 0) {
            // Hold the job back until the scheduler resumes and completes this subject.
            const waitable = new rxjs_1.Subject();
            this._queue.push(() => waitable.complete());
            return this._scheduleJob(name, argument, options || {}, waitable);
        }
        return this._scheduleJob(name, argument, options || {}, rxjs_1.EMPTY);
    }
    /**
     * Filter messages: lifecycle messages are only let through from the states listed below;
     * every other kind of message always passes.
     * @param message The outbound message to check.
     * @param state The current job state.
     * @returns True if the message should be forwarded.
     * @private
     */
    _filterJobOutboundMessages(message, state) {
        switch (message.kind) {
            case api_1.JobOutboundMessageKind.OnReady:
                return state === api_1.JobState.Queued;
            case api_1.JobOutboundMessageKind.Start:
                return state === api_1.JobState.Ready;
            case api_1.JobOutboundMessageKind.End:
                return state === api_1.JobState.Started || state === api_1.JobState.Ready;
        }
        return true;
    }
    /**
     * Return a new state. This is just to simplify the reading of the _createJob method.
     * @param message The outbound message that was just accepted.
     * @param state The current job state.
     * @returns The state after the message; unchanged for non-lifecycle messages.
     * @private
     */
    _updateState(message, state) {
        switch (message.kind) {
            case api_1.JobOutboundMessageKind.OnReady:
                return api_1.JobState.Ready;
            case api_1.JobOutboundMessageKind.Start:
                return api_1.JobState.Started;
            case api_1.JobOutboundMessageKind.End:
                return api_1.JobState.Ended;
        }
        return state;
    }
    /**
     * Create the job.
     * @param name The name of the job, used in error reporting.
     * @param argument The argument the job was scheduled with; exposed as-is on the Job.
     * @param handler Observable of the decorated handler (or null if the job does not exist).
     * @param inboundBus Subject carrying messages to the job.
     * @param outboundBus Observable of raw messages coming from the job.
     * @returns The Job object.
     * @private
     */
    _createJob(name, argument, handler, inboundBus, outboundBus) {
        const schemaRegistry = this._schemaRegistry;
        // Subjects of channels that are currently open, by channel name.
        const channelsSubject = new Map();
        // Observables handed out to consumers, by channel name.
        const channels = new Map();
        let state = api_1.JobState.Queued;
        let pingId = 0;
        // Create the input channel by having a filter: inputs that fail validation are dropped.
        const input = new rxjs_1.Subject();
        const validateInput = operators_1.concatMap((message) => handler.pipe(operators_1.switchMap((resolved) => {
            if (resolved === null) {
                throw new exception_1.JobDoesNotExistException(name);
            }
            return resolved.inputV.pipe(operators_1.switchMap((validate) => validate(message)));
        })));
        input
            .pipe(validateInput, operators_1.filter((result) => result.success), operators_1.map((result) => result.data))
            .subscribe((value) => inboundBus.next({ kind: api_1.JobInboundMessageKind.Input, value }));
        // Add an End message at completion. This will be filtered out if the job actually
        // sent an End.
        const impliedEnd = handler.pipe(operators_1.switchMap((resolved) => {
            if (resolved) {
                return rxjs_1.of({
                    kind: api_1.JobOutboundMessageKind.End,
                    description: resolved.jobDescription,
                });
            }
            return rxjs_1.EMPTY;
        }));
        const sharedOutboundBus = rxjs_1.concat(outboundBus, impliedEnd).pipe(operators_1.filter((message) => this._filterJobOutboundMessages(message, state)),
        // Update internal logic and Job<> members.
        operators_1.tap((message) => {
            // Update the state.
            state = this._updateState(message, state);
            switch (message.kind) {
                case api_1.JobOutboundMessageKind.ChannelCreate: {
                    const maybeSubject = channelsSubject.get(message.name);
                    // If it doesn't exist or it's closed on the other end.
                    if (!maybeSubject) {
                        const s = new rxjs_1.Subject();
                        channelsSubject.set(message.name, s);
                        channels.set(message.name, s.asObservable());
                    }
                    break;
                }
                case api_1.JobOutboundMessageKind.ChannelMessage: {
                    const maybeSubject = channelsSubject.get(message.name);
                    if (maybeSubject) {
                        maybeSubject.next(message.message);
                    }
                    break;
                }
                case api_1.JobOutboundMessageKind.ChannelComplete: {
                    const maybeSubject = channelsSubject.get(message.name);
                    if (maybeSubject) {
                        maybeSubject.complete();
                        channelsSubject.delete(message.name);
                    }
                    break;
                }
                case api_1.JobOutboundMessageKind.ChannelError: {
                    const maybeSubject = channelsSubject.get(message.name);
                    if (maybeSubject) {
                        maybeSubject.error(message.error);
                        channelsSubject.delete(message.name);
                    }
                    break;
                }
            }
        }, () => {
            state = api_1.JobState.Errored;
        }),
        // Do output validation (might include default values so this might have side
        // effects). We keep all messages in order.
        operators_1.concatMap((message) => {
            if (message.kind !== api_1.JobOutboundMessageKind.Output) {
                return rxjs_1.of(message);
            }
            return handler.pipe(operators_1.switchMap((resolved) => {
                if (resolved === null) {
                    throw new exception_1.JobDoesNotExistException(name);
                }
                return resolved.outputV.pipe(operators_1.switchMap((validate) => validate(message.value)), operators_1.switchMap((result) => {
                    if (!result.success) {
                        throw new JobOutputSchemaValidationError(result.errors);
                    }
                    return rxjs_1.of({
                        ...message,
                        // Consumers (including the `output` stream below) read `value`, so
                        // the validated data has to be published there.
                        value: result.data,
                        // Kept so readers of the previous message shape keep working.
                        output: result.data,
                    });
                }));
            }));
        }), _jobShare());
        const output = sharedOutboundBus.pipe(operators_1.filter((x) => x.kind === api_1.JobOutboundMessageKind.Output), operators_1.map((x) => x.value), operators_1.shareReplay(1));
        // Return the Job.
        return {
            get state() {
                return state;
            },
            argument,
            description: handler.pipe(operators_1.switchMap((resolved) => {
                if (resolved === null) {
                    throw new exception_1.JobDoesNotExistException(name);
                }
                return rxjs_1.of(resolved.jobDescription);
            })),
            output,
            getChannel(name, schema = true) {
                let maybeObservable = channels.get(name);
                if (!maybeObservable) {
                    // Register the channel up front so messages for it are delivered once
                    // the job starts emitting them.
                    const s = new rxjs_1.Subject();
                    maybeObservable = s.asObservable();
                    channelsSubject.set(name, s);
                    channels.set(name, maybeObservable);
                }
                return maybeObservable.pipe(
                // Keep the order of messages. Messages failing validation are dropped.
                operators_1.concatMap((message) => {
                    return schemaRegistry.compile(schema).pipe(operators_1.switchMap((validate) => validate(message)), operators_1.filter((x) => x.success), operators_1.map((x) => x.data));
                }));
            },
            ping() {
                const id = pingId++;
                inboundBus.next({ kind: api_1.JobInboundMessageKind.Ping, id });
                // Loose comparison on the id is kept as-is: the id is echoed back by job code.
                // NOTE(review): switch to `===` if ids are guaranteed to stay numbers.
                return sharedOutboundBus.pipe(operators_1.filter((x) => x.kind === api_1.JobOutboundMessageKind.Pong && x.id == id), operators_1.first(), operators_1.ignoreElements());
            },
            stop() {
                inboundBus.next({ kind: api_1.JobInboundMessageKind.Stop });
            },
            input,
            inboundBus,
            outboundBus: sharedOutboundBus,
        };
    }
    /**
     * Build the outbound message stream for a job and wrap it into a Job.
     *
     * The stream waits for all dependencies to complete, then for `waitable` to complete, then
     * validates the argument and runs the handler.
     *
     * @param name The name of the job.
     * @param argument The raw argument to validate and pass to the handler.
     * @param options Scheduling options; `dependencies` may be a single job or an array.
     * @param waitable Observable that completes when the scheduler is not paused.
     * @returns The Job being run.
     * @private
     */
    _scheduleJob(name, argument, options, waitable) {
        // Get handler first, since this can error out if there's no handler for the job name.
        const handler = this._getInternalDescription(name);
        const optionsDeps = (options && options.dependencies) || [];
        const dependencies = Array.isArray(optionsDeps) ? optionsDeps : [optionsDeps];
        const inboundBus = new rxjs_1.Subject();
        const outboundBus = rxjs_1.concat(
        // Wait for dependencies, make sure to not report messages from dependencies. Subscribe
        // to all dependencies at the same time so they run concurrently.
        rxjs_1.merge(...dependencies.map((x) => x.outboundBus)).pipe(operators_1.ignoreElements()),
        // Wait for pause() to clear (if necessary).
        waitable, rxjs_1.from(handler).pipe(operators_1.switchMap((resolved) => new rxjs_1.Observable((subscriber) => {
            if (!resolved) {
                throw new exception_1.JobDoesNotExistException(name);
            }
            // Validate the argument.
            return resolved.argumentV
                .pipe(operators_1.switchMap((validate) => validate(argument)), operators_1.switchMap((result) => {
                if (!result.success) {
                    throw new JobArgumentSchemaValidationError(result.errors);
                }
                const validatedArgument = result.data;
                const description = resolved.jobDescription;
                // Announce readiness before handing control to the handler.
                subscriber.next({ kind: api_1.JobOutboundMessageKind.OnReady, description });
                const context = {
                    description,
                    dependencies: [...dependencies],
                    inboundBus: inboundBus.asObservable(),
                    scheduler: this,
                };
                return resolved(validatedArgument, context);
            }))
                .subscribe(subscriber);
        }))));
        return this._createJob(name, argument, handler, inboundBus, outboundBus);
    }
}
|
---|
382 | exports.SimpleScheduler = SimpleScheduler;
|
---|