MediaWiki REL1_31
LoadMonitor.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
use BagOStuff;
use WANObjectCache;
29
36class LoadMonitor implements ILoadMonitor {
/** @var ILoadBalancer Load balancer whose servers are monitored */
protected $parent;
/** @var BagOStuff Server-local cache; consulted first by getServerStates() */
protected $srvCache;
/** @var WANObjectCache Shared cache; consulted second by getServerStates() */
protected $wanCache;
/** @var LoggerInterface Replication logger; a NullLogger until setLogger() is called */
protected $replLogger;

/** @var float Moving average ratio (weight given to the newest weight sample) */
protected $movingAveRatio;
/** @var int Amount of replication lag in seconds before warnings are logged */
protected $lagWarnThreshold;

/** @var int Version number embedded in the cache key built by getCacheKey() */
const VERSION = 1;
/** @var int Lag warning threshold in seconds used when 'lagWarnThreshold' is not given */
const LAG_WARN_THRESHOLD = 10;
55
/**
 * @param ILoadBalancer $lb Load balancer whose servers are monitored
 * @param BagOStuff $srvCache Server-local cache
 * @param WANObjectCache $wCache Shared cache
 * @param array $options Optional settings:
 *   - movingAveRatio: moving average ratio for weight scale updates (default: 0.1)
 *   - lagWarnThreshold: lag in seconds above which a message is logged
 *     (default: self::LAG_WARN_THRESHOLD)
 */
public function __construct(
	ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
) {
	$this->parent = $lb;
	$this->srvCache = $srvCache;
	$this->wanCache = $wCache;
	// Logging is a no-op until a real logger is injected via setLogger()
	$this->replLogger = new NullLogger();

	$this->movingAveRatio = isset( $options['movingAveRatio'] )
		? $options['movingAveRatio']
		: 0.1;
	$this->lagWarnThreshold = isset( $options['lagWarnThreshold'] )
		? $options['lagWarnThreshold']
		: self::LAG_WARN_THRESHOLD;
}
79
/**
 * Replace the logger that receives replication and cache diagnostics.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->replLogger = $logger;
}
83
/**
 * Multiply each server's weight, in place, by its current weight scale.
 *
 * Servers that have no entry in the fetched weight scales keep their weight
 * unchanged and an error is logged for them.
 *
 * @param array &$weightByServer Map of (server index => weight), modified in place
 * @param mixed $domain Passed through to getServerStates()
 */
final public function scaleLoads( array &$weightByServer, $domain ) {
	$states = $this->getServerStates( array_keys( $weightByServer ), $domain );
	$scaleByServer = $states['weightScales'];

	foreach ( $weightByServer as $index => $nominalWeight ) {
		if ( !isset( $scaleByServer[$index] ) ) {
			// No scale known for this server; was it recently added to config?
			$host = $this->parent->getServerName( $index );
			$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			continue;
		}

		$weightByServer[$index] = $nominalWeight * $scaleByServer[$index];
	}
}
97
/**
 * Get an estimate of replication lag (in seconds) for each server.
 *
 * @param array $serverIndexes Server indexes to report on
 * @param mixed $domain Passed through to getServerStates()
 * @return array The 'lagTimes' map produced by getServerStates()
 */
final public function getLagTimes( array $serverIndexes, $domain ) {
	$states = $this->getServerStates( $serverIndexes, $domain );

	return $states['lagTimes'];
}
101
/**
 * Get the lag times and weight scales of the given servers, preferring cached values.
 *
 * Lookup order: the server-local cache, then the shared cache, then a fresh
 * computation that queries each server. Freshly computed values are written to
 * both caches. If this process cannot take the regeneration lock and an older
 * cached value exists, that older value is returned instead.
 *
 * @param array $serverIndexes Server indexes to report on
 * @param mixed $domain Not used by this implementation
 * @return array Map with keys:
 *   - lagTimes: map of (server index => lag, or false when it could not be determined)
 *   - weightScales: map of (server index => weight scale)
 *   - timestamp: UNIX time (float) of computation; absent in the single-master shortcut
 */
protected function getServerStates( array $serverIndexes, $domain ) {
	$writerIndex = $this->parent->getWriterIndex();
	if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
		# Single server only, just return zero without caching
		return [
			'lagTimes' => [ $writerIndex => 0 ],
			'weightScales' => [ $writerIndex => 1.0 ]
		];
	}

	$key = $this->getCacheKey( $serverIndexes );
	# Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
	$ttl = mt_rand( 4000000, 5000000 ) / 1e6;
	# Keep keys around longer as fallbacks
	$staleTTL = 60;

	# (a) Check the local APC cache
	$value = $this->srvCache->get( $key );
	if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
		$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
		return $value; // cache hit
	}
	$staleValue = $value ?: false;

	# (b) Check the shared cache and backfill APC
	$value = $this->wanCache->get( $key );
	if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
		$this->srvCache->set( $key, $value, $staleTTL );
		$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );

		return $value; // cache hit
	}
	# An expired shared-cache value takes precedence over an expired local one
	$staleValue = $value ?: $staleValue;

	# (c) Cache key missing or expired; regenerate and backfill
	# NOTE(review): the lock arguments look like (key, timeout 0, expiry 10 sec);
	# confirm against BagOStuff::lock()
	if ( $this->srvCache->lock( $key, 0, 10 ) ) {
		# Let only this process update the cache value on this server
		$sCache = $this->srvCache;
		# $unlocker is never read; it is held only so the lock is released
		# when the variable goes out of scope
		$unlocker = new ScopedCallback( function () use ( $sCache, $key ) {
			$sCache->unlock( $key );
		} );
	} elseif ( $staleValue ) {
		# Could not acquire lock but an old cache exists, so use it
		return $staleValue;
	}
	# No lock and no stale value: fall through and regenerate anyway

	$lagTimes = [];
	$weightScales = [];
	$movAveRatio = $this->movingAveRatio;
	foreach ( $serverIndexes as $i ) {
		if ( $i == $writerIndex ) {
			$lagTimes[$i] = 0; // master always has no lag
			$weightScales[$i] = 1.0; // nominal weight
			continue;
		}

		# Handles with open transactions are avoided since they might be subject
		# to REPEATABLE-READ snapshots, which could affect the lag estimate query.
		$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT;
		$conn = $this->parent->getAnyOpenConnection( $i, $flags );
		if ( $conn ) {
			$close = false; // already open
		} else {
			$conn = $this->parent->openConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
			$close = true; // new connection
		}

		# Blend the new coefficient into the previous scale (moving average);
		# servers without a previous scale start from the nominal 1.0
		$lastWeight = isset( $staleValue['weightScales'][$i] )
			? $staleValue['weightScales'][$i]
			: 1.0;
		$coefficient = $this->getWeightScale( $i, $conn ?: null );
		$newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;

		// Scale from 10% to 100% of nominal weight
		$weightScales[$i] = max( $newWeight, 0.10 );

		$host = $this->parent->getServerName( $i );

		if ( !$conn ) {
			$lagTimes[$i] = false;
			$this->replLogger->error(
				__METHOD__ . ": host {db_server} is unreachable",
				[ 'db_server' => $host ]
			);
			continue;
		}

		if ( $conn->getLBInfo( 'is static' ) ) {
			$lagTimes[$i] = 0;
		} else {
			$lagTimes[$i] = $conn->getLag();
			if ( $lagTimes[$i] === false ) {
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is not replicating?",
					[ 'db_server' => $host ]
				);
			} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
				# NOTE(review): the condition is a strict ">" while the message
				# below says ">="; left unchanged, confirm which one is intended
				$this->replLogger->error(
					"Server {host} has {lag} seconds of lag (>= {maxlag})",
					[
						'host' => $host,
						'lag' => $lagTimes[$i],
						'maxlag' => $this->lagWarnThreshold
					]
				);
			}
		}

		if ( $close ) {
			# Close the connection to avoid sleeper connections piling up.
			# Note that the caller will pick one of these DBs and reconnect,
			# which is slightly inefficient, but this only matters for the lag
			# time cache miss case, which is far less common than cache hits.
			$this->parent->closeConnection( $conn );
		}
	}

	# Add a timestamp key so we know when it was cached
	$value = [
		'lagTimes' => $lagTimes,
		'weightScales' => $weightScales,
		'timestamp' => microtime( true )
	];
	$this->wanCache->set( $key, $value, $staleTTL );
	$this->srvCache->set( $key, $value, $staleTTL );
	$this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );

	return $value;
}
232
/**
 * Get the coefficient used by getServerStates() to update a server's weight scale.
 *
 * This implementation only distinguishes reachable from unreachable servers.
 *
 * @param int $index Server index (not used by this implementation)
 * @param IDatabase|null $conn Connection handle, or null if none could be obtained
 * @return float 1.0 when a connection is available, 0.0 otherwise
 */
