[6a3a178] | 1 | /**
|
---|
| 2 | * Support for concurrent task management and synchronization in web
|
---|
| 3 | * applications.
|
---|
| 4 | *
|
---|
| 5 | * @author Dave Longley
|
---|
| 6 | * @author David I. Lehn <dlehn@digitalbazaar.com>
|
---|
| 7 | *
|
---|
| 8 | * Copyright (c) 2009-2013 Digital Bazaar, Inc.
|
---|
| 9 | */
|
---|
| 10 | var forge = require('./forge');
|
---|
| 11 | require('./debug');
|
---|
| 12 | require('./log');
|
---|
| 13 | require('./util');
|
---|
| 14 |
|
---|
// category under which this module logs and registers its debug data
var cat = 'forge.task';

// Verbosity of this module's own logging:
//   0: off, 1: a little, 2: a whole lot
// Verbose log calls are guarded by a level check so that the logging code is
// not even invoked when it would not log, e.g.:
//   if(sVL >= 2) forge.log.verbose(....)
// For performance reasons this should not be set to 2 for production use.
var sVL = 0;

// every unfinished task keyed by task id (kept for debugging), and the id
// that will be handed to the next task created
var sTasks = {};
var sNextTaskId = 0;
// expose the task map through the debug API
forge.debug.set(cat, 'tasks', sTasks);

// task type => queue (array) of root tasks of that type
var sTaskQueues = {};
// expose the queues through the debug API
forge.debug.set(cat, 'queues', sTaskQueues);

// name given to tasks that are created without one
var sNoTaskName = '?';

// number of nested doNext() calls allowed before a context swap is forced
// FIXME: might need to tweak this based on the browser
var sMaxRecursions = 30;

// milliseconds of task work allowed before a context swap is forced
// FIXME: might need to tweak this based on the browser
var sTimeSlice = 20;
| 48 |
|
---|
/**
 * Task states.
 *
 * READY: ready to start processing
 * RUNNING: task or a subtask is running
 * BLOCKED: task is waiting to acquire N permits to continue
 * SLEEPING: task is sleeping for a period of time
 * DONE: task is done
 * ERROR: task has an error
 */
var READY = 'ready';
var RUNNING = 'running';
var BLOCKED = 'blocked';
var SLEEPING = 'sleeping';
var DONE = 'done';
var ERROR = 'error';

/**
 * Task actions. Used to control state transitions.
 *
 * STOP: stop processing
 * START: start processing tasks
 * BLOCK: block task from continuing until 1 or more permits are released
 * UNBLOCK: release one or more permits
 * SLEEP: sleep for a period of time
 * WAKEUP: wakeup early from SLEEPING state
 * CANCEL: cancel further tasks
 * FAIL: a failure occurred
 */
var STOP = 'stop';
var START = 'start';
var BLOCK = 'block';
var UNBLOCK = 'unblock';
var SLEEP = 'sleep';
var WAKEUP = 'wakeup';
var CANCEL = 'cancel';
var FAIL = 'fail';

/**
 * State transition table.
 *
 * nextState = sStateTable[currentState][action]
 *
 * An action that has no entry for a state (READY only defines STOP, START,
 * CANCEL and FAIL) looks up as undefined.
 */
