MediaWiki  master
LoadMonitor.php
Go to the documentation of this file.
1 <?php
22 namespace Wikimedia\Rdbms;
23 
24 use BagOStuff;
25 use Psr\Log\LoggerInterface;
26 use Psr\Log\NullLogger;
27 use RuntimeException;
28 use WANObjectCache;
29 use Wikimedia\ScopedCallback;
30 
41 class LoadMonitor implements ILoadMonitor {
/** @var ILoadBalancer Load balancer queried for server names, info and connections */
protected $lb;
/** @var BagOStuff Server-local cache that is checked before the shared cache */
protected $srvCache;
/** @var WANObjectCache Shared cache that holds the server states */
protected $wanCache;
/** @var LoggerInterface Logger for cache and replication status messages */
protected $replLogger;

/** @var float Moving average ratio; the weight given to the newest scale sample */
private $movingAveRatio;
/** @var int Amount of replication lag in seconds before warnings are logged */
private $lagWarnThreshold;

/** @var float|null Mock time value bound by setMockTime(); unset means use the real clock */
private $wallClockOverride;

/** @var bool Whether the "server states" cache key is in the process of being updated */
private $serverStatesKeyLocked = false;

/** @var int Version component of the server states cache key */
private const VERSION = 1;
/** @var int Upper bound (milliseconds) of the randomized age at which cached states count as stale */
private const POLL_PERIOD_MS = 500;
/** @var int TTL (seconds) of the local cache copy; also used as the shared cache lockTSE/staleTTL */
private const STATE_PRESERVE_TTL = 60;
/** @var int Logical expiry (seconds) of the server states in the shared cache */
private const TIME_TILL_REFRESH = 1;
70 
/**
 * @param ILoadBalancer $lb Load balancer whose servers are monitored
 * @param BagOStuff $srvCache Server-local cache
 * @param WANObjectCache $wCache Shared cache
 * @param array $options Optional settings:
 *   - movingAveRatio: moving average ratio used for weight scales (default: 0.1)
 *   - lagWarnThreshold: lag in seconds above which a warning is logged
 *     (default: LoadBalancer::MAX_LAG_DEFAULT)
 */
public function __construct(
	ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
) {
	// Resolve the tunables first; missing or null options fall back to the defaults
	$ratio = $options['movingAveRatio'] ?? 0.1;
	$warnThreshold = $options['lagWarnThreshold'] ?? LoadBalancer::MAX_LAG_DEFAULT;

	$this->lb = $lb;
	$this->srvCache = $srvCache;
	$this->wanCache = $wCache;
	// Logging is a no-op until setLogger() is called
	$this->replLogger = new NullLogger();

	$this->movingAveRatio = $ratio;
	$this->lagWarnThreshold = $warnThreshold;
}
90 
/**
 * Replace the logger used for cache and replication status messages
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->replLogger = $logger;
}
94 
/**
 * Perform load ratio adjustment before deciding which server to use
 *
 * Each weight is multiplied by the cached weight scale of its server and
 * rounded up to an integer. Servers without a cached scale keep their weight.
 *
 * @param array &$weightByServer Map of (server index => weight); modified in place
 * @param mixed $domain Passed through to getServerStates()
 */
final public function scaleLoads( array &$weightByServer, $domain ) {
	$states = $this->getServerStates( array_keys( $weightByServer ), $domain );
	$scaleByServer = $states['weightScales'];

	foreach ( $weightByServer as $index => $nominalWeight ) {
		if ( !isset( $scaleByServer[$index] ) ) {
			// No cached scale for this server (recently added to config?)
			$host = $this->lb->getServerName( $index );
			$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			continue;
		}

		$weightByServer[$index] = (int)ceil( $nominalWeight * $scaleByServer[$index] );
	}
}
108 
/**
 * Get an estimate of replication lag (in seconds) for each server
 *
 * @param array $serverIndexes Server indexes to report on
 * @param mixed $domain Passed through to getServerStates()
 * @return array The 'lagTimes' entry of the server states
 */
final public function getLagTimes( array $serverIndexes, $domain ) {
	$states = $this->getServerStates( $serverIndexes, $domain );

	return $states['lagTimes'];
}
112 
/**
 * Get the server states, using the local cache, then the shared cache,
 * and regenerating them via computeServerStates() when needed
 *
 * @param array $serverIndexes Server indexes to get states for
 * @param mixed $domain Passed through to computeServerStates()
 * @return array Server states as stored in cache; computeServerStates() and
 *   getPlaceholderServerStates() produce the keys 'lagTimes', 'weightScales', 'timestamp'
 * @throws RuntimeException If regeneration is re-entered while already in progress
 */
protected function getServerStates( array $serverIndexes, $domain ) {
	// The primary DB server name stands in for the cluster in log messages
	$clusterName = $this->lb->getServerName( $this->lb->getWriterIndex() );

	// Use a randomized staleness cutoff to reduce stampedes
	$maxAgeSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
	$freshCutoff = $this->getCurrentTime() - $maxAgeSec;

	// (a) Look in the local server cache first
	$localKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
	$localStates = $this->srvCache->get( $localKey );
	$localIsFresh = $localStates && $localStates['timestamp'] > $freshCutoff;
	if ( $localIsFresh ) {
		$this->replLogger->debug( __METHOD__ . ": used fresh '$clusterName' cluster status" );

		return $localStates; // cache hit
	}

	// (b) Local value is stale/missing; try to use/refresh the shared cache.
	// The lock is held in a local variable until this method returns.
	$localLock = $this->srvCache->getScopedLock( $localKey, 0, 10 );
	if ( $localStates && !$localLock ) {
		$this->replLogger->debug( __METHOD__ . ": used stale '$clusterName' cluster status" );

		// (b1) Another thread on this server is already checking the shared cache
		return $localStates;
	}

	// (b2) This thread gets to check the shared cache or (b3) the local value is missing
	$regenerated = false; // set by the callback below if it runs
	$regenerate = function ( $sharedStale, &$ttl ) use (
		$serverIndexes, $domain, $localStates, &$regenerated
	) {
		// Double check for circular recursion in computeServerStates()/getWeightScale().
		// Mainly, connection attempts should use LoadBalancer::getServerConnection()
		// rather than something that will pick a server based on the server states.
		$loopGuard = $this->acquireServerStatesLoopGuard();
		if ( !$loopGuard ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$regenerated = true;
		// Prefer the shared stale value; otherwise fall back to the local stale value
		$priorStates = $sharedStale ?: $localStates;

		return $this->computeServerStates( $serverIndexes, $domain, $priorStates );
	};

	$sharedKey = $this->getStatesCacheKey( $this->wanCache, $serverIndexes );
	// If there is no shared stale value then use the local cache stale value;
	// when even that is not possible, then use the trivial placeholder value.
	$busyStates = $localStates ?: $this->getPlaceholderServerStates( $serverIndexes );
	$states = $this->wanCache->getWithSetCallback(
		$sharedKey,
		self::TIME_TILL_REFRESH, // 1 second logical expiry
		$regenerate,
		[
			// One thread can update at a time; others use the old value
			'lockTSE' => self::STATE_PRESERVE_TTL,
			'staleTTL' => self::STATE_PRESERVE_TTL,
			'busyValue' => $busyStates
		]
	);

	if ( $regenerated ) {
		$this->replLogger->info( __METHOD__ . ": regenerated '$clusterName' cluster status" );
	} else {
		$this->replLogger->debug( __METHOD__ . ": used cached '$clusterName' cluster status" );
	}

	// Backfill the local server cache, but only if this thread holds the local lock
	if ( $localLock ) {
		$this->srvCache->set( $localKey, $states, self::STATE_PRESERVE_TTL );
	}

	return $states;
}
193 
/**
 * Compute fresh lag times and weight scales by checking each given server
 *
 * @param array $serverIndexes Server indexes to check
 * @param mixed $domain Not used by this implementation
 * @param array|mixed $priorStates Previous server states (with a 'weightScales' entry),
 *   or a falsy value if there are none
 * @return array Map with the keys:
 *   - lagTimes: map of (server index => lag in seconds, or false if unknown)
 *   - weightScales: map of (server index => non-negative weight scale)
 *   - timestamp: time at which the states were computed
 */
protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
	// Check if there is just a primary DB (no replication involved)
	if ( $this->lb->getServerCount() <= 1 ) {
		return $this->getPlaceholderServerStates( $serverIndexes );
	}

	$priorScales = $priorStates ? $priorStates['weightScales'] : [];

	// Handles with open transactions are avoided since they might be subject
	// to REPEATABLE-READ snapshots, which could affect the lag estimate query.
	// Connection failures must yield a falsy handle rather than an exception,
	// since unreachable servers are handled explicitly below.
	$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;

	$lagTimes = [];
	$weightScales = [];
	foreach ( $serverIndexes as $i ) {
		$isPrimary = ( $i == $this->lb->getWriterIndex() );
		// If the primary DB has zero load, then typical read queries do not use it.
		// In that case, avoid connecting to it since this method might run in any
		// datacenter, and the primary DB might be geographically remote.
		if ( $isPrimary && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
			$lagTimes[$i] = 0;
			// Callers only use this DB if they have *no choice* anyway (e.g. writes)
			$weightScales[$i] = 1.0;
			continue;
		}

		$host = $this->lb->getServerName( $i );
		$conn = $this->lb->getAnyOpenConnection( $i, $flags );
		if ( $conn ) {
			$close = false; // already open
		} else {
			// Get a connection to this server without triggering other server connections
			$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
			$close = true; // new connection
		}

		// Get new weight scale using a moving average of the naïve and prior values
		$lastScale = $priorScales[$i] ?? 1.0;
		$naiveScale = $this->getWeightScale( $i, $conn ?: null );
		$newScale = $this->getNewScaleViaMovingAve(
			$lastScale,
			$naiveScale,
			$this->movingAveRatio
		);

		// Scale from 0% to 100% of nominal weight
		$weightScales[$i] = max( $newScale, 0.0 );

		// Mark replication lag on this server as "false" if it is unreachable
		if ( !$conn ) {
			$lagTimes[$i] = $isPrimary ? 0 : false;
			$this->replLogger->error(
				__METHOD__ . ": host {db_server} is unreachable",
				[ 'db_server' => $host ]
			);
			continue;
		}

		// Determine the amount of replication lag on this server
		try {
			$lagTimes[$i] = $conn->getLag();
		} catch ( DBError $e ) {
			// Mark the lag time as "false" if it cannot be queried
			$lagTimes[$i] = false;
		}

		if ( $lagTimes[$i] === false ) {
			$this->replLogger->error(
				__METHOD__ . ": host {db_server} is not replicating?",
				[ 'db_server' => $host ]
			);
		} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
			$this->replLogger->warning(
				"Server {db_server} has {lag} seconds of lag (>= {maxlag})",
				[
					'db_server' => $host,
					'lag' => $lagTimes[$i],
					'maxlag' => $this->lagWarnThreshold
				]
			);
		}

		if ( $close ) {
			// Close the connection to avoid sleeper connections piling up.
			// Note that the caller will pick one of these DBs and reconnect,
			// which is slightly inefficient, but this only matters for the lag
			// time cache miss case, which is far less common than cache hits.
			$this->lb->closeConnection( $conn );
		}
	}

	return [
		'lagTimes' => $lagTimes,
		'weightScales' => $weightScales,
		'timestamp' => $this->getCurrentTime()
	];
}
297 
/**
 * Build trivial server states: zero lag and full weight for every server
 *
 * @param array $serverIndexes Server indexes to include
 * @return array Map with the keys 'lagTimes', 'weightScales' and 'timestamp'
 */
