MediaWiki  1.34.0
LoadMonitor.php
Go to the documentation of this file.
1 <?php
22 namespace Wikimedia\Rdbms;
23 
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
27 use BagOStuff;
28 use WANObjectCache;
29 
36 class LoadMonitor implements ILoadMonitor {
38  protected $lb;
40  protected $srvCache;
42  protected $wanCache;
44  protected $replLogger;
45 
47  private $movingAveRatio;
50 
52  const VERSION = 1;
54  const LAG_WARN_THRESHOLD = 10;
55 
64  public function __construct(
65  ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
66  ) {
67  $this->lb = $lb;
68  $this->srvCache = $srvCache;
69  $this->wanCache = $wCache;
70  $this->replLogger = new NullLogger();
71 
72  $this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
73  $this->lagWarnThreshold = $options['lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
74  }
75 
76  public function setLogger( LoggerInterface $logger ) {
77  $this->replLogger = $logger;
78  }
79 
80  final public function scaleLoads( array &$weightByServer, $domain ) {
81  $serverIndexes = array_keys( $weightByServer );
82  $states = $this->getServerStates( $serverIndexes, $domain );
83  $newScalesByServer = $states['weightScales'];
84  foreach ( $weightByServer as $i => $weight ) {
85  if ( isset( $newScalesByServer[$i] ) ) {
86  $weightByServer[$i] = $weight * $newScalesByServer[$i];
87  } else { // server recently added to config?
88  $host = $this->lb->getServerName( $i );
89  $this->replLogger->error( __METHOD__ . ": host $host not in cache" );
90  }
91  }
92  }
93 
94  final public function getLagTimes( array $serverIndexes, $domain ) {
95  return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
96  }
97 
98  protected function getServerStates( array $serverIndexes, $domain ) {
99  $writerIndex = $this->lb->getWriterIndex();
100  if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
101  # Single server only, just return zero without caching
102  return [
103  'lagTimes' => [ $writerIndex => 0 ],
104  'weightScales' => [ $writerIndex => 1.0 ]
105  ];
106  }
107 
108  $key = $this->getCacheKey( $serverIndexes );
109  # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
110  // @phan-suppress-next-line PhanTypeMismatchArgumentInternal
111  $ttl = mt_rand( 4e6, 5e6 ) / 1e6;
112  # Keep keys around longer as fallbacks
113  $staleTTL = 60;
114 
115  # (a) Check the local APC cache
116  $value = $this->srvCache->get( $key );
117  if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
118  $this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
119  return $value; // cache hit
120  }
121  $staleValue = $value ?: false;
122 
123  # (b) Check the shared cache and backfill APC
124  $value = $this->wanCache->get( $key );
125  if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
126  $this->srvCache->set( $key, $value, $staleTTL );
127  $this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );
128 
129  return $value; // cache hit
130  }
131  $staleValue = $value ?: $staleValue;
132 
133  # (c) Cache key missing or expired; regenerate and backfill
134  if ( $this->srvCache->lock( $key, 0, 10 ) ) {
135  # Let only this process update the cache value on this server
136  $sCache = $this->srvCache;
138  $unlocker = new ScopedCallback( function () use ( $sCache, $key ) {
139  $sCache->unlock( $key );
140  } );
141  } elseif ( $staleValue ) {
142  # Could not acquire lock but an old cache exists, so use it
143  return $staleValue;
144  }
145 
146  $lagTimes = [];
147  $weightScales = [];
148  $movAveRatio = $this->movingAveRatio;
149  foreach ( $serverIndexes as $i ) {
150  if ( $i == $this->lb->getWriterIndex() ) {
151  $lagTimes[$i] = 0; // master always has no lag
152  $weightScales[$i] = 1.0; // nominal weight
153  continue;
154  }
155 
156  # Handles with open transactions are avoided since they might be subject
157  # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
158  $flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
159  $conn = $this->lb->getAnyOpenConnection( $i, $flags );
160  if ( $conn ) {
161  $close = false; // already open
162  } else {
163  // Get a connection to this server without triggering other server connections
164  $conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
165  $close = true; // new connection
166  }
167 
168  $lastWeight = $staleValue['weightScales'][$i] ?? 1.0;
169  $coefficient = $this->getWeightScale( $i, $conn ?: null );
170  $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
171 
172  // Scale from 10% to 100% of nominal weight
173  $weightScales[$i] = max( $newWeight, 0.10 );
174 
175  $host = $this->lb->getServerName( $i );
176 
177  if ( !$conn ) {
178  $lagTimes[$i] = false;
179  $this->replLogger->error(
180  __METHOD__ . ": host {db_server} is unreachable",
181  [ 'db_server' => $host ]
182  );
183  continue;
184  }
185 
186  $lagTimes[$i] = $conn->getLag();
187  if ( $lagTimes[$i] === false ) {
188  $this->replLogger->error(
189  __METHOD__ . ": host {db_server} is not replicating?",
190  [ 'db_server' => $host ]
191  );
192  } elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
193  $this->replLogger->warning(
194  "Server {host} has {lag} seconds of lag (>= {maxlag})",
195  [
196  'host' => $host,
197  'lag' => $lagTimes[$i],
198  'maxlag' => $this->lagWarnThreshold
199  ]
200  );
201  }
202 
203  if ( $close ) {
204  # Close the connection to avoid sleeper connections piling up.
205  # Note that the caller will pick one of these DBs and reconnect,
206  # which is slightly inefficient, but this only matters for the lag
207  # time cache miss cache, which is far less common that cache hits.
208  $this->lb->closeConnection( $conn );
209  }
210  }
211 
212  # Add a timestamp key so we know when it was cached
213  $value = [
214  'lagTimes' => $lagTimes,
215  'weightScales' => $weightScales,
216  'timestamp' => microtime( true )
217  ];
218  $this->wanCache->set( $key, $value, $staleTTL );
219  $this->srvCache->set( $key, $value, $staleTTL );
220  $this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );
221 
222  return $value;
223  }
224 
230  protected function getWeightScale( $index, IDatabase $conn = null ) {
231  return $conn ? 1.0 : 0.0;
232  }
233 
234  private function getCacheKey( array $serverIndexes ) {
235  sort( $serverIndexes );
236  // Lag is per-server, not per-DB, so key on the master DB name
237  return $this->srvCache->makeGlobalKey(
238  'lag-times',
239  self::VERSION,
240  $this->lb->getServerName( $this->lb->getWriterIndex() ),
241  implode( '-', $serverIndexes )
242  );
243  }
244 }
Wikimedia\Rdbms\LoadMonitor\$replLogger
LoggerInterface $replLogger
Definition: LoadMonitor.php:44
Wikimedia\Rdbms\LoadMonitor
Basic DB load monitor with no external dependencies Uses memcached to cache the replication lag for a...
Definition: LoadMonitor.php:36
Wikimedia\Rdbms\LoadMonitor\$srvCache
BagOStuff $srvCache
Definition: LoadMonitor.php:40
Wikimedia\Rdbms\ILoadMonitor
An interface for database load monitoring.
Definition: ILoadMonitor.php:35
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
Wikimedia\Rdbms\LoadMonitor\getCacheKey
getCacheKey(array $serverIndexes)
Definition: LoadMonitor.php:234
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:63
Wikimedia\Rdbms\LoadMonitor\getWeightScale
getWeightScale( $index, IDatabase $conn=null)
Definition: LoadMonitor.php:230
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
Wikimedia\Rdbms\LoadMonitor\$wanCache
WANObjectCache $wanCache
Definition: LoadMonitor.php:42
Wikimedia\Rdbms\LoadMonitor\$lagWarnThreshold
int $lagWarnThreshold
Amount of replication lag in seconds before warnings are logged.
Definition: LoadMonitor.php:49
Wikimedia\Rdbms\LoadMonitor\$movingAveRatio
float $movingAveRatio
Moving average ratio (e.g.
Definition: LoadMonitor.php:47
Wikimedia\Rdbms\LoadMonitor\$lb
ILoadBalancer $lb
Definition: LoadMonitor.php:38
Wikimedia\Rdbms\LoadMonitor\scaleLoads
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
Definition: LoadMonitor.php:80
Wikimedia\Rdbms\LoadMonitor\getServerStates
getServerStates(array $serverIndexes, $domain)
Definition: LoadMonitor.php:98
Wikimedia\Rdbms\LoadMonitor\setLogger
setLogger(LoggerInterface $logger)
Definition: LoadMonitor.php:76
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:116
Wikimedia\Rdbms\LoadMonitor\getLagTimes
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
Definition: LoadMonitor.php:94
Wikimedia\Rdbms\LoadMonitor\__construct
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
Definition: LoadMonitor.php:64
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81