25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
29use Wikimedia\ScopedCallback;
63 private const LAG_WARN_THRESHOLD = 10;
66 private const VERSION = 1;
68 private const POLL_PERIOD_MS = 500;
70 private const STATE_PRESERVE_TTL = 60;
72 private const TIME_TILL_REFRESH = 1;
87 $this->wanCache = $wCache;
88 $this->replLogger =
new NullLogger();
90 $this->movingAveRatio = $options[
'movingAveRatio'] ?? 0.1;
91 $this->lagWarnThreshold = $options[
'lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
94 public function setLogger( LoggerInterface $logger ) {
95 $this->replLogger = $logger;
98 final public function scaleLoads( array &$weightByServer, $domain ) {
99 $serverIndexes = array_keys( $weightByServer );
101 $newScalesByServer = $states[
'weightScales'];
102 foreach ( $weightByServer as $i => $weight ) {
103 if ( isset( $newScalesByServer[$i] ) ) {
104 $weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
106 $host = $this->lb->getServerName( $i );
107 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
112 final public function getLagTimes( array $serverIndexes, $domain ) {
124 $cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );
127 $ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
132 $value = $this->srvCache->get( $srvCacheKey );
133 if ( $value && $value[
'timestamp'] > $minAsOfTime ) {
134 $this->replLogger->debug( __METHOD__ .
": used fresh '$cluster' cluster status" );
140 $scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
141 if ( !$scopedLock && $value ) {
142 $this->replLogger->debug( __METHOD__ .
": used stale '$cluster' cluster status" );
148 $staleValue = $value;
150 $value = $this->wanCache->getWithSetCallback(
152 self::TIME_TILL_REFRESH,
153 function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
158 if ( !$scopedLock ) {
159 throw new RuntimeException(
160 "Circular recursion detected while regenerating server states cache. " .
161 "This may indicate improper connection handling in " . get_class( $this )
170 $oldValue ?: $staleValue
175 'lockTSE' => self::STATE_PRESERVE_TTL,
176 'staleTTL' => self::STATE_PRESERVE_TTL,
184 $this->replLogger->info( __METHOD__ .
": regenerated '$cluster' cluster status" );
186 $this->replLogger->debug( __METHOD__ .
": used cached '$cluster' cluster status" );
191 $this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
206 if ( $this->lb->getServerCount() <= 1 ) {
210 $priorScales = $priorStates ? $priorStates[
'weightScales'] : [];
214 foreach ( $serverIndexes as $i ) {
215 $isMaster = ( $i == $this->lb->getWriterIndex() );
219 if ( $isMaster && $this->lb->getServerInfo( $i )[
'load'] <= 0 ) {
222 $weightScales[$i] = 1.0;
226 $host = $this->lb->getServerName( $i );
227 # Handles with open transactions are avoided since they might be subject
228 # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
229 $flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
230 $conn = $this->lb->getAnyOpenConnection( $i, $flags );
235 $conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
240 $lastScale = $priorScales[$i] ?? 1.0;
245 $this->movingAveRatio
249 $weightScales[$i] = max( $newScale, 0.0 );
253 $lagTimes[$i] = $isMaster ? 0 :
false;
254 $this->replLogger->error(
255 __METHOD__ .
": host {db_server} is unreachable",
256 [
'db_server' => $host ]
263 $lagTimes[$i] = $conn->getLag();
266 $lagTimes[$i] =
false;
269 if ( $lagTimes[$i] ===
false ) {
270 $this->replLogger->error(
271 __METHOD__ .
": host {db_server} is not replicating?",
272 [
'db_server' => $host ]
274 } elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
275 $this->replLogger->warning(
276 "Server {dbserver} has {lag} seconds of lag (>= {maxlag})",
279 'lag' => $lagTimes[$i],
280 'maxlag' => $this->lagWarnThreshold
286 # Close the connection to avoid sleeper connections piling up.
287 # Note that the caller will pick one of these DBs and reconnect,
288 # which is slightly inefficient, but this only matters for the lag
289 # time cache miss case, which is far less common than cache hits.
290 $this->lb->closeConnection( $conn );
295 'lagTimes' => $lagTimes,
296 'weightScales' => $weightScales,
307 'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
308 'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
320 return $conn ? 1.0 : 0.0;
357 return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
366 sort( $serverIndexes );
368 return $cache->makeGlobalKey(
369 'rdbms-server-states',
371 $this->lb->getServerName( $this->lb->getWriterIndex() ),
372 implode(
'-', $serverIndexes )
380 if ( $this->serverStatesKeyLocked ) {
384 $this->serverStatesKeyLocked =
true;
386 return new ScopedCallback(
function () {
387 $this->serverStatesKeyLocked =
false;
396 return $this->wallClockOverride ?: microtime(
true );
404 $this->wallClockOverride =& $time;
Class representing a cache/ephemeral data store.
Multi-datacenter aware caching interface.