private function getPlaceholderServerStates( array $serverIndexes ) {
	$noLag = array_fill_keys( $serverIndexes, 0 );
	$fullWeight = array_fill_keys( $serverIndexes, 1.0 );
	$now = $this->getCurrentTime();

	return [
		'lagTimes' => $noLag,
		'weightScales' => $fullWeight,
		'timestamp' => $now
	];
}
309 
/**
 * Get the naive weight scale for a server
 *
 * This implementation only looks at whether a connection handle is available.
 *
 * @param int $index Server index (not used by this implementation)
 * @param IDatabase|null $conn Connection handle, or null if the server is unreachable
 * @return float 1.0 if a connection handle was given, 0.0 otherwise
 */
protected function getWeightScale( $index, ?IDatabase $conn = null ) {
	return $conn ? 1.0 : 0.0;
}
319 
/**
 * Get the moving average weight scale given a naive and the last iteration value
 *
 * @param float $lastScale Scale from the previous iteration
 * @param float $naiveScale Newly measured scale
 * @param float $movAveRatio Weight given to the naive scale; the prior scale gets the rest
 * @return float
 */
protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
	$priorShare = 1 - $movAveRatio;

	return $movAveRatio * $naiveScale + $priorShare * $lastScale;
}
356 
/**
 * Build the server states cache key for the given cache and server set
 *
 * @param BagOStuff|WANObjectCache $cache Cache providing makeGlobalKey()
 * @param array $serverIndexes Server indexes; their order does not affect the key
 * @return string
 */
