MediaWiki REL1_30
LoadMonitor.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
use BagOStuff;
use WANObjectCache;
29
/**
 * Basic database load monitor.
 *
 * Tracks, per server index, the replication lag and a weight scaling factor.
 * Results are cached for a few seconds in a server-local cache and in a shared
 * WAN cache so that most callers do not need to probe every server themselves.
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer Load balancer that owns this monitor */
	protected $parent;
	/** @var BagOStuff Server-local cache */
	protected $srvCache;
	/** @var WANObjectCache Shared cache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Moving average ratio (e.g. 0.1 for 10% weight to the newest sample) */
	private $movingAveRatio;

	const VERSION = 1; // cache key version

	/**
	 * @param ILoadBalancer $lb Parent load balancer
	 * @param BagOStuff $srvCache Server-local cache
	 * @param WANObjectCache $wCache Shared cache
	 * @param array $options Supported keys:
	 *   - movingAveRatio: weight given to the newest sample when averaging
	 *     weight scales (default: 0.1)
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->parent = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = isset( $options['movingAveRatio'] )
			? $options['movingAveRatio']
			: 0.1;
	}

	/**
	 * Replace the logger used for replication/lag related messages.
	 *
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Multiply each server's weight by its cached weight scale.
	 *
	 * Servers without a cached scale keep their weight and an error is logged.
	 *
	 * @param array &$weightByServer Map of (server index => weight), modified in place
	 * @param string|bool $domain Passed through to getServerStates()
	 */
	public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$coefficientsByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $coefficientsByServer[$i] ) ) {
				$weightByServer[$i] = $weight * $coefficientsByServer[$i];
			} else { // server recently added to config?
				$host = $this->parent->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get the replication lag of each of the given servers.
	 *
	 * @param int[] $serverIndexes
	 * @param string|bool $domain Passed through to getServerStates()
	 * @return array Map of (server index => lag); a value of false means the lag
	 *   could not be determined (host unreachable or not replicating)
	 */
	public function getLagTimes( array $serverIndexes, $domain ) {
		$states = $this->getServerStates( $serverIndexes, $domain );

		return $states['lagTimes'];
	}

	/**
	 * Get the lag times and weight scales of the given servers, using the caches
	 * where possible and probing the servers otherwise.
	 *
	 * @param int[] $serverIndexes
	 * @param string|bool $domain Not used by this implementation
	 * @return array Map with keys 'lagTimes' and 'weightScales' (each a map keyed by
	 *   server index) plus, for cached or regenerated values, 'timestamp'
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		$writerIndex = $this->parent->getWriterIndex();
		if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
			# Single server only, just return zero without caching
			return [
				'lagTimes' => [ $writerIndex => 0 ],
				'weightScales' => [ $writerIndex => 1.0 ]
			];
		}

		$key = $this->getCacheKey( $serverIndexes );
		# Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
		$ttl = mt_rand( 4e6, 5e6 ) / 1e6;
		# Keep keys around longer as fallbacks
		$staleTTL = 60;

		# (a) Check the local APC cache
		$value = $this->srvCache->get( $key );
		if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
			$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
			return $value; // cache hit
		}
		$staleValue = $value ?: false;

		# (b) Check the shared cache and backfill APC
		$value = $this->wanCache->get( $key );
		if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
			$this->srvCache->set( $key, $value, $staleTTL );
			$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );

			return $value; // cache hit
		}
		$staleValue = $value ?: $staleValue;

		# (c) Cache key missing or expired; regenerate and backfill
		if ( $this->srvCache->lock( $key, 0, 10 ) ) {
			# Let only this process update the cache value on this server
			$sCache = $this->srvCache;
			# Held only for its destructor: the lock is released when $unlocker
			# goes out of scope at the end of this method.
			$unlocker = new ScopedCallback( function () use ( $sCache, $key ) {
				$sCache->unlock( $key );
			} );
		} elseif ( $staleValue ) {
			# Could not acquire lock but an old cache exists, so use it
			return $staleValue;
		}
		# Otherwise (no lock and no stale value) fall through and regenerate anyway

		$lagTimes = [];
		$weightScales = [];
		$movAveRatio = $this->movingAveRatio;
		foreach ( $serverIndexes as $i ) {
			if ( $i == $writerIndex ) {
				$lagTimes[$i] = 0; // master always has no lag
				$weightScales[$i] = 1.0; // nominal weight
				continue;
			}

			$conn = $this->parent->getAnyOpenConnection( $i );
			if ( $conn ) {
				$close = false; // already open
			} else {
				$conn = $this->parent->openConnection( $i, '' );
				$close = true; // new connection
			}

			# Blend the fresh coefficient with the previous weight scale
			# (defaulting to nominal) as a moving average.
			$lastWeight = isset( $staleValue['weightScales'][$i] )
				? $staleValue['weightScales'][$i]
				: 1.0;
			$coefficient = $this->getWeightScale( $i, $conn ?: null );
			$newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;

			// Scale from 10% to 100% of nominal weight
			$weightScales[$i] = max( $newWeight, 0.10 );

			if ( !$conn ) {
				$lagTimes[$i] = false;
				$host = $this->parent->getServerName( $i );
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is unreachable",
					[ 'db_server' => $host ]
				);
				continue;
			}

			if ( $conn->getLBInfo( 'is static' ) ) {
				$lagTimes[$i] = 0;
			} else {
				$lagTimes[$i] = $conn->getLag();
				if ( $lagTimes[$i] === false ) {
					$host = $this->parent->getServerName( $i );
					$this->replLogger->error(
						__METHOD__ . ": host {db_server} is not replicating?",
						[ 'db_server' => $host ]
					);
				}
			}

			if ( $close ) {
				# Close the connection to avoid sleeper connections piling up.
				# Note that the caller will pick one of these DBs and reconnect,
				# which is slightly inefficient, but this only matters for the lag
				# time cache miss case, which is far less common than cache hits.
				$this->parent->closeConnection( $conn );
			}
		}

		# Add a timestamp key so we know when it was cached
		$value = [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => microtime( true )
		];
		$this->wanCache->set( $key, $value, $staleTTL );
		$this->srvCache->set( $key, $value, $staleTTL );
		$this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );

		return $value;
	}

	/**
	 * Get the raw weight coefficient for a server.
	 *
	 * This implementation only distinguishes reachable from unreachable servers.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle, or null if none could be obtained
	 * @return float 1.0 when a connection is available, 0.0 otherwise
	 */
	protected function getWeightScale( $index, IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Build the cache key for the state of a set of servers.
	 *
	 * @param int[] $serverIndexes Sorted locally so that ordering does not affect the key
	 * @return string
	 */
	private function getCacheKey( array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the master DB name
		return $this->srvCache->makeGlobalKey(
			'lag-times',
			self::VERSION,
			$this->parent->getServerName( $this->parent->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}
}
interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
Multi-datacenter aware caching interface.
Basic DB load monitor with no external dependencies. Uses memcached to cache the replication lag for a short time.
scaleLoads(array &$weightByServer, $domain)
Perform load ratio adjustment before deciding which server to use.
LoggerInterface $replLogger
float $movingAveRatio
Moving average ratio (e.g. 0.1 for 10% weight to the newest sample).
__construct(ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options=[])
Construct a new LoadMonitor with a given LoadBalancer parent.
getCacheKey(array $serverIndexes)
getLagTimes(array $serverIndexes, $domain)
Get an estimate of replication lag (in seconds) for each server.
getServerStates(array $serverIndexes, $domain)
setLogger(LoggerInterface $logger)
getWeightScale( $index, IDatabase $conn=null)
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition hooks.txt:1971
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:40
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.