var sStateTable = (function() {
  var table = {};

  // registers one row; `pairs` alternates action, next state
  var defineRow = function(state, pairs) {
    var row = {};
    for(var i = 0; i < pairs.length; i += 2) {
      row[pairs[i]] = pairs[i + 1];
    }
    table[state] = row;
  };

  defineRow(READY, [
    STOP, READY,
    START, RUNNING,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  defineRow(RUNNING, [
    STOP, READY,
    START, RUNNING,
    BLOCK, BLOCKED,
    UNBLOCK, RUNNING,
    SLEEP, SLEEPING,
    WAKEUP, RUNNING,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  defineRow(BLOCKED, [
    STOP, BLOCKED,
    START, BLOCKED,
    BLOCK, BLOCKED,
    UNBLOCK, BLOCKED,
    SLEEP, BLOCKED,
    WAKEUP, BLOCKED,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  defineRow(SLEEPING, [
    STOP, SLEEPING,
    START, SLEEPING,
    BLOCK, SLEEPING,
    UNBLOCK, SLEEPING,
    SLEEP, SLEEPING,
    WAKEUP, SLEEPING,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  defineRow(DONE, [
    STOP, DONE,
    START, DONE,
    BLOCK, DONE,
    UNBLOCK, DONE,
    SLEEP, DONE,
    WAKEUP, DONE,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  defineRow(ERROR, [
    STOP, ERROR,
    START, ERROR,
    BLOCK, ERROR,
    UNBLOCK, ERROR,
    SLEEP, ERROR,
    WAKEUP, ERROR,
    CANCEL, ERROR,
    FAIL, ERROR
  ]);

  return table;
})();
| 149 |
|
---|
/**
 * Creates a new task and registers it in the debug task map.
 *
 * @param options options for this task
 *   run: the run function for the task (required)
 *   name: a human readable name for the task (optional)
 *   parent: parent of this task (optional)
 *
 * @return the empty task.
 */
var Task = function(options) {
  // read the caller's options up front, falling back to the defaults
  var taskName = options.name ? options.name : sNoTaskName;
  var parentTask = options.parent ? options.parent : null;
  var runFunction = options.run;

  // unique id, allocated from the module-wide counter
  // FIXME: deal with overflow
  this.id = sNextTaskId++;

  this.name = taskName;
  this.parent = parentTask;
  this.run = runFunction;

  // queue of subtasks to run
  this.subtasks = [];

  // error flag
  this.error = false;

  // tasks start out ready to be processed
  this.state = READY;

  // number of times the task has been blocked (also the number
  // of permits that must be released before it continues running)
  this.blocks = 0;

  // timeout id while sleeping
  this.timeoutId = null;

  // time of the last context swap, none yet
  this.swapTime = null;

  // no user data
  this.userData = null;

  // track the task for debugging
  sTasks[this.id] = this;
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this);
  }
};
| 203 |
|
---|
/**
 * Logs debug information on this task and the system state: the task
 * itself, how many subtasks it still has queued, and all task queues.
 *
 * @param msg a message to log in front of the task details (optional).
 */
Task.prototype.debug = function(msg) {
  var prefix = msg ? msg : '';
  var pending = this.subtasks.length;
  forge.log.debug(
    cat, prefix,
    '[%s][%s] task:', this.id, this.name, this,
    'subtasks:', pending,
    'queue:', sTaskQueues);
};
| 214 |
|
---|
/**
 * Queues a subtask on this task. The subtask is created already in the
 * RUNNING state and inherits this task's type and success/failure
 * callbacks.
 *
 * @param name human readable name for the subtask (optional, defaults to
 *          this task's name).
 * @param subrun a function to run that takes the current task as
 *          its first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.next = function(name, subrun) {
  // called as next(subrun): shift the arguments and reuse this task's name
  if(typeof name === 'function') {
    subrun = name;
    name = this.name;
  }

  // the subtask hangs off this task
  var child = new Task({run: subrun, name: name, parent: this});

  // subtasks are considered running from the start
  child.state = RUNNING;

  // propagate the type and the final callbacks
  child.type = this.type;
  child.successCallback = this.successCallback || null;
  child.failureCallback = this.failureCallback || null;

  this.subtasks.push(child);

  return this;
};
| 249 |
|
---|
/**
 * Queues a subtask that, when it runs, starts every given function as its
 * own root task (each with a unique type) and blocks until all of them have
 * finished, whether they succeeded or failed.
 *
 * @param name human readable name for this task (optional, defaults to
 *          this task's name).
 * @param subrun functions to run that take the current task as
 *          their first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.parallel = function(name, subrun) {
  // called as parallel(subrun): shift the arguments and reuse this task's name
  if(forge.util.isArray(name)) {
    subrun = name;
    name = this.name;
  }

  // The parallel tasks are launched from inside a regular subtask so that
  // they are started at the proper time.
  return this.next(name, function(ptask) {
    // hold the wrapper until every parallel task has reported back
    ptask.block(subrun.length);

    // each parallel task releases exactly one permit when it ends
    var release = function() {
      ptask.unblock();
    };

    // The index is handed to a function rather than read from the loop
    // variable so each task keeps its own value instead of sharing the
    // final one through the closure.
    var launch = function(index) {
      forge.task.start({
        // Type must be unique so the task starts in parallel:
        // name + private string + task id + sub-task index
        type: name + '__parallel-' + ptask.id + '-' + index,
        run: function(task) {
          subrun[index](task);
        },
        success: release,
        failure: release
      });
    };

    for(var i = 0; i < subrun.length; i++) {
      launch(i);
    }
  });
};
| 304 |
|
---|
/**
 * Stops a running task by applying the STOP action to its current state.
 */
Task.prototype.stop = function() {
  var transitions = sStateTable[this.state];
  this.state = transitions[STOP];
};
| 311 |
|
---|
/**
 * Starts running a task: clears its error flag, applies the START action
 * and, if that leaves the task RUNNING, synchronously calls its run function
 * and then processes its subtasks.
 */
Task.prototype.start = function() {
  this.error = false;
  this.state = sStateTable[this.state][START];

  // try to restart
  if(this.state === RUNNING) {
    // Mark the beginning of this task's time slice. This must be stored in
    // swapTime (as a millisecond timestamp, which is what runNext compares
    // against); assigning to `this.start` would replace this very method on
    // the instance and leave swapTime unset.
    this.swapTime = +new Date();
    this.run(this);
    runNext(this, 0);
  }
};
| 326 |
|
---|
/**
 * Blocks a task until one or more permits have been released. The
 * requested permits are added to the task's block count, and the BLOCK
 * action is applied whenever the resulting count is positive. The task
 * resumes once unblock() has brought the count back to zero.
 *
 * @param n number of permits to wait for (default: 1).
 */
Task.prototype.block = function(n) {
  var permits = (n === undefined) ? 1 : n;
  this.blocks += permits;

  var isBlocked = this.blocks > 0;
  if(isBlocked) {
    this.state = sStateTable[this.state][BLOCK];
  }
};
| 341 |
|
---|
/**
 * Releases permits held against a task. A task that asked for N permits
 * via block() resumes only once unblock() calls have released all N of
 * them; at that point it is set back to RUNNING (unless it is already DONE)
 * and its next subtask is processed.
 *
 * If multiple processes need to synchronize with a single task then
 * use a condition variable (see forge.task.createCondition). It is
 * an error to unblock a task more times than it has been blocked.
 *
 * @param n number of permits to release (default: 1).
 *
 * @return the current block count (task is unblocked when count is 0)
 */
Task.prototype.unblock = function(n) {
  var released = (n === undefined) ? 1 : n;
  this.blocks -= released;

  var canResume = (this.blocks === 0) && (this.state !== DONE);
  if(canResume) {
    this.state = RUNNING;
    runNext(this, 0);
  }

  return this.blocks;
};
| 364 |
|
---|
/**
 * Sleep for a period of time before resuming tasks. The SLEEP action is
 * applied to the current state and a timer is scheduled; when the timer
 * fires the task is set to RUNNING and its next subtask is processed.
 *
 * @param n number of milliseconds to sleep (default: 0).
 */
Task.prototype.sleep = function(n) {
  var delay = (n === undefined) ? 0 : n;
  this.state = sStateTable[this.state][SLEEP];

  var task = this;
  // invoked by the timer once the sleep period is over
  var resume = function() {
    task.timeoutId = null;
    task.state = RUNNING;
    runNext(task, 0);
  };

  // remember the timer so that the sleep can be cut short
  this.timeoutId = setTimeout(resume, delay);
};
| 380 |
|
---|
/**
 * Waits on a condition variable until notified. This simply hands the task
 * to cond.wait(); for conditions created with forge.task.createCondition()
 * that blocks the task until cond.notify() is called, after which the task
 * continues.
 *
 * @param cond the condition variable to wait on.
 */
Task.prototype.wait = function(cond) {
  var waiter = this;
  cond.wait(waiter);
};
| 393 |
|
---|
/**
 * If sleeping, wakeup and continue running tasks: the pending sleep timer
 * is cleared, the task is set to RUNNING and its next subtask is processed.
 * Does nothing when the task is not in the SLEEPING state.
 */
Task.prototype.wakeup = function() {
  if(this.state === SLEEPING) {
    // clearTimeout() is the standard counterpart of the setTimeout() call
    // made by sleep(); there is no `cancelTimeout` function
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
    this.state = RUNNING;
    runNext(this, 0);
  }
};
| 405 |
|
---|
/**
 * Cancel all remaining subtasks of this task. The CANCEL action is applied
 * to the current state, any outstanding block permits are dropped, a
 * pending sleep timer is cleared and the subtask queue is emptied.
 */
Task.prototype.cancel = function() {
  this.state = sStateTable[this.state][CANCEL];

  // remove permits needed; `blocks` is the counter that block() and
  // unblock() maintain
  this.blocks = 0;

  // cancel timeouts; clearTimeout() is the standard counterpart of the
  // setTimeout() call made by sleep()
  if(this.timeoutId !== null) {
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
  }

  // remove subtasks
  this.subtasks = [];
};
| 421 |
|
---|
/**
 * Finishes this task with failure and sets the error flag. The entire
 * task will be aborted unless the task that should execute next is passed
 * as a parameter, which allows levels of subtasks to be skipped. For
 * instance, to abort only this task's subtasks, call fail(task.parent).
 * To abort this task's subtasks and its parent's subtasks, call
 * fail(task.parent.parent). To abort all tasks and simply call the task's
 * failure callback, call fail() or fail(null).
 *
 * When aborting entirely, the root task is finished as well (with its
 * callbacks suppressed) and this task's failure callback, if any, is called.
 *
 * @param next the task to continue at, or null to abort entirely.
 */
Task.prototype.fail = function(next) {
  var self = this;

  // copies this task's error flag, swap time and user data onto another task
  var propagateTo = function(target) {
    target.error = self.error;
    target.swapTime = self.swapTime;
    target.userData = self.userData;
  };

  // flag the error and finish this task without firing callbacks
  this.error = true;
  finish(this, true);

  if(next) {
    // resume processing at the requested task
    propagateTo(next);
    runNext(next, 0);
    return;
  }

  if(this.parent !== null) {
    // climb to the root task, updating every ancestor below it on the way,
    // then finish the root (ensures it is removed from its task queue)
    var ancestor = this.parent;
    for(; ancestor.parent !== null; ancestor = ancestor.parent) {
      propagateTo(ancestor);
    }
    finish(ancestor, true);
  }

  // call failure callback if one exists
  if(this.failureCallback) {
    this.failureCallback(this);
  }
};
| 472 |
|
---|
/**
 * Asynchronously start a task. The error flag is cleared and the START
 * action applied right away; the run function is only called from a
 * zero-delay timer, and only if the task is still RUNNING at that point.
 *
 * @param task the task to start.
 */
var start = function(task) {
  task.error = false;
  task.state = sStateTable[task.state][START];

  // deferred part of the start
  var begin = function() {
    if(task.state !== RUNNING) {
      return;
    }
    // the task's time slice starts now
    task.swapTime = +new Date();
    task.run(task);
    runNext(task, 0);
  };

  setTimeout(begin, 0);
};
| 489 |
|
---|
/**
 * Run the next subtask or finish this task.
 *
 * Processing happens synchronously unless a context swap is due, in which
 * case it is deferred to a zero-delay timer. A swap is due when the
 * recursion count exceeds sMaxRecursions or more than sTimeSlice
 * milliseconds have passed since the task's last swap time.
 *
 * Nothing is done if the task is not RUNNING when it gets processed.
 *
 * @param task the task to process.
 * @param recurse the recursion count.
 */
var runNext = function(task, recurse) {
  // get time since last context swap (ms), if enough time has passed set
  // swap to true to indicate that doNext was performed asynchronously
  // also, if recurse is too high do asynchronously
  var swap =
    (recurse > sMaxRecursions) ||
    (+new Date() - task.swapTime) > sTimeSlice;

  var doNext = function(recurse) {
    recurse++;
    if(task.state === RUNNING) {
      if(swap) {
        // update swap time
        task.swapTime = +new Date();
      }

      if(task.subtasks.length > 0) {
        // run next subtask, handing it the current task info
        var subtask = task.subtasks.shift();
        subtask.error = task.error;
        subtask.swapTime = task.swapTime;
        subtask.userData = task.userData;
        subtask.run(subtask);
        // a subtask that failed in its run function is not processed further
        if(!subtask.error) {
          runNext(subtask, recurse);
        }
      } else {
        finish(task);

        if(!task.error) {
          // chain back up and run parent
          if(task.parent !== null) {
            // propagate task info
            task.parent.error = task.error;
            task.parent.swapTime = task.swapTime;
            task.parent.userData = task.userData;

            // no subtasks left, call run next subtask on parent
            runNext(task.parent, recurse);
          }
        }
      }
    }
  };

  if(swap) {
    // we're swapping, so run asynchronously; the call stack is unwound by
    // the timer, so the recursion count restarts at zero. doNext must not be
    // handed to setTimeout directly: it would then be called without a
    // count, turning `recurse` into NaN and disabling the recursion limit
    // for everything processed afterwards.
    setTimeout(function() {
      doNext(0);
    }, 0);
  } else {
    // not swapping, so run synchronously
    doNext(recurse);
  }
};
| 549 |
|
---|
/**
 * Finishes a task and looks for the next task in the queue to start.
 *
 * The task is marked DONE and removed from the debug task map. For root
 * tasks (tasks without a parent) the task is additionally removed from the
 * front of its type's queue, which either deletes the now empty queue or
 * starts the next queued task, and the final success/failure callback is
 * called. An inconsistent queue (missing, empty, or not headed by this
 * task) is reported as an error and left untouched.
 *
 * @param task the task to finish.
 * @param suppressCallbacks true to suppress callbacks.
 */
var finish = function(task, suppressCallbacks) {
  // the (sub)task is now done and no longer tracked
  task.state = DONE;
  delete sTasks[task.id];
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] finish',
      task.id, task.name, task);
  }

  // queue processing and final callbacks only apply to root tasks
  if(task.parent !== null) {
    return;
  }

  var type = task.type;
  var queue = sTaskQueues[type];

  // figure out whether the queue is in a state we can work with
  var problem = null;
  if(!(type in sTaskQueues)) {
    problem = '[%s][%s] task queue missing [%s]';
  } else if(queue.length === 0) {
    problem = '[%s][%s] task queue empty [%s]';
  } else if(queue[0] !== task) {
    problem = '[%s][%s] task not first in queue [%s]';
  }

  if(problem !== null) {
    forge.log.error(cat, problem, task.id, task.name, type);
  } else {
    // remove ourselves from the queue
    queue.shift();
    if(queue.length > 0) {
      // hand over to the next task of this type
      if(sVL >= 1) {
        forge.log.verbose(cat,
          '[%s][%s] queue start next [%s] remain:%s',
          task.id, task.name, type,
          queue.length);
      }
      queue[0].start();
    } else {
      if(sVL >= 1) {
        forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
          task.id, task.name, type);
      }
      /* Note: Only a task can delete a queue of its own type. This
       is used as a way to synchronize tasks. If a queue for a certain
       task type exists, then a task of that type is running.
       */
      delete sTaskQueues[type];
    }
  }

  if(suppressCallbacks) {
    return;
  }

  // call final callback if one exists
  if(task.error) {
    if(task.failureCallback) {
      task.failureCallback(task);
    }
  } else if(task.successCallback) {
    task.successCallback(task);
  }
};
| 619 |
|
---|
| 620 | /* Tasks API */
|
---|
| 621 | module.exports = forge.task = forge.task || {};
|
---|
| 622 |
|
---|
| 623 | /**
|
---|
| 624 | * Starts a new task that will run the passed function asynchronously.
|
---|
| 625 | *
|
---|
| 626 | * In order to finish the task, either task.doNext() or task.fail()
|
---|
| 627 | * *must* be called.
|
---|
| 628 | *
|
---|
| 629 | * The task must have a type (a string identifier) that can be used to
|
---|
| 630 | * synchronize it with other tasks of the same type. That type can also
|
---|
| 631 | * be used to cancel tasks that haven't started yet.
|
---|
| 632 | *
|
---|
| 633 | * To start a task, the following object must be provided as a parameter
|
---|
| 634 | * (each function takes a task object as its first parameter):
|
---|
| 635 | *
|
---|
| 636 | * {
|
---|
| 637 | * type: the type of task.
|
---|
| 638 | * run: the function to run to execute the task.
|
---|
| 639 | * success: a callback to call when the task succeeds (optional).
|
---|
| 640 | * failure: a callback to call when the task fails (optional).
|
---|
| 641 | * }
|
---|
| 642 | *
|
---|
| 643 | * @param options the object as described above.
|
---|
| 644 | */
|
---|
| 645 | forge.task.start = function(options) {
|
---|
| 646 | // create a new task
|
---|
| 647 | var task = new Task({
|
---|
| 648 | run: options.run,
|
---|
| 649 | name: options.name || sNoTaskName
|
---|
| 650 | });
|
---|
| 651 | task.type = options.type;
|
---|
| 652 | task.successCallback = options.success || null;
|
---|
| 653 | task.failureCallback = options.failure || null;
|
---|
| 654 |
|
---|
| 655 | // append the task onto the appropriate queue
|
---|
| 656 | if(!(task.type in sTaskQueues)) {
|
---|
| 657 | if(sVL >= 1) {
|
---|
| 658 | forge.log.verbose(cat, '[%s][%s] create queue [%s]',
|
---|
| 659 | task.id, task.name, task.type);
|
---|
| 660 | }
|
---|
| 661 | // create the queue with the new task
|
---|
| 662 | sTaskQueues[task.type] = [task];
|
---|
| 663 | start(task);
|
---|
| 664 | } else {
|
---|
| 665 | // push the task onto the queue, it will be run after a task
|
---|
| 666 | // with the same type completes
|
---|
| 667 | sTaskQueues[options.type].push(task);
|
---|
| 668 | }
|
---|
| 669 | };
|
---|
| 670 |
|
---|
/**
 * Cancels all tasks of the given type that haven't started yet by dropping
 * everything but the first (current) task from that type's queue. Does
 * nothing if there is no queue for the type.
 *
 * @param type the type of task to cancel.
 */
forge.task.cancel = function(type) {
  // nothing to cancel without a queue
  if(!(type in sTaskQueues)) {
    return;
  }

  // replace the queue with one that only holds the current task
  var current = sTaskQueues[type][0];
  sTaskQueues[type] = [current];
};
| 683 |
|
---|
/**
 * Creates a condition variable to synchronize tasks. To make a task wait
 * on the condition variable, call task.wait(condition). To notify all
 * tasks that are waiting, call condition.notify().
 *
 * @return the condition variable.
 */
forge.task.createCondition = function() {
  var cond = {
    // all tasks that are blocked on this condition, keyed by task id
    tasks: {},

    /**
     * Causes the given task to block until notify is called. If the task
     * is already waiting on this condition then this is a no-op.
     *
     * @param task the task to cause to wait.
     */
    wait: function(task) {
      // only block once
      if(task.id in cond.tasks) {
        return;
      }
      task.block();
      cond.tasks[task.id] = task;
    },

    /**
     * Notifies all waiting tasks to wake up.
     */
    notify: function() {
      // unblock() may run the next task right away, so the condition's list
      // of blocked tasks is reset before any task is unblocked
      var blocked = cond.tasks;
      cond.tasks = {};
      for(var id in blocked) {
        blocked[id].unblock();
      }
    }
  };

  return cond;
};