protected function getWeightScale( $index, IDatabase $conn = null ) {
	if ( $conn ) {
		return 1.0;
	}

	return 0.0;
}
241
/**
 * Build the cache key under which the states of a set of servers are stored.
 *
 * The indexes are sorted first, so the same set of servers yields the same key
 * regardless of the order in which the caller listed them.
 *
 * @param array $serverIndexes Server indexes
 * @return string Key produced by the server-local cache's makeGlobalKey()
 */
private function getCacheKey( array $serverIndexes ) {
	sort( $serverIndexes );

	// Lag is per-server, not per-DB, so key on the master DB name
	$masterName = $this->parent->getServerName( $this->parent->getWriterIndex() );
	$indexList = implode( '-', $serverIndexes );

	return $this->srvCache->makeGlobalKey( 'lag-times', self::VERSION, $masterName, $indexList );
}
252}
This interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
Multi-datacenter aware caching interface.
Basic DB load monitor with no external dependencies. Uses memcached to cache the replication lag for a...
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
LoggerInterface $replLogger
int $lagWarnThreshold
Amount of replication lag in seconds before warnings are logged.
float $movingAveRatio
Moving average ratio (e.g. 0.1, the default applied by the constructor).
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
getCacheKey(array $serverIndexes)
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
getServerStates(array $serverIndexes, $domain)
setLogger(LoggerInterface $logger)
getWeightScale( $index, IDatabase $conn=null)
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition hooks.txt:2001
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.