MediaWiki  master
LoadMonitor.php
Go to the documentation of this file.
1 <?php
20 namespace Wikimedia\Rdbms;
21 
22 use BagOStuff;
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 use Psr\Log\LoggerInterface;
26 use Psr\Log\NullLogger;
27 use RuntimeException;
28 use WANObjectCache;
29 use Wikimedia\ScopedCallback;
30 
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Server states (replication lag times and weight scale factors) are computed by
 * connecting to each server, and are cached in two tiers: a local server cache
 * and a WAN cache. In this base class, a server's "naive" weight scale is 1.0 when
 * a connection to it could be obtained and 0.0 otherwise; the value actually used
 * is a moving average of the naive value and the previously cached value.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer Load balancer whose servers are being monitored */
	protected $lb;
	/** @var BagOStuff Local server cache (first cache tier) */
	protected $srvCache;
	/** @var WANObjectCache Shared cache (second cache tier) */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;
	/** @var StatsdDataFactoryInterface */
	protected $statsd;

	/** @var float Weight given to the newest (naive) sample in the moving average */
	private $movingAveRatio;
	/** @var int|float Lag (in seconds) above which a warning is logged */
	private $lagWarnThreshold;

	/** @var float|null Mock UNIX timestamp for testing (bound by reference) */
	private $wallClockOverride;

	/** @var bool Whether the server states regeneration callback is currently running */
	private $serverStatesKeyLocked = false;

	/** @var int Version component of the server states cache key */
	private const VERSION = 1;
	/** @var int Upper bound (milliseconds) of the randomized local cache freshness window */
	private const POLL_PERIOD_MS = 500;
	/** @var int TTL (seconds) used for the local cache entry and for WAN cache lockTSE/staleTTL */
	private const STATE_PRESERVE_TTL = 60;
	/** @var int TTL (seconds) of the WAN cache entry */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache
	 * @param WANObjectCache $wCache
	 * @param array $options Optional settings:
	 *   - movingAveRatio: moving average ratio for the newest sample (default: 0.1)
	 *   - lagWarnThreshold: lag, in seconds, above which a warning is logged
	 *      (default: LoadBalancer::MAX_LAG_DEFAULT)
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();
		// Fully-qualified: this file lives in a namespace and has no "use" statement for
		// this class, so the bare name would resolve relative to Wikimedia\Rdbms.
		// NOTE(review): assumes NullStatsdDataFactory is a global class; confirm.
		$this->statsd = new \NullStatsdDataFactory();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? LoadBalancer::MAX_LAG_DEFAULT;
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Sets a StatsdDataFactory instance on the object.
	 *
	 * @param StatsdDataFactoryInterface $statsFactory
	 */
	public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
		$this->statsd = $statsFactory;
	}

	/**
	 * Perform load ratio adjustment before deciding which server to use.
	 *
	 * Each weight is multiplied by the cached scale factor of its server and rounded
	 * up to an integer. Weights of servers with no cached scale are left untouched
	 * and an error is logged.
	 *
	 * @param array &$weightByServer Map of (server index => weight); modified in place
	 */
	final public function scaleLoads( array &$weightByServer ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else { // server recently added to config?
				$host = $this->lb->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get an estimate of replication lag (in seconds) for each server.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map of (server index => lag in seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes ) {
		return $this->getServerStates( $serverIndexes )['lagTimes'];
	}

	/**
	 * Get the (possibly cached) states of the given servers.
	 *
	 * Lookup order: (a) the local server cache, if the entry is fresh enough;
	 * (b) the WAN cache, which regenerates the value via computeServerStates() when
	 * needed. Only a thread holding the local scoped lock backfills the local cache.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with keys 'lagTimes', 'weightScales' and 'timestamp'
	 * @throws RuntimeException If regeneration recursively triggers itself
	 */
	protected function getServerStates( array $serverIndexes ) {
		// Represent the cluster by the name of the primary DB
		$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $staleValue, &$updated ) {
				// Double check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				// The guard object must stay referenced until this callback returns.
				$scopedLock = $this->acquireServerStatesLoopGuard();
				if ( !$scopedLock ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Compute fresh server states by probing each of the given servers.
	 *
	 * @param int[] $serverIndexes
	 * @param array|false $priorStates Previously cached states (falsy if there are none);
	 *   its 'weightScales' feed the moving average
	 * @return array Map with keys 'lagTimes', 'weightScales' and 'timestamp'
	 */
	protected function computeServerStates( array $serverIndexes, $priorStates ) {
		// Check if there is just a primary DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];
		$cluster = $this->lb->getClusterName();

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isPrimary = ( $i == $this->lb->getWriterIndex() );
			// If the primary DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the primary DB might be geographically remote.
			if ( $isPrimary && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			# Handles with open transactions are avoided since they might be subject
			# to REPEATABLE-READ snapshots, which could affect the lag estimate query.
			# Connection errors are silenced so that an unreachable server yields a
			# falsy handle (handled below) instead of an exception.
			$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			if ( $conn ) {
				$close = false; // already open
			} else {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			// Get new weight scale using a moving average of the naïve and prior values
			$lastScale = $priorScales[$i] ?? 1.0;
			$naiveScale = $this->getWeightScale( $i, $conn ?: null );
			$newScale = $this->getNewScaleViaMovingAve(
				$lastScale,
				$naiveScale,
				$this->movingAveRatio
			);
			// Scale from 0% to 100% of nominal weight
			$newScale = max( $newScale, 0.0 );

			$weightScales[$i] = $newScale;
			$statHost = str_replace( '.', '_', $host );
			$this->statsd->gauge( "loadbalancer.weight.$cluster.$statHost", $newScale );

			// Mark replication lag on this server as "false" if it is unreachable
			if ( !$conn ) {
				$lagTimes[$i] = $isPrimary ? 0 : false;
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is unreachable",
					[ 'db_server' => $host ]
				);
				continue;
			}

			// Determine the amount of replication lag on this server
			try {
				$lag = $conn->getLag();
			} catch ( DBError $e ) {
				// Mark the lag time as "false" if it cannot be queried
				$lag = false;
			}
			$lagTimes[$i] = $lag;

			if ( $lag === false ) {
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is not replicating?",
					[ 'db_server' => $host ]
				);
			} else {
				// Lag is in seconds; the timing stat is reported in milliseconds
				$this->statsd->timing( "loadbalancer.lag.$cluster.$statHost", $lag * 1000 );
				if ( $lag > $this->lagWarnThreshold ) {
					$this->replLogger->warning(
						"Server {db_server} has {lag} seconds of lag (>= {maxlag})",
						[
							'db_server' => $host,
							'lag' => $lag,
							'maxlag' => $this->lagWarnThreshold
						]
					);
				}
			}

			if ( $close ) {
				# Close the connection to avoid sleeper connections piling up.
				# Note that the caller will pick one of these DBs and reconnect,
				# which is slightly inefficient, but this only matters for the lag
				# time cache miss case, which is far less common than cache hits.
				$this->lb->closeConnection( $conn );
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get trivial server states: zero lag and full weight for every server.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with keys 'lagTimes', 'weightScales' and 'timestamp'
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get the naive weight scale of a server.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle, or null on connection failure
	 * @return float 1.0 if a connection is available, 0.0 otherwise
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive and the last iteration value.
	 *
	 * @param float $lastScale Scale from the previous iteration
	 * @param float $naiveScale Scale derived from the current probe alone
	 * @param float $movAveRatio Weight given to $naiveScale; the remainder goes to $lastScale
	 * @return float
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * Build the cache key for the states of a set of servers.
	 *
	 * @param BagOStuff|WANObjectCache $cache Cache used to format the key
	 * @param int[] $serverIndexes Sorted locally so that ordering does not affect the key
	 * @return string
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the primary DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Mark server state regeneration as in progress for this instance.
	 *
	 * @return ScopedCallback|null Guard that clears the in-progress flag when it is
	 *   destroyed, or null if regeneration is already in progress
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp; the mock time if one is set (and truthy),
	 *   otherwise the wall clock time
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Bind a variable to use as the current time, for testing.
	 *
	 * @param float|null &$time Mock UNIX timestamp; held by reference, so later changes
	 *   to the caller's variable are seen by getCurrentTime()
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:85
Multi-datacenter aware caching interface.
Database error base class.
Definition: DBError.php:31
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
Basic DB load monitor with no external dependencies.
Definition: LoadMonitor.php:41
computeServerStates(array $serverIndexes, $priorStates)
getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio)
Get the moving average weight scale given a naive and the last iteration value.
getLagTimes(array $serverIndexes)
Get an estimate of replication lag (in seconds) for each server.
LoggerInterface $replLogger
Definition: LoadMonitor.php:49
setStatsdDataFactory(StatsdDataFactoryInterface $statsFactory)
Sets a StatsdDataFactory instance on the object.
Definition: LoadMonitor.php:98
StatsdDataFactoryInterface $statsd
Definition: LoadMonitor.php:51
getServerStates(array $serverIndexes)
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
Definition: LoadMonitor.php:81
scaleLoads(array &$weightByServer)
Perform load ratio adjustment before deciding which server to use.
setLogger(LoggerInterface $logger)
Definition: LoadMonitor.php:94
getWeightScale( $index, IDatabase $conn=null)
Basic database interface for live and lazy-loaded relational database handles.
Definition: IDatabase.php:40
Create and track the database connections and transactions for a given database cluster.
const DOMAIN_ANY
Domain specifier when no specific database needs to be selected.
const CONN_TRX_AUTOCOMMIT
DB handle should have DBO_TRX disabled and the caller will leave it as such.
const CONN_SILENCE_ERRORS
Return null on connection failure instead of throwing an exception.
Database load monitoring interface.
$cache
Definition: mcc.php:33