MediaWiki master
LoadMonitor.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23use Psr\Log\LoggerInterface;
24use Psr\Log\NullLogger;
25use RuntimeException;
30use Wikimedia\ScopedCallback;
32
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Gauges each server of a load balancer by its number of open connections,
 * keeps the resulting per-server states in a local cache backed by a WAN cache,
 * and uses those states to rescale server weights and to refuse traffic
 * ("circuit breaking") when no usable server is below the connection limit.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer Load balancer whose servers are gauged */
	protected $lb;
	/** @var BagOStuff Local cache that is consulted first for per-server states */
	protected $srvCache;
	/** @var WANObjectCache Cache used when the local entry is missing or no longer fresh */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $logger;
	/** @var StatsdDataFactoryInterface */
	protected $statsd;
	/** @var int|float Connection count at which a server counts as saturated; INF if not configured */
	private $maxConnCount;
	/** Constant added to the summed connection count when computing connection ratios */
	private int $totalConnectionsAdjustment;

	/** @var float|null Mock timestamp bound by setMockTime(); a falsy value means "use the real clock" */
	private $wallClockOverride;

	/** @var bool Whether this instance is in the middle of computing a server state */
	private $serverStatesKeyLocked = false;

	/** Version component of the state cache keys */
	private const VERSION = 3;

	/** Age, in seconds, below which a cached server state still counts as fresh */
	private const STATE_TARGET_TTL = 10;
	/** TTL handed to both caches when a server state is stored */
	private const STATE_PRESERVE_TTL = 60;

	/**
	 * Construct a new LoadMonitor with a given LoadBalancer parent.
	 *
	 * The logger and stats factory start out as no-op implementations; use
	 * setLogger() and setStatsdDataFactory() to replace them.
	 *
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache
	 * @param WANObjectCache $wCache
	 * @param array $options Recognized keys:
	 *   - maxConnCount: connection count at which a server is considered saturated.
	 *     Cast to int. Default: INF (no limit).
	 *   - totalConnectionsAdjustment: constant added to the summed connection count
	 *     in scaleLoads(). Cast to int. Default: 10.
	 */
	public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options ) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->logger = new NullLogger();
		$this->statsd = new NullStatsdDataFactory();
		if ( isset( $options['maxConnCount'] ) ) {
			$this->maxConnCount = (int)( $options['maxConnCount'] );
		} else {
			$this->maxConnCount = INF;
		}

		$this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * Sets a StatsdDataFactory instance on the object.
	 *
	 * @param StatsdDataFactoryInterface $statsFactory
	 */
	public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
		$this->statsd = $statsFactory;
	}

	/**
	 * Perform load ratio adjustment before deciding which server to use.
	 *
	 * Servers that are down or have a non-positive weight get weight 0. Every other
	 * server has its share of the total weight moved half-way towards compensating
	 * for the difference between its share of connections and its share of weight.
	 * Groups of at most one server are left untouched.
	 *
	 * @param array &$weightByServer Map of (server index => weight), modified in place
	 * @throws DBUnexpectedError If no server is up, positively weighted and below the
	 *  configured maximum connection count
	 */
	public function scaleLoads( array &$weightByServer ) {
		if ( count( $weightByServer ) <= 1 ) {
			// Single-server group; relative adjustments are pointless
			return;
		}

		$serverIndexes = array_keys( $weightByServer );
		$stateByServerIndex = $this->getServerStates( $serverIndexes );
		$totalConnections = 0;
		$totalWeights = 0;
		$circuitBreakingEnabled = true;
		foreach ( $weightByServer as $i => $weight ) {
			$serverState = $stateByServerIndex[$i];
			// Connections of servers that are down (STATE_UP of 0) do not count
			$totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
			$totalWeights += $weight;

			// Set up circuit breaking. If at least one replica can take more connections
			// allow the flow.
			if (
				$serverState[self::STATE_UP] &&
				$weight > 0 &&
				$serverState[self::STATE_CONN_COUNT] < $this->maxConnCount
			) {
				$circuitBreakingEnabled = false;
			}
		}

		if ( $circuitBreakingEnabled ) {
			throw new DBUnexpectedError(
				null, 'Database servers in ' . $this->lb->getClusterName() . ' are overloaded. ' .
				'In order to protect application servers, the circuit breaking to databases of this section ' .
				'have been activated. Please try again a few seconds.'
			);
		}

		// Loop-invariant denominator of the per-server connection ratio
		$connDenominator = $totalConnections + $this->totalConnectionsAdjustment;

		foreach ( $weightByServer as $i => $weight ) {
			if (
				// host is down or
				!$stateByServerIndex[$i][self::STATE_UP] ||
				// host is primary or explicitly set to zero
				$weight <= 0
			) {
				$weightByServer[$i] = 0;
				continue;
			}
			// A non-positive denominator is possible when 'totalConnectionsAdjustment' is
			// configured as 0 and no connections were counted; dividing would throw
			// DivisionByZeroError, so treat the server as holding no share of connections.
			$connRatio = $connDenominator > 0
				? $stateByServerIndex[$i][self::STATE_CONN_COUNT] / $connDenominator
				: 0.0;
			$weightRatio = $weight / $totalWeights;
			$diffRatio = $connRatio - $weightRatio;
			// Shift the weight share by half of the difference, never below zero
			$adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
			$weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
		}
	}

	/**
	 * Get the state of each given server, preferring fresh entries of the local cache.
	 *
	 * A missing entry is always regenerated through the WAN cache. A stale entry is
	 * only regenerated by the caller that obtains the lock for its key; other callers
	 * keep using the stale entry.
	 *
	 * @param int[] $serverIndexes
	 * @return array[] Map of (server index => state array), keyed in the order given
	 */
	protected function getServerStates( array $serverIndexes ): array {
		$stateByServerIndex = array_fill_keys( $serverIndexes, null );
		// Perform any cache regenerations in randomized order so that the
		// DB servers will each have similarly up-to-date state cache entries.
		$shuffledServerIndexes = $serverIndexes;
		shuffle( $shuffledServerIndexes );

		foreach ( $shuffledServerIndexes as $i ) {
			$key = $this->makeStateKey( $this->srvCache, $i );
			$state = $this->srvCache->get( $key ) ?: null;
			if ( $this->isStateFresh( $state ) ) {
				$this->logger->debug(
					__METHOD__ . ": fresh local cache hit for '{db_server}'",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			} else {
				// NOTE(review): arguments presumably mean "do not wait" and "expire after
				// 10 seconds" -- confirm against BagOStuff::getScopedLock()
				$lock = $this->srvCache->getScopedLock( $key, 0, 10 );
				if ( $lock || !$state ) {
					$state = $this->getStateFromWanCache( $i, $state );
					$this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
				}
			}
			$stateByServerIndex[$i] = $state;
		}
		return $stateByServerIndex;
	}

	/**
	 * Get the state of a server from the WAN cache, computing it on cache miss.
	 *
	 * @param int $i Server index
	 * @param array|null $srvPrevState Previous state from the local cache, used as the
	 *  baseline for computation when the WAN cache has no previous value of its own
	 * @return array State array
	 */
	protected function getStateFromWanCache( $i, ?array $srvPrevState ) {
		// Flipped to false by the callback, which only runs when the value is regenerated
		$hit = true;
		$key = $this->makeStateKey( $this->wanCache, $i );
		$state = $this->wanCache->getWithSetCallback(
			$key,
			self::STATE_PRESERVE_TTL,
			function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
				$prevState = $wanPrevState ?: $srvPrevState ?: null;
				$hit = false;
				return $this->computeServerState( $i, $prevState );
			},
			[ 'lockTSE' => 30 ]
		);
		if ( $hit ) {
			$this->logger->debug(
				__METHOD__ . ": WAN cache hit for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		} else {
			$this->logger->info(
				__METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		}
		return $state;
	}

	/**
	 * Build the cache key under which the state of a server is stored.
	 *
	 * The key is made of the key version, the cluster name, the name of the writer
	 * server and the name of the server itself.
	 *
	 * @param IStoreKeyEncoder $cache Cache whose key encoding is to be used
	 * @param int $i Server index
	 * @return string
	 */
	protected function makeStateKey( IStoreKeyEncoder $cache, int $i ) {
		return $cache->makeGlobalKey(
			'rdbms-gauge',
			self::VERSION,
			$this->lb->getClusterName(),
			$this->lb->getServerName( ServerInfo::WRITER_INDEX ),
			$this->lb->getServerName( $i )
		);
	}

	/**
	 * Gauge a server by connecting to it and counting its open connections.
	 *
	 * Servers with a configured load of zero or less are not contacted; they get the
	 * initial state with an up-to-date timestamp.
	 *
	 * @param int $i Server index
	 * @param array|null $previousState Previous state; when given, the stored connection
	 *  count is the mean of the previous and the newly measured count
	 * @return array New state array (STATE_UP, STATE_CONN_COUNT, STATE_AS_OF)
	 * @throws RuntimeException If called while this instance is already computing a state
	 */
	protected function computeServerState( int $i, ?array $previousState ) {
		// Double check for circular recursion in computeServerStates()/getWeightScale().
		// Mainly, connection attempts should use LoadBalancer::getServerConnection()
		// rather than something that will pick a server based on the server states.
		// The returned object must stay referenced until this method returns: the guard
		// is released as soon as the object is destroyed.
		$scope = $this->acquireServerStatesLoopGuard();

		$cluster = $this->lb->getClusterName();
		$serverName = $this->lb->getServerName( $i );
		$statServerName = str_replace( '.', '_', $serverName );

		$newState = $this->newInitialServerState();

		if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
			// Callers only use this server when they have *no choice* anyway (e.g. primary)
			$newState[self::STATE_AS_OF] = $this->getCurrentTime();
			// Avoid connecting, especially since it might reside in a remote datacenter
			return $newState;
		}

		// Get a new, untracked, connection in order to gauge server health
		$flags = ILoadBalancer::CONN_UNTRACKED_GAUGE | ILoadBalancer::CONN_SILENCE_ERRORS;
		// Get a connection to this server without triggering other server connections
		$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
		// Determine the number of open connections in this server
		if ( $conn ) {
			try {
				$connCount = $this->getConnCountForDb( $conn );
			} catch ( DBError $e ) {
				$connCount = false;
			}
		} else {
			$connCount = false;
		}
		// Only keep one connection open at a time
		if ( $conn ) {
			$conn->close( __METHOD__ );
		}

		$endTime = $this->getCurrentTime();
		$newState[self::STATE_AS_OF] = $endTime;
		$newState[self::STATE_CONN_COUNT] = $connCount;
		$newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
		if ( $previousState ) {
			// Smooth the measurement; a failed count (false) contributes 0 here
			$newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
		}

		if ( $connCount === false ) {
			$this->logger->error(
				__METHOD__ . ": host {db_server} is not up?",
				[ 'db_server' => $serverName ]
			);
		} else {
			$this->statsd->timing( "loadbalancer.connCount.$cluster.$statServerName", $connCount );
			// Same threshold as the circuit breaker in scaleLoads(), which only treats
			// servers strictly below the maximum as able to take more connections
			if ( $connCount >= $this->maxConnCount ) {
				$this->logger->warning(
					"Server {db_server} has {conn_count} open connections (>= {max_conn})",
					[
						'db_server' => $serverName,
						'conn_count' => $connCount,
						'max_conn' => $this->maxConnCount
					]
				);
			}
		}

		return $newState;
	}

	/**
	 * Get the state assumed for a server before anything has been measured.
	 *
	 * @return array State array
	 */
	protected function newInitialServerState() {
		return [
			// Moving average of connectivity; treat as good
			self::STATE_UP => 1.0,
			// Number of connections to that replica; treat as none
			self::STATE_CONN_COUNT => 0,
			// UNIX timestamp of state generation completion; treat as "outdated"
			self::STATE_AS_OF => 0.0,
		];
	}

	/**
	 * Check whether a cached state is recent enough to be used without regeneration.
	 *
	 * @param array|null|false $state State array, or a falsy value if there is none
	 * @return bool True if the state exists and is younger than STATE_TARGET_TTL seconds
	 */
	private function isStateFresh( $state ): bool {
		if ( !$state ) {
			return false;
		}
		$age = $this->getCurrentTime() - $state[self::STATE_AS_OF];

		return $age < self::STATE_TARGET_TTL;
	}

	/**
	 * Mark this instance as busy computing a server state.
	 *
	 * The mark is removed when the returned object is destroyed, so callers must
	 * keep a reference to it for as long as the guard is meant to be held.
	 *
	 * @return ScopedCallback
	 * @throws RuntimeException If the instance is already marked as busy
	 */
	private function acquireServerStatesLoopGuard(): ScopedCallback {
		if ( $this->serverStatesKeyLocked ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp; the mock time if one is set and truthy, else the real time
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Bind the clock of this instance to a caller-controlled variable.
	 *
	 * The variable is held by reference, so later changes to it are seen by
	 * getCurrentTime().
	 *
	 * @param float|null &$time Mock UNIX timestamp
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}

	/**
	 * Count the entries of the process list of a server.
	 *
	 * @param IDatabase $conn
	 * @return int Number of process list entries; 0 if the connection is not of type
	 *  'mysql' or if no result row could be read
	 */
	private function getConnCountForDb( IDatabase $conn ): int {
		if ( $conn->getType() !== 'mysql' ) {
			return 0;
		}
		$query = new Query(
			'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST',
			ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
			'SELECT',
			null,
			'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST'
		);
		$res = $conn->query( $query, __METHOD__ );
		$row = $res ? $res->fetchObject() : false;
		return $row ? (int)$row->pcount : 0;
	}
}
if(!defined('MW_SETUP_CALLBACK'))
Definition WebStart.php:81
Abstract class for any ephemeral data store.
Definition BagOStuff.php:89
Multi-datacenter aware caching interface.
Database error base class.
Definition DBError.php:36
Basic DB load monitor with no external dependencies.
makeStateKey(IStoreKeyEncoder $cache, int $i)
getStateFromWanCache( $i, ?array $srvPrevState)
setStatsdDataFactory(StatsdDataFactoryInterface $statsFactory)
Sets a StatsdDataFactory instance on the object.
StatsdDataFactoryInterface $statsd
getServerStates(array $serverIndexes)
scaleLoads(array &$weightByServer)
Perform load ratio adjustment before deciding which server to use.
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options)
Construct a new LoadMonitor with a given LoadBalancer parent.
setLogger(LoggerInterface $logger)
computeServerState(int $i, ?array $previousState)
Key-encoding methods for object caching (BagOStuff and WANObjectCache)
makeGlobalKey( $keygroup,... $components)
Interface to a relational database.
Definition IDatabase.php:48
This class is a delegate to ILBFactory for a given database cluster.
const CONN_UNTRACKED_GAUGE
Yield an untracked, low-timeout, autocommit-mode handle (to gauge server health)
const DOMAIN_ANY
Domain specifier when no specific database needs to be selected.
const CONN_SILENCE_ERRORS
Yield null on connection failure instead of throwing an exception.
Database load monitoring interface.
Interface for query language.