24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
54 const LAG_WARN_THRESHOLD = 10;
69 $this->wanCache = $wCache;
70 $this->replLogger =
new NullLogger();
72 $this->movingAveRatio = $options[
'movingAveRatio'] ?? 0.1;
73 $this->lagWarnThreshold = $options[
'lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
76 public function setLogger( LoggerInterface $logger ) {
77 $this->replLogger = $logger;
80 final public function scaleLoads( array &$weightByServer, $domain ) {
81 $serverIndexes = array_keys( $weightByServer );
83 $newScalesByServer = $states[
'weightScales'];
84 foreach ( $weightByServer as $i => $weight ) {
85 if ( isset( $newScalesByServer[$i] ) ) {
86 $weightByServer[$i] = $weight * $newScalesByServer[$i];
88 $host = $this->lb->getServerName( $i );
89 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
94 final public function getLagTimes( array $serverIndexes, $domain ) {
99 $writerIndex = $this->lb->getWriterIndex();
100 if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
101 # Single server only, just return zero without caching
103 'lagTimes' => [ $writerIndex => 0 ],
104 'weightScales' => [ $writerIndex => 1.0 ]
109 # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
111 $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
112 # Keep keys around longer as fallbacks
115 # (a) Check the local APC cache
116 $value = $this->srvCache->get( $key );
117 if ( $value && $value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
118 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from local cache" );
121 $staleValue = $value ?:
false;
123 # (b) Check the shared cache and backfill APC
124 $value = $this->wanCache->get( $key );
125 if ( $value && $value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
126 $this->srvCache->set( $key, $value, $staleTTL );
127 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from main cache" );
131 $staleValue = $value ?: $staleValue;
133 # (c) Cache key missing or expired; regenerate and backfill
134 if ( $this->srvCache->lock( $key, 0, 10 ) ) {
135 # Let only this process update the cache value on this server
138 $unlocker =
new ScopedCallback(
function () use ( $sCache, $key ) {
139 $sCache->unlock( $key );
141 } elseif ( $staleValue ) {
142 # Could not acquire lock but an old cache exists, so use it
149 foreach ( $serverIndexes as $i ) {
150 if ( $i == $this->lb->getWriterIndex() ) {
152 $weightScales[$i] = 1.0;
156 # Handles with open transactions are avoided since they might be subject
157 # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
158 $flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
159 $conn = $this->lb->getAnyOpenConnection( $i, $flags );
164 $conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
168 $lastWeight = $staleValue[
'weightScales'][$i] ?? 1.0;
170 $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
173 $weightScales[$i] = max( $newWeight, 0.10 );
175 $host = $this->lb->getServerName( $i );
178 $lagTimes[$i] =
false;
179 $this->replLogger->error(
180 __METHOD__ .
": host {db_server} is unreachable",
181 [
'db_server' => $host ]
186 $lagTimes[$i] = $conn->getLag();
187 if ( $lagTimes[$i] ===
false ) {
188 $this->replLogger->error(
189 __METHOD__ .
": host {db_server} is not replicating?",
190 [
'db_server' => $host ]
192 } elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
193 $this->replLogger->warning(
194 "Server {host} has {lag} seconds of lag (>= {maxlag})",
197 'lag' => $lagTimes[$i],
198 'maxlag' => $this->lagWarnThreshold
204 # Close the connection to avoid sleeper connections piling up.
205 # Note that the caller will pick one of these DBs and reconnect,
206 # which is slightly inefficient, but this only matters for the lag
207 # time cache miss case, which is far less common than cache hits.
208 $this->lb->closeConnection( $conn );
212 # Add a timestamp key so we know when it was cached
214 'lagTimes' => $lagTimes,
215 'weightScales' => $weightScales,
216 'timestamp' => microtime(
true )
218 $this->wanCache->set( $key, $value, $staleTTL );
219 $this->srvCache->set( $key, $value, $staleTTL );
220 $this->replLogger->info( __METHOD__ .
": re-calculated lag times ($key)" );
231 return $conn ? 1.0 : 0.0;
235 sort( $serverIndexes );
237 return $this->srvCache->makeGlobalKey(
240 $this->lb->getServerName( $this->lb->getWriterIndex() ),
241 implode(
'-', $serverIndexes )