23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
29use Wikimedia\ScopedCallback;
/** Weight given to the newest sample when updating the moving average of a server's weight scale */
54 private $movingAveRatio;
/** Replication lag, in seconds, above which a warning is logged for a server */
56 private $lagWarnThreshold;
/** Optional override for the current time; when falsy, microtime( true ) is used instead */
59 private $wallClockOverride;
/** Whether server-state regeneration is in progress; used to detect recursive regeneration */
62 private $serverStatesKeyLocked =
false;
/** Cache format version. NOTE(review): presumably part of the cache key — confirm */
65 private const VERSION = 1;
/** Upper bound, in milliseconds, of the randomized staleness age used when checking the local-server cache */
67 private const POLL_PERIOD_MS = 500;
/** Seconds; TTL of the local-server cache entry and also the WAN cache lockTSE/staleTTL */
69 private const STATE_PRESERVE_TTL = 60;
/** TTL, in seconds, passed to the WAN cache for the server-state entry */
71 private const TIME_TILL_REFRESH = 1;
86 $this->wanCache = $wCache;
// Logging is a no-op until setLogger() injects a real logger.
87 $this->replLogger =
new NullLogger();
// Moving-average weight; defaults to 0.1 when the 'movingAveRatio' option is unset or null.
90 $this->movingAveRatio = $options[
'movingAveRatio'] ?? 0.1;
/**
 * Set the logger used for cluster status, lag, and reachability messages.
 *
 * @param LoggerInterface $logger
 */
