24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
54 const LAG_WARN_THRESHOLD = 10;
69 $this->wanCache = $wCache;
70 $this->replLogger =
new NullLogger();
72 $this->movingAveRatio =
$options[
'movingAveRatio'] ?? 0.1;
73 $this->lagWarnThreshold =
$options[
'lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
76 public function setLogger( LoggerInterface $logger ) {
77 $this->replLogger = $logger;
81 $serverIndexes = array_keys( $weightByServer );
83 $newScalesByServer = $states[
'weightScales'];
84 foreach ( $weightByServer
as $i => $weight ) {
85 if ( isset( $newScalesByServer[$i] ) ) {
86 $weightByServer[$i] = $weight * $newScalesByServer[$i];
88 $host = $this->parent->getServerName( $i );
89 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
99 $writerIndex = $this->parent->getWriterIndex();
100 if (
count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
101 # Single server only, just return zero without caching
103 'lagTimes' => [ $writerIndex => 0 ],
104 'weightScales' => [ $writerIndex => 1.0 ]
109 # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
110 $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
111 # Keep keys around longer as fallbacks
114 # (a) Check the local APC cache
115 $value = $this->srvCache->get( $key );
116 if (
$value &&
$value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
117 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from local cache" );
120 $staleValue =
$value ?:
false;
122 # (b) Check the shared cache and backfill APC
123 $value = $this->wanCache->get( $key );
124 if (
$value &&
$value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
125 $this->srvCache->set( $key,
$value, $staleTTL );
126 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from main cache" );
130 $staleValue =
$value ?: $staleValue;
132 # (c) Cache key missing or expired; regenerate and backfill
133 if ( $this->srvCache->lock( $key, 0, 10 ) ) {
134 # Let only this process update the cache value on this server
137 $unlocker =
new ScopedCallback(
function ()
use ( $sCache, $key ) {
138 $sCache->unlock( $key );
140 } elseif ( $staleValue ) {
141 # Could not acquire lock but an old cache exists, so use it
148 foreach ( $serverIndexes
as $i ) {
149 if ( $i == $this->parent->getWriterIndex() ) {
151 $weightScales[$i] = 1.0;
155 # Handles with open transactions are avoided since they might be subject
156 # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
157 $flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT;
158 $conn = $this->parent->getAnyOpenConnection( $i, $flags );
162 $conn = $this->parent->openConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
166 $lastWeight = $staleValue[
'weightScales'][$i] ?? 1.0;
168 $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
171 $weightScales[$i] = max( $newWeight, 0.10 );
173 $host = $this->parent->getServerName( $i );
176 $lagTimes[$i] =
false;
177 $this->replLogger->error(
178 __METHOD__ .
": host {db_server} is unreachable",
179 [
'db_server' => $host ]
184 if ( $conn->getLBInfo(
'is static' ) ) {
187 $lagTimes[$i] = $conn->getLag();
188 if ( $lagTimes[$i] ===
false ) {
189 $this->replLogger->error(
190 __METHOD__ .
": host {db_server} is not replicating?",
191 [
'db_server' => $host ]
193 } elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
194 $this->replLogger->error(
195 "Server {host} has {lag} seconds of lag (>= {maxlag})",
198 'lag' => $lagTimes[$i],
199 'maxlag' => $this->lagWarnThreshold
206 # Close the connection to avoid sleeper connections piling up.
207 # Note that the caller will pick one of these DBs and reconnect,
208 # which is slightly inefficient, but this only matters for the lag
209 # time cache miss case, which is far less common than cache hits.
210 $this->parent->closeConnection( $conn );
214 # Add a timestamp key so we know when it was cached
216 'lagTimes' => $lagTimes,
217 'weightScales' => $weightScales,
218 'timestamp' => microtime(
true )
220 $this->wanCache->set( $key,
$value, $staleTTL );
221 $this->srvCache->set( $key,
$value, $staleTTL );
222 $this->replLogger->info( __METHOD__ .
": re-calculated lag times ($key)" );
233 return $conn ? 1.0 : 0.0;
237 sort( $serverIndexes );
239 return $this->srvCache->makeGlobalKey(
242 $this->parent->getServerName( $this->parent->getWriterIndex() ),
243 implode(
'-', $serverIndexes )