1 | 'use strict';
|
---|
2 |
|
---|
3 | var util = require('util');
|
---|
4 | var EventEmitter = require('events').EventEmitter;
|
---|
5 |
|
---|
6 | function Hose(socket, options, filter) {
|
---|
7 | EventEmitter.call(this);
|
---|
8 |
|
---|
9 | if (typeof options === 'function') {
|
---|
10 | filter = options;
|
---|
11 | options = {};
|
---|
12 | }
|
---|
13 |
|
---|
14 | this.socket = socket;
|
---|
15 | this.options = options;
|
---|
16 | this.filter = filter;
|
---|
17 |
|
---|
18 | this.buffer = null;
|
---|
19 |
|
---|
20 | var self = this;
|
---|
21 | this.listeners = {
|
---|
22 | error: function(err) {
|
---|
23 | return self.onError(err);
|
---|
24 | },
|
---|
25 | data: function(chunk) {
|
---|
26 | return self.onData(chunk);
|
---|
27 | },
|
---|
28 | end: function() {
|
---|
29 | return self.onEnd();
|
---|
30 | }
|
---|
31 | };
|
---|
32 |
|
---|
33 | this.socket.on('error', this.listeners.error);
|
---|
34 | this.socket.on('data', this.listeners.data);
|
---|
35 | this.socket.on('end', this.listeners.end);
|
---|
36 | }
|
---|
37 | util.inherits(Hose, EventEmitter);
|
---|
38 | module.exports = Hose;
|
---|
39 |
|
---|
40 | Hose.create = function create(socket, options, filter) {
|
---|
41 | return new Hose(socket, options, filter);
|
---|
42 | };
|
---|
43 |
|
---|
44 | Hose.prototype.detach = function detach() {
|
---|
45 | // Stop the flow
|
---|
46 | this.socket.pause();
|
---|
47 |
|
---|
48 | this.socket.removeListener('error', this.listeners.error);
|
---|
49 | this.socket.removeListener('data', this.listeners.data);
|
---|
50 | this.socket.removeListener('end', this.listeners.end);
|
---|
51 | };
|
---|
52 |
|
---|
53 | Hose.prototype.reemit = function reemit() {
|
---|
54 | var buffer = this.buffer;
|
---|
55 | this.buffer = null;
|
---|
56 |
|
---|
57 | // Modern age
|
---|
58 | if (this.socket.unshift) {
|
---|
59 | this.socket.unshift(buffer);
|
---|
60 | if (this.socket.listeners('data').length > 0)
|
---|
61 | this.socket.resume();
|
---|
62 | return;
|
---|
63 | }
|
---|
64 |
|
---|
65 | // Rusty node v0.8
|
---|
66 | if (this.socket.ondata)
|
---|
67 | this.socket.ondata(buffer, 0, buffer.length);
|
---|
68 | this.socket.emit('data', buffer);
|
---|
69 | this.socket.resume();
|
---|
70 | };
|
---|
71 |
|
---|
72 | Hose.prototype.onError = function onError(err) {
|
---|
73 | this.detach();
|
---|
74 | this.emit('error', err);
|
---|
75 | };
|
---|
76 |
|
---|
77 | Hose.prototype.onData = function onData(chunk) {
|
---|
78 | if (this.buffer)
|
---|
79 | this.buffer = Buffer.concat([ this.buffer, chunk ]);
|
---|
80 | else
|
---|
81 | this.buffer = chunk;
|
---|
82 |
|
---|
83 | var self = this;
|
---|
84 | this.filter(this.buffer, function(err, protocol) {
|
---|
85 | if (err)
|
---|
86 | return self.onError(err);
|
---|
87 |
|
---|
88 | // No protocol selected yet
|
---|
89 | if (!protocol)
|
---|
90 | return;
|
---|
91 |
|
---|
92 | self.detach();
|
---|
93 | self.emit('select', protocol, self.socket);
|
---|
94 | self.reemit();
|
---|
95 | });
|
---|
96 | };
|
---|
97 |
|
---|
98 | Hose.prototype.onEnd = function onEnd() {
|
---|
99 | this.detach();
|
---|
100 | this.emit('error', new Error('Not enough data to recognize protocol'));
|
---|
101 | };
|
---|