[6a3a178] | 1 | 'use strict';
|
---|
| 2 |
|
---|
| 3 | var Cell = require('./cell'),
|
---|
| 4 | Pledge = require('./pledge');
|
---|
| 5 |
|
---|
/**
 * Pipeline constructor: wraps each given session in its own Cell and starts
 * with both message directions enabled.
 *
 * @param {Array} sessions - collection of sessions; one Cell is created per
 *   entry, in the same order (must provide `map`).
 */
var Pipeline = function(sessions) {
  var wrapInCell = function(session) {
    return new Cell(session);
  };

  this._cells = sessions.map(wrapInCell);

  // Per-direction flags; once a flag is true, messages travelling in that
  // direction are no longer processed.
  this._stopped = { incoming: false, outgoing: false };
};
|
---|
| 10 |
|
---|
/**
 * Runs an incoming message through the cells in reverse order (last cell
 * first, first cell last).
 *
 * If the incoming direction has been stopped, the message is dropped and
 * `callback` is never invoked.
 *
 * @param {*} message - message handed to the last cell.
 * @param {Function} callback - called as `callback.call(context, error, message)`
 *   once the message has passed through every cell.
 * @param {*} context - `this` value for `callback`.
 */
Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
  if (!this._stopped.incoming) {
    // start at the last cell, step -1, finish when the index reaches -1
    this._loop('incoming', this._cells.length - 1, -1, -1, message, callback, context);
  }
};
|
---|
| 15 |
|
---|
/**
 * Runs an outgoing message through the cells in forward order (first cell
 * first, last cell last).
 *
 * If the outgoing direction has been stopped, the message is dropped and
 * `callback` is never invoked.
 *
 * @param {*} message - message handed to the first cell.
 * @param {Function} callback - called as `callback.call(context, error, message)`
 *   once the message has passed through every cell.
 * @param {*} context - `this` value for `callback`.
 */
Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
  if (!this._stopped.outgoing) {
    // start at cell 0, step +1, finish when the index reaches the cell count
    this._loop('outgoing', 0, this._cells.length, 1, message, callback, context);
  }
};
|
---|
| 20 |
|
---|
/**
 * Stops both message directions and asks every cell to close.
 *
 * @param {Function} [callback] - if given, invoked with no arguments (and
 *   `context` as `this`) after `Pledge.all` over the cells' `close()` results
 *   resolves.
 * @param {*} [context] - `this` value for `callback`.
 */
Pipeline.prototype.close = function(callback, context) {
  this._stopped = { incoming: true, outgoing: true };

  // Every cell is closed regardless of whether anyone waits for the result.
  var closing = this._cells.map(function(cell) {
    return cell.close();
  });

  if (!callback) return;

  Pledge.all(closing).then(function() {
    callback.call(context);
  });
};
|
---|
| 28 |
|
---|
/**
 * Passes a message through the cells one after another in the given
 * direction, then reports the outcome to `callback`.
 *
 * Before the message starts moving, `pending(direction)` is called on every
 * cell. Each cell's `direction` method then receives the current error and
 * message plus a continuation; the continuation's arguments are forwarded to
 * the next cell. When a cell reports an error, that direction is marked as
 * stopped on the pipeline, but the error still travels through the remaining
 * cells and on to `callback`.
 *
 * @param {string} direction - name of the cell method to call; also the key
 *   set on `_stopped` when an error occurs (callers pass 'incoming' or
 *   'outgoing').
 * @param {number} start - index of the first cell to visit.
 * @param {number} end - index value at which traversal finishes (exclusive).
 * @param {number} step - amount added to the index after each cell.
 * @param {*} message - message given to the first visited cell.
 * @param {Function} callback - called as `callback.call(context, error, message)`
 *   when the index reaches `end`.
 * @param {*} context - `this` value for `callback`.
 */
Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
  var self = this,
      cells = this._cells,
      i;

  // Mark every cell (highest index first) as having a message pending.
  for (i = cells.length - 1; i >= 0; i--) {
    cells[i].pending(direction);
  }

  function advance(index, error, msg) {
    if (index === end) {
      return callback.call(context, error, msg);
    }

    cells[index][direction](error, msg, function(err, next) {
      if (err) self._stopped[direction] = true;
      advance(index + step, err, next);
    });
  }

  advance(start, null, message);
};
|
---|
| 46 |
|
---|
| 47 | module.exports = Pipeline;
|
---|