MediaWiki REL1_39
LoadMonitor.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
use BagOStuff;
use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
use NullStatsdDataFactory;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use WANObjectCache;
use Wikimedia\ScopedCallback;
30
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Tracks, per DB server, the replication lag and a weight scale factor used to adjust
 * the nominal server loads. These "server states" are cached in the local server cache
 * and in the shared WAN cache so that most requests do not need to connect to each
 * server to measure them.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer Load balancer whose servers are monitored */
	protected $lb;
	/** @var BagOStuff Local server cache */
	protected $srvCache;
	/** @var WANObjectCache Shared cache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;
	/** @var StatsdDataFactoryInterface */
	protected $statsd;

	/** @var float Weight of the newest ("naive") scale sample in the moving average */
	private $movingAveRatio;
	/** @var float|int Replication lag (seconds) above which a warning is logged */
	private $lagWarnThreshold;

	/** @var float|null Mock UNIX timestamp used instead of the wall clock (testing) */
	private $wallClockOverride;

	/** @var bool Whether the server states are currently being regenerated by this object */
	private $serverStatesKeyLocked = false;

	/** @var int Version number included in the server states cache keys */
	private const VERSION = 1;
	/** @var int Upper bound (milliseconds) of the randomized local cache staleness window */
	private const POLL_PERIOD_MS = 500;
	/** @var int Seconds that stale server states are kept around as a fallback */
	private const STATE_PRESERVE_TTL = 60;
	/** @var int Logical TTL (seconds) of the server states in the WAN cache */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb Load balancer whose servers are monitored
	 * @param BagOStuff $srvCache Local server cache
	 * @param WANObjectCache $wCache Shared cache
	 * @param array $options Map of options:
	 *   - movingAveRatio: weight of the newest sample in the weight scale moving
	 *     average [default: 0.1]
	 *   - lagWarnThreshold: replication lag (seconds) above which a warning is
	 *     logged [default: LoadBalancer::MAX_LAG_DEFAULT]
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();
		$this->statsd = new NullStatsdDataFactory();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? LoadBalancer::MAX_LAG_DEFAULT;
	}

	/**
	 * @param LoggerInterface $logger Logger for replication/load status messages
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * @param StatsdDataFactoryInterface $statsFactory Receiver of weight/lag metrics
	 */
	public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
		$this->statsd = $statsFactory;
	}

	/**
	 * Multiply each server weight by its current weight scale, rounding up.
	 *
	 * Servers that have no cached weight scale keep their weight unchanged and
	 * an error is logged.
	 *
	 * @param array &$weightByServer Map of (server index => weight); modified in place
	 * @param mixed $domain DB domain; passed through to getServerStates()
	 */
	final public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else {
				// Server recently added to the config?
				$host = $this->lb->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get the replication lag time of each of the given servers.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain DB domain; passed through to getServerStates()
	 * @return array Map of (server index => lag in seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes, $domain ) {
		return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
	}

	/**
	 * Get the lag times and weight scales of the given servers, using the caches.
	 *
	 * The local server cache is used when its value is fresh enough. Otherwise, one
	 * thread per server (the holder of the local cache lock) consults the WAN cache,
	 * which regenerates the value via computeServerStates() when needed. Other threads
	 * use the stale local value if there is one.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain DB domain; passed through to computeServerStates()
	 * @return array Map with keys "lagTimes", "weightScales" (both keyed by server
	 *   index) and "timestamp" (UNIX timestamp of when the states were computed)
	 * @throws RuntimeException If regenerating the states recursed into itself
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		// Represent the cluster by the name of the primary DB
		$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
				// Double check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				// The guard is released when $loopGuard goes out of scope on return.
				$loopGuard = $this->acquireServerStatesLoopGuard();
				if ( !$loopGuard ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$domain,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache (only the thread holding the local lock does this)
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Measure the current lag times and weight scales of the given servers.
	 *
	 * Each new weight scale is a moving average of the prior scale and a fresh
	 * ("naive") sample from getWeightScale(). A server that cannot be connected to
	 * gets a lag of false (0 for the primary DB). A server whose lag cannot be
	 * queried also gets a lag of false.
	 *
	 * @param int[] $serverIndexes
	 * @param mixed $domain DB domain (not used by this implementation)
	 * @param array|false|null $priorStates Prior result of this method, if any
	 * @return array Map with keys "lagTimes", "weightScales" and "timestamp"
	 */
	protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
		// Check if there is just a primary DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];
		$cluster = $this->lb->getClusterName();
		$writerIndex = $this->lb->getWriterIndex();
		// Handles with open transactions are avoided since they might be subject
		// to REPEATABLE-READ snapshots, which could affect the lag estimate query.
		// Connection failures yield a null handle instead of throwing; see below.
		$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isPrimary = ( $i == $writerIndex );
			// If the primary DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the primary DB might be geographically remote.
			if ( $isPrimary && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			$close = false; // only close connections opened by this method
			if ( !$conn ) {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			try {
				// Get new weight scale using a moving average of the naïve and prior values
				$lastScale = $priorScales[$i] ?? 1.0;
				$naiveScale = $this->getWeightScale( $i, $conn ?: null );
				$newScale = $this->getNewScaleViaMovingAve(
					$lastScale,
					$naiveScale,
					$this->movingAveRatio
				);
				// Scale from 0% to 100% of nominal weight
				$newScale = max( $newScale, 0.0 );

				$weightScales[$i] = $newScale;
				$statHost = str_replace( '.', '_', $host );
				$this->statsd->gauge( "loadbalancer.weight.$cluster.$statHost", $newScale );

				// Mark replication lag on this server as "false" if it is unreachable
				if ( !$conn ) {
					$lagTimes[$i] = $isPrimary ? 0 : false;
					$this->replLogger->error(
						__METHOD__ . ": host {db_server} is unreachable",
						[ 'db_server' => $host ]
					);
					continue;
				}

				// Determine the amount of replication lag on this server
				try {
					$lag = $conn->getLag();
				} catch ( DBError $e ) {
					// Mark the lag time as "false" if it cannot be queried
					$lag = false;
				}
				$lagTimes[$i] = $lag;

				if ( $lag === false ) {
					$this->replLogger->error(
						__METHOD__ . ": host {db_server} is not replicating?",
						[ 'db_server' => $host ]
					);
				} else {
					$this->statsd->timing( "loadbalancer.lag.$cluster.$statHost", $lag * 1000 );
					if ( $lag > $this->lagWarnThreshold ) {
						$this->replLogger->warning(
							"Server {db_server} has {lag} seconds of lag (>= {maxlag})",
							[
								'db_server' => $host,
								'lag' => $lag,
								'maxlag' => $this->lagWarnThreshold
							]
						);
					}
				}
			} finally {
				// Close the connection to avoid sleeper connections piling up, even if
				// an exception was thrown above. Note that the caller will pick one of
				// these DBs and reconnect, which is slightly inefficient, but this only
				// matters for the lag time cache miss case, which is far less common
				// than cache hits.
				if ( $close && $conn ) {
					$this->lb->closeConnection( $conn );
				}
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get trivial server states: zero lag and full (1.0) weight scale for every server.
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with keys "lagTimes", "weightScales" and "timestamp"
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get a naive (un-averaged) weight scale sample for a server.
	 *
	 * This base implementation only checks whether a connection handle is available.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle, or null if unreachable
	 * @return float 1.0 if a handle was given, 0.0 otherwise
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive and the last iteration value.
	 *
	 * @param float $lastScale Weight scale from the previous iteration
	 * @param float $naiveScale Newly sampled weight scale
	 * @param float $movAveRatio Weight given to $naiveScale (0 to 1)
	 * @return float $movAveRatio * $naiveScale + (1 - $movAveRatio) * $lastScale
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * Build the server states cache key for the given cache and set of servers.
	 *
	 * The key does not depend on the order of $serverIndexes.
	 *
	 * @param BagOStuff|WANObjectCache $cache Cache that will store the key
	 * @param int[] $serverIndexes
	 * @return string
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the primary DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Mark the server states as being regenerated, unless they already are.
	 *
	 * @return ScopedCallback|null Callback that clears the mark when destroyed,
	 *   or null if the mark was already set (i.e. regeneration recursed)
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp; the mock time if one is set (and non-zero),
	 *   otherwise the current wall clock time
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Use a mock time in place of the wall clock (for testing).
	 *
	 * @param float|null &$time Mock UNIX timestamp; it is bound by reference, so later
	 *   changes to the caller's variable are reflected by getCurrentTime()
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
Multi-datacenter aware caching interface.
Database error base class.
Definition DBError.php:31
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
Basic DB load monitor with no external dependencies.
getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio)
Get the moving average weight scale given a naive and the last iteration value.
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
LoggerInterface $replLogger
setStatsdDataFactory(StatsdDataFactoryInterface $statsFactory)
Sets a StatsdDataFactory instance on the object.
computeServerStates(array $serverIndexes, $domain, $priorStates)
StatsdDataFactoryInterface $statsd
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
getServerStates(array $serverIndexes, $domain)
setLogger(LoggerInterface $logger)
getWeightScale( $index, IDatabase $conn=null)
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:39
Create and track the database connections and transactions for a given database cluster.
const DOMAIN_ANY
Domain specifier when no specific database needs to be selected.
const CONN_TRX_AUTOCOMMIT
DB handle should have DBO_TRX disabled and the caller will leave it as such.
const CONN_SILENCE_ERRORS
Return null on connection failure instead of throwing an exception.
Database load monitoring interface.
$cache
Definition mcc.php:33