24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
56 $this->replLogger =
new NullLogger();
58 $this->movingAveRatio = isset(
$options[
'movingAveRatio'] )
63 public function setLogger( LoggerInterface $logger ) {
64 $this->replLogger = $logger;
68 $serverIndexes = array_keys( $weightByServer );
70 $coefficientsByServer = $states[
'weightScales'];
71 foreach ( $weightByServer
as $i => $weight ) {
72 if ( isset( $coefficientsByServer[$i] ) ) {
73 $weightByServer[$i] = $weight * $coefficientsByServer[$i];
75 $host = $this->parent->getServerName( $i );
76 $this->replLogger->error( __METHOD__ .
": host $host not in cache" );
84 return $states[
'lagTimes'];
88 $writerIndex = $this->parent->getWriterIndex();
89 if (
count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
90 # Single server only, just return zero without caching
92 'lagTimes' => [ $writerIndex => 0 ],
93 'weightScales' => [ $writerIndex => 1.0 ]
98 # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
99 $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
100 # Keep keys around longer as fallbacks
103 # (a) Check the local APC cache
104 $value = $this->srvCache->get( $key );
105 if (
$value &&
$value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
106 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from local cache" );
109 $staleValue =
$value ?:
false;
111 # (b) Check the shared cache and backfill APC
112 $value = $this->mainCache->get( $key );
113 if (
$value &&
$value[
'timestamp'] > ( microtime(
true ) - $ttl ) ) {
114 $this->srvCache->set( $key,
$value, $staleTTL );
115 $this->replLogger->debug( __METHOD__ .
": got lag times ($key) from main cache" );
119 $staleValue =
$value ?: $staleValue;
121 # (c) Cache key missing or expired; regenerate and backfill
122 if ( $this->mainCache->lock( $key, 0, 10 ) ) {
123 # Let this process alone update the cache value
126 $unlocker =
new ScopedCallback(
function ()
use (
$cache, $key ) {
129 } elseif ( $staleValue ) {
130 # Could not acquire lock but an old cache exists, so use it
137 foreach ( $serverIndexes
as $i ) {
138 if ( $i == $this->parent->getWriterIndex() ) {
140 $weightScales[$i] = 1.0;
144 $conn = $this->parent->getAnyOpenConnection( $i );
148 $conn = $this->parent->openConnection( $i, $domain );
152 $lastWeight = isset( $staleValue[
'weightScales'][$i] )
153 ? $staleValue[
'weightScales'][$i]
156 $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
159 $weightScales[$i] = max( $newWeight, .10 );
162 $lagTimes[$i] =
false;
163 $host = $this->parent->getServerName( $i );
164 $this->replLogger->error( __METHOD__ .
": host $host is unreachable" );
168 if ( $conn->getLBInfo(
'is static' ) ) {
171 $lagTimes[$i] = $conn->getLag();
172 if ( $lagTimes[$i] ===
false ) {
173 $host = $this->parent->getServerName( $i );
174 $this->replLogger->error( __METHOD__ .
": host $host is not replicating?" );
179 # Close the connection to avoid sleeper connections piling up.
180 # Note that the caller will pick one of these DBs and reconnect,
181 # which is slightly inefficient, but this only matters for the lag
182 # time cache miss case, which is far less common than cache hits.
183 $this->parent->closeConnection( $conn );
187 # Add a timestamp key so we know when it was cached
189 'lagTimes' => $lagTimes,
190 'weightScales' => $weightScales,
191 'timestamp' => microtime(
true )
193 $this->mainCache->set( $key,
$value, $staleTTL );
194 $this->srvCache->set( $key,
$value, $staleTTL );
195 $this->replLogger->info( __METHOD__ .
": re-calculated lag times ($key)" );
206 return $conn ? 1.0 : 0.0;
210 sort( $serverIndexes );
212 return $this->srvCache->makeGlobalKey(
215 $this->parent->getServerName( $this->parent->getWriterIndex() ),
216 implode(
'-', $serverIndexes )