1 | 'use strict';
|
---|
2 |
|
---|
3 | var Buffer = require('safe-buffer').Buffer;
|
---|
4 |
|
---|
/**
 * FIFO byte queue: chunks are appended with put() and taken back out either
 * as fixed-size blocks with read() or one byte at a time with eachByte().
 *
 * Invariant maintained by every method: `_queueSize` equals the sum of the
 * lengths of the buffers currently held in `_queue`.
 */
var StreamReader = function() {
  // Pending chunks, oldest first.
  this._queue = [];
  // Total number of bytes held in `_queue`.
  this._queueSize = 0;
  // Index into `_queue[0]` of the next byte eachByte() will emit.
  this._offset = 0;
};

/**
 * Appends a chunk to the end of the queue.
 *
 * Falsy and empty inputs are ignored. Anything that is not already a Buffer
 * is converted with Buffer.from(); a Buffer is stored as-is, without copying.
 *
 * @param {Buffer|string|Array|ArrayBuffer} buffer - data to enqueue
 */
StreamReader.prototype.put = function(buffer) {
  if (!buffer) return;
  if (!Buffer.isBuffer(buffer)) buffer = Buffer.from(buffer);

  // Checked after conversion so that inputs without a `length` property
  // (e.g. a zero-byte ArrayBuffer) never leave an empty chunk in the queue.
  if (buffer.length === 0) return;

  this._queue.push(buffer);
  this._queueSize += buffer.length;
};

/**
 * Removes exactly `length` bytes from the front of the queue.
 *
 * Nothing is consumed unless the whole request can be satisfied. This method
 * does not consult `_offset`, so it always starts at the first byte of the
 * head chunk.
 *
 * The result is the queued buffer itself when the head chunk is exactly
 * `length` bytes long, a slice of the head chunk when that chunk is longer
 * (Buffer#slice returns a view, not a copy), and a newly concatenated buffer
 * when the request spans several chunks.
 *
 * @param {number} length - number of bytes wanted; a non-negative integer
 * @returns {Buffer|null} the bytes, or null if fewer than `length` bytes are
 *   queued
 * @throws {TypeError} if `length` is not a number
 * @throws {RangeError} if `length` is negative, fractional or NaN
 */
StreamReader.prototype.read = function(length) {
  // Kept first so an over-large request (including Infinity) still gets the
  // "not enough data yet" answer.
  if (length > this._queueSize) return null;

  // Reject bad lengths before touching any state: a negative, fractional or
  // NaN value would otherwise be subtracted from `_queueSize` and corrupt it.
  if (typeof length !== 'number') {
    throw new TypeError('StreamReader#read() length must be a number');
  }
  if (length < 0 || length % 1 !== 0) {
    throw new RangeError('StreamReader#read() length must be a non-negative integer');
  }

  if (length === 0) return Buffer.alloc(0);

  this._queueSize -= length;

  var queue = this._queue,
      first = queue[0],
      remain = length,
      count = 0,
      buffers;

  // Fast paths: the request is satisfied by the head chunk alone.
  if (first.length === length) return queue.shift();

  if (first.length > length) {
    queue[0] = first.slice(length);
    return first.slice(0, length);
  }

  // Count how many whole chunks fit into the request...
  while (count < queue.length && queue[count].length <= remain) {
    remain -= queue[count].length;
    count += 1;
  }
  buffers = queue.splice(0, count);

  // ...then take the outstanding bytes from the front of the next chunk.
  if (remain > 0 && queue.length > 0) {
    buffers.push(queue[0].slice(0, remain));
    queue[0] = queue[0].slice(remain);
  }
  return Buffer.concat(buffers, length);
};

/**
 * Drains the queue, invoking `callback` once per byte in arrival order.
 *
 * `_offset` is advanced before the callback runs, so if the callback throws,
 * the next call resumes with the following byte rather than repeating the
 * one that failed.
 *
 * @param {function(number)} callback - receives each byte value
 * @param {*} [context] - value used as `this` inside `callback`
 */
StreamReader.prototype.eachByte = function(callback, context) {
  var buffer, n, index, finished;

  while (this._queue.length > 0) {
    buffer = this._queue[0];
    n = buffer.length;

    while (this._offset < n) {
      index = this._offset;
      this._offset += 1;
      callback.call(context, buffer[index]);
    }
    this._offset = 0;

    // Keep the byte counter in step with the queue; otherwise a later read()
    // would trust a stale size and dereference a chunk that no longer exists.
    finished = this._queue.shift();
    if (finished) this._queueSize -= finished.length;
  }
};
|
---|
68 |
|
---|
69 | module.exports = StreamReader;
|
---|