1 | 'use strict';
|
---|
2 |
|
---|
3 | var Cell = require('./cell'),
|
---|
4 | Pledge = require('./pledge');
|
---|
5 |
|
---|
6 | var Pipeline = function(sessions) {
|
---|
7 | this._cells = sessions.map(function(session) { return new Cell(session) });
|
---|
8 | this._stopped = { incoming: false, outgoing: false };
|
---|
9 | };
|
---|
10 |
|
---|
11 | Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
|
---|
12 | if (this._stopped.incoming) return;
|
---|
13 | this._loop('incoming', this._cells.length - 1, -1, -1, message, callback, context);
|
---|
14 | };
|
---|
15 |
|
---|
16 | Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
|
---|
17 | if (this._stopped.outgoing) return;
|
---|
18 | this._loop('outgoing', 0, this._cells.length, 1, message, callback, context);
|
---|
19 | };
|
---|
20 |
|
---|
21 | Pipeline.prototype.close = function(callback, context) {
|
---|
22 | this._stopped = { incoming: true, outgoing: true };
|
---|
23 |
|
---|
24 | var closed = this._cells.map(function(a) { return a.close() });
|
---|
25 | if (callback)
|
---|
26 | Pledge.all(closed).then(function() { callback.call(context) });
|
---|
27 | };
|
---|
28 |
|
---|
29 | Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
|
---|
30 | var cells = this._cells,
|
---|
31 | n = cells.length,
|
---|
32 | self = this;
|
---|
33 |
|
---|
34 | while (n--) cells[n].pending(direction);
|
---|
35 |
|
---|
36 | var pipe = function(index, error, msg) {
|
---|
37 | if (index === end) return callback.call(context, error, msg);
|
---|
38 |
|
---|
39 | cells[index][direction](error, msg, function(err, m) {
|
---|
40 | if (err) self._stopped[direction] = true;
|
---|
41 | pipe(index + step, err, m);
|
---|
42 | });
|
---|
43 | };
|
---|
44 | pipe(start, null, message);
|
---|
45 | };
|
---|
46 |
|
---|
47 | module.exports = Pipeline;
|
---|