source: trip-planner-front/node_modules/websocket-extensions/lib/pipeline/functor.js

Last change on this file was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 1.5 KB
Line 
1'use strict';
2
3var RingBuffer = require('./ring_buffer');
4
/**
 * Serialises calls to one method of a session object and hands the results
 * back to the callers in the order the calls were queued, even when the
 * session completes them out of order.
 *
 * @param {Object} session - object exposing the method named by `method`;
 *   that method is invoked as `session[method](message, handler)`.
 * @param {string} method - name of the session method to invoke.
 */
var Functor = function(session, method) {
  this._session = session;
  this._method = method;
  this._queue = new RingBuffer(Functor.QUEUE_SIZE);
  this._stopped = false;
  // NOTE(review): only reset and decremented in this file; presumably
  // incremented by the owner of the functor -- confirm against callers.
  this.pending = 0;
};

// Value handed to the RingBuffer constructor (presumably its initial
// capacity -- confirm against ring_buffer).
Functor.QUEUE_SIZE = 8;

/**
 * Queues a message for processing by the session method and arranges for
 * `callback` to be invoked once this record, and every record queued before
 * it, has completed.
 *
 * Does nothing (the callback is never invoked) once the functor has stopped.
 *
 * @param {*} error - if truthy, the record is treated as already failed: the
 *   session method is not invoked, the functor stops and the queue is flushed.
 * @param {*} message - value passed to the session method.
 * @param {Function} callback - invoked as `callback.call(context, error, message)`.
 * @param {*} context - `this` value for `callback`.
 * @returns {undefined}
 */
Functor.prototype.call = function(error, message, callback, context) {
  if (this._stopped) return;

  var record = {
    error: error,
    message: message,
    callback: callback,
    context: context,
    done: false
  };
  var called = false;
  var self = this;

  this._queue.push(record);

  if (record.error) {
    record.done = true;
    this._stop();
    return this._flushQueue();
  }

  var handler = function(err, msg) {
    // Only the first invocation counts. This also means that an exception
    // raised after the handler has already run (and caught below) is ignored.
    if (called) return;
    called = true;

    if (err) {
      self._stop();
      record.error = err;
      record.message = null;
    } else {
      record.message = msg;
    }

    record.done = true;
    self._flushQueue();
  };

  try {
    this._session[this._method](message, handler);
  } catch (err) {
    // A synchronous throw is reported the same way as an asynchronous error.
    handler(err);
  }
};

/**
 * Marks the functor as stopped, so later `call()`s are ignored, and records
 * the number of records still queued in `pending`.
 */
Functor.prototype._stop = function() {
  this.pending = this._queue.length;
  this._stopped = true;
};

/**
 * Delivers completed records from the head of the queue, stopping at the
 * first record that is not yet done. A record carrying an error empties the
 * rest of the queue and resets `pending` to zero before its callback runs.
 */
Functor.prototype._flushQueue = function() {
  var queue = this._queue;
  var record;

  while (queue.length > 0 && queue.peek().done) {
    record = queue.shift();
    if (record.error) {
      this.pending = 0;
      queue.clear();
    } else {
      this.pending -= 1;
    }
    record.callback.call(record.context, record.error, record.message);
  }
};
71
72module.exports = Functor;
Note: See TracBrowser for help on using the repository browser.