source: node_modules/undici/lib/pool-base.js

main
Last change on this file was d24f17c, checked in by Aleksandar Panovski <apano77@…>, 15 months ago

Initial commit

  • Property mode set to 100644
File size: 4.5 KB
Line 
1'use strict'
2
3const DispatcherBase = require('./dispatcher-base')
4const FixedQueue = require('./node/fixed-queue')
5const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols')
6const PoolStats = require('./pool-stats')
7
8const kClients = Symbol('clients')
9const kNeedDrain = Symbol('needDrain')
10const kQueue = Symbol('queue')
11const kClosedResolve = Symbol('closed resolve')
12const kOnDrain = Symbol('onDrain')
13const kOnConnect = Symbol('onConnect')
14const kOnDisconnect = Symbol('onDisconnect')
15const kOnConnectionError = Symbol('onConnectionError')
16const kGetDispatcher = Symbol('get dispatcher')
17const kAddClient = Symbol('add client')
18const kRemoveClient = Symbol('remove client')
19const kStats = Symbol('stats')
20
21class PoolBase extends DispatcherBase {
22 constructor () {
23 super()
24
25 this[kQueue] = new FixedQueue()
26 this[kClients] = []
27 this[kQueued] = 0
28
29 const pool = this
30
31 this[kOnDrain] = function onDrain (origin, targets) {
32 const queue = pool[kQueue]
33
34 let needDrain = false
35
36 while (!needDrain) {
37 const item = queue.shift()
38 if (!item) {
39 break
40 }
41 pool[kQueued]--
42 needDrain = !this.dispatch(item.opts, item.handler)
43 }
44
45 this[kNeedDrain] = needDrain
46
47 if (!this[kNeedDrain] && pool[kNeedDrain]) {
48 pool[kNeedDrain] = false
49 pool.emit('drain', origin, [pool, ...targets])
50 }
51
52 if (pool[kClosedResolve] && queue.isEmpty()) {
53 Promise
54 .all(pool[kClients].map(c => c.close()))
55 .then(pool[kClosedResolve])
56 }
57 }
58
59 this[kOnConnect] = (origin, targets) => {
60 pool.emit('connect', origin, [pool, ...targets])
61 }
62
63 this[kOnDisconnect] = (origin, targets, err) => {
64 pool.emit('disconnect', origin, [pool, ...targets], err)
65 }
66
67 this[kOnConnectionError] = (origin, targets, err) => {
68 pool.emit('connectionError', origin, [pool, ...targets], err)
69 }
70
71 this[kStats] = new PoolStats(this)
72 }
73
74 get [kBusy] () {
75 return this[kNeedDrain]
76 }
77
78 get [kConnected] () {
79 return this[kClients].filter(client => client[kConnected]).length
80 }
81
82 get [kFree] () {
83 return this[kClients].filter(client => client[kConnected] && !client[kNeedDrain]).length
84 }
85
86 get [kPending] () {
87 let ret = this[kQueued]
88 for (const { [kPending]: pending } of this[kClients]) {
89 ret += pending
90 }
91 return ret
92 }
93
94 get [kRunning] () {
95 let ret = 0
96 for (const { [kRunning]: running } of this[kClients]) {
97 ret += running
98 }
99 return ret
100 }
101
102 get [kSize] () {
103 let ret = this[kQueued]
104 for (const { [kSize]: size } of this[kClients]) {
105 ret += size
106 }
107 return ret
108 }
109
110 get stats () {
111 return this[kStats]
112 }
113
114 async [kClose] () {
115 if (this[kQueue].isEmpty()) {
116 return Promise.all(this[kClients].map(c => c.close()))
117 } else {
118 return new Promise((resolve) => {
119 this[kClosedResolve] = resolve
120 })
121 }
122 }
123
124 async [kDestroy] (err) {
125 while (true) {
126 const item = this[kQueue].shift()
127 if (!item) {
128 break
129 }
130 item.handler.onError(err)
131 }
132
133 return Promise.all(this[kClients].map(c => c.destroy(err)))
134 }
135
136 [kDispatch] (opts, handler) {
137 const dispatcher = this[kGetDispatcher]()
138
139 if (!dispatcher) {
140 this[kNeedDrain] = true
141 this[kQueue].push({ opts, handler })
142 this[kQueued]++
143 } else if (!dispatcher.dispatch(opts, handler)) {
144 dispatcher[kNeedDrain] = true
145 this[kNeedDrain] = !this[kGetDispatcher]()
146 }
147
148 return !this[kNeedDrain]
149 }
150
151 [kAddClient] (client) {
152 client
153 .on('drain', this[kOnDrain])
154 .on('connect', this[kOnConnect])
155 .on('disconnect', this[kOnDisconnect])
156 .on('connectionError', this[kOnConnectionError])
157
158 this[kClients].push(client)
159
160 if (this[kNeedDrain]) {
161 process.nextTick(() => {
162 if (this[kNeedDrain]) {
163 this[kOnDrain](client[kUrl], [this, client])
164 }
165 })
166 }
167
168 return this
169 }
170
171 [kRemoveClient] (client) {
172 client.close(() => {
173 const idx = this[kClients].indexOf(client)
174 if (idx !== -1) {
175 this[kClients].splice(idx, 1)
176 }
177 })
178
179 this[kNeedDrain] = this[kClients].some(dispatcher => (
180 !dispatcher[kNeedDrain] &&
181 dispatcher.closed !== true &&
182 dispatcher.destroyed !== true
183 ))
184 }
185}
186
187module.exports = {
188 PoolBase,
189 kClients,
190 kNeedDrain,
191 kAddClient,
192 kRemoveClient,
193 kGetDispatcher
194}
Note: See TracBrowser for help on using the repository browser.