23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
29use Wikimedia\ScopedCallback;
// Moving-average ratio; read from $options['movingAveRatio'] with a default of 0.54
// and passed along when a new weight scale is computed
54 private $movingAveRatio;
// Lag, in seconds (per the warning message that reports it), above which a warning is logged
56 private $lagWarnThreshold;
// When truthy, used as the current time in place of microtime( true )
59 private $wallClockOverride;
// Loop-guard flag: true while the server-states regeneration guard is held
62 private $serverStatesKeyLocked =
false;
// NOTE(review): no use of VERSION is visible in this excerpt; presumably a cache
// value/key format version -- confirm
65 private const VERSION = 2;
// NOTE(review): passed as an argument on the server-local cache path; presumably the
// target age (seconds) of cached states -- confirm against the full method
68 private const STATE_TARGET_TTL = 1.0;
// TTL used when storing states in $this->srvCache; also the value of the
// 'lockTSE' and 'staleTTL' options of the WAN cache lookup
70 private const STATE_PRESERVE_TTL = 60;
// Second argument of the WAN cache getWithSetCallback() call
72 private const TIME_TILL_REFRESH = 1;
// NOTE(review): fragment of what appears to be the constructor body; the signature
// and several assignments are not visible in this excerpt.
// WAN cache handle, used for the shared server-states lookup
88 $this->wanCache = $wCache;
// Start with the PSR-3 no-op logger
89 $this->logger =
new NullLogger();
// Configurable moving-average ratio; falls back to 0.54 when the option is unset or null
92 $this->movingAveRatio = (float)( $options[
'movingAveRatio'] ?? 0.54 );
// Stats sink used for the per-host weight gauge and lag timing metrics
101 $this->statsd = $statsFactory;
// NOTE(review): fragment of a weight-scaling routine; the method header and the
// origin of $states are not visible in this excerpt.
// Server indexes are the keys of the weight map
105 $serverIndexes = array_keys( $weightByServer );
// Per-server scale factors from the current server states
107 $newScalesByServer = $states[
'weightScales'];
108 foreach ( $weightByServer as $i => $weight ) {
109 if ( isset( $newScalesByServer[$i] ) ) {
// Apply the scale and round up to an integer weight
110 $weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
// NOTE(review): the lines below presumably form the else branch (no scale known
// for this server); the branch keyword itself is not visible here
112 $host = $this->lb->getServerName( $i );
113 $this->logger->error( __METHOD__ .
": host $host not in cache" );
// NOTE(review): fragment of the server-states lookup; the method header, several
// conditions and closing braces are not visible in this excerpt.
// The writer server's name is used as the cluster label in the log messages below
130 $cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );
// Look in $this->srvCache first
133 $srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
134 $value = $this->srvCache->get( $srvCacheKey );
// NOTE(review): argument of a call whose name is not visible here
140 self::STATE_TARGET_TTL,
// NOTE(review): the condition guarding this "fresh" path is not visible here
144 $this->logger->debug( __METHOD__ .
": used fresh '$cluster' cluster status" );
// Try to take a scoped lock on the cache key
// NOTE(review): the arguments 0 and 10 are presumably the wait timeout and the
// lock expiry -- confirm against the cache API
150 $scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
// Lock not obtained but a cached value exists: the stale value is used
151 if ( !$scopedLock && $value ) {
152 $this->logger->debug( __METHOD__ .
": used stale '$cluster' cluster status" );
// Keep the srvCache value as a fallback for the WAN cache lookup below
158 $staleValue = $value;
// Fetch the states from the WAN cache; the callback is handed to getWithSetCallback()
// NOTE(review): $updated is captured by reference, presumably to tell regeneration
// apart from a cache hit -- confirm
160 $value = $this->wanCache->getWithSetCallback(
161 $this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
162 self::TIME_TILL_REFRESH,
163 function ( $oldValue, &$ttl ) use ( $serverIndexes, $staleValue, &$updated ) {
// Take the loop guard; failing to get it means the guard is already held
167 $scopedLock = $this->acquireServerStatesLoopGuard();
168 if ( !$scopedLock ) {
169 throw new RuntimeException(
170 "Circular recursion detected while regenerating server states cache. " .
171 "This may indicate improper connection handling in " . get_class( $this )
// Prior states: the WAN cache's old value if truthy, otherwise the srvCache value
179 $oldValue ?: $staleValue
// Options passed to getWithSetCallback()
184 'lockTSE' => self::STATE_PRESERVE_TTL,
185 'staleTTL' => self::STATE_PRESERVE_TTL,
// Busy value: the stale value if truthy, otherwise placeholder states
188 'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
193 $this->logger->info( __METHOD__ .
": regenerated '$cluster' cluster status" );
195 $this->logger->debug( __METHOD__ .
": used cached '$cluster' cluster status" );
// Write the result back to $this->srvCache
200 $this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
// NOTE(review): body fragment; the method header is not visible in this excerpt.
// Age of the prior value, clamped at zero
214 $age = max( $now - $priorAsOf, 0.0 );
// Age as a fraction of the reference TTL
216 $ttrRatio = $age / $referenceTTL;
// Prior generation delay as a fraction of the reference TTL
218 $genRatio = $priorGenDelay / $referenceTTL;
// Grows with the 4th power of the age ratio and shrinks exponentially with the
// generation-delay ratio
222 $chance = exp( -128 * $genRatio ) * ( $ttrRatio ** 4 );
// Random draw: true with probability $chance (capped at 1), at 1e-9 granularity
223 return ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
// NOTE(review): fragment of the routine that probes each server; the method header,
// several conditions and closing braces are not visible in this excerpt.
// With at most one server, placeholder states are returned
235 if ( $this->lb->getServerCount() <= 1 ) {
236 return $this->getPlaceholderServerStates( $serverIndexes );
// Timestamp and weight scales of the prior states, with defaults when absent
239 $priorAsOf = $priorStates[
'timestamp'] ?? 0;
240 $priorScales = $priorStates ? $priorStates[
'weightScales'] : [];
// Cluster name, used in the statsd metric keys below
241 $cluster = $this->lb->getClusterName();
245 foreach ( $serverIndexes as $i ) {
246 $isPrimary = ( $i == $this->lb->getWriterIndex() );
// A primary configured with non-positive load gets a weight scale of 1.0
250 if ( $isPrimary && $this->lb->getServerInfo( $i )[
'load'] <= 0 ) {
253 $weightScales[$i] = 1.0;
257 $host = $this->lb->getServerName( $i );
// Connection flags: untracked gauge connection with errors silenced
260 $flags = $this->lb::CONN_UNTRACKED_GAUGE | $this->lb::CONN_SILENCE_ERRORS;
262 $conn = $this->lb->getServerConnection( $i, $this->lb::DOMAIN_ANY, $flags );
// Previous scale for this server, defaulting to 1.0
265 $lastScale = $priorScales[$i] ?? 1.0;
// NOTE(review): argument of a call whose name is not visible here
271 $this->movingAveRatio
// Scale is never allowed below zero
274 $newScale = max( $newScale, 0.0 );
276 $weightScales[$i] = $newScale;
// Dots in the host name are replaced with underscores before building the metric key
277 $statHost = str_replace(
'.',
'_', $host );
278 $this->statsd->gauge(
"loadbalancer.weight.$cluster.$statHost", $newScale );
// NOTE(review): presumably inside a "no connection" branch (condition not visible):
// lag is recorded as 0 for the primary and false for any other server
282 $lagTimes[$i] = $isPrimary ? 0 :
false;
283 $this->logger->error(
284 __METHOD__ .
": host {db_server} is unreachable",
285 [
'db_server' => $host ]
// Lag as reported by the connection
292 $lag = $conn->getLag();
297 $lagTimes[$i] = $lag;
// A lag of false is logged as a replication problem
299 if ( $lag ===
false ) {
300 $this->logger->error(
301 __METHOD__ .
": host {db_server} is not replicating?",
302 [
'db_server' => $host ]
// Lag is multiplied by 1000 for the timing metric
305 $this->statsd->timing(
"loadbalancer.lag.$cluster.$statHost", $lag * 1000 );
// NOTE(review): the comparison is strict (>) while the message text says ">=" -- confirm
// which one is intended
306 if ( $lag > $this->lagWarnThreshold ) {
307 $this->logger->warning(
308 "Server {db_server} has {lag} seconds of lag (>= {maxlag})",
310 'db_server' => $host,
312 'maxlag' => $this->lagWarnThreshold
// Close the connection once it has been probed
319 $conn->close( __METHOD__ );
// Entries of the resulting states array; genTime is the elapsed time, clamped at zero
325 'lagTimes' => $lagTimes,
326 'weightScales' => $weightScales,
327 'timestamp' => $endTime,
328 'genTime' => max( $endTime - $startTime, 0.0 )
/**
 * Build default server states keyed by the given server indexes:
 * a lag of 0 and a weight scale of 1.0 for every server.
 *
 * NOTE(review): the rest of the returned array and the closing lines are not
 * visible in this excerpt.
 *
 * @param array $serverIndexes Server indexes used as the array keys
 */
336 private function getPlaceholderServerStates( array $serverIndexes ) {
338 'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
339 'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
// 1.0 when $conn is truthy, 0.0 otherwise
// NOTE(review): the enclosing method header is not visible in this excerpt
352 return $conn ? 1.0 : 0.0;
// NOTE(review): body fragment; the method header and the bodies of the two null
// branches below are not visible in this excerpt.
// Special cases: no gauge reading, or no prior value
370 if ( $gaugeValue ===
null ) {
372 } elseif ( $priorValue ===
null ) {
// Effective ratio after compounding the per-step ratio over $delay:
// 1 - (1 - ratio)^delay
380 $delayAwareRatio = 1 - pow( 1 - $movAveRatio, $delay );
// Weighted blend of the gauge reading and the prior value, clamped at zero
382 return max( $delayAwareRatio * $gaugeValue + ( 1 - $delayAwareRatio ) * $priorValue, 0.0 );
/**
 * Build the global cache key for the states of the given set of servers.
 *
 * The key components visible here are the 'rdbms-server-states' prefix, the
 * writer server's name and the dash-joined, sorted server indexes.
 * NOTE(review): at least one key component and the closing lines are not visible
 * in this excerpt.
 *
 * @param mixed $cache Cache object providing makeGlobalKey()
 * @param array $serverIndexes Server indexes; sorted on a local copy so the
 *  caller's ordering does not affect the key
 */
390 private function getStatesCacheKey( $cache, array $serverIndexes ) {
391 sort( $serverIndexes );
393 return $cache->makeGlobalKey(
394 'rdbms-server-states',
396 $this->lb->getServerName( $this->lb->getWriterIndex() ),
397 implode(
'-', $serverIndexes )
/**
 * Acquire the guard used to detect re-entrant regeneration of the server states.
 *
 * Sets $this->serverStatesKeyLocked and returns a ScopedCallback whose callback
 * clears the flag again.
 * NOTE(review): the value returned when the guard is already held is not visible
 * in this excerpt; the caller treats it as falsy.
 */
404 private function acquireServerStatesLoopGuard() {
// Guard already held
405 if ( $this->serverStatesKeyLocked ) {
409 $this->serverStatesKeyLocked =
true;
// The callback releases the guard
411 return new ScopedCallback(
function () {
412 $this->serverStatesKeyLocked =
false;
// Use the override when it is truthy, otherwise the wall clock as a float
// NOTE(review): the enclosing method header is not visible in this excerpt
421 return $this->wallClockOverride ?: microtime(
true );
// Bind the override by reference, so later changes to the caller's $time variable
// are seen here
// NOTE(review): the enclosing method header is not visible in this excerpt
429 $this->wallClockOverride =& $time;
Class representing a cache/ephemeral data store.
Multi-datacenter aware caching interface.