private function getStatesCacheKey( $cache, array $serverIndexes ) {
	// Sorts the local copy only; the caller's array is left untouched
	sort( $serverIndexes );
	// Lag is per-server, not per-DB, so key on the primary DB name
	$primaryName = $this->lb->getServerName( $this->lb->getWriterIndex() );
	$indexList = implode( '-', $serverIndexes );

	return $cache->makeGlobalKey( 'rdbms-server-states', self::VERSION, $primaryName, $indexList );
}
372 
/**
 * Mark the server states as being regenerated, unless they already are
 *
 * @return ScopedCallback|null Guard whose callback clears the mark; null if already marked
 */
private function acquireServerStatesLoopGuard() {
	if ( !$this->serverStatesKeyLocked ) {
		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	return null; // locked
}
387 
/**
 * Get the current time, honoring any mock time set via setMockTime()
 *
 * @return float|mixed The mock time if it is set to a truthy value, else microtime( true )
 */
protected function getCurrentTime() {
	if ( $this->wallClockOverride ) {
		return $this->wallClockOverride;
	}

	return microtime( true );
}
395 
/**
 * Bind the clock used by getCurrentTime() to a caller-owned variable
 *
 * The variable is bound by reference, so later changes to it are seen by
 * getCurrentTime(). A falsy value makes getCurrentTime() use the real clock.
 *
 * @param float|null &$time Mock time value
 */
public function setMockTime( &$time ) {
	$this->wallClockOverride =& $time;
}
403 }
Wikimedia\Rdbms\LoadMonitor\$replLogger
LoggerInterface $replLogger
Definition: LoadMonitor.php:49
Wikimedia\Rdbms\LoadMonitor
Basic DB load monitor with no external dependencies.
Definition: LoadMonitor.php:41
Wikimedia\Rdbms\LoadMonitor\acquireServerStatesLoopGuard
acquireServerStatesLoopGuard()
Definition: LoadMonitor.php:376
Wikimedia\Rdbms\LoadMonitor\$serverStatesKeyLocked
bool $serverStatesKeyLocked
Whether the "server states" cache key is in the process of being updated.
Definition: LoadMonitor.php:60
Wikimedia\Rdbms\LoadMonitor\$srvCache
BagOStuff $srvCache
Definition: LoadMonitor.php:45
Wikimedia\Rdbms\LoadMonitor\computeServerStates
computeServerStates(array $serverIndexes, $domain, $priorStates)
Definition: LoadMonitor.php:201
Wikimedia\Rdbms\ILoadMonitor
An interface for database load monitoring.
Definition: ILoadMonitor.php:35
Wikimedia\Rdbms\LoadMonitor\getPlaceholderServerStates
getPlaceholderServerStates(array $serverIndexes)
Definition: LoadMonitor.php:302
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:86
Wikimedia\Rdbms\LoadMonitor\getStatesCacheKey
getStatesCacheKey( $cache, array $serverIndexes)
Definition: LoadMonitor.php:362
Wikimedia\Rdbms\LoadMonitor\getWeightScale
getWeightScale( $index, IDatabase $conn=null)
Definition: LoadMonitor.php:316
Wikimedia\Rdbms\LoadBalancer\MAX_LAG_DEFAULT
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
Definition: LoadBalancer.php:155
Wikimedia\Rdbms\DBError
Database error base class @newable.
Definition: DBError.php:32
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
Wikimedia\Rdbms\LoadMonitor\$wallClockOverride
float|null $wallClockOverride
Definition: LoadMonitor.php:57
Wikimedia\Rdbms\LoadMonitor\$wanCache
WANObjectCache $wanCache
Definition: LoadMonitor.php:47
Wikimedia\Rdbms\LoadMonitor\$lagWarnThreshold
int $lagWarnThreshold
Amount of replication lag in seconds before warnings are logged.
Definition: LoadMonitor.php:54
Wikimedia\Rdbms\LoadMonitor\$movingAveRatio
float $movingAveRatio
Moving average ratio (e.g.
Definition: LoadMonitor.php:52
Wikimedia\Rdbms\ILoadBalancer\CONN_SILENCE_ERRORS
const CONN_SILENCE_ERRORS
Return null on connection failure instead of throwing an exception.
Definition: ILoadBalancer.php:106
Wikimedia\Rdbms\LoadMonitor\$lb
ILoadBalancer $lb
Definition: LoadMonitor.php:43
Wikimedia\Rdbms\LoadMonitor\scaleLoads
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
Definition: LoadMonitor.php:95
Wikimedia\Rdbms\LoadMonitor\getServerStates
getServerStates(array $serverIndexes, $domain)
Definition: LoadMonitor.php:119
Wikimedia\Rdbms\ILoadBalancer\DOMAIN_ANY
const DOMAIN_ANY
Domain specifier when no specific database needs to be selected.
Definition: ILoadBalancer.php:99
Wikimedia\Rdbms\LoadMonitor\getCurrentTime
getCurrentTime()
Definition: LoadMonitor.php:392
Wikimedia\Rdbms\LoadMonitor\getNewScaleViaMovingAve
getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio)
Get the moving average weight scale given a naive and the last iteration value.
Definition: LoadMonitor.php:353
Wikimedia\Rdbms\LoadMonitor\setLogger
setLogger(LoggerInterface $logger)
Definition: LoadMonitor.php:91
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:131
Wikimedia\Rdbms\ILoadBalancer\CONN_TRX_AUTOCOMMIT
const CONN_TRX_AUTOCOMMIT
DB handle should have DBO_TRX disabled and the caller will leave it as such.
Definition: ILoadBalancer.php:104
Wikimedia\Rdbms\LoadMonitor\getLagTimes
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
Definition: LoadMonitor.php:109
$cache
$cache
Definition: mcc.php:33
Wikimedia\Rdbms\LoadMonitor\__construct
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
Definition: LoadMonitor.php:79
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81
Wikimedia\Rdbms\LoadMonitor\setMockTime
setMockTime(&$time)
Definition: LoadMonitor.php:400