source: node_modules/axios/lib/helpers/AxiosTransformStream.js@ d24f17c

main
Last change on this file since d24f17c was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 5.0 KB
RevLine 
[d24f17c]1'use strict';
2
3import stream from 'stream';
4import utils from '../utils.js';
5import throttle from './throttle.js';
6import speedometer from './speedometer.js';
7
// Module-private key under which each stream instance keeps its bookkeeping
// state; using a Symbol keeps it from colliding with stream.Transform's own properties.
const kInternals = Symbol('internals');
9
/**
 * Pass-through Transform stream that can cap throughput and report progress.
 *
 * Data is forwarded unchanged. When `maxRate` is non-zero, at most
 * `maxRate * timeWindow / 1000` bytes are pushed per time window and the rest
 * is deferred to the following windows. Once a `progress` listener has been
 * attached, throttled `progress` events are emitted describing how many bytes
 * have passed through.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.length] expected total size in bytes (reported as `total`)
   * @param {number} [options.maxRate=0] throughput cap in bytes per second; 0 disables the cap
   * @param {number} [options.chunkSize=65536] used as the readable high-water mark and
   *   as the upper bound for a single pushed chunk
   * @param {number|false} [options.minChunkSize=100] a chunk is only split when the part
   *   cut off would exceed this many bytes; `false` removes the lower bound
   * @param {number} [options.timeWindow=500] length of one rate-limiting window in ms
   * @param {number} [options.ticksRate=2] handed to the `throttle` helper as its frequency
   *   argument (presumably progress events per second - confirm against throttle.js)
   * @param {number} [options.samplesCount=15] multiplied by `ticksRate` to size the speedometer
   */
  constructor(options) {
    // Presumably copies caller-supplied values over the defaults object while
    // skipping properties whose value is undefined (see utils.toFlatObject).
    options = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: options.chunkSize
    });

    const internals = this[kInternals] = {
      length: options.length,
      timeWindow: options.timeWindow,
      ticksRate: options.ticksRate,
      chunkSize: options.chunkSize,
      maxRate: options.maxRate,
      minChunkSize: options.minChunkSize,
      bytesSeen: 0, // total bytes pushed since creation
      isCaptured: false, // becomes true once somebody listens for 'progress'
      notifiedBytesLoaded: 0,
      ts: Date.now(), // start of the current rate-limiting window
      bytes: 0, // bytes charged against the current window
      onReadCallback: null // continuation parked while the readable side is full
    };

    const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

    // Progress bookkeeping is skipped entirely until a listener shows interest.
    this.on('newListener', event => {
      if (event === 'progress' && !internals.isCaptured) {
        internals.isCaptured = true;
      }
    });

    let bytesNotified = 0;

    internals.updateProgress = throttle(() => {
      const totalBytes = internals.length;
      const bytesTransferred = internals.bytesSeen;
      const progressBytes = bytesTransferred - bytesNotified;

      if (!progressBytes || this.destroyed) {
        return;
      }

      const rate = _speedometer(progressBytes);

      bytesNotified = bytesTransferred;

      // Emit asynchronously so listeners never run inside the push path.
      process.nextTick(() => {
        this.emit('progress', {
          'loaded': bytesTransferred,
          'total': totalBytes,
          'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
          'bytes': progressBytes,
          'rate': rate ? rate : undefined,
          'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
            (totalBytes - bytesTransferred) / rate : undefined
        });
      });
    }, internals.ticksRate);

    // Report whatever has not been notified yet when the stream finishes or fails.
    const onFinish = () => {
      internals.updateProgress(true);
    };

    this.once('end', onFinish);
    this.once('error', onFinish);
  }

  /**
   * Resumes a push that was parked because the readable buffer was full, then
   * delegates to the stock Transform implementation.
   *
   * @param {number} size
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream, slicing it so that no single push exceeds the
   * readable high-water mark or the bytes still allowed in the current window.
   *
   * @param {Buffer} chunk
   * @param {string} encoding unused
   * @param {Function} callback invoked once the whole chunk has been pushed
   */
  _transform(chunk, encoding, callback) {
    const internals = this[kInternals];
    const maxRate = internals.maxRate;
    const timeWindow = internals.timeWindow;
    const readableHighWaterMark = this.readableHighWaterMark;

    // Byte budget of one time window; may be fractional for arbitrary maxRate values.
    const bytesThreshold = maxRate / (1000 / timeWindow);
    const minChunkSize = internals.minChunkSize !== false ?
      Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

    // Accounts for the bytes, pushes them and continues either on the next
    // tick or, under back-pressure, on the next _read() call.
    const pushChunk = (_chunk, _callback) => {
      const bytes = Buffer.byteLength(_chunk);
      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      if (internals.isCaptured) {
        internals.updateProgress();
      }

      if (this.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    };

    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;
      let passed = 0;

      // Hands the untouched chunk back once the current window has elapsed.
      const retryInNextWindow = () => setTimeout(() => {
        _callback(null, _chunk);
      }, timeWindow - passed);

      if (maxRate) {
        const now = Date.now();

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // New window: carry over only what exceeded the previous budget.
          internals.ts = now;
          const overshoot = internals.bytes - bytesThreshold;
          internals.bytes = overshoot > 0 ? overshoot : 0;
          passed = 0;
        }

        const bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // next time window
          return retryInNextWindow();
        }

        if (bytesLeft < maxChunkSize) {
          maxChunkSize = bytesLeft;
        }
      }

      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        // subarray() truncates fractional offsets, so do it explicitly and
        // notice when less than one whole byte of budget remains.
        const splitAt = Math.floor(maxChunkSize);

        if (splitAt === 0) {
          // Slicing here would push an empty chunk and immediately re-enter with
          // the full remainder, busy-looping on nextTick until the window ends.
          return retryInNextWindow();
        }

        chunkRemainder = _chunk.subarray(splitAt);
        _chunk = _chunk.subarray(0, splitAt);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }

  /**
   * Sets the expected total number of bytes used for progress reporting.
   *
   * @param {number|string} length coerced to a number with unary plus
   * @returns {AxiosTransformStream} this stream, for chaining
   */
  setLength(length) {
    this[kInternals].length = +length;
    return this;
  }
}
190
191export default AxiosTransformStream;
Note: See TracBrowser for help on using the repository browser.