MediaWiki master
LoadMonitor.php
Go to the documentation of this file.
1 <?php
22 namespace Wikimedia\Rdbms;
23 
24 use BagOStuff;
25 use Psr\Log\LoggerInterface;
26 use Psr\Log\NullLogger;
27 use RuntimeException;
28 use WANObjectCache;
29 use Wikimedia\ScopedCallback;
30 
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Tracks per-server replication lag and weight scale factors ("server states") for the
 * servers of a load balancer. The states are kept in the local server cache and in the
 * WAN cache, and are regenerated by querying the servers when those copies are stale.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff Local server cache */
	protected $srvCache;
	/** @var WANObjectCache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Weight given to the newest naive scale in the moving average (e.g. 0.1) */
	private $movingAveRatio;
	/** @var int Amount of replication lag in seconds before warnings are logged */
	private $lagWarnThreshold;
	/** @var float|null Mock UNIX timestamp used by getCurrentTime(); see setMockTime() */
	private $wallClockOverride = null;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** Default amount of replication lag in seconds before warnings are logged */
	private const LAG_WARN_THRESHOLD = 10;

	/** Version component of the server states cache key */
	private const VERSION = 1;
	/** Upper bound (ms) of the randomized staleness tolerated for the local cache copy */
	private const POLL_PERIOD_MS = 500;
	/** TTL (seconds) of the local cache copy and the WAN cache lockTSE/staleTTL window */
	private const STATE_PRESERVE_TTL = 60;
	/** Logical TTL (seconds) of the WAN cache copy of the server states */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache Local server cache
	 * @param WANObjectCache $wCache
	 * @param array $options Map of optional settings:
	 *   - movingAveRatio: weight of the newest naive scale in the moving average [default: 0.1]
	 *   - lagWarnThreshold: lag (seconds) above which warnings are logged [default: 10]
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
	}

	/**
	 * @param LoggerInterface $logger Logger used for replication/status messages
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Multiply each server's load weight by its current weight scale, rounding up.
	 *
	 * Servers with no known scale keep their weight unchanged and an error is logged.
	 *
	 * @param int[] &$weightByServer Map of (server index => load weight); modified in place
	 * @param string|bool $domain
	 */
	final public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else { // server recently added to config?
				$host = $this->lb->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get the (possibly cached) replication lag of each of the given servers.
	 *
	 * @param int[] $serverIndexes
	 * @param string|bool $domain
	 * @return array Map of (server index => lag in seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes, $domain ) {
		return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
	}

	/**
	 * Get the server states, using the local server cache, then the WAN cache, and
	 * regenerating them via computeServerStates() when needed.
	 *
	 * @param int[] $serverIndexes
	 * @param string|bool $domain
	 * @return array Map with keys 'lagTimes', 'weightScales' and 'timestamp'
	 * @throws RuntimeException If regeneration recurses into itself
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		// Represent the cluster by the name of the master DB
		$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
				// Sanity check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				$scopedLock = $this->acquireServerStatesLoopGuard();
				if ( !$scopedLock ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$domain,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache (only the thread holding the local lock does this)
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Query each server for its replication lag and compute new weight scales.
	 *
	 * @param int[] $serverIndexes
	 * @param string|bool $domain
	 * @param array|false $priorStates Previous server states used for the moving average
	 * @return array Map with keys 'lagTimes', 'weightScales' and 'timestamp'
	 */
	protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
		// Check if there is just a master DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];
		// Loop-invariant; look it up once rather than per server
		$writerIndex = $this->lb->getWriterIndex();

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isMaster = ( $i == $writerIndex );
			// If the master DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the master DB might be geographically remote.
			if ( $isMaster && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			# Handles with open transactions are avoided since they might be subject
			# to REPEATABLE-READ snapshots, which could affect the lag estimate query.
			$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			if ( $conn ) {
				$close = false; // already open
			} else {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			try {
				// Get new weight scale using a moving average of the naive and prior values
				$lastScale = $priorScales[$i] ?? 1.0;
				$naiveScale = $this->getWeightScale( $i, $conn ?: null );
				$newScale = $this->getNewScaleViaMovingAve(
					$lastScale,
					$naiveScale,
					$this->movingAveRatio
				);

				// Clamp so that the scale is never negative (sanity)
				$weightScales[$i] = max( $newScale, 0.0 );

				// Mark replication lag on this server as "false" if it is unreachable
				if ( !$conn ) {
					$lagTimes[$i] = $isMaster ? 0 : false;
					$this->replLogger->error(
						__METHOD__ . ": host {db_server} is unreachable",
						[ 'db_server' => $host ]
					);
					continue;
				}

				// Determine the amount of replication lag on this server
				try {
					$lagTimes[$i] = $conn->getLag();
				} catch ( DBError $e ) {
					// Mark the lag time as "false" if it cannot be queried
					$lagTimes[$i] = false;
				}

				if ( $lagTimes[$i] === false ) {
					$this->replLogger->error(
						__METHOD__ . ": host {db_server} is not replicating?",
						[ 'db_server' => $host ]
					);
				} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
					$this->replLogger->warning(
						"Server {dbserver} has {lag} seconds of lag (>= {maxlag})",
						[
							'dbserver' => $host,
							'lag' => $lagTimes[$i],
							'maxlag' => $this->lagWarnThreshold
						]
					);
				}
			} finally {
				if ( $close && $conn ) {
					# Close the connection to avoid sleeper connections piling up, even if
					# an exception escaped above. Note that the caller will pick one of these
					# DBs and reconnect, which is slightly inefficient, but this only matters
					# for the lag time cache miss case, which is far less common than cache hits.
					$this->lb->closeConnection( $conn );
				}
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * @param int[] $serverIndexes
	 * @return array Server states with zero lag and a 1.0 weight scale for every server
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle or null if the server is unreachable
	 * @return float Naive weight scale: 1.0 if a connection was obtained, 0.0 otherwise
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive and the last iteration value.
	 *
	 * @param float $lastScale Scale from the previous iteration
	 * @param float $naiveScale Scale derived from the current observation
	 * @param float $movAveRatio Weight given to $naiveScale (the rest goes to $lastScale)
	 * @return float
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * @param BagOStuff|WANObjectCache $cache
	 * @param int[] $serverIndexes
	 * @return string Cache key, independent of the order of $serverIndexes
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the master DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Mark the server states as being regenerated, to detect recursion.
	 *
	 * @return ScopedCallback|null Callback that releases the guard, or null if already held
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp (the mock time, if one is set and non-zero)
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Bind the clock used by getCurrentTime() to a variable, for testing.
	 *
	 * @param float|null &$time Variable holding the mock UNIX timestamp; bound by reference
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Wikimedia\Rdbms\LoadMonitor\$replLogger
LoggerInterface $replLogger
Definition: LoadMonitor.php:49
Wikimedia\Rdbms\LoadMonitor
Basic DB load monitor with no external dependencies.
Definition: LoadMonitor.php:41
Wikimedia\Rdbms\LoadMonitor\acquireServerStatesLoopGuard
acquireServerStatesLoopGuard()
Definition: LoadMonitor.php:379
Wikimedia\Rdbms\LoadMonitor\$serverStatesKeyLocked
bool $serverStatesKeyLocked
Whether the "server states" cache key is in the process of being updated.
Definition: LoadMonitor.php:60
Wikimedia\Rdbms\LoadMonitor\$srvCache
BagOStuff $srvCache
Definition: LoadMonitor.php:45
Wikimedia\Rdbms\LoadMonitor\computeServerStates
computeServerStates(array $serverIndexes, $domain, $priorStates)
Definition: LoadMonitor.php:204
Wikimedia\Rdbms\ILoadMonitor
An interface for database load monitoring.
Definition: ILoadMonitor.php:35
Wikimedia\Rdbms\LoadMonitor\getPlaceholderServerStates
getPlaceholderServerStates(array $serverIndexes)
Definition: LoadMonitor.php:305
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:71
Wikimedia\Rdbms\LoadMonitor\getStatesCacheKey
getStatesCacheKey( $cache, array $serverIndexes)
Definition: LoadMonitor.php:365
Wikimedia\Rdbms\LoadMonitor\getWeightScale
getWeightScale( $index, IDatabase $conn=null)
Definition: LoadMonitor.php:319
Wikimedia\Rdbms\DBError
Database error base class (newable; stable to extend).
Definition: DBError.php:32
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
Wikimedia\Rdbms\LoadMonitor\$wallClockOverride
float null $wallClockOverride
Definition: LoadMonitor.php:57
Wikimedia\Rdbms\LoadMonitor\$wanCache
WANObjectCache $wanCache
Definition: LoadMonitor.php:47
Wikimedia\Rdbms\LoadMonitor\$lagWarnThreshold
int $lagWarnThreshold
Amount of replication lag in seconds before warnings are logged.
Definition: LoadMonitor.php:54
Wikimedia\Rdbms\LoadMonitor\$movingAveRatio
float $movingAveRatio
Moving average ratio (e.g. 0.1 gives the newest naive weight scale 10% weight and the prior scale 90%).
Definition: LoadMonitor.php:52
Wikimedia\Rdbms\LoadMonitor\$lb
ILoadBalancer $lb
Definition: LoadMonitor.php:43
Wikimedia\Rdbms\LoadMonitor\scaleLoads
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
Definition: LoadMonitor.php:98
Wikimedia\Rdbms\LoadMonitor\getServerStates
getServerStates(array $serverIndexes, $domain)
Definition: LoadMonitor.php:122
Wikimedia\Rdbms\LoadMonitor\getCurrentTime
getCurrentTime()
Definition: LoadMonitor.php:395
Wikimedia\Rdbms\LoadMonitor\getNewScaleViaMovingAve
getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio)
Get the moving average weight scale given a naive and the last iteration value.
Definition: LoadMonitor.php:356
Wikimedia\Rdbms\LoadMonitor\setLogger
setLogger(LoggerInterface $logger)
Definition: LoadMonitor.php:94
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:125
Wikimedia\Rdbms\LoadMonitor\getLagTimes
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
Definition: LoadMonitor.php:112
$cache
$cache
Definition: mcc.php:33
Wikimedia\Rdbms\LoadMonitor\__construct
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
Definition: LoadMonitor.php:82
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81
Wikimedia\Rdbms\LoadMonitor\setMockTime
setMockTime(&$time)
Definition: LoadMonitor.php:403