source: trip-planner-front/node_modules/node-forge/lib/task.js@ 571e0df

Last change on this file since 571e0df was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 19.3 KB
RevLine 
[6a3a178]1/**
2 * Support for concurrent task management and synchronization in web
3 * applications.
4 *
5 * @author Dave Longley
6 * @author David I. Lehn <dlehn@digitalbazaar.com>
7 *
8 * Copyright (c) 2009-2013 Digital Bazaar, Inc.
9 */
10var forge = require('./forge');
11require('./debug');
12require('./log');
13require('./util');
14
15// logging category
16var cat = 'forge.task';
17
18// verbose level
19// 0: off, 1: a little, 2: a whole lot
20// Verbose debug logging is surrounded by a level check to avoid the
21// performance issues with even calling the logging code regardless if it
22// is actually logged. For performance reasons this should not be set to 2
23// for production use.
24// ex: if(sVL >= 2) forge.log.verbose(....)
25var sVL = 0;
26
27// track tasks for debugging
28var sTasks = {};
29var sNextTaskId = 0;
30// debug access
31forge.debug.set(cat, 'tasks', sTasks);
32
33// a map of task type to task queue
34var sTaskQueues = {};
35// debug access
36forge.debug.set(cat, 'queues', sTaskQueues);
37
38// name for unnamed tasks
39var sNoTaskName = '?';
40
41// maximum number of doNext() recursions before a context swap occurs
42// FIXME: might need to tweak this based on the browser
43var sMaxRecursions = 30;
44
45// time slice for doing tasks before a context swap occurs
46// FIXME: might need to tweak this based on the browser
47var sTimeSlice = 20;
48
49/**
50 * Task states.
51 *
52 * READY: ready to start processing
53 * RUNNING: task or a subtask is running
54 * BLOCKED: task is waiting to acquire N permits to continue
55 * SLEEPING: task is sleeping for a period of time
56 * DONE: task is done
57 * ERROR: task has an error
58 */
59var READY = 'ready';
60var RUNNING = 'running';
61var BLOCKED = 'blocked';
62var SLEEPING = 'sleeping';
63var DONE = 'done';
64var ERROR = 'error';
65
66/**
67 * Task actions. Used to control state transitions.
68 *
69 * STOP: stop processing
70 * START: start processing tasks
71 * BLOCK: block task from continuing until 1 or more permits are released
72 * UNBLOCK: release one or more permits
73 * SLEEP: sleep for a period of time
74 * WAKEUP: wakeup early from SLEEPING state
75 * CANCEL: cancel further tasks
76 * FAIL: a failure occured
77 */
78var STOP = 'stop';
79var START = 'start';
80var BLOCK = 'block';
81var UNBLOCK = 'unblock';
82var SLEEP = 'sleep';
83var WAKEUP = 'wakeup';
84var CANCEL = 'cancel';
85var FAIL = 'fail';
86
87/**
88 * State transition table.
89 *
90 * nextState = sStateTable[currentState][action]
91 */
92var sStateTable = {};
93
94sStateTable[READY] = {};
95sStateTable[READY][STOP] = READY;
96sStateTable[READY][START] = RUNNING;
97sStateTable[READY][CANCEL] = DONE;
98sStateTable[READY][FAIL] = ERROR;
99
100sStateTable[RUNNING] = {};
101sStateTable[RUNNING][STOP] = READY;
102sStateTable[RUNNING][START] = RUNNING;
103sStateTable[RUNNING][BLOCK] = BLOCKED;
104sStateTable[RUNNING][UNBLOCK] = RUNNING;
105sStateTable[RUNNING][SLEEP] = SLEEPING;
106sStateTable[RUNNING][WAKEUP] = RUNNING;
107sStateTable[RUNNING][CANCEL] = DONE;
108sStateTable[RUNNING][FAIL] = ERROR;
109
110sStateTable[BLOCKED] = {};
111sStateTable[BLOCKED][STOP] = BLOCKED;
112sStateTable[BLOCKED][START] = BLOCKED;
113sStateTable[BLOCKED][BLOCK] = BLOCKED;
114sStateTable[BLOCKED][UNBLOCK] = BLOCKED;
115sStateTable[BLOCKED][SLEEP] = BLOCKED;
116sStateTable[BLOCKED][WAKEUP] = BLOCKED;
117sStateTable[BLOCKED][CANCEL] = DONE;
118sStateTable[BLOCKED][FAIL] = ERROR;
119
120sStateTable[SLEEPING] = {};
121sStateTable[SLEEPING][STOP] = SLEEPING;
122sStateTable[SLEEPING][START] = SLEEPING;
123sStateTable[SLEEPING][BLOCK] = SLEEPING;
124sStateTable[SLEEPING][UNBLOCK] = SLEEPING;
125sStateTable[SLEEPING][SLEEP] = SLEEPING;
126sStateTable[SLEEPING][WAKEUP] = SLEEPING;
127sStateTable[SLEEPING][CANCEL] = DONE;
128sStateTable[SLEEPING][FAIL] = ERROR;
129
130sStateTable[DONE] = {};
131sStateTable[DONE][STOP] = DONE;
132sStateTable[DONE][START] = DONE;
133sStateTable[DONE][BLOCK] = DONE;
134sStateTable[DONE][UNBLOCK] = DONE;
135sStateTable[DONE][SLEEP] = DONE;
136sStateTable[DONE][WAKEUP] = DONE;
137sStateTable[DONE][CANCEL] = DONE;
138sStateTable[DONE][FAIL] = ERROR;
139
140sStateTable[ERROR] = {};
141sStateTable[ERROR][STOP] = ERROR;
142sStateTable[ERROR][START] = ERROR;
143sStateTable[ERROR][BLOCK] = ERROR;
144sStateTable[ERROR][UNBLOCK] = ERROR;
145sStateTable[ERROR][SLEEP] = ERROR;
146sStateTable[ERROR][WAKEUP] = ERROR;
147sStateTable[ERROR][CANCEL] = ERROR;
148sStateTable[ERROR][FAIL] = ERROR;
149
150/**
151 * Creates a new task.
152 *
153 * @param options options for this task
154 * run: the run function for the task (required)
155 * name: the run function for the task (optional)
156 * parent: parent of this task (optional)
157 *
158 * @return the empty task.
159 */
160var Task = function(options) {
161 // task id
162 this.id = -1;
163
164 // task name
165 this.name = options.name || sNoTaskName;
166
167 // task has no parent
168 this.parent = options.parent || null;
169
170 // save run function
171 this.run = options.run;
172
173 // create a queue of subtasks to run
174 this.subtasks = [];
175
176 // error flag
177 this.error = false;
178
179 // state of the task
180 this.state = READY;
181
182 // number of times the task has been blocked (also the number
183 // of permits needed to be released to continue running)
184 this.blocks = 0;
185
186 // timeout id when sleeping
187 this.timeoutId = null;
188
189 // no swap time yet
190 this.swapTime = null;
191
192 // no user data
193 this.userData = null;
194
195 // initialize task
196 // FIXME: deal with overflow
197 this.id = sNextTaskId++;
198 sTasks[this.id] = this;
199 if(sVL >= 1) {
200 forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this);
201 }
202};
203
204/**
205 * Logs debug information on this task and the system state.
206 */
207Task.prototype.debug = function(msg) {
208 msg = msg || '';
209 forge.log.debug(cat, msg,
210 '[%s][%s] task:', this.id, this.name, this,
211 'subtasks:', this.subtasks.length,
212 'queue:', sTaskQueues);
213};
214
215/**
216 * Adds a subtask to run after task.doNext() or task.fail() is called.
217 *
218 * @param name human readable name for this task (optional).
219 * @param subrun a function to run that takes the current task as
220 * its first parameter.
221 *
222 * @return the current task (useful for chaining next() calls).
223 */
224Task.prototype.next = function(name, subrun) {
225 // juggle parameters if it looks like no name is given
226 if(typeof(name) === 'function') {
227 subrun = name;
228
229 // inherit parent's name
230 name = this.name;
231 }
232 // create subtask, set parent to this task, propagate callbacks
233 var subtask = new Task({
234 run: subrun,
235 name: name,
236 parent: this
237 });
238 // start subtasks running
239 subtask.state = RUNNING;
240 subtask.type = this.type;
241 subtask.successCallback = this.successCallback || null;
242 subtask.failureCallback = this.failureCallback || null;
243
244 // queue a new subtask
245 this.subtasks.push(subtask);
246
247 return this;
248};
249
250/**
251 * Adds subtasks to run in parallel after task.doNext() or task.fail()
252 * is called.
253 *
254 * @param name human readable name for this task (optional).
255 * @param subrun functions to run that take the current task as
256 * their first parameter.
257 *
258 * @return the current task (useful for chaining next() calls).
259 */
260Task.prototype.parallel = function(name, subrun) {
261 // juggle parameters if it looks like no name is given
262 if(forge.util.isArray(name)) {
263 subrun = name;
264
265 // inherit parent's name
266 name = this.name;
267 }
268 // Wrap parallel tasks in a regular task so they are started at the
269 // proper time.
270 return this.next(name, function(task) {
271 // block waiting for subtasks
272 var ptask = task;
273 ptask.block(subrun.length);
274
275 // we pass the iterator from the loop below as a parameter
276 // to a function because it is otherwise included in the
277 // closure and changes as the loop changes -- causing i
278 // to always be set to its highest value
279 var startParallelTask = function(pname, pi) {
280 forge.task.start({
281 type: pname,
282 run: function(task) {
283 subrun[pi](task);
284 },
285 success: function(task) {
286 ptask.unblock();
287 },
288 failure: function(task) {
289 ptask.unblock();
290 }
291 });
292 };
293
294 for(var i = 0; i < subrun.length; i++) {
295 // Type must be unique so task starts in parallel:
296 // name + private string + task id + sub-task index
297 // start tasks in parallel and unblock when the finish
298 var pname = name + '__parallel-' + task.id + '-' + i;
299 var pi = i;
300 startParallelTask(pname, pi);
301 }
302 });
303};
304
305/**
306 * Stops a running task.
307 */
308Task.prototype.stop = function() {
309 this.state = sStateTable[this.state][STOP];
310};
311
312/**
313 * Starts running a task.
314 */
315Task.prototype.start = function() {
316 this.error = false;
317 this.state = sStateTable[this.state][START];
318
319 // try to restart
320 if(this.state === RUNNING) {
321 this.start = new Date();
322 this.run(this);
323 runNext(this, 0);
324 }
325};
326
327/**
328 * Blocks a task until it one or more permits have been released. The
329 * task will not resume until the requested number of permits have
330 * been released with call(s) to unblock().
331 *
332 * @param n number of permits to wait for(default: 1).
333 */
334Task.prototype.block = function(n) {
335 n = typeof(n) === 'undefined' ? 1 : n;
336 this.blocks += n;
337 if(this.blocks > 0) {
338 this.state = sStateTable[this.state][BLOCK];
339 }
340};
341
342/**
343 * Releases a permit to unblock a task. If a task was blocked by
344 * requesting N permits via block(), then it will only continue
345 * running once enough permits have been released via unblock() calls.
346 *
347 * If multiple processes need to synchronize with a single task then
348 * use a condition variable (see forge.task.createCondition). It is
349 * an error to unblock a task more times than it has been blocked.
350 *
351 * @param n number of permits to release (default: 1).
352 *
353 * @return the current block count (task is unblocked when count is 0)
354 */
355Task.prototype.unblock = function(n) {
356 n = typeof(n) === 'undefined' ? 1 : n;
357 this.blocks -= n;
358 if(this.blocks === 0 && this.state !== DONE) {
359 this.state = RUNNING;
360 runNext(this, 0);
361 }
362 return this.blocks;
363};
364
365/**
366 * Sleep for a period of time before resuming tasks.
367 *
368 * @param n number of milliseconds to sleep (default: 0).
369 */
370Task.prototype.sleep = function(n) {
371 n = typeof(n) === 'undefined' ? 0 : n;
372 this.state = sStateTable[this.state][SLEEP];
373 var self = this;
374 this.timeoutId = setTimeout(function() {
375 self.timeoutId = null;
376 self.state = RUNNING;
377 runNext(self, 0);
378 }, n);
379};
380
381/**
382 * Waits on a condition variable until notified. The next task will
383 * not be scheduled until notification. A condition variable can be
384 * created with forge.task.createCondition().
385 *
386 * Once cond.notify() is called, the task will continue.
387 *
388 * @param cond the condition variable to wait on.
389 */
390Task.prototype.wait = function(cond) {
391 cond.wait(this);
392};
393
394/**
395 * If sleeping, wakeup and continue running tasks.
396 */
397Task.prototype.wakeup = function() {
398 if(this.state === SLEEPING) {
399 cancelTimeout(this.timeoutId);
400 this.timeoutId = null;
401 this.state = RUNNING;
402 runNext(this, 0);
403 }
404};
405
406/**
407 * Cancel all remaining subtasks of this task.
408 */
409Task.prototype.cancel = function() {
410 this.state = sStateTable[this.state][CANCEL];
411 // remove permits needed
412 this.permitsNeeded = 0;
413 // cancel timeouts
414 if(this.timeoutId !== null) {
415 cancelTimeout(this.timeoutId);
416 this.timeoutId = null;
417 }
418 // remove subtasks
419 this.subtasks = [];
420};
421
422/**
423 * Finishes this task with failure and sets error flag. The entire
424 * task will be aborted unless the next task that should execute
425 * is passed as a parameter. This allows levels of subtasks to be
426 * skipped. For instance, to abort only this tasks's subtasks, then
427 * call fail(task.parent). To abort this task's subtasks and its
428 * parent's subtasks, call fail(task.parent.parent). To abort
429 * all tasks and simply call the task callback, call fail() or
430 * fail(null).
431 *
432 * The task callback (success or failure) will always, eventually, be
433 * called.
434 *
435 * @param next the task to continue at, or null to abort entirely.
436 */
437Task.prototype.fail = function(next) {
438 // set error flag
439 this.error = true;
440
441 // finish task
442 finish(this, true);
443
444 if(next) {
445 // propagate task info
446 next.error = this.error;
447 next.swapTime = this.swapTime;
448 next.userData = this.userData;
449
450 // do next task as specified
451 runNext(next, 0);
452 } else {
453 if(this.parent !== null) {
454 // finish root task (ensures it is removed from task queue)
455 var parent = this.parent;
456 while(parent.parent !== null) {
457 // propagate task info
458 parent.error = this.error;
459 parent.swapTime = this.swapTime;
460 parent.userData = this.userData;
461 parent = parent.parent;
462 }
463 finish(parent, true);
464 }
465
466 // call failure callback if one exists
467 if(this.failureCallback) {
468 this.failureCallback(this);
469 }
470 }
471};
472
473/**
474 * Asynchronously start a task.
475 *
476 * @param task the task to start.
477 */
478var start = function(task) {
479 task.error = false;
480 task.state = sStateTable[task.state][START];
481 setTimeout(function() {
482 if(task.state === RUNNING) {
483 task.swapTime = +new Date();
484 task.run(task);
485 runNext(task, 0);
486 }
487 }, 0);
488};
489
490/**
491 * Run the next subtask or finish this task.
492 *
493 * @param task the task to process.
494 * @param recurse the recursion count.
495 */
496var runNext = function(task, recurse) {
497 // get time since last context swap (ms), if enough time has passed set
498 // swap to true to indicate that doNext was performed asynchronously
499 // also, if recurse is too high do asynchronously
500 var swap =
501 (recurse > sMaxRecursions) ||
502 (+new Date() - task.swapTime) > sTimeSlice;
503
504 var doNext = function(recurse) {
505 recurse++;
506 if(task.state === RUNNING) {
507 if(swap) {
508 // update swap time
509 task.swapTime = +new Date();
510 }
511
512 if(task.subtasks.length > 0) {
513 // run next subtask
514 var subtask = task.subtasks.shift();
515 subtask.error = task.error;
516 subtask.swapTime = task.swapTime;
517 subtask.userData = task.userData;
518 subtask.run(subtask);
519 if(!subtask.error) {
520 runNext(subtask, recurse);
521 }
522 } else {
523 finish(task);
524
525 if(!task.error) {
526 // chain back up and run parent
527 if(task.parent !== null) {
528 // propagate task info
529 task.parent.error = task.error;
530 task.parent.swapTime = task.swapTime;
531 task.parent.userData = task.userData;
532
533 // no subtasks left, call run next subtask on parent
534 runNext(task.parent, recurse);
535 }
536 }
537 }
538 }
539 };
540
541 if(swap) {
542 // we're swapping, so run asynchronously
543 setTimeout(doNext, 0);
544 } else {
545 // not swapping, so run synchronously
546 doNext(recurse);
547 }
548};
549
550/**
551 * Finishes a task and looks for the next task in the queue to start.
552 *
553 * @param task the task to finish.
554 * @param suppressCallbacks true to suppress callbacks.
555 */
556var finish = function(task, suppressCallbacks) {
557 // subtask is now done
558 task.state = DONE;
559
560 delete sTasks[task.id];
561 if(sVL >= 1) {
562 forge.log.verbose(cat, '[%s][%s] finish',
563 task.id, task.name, task);
564 }
565
566 // only do queue processing for root tasks
567 if(task.parent === null) {
568 // report error if queue is missing
569 if(!(task.type in sTaskQueues)) {
570 forge.log.error(cat,
571 '[%s][%s] task queue missing [%s]',
572 task.id, task.name, task.type);
573 } else if(sTaskQueues[task.type].length === 0) {
574 // report error if queue is empty
575 forge.log.error(cat,
576 '[%s][%s] task queue empty [%s]',
577 task.id, task.name, task.type);
578 } else if(sTaskQueues[task.type][0] !== task) {
579 // report error if this task isn't the first in the queue
580 forge.log.error(cat,
581 '[%s][%s] task not first in queue [%s]',
582 task.id, task.name, task.type);
583 } else {
584 // remove ourselves from the queue
585 sTaskQueues[task.type].shift();
586 // clean up queue if it is empty
587 if(sTaskQueues[task.type].length === 0) {
588 if(sVL >= 1) {
589 forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
590 task.id, task.name, task.type);
591 }
592 /* Note: Only a task can delete a queue of its own type. This
593 is used as a way to synchronize tasks. If a queue for a certain
594 task type exists, then a task of that type is running.
595 */
596 delete sTaskQueues[task.type];
597 } else {
598 // dequeue the next task and start it
599 if(sVL >= 1) {
600 forge.log.verbose(cat,
601 '[%s][%s] queue start next [%s] remain:%s',
602 task.id, task.name, task.type,
603 sTaskQueues[task.type].length);
604 }
605 sTaskQueues[task.type][0].start();
606 }
607 }
608
609 if(!suppressCallbacks) {
610 // call final callback if one exists
611 if(task.error && task.failureCallback) {
612 task.failureCallback(task);
613 } else if(!task.error && task.successCallback) {
614 task.successCallback(task);
615 }
616 }
617 }
618};
619
620/* Tasks API */
621module.exports = forge.task = forge.task || {};
622
623/**
624 * Starts a new task that will run the passed function asynchronously.
625 *
626 * In order to finish the task, either task.doNext() or task.fail()
627 * *must* be called.
628 *
629 * The task must have a type (a string identifier) that can be used to
630 * synchronize it with other tasks of the same type. That type can also
631 * be used to cancel tasks that haven't started yet.
632 *
633 * To start a task, the following object must be provided as a parameter
634 * (each function takes a task object as its first parameter):
635 *
636 * {
637 * type: the type of task.
638 * run: the function to run to execute the task.
639 * success: a callback to call when the task succeeds (optional).
640 * failure: a callback to call when the task fails (optional).
641 * }
642 *
643 * @param options the object as described above.
644 */
645forge.task.start = function(options) {
646 // create a new task
647 var task = new Task({
648 run: options.run,
649 name: options.name || sNoTaskName
650 });
651 task.type = options.type;
652 task.successCallback = options.success || null;
653 task.failureCallback = options.failure || null;
654
655 // append the task onto the appropriate queue
656 if(!(task.type in sTaskQueues)) {
657 if(sVL >= 1) {
658 forge.log.verbose(cat, '[%s][%s] create queue [%s]',
659 task.id, task.name, task.type);
660 }
661 // create the queue with the new task
662 sTaskQueues[task.type] = [task];
663 start(task);
664 } else {
665 // push the task onto the queue, it will be run after a task
666 // with the same type completes
667 sTaskQueues[options.type].push(task);
668 }
669};
670
671/**
672 * Cancels all tasks of the given type that haven't started yet.
673 *
674 * @param type the type of task to cancel.
675 */
676forge.task.cancel = function(type) {
677 // find the task queue
678 if(type in sTaskQueues) {
679 // empty all but the current task from the queue
680 sTaskQueues[type] = [sTaskQueues[type][0]];
681 }
682};
683
684/**
685 * Creates a condition variable to synchronize tasks. To make a task wait
686 * on the condition variable, call task.wait(condition). To notify all
687 * tasks that are waiting, call condition.notify().
688 *
689 * @return the condition variable.
690 */
691forge.task.createCondition = function() {
692 var cond = {
693 // all tasks that are blocked
694 tasks: {}
695 };
696
697 /**
698 * Causes the given task to block until notify is called. If the task
699 * is already waiting on this condition then this is a no-op.
700 *
701 * @param task the task to cause to wait.
702 */
703 cond.wait = function(task) {
704 // only block once
705 if(!(task.id in cond.tasks)) {
706 task.block();
707 cond.tasks[task.id] = task;
708 }
709 };
710
711 /**
712 * Notifies all waiting tasks to wake up.
713 */
714 cond.notify = function() {
715 // since unblock() will run the next task from here, make sure to
716 // clear the condition's blocked task list before unblocking
717 var tmp = cond.tasks;
718 cond.tasks = {};
719 for(var id in tmp) {
720 tmp[id].unblock();
721 }
722 };
723
724 return cond;
725};
Note: See TracBrowser for help on using the repository browser.