MediaWiki master
LoadMonitor.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use BagOStuff;
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
27use Psr\Log\NullLogger;
28use RuntimeException;
31use Wikimedia\ScopedCallback;
32
/**
 * Basic DB load monitor with no external dependencies.
 *
 * Per-server states (connectivity and open connection counts) are cached in both the
 * local server cache and the WAN cache, and are used by scaleLoads() to adjust the
 * configured server weights.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer Parent load balancer */
	protected $lb;
	/** @var BagOStuff Local server cache holding server states */
	protected $srvCache;
	/** @var WANObjectCache Shared (WAN) cache holding server states */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $logger;
	/** @var StatsdDataFactoryInterface */
	protected $statsd;
	/** @var int Open-connection count above which a warning is logged */
	private int $maxConnCount;
	/** @var int Added to the total connection count when computing connection ratios */
	private int $totalConnectionsAdjustment;

	/** @var float|null Mock UNIX timestamp, referenced via setMockTime() */
	private $wallClockOverride;

	/** @var bool Whether a server state regeneration is in progress (recursion guard) */
	private $serverStatesKeyLocked = false;

	/** Cache key version; changing it changes every state cache key */
	private const VERSION = 3;

	/** Maximum age (seconds) at which a locally cached server state is used as-is */
	private const STATE_TARGET_TTL = 10;
	/** TTL (seconds) of server states in the local and WAN caches */
	private const STATE_PRESERVE_TTL = 60;

	/**
	 * Construct a new LoadMonitor with a given LoadBalancer parent.
	 *
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache
	 * @param WANObjectCache $wCache
	 * @param array $options Supports:
	 *   - maxConnCount: open-connection count above which a warning is logged [default: 500]
	 *   - totalConnectionsAdjustment: added to the total connection count when
	 *     computing per-server connection ratios in scaleLoads() [default: 10]
	 */
	public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options ) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->logger = new NullLogger();
		$this->statsd = new NullStatsdDataFactory();
		$this->maxConnCount = (int)( $options['maxConnCount'] ?? 500 );
		$this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * Sets a StatsdDataFactory instance on the object.
	 *
	 * @param StatsdDataFactoryInterface $statsFactory
	 */
	public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
		$this->statsd = $statsFactory;
	}

	/**
	 * Perform load ratio adjustment before deciding which server to use.
	 *
	 * Servers that are down, or whose weight is zero or less, get a weight of 0.
	 * Every other server's weight is moved halfway from its configured share toward
	 * the opposite of its share of open connections: servers carrying more than their
	 * share of connections lose weight, the others gain weight.
	 *
	 * @param int[] &$weightByServer Map of (server index => weight); modified in place
	 */
	public function scaleLoads( array &$weightByServer ) {
		if ( count( $weightByServer ) <= 1 ) {
			// Single-server group; relative adjustments are pointless
			return;
		}

		$stateByServerIndex = $this->getServerStates( array_keys( $weightByServer ) );

		$totalConnections = 0.0;
		$totalWeights = 0;
		foreach ( $weightByServer as $i => $weight ) {
			$serverState = $stateByServerIndex[$i];
			// STATE_UP is 1.0 or 0.0, so only connections on reachable hosts are counted.
			// Use floats: the stored count may be a (fractional) moving average, and it must
			// be treated the same way as the per-server numerator below.
			$totalConnections += (float)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
			// Non-positive weights are forced to 0 below, so they must not shrink the total
			$totalWeights += max( $weight, 0 );
		}
		$connDenominator = $totalConnections + $this->totalConnectionsAdjustment;

		foreach ( $weightByServer as $i => $weight ) {
			if (
				// host is down or
				!$stateByServerIndex[$i][self::STATE_UP] ||
				// host is primary or explicitly set to zero
				$weight <= 0
			) {
				$weightByServer[$i] = 0;
				continue;
			}
			// Guard against division by zero (e.g. totalConnectionsAdjustment = 0 with no connections)
			$connRatio = $connDenominator > 0
				? (float)$stateByServerIndex[$i][self::STATE_CONN_COUNT] / $connDenominator
				: 0.0;
			$weightRatio = $weight / $totalWeights;
			$diffRatio = $connRatio - $weightRatio;
			$adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
			$weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
		}
	}

	/**
	 * Get the state of each given server.
	 *
	 * A locally cached state is used as-is while fresh. Otherwise the state is fetched
	 * from the WAN cache (regenerating it there if needed) and written back locally.
	 *
	 * @param int[] $serverIndexes
	 * @return array[] Map of (server index => state array)
	 */
	protected function getServerStates( array $serverIndexes ): array {
		$stateByServerIndex = array_fill_keys( $serverIndexes, null );
		// Perform any cache regenerations in randomized order so that the
		// DB servers will each have similarly up-to-date state cache entries.
		$shuffledServerIndexes = $serverIndexes;
		shuffle( $shuffledServerIndexes );

		foreach ( $shuffledServerIndexes as $i ) {
			$key = $this->makeStateKey( $this->srvCache, $i );
			$state = $this->srvCache->get( $key ) ?: null;
			if ( $this->isStateFresh( $state ) ) {
				$this->logger->debug(
					__METHOD__ . ": fresh local cache hit for '{db_server}'",
					[ 'db_server' => $this->lb->getServerName( $i ) ]
				);
			} else {
				// Try to take the per-key lock without waiting; if another process holds it,
				// keep using the stale state. A missing state is always fetched.
				$lock = $this->srvCache->getScopedLock( $key, 0, 10 );
				if ( $lock || !$state ) {
					$state = $this->getStateFromWanCache( $i, $state );
					$this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
				}
				// Release the lock (if any) now rather than holding it while other servers are processed
				unset( $lock );
			}
			$stateByServerIndex[$i] = $state;
		}
		return $stateByServerIndex;
	}

	/**
	 * Get a server's state from the WAN cache, regenerating it on a miss.
	 *
	 * @param int $i Server index
	 * @param array|null $srvPrevState Stale state from the local cache, if any; used as the
	 *  previous state for regeneration when the WAN cache has none
	 * @return array Server state
	 */
	protected function getStateFromWanCache( $i, ?array $srvPrevState ) {
		$hit = true;
		$key = $this->makeStateKey( $this->wanCache, $i );
		$state = $this->wanCache->getWithSetCallback(
			$key,
			self::STATE_PRESERVE_TTL,
			function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
				$prevState = $wanPrevState ?: $srvPrevState ?: null;
				$hit = false;
				return $this->computeServerState( $i, $prevState );
			},
			[ 'lockTSE' => 30 ]
		);
		if ( $hit ) {
			$this->logger->debug(
				__METHOD__ . ": WAN cache hit for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		} else {
			$this->logger->info(
				__METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
				[ 'db_server' => $this->lb->getServerName( $i ) ]
			);
		}
		return $state;
	}

	/**
	 * Build the cache key of a server's state.
	 *
	 * The key includes the cluster name, the writer (primary) server name and the
	 * target server name.
	 *
	 * @param IStoreKeyEncoder $cache
	 * @param int $i Server index
	 * @return string
	 */
	protected function makeStateKey( IStoreKeyEncoder $cache, int $i ) {
		return $cache->makeGlobalKey(
			'rdbms-gauge',
			self::VERSION,
			$this->lb->getClusterName(),
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			$this->lb->getServerName( $i )
		);
	}

	/**
	 * Gauge the current health of a server and build a new state array for it.
	 *
	 * Servers with a configured load of zero or less are not contacted.
	 *
	 * @param int $i Server index
	 * @param array|null $previousState Prior state of this server, if any; its connection
	 *  count is averaged with the new measurement
	 * @return array New server state
	 * @throws RuntimeException On circular recursion while regenerating server states
	 */
	protected function computeServerState( int $i, ?array $previousState ) {
		// Double check for circular recursion in computeServerStates()/getWeightScale().
		// Mainly, connection attempts should use LoadBalancer::getServerConnection()
		// rather than something that will pick a server based on the server states.
		// The guard must be held in a variable: an unassigned ScopedCallback is destroyed,
		// and hence released, as soon as the statement that created it finishes.
		$loopGuard = $this->acquireServerStatesLoopGuard();
		try {
			$newState = $this->newInitialServerState();

			if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				// Callers only use this server when they have *no choice* anyway (e.g. primary)
				$newState[self::STATE_AS_OF] = $this->getCurrentTime();
				// Avoid connecting, especially since it might reside in a remote datacenter
				return $newState;
			}

			[ $isUp, $connCount ] = $this->probeServer( $i );

			$newState[self::STATE_AS_OF] = $this->getCurrentTime();
			$newState[self::STATE_UP] = $isUp ? 1.0 : 0.0;
			$newState[self::STATE_CONN_COUNT] = $previousState
				? ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2
				: $connCount;

			$serverName = $this->lb->getServerName( $i );
			if ( $connCount === false ) {
				$this->logger->error(
					__METHOD__ . ": host {db_server} is not up?",
					[ 'db_server' => $serverName ]
				);
			} else {
				$cluster = $this->lb->getClusterName();
				$statServerName = str_replace( '.', '_', $serverName );
				$this->statsd->timing( "loadbalancer.connCount.$cluster.$statServerName", $connCount );
				if ( $connCount > $this->maxConnCount ) {
					$this->logger->warning(
						"Server {db_server} has {conn_count} open connections (> {max_conn})",
						[
							'db_server' => $serverName,
							'conn_count' => $connCount,
							'max_conn' => $this->maxConnCount
						]
					);
				}
			}

			return $newState;
		} finally {
			ScopedCallback::consume( $loopGuard );
		}
	}

	/**
	 * Open a new, untracked connection to a server and count its open connections.
	 *
	 * The probe connection is always closed before returning, so that only one such
	 * connection is open at a time.
	 *
	 * @param int $i Server index
	 * @return array{0:bool,1:int|false} Whether a connection was obtained, and the open
	 *  connection count (false if there was no connection or the count query failed)
	 */
	private function probeServer( int $i ): array {
		// Untracked, low-timeout handle to gauge server health; yield null instead of
		// throwing on connection failure. Using getServerConnection() avoids triggering
		// connections to other servers.
		$flags = ILoadBalancer::CONN_SILENCE_ERRORS | ILoadBalancer::CONN_UNTRACKED_GAUGE;
		$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
		if ( !$conn ) {
			return [ false, false ];
		}
		try {
			return [ true, $this->getConnCountForDb( $conn ) ];
		} catch ( DBError $e ) {
			return [ true, false ];
		} finally {
			// Only keep one connection open at a time, even if the count query threw
			$conn->close( __METHOD__ );
		}
	}

	/**
	 * Get the default state of a server, used before any measurement is taken.
	 *
	 * @return array
	 */
	protected function newInitialServerState() {
		return [
			// Connectivity (1.0 = up, 0.0 = down); treat as good
			self::STATE_UP => 1.0,
			// Number of connections to that replica; treat as none
			self::STATE_CONN_COUNT => 0,
			// UNIX timestamp of state generation completion; treat as "outdated"
			self::STATE_AS_OF => 0.0,
		];
	}

	/**
	 * Check whether a cached server state is recent enough to use without refreshing.
	 *
	 * @param array|null|false $state
	 * @return bool True if the state exists and is at most STATE_TARGET_TTL seconds old
	 */
	private function isStateFresh( $state ) {
		if ( !$state ) {
			return false;
		}
		return ( $this->getCurrentTime() - $state[self::STATE_AS_OF] ) <= self::STATE_TARGET_TTL;
	}

	/**
	 * Acquire the in-process guard against recursive server state regeneration.
	 *
	 * @return ScopedCallback Releases the guard when consumed or destroyed
	 * @throws RuntimeException If the guard is already held
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			throw new RuntimeException(
				"Circular recursion detected while regenerating server states cache. " .
				"This may indicate improper connection handling in " . get_class( $this )
			);
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp (the mock time, if one is set and non-zero)
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * Use a mock UNIX timestamp for testing; the variable is held by reference.
	 *
	 * @param float|null &$time
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}

	/**
	 * Count the open connections (process list entries) on a MySQL server.
	 *
	 * @param IDatabase $conn
	 * @return int Connection count; 0 for non-MySQL servers or if no row was returned
	 */
	private function getConnCountForDb( IDatabase $conn ): int {
		if ( $conn->getType() !== 'mysql' ) {
			return 0;
		}
		$query = new Query(
			'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST',
			ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
			'SELECT',
			null,
			'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST'
		);
		$res = $conn->query( $query, __METHOD__ );
		$row = $res ? $res->fetchObject() : false;
		return $row ? (int)$row->pcount : 0;
	}
}
if(!defined('MW_SETUP_CALLBACK'))
Definition WebStart.php:81
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
Multi-datacenter aware caching interface.
Database error base class.
Definition DBError.php:36
Basic DB load monitor with no external dependencies.
makeStateKey(IStoreKeyEncoder $cache, int $i)
getStateFromWanCache( $i, ?array $srvPrevState)
setStatsdDataFactory(StatsdDataFactoryInterface $statsFactory)
Sets a StatsdDataFactory instance on the object.
StatsdDataFactoryInterface $statsd
getServerStates(array $serverIndexes)
scaleLoads(array &$weightByServer)
Perform load ratio adjustment before deciding which server to use.
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options)
Construct a new LoadMonitor with a given LoadBalancer parent.
setLogger(LoggerInterface $logger)
computeServerState(int $i, ?array $previousState)
Key-encoding methods for object caching (BagOStuff and WANObjectCache)
makeGlobalKey( $keygroup,... $components)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
This class is a delegate to ILBFactory for a given database cluster.
const CONN_UNTRACKED_GAUGE
Yield an untracked, low-timeout, autocommit-mode handle (to gauge server health)
const DOMAIN_ANY
Domain specifier when no specific database needs to be selected.
const CONN_SILENCE_ERRORS
Yield null on connection failure instead of throwing an exception.
Database load monitoring interface.
Interface for query language.