MediaWiki REL1_35
LoadMonitor.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
24use BagOStuff;
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
27use RuntimeException;
29use Wikimedia\ScopedCallback;
30
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Server states (replication lag times and load weight scale factors) are kept in
 * both the local server cache and the WAN cache. When they become stale, a single
 * thread regenerates them by connecting to each of the servers.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff Local server cache */
	protected $srvCache;
	/** @var WANObjectCache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Moving average ratio: the weight given to each new naive scale value */
	private $movingAveRatio;
	/** @var int|float Amount of replication lag in seconds before warnings are logged */
	private $lagWarnThreshold;
	/** @var float|null Mock UNIX timestamp set via setMockTime(), for testing */
	private $wallClockOverride;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** @var int Default lag (seconds) above which warnings are logged */
	private const LAG_WARN_THRESHOLD = 10;

	/** @var int Version component of the server states cache keys */
	private const VERSION = 1;
	/** @var int Upper bound (milliseconds) of the randomized local cache staleness age */
	private const POLL_PERIOD_MS = 500;
	/** @var int Seconds that server states are kept for reuse when stale */
	private const STATE_PRESERVE_TTL = 60;
	/** @var int Logical TTL (seconds) of the WAN cache entry */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache Local server cache
	 * @param WANObjectCache $wCache
	 * @param array $options Optional keys:
	 *  - movingAveRatio: weight given to new naive scale values (default 0.1)
	 *  - lagWarnThreshold: seconds of lag above which warnings are logged
	 *    (default: LAG_WARN_THRESHOLD, i.e. 10)
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
	}

	/**
	 * @param LoggerInterface $logger Logger used for replication/status messages
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Multiply each server's load weight by its current weight scale, rounding up.
	 *
	 * Servers that are missing from the server states keep their weight unchanged,
	 * and an error is logged for them.
	 *
	 * @param int[]|float[] $weightByServer Map of (server index => weight); modified in place
	 * @param mixed $domain Passed through to getServerStates()
	 */
	final public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else { // server recently added to config?
				$host = $this->lb->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get an estimate of replication lag (in seconds) for each server.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain Passed through to getServerStates()
	 * @return array Map of (server index => lag in seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes, $domain ) {
		return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
	}

	/**
	 * Get the lag times and weight scales for the given servers.
	 *
	 * Lookups go through, in order: a fresh local server cache entry; a stale local
	 * entry (if another thread on this server holds the refresh lock); and the WAN
	 * cache, whose callback regenerates the states via computeServerStates().
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain Passed through to computeServerStates()
	 * @return array Map with keys 'lagTimes' and 'weightScales' (each keyed by
	 *  server index) and 'timestamp' (UNIX timestamp of the computation)
	 * @throws RuntimeException If regeneration of the states recurses into itself
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		// Represent the cluster by the name of the master DB
		$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
				// Sanity check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				// Keep $loopGuard referenced until this callback returns; the ScopedCallback
				// presumably releases the guard once it is destroyed.
				$loopGuard = $this->acquireServerStatesLoopGuard();
				if ( !$loopGuard ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$domain,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache (only the thread holding the local lock does this)
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Compute fresh server states by connecting to each of the given servers.
	 *
	 * Connections that this method opens itself are always closed before moving on
	 * to the next server, even if the weight/lag checks throw.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain Unused by this implementation
	 * @param array|false $priorStates Previous server states (or a falsy value if none);
	 *  their 'weightScales' seed the moving averages
	 * @return array Map with keys 'lagTimes', 'weightScales', and 'timestamp'
	 */
	protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
		// Check if there is just a master DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];
		$writerIndex = $this->lb->getWriterIndex();

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isMaster = ( $i == $writerIndex );
			// If the master DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the master DB might be geographically remote.
			if ( $isMaster && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			# Handles with open transactions are avoided since they might be subject
			# to REPEATABLE-READ snapshots, which could affect the lag estimate query.
			$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			if ( $conn ) {
				$close = false; // already open
			} else {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			try {
				// Get new weight scale using a moving average of the naive and prior values
				$lastScale = $priorScales[$i] ?? 1.0;
				$naiveScale = $this->getWeightScale( $i, $conn ?: null );
				$newScale = $this->getNewScaleViaMovingAve(
					$lastScale,
					$naiveScale,
					$this->movingAveRatio
				);

				// Clamp so that weight scales are never negative
				$weightScales[$i] = max( $newScale, 0.0 );

				if ( !$conn ) {
					// Mark replication lag on this server as "false" if it is unreachable
					$lagTimes[$i] = $isMaster ? 0 : false;
					$this->replLogger->error(
						__METHOD__ . ": host {db_server} is unreachable",
						[ 'db_server' => $host ]
					);
				} else {
					// Determine the amount of replication lag on this server
					try {
						$lagTimes[$i] = $conn->getLag();
					} catch ( DBError $e ) {
						// Mark the lag time as "false" if it cannot be queried
						$lagTimes[$i] = false;
					}

					if ( $lagTimes[$i] === false ) {
						$this->replLogger->error(
							__METHOD__ . ": host {db_server} is not replicating?",
							[ 'db_server' => $host ]
						);
					} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
						// Use the same 'db_server' context key as the other log messages
						$this->replLogger->warning(
							"Server {db_server} has {lag} seconds of lag (>= {maxlag})",
							[
								'db_server' => $host,
								'lag' => $lagTimes[$i],
								'maxlag' => $this->lagWarnThreshold
							]
						);
					}
				}
			} finally {
				if ( $close && $conn ) {
					# Close the connection to avoid sleeper connections piling up.
					# Note that the caller will pick one of these DBs and reconnect,
					# which is slightly inefficient, but this only matters for the lag
					# time cache miss case, which is far less common than cache hits.
					$this->lb->closeConnection( $conn );
				}
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get trivial server states: zero lag and a full (1.0) weight scale for every server.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with keys 'lagTimes', 'weightScales', and 'timestamp'
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get the naive weight scale for a server.
	 *
	 * This base implementation only checks reachability: 1.0 with a connection, 0.0 without.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle, or null if the server was unreachable
	 * @return float
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive value and the last iteration's value.
	 *
	 * @param float $lastScale Weight scale from the previous iteration
	 * @param float $naiveScale Newly measured (naive) weight scale
	 * @param float $movAveRatio Weight given to $naiveScale (the rest goes to $lastScale)
	 * @return float
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * Build the server states cache key for a set of servers.
	 *
	 * The index list is sorted, so the key does not depend on the order of the indexes.
	 *
	 * @param BagOStuff|WANObjectCache $cache Cache instance used to build the key
	 * @param int[] $serverIndexes
	 * @return string
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the master DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Acquire the in-process guard against recursive server state regeneration.
	 *
	 * @return ScopedCallback|null Callback that releases the guard, or null if the
	 *  guard is already held
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp (the mock time, if one was set and is non-zero)
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Make getCurrentTime() return the value of the given variable, for testing.
	 *
	 * The variable is bound by reference, so later changes to it are seen here too.
	 *
	 * @param float|null &$time Mock UNIX timestamp
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:71
Multi-datacenter aware caching interface.
Database error base class. @newable Stable to extend.
Definition DBError.php:32
Basic DB load monitor with no external dependencies.
getPlaceholderServerStates(array $serverIndexes)
getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio)
Get the moving average weight scale given a naive value and the last iteration's value.
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
LoggerInterface $replLogger
int $lagWarnThreshold
Amount of replication lag in seconds before warnings are logged.
computeServerStates(array $serverIndexes, $domain, $priorStates)
float $movingAveRatio
Moving average ratio (e.g. 0.1 gives 10% weight to the newest naive scale value).
bool $serverStatesKeyLocked
Whether the "server states" cache key is in the process of being updated.
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
getServerStates(array $serverIndexes, $domain)
setLogger(LoggerInterface $logger)
getStatesCacheKey( $cache, array $serverIndexes)
getWeightScale( $index, IDatabase $conn=null)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.
$cache
Definition mcc.php:33