24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
27use Psr\Log\NullLogger;
31use Wikimedia\ScopedCallback;
52 private int $maxConnCount;
53 private int $totalConnectionsAdjustment;
56 private $wallClockOverride;
59 private $serverStatesKeyLocked =
false;
62 private const VERSION = 3;
65 private const STATE_TARGET_TTL = 10;
67 private const STATE_PRESERVE_TTL = 60;
75 $this->wanCache = $wCache;
76 $this->logger =
new NullLogger();
78 $this->maxConnCount = (int)( $options[
'maxConnCount'] ?? 500 );
79 $this->totalConnectionsAdjustment = (int)( $options[
'totalConnectionsAdjustment'] ?? 10 );
87 $this->statsd = $statsFactory;
91 if ( count( $weightByServer ) <= 1 ) {
96 $serverIndexes = array_keys( $weightByServer );
98 $totalConnections = 0;
100 foreach ( $weightByServer as $i => $weight ) {
101 $serverState = $stateByServerIndex[$i];
102 $totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
103 $totalWeights += $weight;
105 foreach ( $weightByServer as $i => $weight ) {
108 !$stateByServerIndex[$i][self::STATE_UP] ||
112 $weightByServer[$i] = 0;
116 ( $totalConnections + $this->totalConnectionsAdjustment );
117 $weightRatio = $weight / $totalWeights;
118 $diffRatio = $connRatio - $weightRatio;
119 $adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
120 $weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
125 $stateByServerIndex = array_fill_keys( $serverIndexes, null );
128 $shuffledServerIndexes = $serverIndexes;
129 shuffle( $shuffledServerIndexes );
131 foreach ( $shuffledServerIndexes as $i ) {
133 $state = $this->srvCache->get( $key ) ?:
null;
134 if ( $this->isStateFresh( $state ) ) {
135 $this->logger->debug(
136 __METHOD__ .
": fresh local cache hit for '{db_server}'",
137 [
'db_server' => $this->lb->getServerName( $i ) ]
140 $lock = $this->srvCache->getScopedLock( $key, 0, 10 );
141 if ( $lock || !$state ) {
143 $this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
146 $stateByServerIndex[$i] = $state;
148 return $stateByServerIndex;
153 $key = $this->makeStateKey( $this->wanCache, $i );
154 $state = $this->wanCache->getWithSetCallback(
156 self::STATE_PRESERVE_TTL,
157 function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
158 $prevState = $wanPrevState ?: $srvPrevState ?:
null;
160 return $this->computeServerState( $i, $prevState );
165 $this->logger->debug(
166 __METHOD__ .
": WAN cache hit for '{db_server}'",
167 [
'db_server' => $this->lb->getServerName( $i ) ]
171 __METHOD__ .
": mutex acquired; regenerated cache for '{db_server}'",
172 [
'db_server' => $this->lb->getServerName( $i ) ]
182 $this->lb->getClusterName(),
183 $this->lb->getServerName( $this->lb->getWriterIndex() ),
184 $this->lb->getServerName( $i )
196 $startTime = $this->getCurrentTime();
200 $this->acquireServerStatesLoopGuard();
202 $cluster = $this->lb->getClusterName();
203 $serverName = $this->lb->getServerName( $i );
204 $statServerName = str_replace(
'.',
'_', $serverName );
206 $newState = $this->newInitialServerState();
208 if ( $this->lb->getServerInfo( $i )[
'load'] <= 0 ) {
210 $newState[self::STATE_AS_OF] = $this->getCurrentTime();
222 $connCount = $this->getConnCountForDb( $conn );
231 $conn->close( __METHOD__ );
234 $endTime = $this->getCurrentTime();
235 $newState[self::STATE_AS_OF] = $endTime;
236 $newState[self::STATE_CONN_COUNT] = $connCount;
237 $newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
238 if ( $previousState ) {
239 $newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
242 if ( $connCount ===
false ) {
243 $this->logger->error(
244 __METHOD__ .
": host {db_server} is not up?",
245 [
'db_server' => $serverName ]
248 $this->statsd->timing(
"loadbalancer.connCount.$cluster.$statServerName", $connCount );
249 if ( $connCount > $this->maxConnCount ) {
250 $this->logger->warning(
251 "Server {db_server} has {conn_count} open connections (>= {max_conn})",
253 'db_server' => $serverName,
254 'conn_count' => $connCount,
255 'max_conn' => $this->maxConnCount
271 self::STATE_UP => 1.0,
273 self::STATE_CONN_COUNT => 0,
275 self::STATE_AS_OF => 0.0,
283 private function isStateFresh( $state ) {
287 return $this->getCurrentTime() - $state[self::STATE_AS_OF] > self::STATE_TARGET_TTL;
293 private function acquireServerStatesLoopGuard() {
294 if ( $this->serverStatesKeyLocked ) {
295 throw new RuntimeException(
296 "Circular recursion detected while regenerating server states cache. " .
297 "This may indicate improper connection handling in " . get_class( $this )
301 $this->serverStatesKeyLocked =
true;
303 return new ScopedCallback(
function () {
304 $this->serverStatesKeyLocked =
false;
313 return $this->wallClockOverride ?: microtime(
true );
321 $this->wallClockOverride =& $time;
324 private function getConnCountForDb(
IDatabase $conn ): int {
325 if ( $conn->getType() !==
'mysql' ) {
329 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST',
330 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
333 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST'
335 $res = $conn->query( $query, __METHOD__ );
336 $row = $res ? $res->fetchObject() :
false;
337 return $row ? (int)$row->pcount : 0;
if(!defined('MW_SETUP_CALLBACK'))
Class representing a cache/ephemeral data store.
Multi-datacenter aware caching interface.
Key-encoding methods for object caching (BagOStuff and WANObjectCache)
makeGlobalKey( $keygroup, ...$components )