1 | /**
|
---|
2 | * Support for concurrent task management and synchronization in web
|
---|
3 | * applications.
|
---|
4 | *
|
---|
5 | * @author Dave Longley
|
---|
6 | * @author David I. Lehn <dlehn@digitalbazaar.com>
|
---|
7 | *
|
---|
8 | * Copyright (c) 2009-2013 Digital Bazaar, Inc.
|
---|
9 | */
|
---|
10 | var forge = require('./forge');
|
---|
11 | require('./debug');
|
---|
12 | require('./log');
|
---|
13 | require('./util');
|
---|
14 |
|
---|
15 | // logging category
|
---|
16 | var cat = 'forge.task';
|
---|
17 |
|
---|
18 | // verbose level
|
---|
19 | // 0: off, 1: a little, 2: a whole lot
|
---|
20 | // Verbose debug logging is surrounded by a level check to avoid the
|
---|
21 | // performance issues with even calling the logging code regardless if it
|
---|
22 | // is actually logged. For performance reasons this should not be set to 2
|
---|
23 | // for production use.
|
---|
24 | // ex: if(sVL >= 2) forge.log.verbose(....)
|
---|
25 | var sVL = 0;
|
---|
26 |
|
---|
27 | // track tasks for debugging
|
---|
28 | var sTasks = {};
|
---|
29 | var sNextTaskId = 0;
|
---|
30 | // debug access
|
---|
31 | forge.debug.set(cat, 'tasks', sTasks);
|
---|
32 |
|
---|
33 | // a map of task type to task queue
|
---|
34 | var sTaskQueues = {};
|
---|
35 | // debug access
|
---|
36 | forge.debug.set(cat, 'queues', sTaskQueues);
|
---|
37 |
|
---|
38 | // name for unnamed tasks
|
---|
39 | var sNoTaskName = '?';
|
---|
40 |
|
---|
41 | // maximum number of doNext() recursions before a context swap occurs
|
---|
42 | // FIXME: might need to tweak this based on the browser
|
---|
43 | var sMaxRecursions = 30;
|
---|
44 |
|
---|
45 | // time slice for doing tasks before a context swap occurs
|
---|
46 | // FIXME: might need to tweak this based on the browser
|
---|
47 | var sTimeSlice = 20;
|
---|
48 |
|
---|
49 | /**
|
---|
50 | * Task states.
|
---|
51 | *
|
---|
52 | * READY: ready to start processing
|
---|
53 | * RUNNING: task or a subtask is running
|
---|
54 | * BLOCKED: task is waiting to acquire N permits to continue
|
---|
55 | * SLEEPING: task is sleeping for a period of time
|
---|
56 | * DONE: task is done
|
---|
57 | * ERROR: task has an error
|
---|
58 | */
|
---|
59 | var READY = 'ready';
|
---|
60 | var RUNNING = 'running';
|
---|
61 | var BLOCKED = 'blocked';
|
---|
62 | var SLEEPING = 'sleeping';
|
---|
63 | var DONE = 'done';
|
---|
64 | var ERROR = 'error';
|
---|
65 |
|
---|
66 | /**
|
---|
67 | * Task actions. Used to control state transitions.
|
---|
68 | *
|
---|
69 | * STOP: stop processing
|
---|
70 | * START: start processing tasks
|
---|
71 | * BLOCK: block task from continuing until 1 or more permits are released
|
---|
72 | * UNBLOCK: release one or more permits
|
---|
73 | * SLEEP: sleep for a period of time
|
---|
74 | * WAKEUP: wakeup early from SLEEPING state
|
---|
75 | * CANCEL: cancel further tasks
|
---|
76 | * FAIL: a failure occured
|
---|
77 | */
|
---|
78 | var STOP = 'stop';
|
---|
79 | var START = 'start';
|
---|
80 | var BLOCK = 'block';
|
---|
81 | var UNBLOCK = 'unblock';
|
---|
82 | var SLEEP = 'sleep';
|
---|
83 | var WAKEUP = 'wakeup';
|
---|
84 | var CANCEL = 'cancel';
|
---|
85 | var FAIL = 'fail';
|
---|
86 |
|
---|
87 | /**
|
---|
88 | * State transition table.
|
---|
89 | *
|
---|
90 | * nextState = sStateTable[currentState][action]
|
---|
91 | */
|
---|
92 | var sStateTable = {};
|
---|
93 |
|
---|
94 | sStateTable[READY] = {};
|
---|
95 | sStateTable[READY][STOP] = READY;
|
---|
96 | sStateTable[READY][START] = RUNNING;
|
---|
97 | sStateTable[READY][CANCEL] = DONE;
|
---|
98 | sStateTable[READY][FAIL] = ERROR;
|
---|
99 |
|
---|
100 | sStateTable[RUNNING] = {};
|
---|
101 | sStateTable[RUNNING][STOP] = READY;
|
---|
102 | sStateTable[RUNNING][START] = RUNNING;
|
---|
103 | sStateTable[RUNNING][BLOCK] = BLOCKED;
|
---|
104 | sStateTable[RUNNING][UNBLOCK] = RUNNING;
|
---|
105 | sStateTable[RUNNING][SLEEP] = SLEEPING;
|
---|
106 | sStateTable[RUNNING][WAKEUP] = RUNNING;
|
---|
107 | sStateTable[RUNNING][CANCEL] = DONE;
|
---|
108 | sStateTable[RUNNING][FAIL] = ERROR;
|
---|
109 |
|
---|
110 | sStateTable[BLOCKED] = {};
|
---|
111 | sStateTable[BLOCKED][STOP] = BLOCKED;
|
---|
112 | sStateTable[BLOCKED][START] = BLOCKED;
|
---|
113 | sStateTable[BLOCKED][BLOCK] = BLOCKED;
|
---|
114 | sStateTable[BLOCKED][UNBLOCK] = BLOCKED;
|
---|
115 | sStateTable[BLOCKED][SLEEP] = BLOCKED;
|
---|
116 | sStateTable[BLOCKED][WAKEUP] = BLOCKED;
|
---|
117 | sStateTable[BLOCKED][CANCEL] = DONE;
|
---|
118 | sStateTable[BLOCKED][FAIL] = ERROR;
|
---|
119 |
|
---|
120 | sStateTable[SLEEPING] = {};
|
---|
121 | sStateTable[SLEEPING][STOP] = SLEEPING;
|
---|
122 | sStateTable[SLEEPING][START] = SLEEPING;
|
---|
123 | sStateTable[SLEEPING][BLOCK] = SLEEPING;
|
---|
124 | sStateTable[SLEEPING][UNBLOCK] = SLEEPING;
|
---|
125 | sStateTable[SLEEPING][SLEEP] = SLEEPING;
|
---|
126 | sStateTable[SLEEPING][WAKEUP] = SLEEPING;
|
---|
127 | sStateTable[SLEEPING][CANCEL] = DONE;
|
---|
128 | sStateTable[SLEEPING][FAIL] = ERROR;
|
---|
129 |
|
---|
130 | sStateTable[DONE] = {};
|
---|
131 | sStateTable[DONE][STOP] = DONE;
|
---|
132 | sStateTable[DONE][START] = DONE;
|
---|
133 | sStateTable[DONE][BLOCK] = DONE;
|
---|
134 | sStateTable[DONE][UNBLOCK] = DONE;
|
---|
135 | sStateTable[DONE][SLEEP] = DONE;
|
---|
136 | sStateTable[DONE][WAKEUP] = DONE;
|
---|
137 | sStateTable[DONE][CANCEL] = DONE;
|
---|
138 | sStateTable[DONE][FAIL] = ERROR;
|
---|
139 |
|
---|
140 | sStateTable[ERROR] = {};
|
---|
141 | sStateTable[ERROR][STOP] = ERROR;
|
---|
142 | sStateTable[ERROR][START] = ERROR;
|
---|
143 | sStateTable[ERROR][BLOCK] = ERROR;
|
---|
144 | sStateTable[ERROR][UNBLOCK] = ERROR;
|
---|
145 | sStateTable[ERROR][SLEEP] = ERROR;
|
---|
146 | sStateTable[ERROR][WAKEUP] = ERROR;
|
---|
147 | sStateTable[ERROR][CANCEL] = ERROR;
|
---|
148 | sStateTable[ERROR][FAIL] = ERROR;
|
---|
149 |
|
---|
150 | /**
|
---|
151 | * Creates a new task.
|
---|
152 | *
|
---|
153 | * @param options options for this task
|
---|
154 | * run: the run function for the task (required)
|
---|
155 | * name: the run function for the task (optional)
|
---|
156 | * parent: parent of this task (optional)
|
---|
157 | *
|
---|
158 | * @return the empty task.
|
---|
159 | */
|
---|
160 | var Task = function(options) {
|
---|
161 | // task id
|
---|
162 | this.id = -1;
|
---|
163 |
|
---|
164 | // task name
|
---|
165 | this.name = options.name || sNoTaskName;
|
---|
166 |
|
---|
167 | // task has no parent
|
---|
168 | this.parent = options.parent || null;
|
---|
169 |
|
---|
170 | // save run function
|
---|
171 | this.run = options.run;
|
---|
172 |
|
---|
173 | // create a queue of subtasks to run
|
---|
174 | this.subtasks = [];
|
---|
175 |
|
---|
176 | // error flag
|
---|
177 | this.error = false;
|
---|
178 |
|
---|
179 | // state of the task
|
---|
180 | this.state = READY;
|
---|
181 |
|
---|
182 | // number of times the task has been blocked (also the number
|
---|
183 | // of permits needed to be released to continue running)
|
---|
184 | this.blocks = 0;
|
---|
185 |
|
---|
186 | // timeout id when sleeping
|
---|
187 | this.timeoutId = null;
|
---|
188 |
|
---|
189 | // no swap time yet
|
---|
190 | this.swapTime = null;
|
---|
191 |
|
---|
192 | // no user data
|
---|
193 | this.userData = null;
|
---|
194 |
|
---|
195 | // initialize task
|
---|
196 | // FIXME: deal with overflow
|
---|
197 | this.id = sNextTaskId++;
|
---|
198 | sTasks[this.id] = this;
|
---|
199 | if(sVL >= 1) {
|
---|
200 | forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this);
|
---|
201 | }
|
---|
202 | };
|
---|
203 |
|
---|
204 | /**
|
---|
205 | * Logs debug information on this task and the system state.
|
---|
206 | */
|
---|
207 | Task.prototype.debug = function(msg) {
|
---|
208 | msg = msg || '';
|
---|
209 | forge.log.debug(cat, msg,
|
---|
210 | '[%s][%s] task:', this.id, this.name, this,
|
---|
211 | 'subtasks:', this.subtasks.length,
|
---|
212 | 'queue:', sTaskQueues);
|
---|
213 | };
|
---|
214 |
|
---|
215 | /**
|
---|
216 | * Adds a subtask to run after task.doNext() or task.fail() is called.
|
---|
217 | *
|
---|
218 | * @param name human readable name for this task (optional).
|
---|
219 | * @param subrun a function to run that takes the current task as
|
---|
220 | * its first parameter.
|
---|
221 | *
|
---|
222 | * @return the current task (useful for chaining next() calls).
|
---|
223 | */
|
---|
224 | Task.prototype.next = function(name, subrun) {
|
---|
225 | // juggle parameters if it looks like no name is given
|
---|
226 | if(typeof(name) === 'function') {
|
---|
227 | subrun = name;
|
---|
228 |
|
---|
229 | // inherit parent's name
|
---|
230 | name = this.name;
|
---|
231 | }
|
---|
232 | // create subtask, set parent to this task, propagate callbacks
|
---|
233 | var subtask = new Task({
|
---|
234 | run: subrun,
|
---|
235 | name: name,
|
---|
236 | parent: this
|
---|
237 | });
|
---|
238 | // start subtasks running
|
---|
239 | subtask.state = RUNNING;
|
---|
240 | subtask.type = this.type;
|
---|
241 | subtask.successCallback = this.successCallback || null;
|
---|
242 | subtask.failureCallback = this.failureCallback || null;
|
---|
243 |
|
---|
244 | // queue a new subtask
|
---|
245 | this.subtasks.push(subtask);
|
---|
246 |
|
---|
247 | return this;
|
---|
248 | };
|
---|
249 |
|
---|
250 | /**
|
---|
251 | * Adds subtasks to run in parallel after task.doNext() or task.fail()
|
---|
252 | * is called.
|
---|
253 | *
|
---|
254 | * @param name human readable name for this task (optional).
|
---|
255 | * @param subrun functions to run that take the current task as
|
---|
256 | * their first parameter.
|
---|
257 | *
|
---|
258 | * @return the current task (useful for chaining next() calls).
|
---|
259 | */
|
---|
260 | Task.prototype.parallel = function(name, subrun) {
|
---|
261 | // juggle parameters if it looks like no name is given
|
---|
262 | if(forge.util.isArray(name)) {
|
---|
263 | subrun = name;
|
---|
264 |
|
---|
265 | // inherit parent's name
|
---|
266 | name = this.name;
|
---|
267 | }
|
---|
268 | // Wrap parallel tasks in a regular task so they are started at the
|
---|
269 | // proper time.
|
---|
270 | return this.next(name, function(task) {
|
---|
271 | // block waiting for subtasks
|
---|
272 | var ptask = task;
|
---|
273 | ptask.block(subrun.length);
|
---|
274 |
|
---|
275 | // we pass the iterator from the loop below as a parameter
|
---|
276 | // to a function because it is otherwise included in the
|
---|
277 | // closure and changes as the loop changes -- causing i
|
---|
278 | // to always be set to its highest value
|
---|
279 | var startParallelTask = function(pname, pi) {
|
---|
280 | forge.task.start({
|
---|
281 | type: pname,
|
---|
282 | run: function(task) {
|
---|
283 | subrun[pi](task);
|
---|
284 | },
|
---|
285 | success: function(task) {
|
---|
286 | ptask.unblock();
|
---|
287 | },
|
---|
288 | failure: function(task) {
|
---|
289 | ptask.unblock();
|
---|
290 | }
|
---|
291 | });
|
---|
292 | };
|
---|
293 |
|
---|
294 | for(var i = 0; i < subrun.length; i++) {
|
---|
295 | // Type must be unique so task starts in parallel:
|
---|
296 | // name + private string + task id + sub-task index
|
---|
297 | // start tasks in parallel and unblock when the finish
|
---|
298 | var pname = name + '__parallel-' + task.id + '-' + i;
|
---|
299 | var pi = i;
|
---|
300 | startParallelTask(pname, pi);
|
---|
301 | }
|
---|
302 | });
|
---|
303 | };
|
---|
304 |
|
---|
305 | /**
|
---|
306 | * Stops a running task.
|
---|
307 | */
|
---|
308 | Task.prototype.stop = function() {
|
---|
309 | this.state = sStateTable[this.state][STOP];
|
---|
310 | };
|
---|
311 |
|
---|
312 | /**
|
---|
313 | * Starts running a task.
|
---|
314 | */
|
---|
315 | Task.prototype.start = function() {
|
---|
316 | this.error = false;
|
---|
317 | this.state = sStateTable[this.state][START];
|
---|
318 |
|
---|
319 | // try to restart
|
---|
320 | if(this.state === RUNNING) {
|
---|
321 | this.start = new Date();
|
---|
322 | this.run(this);
|
---|
323 | runNext(this, 0);
|
---|
324 | }
|
---|
325 | };
|
---|
326 |
|
---|
327 | /**
|
---|
328 | * Blocks a task until it one or more permits have been released. The
|
---|
329 | * task will not resume until the requested number of permits have
|
---|
330 | * been released with call(s) to unblock().
|
---|
331 | *
|
---|
332 | * @param n number of permits to wait for(default: 1).
|
---|
333 | */
|
---|
334 | Task.prototype.block = function(n) {
|
---|
335 | n = typeof(n) === 'undefined' ? 1 : n;
|
---|
336 | this.blocks += n;
|
---|
337 | if(this.blocks > 0) {
|
---|
338 | this.state = sStateTable[this.state][BLOCK];
|
---|
339 | }
|
---|
340 | };
|
---|
341 |
|
---|
342 | /**
|
---|
343 | * Releases a permit to unblock a task. If a task was blocked by
|
---|
344 | * requesting N permits via block(), then it will only continue
|
---|
345 | * running once enough permits have been released via unblock() calls.
|
---|
346 | *
|
---|
347 | * If multiple processes need to synchronize with a single task then
|
---|
348 | * use a condition variable (see forge.task.createCondition). It is
|
---|
349 | * an error to unblock a task more times than it has been blocked.
|
---|
350 | *
|
---|
351 | * @param n number of permits to release (default: 1).
|
---|
352 | *
|
---|
353 | * @return the current block count (task is unblocked when count is 0)
|
---|
354 | */
|
---|
355 | Task.prototype.unblock = function(n) {
|
---|
356 | n = typeof(n) === 'undefined' ? 1 : n;
|
---|
357 | this.blocks -= n;
|
---|
358 | if(this.blocks === 0 && this.state !== DONE) {
|
---|
359 | this.state = RUNNING;
|
---|
360 | runNext(this, 0);
|
---|
361 | }
|
---|
362 | return this.blocks;
|
---|
363 | };
|
---|
364 |
|
---|
365 | /**
|
---|
366 | * Sleep for a period of time before resuming tasks.
|
---|
367 | *
|
---|
368 | * @param n number of milliseconds to sleep (default: 0).
|
---|
369 | */
|
---|
370 | Task.prototype.sleep = function(n) {
|
---|
371 | n = typeof(n) === 'undefined' ? 0 : n;
|
---|
372 | this.state = sStateTable[this.state][SLEEP];
|
---|
373 | var self = this;
|
---|
374 | this.timeoutId = setTimeout(function() {
|
---|
375 | self.timeoutId = null;
|
---|
376 | self.state = RUNNING;
|
---|
377 | runNext(self, 0);
|
---|
378 | }, n);
|
---|
379 | };
|
---|
380 |
|
---|
381 | /**
|
---|
382 | * Waits on a condition variable until notified. The next task will
|
---|
383 | * not be scheduled until notification. A condition variable can be
|
---|
384 | * created with forge.task.createCondition().
|
---|
385 | *
|
---|
386 | * Once cond.notify() is called, the task will continue.
|
---|
387 | *
|
---|
388 | * @param cond the condition variable to wait on.
|
---|
389 | */
|
---|
390 | Task.prototype.wait = function(cond) {
|
---|
391 | cond.wait(this);
|
---|
392 | };
|
---|
393 |
|
---|
394 | /**
|
---|
395 | * If sleeping, wakeup and continue running tasks.
|
---|
396 | */
|
---|
397 | Task.prototype.wakeup = function() {
|
---|
398 | if(this.state === SLEEPING) {
|
---|
399 | cancelTimeout(this.timeoutId);
|
---|
400 | this.timeoutId = null;
|
---|
401 | this.state = RUNNING;
|
---|
402 | runNext(this, 0);
|
---|
403 | }
|
---|
404 | };
|
---|
405 |
|
---|
406 | /**
|
---|
407 | * Cancel all remaining subtasks of this task.
|
---|
408 | */
|
---|
409 | Task.prototype.cancel = function() {
|
---|
410 | this.state = sStateTable[this.state][CANCEL];
|
---|
411 | // remove permits needed
|
---|
412 | this.permitsNeeded = 0;
|
---|
413 | // cancel timeouts
|
---|
414 | if(this.timeoutId !== null) {
|
---|
415 | cancelTimeout(this.timeoutId);
|
---|
416 | this.timeoutId = null;
|
---|
417 | }
|
---|
418 | // remove subtasks
|
---|
419 | this.subtasks = [];
|
---|
420 | };
|
---|
421 |
|
---|
422 | /**
|
---|
423 | * Finishes this task with failure and sets error flag. The entire
|
---|
424 | * task will be aborted unless the next task that should execute
|
---|
425 | * is passed as a parameter. This allows levels of subtasks to be
|
---|
426 | * skipped. For instance, to abort only this tasks's subtasks, then
|
---|
427 | * call fail(task.parent). To abort this task's subtasks and its
|
---|
428 | * parent's subtasks, call fail(task.parent.parent). To abort
|
---|
429 | * all tasks and simply call the task callback, call fail() or
|
---|
430 | * fail(null).
|
---|
431 | *
|
---|
432 | * The task callback (success or failure) will always, eventually, be
|
---|
433 | * called.
|
---|
434 | *
|
---|
435 | * @param next the task to continue at, or null to abort entirely.
|
---|
436 | */
|
---|
437 | Task.prototype.fail = function(next) {
|
---|
438 | // set error flag
|
---|
439 | this.error = true;
|
---|
440 |
|
---|
441 | // finish task
|
---|
442 | finish(this, true);
|
---|
443 |
|
---|
444 | if(next) {
|
---|
445 | // propagate task info
|
---|
446 | next.error = this.error;
|
---|
447 | next.swapTime = this.swapTime;
|
---|
448 | next.userData = this.userData;
|
---|
449 |
|
---|
450 | // do next task as specified
|
---|
451 | runNext(next, 0);
|
---|
452 | } else {
|
---|
453 | if(this.parent !== null) {
|
---|
454 | // finish root task (ensures it is removed from task queue)
|
---|
455 | var parent = this.parent;
|
---|
456 | while(parent.parent !== null) {
|
---|
457 | // propagate task info
|
---|
458 | parent.error = this.error;
|
---|
459 | parent.swapTime = this.swapTime;
|
---|
460 | parent.userData = this.userData;
|
---|
461 | parent = parent.parent;
|
---|
462 | }
|
---|
463 | finish(parent, true);
|
---|
464 | }
|
---|
465 |
|
---|
466 | // call failure callback if one exists
|
---|
467 | if(this.failureCallback) {
|
---|
468 | this.failureCallback(this);
|
---|
469 | }
|
---|
470 | }
|
---|
471 | };
|
---|
472 |
|
---|
473 | /**
|
---|
474 | * Asynchronously start a task.
|
---|
475 | *
|
---|
476 | * @param task the task to start.
|
---|
477 | */
|
---|
478 | var start = function(task) {
|
---|
479 | task.error = false;
|
---|
480 | task.state = sStateTable[task.state][START];
|
---|
481 | setTimeout(function() {
|
---|
482 | if(task.state === RUNNING) {
|
---|
483 | task.swapTime = +new Date();
|
---|
484 | task.run(task);
|
---|
485 | runNext(task, 0);
|
---|
486 | }
|
---|
487 | }, 0);
|
---|
488 | };
|
---|
489 |
|
---|
490 | /**
|
---|
491 | * Run the next subtask or finish this task.
|
---|
492 | *
|
---|
493 | * @param task the task to process.
|
---|
494 | * @param recurse the recursion count.
|
---|
495 | */
|
---|
496 | var runNext = function(task, recurse) {
|
---|
497 | // get time since last context swap (ms), if enough time has passed set
|
---|
498 | // swap to true to indicate that doNext was performed asynchronously
|
---|
499 | // also, if recurse is too high do asynchronously
|
---|
500 | var swap =
|
---|
501 | (recurse > sMaxRecursions) ||
|
---|
502 | (+new Date() - task.swapTime) > sTimeSlice;
|
---|
503 |
|
---|
504 | var doNext = function(recurse) {
|
---|
505 | recurse++;
|
---|
506 | if(task.state === RUNNING) {
|
---|
507 | if(swap) {
|
---|
508 | // update swap time
|
---|
509 | task.swapTime = +new Date();
|
---|
510 | }
|
---|
511 |
|
---|
512 | if(task.subtasks.length > 0) {
|
---|
513 | // run next subtask
|
---|
514 | var subtask = task.subtasks.shift();
|
---|
515 | subtask.error = task.error;
|
---|
516 | subtask.swapTime = task.swapTime;
|
---|
517 | subtask.userData = task.userData;
|
---|
518 | subtask.run(subtask);
|
---|
519 | if(!subtask.error) {
|
---|
520 | runNext(subtask, recurse);
|
---|
521 | }
|
---|
522 | } else {
|
---|
523 | finish(task);
|
---|
524 |
|
---|
525 | if(!task.error) {
|
---|
526 | // chain back up and run parent
|
---|
527 | if(task.parent !== null) {
|
---|
528 | // propagate task info
|
---|
529 | task.parent.error = task.error;
|
---|
530 | task.parent.swapTime = task.swapTime;
|
---|
531 | task.parent.userData = task.userData;
|
---|
532 |
|
---|
533 | // no subtasks left, call run next subtask on parent
|
---|
534 | runNext(task.parent, recurse);
|
---|
535 | }
|
---|
536 | }
|
---|
537 | }
|
---|
538 | }
|
---|
539 | };
|
---|
540 |
|
---|
541 | if(swap) {
|
---|
542 | // we're swapping, so run asynchronously
|
---|
543 | setTimeout(doNext, 0);
|
---|
544 | } else {
|
---|
545 | // not swapping, so run synchronously
|
---|
546 | doNext(recurse);
|
---|
547 | }
|
---|
548 | };
|
---|
549 |
|
---|
550 | /**
|
---|
551 | * Finishes a task and looks for the next task in the queue to start.
|
---|
552 | *
|
---|
553 | * @param task the task to finish.
|
---|
554 | * @param suppressCallbacks true to suppress callbacks.
|
---|
555 | */
|
---|
556 | var finish = function(task, suppressCallbacks) {
|
---|
557 | // subtask is now done
|
---|
558 | task.state = DONE;
|
---|
559 |
|
---|
560 | delete sTasks[task.id];
|
---|
561 | if(sVL >= 1) {
|
---|
562 | forge.log.verbose(cat, '[%s][%s] finish',
|
---|
563 | task.id, task.name, task);
|
---|
564 | }
|
---|
565 |
|
---|
566 | // only do queue processing for root tasks
|
---|
567 | if(task.parent === null) {
|
---|
568 | // report error if queue is missing
|
---|
569 | if(!(task.type in sTaskQueues)) {
|
---|
570 | forge.log.error(cat,
|
---|
571 | '[%s][%s] task queue missing [%s]',
|
---|
572 | task.id, task.name, task.type);
|
---|
573 | } else if(sTaskQueues[task.type].length === 0) {
|
---|
574 | // report error if queue is empty
|
---|
575 | forge.log.error(cat,
|
---|
576 | '[%s][%s] task queue empty [%s]',
|
---|
577 | task.id, task.name, task.type);
|
---|
578 | } else if(sTaskQueues[task.type][0] !== task) {
|
---|
579 | // report error if this task isn't the first in the queue
|
---|
580 | forge.log.error(cat,
|
---|
581 | '[%s][%s] task not first in queue [%s]',
|
---|
582 | task.id, task.name, task.type);
|
---|
583 | } else {
|
---|
584 | // remove ourselves from the queue
|
---|
585 | sTaskQueues[task.type].shift();
|
---|
586 | // clean up queue if it is empty
|
---|
587 | if(sTaskQueues[task.type].length === 0) {
|
---|
588 | if(sVL >= 1) {
|
---|
589 | forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
|
---|
590 | task.id, task.name, task.type);
|
---|
591 | }
|
---|
592 | /* Note: Only a task can delete a queue of its own type. This
|
---|
593 | is used as a way to synchronize tasks. If a queue for a certain
|
---|
594 | task type exists, then a task of that type is running.
|
---|
595 | */
|
---|
596 | delete sTaskQueues[task.type];
|
---|
597 | } else {
|
---|
598 | // dequeue the next task and start it
|
---|
599 | if(sVL >= 1) {
|
---|
600 | forge.log.verbose(cat,
|
---|
601 | '[%s][%s] queue start next [%s] remain:%s',
|
---|
602 | task.id, task.name, task.type,
|
---|
603 | sTaskQueues[task.type].length);
|
---|
604 | }
|
---|
605 | sTaskQueues[task.type][0].start();
|
---|
606 | }
|
---|
607 | }
|
---|
608 |
|
---|
609 | if(!suppressCallbacks) {
|
---|
610 | // call final callback if one exists
|
---|
611 | if(task.error && task.failureCallback) {
|
---|
612 | task.failureCallback(task);
|
---|
613 | } else if(!task.error && task.successCallback) {
|
---|
614 | task.successCallback(task);
|
---|
615 | }
|
---|
616 | }
|
---|
617 | }
|
---|
618 | };
|
---|
619 |
|
---|
620 | /* Tasks API */
|
---|
621 | module.exports = forge.task = forge.task || {};
|
---|
622 |
|
---|
623 | /**
|
---|
624 | * Starts a new task that will run the passed function asynchronously.
|
---|
625 | *
|
---|
626 | * In order to finish the task, either task.doNext() or task.fail()
|
---|
627 | * *must* be called.
|
---|
628 | *
|
---|
629 | * The task must have a type (a string identifier) that can be used to
|
---|
630 | * synchronize it with other tasks of the same type. That type can also
|
---|
631 | * be used to cancel tasks that haven't started yet.
|
---|
632 | *
|
---|
633 | * To start a task, the following object must be provided as a parameter
|
---|
634 | * (each function takes a task object as its first parameter):
|
---|
635 | *
|
---|
636 | * {
|
---|
637 | * type: the type of task.
|
---|
638 | * run: the function to run to execute the task.
|
---|
639 | * success: a callback to call when the task succeeds (optional).
|
---|
640 | * failure: a callback to call when the task fails (optional).
|
---|
641 | * }
|
---|
642 | *
|
---|
643 | * @param options the object as described above.
|
---|
644 | */
|
---|
645 | forge.task.start = function(options) {
|
---|
646 | // create a new task
|
---|
647 | var task = new Task({
|
---|
648 | run: options.run,
|
---|
649 | name: options.name || sNoTaskName
|
---|
650 | });
|
---|
651 | task.type = options.type;
|
---|
652 | task.successCallback = options.success || null;
|
---|
653 | task.failureCallback = options.failure || null;
|
---|
654 |
|
---|
655 | // append the task onto the appropriate queue
|
---|
656 | if(!(task.type in sTaskQueues)) {
|
---|
657 | if(sVL >= 1) {
|
---|
658 | forge.log.verbose(cat, '[%s][%s] create queue [%s]',
|
---|
659 | task.id, task.name, task.type);
|
---|
660 | }
|
---|
661 | // create the queue with the new task
|
---|
662 | sTaskQueues[task.type] = [task];
|
---|
663 | start(task);
|
---|
664 | } else {
|
---|
665 | // push the task onto the queue, it will be run after a task
|
---|
666 | // with the same type completes
|
---|
667 | sTaskQueues[options.type].push(task);
|
---|
668 | }
|
---|
669 | };
|
---|
670 |
|
---|
671 | /**
|
---|
672 | * Cancels all tasks of the given type that haven't started yet.
|
---|
673 | *
|
---|
674 | * @param type the type of task to cancel.
|
---|
675 | */
|
---|
676 | forge.task.cancel = function(type) {
|
---|
677 | // find the task queue
|
---|
678 | if(type in sTaskQueues) {
|
---|
679 | // empty all but the current task from the queue
|
---|
680 | sTaskQueues[type] = [sTaskQueues[type][0]];
|
---|
681 | }
|
---|
682 | };
|
---|
683 |
|
---|
684 | /**
|
---|
685 | * Creates a condition variable to synchronize tasks. To make a task wait
|
---|
686 | * on the condition variable, call task.wait(condition). To notify all
|
---|
687 | * tasks that are waiting, call condition.notify().
|
---|
688 | *
|
---|
689 | * @return the condition variable.
|
---|
690 | */
|
---|
691 | forge.task.createCondition = function() {
|
---|
692 | var cond = {
|
---|
693 | // all tasks that are blocked
|
---|
694 | tasks: {}
|
---|
695 | };
|
---|
696 |
|
---|
697 | /**
|
---|
698 | * Causes the given task to block until notify is called. If the task
|
---|
699 | * is already waiting on this condition then this is a no-op.
|
---|
700 | *
|
---|
701 | * @param task the task to cause to wait.
|
---|
702 | */
|
---|
703 | cond.wait = function(task) {
|
---|
704 | // only block once
|
---|
705 | if(!(task.id in cond.tasks)) {
|
---|
706 | task.block();
|
---|
707 | cond.tasks[task.id] = task;
|
---|
708 | }
|
---|
709 | };
|
---|
710 |
|
---|
711 | /**
|
---|
712 | * Notifies all waiting tasks to wake up.
|
---|
713 | */
|
---|
714 | cond.notify = function() {
|
---|
715 | // since unblock() will run the next task from here, make sure to
|
---|
716 | // clear the condition's blocked task list before unblocking
|
---|
717 | var tmp = cond.tasks;
|
---|
718 | cond.tasks = {};
|
---|
719 | for(var id in tmp) {
|
---|
720 | tmp[id].unblock();
|
---|
721 | }
|
---|
722 | };
|
---|
723 |
|
---|
724 | return cond;
|
---|
725 | };
|
---|