MediaWiki REL1_40
LoadMonitor.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use BagOStuff;
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
27use RuntimeException;
29use Wikimedia\ScopedCallback;
30
41class LoadMonitor implements ILoadMonitor {
43 protected $lb;
45 protected $srvCache;
47 protected $wanCache;
49 protected $logger;
51 protected $statsd;
52
54 private $movingAveRatio;
56 private $lagWarnThreshold;
57
59 private $wallClockOverride;
60
62 private $serverStatesKeyLocked = false;
63
65 private const VERSION = 2;
66
68 private const STATE_TARGET_TTL = 1.0;
70 private const STATE_PRESERVE_TTL = 60;
72 private const TIME_TILL_REFRESH = 1;
73
/**
 * Set up the monitor with its load balancer, caches, and tuning options.
 *
 * The logger and stats factory start out as no-op implementations; they can
 * be swapped with setLogger() and setStatsdDataFactory().
 *
 * @param ILoadBalancer $lb Load balancer whose servers are monitored
 * @param BagOStuff $srvCache Cache that getServerStates() checks first
 * @param WANObjectCache $wCache Cache consulted when $srvCache has no fresh value
 * @param array $options Optional settings:
 *   - movingAveRatio: cast to float; defaults to 0.54
 *   - lagWarnThreshold: defaults to LoadBalancer::MAX_LAG_DEFAULT
 */
public function __construct(
	ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
) {
	// Collaborators
	$this->lb = $lb;
	$this->srvCache = $srvCache;
	$this->wanCache = $wCache;

	// No-op sinks until the caller injects real ones
	$this->logger = new NullLogger();
	$this->statsd = new NullStatsdDataFactory();

	// Tunables; an absent (or null) option falls back to its default
	$ratio = $options['movingAveRatio'] ?? 0.54;
	$threshold = $options['lagWarnThreshold'] ?? LoadBalancer::MAX_LAG_DEFAULT;
	$this->movingAveRatio = (float)$ratio;
	$this->lagWarnThreshold = $threshold;
}
95
/**
 * Replace the logger used for this monitor's debug/info/warning/error messages.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->logger = $logger;
}
99
/**
 * Replace the stats factory that receives the gauge and timing metrics
 * emitted while server states are computed.
 *
 * @param StatsdDataFactoryInterface $statsFactory
 */
public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
	$this->statsd = $statsFactory;
}
103
/**
 * Multiply each server's nominal weight by its current weight scale.
 *
 * The array is modified in place. Every scaled weight is rounded up to an
 * integer. Servers with no known scale keep their weight untouched and an
 * error is logged for them.
 *
 * @param array &$weightByServer Map of (server index => weight)
 */
final public function scaleLoads( array &$weightByServer ) {
	$states = $this->getServerStates( array_keys( $weightByServer ) );
	$scaleByServer = $states['weightScales'];

	foreach ( $weightByServer as $index => $nominalWeight ) {
		if ( !isset( $scaleByServer[$index] ) ) {
			// server recently added to config?
			$host = $this->lb->getServerName( $index );
			$this->logger->error( __METHOD__ . ": host $host not in cache" );
			continue;
		}

		$weightByServer[$index] = (int)ceil( $nominalWeight * $scaleByServer[$index] );
	}
}
117
/**
 * Get the 'lagTimes' portion of the server states for the given servers.
 *
 * @param int[] $serverIndexes
 * @return array Map of (server index => lag value), as stored in the server states
 */
final public function getLagTimes( array $serverIndexes ) {
	$states = $this->getServerStates( $serverIndexes );

	return $states['lagTimes'];
}
121
/**
 * Get the server states for the given servers, going through two cache tiers.
 *
 * Lookup order:
 *  (a) the local server cache, when its value is not yet due for a refresh;
 *  (b) the shared cache, which may regenerate the value via computeServerStates().
 * Only the caller that obtains the local lock backfills the local server cache.
 *
 * @param int[] $serverIndexes
 * @return array Server states map with the keys 'lagTimes', 'weightScales',
 *  'timestamp' and 'genTime'
 * @throws RuntimeException If regeneration re-enters itself
 */
