source: node_modules/undici/lib/balanced-pool.js@ d24f17c

main
Last change on this file since d24f17c was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 5.1 KB
RevLine 
[d24f17c]1'use strict'
2
3const {
4 BalancedPoolMissingUpstreamError,
5 InvalidArgumentError
6} = require('./core/errors')
7const {
8 PoolBase,
9 kClients,
10 kNeedDrain,
11 kAddClient,
12 kRemoveClient,
13 kGetDispatcher
14} = require('./pool-base')
15const Pool = require('./pool')
16const { kUrl, kInterceptors } = require('./core/symbols')
17const { parseOrigin } = require('./core/util')
// Module-private symbols used as property keys on BalancedPool instances
// and on the per-upstream pools it manages.

// Factory function that builds the dispatcher for one upstream.
const kFactory = Symbol('factory')

// Options forwarded to every dispatcher created by the factory.
const kOptions = Symbol('options')
// Cached greatest common divisor of all current pool weights.
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
// Weight threshold a pool must reach to be picked in the current round.
const kCurrentWeight = Symbol('kCurrentWeight')
// Index of the pool examined most recently by the round-robin cursor.
const kIndex = Symbol('kIndex')
// Per-pool weight (stored on the pool object itself).
const kWeight = Symbol('kWeight')
// Upper bound for any pool's weight.
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
// Amount added to / subtracted from a pool's weight on connect / error.
const kErrorPenalty = Symbol('kErrorPenalty')
27
/**
 * Euclid's algorithm: greatest common divisor of two numbers.
 *
 * Usable directly as a reducer with an initial value of 0, since
 * gcd(0, x) === x; extra reducer arguments are ignored.
 *
 * @param {number} a
 * @param {number} b
 * @returns {number} `a` once the remainder `b` has reached 0.
 */
function getGreatestCommonDivisor (a, b) {
  // Kept recursive on purpose: depth is logarithmic for integer input.
  return b === 0 ? a : getGreatestCommonDivisor(b, a % b)
}
32
/**
 * Default dispatcher factory: one `Pool` per upstream origin.
 *
 * @param {*} origin Upstream origin, passed through to `Pool`.
 * @param {object} opts Options, passed through to `Pool`.
 * @returns {Pool}
 */
function defaultFactory (origin, opts) {
  return new Pool(origin, opts)
}
36
/**
 * Dispatcher that distributes requests over several upstream origins using
 * a weighted round-robin scheme.
 *
 * Every upstream gets its own dispatcher (built by `factory`) carrying a
 * weight. Connection errors lower a pool's weight by the error penalty,
 * successful connects raise it again, capped at the maximum weight.
 */
class BalancedPool extends PoolBase {
  /**
   * @param {*} [upstreams=[]] One upstream or an array of upstreams; each is
   *   handed to `addUpstream`.
   * @param {object} [options]
   * @param {Function} [options.factory=defaultFactory] Called as
   *   `factory(origin, opts)` to create the dispatcher for an upstream.
   * @param {number} [options.maxWeightPerServer=100] Weight cap per pool
   *   (falsy values fall back to the default).
   * @param {number} [options.errorPenalty=15] Weight step applied on
   *   connect/error (falsy values fall back to the default).
   * @throws {InvalidArgumentError} If `factory` is not a function.
   */
  constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
    super()

    this[kOptions] = opts
    this[kIndex] = -1
    this[kCurrentWeight] = 0

    this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
    this[kErrorPenalty] = this[kOptions].errorPenalty || 15

