source: trip-planner-front/node_modules/websocket-extensions/lib/pipeline/index.js@ 76712b2

Last change on this file since 76712b2 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 1.4 KB
Line 
1'use strict';
2
3var Cell = require('./cell'),
4 Pledge = require('./pledge');
5
/**
 * Builds a pipeline over a list of sessions.
 *
 * Every session is wrapped in its own Cell (order preserved), and both
 * directions of the pipeline start out as "not stopped".
 *
 * @param {Array} sessions - Sessions to wrap; each one is handed to `new Cell()`.
 */
var Pipeline = function(sessions) {
  var toCell = function(session) {
    return new Cell(session);
  };

  this._cells = sessions.map(toCell);
  this._stopped = { incoming: false, outgoing: false };
};
10
/**
 * Runs an incoming message through the cells, from the last cell down to
 * the first.
 *
 * If the incoming direction has been stopped, the message is dropped and
 * `callback` is never invoked.
 *
 * @param {*} message - The message to process.
 * @param {Function} callback - Invoked as `callback.call(context, error, message)`
 *   once the message has passed through every cell.
 * @param {*} [context] - `this` value for `callback`.
 */
Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
  if (!this._stopped.incoming) {
    var lastIndex = this._cells.length - 1;
    this._loop('incoming', lastIndex, -1, -1, message, callback, context);
  }
};
15
/**
 * Runs an outgoing message through the cells, from the first cell up to
 * the last.
 *
 * If the outgoing direction has been stopped, the message is dropped and
 * `callback` is never invoked.
 *
 * @param {*} message - The message to process.
 * @param {Function} callback - Invoked as `callback.call(context, error, message)`
 *   once the message has passed through every cell.
 * @param {*} [context] - `this` value for `callback`.
 */
Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
  if (!this._stopped.outgoing) {
    var pastTheEnd = this._cells.length;
    this._loop('outgoing', 0, pastTheEnd, 1, message, callback, context);
  }
};
20
/**
 * Stops the pipeline in both directions and closes every cell.
 *
 * Each cell's `close()` is always called. Only when a `callback` is given
 * are the values returned by those calls combined with `Pledge.all`, and
 * the callback is invoked (with no arguments) once that combined pledge
 * resolves.
 *
 * @param {Function} [callback] - Invoked as `callback.call(context)` after
 *   all cells have closed.
 * @param {*} [context] - `this` value for `callback`.
 */
Pipeline.prototype.close = function(callback, context) {
  this._stopped = { incoming: true, outgoing: true };

  var closing = this._cells.map(function(cell) {
    return cell.close();
  });
  if (!callback) return;

  var notify = function() {
    callback.call(context);
  };
  Pledge.all(closing).then(notify);
};
28
/**
 * Passes a message through the cells in one direction, one cell at a time.
 *
 * First, every cell is told that a message is pending in `direction`
 * (cells are notified from the last one to the first). Then the message is
 * handed to the cell at `start`; each cell's continuation forwards its
 * result to the cell `step` positions away, until the index reaches `end`,
 * at which point `callback` receives the final error and message.
 *
 * If any cell reports an error, that direction is marked as stopped on the
 * pipeline. The error is still forwarded to the remaining cells and
 * ultimately to `callback`.
 *
 * @param {string} direction - Name of the cell method to call
 *   (`'incoming'` or `'outgoing'`); also the key set in `_stopped` on error.
 * @param {number} start - Index of the first cell to visit.
 * @param {number} end - Sentinel index at which iteration finishes.
 * @param {number} step - Index increment between cells (`1` or `-1`).
 * @param {*} message - The message to process.
 * @param {Function} callback - Invoked as `callback.call(context, error, message)`.
 * @param {*} [context] - `this` value for `callback`.
 */
Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
  var pipeline = this,
      cells = this._cells;

  // Announce the pending message to every cell, last cell first.
  for (var i = cells.length - 1; i >= 0; i--) {
    cells[i].pending(direction);
  }

  function advance(index, error, msg) {
    if (index === end) return callback.call(context, error, msg);

    var onCellDone = function(cellError, cellMessage) {
      // Read `_stopped` from the pipeline at this moment, since the object
      // may have been replaced (e.g. by close()) while the cell was working.
      if (cellError) pipeline._stopped[direction] = true;
      advance(index + step, cellError, cellMessage);
    };
    cells[index][direction](error, msg, onCellDone);
  }

  advance(start, null, message);
};
46
47module.exports = Pipeline;
Note: See TracBrowser for help on using the repository browser.