22use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23use Psr\Log\LoggerInterface;
24use Psr\Log\NullLogger;
30use Wikimedia\ScopedCallback;
53 private $maxConnCount;
54 private int $totalConnectionsAdjustment;
57 private $wallClockOverride;
60 private $serverStatesKeyLocked =
false;
63 private const VERSION = 3;
66 private const STATE_TARGET_TTL = 10;
68 private const STATE_PRESERVE_TTL = 60;
76 $this->wanCache = $wCache;
77 $this->logger =
new NullLogger();
79 if ( isset( $options[
'maxConnCount'] ) ) {
80 $this->maxConnCount = (int)( $options[
'maxConnCount'] );
82 $this->maxConnCount = INF;
85 $this->totalConnectionsAdjustment = (int)( $options[
'totalConnectionsAdjustment'] ?? 10 );
93 $this->statsd = $statsFactory;
97 if ( count( $weightByServer ) <= 1 ) {
102 $serverIndexes = array_keys( $weightByServer );
104 $totalConnections = 0;
106 $circuitBreakingEnabled =
true;
107 foreach ( $weightByServer as $i => $weight ) {
108 $serverState = $stateByServerIndex[$i];
109 $totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
110 $totalWeights += $weight;
115 $serverState[self::STATE_UP] &&
117 $serverState[self::STATE_CONN_COUNT] < $this->maxConnCount
119 $circuitBreakingEnabled =
false;
123 if ( $circuitBreakingEnabled ) {
125 null,
'Database servers in ' . $this->lb->getClusterName() .
' are overloaded. ' .
126 'In order to protect application servers, the circuit breaking to databases of this section ' .
127 'have been activated. Please try again a few seconds.'
131 foreach ( $weightByServer as $i => $weight ) {
134 !$stateByServerIndex[$i][self::STATE_UP] ||
138 $weightByServer[$i] = 0;
142 ( $totalConnections + $this->totalConnectionsAdjustment );
143 $weightRatio = $weight / $totalWeights;
144 $diffRatio = $connRatio - $weightRatio;
145 $adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
146 $weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
151 $stateByServerIndex = array_fill_keys( $serverIndexes, null );
154 $shuffledServerIndexes = $serverIndexes;
155 shuffle( $shuffledServerIndexes );
157 foreach ( $shuffledServerIndexes as $i ) {
159 $state = $this->srvCache->get( $key ) ?:
null;
160 if ( $this->isStateFresh( $state ) ) {
161 $this->logger->debug(
162 __METHOD__ .
": fresh local cache hit for '{db_server}'",
163 [
'db_server' => $this->lb->getServerName( $i ) ]
166 $lock = $this->srvCache->getScopedLock( $key, 0, 10 );
167 if ( $lock || !$state ) {
169 $this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
172 $stateByServerIndex[$i] = $state;
174 return $stateByServerIndex;
179 $key = $this->makeStateKey( $this->wanCache, $i );
180 $state = $this->wanCache->getWithSetCallback(
182 self::STATE_PRESERVE_TTL,
183 function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
184 $prevState = $wanPrevState ?: $srvPrevState ?:
null;
186 return $this->computeServerState( $i, $prevState );
191 $this->logger->debug(
192 __METHOD__ .
": WAN cache hit for '{db_server}'",
193 [
'db_server' => $this->lb->getServerName( $i ) ]
197 __METHOD__ .
": mutex acquired; regenerated cache for '{db_server}'",
198 [
'db_server' => $this->lb->getServerName( $i ) ]
208 $this->lb->getClusterName(),
210 $this->lb->getServerName( $i )
222 $startTime = $this->getCurrentTime();
226 $this->acquireServerStatesLoopGuard();
228 $cluster = $this->lb->getClusterName();
229 $serverName = $this->lb->getServerName( $i );
230 $statServerName = str_replace(
'.',
'_', $serverName );
232 $newState = $this->newInitialServerState();
234 if ( $this->lb->getServerInfo( $i )[
'load'] <= 0 ) {
236 $newState[self::STATE_AS_OF] = $this->getCurrentTime();
248 $connCount = $this->getConnCountForDb( $conn );
257 $conn->close( __METHOD__ );
260 $endTime = $this->getCurrentTime();
261 $newState[self::STATE_AS_OF] = $endTime;
262 $newState[self::STATE_CONN_COUNT] = $connCount;
263 $newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
264 if ( $previousState ) {
265 $newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
268 if ( $connCount ===
false ) {
269 $this->logger->error(
270 __METHOD__ .
": host {db_server} is not up?",
271 [
'db_server' => $serverName ]
274 $this->statsd->timing(
"loadbalancer.connCount.$cluster.$statServerName", $connCount );
275 if ( $connCount > $this->maxConnCount ) {
276 $this->logger->warning(
277 "Server {db_server} has {conn_count} open connections (>= {max_conn})",
279 'db_server' => $serverName,
280 'conn_count' => $connCount,
281 'max_conn' => $this->maxConnCount
297 self::STATE_UP => 1.0,
299 self::STATE_CONN_COUNT => 0,
301 self::STATE_AS_OF => 0.0,
309 private function isStateFresh( $state ) {
313 return $this->getCurrentTime() - $state[self::STATE_AS_OF] > self::STATE_TARGET_TTL;
319 private function acquireServerStatesLoopGuard() {
320 if ( $this->serverStatesKeyLocked ) {
321 throw new RuntimeException(
322 "Circular recursion detected while regenerating server states cache. " .
323 "This may indicate improper connection handling in " . get_class( $this )
327 $this->serverStatesKeyLocked =
true;
329 return new ScopedCallback(
function () {
330 $this->serverStatesKeyLocked =
false;
339 return $this->wallClockOverride ?: microtime(
true );
347 $this->wallClockOverride =& $time;
350 private function getConnCountForDb(
IDatabase $conn ): int {
351 if ( $conn->getType() !==
'mysql' ) {
355 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST',
356 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
359 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST'
361 $res = $conn->query( $query, __METHOD__ );
362 $row = $res ? $res->fetchObject() :
false;
363 return $row ? (int)$row->pcount : 0;
if(!defined('MW_SETUP_CALLBACK'))