MediaWiki master
LoadMonitor.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use BagOStuff;
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
27use Psr\Log\NullLogger;
28use RuntimeException;
30use Wikimedia\ScopedCallback;
31
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Per-server state arrays (connectivity moving average, replication lag,
 * generation timestamp and generation duration) are cached in both the local
 * server cache and the WAN cache. Entries are regenerated probabilistically as
 * they age, with a per-server lock so only one request regenerates at a time.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer Load balancer whose servers are monitored */
	protected $lb;
	/** @var BagOStuff Local server cache holding state entries */
	protected $srvCache;
	/** @var WANObjectCache Shared cache holding state entries */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $logger;
	/** @var StatsdDataFactoryInterface */
	protected $statsd;

	/** @var float Ratio passed to movingAverage() when updating the connectivity gauge */
	private $movingAveRatio;
	/** @var int|float Replication lag (seconds) above which a warning is logged */
	private $lagWarnThreshold;

	/** @var float|null Mock UNIX timestamp used instead of microtime() (see setMockTime()) */
	private $wallClockOverride;

	/** @var bool Whether computeServerState() is currently running (recursion guard) */
	private $serverStatesKeyLocked = false;

	/** Version component of the state cache keys (see makeStateKey()) */
	private const VERSION = 2;

	/** Nominal TTL (seconds) used when deciding whether a cached state is due for refresh */
	private const STATE_TARGET_TTL = 1;
	/** Expiry (seconds) used when storing states in the local and WAN caches */
	private const STATE_PRESERVE_TTL = 60;

	/**
	 * @param ILoadBalancer $lb Load balancer whose servers are monitored
	 * @param BagOStuff $srvCache Local server cache for state entries
	 * @param WANObjectCache $wCache WAN cache for state entries
	 * @param array $options Optional keys:
	 *   - movingAveRatio: ratio for the connectivity moving average (default 0.8)
	 *   - lagWarnThreshold: lag in seconds above which a warning is logged
	 *     (default LoadBalancer::MAX_LAG_DEFAULT)
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->logger = new NullLogger();
		$this->statsd = new NullStatsdDataFactory();
		$this->movingAveRatio = (float)( $options['movingAveRatio'] ?? 0.8 );
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? LoadBalancer::MAX_LAG_DEFAULT;
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @param StatsdDataFactoryInterface $statsFactory
	 */
	public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
		$this->statsd = $statsFactory;
	}

	/**
	 * Scale each server's load weight by that server's current weight scale.
	 *
	 * Weights are rounded to integers after scaling.
	 *
	 * @param array &$weightByServer Map of (server index => load weight); modified in place
	 */
	final public function scaleLoads( array &$weightByServer ) {
		if ( count( $weightByServer ) <= 1 ) {
			// Single-server group; relative adjustments are pointless since
			// there is no other server to shift load towards
			return;
		}

		$serverIndexes = array_keys( $weightByServer );
		$stateByServerIndex = $this->getServerStates( $serverIndexes );
		foreach ( $weightByServer as $i => $weight ) {
			$scale = $this->getWeightScale( $stateByServerIndex[$i] );
			$weightByServer[$i] = (int)round( $weight * $scale );
		}
	}

	/**
	 * Get the replication lag recorded in each server's current state.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map of (server index => lag in seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes ): array {
		$lagByServerIndex = [];

		$stateByServerIndex = $this->getServerStates( $serverIndexes );
		foreach ( $stateByServerIndex as $i => $state ) {
			$lagByServerIndex[$i] = $state[self::STATE_LAG];
		}

		return $lagByServerIndex;
	}

	/**
	 * Get the cached, or if due and possible regenerated, state of each server.
	 *
	 * For each server the local server cache is checked first. If that entry is
	 * not fresh, a non-blocking lock on the server's key is attempted. The WAN
	 * cache is then consulted when the lock was acquired or no local entry exists.
	 * Only the lock holder recomputes a state that is still not fresh. Other
	 * callers fall back to a stale entry, or to newInitialServerState() when
	 * nothing is cached.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map of (server index => state array), in the order given
	 */
	public function getServerStates( array $serverIndexes ): array {
		$stateByServerIndex = array_fill_keys( $serverIndexes, null );
		// Perform any cache regenerations in randomized order so that the
		// DB servers will each have similarly up-to-date state cache entries.
		$shuffledServerIndexes = $serverIndexes;
		shuffle( $shuffledServerIndexes );
		$now = $this->getCurrentTime();

		$scopeLocks = [];
		$serverIndexesCompute = [];
		$scKeyByServerIndex = [];
		$wcKeyByServerIndex = [];
		foreach ( $shuffledServerIndexes as $i ) {
			$scKeyByServerIndex[$i] = $this->makeStateKey( $this->srvCache, $i );
			$stateByServerIndex[$i] = $this->srvCache->get( $scKeyByServerIndex[$i] ) ?: null;
			if ( $this->isStateFresh( $stateByServerIndex[$i], $now ) ) {
				$this->logger->debug(
					__METHOD__ . ": fresh local cache hit for '{db_server}'",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			} else {
				// Non-blocking attempt; the lock is released when $scopeLocks goes out of scope
				$scopeLocks[$i] = $this->srvCache->getScopedLock( $scKeyByServerIndex[$i], 0, 10 );
				if ( $scopeLocks[$i] || !$stateByServerIndex[$i] ) {
					$wcKeyByServerIndex[$i] = $this->makeStateKey( $this->wanCache, $i );
				}
			}
		}

		$valueByKey = $this->wanCache->getMulti( $wcKeyByServerIndex );
		foreach ( $wcKeyByServerIndex as $i => $wcKey ) {
			$value = $valueByKey[$wcKey] ?? null;
			if ( $value ) {
				$stateByServerIndex[$i] = $value;
				if ( $scopeLocks[$i] ) {
					// Backfill the local cache; use the same expiry as all other state writes
					$this->srvCache->set( $scKeyByServerIndex[$i], $value, self::STATE_PRESERVE_TTL );
				}
				if ( $this->isStateFresh( $value, $now ) ) {
					$this->logger->debug(
						__METHOD__ . ": fresh WAN cache hit for '{db_server}'",
						[ 'db_server' => $this->lb->getServerName( $i ) ]
					);
				} elseif ( $scopeLocks[$i] ) {
					$serverIndexesCompute[] = $i;
				} else {
					$this->logger->info(
						__METHOD__ . ": mutex busy, stale WAN cache hit for '{db_server}'",
						[ 'db_server' => $this->lb->getServerName( $i ) ]
					);
				}
			} elseif ( $scopeLocks[$i] ) {
				$serverIndexesCompute[] = $i;
			} elseif ( $stateByServerIndex[$i] ) {
				$this->logger->info(
					__METHOD__ . ": mutex busy, stale local cache hit for '{db_server}'",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			} else {
				$stateByServerIndex[$i] = $this->newInitialServerState();
			}
		}

		foreach ( $serverIndexesCompute as $i ) {
			// The prior (stale) state, if any, seeds the connectivity moving average
			$state = $this->computeServerState( $i, $stateByServerIndex[$i] );
			$stateByServerIndex[$i] = $state;
			$this->srvCache->set( $scKeyByServerIndex[$i], $state, self::STATE_PRESERVE_TTL );
			$this->wanCache->set( $wcKeyByServerIndex[$i], $state, self::STATE_PRESERVE_TTL );
			$this->logger->info(
				__METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		}

		return $stateByServerIndex;
	}

	/**
	 * Build the cache key for a server's state entry.
	 *
	 * The key includes the cluster name, the writer (primary) server name and the
	 * name of the server being described.
	 *
	 * @param IStoreKeyEncoder $cache Cache used to encode the key
	 * @param int $i Server index
	 * @return string
	 */
	protected function makeStateKey( IStoreKeyEncoder $cache, int $i ) {
		return $cache->makeGlobalKey(
			'rdbms-gauge',
			self::VERSION,
			$this->lb->getClusterName(),
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			$this->lb->getServerName( $i )
		);
	}

	/**
	 * Gauge the current state of a server by opening a connection to it.
	 *
	 * A primary server whose configured load is <= 0 is not contacted. In that
	 * case an initial state stamped with the current time is returned. Otherwise
	 * the connectivity gauge is folded into the prior state's moving average, the
	 * lag is measured for replicas, and statsd metrics/log entries are emitted.
	 *
	 * @param int $i Server index
	 * @param array|null $priorState Previous state, if any
	 * @return array New state array
	 * @throws RuntimeException If invoked recursively
	 */
	protected function computeServerState( int $i, ?array $priorState ) {
		$startTime = $this->getCurrentTime();
		// Double check for circular recursion in getServerStates()/getWeightScale().
		// Mainly, connection attempts should use LoadBalancer::getServerConnection()
		// rather than something that will pick a server based on the server states.
		// The guard stays held until $scopedGuard goes out of scope (ScopedCallback).
		$scopedGuard = $this->acquireServerStatesLoopGuard();

		$cluster = $this->lb->getClusterName();
		$serverName = $this->lb->getServerName( $i );
		$statServerName = str_replace( '.', '_', $serverName );
		$isPrimary = ( $i == $this->lb->getWriterIndex() );

		$newState = $this->newInitialServerState();

		if ( $isPrimary && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
			// Callers only use this server when they have *no choice* anyway (e.g. writes)
			$newState[self::STATE_AS_OF] = $this->getCurrentTime();
			// Avoid connecting, especially since it might reside in a remote datacenter
			return $newState;
		}

		// Get a new, untracked, connection in order to gauge server health
		$flags = $this->lb::CONN_UNTRACKED_GAUGE | $this->lb::CONN_SILENCE_ERRORS;
		// Get a connection to this server without triggering other server connections
		$conn = $this->lb->getServerConnection( $i, $this->lb::DOMAIN_ANY, $flags );
		// Check if the server is up
		$gaugeUp = $conn ? 1.0 : 0.0;
		// Determine the amount of replication lag on this server
		if ( $isPrimary ) {
			$gaugeLag = 0;
		} elseif ( $conn ) {
			try {
				$gaugeLag = $conn->getLag();
			} catch ( DBError $e ) {
				$gaugeLag = false;
			}
		} else {
			$gaugeLag = false;
		}
		// Only keep one connection open at a time
		if ( $conn ) {
			$conn->close( __METHOD__ );
		}

		$endTime = $this->getCurrentTime();
		$newState[self::STATE_AS_OF] = $endTime;
		$newState[self::STATE_LAG] = $gaugeLag;
		if ( $priorState ) {
			$newState[self::STATE_UP] = $this->movingAverage(
				$priorState[self::STATE_UP],
				$gaugeUp,
				max( $endTime - $priorState[self::STATE_AS_OF], 0.0 ),
				$this->movingAveRatio
			);
		} else {
			$newState[self::STATE_UP] = $gaugeUp;
		}
		$newState[self::STATE_GEN_DELAY] = max( $endTime - $startTime, 0.0 );

		// Get new weight scale
		$newScale = $this->getWeightScale( $newState );
		$this->statsd->gauge( "loadbalancer.weight.$cluster.$statServerName", $newScale );

		if ( $gaugeLag === false ) {
			$this->logger->error(
				__METHOD__ . ": host {db_server} is not replicating?",
				[ 'db_server' => $serverName ]
			);
		} else {
			$this->statsd->timing( "loadbalancer.lag.$cluster.$statServerName", $gaugeLag * 1000 );
			if ( $gaugeLag > $this->lagWarnThreshold ) {
				$this->logger->warning(
					"Server {db_server} has {lag} seconds of lag (> {maxlag})",
					[
						'db_server' => $serverName,
						'lag' => $gaugeLag,
						'maxlag' => $this->lagWarnThreshold
					]
				);
			}
		}

		return $newState;
	}

	/**
	 * Get the multiplier applied to a server's load weight.
	 *
	 * @param array $state Server state array
	 * @return float The connectivity moving average (STATE_UP)
	 */
	protected function getWeightScale( array $state ) {
		// Use the connectivity as a coefficient
		return $state[self::STATE_UP];
	}

	/**
	 * Get a default server state, used when nothing better is known.
	 *
	 * @return array
	 */
	protected function newInitialServerState() {
		return [
			// Moving average of connectivity; treat as good
			self::STATE_UP => 1.0,
			// Seconds of replication lag; treat as none
			self::STATE_LAG => 0,
			// UNIX timestamp of state generation completion; treat as "outdated"
			self::STATE_AS_OF => 0.0,
			// Seconds elapsed during state generation; treat as "fast"
			self::STATE_GEN_DELAY => 0.0
		];
	}

	/**
	 * Check whether a state exists and is not (probabilistically) due for refresh.
	 *
	 * @param array|null $state
	 * @param float $now UNIX timestamp
	 * @return bool
	 */
	protected function isStateFresh( $state, $now ) {
		return (
			$state &&
			!$this->isStateRefreshDue(
				$state[self::STATE_AS_OF],
				$state[self::STATE_GEN_DELAY],
				self::STATE_TARGET_TTL,
				$now
			)
		);
	}

	/**
	 * Randomly decide whether a state should be regenerated now.
	 *
	 * The chance grows with the fourth power of the fraction of $referenceTTL that
	 * has elapsed. It is damped when prior generation took a noticeable fraction
	 * of the TTL.
	 *
	 * @param float $priorAsOf UNIX timestamp of the prior state's generation
	 * @param float $priorGenDelay Seconds the prior generation took
	 * @param float|int $referenceTTL Nominal TTL in seconds; non-positive means always due
	 * @param float $now UNIX timestamp
	 * @return bool
	 */
	protected function isStateRefreshDue( $priorAsOf, $priorGenDelay, $referenceTTL, $now ) {
		if ( $referenceTTL <= 0 ) {
			// Limit of the formula below as the TTL shrinks to zero; also avoids division by zero
			return true;
		}

		$age = max( $now - $priorAsOf, 0.0 );
		// Ratio of the nominal TTL that has elapsed (r)
		$ttrRatio = $age / $referenceTTL;
		// Ratio of the nominal TTL that elapses during regeneration (g)
		$genRatio = $priorGenDelay / $referenceTTL;
		// Use p(r,g) as the monotonically increasing "chance of refresh" function,
		// having p(0,g)=0. Normally, g~=0, in which case p(1,g)~=1. If g >> 0, then
		// the value might not refresh until a modest time after the nominal expiry.
		$chance = exp( -64 * min( $genRatio, 0.1 ) ) * ( $ttrRatio ** 4 );

		return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
	}

	/**
	 * Update a moving average for a gauge, accounting for the time delay since the last gauge.
	 *
	 * @param float|int|null $priorValue Prior average; null means "no prior value"
	 * @param float|int|null $gaugeValue New gauge; null means "no new value"
	 * @param float $delay Seconds elapsed since the prior value was gauged
	 * @param float $movAveRatio Weight of the new gauge after one second of delay
	 * @return float|int|null Updated average (never negative when both inputs are given)
	 */
	public function movingAverage(
		$priorValue,
		$gaugeValue,
		float $delay,
		float $movAveRatio
	) {
		if ( $gaugeValue === null ) {
			return $priorValue;
		} elseif ( $priorValue === null ) {
			return $gaugeValue;
		}

		// Apply more weight to the newer gauge the more outdated the prior gauge is.
		// The rate of state updates generally depends on the amount of site traffic.
		// Smaller sites will get less frequent updates, but the gauges still converge
		// within reasonable time bounds so that unreachable DB servers are avoided.
		$delayAwareRatio = 1 - ( 1 - $movAveRatio ) ** $delay;

		return max( $delayAwareRatio * $gaugeValue + ( 1 - $delayAwareRatio ) * $priorValue, 0.0 );
	}

	/**
	 * Mark server state computation as in progress, failing on re-entry.
	 *
	 * @return ScopedCallback Clears the in-progress flag when it is destroyed
	 * @throws RuntimeException If state computation is already in progress
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * Get the current UNIX timestamp, honoring any mock time set via setMockTime().
	 *
	 * @return float
	 */
	protected function getCurrentTime() {
		// Use ?? rather than ?: so that a mock time of 0 is still honored
		return $this->wallClockOverride ?? microtime( true );
	}

	/**
	 * Use a mock clock (by reference) instead of the real wall clock; for testing.
	 *
	 * @param float|null &$time Mock UNIX timestamp; null means "use real time"
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
Multi-datacenter aware caching interface.
Database error base class.
Definition DBError.php:36
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
Basic DB load monitor with no external dependencies.
makeStateKey(IStoreKeyEncoder $cache, int $i)
isStateRefreshDue( $priorAsOf, $priorGenDelay, $referenceTTL, $now)
computeServerState(int $i, ?array $priorState)
getLagTimes(array $serverIndexes)
Get an estimate of replication lag (in seconds) for the specified servers.
movingAverage( $priorValue, $gaugeValue, float $delay, float $movAveRatio)
Update a moving average for a gauge, accounting for the time delay since the last gauge.
setStatsdDataFactory(StatsdDataFactoryInterface $statsFactory)
Sets a StatsdDataFactory instance on the object.
StatsdDataFactoryInterface $statsd
getServerStates(array $serverIndexes)
Get a server gauge map for the specified servers.
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
scaleLoads(array &$weightByServer)
Perform load ratio adjustment before deciding which server to use.
setLogger(LoggerInterface $logger)
Key-encoding methods for object caching (BagOStuff and WANObjectCache)
makeGlobalKey( $keygroup,... $components)
This class is a delegate to ILBFactory for a given database cluster.
Database load monitoring interface.