24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
56 $this->wanCache = $wCache;
57 $this->replLogger =
new NullLogger();
59 $this->movingAveRatio = isset(
$options[
'movingAveRatio'] )
64 public function setLogger( LoggerInterface $logger ) {
65 $this->replLogger = $logger;
69 $serverIndexes = array_keys( $weightByServer );
71 $coefficientsByServer = $states[
'weightScales'];
72 foreach ( $weightByServer
as $i => $weight ) {
73 if ( isset( $coefficientsByServer[$i] ) ) {
74 $weightByServer[$i] = $weight * $coefficientsByServer[$i];
76 $host = $this->parent->getServerName( $i );
77 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
85 return $states[
'lagTimes'];
89 $writerIndex = $this->parent->getWriterIndex();
90 if (
count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
91 # Single server only, just return zero without caching
93 'lagTimes' => [ $writerIndex => 0 ],
94 'weightScales' => [ $writerIndex => 1.0 ]
99 # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
100 $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
101 # Keep keys around longer as fallbacks
104 # (a) Check the local APC cache
105 $value = $this->srvCache->get( $key );
106 if (
$value &&
$value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
107 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from local cache" );
110 $staleValue =
$value ?:
false;
112 # (b) Check the shared cache and backfill APC
113 $value = $this->wanCache->get( $key );
114 if (
$value &&
$value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
115 $this->srvCache->set( $key,
$value, $staleTTL );
116 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from main cache" );
120 $staleValue =
$value ?: $staleValue;
122 # (c) Cache key missing or expired; regenerate and backfill
123 if ( $this->srvCache->lock( $key, 0, 10 ) ) {
124 # Let only this process update the cache value on this server
127 $unlocker =
new ScopedCallback(
function ()
use ( $sCache, $key ) {
128 $sCache->unlock( $key );
130 } elseif ( $staleValue ) {
131 # Could not acquire lock but an old cache exists, so use it
138 foreach ( $serverIndexes
as $i ) {
139 if ( $i == $this->parent->getWriterIndex() ) {
141 $weightScales[$i] = 1.0;
145 $conn = $this->parent->getAnyOpenConnection( $i );
149 $conn = $this->parent->openConnection( $i,
'' );
153 $lastWeight = isset( $staleValue[
'weightScales'][$i] )
154 ? $staleValue[
'weightScales'][$i]
157 $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
160 $weightScales[$i] = max( $newWeight, 0.10 );
163 $lagTimes[$i] =
false;
164 $host = $this->parent->getServerName( $i );
165 $this->replLogger->error(
166 __METHOD__ .
": host {db_server} is unreachable",
167 [
'db_server' => $host ]
172 if ( $conn->getLBInfo(
'is static' ) ) {
175 $lagTimes[$i] = $conn->getLag();
176 if ( $lagTimes[$i] ===
false ) {
177 $host = $this->parent->getServerName( $i );
178 $this->replLogger->error(
179 __METHOD__ .
": host {db_server} is not replicating?",
180 [
'db_server' => $host ]
186 # Close the connection to avoid sleeper connections piling up.
187 # Note that the caller will pick one of these DBs and reconnect,
188 # which is slightly inefficient, but this only matters for the lag
189 # time cache miss case, which is far less common than cache hits.
190 $this->parent->closeConnection( $conn );
194 # Add a timestamp key so we know when it was cached
196 'lagTimes' => $lagTimes,
197 'weightScales' => $weightScales,
198 'timestamp' => microtime(
true )
200 $this->wanCache->set( $key,
$value, $staleTTL );
201 $this->srvCache->set( $key,
$value, $staleTTL );
202 $this->replLogger->info( __METHOD__ .
": re-calculated lag times ($key)" );
213 return $conn ? 1.0 : 0.0;
217 sort( $serverIndexes );
219 return $this->srvCache->makeGlobalKey(
222 $this->parent->getServerName( $this->parent->getWriterIndex() ),
223 implode(
'-', $serverIndexes )