25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
29use Wikimedia\ScopedCallback;
63 private const VERSION = 1;
65 private const POLL_PERIOD_MS = 500;
67 private const STATE_PRESERVE_TTL = 60;
69 private const TIME_TILL_REFRESH = 1;
84 $this->wanCache = $wCache;
85 $this->replLogger =
new NullLogger();
87 $this->movingAveRatio = $options[
'movingAveRatio'] ?? 0.1;
91 public function setLogger( LoggerInterface $logger ) {
92 $this->replLogger = $logger;
95 final public function scaleLoads( array &$weightByServer, $domain ) {
96 $serverIndexes = array_keys( $weightByServer );
98 $newScalesByServer = $states[
'weightScales'];
99 foreach ( $weightByServer as $i => $weight ) {
100 if ( isset( $newScalesByServer[$i] ) ) {
101 $weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
103 $host = $this->lb->getServerName( $i );
104 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
109 final public function getLagTimes( array $serverIndexes, $domain ) {
121 $cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );
124 $ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
129 $value = $this->srvCache->get( $srvCacheKey );
130 if ( $value && $value[
'timestamp'] > $minAsOfTime ) {
131 $this->replLogger->debug( __METHOD__ .
": used fresh '$cluster' cluster status" );
137 $scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
138 if ( !$scopedLock && $value ) {
139 $this->replLogger->debug( __METHOD__ .
": used stale '$cluster' cluster status" );
145 $staleValue = $value;
147 $value = $this->wanCache->getWithSetCallback(
149 self::TIME_TILL_REFRESH,
150 function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
155 if ( !$scopedLock ) {
156 throw new RuntimeException(
157 "Circular recursion detected while regenerating server states cache. " .
158 "This may indicate improper connection handling in " . get_class( $this )
167 $oldValue ?: $staleValue
172 'lockTSE' => self::STATE_PRESERVE_TTL,
173 'staleTTL' => self::STATE_PRESERVE_TTL,
181 $this->replLogger->info( __METHOD__ .
": regenerated '$cluster' cluster status" );
183 $this->replLogger->debug( __METHOD__ .
": used cached '$cluster' cluster status" );
188 $this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
203 if ( $this->lb->getServerCount() <= 1 ) {
207 $priorScales = $priorStates ? $priorStates[
'weightScales'] : [];
211 foreach ( $serverIndexes as $i ) {
212 $isPrimary = ( $i == $this->lb->getWriterIndex() );
216 if ( $isPrimary && $this->lb->getServerInfo( $i )[
'load'] <= 0 ) {
219 $weightScales[$i] = 1.0;
223 $host = $this->lb->getServerName( $i );
224 # Handles with open transactions are avoided since they might be subject
225 # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
227 $conn = $this->lb->getAnyOpenConnection( $i, $flags );
237 $lastScale = $priorScales[$i] ?? 1.0;
242 $this->movingAveRatio
246 $weightScales[$i] = max( $newScale, 0.0 );
250 $lagTimes[$i] = $isPrimary ? 0 :
false;
251 $this->replLogger->error(
252 __METHOD__ .
": host {db_server} is unreachable",
253 [
'db_server' => $host ]
260 $lagTimes[$i] = $conn->getLag();
263 $lagTimes[$i] =
false;
266 if ( $lagTimes[$i] ===
false ) {
267 $this->replLogger->error(
268 __METHOD__ .
": host {db_server} is not replicating?",
269 [
'db_server' => $host ]
271 } elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
272 $this->replLogger->warning(
273 "Server {db_server} has {lag} seconds of lag (>= {maxlag})",
275 'db_server' => $host,
276 'lag' => $lagTimes[$i],
277 'maxlag' => $this->lagWarnThreshold
283 # Close the connection to avoid sleeper connections piling up.
284 # Note that the caller will pick one of these DBs and reconnect,
285 # which is slightly inefficient, but this only matters for the lag
286 # time cache miss case, which is far less common than cache hits.
287 $this->lb->closeConnection( $conn );
292 'lagTimes' => $lagTimes,
293 'weightScales' => $weightScales,
304 'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
305 'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
317 return $conn ? 1.0 : 0.0;
354 return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
363 sort( $serverIndexes );
365 return $cache->makeGlobalKey(
366 'rdbms-server-states',
368 $this->lb->getServerName( $this->lb->getWriterIndex() ),
369 implode(
'-', $serverIndexes )
377 if ( $this->serverStatesKeyLocked ) {
381 $this->serverStatesKeyLocked =
true;
383 return new ScopedCallback(
function () {
384 $this->serverStatesKeyLocked =
false;
393 return $this->wallClockOverride ?: microtime(
true );
401 $this->wallClockOverride =& $time;
Class representing a cache/ephemeral data store.
Multi-datacenter aware caching interface.