protected function getServerStates( array $serverIndexes ) {
	$now = $this->getCurrentTime();
	// The cluster is identified by the name of its primary DB
	$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

	// (a) Local server cache lookup
	$localKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
	$value = $this->srvCache->get( $localKey );
	if ( $value ) {
		$refreshDue = $this->isStateRefreshDue(
			$value['timestamp'],
			$value['genTime'],
			self::STATE_TARGET_TTL,
			$now
		);
		if ( !$refreshDue ) {
			$this->logger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}
	}

	// (b) Local value is stale or missing; fall through to the shared cache
	$localLock = $this->srvCache->getScopedLock( $localKey, 0, 10 );
	if ( $value && !$localLock ) {
		$this->logger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
		// (b1) Another thread on this server is already checking the shared cache
		return $value;
	}

	// (b2) This thread checks the shared cache, or (b3) there is no local value at all
	$staleValue = $value;
	$updated = false; // flipped by the regeneration callback when it runs
	$sharedKey = $this->getStatesCacheKey( $this->wanCache, $serverIndexes );
	$regenerate = function ( $oldValue, &$ttl ) use ( $serverIndexes, $staleValue, &$updated ) {
		// Guard against circular recursion through computeServerStates()/getWeightScale().
		// Connection attempts made there should go through
		// LoadBalancer::getServerConnection() rather than anything that picks a
		// server based on the server states.
		$loopGuard = $this->acquireServerStatesLoopGuard();
		if ( !$loopGuard ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$updated = true;
		// Prefer the shared stale value; otherwise fall back to the local stale value
		$priorStates = $oldValue ?: $staleValue;

		return $this->computeServerStates( $serverIndexes, $priorStates );
	};
	$cacheOptions = [
		// One thread can update at a time; others use the old value
		'lockTSE' => self::STATE_PRESERVE_TTL,
		'staleTTL' => self::STATE_PRESERVE_TTL,
		// Without a shared stale value, use the local stale value;
		// failing that, use the trivial placeholder value.
		'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
	];
	$value = $this->wanCache->getWithSetCallback(
		$sharedKey,
		self::TIME_TILL_REFRESH, // 1 second logical expiry
		$regenerate,
		$cacheOptions
	);

	if ( !$updated ) {
		$this->logger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
	} else {
		$this->logger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
	}

	// Backfill the local server cache, but only if this thread holds the local lock
	if ( $localLock ) {
		$this->srvCache->set( $localKey, $value, self::STATE_PRESERVE_TTL );
	}

	return $value;
}
205
/**
 * Decide, probabilistically, whether a cached value should be refreshed.
 *
 * With r = age / TTL and g = generation delay / TTL, the chance of refresh
 * is p(r,g) = exp( -128 * g ) * r^4. It grows with r and p(0,g) = 0. When g
 * is close to 0, p(1,g) is close to 1; for larger g the refresh may only
 * happen a little after the nominal expiry.
 *
 * @param float $priorAsOf Timestamp of the cached value
 * @param float $priorGenDelay How long the cached value took to generate
 * @param float $referenceTTL Nominal TTL, in the same unit as the other arguments
 * @param float $now Current timestamp
 * @return bool Whether a refresh is due
 */
protected function isStateRefreshDue( $priorAsOf, $priorGenDelay, $referenceTTL, $now ) {
	// Negative ages (clock skew) are treated as "just generated"
	$elapsed = max( $now - $priorAsOf, 0.0 );
	// Share of the nominal TTL that has elapsed (r)
	$elapsedShare = $elapsed / $referenceTTL;
	// Share of the nominal TTL that regeneration takes (g)
	$genShare = $priorGenDelay / $referenceTTL;

	$chance = exp( -128 * $genShare ) * pow( $elapsedShare, 4 );
	$roll = mt_rand( 1, 1000000000 );

	return ( $roll <= 1000000000 * $chance );
}
225
/**
 * Probe each server and build a fresh server states map.
 *
 * For every server index this opens an untracked gauge connection, derives
 * a new weight scale as a moving average of the prior scale and the value
 * from getWeightScale(), reads the replication lag, reports stats, and then
 * closes the connection again before moving on.
 *
 * When the load balancer has at most one server, placeholder states are
 * returned without probing anything.
 *
 * @param int[] $serverIndexes
 * @param array|false $priorStates Previous server states map, or a false-like
 *  value when none is available
 * @return array Map with the keys:
 *   - lagTimes: (server index => lag, or false when unknown)
 *   - weightScales: (server index => float scale, never below 0.0)
 *   - timestamp: time at which the probing finished
 *   - genTime: how long the probing took
 */
protected function computeServerStates( array $serverIndexes, $priorStates ) {
	$startedAt = $this->getCurrentTime();
	// A lone primary DB means there is no replication to monitor
	if ( $this->lb->getServerCount() <= 1 ) {
		return $this->getPlaceholderServerStates( $serverIndexes );
	}

	$priorAsOf = $priorStates['timestamp'] ?? 0;
	$priorScales = $priorStates ? $priorStates['weightScales'] : [];
	$cluster = $this->lb->getClusterName();

	$lagByServer = [];
	$scaleByServer = [];
	foreach ( $serverIndexes as $index ) {
		$onPrimary = ( $index == $this->lb->getWriterIndex() );
		// A primary DB with zero load is not used by typical read queries.
		// Skip connecting to it, since this method might run in any datacenter
		// and the primary DB might be geographically remote.
		if ( $onPrimary && $this->lb->getServerInfo( $index )['load'] <= 0 ) {
			$lagByServer[$index] = 0;
			// Callers only use this DB if they have *no choice* anyway (e.g. writes)
			$scaleByServer[$index] = 1.0;
			continue;
		}

		$host = $this->lb->getServerName( $index );

		// Use a new, untracked, connection to gauge server health;
		// errors are silenced so an unreachable server just yields no handle
		$flags = $this->lb::CONN_UNTRACKED_GAUGE | $this->lb::CONN_SILENCE_ERRORS;
		// Connect to exactly this server without triggering other server connections
		$conn = $this->lb->getServerConnection( $index, $this->lb::DOMAIN_ANY, $flags );

		// Blend the prior scale with the naïve one via a delay-aware moving average
		$previousScale = $priorScales[$index] ?? 1.0;
		$naiveScale = $this->getWeightScale( $index, $conn ?: null );
		$sincePrior = max( $this->getCurrentTime() - $priorAsOf, 0.0 );
		$blendedScale = $this->movingAverage(
			$previousScale,
			$naiveScale,
			$sincePrior,
			$this->movingAveRatio
		);
		// Scale from 0% to 100% of nominal weight
		$blendedScale = max( $blendedScale, 0.0 );

		$scaleByServer[$index] = $blendedScale;
		$statHost = str_replace( '.', '_', $host );
		$this->statsd->gauge( "loadbalancer.weight.$cluster.$statHost", $blendedScale );

		// An unreachable replica gets a lag of "false"; an unreachable primary gets 0
		if ( !$conn ) {
			$lagByServer[$index] = $onPrimary ? 0 : false;
			$this->logger->error(
				__METHOD__ . ": host {db_server} is unreachable",
				[ 'db_server' => $host ]
			);
			continue;
		}

		// Measure the replication lag; a failed query counts as "false"
		try {
			$lag = $conn->getLag();
		} catch ( DBError $e ) {
			$lag = false;
		}
		$lagByServer[$index] = $lag;

		if ( $lag !== false ) {
			$this->statsd->timing( "loadbalancer.lag.$cluster.$statHost", $lag * 1000 );
			if ( $lag > $this->lagWarnThreshold ) {
				$this->logger->warning(
					"Server {db_server} has {lag} seconds of lag (>= {maxlag})",
					[
						'db_server' => $host,
						'lag' => $lag,
						'maxlag' => $this->lagWarnThreshold
					]
				);
			}
		} else {
			$this->logger->error(
				__METHOD__ . ": host {db_server} is not replicating?",
				[ 'db_server' => $host ]
			);
		}

		// Only keep one connection open at a time
		$conn->close( __METHOD__ );
	}

	$finishedAt = $this->getCurrentTime();

	return [
		'lagTimes' => $lagByServer,
		'weightScales' => $scaleByServer,
		'timestamp' => $finishedAt,
		'genTime' => max( $finishedAt - $startedAt, 0.0 )
	];
}
331
/**
 * Build a trivial server states map: zero lag and full weight for every
 * given server, stamped with the current time and a zero generation time.
 *
 * @param int[] $serverIndexes
 * @return array Map with the keys 'lagTimes', 'weightScales', 'timestamp' and 'genTime'
 */
private function getPlaceholderServerStates( array $serverIndexes ) {
	$noLag = array_fill_keys( $serverIndexes, 0 );
	$fullWeight = array_fill_keys( $serverIndexes, 1.0 );

	return [
		'lagTimes' => $noLag,
		'weightScales' => $fullWeight,
		'timestamp' => $this->getCurrentTime(),
		'genTime' => 0.0
	];
}
344
/**
 * Get the naïve weight scale for a server, before any moving average is applied.
 *
 * This implementation only distinguishes "have a connection" from "no
 * connection"; it ignores $index.
 *
 * The nullable type is spelled out explicitly (?IDatabase) because relying on
 * a "= null" default to make a typed parameter nullable is deprecated as of
 * PHP 8.4. The accepted arguments are unchanged.
 *
 * @param int $index Server index
 * @param IDatabase|null $conn Connection handle, or null when the server is unreachable
 * @return float 1.0 when a connection handle was given, 0.0 otherwise
 */
protected function getWeightScale( $index, ?IDatabase $conn = null ) {
	return $conn === null ? 0.0 : 1.0;
}
354
/**
 * Update a moving average for a gauge, accounting for the time delay since
 * the previous gauge.
 *
 * @param float|int|null $priorValue Previous averaged value, if any
 * @param float|int|null $gaugeValue Newly measured value, if any
 * @param float $delay Time elapsed since the previous value was recorded
 * @param float $movAveRatio Weight given to the new gauge after one unit of delay
 * @return float|int|null The other argument unchanged when one of the values
 *  is null; otherwise the weighted average, never below 0.0
 */
public function movingAverage(
	$priorValue,
	$gaugeValue,
	float $delay,
	float $movAveRatio
) {
	// With only one of the two values available there is nothing to average
	if ( $gaugeValue === null ) {
		return $priorValue;
	}
	if ( $priorValue === null ) {
		return $gaugeValue;
	}

	// The more outdated the prior gauge is, the more weight the new gauge gets.
	// How often states are updated generally depends on the amount of site
	// traffic, so updates may be infrequent; scaling the ratio by the delay
	// still lets the gauges converge within reasonable time bounds so that
	// unreachable DB servers are avoided.
	$gaugeWeight = 1 - pow( 1 - $movAveRatio, $delay );
	$blended = $gaugeWeight * $gaugeValue + ( 1 - $gaugeWeight ) * $priorValue;

	return max( $blended, 0.0 );
}
384
/**
 * Build the cache key under which the states of the given servers are stored.
 *
 * The key is made of a fixed prefix, the format version, the name of the
 * primary DB server, and the sorted server indexes joined by '-'.
 *
 * @param BagOStuff|WANObjectCache $cache Cache object providing makeGlobalKey()
 * @param int[] $serverIndexes
 * @return string
 */
private function getStatesCacheKey( $cache, array $serverIndexes ) {
	// Sort the local copy so the order of the indexes does not affect the key
	sort( $serverIndexes );
	// Lag is per-server, not per-DB, so key on the primary DB name
	$primaryName = $this->lb->getServerName( $this->lb->getWriterIndex() );
	$indexList = implode( '-', $serverIndexes );

	return $cache->makeGlobalKey(
		'rdbms-server-states',
		self::VERSION,
		$primaryName,
		$indexList
	);
}
400
/**
 * Try to take the in-process guard flag that detects re-entrant regeneration
 * of the server states.
 *
 * @return ScopedCallback|null Null when the flag is already set; otherwise an
 *  object whose callback clears the flag again (presumably run when the object
 *  is destroyed; confirm against ScopedCallback)
 */
private function acquireServerStatesLoopGuard() {
	if ( !$this->serverStatesKeyLocked ) {
		$this->serverStatesKeyLocked = true; // lock

		$release = function () {
			$this->serverStatesKeyLocked = false; // unlock
		};

		return new ScopedCallback( $release );
	}

	return null; // locked
}
415
/**
 * Get the current time, honouring any mock time set via setMockTime().
 *
 * @return float|mixed The mock value when it is truthy; otherwise the UNIX
 *  timestamp from microtime( true )
 */
protected function getCurrentTime() {
	if ( $this->wallClockOverride ) {
		return $this->wallClockOverride;
	}

	return microtime( true );
}
423
/**
 * Bind the monitor's clock to a caller-owned variable.
 *
 * The variable is held by reference, so later changes to it are seen by
 * getCurrentTime(). While the variable holds a falsy value, the real clock
 * is used instead.
 *
 * @param float|null &$time Variable holding the mock timestamp
 */
public function setMockTime( &$time ) {
	$this->wallClockOverride =& $time;
}
431}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
Multi-datacenter aware caching interface.
Database error base class.
Definition DBError.php:31
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
Basic DB load monitor with no external dependencies.
computeServerStates(array $serverIndexes, $priorStates)
isStateRefreshDue( $priorAsOf, $priorGenDelay, $referenceTTL, $now)
getLagTimes(array $serverIndexes)
Get an estimate of replication lag (in seconds) for each server.
movingAverage( $priorValue, $gaugeValue, float $delay, float $movAveRatio)
Update a moving average for a gauge, accounting for the time delay since the last gauge.
setStatsdDataFactory(StatsdDataFactoryInterface $statsFactory)
Sets a StatsdDataFactory instance on the object.
StatsdDataFactoryInterface $statsd
getServerStates(array $serverIndexes)
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
scaleLoads(array &$weightByServer)
Perform load ratio adjustment before deciding which server to use.
setLogger(LoggerInterface $logger)
getWeightScale( $index, IDatabase $conn=null)
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:36
This class is a delegate to ILBFactory for a given database cluster.
Database load monitoring interface.