MediaWiki REL1_37
LoadMonitor.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
use BagOStuff;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use WANObjectCache;
use Wikimedia\ScopedCallback;
30
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Per-server states (replication lag and load weight scale factors) are cached in
 * both the local server cache and the shared WAN cache. They are keyed on the primary
 * DB server name and the set of server indexes being queried.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff Local server cache */
	protected $srvCache;
	/** @var WANObjectCache Shared cache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Moving average ratio (e.g. 0.1 for 10% weight to the newly observed value) */
	private $movingAveRatio;
	/** @var int Amount of replication lag in seconds before warnings are logged */
	private $lagWarnThreshold;

	/** @var float|null Mock wall clock time (UNIX timestamp); see setMockTime() */
	private $wallClockOverride;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** @var int Cache key version */
	private const VERSION = 1;
	/** @var int Upper bound (in milliseconds) of the randomized local cache staleness age */
	private const POLL_PERIOD_MS = 500;
	/** @var int How long (in seconds) server states are kept, including past logical expiry */
	private const STATE_PRESERVE_TTL = 60;
	/** @var int Logical expiry (in seconds) of the shared cache value */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache Local server cache
	 * @param WANObjectCache $wCache Shared cache
	 * @param array $options Map of:
	 *   - movingAveRatio: weight given to newly observed scale values (default: 0.1)
	 *   - lagWarnThreshold: replication lag (seconds) above which a warning is logged
	 *     (default: LoadBalancer::MAX_LAG_DEFAULT)
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? LoadBalancer::MAX_LAG_DEFAULT;
	}

	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Perform load ratio adjustment before deciding which server to use.
	 *
	 * Each weight is multiplied by its server's cached weight scale and rounded up.
	 * Servers missing from the cached states keep their weight unchanged.
	 *
	 * @param int[] &$weightByServer Map of (server index => weight); modified in place
	 * @param mixed $domain Domain identifier, passed through to getServerStates()
	 */
	final public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else { // server recently added to config?
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} not in cache",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			}
		}
	}

	/**
	 * Get an estimate of replication lag (in seconds) for each server.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain Domain identifier, passed through to getServerStates()
	 * @return array Map of (server index => lag in seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes, $domain ) {
		return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
	}

	/**
	 * Get the states of the given servers, using caches where possible.
	 *
	 * Lookup order:
	 *   (a) a fresh value in the local server cache;
	 *   (b1) a stale local value when another local thread holds the refresh lock;
	 *   (b2/b3) the shared WAN cache, regenerating via computeServerStates() on miss.
	 * A value obtained under the local lock is written back to the local cache.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain Domain identifier, passed through to computeServerStates()
	 * @return array Map with keys 'lagTimes', 'weightScales', and 'timestamp'
	 * @throws RuntimeException If regeneration recursively re-enters itself
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		// Represent the cluster by the name of the primary DB
		$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
				// Sanity check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				$loopGuard = $this->acquireServerStatesLoopGuard();
				if ( !$loopGuard ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$domain,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Query the given servers for replication lag and compute new weight scales.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain Domain identifier (not used by this implementation)
	 * @param array|false|null $priorStates Previous server states, if any
	 * @return array Map with keys 'lagTimes', 'weightScales', and 'timestamp'
	 */
	protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
		// Check if there is just a primary DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];

		// Handles with open transactions are avoided since they might be subject
		// to REPEATABLE-READ snapshots, which could affect the lag estimate query.
		// Connection failures yield null instead of throwing (handled below).
		$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isPrimary = ( $i == $this->lb->getWriterIndex() );
			// If the primary DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the primary DB might be geographically remote.
			if ( $isPrimary && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			if ( $conn ) {
				$close = false; // already open
			} else {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			// Get new weight scale using a moving average of the naïve and prior values
			$lastScale = $priorScales[$i] ?? 1.0;
			$naiveScale = $this->getWeightScale( $i, $conn ?: null );
			$newScale = $this->getNewScaleViaMovingAve(
				$lastScale,
				$naiveScale,
				$this->movingAveRatio
			);

			// Never let the scale go negative
			$weightScales[$i] = max( $newScale, 0.0 );

			// Mark replication lag on this server as "false" if it is unreachable
			if ( !$conn ) {
				$lagTimes[$i] = $isPrimary ? 0 : false;
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is unreachable",
					[ 'db_server' => $host ]
				);
				continue;
			}

			// Determine the amount of replication lag on this server
			try {
				$lagTimes[$i] = $conn->getLag();
			} catch ( DBError $e ) {
				// Mark the lag time as "false" if it cannot be queried
				$lagTimes[$i] = false;
			} finally {
				if ( $close ) {
					// Close the connection to avoid sleeper connections piling up, even if
					// getLag() threw something other than DBError. The caller will pick one
					// of these DBs and reconnect, which is slightly inefficient, but this only
					// matters on a server states cache miss, which is far less common than a hit.
					$this->lb->closeConnection( $conn );
				}
			}

			if ( $lagTimes[$i] === false ) {
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is not replicating?",
					[ 'db_server' => $host ]
				);
			} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
				$this->replLogger->warning(
					"Server {db_server} has {lag} seconds of lag (> {maxlag})",
					[
						'db_server' => $host,
						'lag' => $lagTimes[$i],
						'maxlag' => $this->lagWarnThreshold
					]
				);
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get trivial server states: zero lag and a full (1.0) weight scale for every server.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with keys 'lagTimes', 'weightScales', and 'timestamp'
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get the naive (un-averaged) weight scale factor for a server.
	 *
	 * This base implementation only reflects whether a connection could be made.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle, or null if the connection failed
	 * @return float 1.0 when a connection handle is given, 0.0 otherwise
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive value and the last iteration value.
	 *
	 * Computes `$movAveRatio * $naiveScale + (1 - $movAveRatio) * $lastScale`. The ratio
	 * is therefore the weight given to the newly observed (naive) value.
	 *
	 * @param float $lastScale Weight scale from the previous iteration
	 * @param float $naiveScale Newly observed weight scale
	 * @param float $movAveRatio Weight of the new value
	 * @return float
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * Build the cache key for the states of a set of servers.
	 *
	 * The key does not depend on the order of $serverIndexes.
	 *
	 * @param BagOStuff|WANObjectCache $cache
	 * @param int[] $serverIndexes
	 * @return string
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the primary DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Acquire the in-process guard against recursive server state regeneration.
	 *
	 * @return ScopedCallback|null Releases the guard when destroyed; null if already held
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp (the mock time from setMockTime() when it is truthy)
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Override the wall clock time, e.g. for testing.
	 *
	 * @param float|null &$time Mock UNIX timestamp; bound by reference so later
	 *   changes to the caller's variable are observed
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:86
Multi-datacenter aware caching interface.
Database error base class @newable.
Definition DBError.php:32
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
Basic DB load monitor with no external dependencies.
getPlaceholderServerStates(array $serverIndexes)
getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio)
Get the moving average weight scale given a naive value and the last iteration value.
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
LoggerInterface $replLogger
int $lagWarnThreshold
Amount of replication lag in seconds before warnings are logged.
computeServerStates(array $serverIndexes, $domain, $priorStates)
float $movingAveRatio
Moving average ratio (e.g. 0.1 for 10% weight to the newly observed value).
bool $serverStatesKeyLocked
Whether the "server states" cache key is in the process of being updated.
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
getServerStates(array $serverIndexes, $domain)
setLogger(LoggerInterface $logger)
getStatesCacheKey( $cache, array $serverIndexes)
getWeightScale( $index, IDatabase $conn=null)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
Database cluster connection, tracking, load balancing, and transaction manager interface.
const DOMAIN_ANY
Domain specifier when no specific database needs to be selected.
const CONN_TRX_AUTOCOMMIT
DB handle should have DBO_TRX disabled and the caller will leave it as such.
const CONN_SILENCE_ERRORS
Return null on connection failure instead of throwing an exception.
An interface for database load monitoring.
$cache
Definition mcc.php:33