94 public function setLogger( LoggerInterface $logger ) {
95 $this->replLogger = $logger;
// Stats sink used for the per-server weight gauges and lag timings.
99 $this->statsd = $statsFactory;
/**
 * Scale each server's load weight by its weight scale from the server states.
 *
 * Each scaled weight is rounded up with ceil() and cast to int. For a positive weight
 * and a positive scale, this keeps the result at >= 1. A server with no entry in
 * 'weightScales' keeps its weight, and an error is logged for it.
 *
 * NOTE(review): $states is assigned on a line not visible here, presumably from the
 * (cached) server states for these servers — confirm.
 *
 * @param array &$weightByServer Map of server index => load weight; modified in place
 * @param mixed $domain Database domain (exact type not visible here)
 */
102 final public function scaleLoads( array &$weightByServer, $domain ) {
// The servers to look up are the keys of the weight map.
103 $serverIndexes = array_keys( $weightByServer );
105 $newScalesByServer = $states[
'weightScales'];
106 foreach ( $weightByServer as $i => $weight ) {
107 if ( isset( $newScalesByServer[$i] ) ) {
108 $weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
110 $host = $this->lb->getServerName( $i );
111 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
/**
 * Get the replication lag of each of the given servers.
 *
 * NOTE(review): most of this method's body is not visible here. It presumably reads
 * the 'lagTimes' entry of the (cached) server states — confirm.
 *
 * @param array $serverIndexes Server indexes to report on
 * @param mixed $domain Database domain (exact type not visible here)
 * @return array NOTE(review): presumably server index => lag in seconds (or false) — confirm
 */
116 final public function getLagTimes( array $serverIndexes, $domain ) {
// The primary server's name labels this cluster in the log messages below.
128 $cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );
// Random staleness age between 0.001 and POLL_PERIOD_MS / 1000 seconds. It is presumably
// randomized so that concurrent callers do not all refresh at the same moment.
131 $ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
// Try the local-server cache first; its value is used when its timestamp is newer
// than $minAsOfTime.
135 $srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
136 $value = $this->srvCache->get( $srvCacheKey );
137 if ( $value && $value[
'timestamp'] > $minAsOfTime ) {
138 $this->replLogger->debug( __METHOD__ .
": used fresh '$cluster' cluster status" );
// Try to take the regeneration lock without waiting (timeout 0, lock expiry 10 seconds).
144 $scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
// If the lock is held elsewhere, fall back to the stale local value when one exists.
145 if ( !$scopedLock && $value ) {
146 $this->replLogger->debug( __METHOD__ .
": used stale '$cluster' cluster status" );
// Keep the (possibly stale or empty) local value as a fallback during WAN regeneration.
152 $staleValue = $value;
154 $value = $this->wanCache->getWithSetCallback(
155 $this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
156 self::TIME_TILL_REFRESH,
157 function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
// Refuse to regenerate re-entrantly. The exception message below points at
// connection handling as the likely cause of such recursion.
161 $scopedLock = $this->acquireServerStatesLoopGuard();
162 if ( !$scopedLock ) {
163 throw new RuntimeException(
164 "Circular recursion detected while regenerating server states cache. " .
165 "This may indicate improper connection handling in " . get_class( $this )
// Prior states: the WAN cache's previous value, or else the local stale value.
174 $oldValue ?: $staleValue
// WAN cache options (see WANObjectCache::getWithSetCallback). lockTSE and staleTTL
// let callers reuse an old value for up to STATE_PRESERVE_TTL seconds while a single
// caller regenerates. busyValue is used when no value exists and another caller is
// regenerating: the stale local value, or else neutral placeholder states.
179 'lockTSE' => self::STATE_PRESERVE_TTL,
180 'staleTTL' => self::STATE_PRESERVE_TTL,
183 'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
// Report whether the callback ran ($updated is captured by reference above) or a
// cached value was used.
188 $this->replLogger->info( __METHOD__ .
": regenerated '$cluster' cluster status" );
190 $this->replLogger->debug( __METHOD__ .
": used cached '$cluster' cluster status" );
// Save the result in the local-server cache for STATE_PRESERVE_TTL seconds.
195 $this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
// With at most one server there is nothing to balance, so use neutral placeholder states.
210 if ( $this->lb->getServerCount() <= 1 ) {
211 return $this->getPlaceholderServerStates( $serverIndexes );
// Weight scales from the prior states (if any) seed the moving average below.
214 $priorScales = $priorStates ? $priorStates[
'weightScales'] : [];
215 $cluster = $this->lb->getClusterName();
219 foreach ( $serverIndexes as $i ) {
220 $isPrimary = ( $i == $this->lb->getWriterIndex() );
// A primary with a configured load <= 0 gets a neutral scale of 1.0.
// NOTE(review): the lines that follow are not visible; presumably it is not probed.
224 if ( $isPrimary && $this->lb->getServerInfo( $i )[
'load'] <= 0 ) {
227 $weightScales[$i] = 1.0;
231 $host = $this->lb->getServerName( $i );
232 # Handles with open transactions are avoided since they might be subject
233 # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
235 $conn = $this->lb->getAnyOpenConnection( $i, $flags );
// The previous scale (default 1.0) and $this->movingAveRatio feed the moving
// average; the resulting scale is clamped to >= 0.
245 $lastScale = $priorScales[$i] ?? 1.0;
250 $this->movingAveRatio
253 $newScale = max( $newScale, 0.0 );
255 $weightScales[$i] = $newScale;
// Replace dots so the host name forms a single statsd key segment.
256 $statHost = str_replace(
'.',
'_', $host );
257 $this->statsd->gauge(
"loadbalancer.weight.$cluster.$statHost", $newScale );
// Unreachable server: report lag 0 for the primary and false for a replica.
261 $lagTimes[$i] = $isPrimary ? 0 :
false;
262 $this->replLogger->error(
263 __METHOD__ .
": host {db_server} is unreachable",
264 [
'db_server' => $host ]
271 $lag = $conn->getLag();
276 $lagTimes[$i] = $lag;
// A lag of false means the lag could not be determined.
278 if ( $lag ===
false ) {
279 $this->replLogger->error(
280 __METHOD__ .
": host {db_server} is not replicating?",
281 [
'db_server' => $host ]
// Lag is in seconds; the timing metric is recorded in milliseconds.
284 $this->statsd->timing(
"loadbalancer.lag.$cluster.$statHost", $lag * 1000 );
// NOTE(review): this condition is a strict '>', but the log message says '>='.
285 if ( $lag > $this->lagWarnThreshold ) {
286 $this->replLogger->warning(
287 "Server {db_server} has {lag} seconds of lag (>= {maxlag})",
289 'db_server' => $host,
291 'maxlag' => $this->lagWarnThreshold
298 # Close the connection to avoid sleeper connections piling up.
299 # Note that the caller will pick one of these DBs and reconnect,
300 # which is slightly inefficient, but this only matters for the lag
301 # time cache miss case, which is far less common than cache hits.
302 $this->lb->closeConnection( $conn );
// Resulting states: per-server lag times and weight scales.
307 'lagTimes' => $lagTimes,
308 'weightScales' => $weightScales,
/**
 * Build neutral server states: zero lag and a weight scale of 1.0 for each server.
 *
 * @param array $serverIndexes Server indexes to include
 * @return array Includes 'lagTimes' and 'weightScales' keyed by server index.
 *   NOTE(review): any other keys are not visible here.
 */
317 private function getPlaceholderServerStates( array $serverIndexes ) {
319 'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
320 'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
// Scale of 1.0 when $conn is truthy (a connection is available), 0.0 otherwise.
332 return $conn ? 1.0 : 0.0;
// Exponential moving average: weight $movAveRatio on the new (naive) scale and
// 1 - $movAveRatio on the previous scale.
369 return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
/**
 * Build the cache key for the states of a set of servers.
 *
 * The server indexes are sorted before being joined with '-', so the same set of
 * servers yields the same key regardless of order. The sort only affects this local
 * copy of the array. The primary server's name is also part of the key.
 *
 * @param mixed $cache Cache providing makeGlobalKey(); called with both the srvCache
 *   and the wanCache instances
 * @param array $serverIndexes Server indexes covered by the key
 * @return string Cache key
 */
377 private function getStatesCacheKey(
$cache, array $serverIndexes ) {
378 sort( $serverIndexes );
380 return $cache->makeGlobalKey(
381 'rdbms-server-states',
383 $this->lb->getServerName( $this->lb->getWriterIndex() ),
384 implode(
'-', $serverIndexes )
/**
 * Flag server-state regeneration as in progress, to detect recursive regeneration.
 *
 * @return ScopedCallback|mixed A ScopedCallback that clears the flag again when it runs.
 *   NOTE(review): the return value when the flag is already set is on a line not
 *   visible here; callers treat it as falsy.
 */
391 private function acquireServerStatesLoopGuard() {
392 if ( $this->serverStatesKeyLocked ) {
396 $this->serverStatesKeyLocked =
true;
// The callback resets the flag when the ScopedCallback is consumed or destroyed.
398 return new ScopedCallback(
function () {
399 $this->serverStatesKeyLocked =
false;
// Use the override when it is truthy. Note that ?: also falls back to the real clock
// when the override is 0 or 0.0.
408 return $this->wallClockOverride ?: microtime(
true );
// Bind by reference: later changes to the caller's $time variable are seen through
// $this->wallClockOverride.
416 $this->wallClockOverride =& $time;
Class representing a cache/ephemeral data store.
Multi-datacenter aware caching interface.