source: trip-planner-front/node_modules/qjobs/qjobs.js@ fa375fe

Last change on this file since fa375fe was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.0 KB
RevLine 
[6a3a178]1var util = require('util');
2var events = require('events').EventEmitter;
3
4var qjob = function(options) {
5
6 if(false === (this instanceof qjob)) {
7 return new qjob(options);
8 }
9
10 this.maxConcurrency = 10;
11 this.jobsRunning = 0;
12 this.jobsDone = 0;
13 this.jobsTotal = 0;
14 this.timeStart;
15 this.jobId = 0;
16 this.jobsList = [];
17 this.paused = false;
18 this.pausedId = null;
19 this.lastPause = 0;
20
21 this.interval = null;
22 this.stopAdding = false;
23 this.sleeping = false;
24
25 this.aborting = false;
26
27 if (options) {
28 this.maxConcurrency = options.maxConcurrency || this.maxConcurrency;
29 this.interval = options.interval || this.interval;
30 }
31 events.call(this);
32};
33
34util.inherits(qjob, events);
35
36/*
37 * helper to set max concurrency
38 */
39qjob.prototype.setConcurrency = function(max) {
40 this.maxConcurrency = max;
41}
42
43/*
44 * helper to set delay between rafales
45 */
46qjob.prototype.setInterval = function(delay) {
47 this.interval = delay;
48}
49
50/*
51 * add some jobs in the queue
52 */
53qjob.prototype.add = function(job,args) {
54 var self = this;
55 self.jobsList.push([job,args]);
56 self.jobsTotal++;
57}
58
59/*
60 *
61 */
62qjob.prototype.sleepDueToInterval = function() {
63 var self = this;
64
65 if (this.interval === null) {
66 return;
67 }
68
69 if (this.sleeping) {
70 return true;
71 }
72
73 if (this.stopAdding) {
74
75 if (this.jobsRunning > 0) {
76 //console.log('waiting for '+jobsRunning+' jobs to finish');
77 return true;
78 }
79
80 //console.log('waiting for '+rafaleDelay+' ms');
81 this.sleeping = true;
82 self.emit('sleep');
83
84 setTimeout(function() {
85 this.stopAdding = false;
86 this.sleeping = false;
87 self.emit('continu');
88 self.run();
89 }.bind(self),this.interval);
90
91 return true;
92 }
93
94 if (this.jobsRunning + 1 == this.maxConcurrency) {
95 //console.log('max concurrent jobs reached');
96 this.stopAdding = true;
97 return true;
98 }
99}
100
101/*
102 * run the queue
103 */
104qjob.prototype.run = function() {
105
106 var self = this;
107
108 // first launch, let's emit start event
109 if (this.jobsDone == 0) {
110 self.emit('start');
111 this.timeStart = Date.now();
112 }
113
114 if (self.sleepDueToInterval()) return;
115
116 if (self.aborting) {
117 this.jobsList = [];
118 }
119
120 // while queue is empty and number of job running
121 // concurrently are less than max job running,
122 // then launch the next job
123
124 while (this.jobsList.length && this.jobsRunning < this.maxConcurrency) {
125 // get the next job and
126 // remove it from the queue
127 var job = self.jobsList.shift();
128
129 // increment number of job running
130 self.jobsRunning++;
131
132 // fetch args for the job
133 var args = job[1];
134
135 // add jobId in args
136 args._jobId = this.jobId++;
137
138 // emit jobStart event
139 self.emit('jobStart',args);
140
141 // run the job
142 setTimeout(function() {
143 this.j(this.args,self.next.bind(self,this.args));
144 }.bind({j:job[0],args:args}),1);
145 }
146
147 // all jobs done ? emit end event
148 if (this.jobsList.length == 0 && this.jobsRunning == 0) {
149 self.emit('end');
150 }
151}
152
153/*
154 * a task has been terminated,
155 * so 'next()' has been called
156 */
157qjob.prototype.next = function(args) {
158
159 var self = this;
160
161 // update counters
162 this.jobsRunning--;
163 this.jobsDone++;
164
165 // emit 'jobEnd' event
166 self.emit('jobEnd',args);
167
168 // if queue has been set to pause
169 // then do nothing
170 if (this.paused) return;
171
172 // else, execute run() function
173 self.run();
174}
175
176/*
177 * You can 'pause' jobs.
178 * it will not pause running jobs, but
179 * it will stop launching pending jobs
180 * until paused = false
181 */
182qjob.prototype.pause = function(status) {
183 var self = this;
184 this.paused = status;
185 if (!this.paused && this.pausedId) {
186 clearInterval(this.pausedId);
187 self.emit('unpause');
188 this.run();
189 }
190 if (this.paused && !this.pausedId) {
191 self.lastPause = Date.now();
192 this.pausedId = setInterval(function() {
193 var since = Date.now() - self.lastPause;
194 self.emit('pause',since);
195 },1000);
196 return;
197 }
198}
199
200qjob.prototype.stats = function() {
201
202 var now = Date.now();
203
204 var o = {};
205 o._timeStart = this.timeStart || 'N/A';
206 o._timeElapsed = (now - this.timeStart) || 'N/A';
207 o._jobsTotal = this.jobsTotal;
208 o._jobsRunning = this.jobsRunning;
209 o._jobsDone = this.jobsDone;
210 o._progress = Math.floor((this.jobsDone/this.jobsTotal)*100);
211 o._concurrency = this.maxConcurrency;
212
213 if (this.paused) {
214 o._status = 'Paused';
215 return o;
216 }
217
218 if (o._timeElapsed == 'N/A') {
219 o._status = 'Starting';
220 return o;
221 }
222
223 if (this.jobsTotal == this.jobsDone) {
224 o._status = 'Finished';
225 return o;
226 }
227
228 o._status = 'Running';
229 return o;
230}
231
232qjob.prototype.abort = function() {
233 this.aborting = true;
234}
235
236module.exports = qjob;
Note: See TracBrowser for help on using the repository browser.