Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
77.93% |
113 / 145 |
|
25.00% |
3 / 12 |
CRAP | |
0.00% |
0 / 1 |
LoadMonitor | |
77.93% |
113 / 145 |
|
25.00% |
3 / 12 |
59.07 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
setLogger | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setStatsdDataFactory | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
scaleLoads | |
95.24% |
20 / 21 |
|
0.00% |
0 / 1 |
6 | |||
getServerStates | |
76.47% |
13 / 17 |
|
0.00% |
0 / 1 |
6.47 | |||
getStateFromWanCache | |
81.82% |
18 / 22 |
|
0.00% |
0 / 1 |
4.10 | |||
makeStateKey | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
computeServerState | |
62.50% |
25 / 40 |
|
0.00% |
0 / 1 |
13.27 | |||
newInitialServerState | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
isStateFresh | |
66.67% |
2 / 3 |
|
0.00% |
0 / 1 |
2.15 | |||
acquireServerStatesLoopGuard | |
55.56% |
5 / 9 |
|
0.00% |
0 / 1 |
2.35 | |||
getCurrentTime | n/a |
0 / 0 |
n/a |
0 / 0 |
2 | |||||
setMockTime | n/a |
0 / 0 |
n/a |
0 / 0 |
1 | |||||
getConnCountForDb | |
91.67% |
11 / 12 |
|
0.00% |
0 / 1 |
4.01 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use BagOStuff; |
23 | use IStoreKeyEncoder; |
24 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
25 | use NullStatsdDataFactory; |
26 | use Psr\Log\LoggerInterface; |
27 | use Psr\Log\NullLogger; |
28 | use RuntimeException; |
29 | use WANObjectCache; |
30 | use Wikimedia\Rdbms\Platform\ISQLPlatform; |
31 | use Wikimedia\ScopedCallback; |
32 | |
/**
 * Basic DB load monitor with no external dependencies
 *
 * This uses both local server and local datacenter caches for DB server state information.
 *
 * Server states are arrays keyed by the STATE_UP, STATE_CONN_COUNT and STATE_AS_OF
 * constants (not declared in this class; presumably inherited from ILoadMonitor).
 *
 * @ingroup Database
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff */
	protected $srvCache;
	/** @var WANObjectCache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $logger;
	/** @var StatsdDataFactoryInterface */
	protected $statsd;
	/** Connection count above which a warning is logged for a server */
	private int $maxConnCount;
	/** Constant added to the total connection count when computing connection ratios */
	private int $totalConnectionsAdjustment;

	/** @var float|null */
	private $wallClockOverride;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** Cache key version */
	private const VERSION = 3;

	/** Target time-till-refresh for DB server states */
	private const STATE_TARGET_TTL = 10;
	/** Seconds to persist DB server states on cache (fresh or stale) */
	private const STATE_PRESERVE_TTL = 60;

	/**
	 * @inheritDoc
	 */
	public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options ) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->logger = new NullLogger();
		$this->statsd = new NullStatsdDataFactory();
		$this->maxConnCount = (int)( $options['maxConnCount'] ?? 500 );
		$this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
	}

	/**
	 * @param LoggerInterface $logger Logger to use instead of the default NullLogger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @param StatsdDataFactoryInterface $statsFactory Stats sink to use instead of the default null factory
	 */
	public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
		$this->statsd = $statsFactory;
	}

	/**
	 * Adjust the given server weights, in place, based on the cached server states
	 *
	 * Servers that are down, or that have a non-positive weight, get a weight of 0.
	 * Every other server has its share of the total weight shifted by half of the
	 * difference between its share of connections and its share of weight.
	 *
	 * @param array<int,int|float> &$weightByServer Map of (server index => weight)
	 */
	public function scaleLoads( array &$weightByServer ) {
		if ( count( $weightByServer ) <= 1 ) {
			// Single-server group; relative adjustments are pointless
			return;
		}

		$serverIndexes = array_keys( $weightByServer );
		$stateByServerIndex = $this->getServerStates( $serverIndexes );
		$totalConnections = 0;
		$totalWeights = 0;
		foreach ( $weightByServer as $i => $weight ) {
			$serverState = $stateByServerIndex[$i];
			// Connections of servers that are down are multiplied by 0 and thus ignored
			$totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
			$totalWeights += $weight;
		}

		// Loop-invariant denominator of the per-server connection ratio
		$connDenominator = $totalConnections + $this->totalConnectionsAdjustment;

		foreach ( $weightByServer as $i => $weight ) {
			if (
				// host is down or
				!$stateByServerIndex[$i][self::STATE_UP] ||
				// host is primary or explicitly set to zero
				$weight <= 0
			) {
				$weightByServer[$i] = 0;
				continue;
			}
			// With an adjustment of 0 and no recorded connections the denominator is 0;
			// treat the connection share as 0 rather than raising DivisionByZeroError.
			$connRatio = ( $connDenominator > 0 )
				? $stateByServerIndex[$i][self::STATE_CONN_COUNT] / $connDenominator
				: 0.0;
			// $totalWeights is positive here since this server's own weight is positive
			// (assuming no negative weights are passed in)
			$weightRatio = $weight / $totalWeights;
			$diffRatio = $connRatio - $weightRatio;
			$adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
			$weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
		}
	}

	/**
	 * Get the state of each of the given servers, using the server-local cache
	 * and falling back to the WAN cache (which may regenerate the state)
	 *
	 * @param int[] $serverIndexes
	 * @return array<int,array|null> Map of (server index => state array)
	 */
	protected function getServerStates( array $serverIndexes ): array {
		$stateByServerIndex = array_fill_keys( $serverIndexes, null );
		// Perform any cache regenerations in randomized order so that the
		// DB servers will each have similarly up-to-date state cache entries.
		$shuffledServerIndexes = $serverIndexes;
		shuffle( $shuffledServerIndexes );

		foreach ( $shuffledServerIndexes as $i ) {
			$key = $this->makeStateKey( $this->srvCache, $i );
			$state = $this->srvCache->get( $key ) ?: null;
			if ( $this->isStateFresh( $state ) ) {
				$this->logger->debug(
					__METHOD__ . ": fresh local cache hit for '{db_server}'",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			} else {
				// Refresh the local entry if this thread obtained the lock or if there
				// is no state at all to fall back on; otherwise keep using the stale state
				$lock = $this->srvCache->getScopedLock( $key, 0, 10 );
				if ( $lock || !$state ) {
					$state = $this->getStateFromWanCache( $i, $state );
					$this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
				}
			}
			$stateByServerIndex[$i] = $state;
		}
		return $stateByServerIndex;
	}

	/**
	 * Get the state of a server from the WAN cache, computing it on cache miss
	 *
	 * @param int $i Server index
	 * @param array|null $srvPrevState Previous state from the server-local cache, if any
	 * @return array Server state
	 */
	protected function getStateFromWanCache( $i, ?array $srvPrevState ) {
		// Flipped to false by the callback, which only runs when the value is regenerated
		$hit = true;
		$key = $this->makeStateKey( $this->wanCache, $i );
		$state = $this->wanCache->getWithSetCallback(
			$key,
			self::STATE_PRESERVE_TTL,
			function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
				// Prefer the previous WAN cache value over the server-local one
				$prevState = $wanPrevState ?: $srvPrevState ?: null;
				$hit = false;
				return $this->computeServerState( $i, $prevState );
			},
			[ 'lockTSE' => 30 ]
		);
		if ( $hit ) {
			$this->logger->debug(
				__METHOD__ . ": WAN cache hit for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		} else {
			$this->logger->info(
				__METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		}
		return $state;
	}

	/**
	 * Build the cache key for the state of a server
	 *
	 * The key incorporates the key version, the cluster name, the name of the
	 * writer server, and the name of the server itself.
	 *
	 * @param IStoreKeyEncoder $cache Cache used to encode the key
	 * @param int $i Server index
	 * @return string
	 */
	protected function makeStateKey( IStoreKeyEncoder $cache, int $i ) {
		return $cache->makeGlobalKey(
			'rdbms-gauge',
			self::VERSION,
			$this->lb->getClusterName(),
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			$this->lb->getServerName( $i )
		);
	}

	/**
	 * Probe a server and build its new state
	 *
	 * Servers with a non-positive configured load are not contacted; they get the
	 * initial state stamped with the current time.
	 *
	 * @param int $i
	 * @param array|null $previousState
	 * @return array<string,mixed>
	 * @phan-return array{up:float,conn_count:float|int|false,time:float}
	 * @throws DBAccessError
	 * @throws RuntimeException If this method is re-entered while already running
	 */
	protected function computeServerState( int $i, ?array $previousState ) {
		// Double check for circular recursion into computeServerState(), e.g. via scaleLoads().
		// Mainly, connection attempts should use LoadBalancer::getServerConnection()
		// rather than something that will pick a server based on the server states.
		// The returned ScopedCallback must stay referenced until this method returns,
		// since its destruction is what releases the guard.
		$loopGuard = $this->acquireServerStatesLoopGuard();

		$cluster = $this->lb->getClusterName();
		$serverName = $this->lb->getServerName( $i );
		$statServerName = str_replace( '.', '_', $serverName );

		$newState = $this->newInitialServerState();

		if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
			// Callers only use this server when they have *no choice* anyway (e.g. primary)
			$newState[self::STATE_AS_OF] = $this->getCurrentTime();
			// Avoid connecting, especially since it might reside in a remote datacenter
			return $newState;
		}

		// Get a new, untracked, connection in order to gauge server health
		$flags = ILoadBalancer::CONN_UNTRACKED_GAUGE | ILoadBalancer::CONN_SILENCE_ERRORS;
		// Get a connection to this server without triggering other server connections
		$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
		// Determine the number of open connections in this server
		$connCount = false;
		if ( $conn ) {
			try {
				$connCount = $this->getConnCountForDb( $conn );
			} catch ( DBError $e ) {
				$connCount = false;
			}
			// Only keep one connection open at a time
			$conn->close( __METHOD__ );
		}

		$newState[self::STATE_AS_OF] = $this->getCurrentTime();
		$newState[self::STATE_CONN_COUNT] = $connCount;
		$newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
		if ( $previousState ) {
			// Smooth the connection count by averaging it with the previous value
			// (a failed count of false is treated as 0 by the arithmetic)
			$newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
		}

		if ( $connCount === false ) {
			$this->logger->error(
				__METHOD__ . ": host {db_server} is not up?",
				[ 'db_server' => $serverName ]
			);
		} else {
			$this->statsd->timing( "loadbalancer.connCount.$cluster.$statServerName", $connCount );
			if ( $connCount > $this->maxConnCount ) {
				$this->logger->warning(
					"Server {db_server} has {conn_count} open connections (> {max_conn})",
					[
						'db_server' => $serverName,
						'conn_count' => $connCount,
						'max_conn' => $this->maxConnCount
					]
				);
			}
		}

		return $newState;
	}

	/**
	 * @return array<string,mixed>
	 * @phan-return array{up:float,conn_count:int|int|false,time:float}
	 */
	protected function newInitialServerState() {
		return [
			// Moving average of connectivity; treat as good
			self::STATE_UP => 1.0,
			// Number of connections to that replica; treat as none
			self::STATE_CONN_COUNT => 0,
			// UNIX timestamp of state generation completion; treat as "outdated"
			self::STATE_AS_OF => 0.0,
		];
	}

	/**
	 * Check whether a state is recent enough to be used without a refresh attempt
	 *
	 * @param array|null $state
	 * @return bool True if the state exists and is younger than STATE_TARGET_TTL seconds
	 */
	private function isStateFresh( $state ) {
		if ( !$state ) {
			return false;
		}
		$age = $this->getCurrentTime() - $state[self::STATE_AS_OF];
		return $age < self::STATE_TARGET_TTL;
	}

	/**
	 * Mark the server state computation as in-progress
	 *
	 * The mark is cleared when the returned object is destroyed, so callers
	 * must keep a reference to it for as long as the guard should apply.
	 *
	 * @return ScopedCallback
	 * @throws RuntimeException If the computation is already in progress
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp
	 * @codeCoverageIgnore
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * @param float|null &$time Mock UNIX timestamp for testing
	 * @codeCoverageIgnore
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}

	/**
	 * Count the open connections on a server
	 *
	 * @param IDatabase $conn
	 * @return int Number of process list rows; 0 for non-MySQL servers or if the query fails
	 */
	private function getConnCountForDb( IDatabase $conn ): int {
		if ( $conn->getType() !== 'mysql' ) {
			return 0;
		}
		$sql = 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST';
		$query = new Query(
			$sql,
			ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
			'SELECT',
			null,
			$sql
		);
		$res = $conn->query( $query, __METHOD__ );
		$row = $res ? $res->fetchObject() : false;
		return $row ? (int)$row->pcount : 0;
	}
}