Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
84.91% |
135 / 159 |
|
27.27% |
3 / 11 |
CRAP | |
0.00% |
0 / 1 |
| LoadMonitor | |
84.91% |
135 / 159 |
|
27.27% |
3 / 11 |
51.96 | |
0.00% |
0 / 1 |
| __construct | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
2 | |||
| setLogger | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| scaleLoads | |
96.88% |
31 / 32 |
|
0.00% |
0 / 1 |
10 | |||
| getServerStates | |
76.47% |
13 / 17 |
|
0.00% |
0 / 1 |
6.47 | |||
| getStateFromWanCache | |
81.82% |
18 / 22 |
|
0.00% |
0 / 1 |
4.10 | |||
| makeStateKey | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
| computeServerState | |
80.95% |
34 / 42 |
|
0.00% |
0 / 1 |
9.56 | |||
| newInitialServerState | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
| isStateFresh | |
66.67% |
2 / 3 |
|
0.00% |
0 / 1 |
2.15 | |||
| acquireServerStatesLoopGuard | |
55.56% |
5 / 9 |
|
0.00% |
0 / 1 |
2.35 | |||
| getCurrentTime | n/a |
0 / 0 |
n/a |
0 / 0 |
2 | |||||
| setMockTime | n/a |
0 / 0 |
n/a |
0 / 0 |
1 | |||||
| getConnCountForDb | |
91.67% |
11 / 12 |
|
0.00% |
0 / 1 |
4.01 | |||
| 1 | <?php |
| 2 | /** |
| 3 | * @license GPL-2.0-or-later |
| 4 | * @file |
| 5 | */ |
| 6 | namespace Wikimedia\Rdbms; |
| 7 | |
| 8 | use Psr\Log\LoggerInterface; |
| 9 | use RuntimeException; |
| 10 | use Wikimedia\ObjectCache\BagOStuff; |
| 11 | use Wikimedia\ObjectCache\IStoreKeyEncoder; |
| 12 | use Wikimedia\ObjectCache\WANObjectCache; |
| 13 | use Wikimedia\Rdbms\Platform\ISQLPlatform; |
| 14 | use Wikimedia\ScopedCallback; |
| 15 | use Wikimedia\Stats\StatsFactory; |
| 16 | |
/**
 * Basic DB load monitor with no external dependencies
 *
 * This uses both local server and local datacenter caches for DB server state information.
 * Each server's state (connectivity, open connection count, and generation timestamp) is
 * kept in the local server cache for up to STATE_PRESERVE_TTL seconds and is refreshed,
 * via the WAN cache, once it is older than STATE_TARGET_TTL seconds.
 *
 * @ingroup Database
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff */
	protected $srvCache;
	/** @var WANObjectCache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $logger;
	/** @var StatsFactory */
	protected $statsFactory;
	/** @var int|float Open connection count at which a server is treated as saturated (INF if unset) */
	private $maxConnCount;
	/** @var int Added to the total connection count when computing per-server connection ratios */
	private int $totalConnectionsAdjustment;

	/** @var float|null */
	private $wallClockOverride;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** Cache key version */
	private const VERSION = 3;

	/** Target time-till-refresh for DB server states */
	private const STATE_TARGET_TTL = 10;
	/** Seconds to persist DB server states on cache (fresh or stale) */
	private const STATE_PRESERVE_TTL = 60;

	/**
	 * @inheritDoc
	 */
	public function __construct(
		ILoadBalancer $lb,
		BagOStuff $srvCache,
		WANObjectCache $wCache,
		LoggerInterface $logger,
		StatsFactory $statsFactory,
		$options
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->logger = $logger;
		$this->statsFactory = $statsFactory;
		// Only cast an explicitly configured limit; otherwise the limit is unbounded (INF)
		if ( isset( $options['maxConnCount'] ) ) {
			$this->maxConnCount = (int)( $options['maxConnCount'] );
		} else {
			$this->maxConnCount = INF;
		}

		$this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
	}

	public function setLogger( LoggerInterface $logger ): void {
		$this->logger = $logger;
	}

	/**
	 * Rescale server weights according to each server's current state.
	 *
	 * Servers that are down, or whose weight is <= 0, get a weight of 0. Every other
	 * server has its weight shifted by half the difference between its share of the
	 * open connections and its share of the total weight.
	 *
	 * @param array<int,int|float> &$weightByServer Map of server index to weight; modified in place
	 * @throws DBConnectionError When no up server with a positive weight is below maxConnCount
	 */
	public function scaleLoads( array &$weightByServer ) {
		if ( count( $weightByServer ) <= 1 ) {
			// Single-server group; relative adjustments are pointless
			return;
		}

		$serverIndexes = array_keys( $weightByServer );
		$stateByServerIndex = $this->getServerStates( $serverIndexes );
		$totalConnections = 0;
		$totalWeights = 0;
		$circuitBreakingEnabled = true;
		foreach ( $weightByServer as $i => $weight ) {
			$serverState = $stateByServerIndex[$i];
			// STATE_UP is 1.0 or 0.0, so connections on down servers are not counted
			$totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
			$totalWeights += $weight;

			// Set up circuit breaking. If at least one replica can take more connections
			// allow the flow.
			if (
				$serverState[self::STATE_UP] &&
				$weight > 0 &&
				$serverState[self::STATE_CONN_COUNT] < $this->maxConnCount
			) {
				$circuitBreakingEnabled = false;
			}
		}

		if ( $circuitBreakingEnabled ) {
			// NOTE(review): this also triggers when every weight is <= 0 (no saturation
			// involved); confirm callers never pass such groups.
			throw new DBConnectionError(
				null, 'Database servers in ' . $this->lb->getClusterName() . ' are overloaded. ' .
				'In order to protect application servers, the circuit breaking to databases of this section ' .
				'have been activated. Please try again a few seconds.'
			);
		}

		foreach ( $weightByServer as $i => $weight ) {
			if (
				// host is down or
				!$stateByServerIndex[$i][self::STATE_UP] ||
				// host is primary or explicitly set to zero
				$weight <= 0
			) {
				$weightByServer[$i] = 0;
				continue;
			}
			// Only reached for $weight > 0, so $totalWeights is positive here
			$connRatio = $stateByServerIndex[$i][self::STATE_CONN_COUNT] /
				( $totalConnections + $this->totalConnectionsAdjustment );
			$weightRatio = $weight / $totalWeights;
			$diffRatio = $connRatio - $weightRatio;
			$adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
			$weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
		}
	}

	/**
	 * Get the state of each given server.
	 *
	 * A fresh entry in the local server cache is used as-is. Otherwise, if the local
	 * regeneration lock is obtained (or there is no local entry at all), the state is
	 * fetched from the WAN cache and stored locally. If the lock cannot be obtained,
	 * the existing (stale) local entry is used.
	 *
	 * @param int[] $serverIndexes
	 * @return array<int,array> Map of server index to server state
	 */
	protected function getServerStates( array $serverIndexes ): array {
		$stateByServerIndex = array_fill_keys( $serverIndexes, null );
		// Perform any cache regenerations in randomized order so that the
		// DB servers will each have similarly up-to-date state cache entries.
		$shuffledServerIndexes = $serverIndexes;
		shuffle( $shuffledServerIndexes );

		foreach ( $shuffledServerIndexes as $i ) {
			$key = $this->makeStateKey( $this->srvCache, $i );
			$state = $this->srvCache->get( $key ) ?: null;
			if ( $this->isStateFresh( $state ) ) {
				$this->logger->debug(
					__METHOD__ . ": fresh local cache hit for '{db_server}'",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			} else {
				$lock = $this->srvCache->getScopedLock( $key, 0, 10 );
				if ( $lock || !$state ) {
					$state = $this->getStateFromWanCache( $i, $state );
					$this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
				}
			}
			$stateByServerIndex[$i] = $state;
		}
		return $stateByServerIndex;
	}

	/**
	 * Get a server's state from the WAN cache, computing it on a miss.
	 *
	 * @param int $i Server index
	 * @param array|null $srvPrevState Local-cache state, used as the previous state when
	 *  the WAN cache has none
	 * @return array
	 */
	protected function getStateFromWanCache( int $i, ?array $srvPrevState ): array {
		$hit = true;
		$key = $this->makeStateKey( $this->wanCache, $i );
		$state = $this->wanCache->getWithSetCallback(
			$key,
			self::STATE_PRESERVE_TTL,
			function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
				$prevState = $wanPrevState ?: $srvPrevState ?: null;
				$hit = false;
				return $this->computeServerState( $i, $prevState );
			},
			[ 'lockTSE' => 30 ]
		);
		if ( $hit ) {
			$this->logger->debug(
				__METHOD__ . ": WAN cache hit for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		} else {
			$this->logger->info(
				__METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		}
		return $state;
	}

	/**
	 * Build the cache key for a server's state, scoped by cluster, primary and server name.
	 *
	 * @param IStoreKeyEncoder $cache Cache used to encode the key
	 * @param int $i Server index
	 * @return string
	 */
	protected function makeStateKey( IStoreKeyEncoder $cache, int $i ): string {
		return $cache->makeGlobalKey(
			'rdbms-gauge',
			self::VERSION,
			$this->lb->getClusterName(),
			$this->lb->getServerName( ServerInfo::WRITER_INDEX ),
			$this->lb->getServerName( $i )
		);
	}

	/**
	 * Gauge a server's current state by opening an untracked connection to it.
	 *
	 * Servers whose configured load is <= 0 are not contacted; they get the initial
	 * state stamped with the current time.
	 *
	 * @param int $i Server index
	 * @param array|null $previousState Prior state, used to smooth the connection count
	 * @return array<string,mixed>
	 * @phan-return array{up:float,conn_count:float|int|false,time:float}
	 * @throws DBAccessError
	 */
	protected function computeServerState( int $i, ?array $previousState ) {
		// Double check for circular recursion via getServerStates()/scaleLoads().
		// Mainly, connection attempts should use LoadBalancer::getServerConnection()
		// rather than something that will pick a server based on the server states.
		// The guard is released when $scope goes out of scope.
		$scope = $this->acquireServerStatesLoopGuard();

		$cluster = $this->lb->getClusterName();
		$serverName = $this->lb->getServerName( $i );

		$newState = $this->newInitialServerState();

		if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
			// Callers only use this server when they have *no choice* anyway (e.g. primary)
			$newState[self::STATE_AS_OF] = $this->getCurrentTime();
			// Avoid connecting, especially since it might reside in a remote datacenter
			return $newState;
		}

		// Get a new, untracked, connection in order to gauge server health
		$flags = ILoadBalancer::CONN_UNTRACKED_GAUGE | ILoadBalancer::CONN_SILENCE_ERRORS;
		// Get a connection to this server without triggering other server connections
		$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
		// Determine the number of open connections in this server
		$connCount = false;
		if ( $conn ) {
			try {
				$connCount = $this->getConnCountForDb( $conn );
			} catch ( DBError ) {
				$connCount = false;
			} finally {
				// Only keep one connection open at a time, even if the query threw
				$conn->close( __METHOD__ );
			}
		}

		$newState[self::STATE_AS_OF] = $this->getCurrentTime();
		$newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
		$newState[self::STATE_CONN_COUNT] = $connCount;
		$previousConnCount = $previousState[self::STATE_CONN_COUNT] ?? false;
		if ( $connCount !== false && $previousConnCount !== false ) {
			// Smooth spikes by averaging with the previous sample. A failed sample (false)
			// is never averaged in, since PHP would coerce it to 0 and halve the result.
			$newState[self::STATE_CONN_COUNT] = ( $previousConnCount + $connCount ) / 2;
		}

		if ( $connCount === false ) {
			$this->logger->error(
				__METHOD__ . ": host {db_server} is not up?",
				[ 'db_server' => $serverName ]
			);
		} else {
			$this->statsFactory->getGauge( 'rdbms_open_connection_total' )
				->setLabel( 'db_cluster', $cluster )
				->setLabel( 'db_server', $serverName )
				->set( (int)$connCount );

			// Same threshold at which scaleLoads() stops treating the server as able to take more
			if ( $connCount >= $this->maxConnCount ) {
				$this->logger->warning(
					"Server {db_server} has {conn_count} open connections (>= {max_conn})",
					[
						'db_server' => $serverName,
						'conn_count' => $connCount,
						'max_conn' => $this->maxConnCount
					]
				);
			}
		}

		return $newState;
	}

	/**
	 * @return array<string,mixed>
	 * @phan-return array{up:float,conn_count:float|int|false,time:float}
	 */
	protected function newInitialServerState() {
		return [
			// Connectivity (1.0 = reachable, 0.0 = unreachable); treat as good
			self::STATE_UP => 1.0,
			// Number of connections to that replica; treat as none
			self::STATE_CONN_COUNT => 0,
			// UNIX timestamp of state generation completion; treat as "outdated"
			self::STATE_AS_OF => 0.0,
		];
	}

	/**
	 * Check whether a cached server state is recent enough to use without refreshing.
	 *
	 * @param array|null $state
	 * @return bool True if the state exists and is at most STATE_TARGET_TTL seconds old
	 */
	private function isStateFresh( $state ) {
		if ( !$state ) {
			return false;
		}
		$age = $this->getCurrentTime() - $state[self::STATE_AS_OF];
		return $age <= self::STATE_TARGET_TTL;
	}

	/**
	 * Mark server state regeneration as in progress.
	 *
	 * @return ScopedCallback Clears the in-progress flag when it goes out of scope
	 * @throws RuntimeException If regeneration is already in progress (circular recursion)
	 */
	#[\NoDiscard]
	private function acquireServerStatesLoopGuard(): ScopedCallback {
		if ( $this->serverStatesKeyLocked ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp
	 * @codeCoverageIgnore
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * @param float|null &$time Mock UNIX timestamp for testing
	 * @codeCoverageIgnore
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}

	/**
	 * Count the processes on a MySQL server; returns 0 for other DB types or on query failure.
	 *
	 * @param IDatabase $conn
	 * @return int
	 */
	private function getConnCountForDb( IDatabase $conn ): int {
		if ( $conn->getType() !== 'mysql' ) {
			return 0;
		}
		$query = new Query(
			'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST',
			ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
			'SELECT',
			null,
			'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST'
		);
		$res = $conn->query( $query, __METHOD__ );
		$row = $res ? $res->fetchObject() : false;
		return $row ? (int)$row->pcount : 0;
	}
}
| 353 | } |