    if (!Array.isArray(upstreams)) {
      upstreams = [upstreams]
    }

    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.')
    }

    this[kInterceptors] = opts.interceptors && opts.interceptors.BalancedPool && Array.isArray(opts.interceptors.BalancedPool)
      ? opts.interceptors.BalancedPool
      : []
    this[kFactory] = factory

    for (const upstream of upstreams) {
      this.addUpstream(upstream)
    }
    this._updateBalancedPoolStats()
  }

  /**
   * Registers an upstream. Does nothing if a pool for the same origin that
   * is neither closed nor destroyed is already registered.
   *
   * @param {*} upstream Value understood by `parseOrigin`.
   * @returns {this}
   */
  addUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    if (this[kClients].find((pool) => (
      pool[kUrl].origin === upstreamOrigin &&
      pool.closed !== true &&
      pool.destroyed !== true
    ))) {
      return this
    }
    const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

    this[kAddClient](pool)
    pool.on('connect', () => {
      pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
      // Keep the cached GCD in sync with the new weight, exactly as the
      // error handlers below do.
      this._updateBalancedPoolStats()
    })

    pool.on('connectionError', () => {
      pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
      this._updateBalancedPoolStats()
    })

    pool.on('disconnect', (...args) => {
      // The third event argument carries the error, if any.
      const err = args[2]
      if (err && err.code === 'UND_ERR_SOCKET') {
        // decrease the weight of the pool.
        pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
        this._updateBalancedPoolStats()
      }
    })

    // Adding an upstream resets every pool (old and new) to the max weight.
    for (const client of this[kClients]) {
      client[kWeight] = this[kMaxWeightPerServer]
    }

    this._updateBalancedPoolStats()

    return this
  }

  /**
   * Recomputes the cached greatest common divisor of all pool weights
   * (0 when there are no pools).
   */
  _updateBalancedPoolStats () {
    this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0)
  }

  /**
   * Removes the first registered pool for the given upstream that is
   * neither closed nor destroyed, if there is one.
   *
   * @param {*} upstream Value understood by `parseOrigin`.
   * @returns {this}
   */
  removeUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const pool = this[kClients].find((pool) => (
      pool[kUrl].origin === upstreamOrigin &&
      pool.closed !== true &&
      pool.destroyed !== true
    ))

    if (pool) {
      this[kRemoveClient](pool)
    }

    return this
  }

  /**
   * Origins of all registered pools that are neither closed nor destroyed.
   *
   * @returns {string[]}
   */
  get upstreams () {
    return this[kClients]
      .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true)
      .map((p) => p[kUrl].origin)
  }

  /**
   * Picks the pool for the next request via weighted round-robin.
   *
   * @returns {object|undefined} The chosen pool, or `undefined` when no pool
   *   is currently usable (all busy, closed or destroyed).
   * @throws {BalancedPoolMissingUpstreamError} If no upstream is registered.
   */
  [kGetDispatcher] () {
    // We validate that pools is greater than 0,
    // otherwise we would have to wait until an upstream
    // is added, which might never happen.
    if (this[kClients].length === 0) {
      throw new BalancedPoolMissingUpstreamError()
    }

    // A pool can take a request only if it is not waiting for drain and has
    // not been closed or destroyed. The same test is applied everywhere
    // below so a closing pool still present in the list is never returned.
    const isUsable = (pool) => (
      !pool[kNeedDrain] &&
      pool.closed !== true &&
      pool.destroyed !== true
    )

    // Fallback candidate: the usable pool with the largest weight seen.
    let maxWeightIndex = this[kClients].findIndex(isUsable)

    if (maxWeightIndex === -1) {
      return
    }

    let counter = 0

    // Visit every pool at most once, continuing from the previous position.
    while (counter++ < this[kClients].length) {
      this[kIndex] = (this[kIndex] + 1) % this[kClients].length
      const pool = this[kClients][this[kIndex]]
      const usable = isUsable(pool)

      // find pool index with the largest weight
      if (usable && pool[kWeight] > this[kClients][maxWeightIndex][kWeight]) {
        maxWeightIndex = this[kIndex]
      }

      // decrease the current weight every `this[kClients].length`.
      if (this[kIndex] === 0) {
        // Set the current weight to the next lower weight.
        this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

        if (this[kCurrentWeight] <= 0) {
          this[kCurrentWeight] = this[kMaxWeightPerServer]
        }
      }
      if (usable && pool[kWeight] >= this[kCurrentWeight]) {
        return pool
      }
    }

    // No pool met the current threshold: fall back to the heaviest usable
    // pool and continue the rotation from there.
    this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
    this[kIndex] = maxWeightIndex
    return this[kClients][maxWeightIndex]
  }
}
189
190module.exports = BalancedPool
Note: See TracBrowser for help on using the repository browser.