Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
84.91% covered (warning)
84.91%
135 / 159
27.27% covered (danger)
27.27%
3 / 11
CRAP
0.00% covered (danger)
0.00%
0 / 1
LoadMonitor
84.91% covered (warning)
84.91%
135 / 159
27.27% covered (danger)
27.27%
3 / 11
51.96
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
2
 setLogger
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 scaleLoads
96.88% covered (success)
96.88%
31 / 32
0.00% covered (danger)
0.00%
0 / 1
10
 getServerStates
76.47% covered (warning)
76.47%
13 / 17
0.00% covered (danger)
0.00%
0 / 1
6.47
 getStateFromWanCache
81.82% covered (warning)
81.82%
18 / 22
0.00% covered (danger)
0.00%
0 / 1
4.10
 makeStateKey
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
 computeServerState
80.95% covered (warning)
80.95%
34 / 42
0.00% covered (danger)
0.00%
0 / 1
9.56
 newInitialServerState
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 isStateFresh
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 acquireServerStatesLoopGuard
55.56% covered (warning)
55.56%
5 / 9
0.00% covered (danger)
0.00%
0 / 1
2.35
 getCurrentTime
n/a
0 / 0
n/a
0 / 0
2
 setMockTime
n/a
0 / 0
n/a
0 / 0
1
 getConnCountForDb
91.67% covered (success)
91.67%
11 / 12
0.00% covered (danger)
0.00%
0 / 1
4.01
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6namespace Wikimedia\Rdbms;
7
8use Psr\Log\LoggerInterface;
9use RuntimeException;
10use Wikimedia\ObjectCache\BagOStuff;
11use Wikimedia\ObjectCache\IStoreKeyEncoder;
12use Wikimedia\ObjectCache\WANObjectCache;
13use Wikimedia\Rdbms\Platform\ISQLPlatform;
14use Wikimedia\ScopedCallback;
15use Wikimedia\Stats\StatsFactory;
16
/**
 * Basic DB load monitor with no external dependencies
 *
 * This uses both local server and local datacenter caches for DB server state information.
 *
 * Per-server state is an array keyed by the STATE_* constants (STATE_UP, STATE_CONN_COUNT,
 * STATE_AS_OF), which are not declared in this class (presumably inherited from ILoadMonitor).
 *
 * @ingroup Database
 */
class LoadMonitor implements ILoadMonitor {
    /** @var ILoadBalancer */
    protected $lb;
    /** @var BagOStuff */
    protected $srvCache;
    /** @var WANObjectCache */
    protected $wanCache;
    /** @var LoggerInterface */
    protected $logger;
    /** @var StatsFactory */
    protected $statsFactory;
    /** @var int|float Connection count at which a server is considered saturated (INF if unset) */
    private $maxConnCount;
    /** @var int Constant added to the total connection count when computing connection ratios */
    private int $totalConnectionsAdjustment;

    /** @var float|null */
    private $wallClockOverride;

    /** @var bool Whether the "server states" cache key is in the process of being updated */
    private $serverStatesKeyLocked = false;

    /** Cache key version */
    private const VERSION = 3;

    /** Target time-till-refresh for DB server states */
    private const STATE_TARGET_TTL = 10;
    /** Seconds to persist DB server states on cache (fresh or stale) */
    private const STATE_PRESERVE_TTL = 60;

    /**
     * @inheritDoc
     *
     * Options read here:
     *   - maxConnCount: cast to int; defaults to INF (no limit) when not set
     *   - totalConnectionsAdjustment: cast to int; defaults to 10
     */
    public function __construct(
        ILoadBalancer $lb,
        BagOStuff $srvCache,
        WANObjectCache $wCache,
        LoggerInterface $logger,
        StatsFactory $statsFactory,
        $options
    ) {
        $this->lb = $lb;
        $this->srvCache = $srvCache;
        $this->wanCache = $wCache;
        $this->logger = $logger;
        $this->statsFactory = $statsFactory;
        $this->maxConnCount = isset( $options['maxConnCount'] )
            ? (int)( $options['maxConnCount'] )
            : INF;
        $this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
    }

    public function setLogger( LoggerInterface $logger ): void {
        $this->logger = $logger;
    }

    /**
     * Rescale the given server weights, in place, based on the cached server states
     *
     * Servers that are down, or whose weight is not positive, get a weight of 0. Every
     * other server has its share of the total weight shifted by half of the difference
     * between its share of open connections and its share of the weight.
     *
     * Groups with at most one server are left untouched.
     *
     * @param array &$weightByServer Map of (server index => weight), modified in place
     * @throws DBConnectionError If no server is up, has a positive weight, and is below
     *  the maximum connection count (circuit breaking)
     */
    public function scaleLoads( array &$weightByServer ) {
        if ( count( $weightByServer ) <= 1 ) {
            // Single-server group; relative adjustments are pointless
            return;
        }

        $stateByServerIndex = $this->getServerStates( array_keys( $weightByServer ) );
        $totalConnections = 0;
        $totalWeights = 0;
        $circuitBreakingEnabled = true;
        foreach ( $weightByServer as $i => $weight ) {
            $serverState = $stateByServerIndex[$i];
            // Connections on servers that are down (STATE_UP of 0.0) do not count
            $totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
            $totalWeights += $weight;

            // Set up circuit breaking. If at least one replica can take more connections
            // allow the flow.
            if (
                $serverState[self::STATE_UP] &&
                $weight > 0 &&
                $serverState[self::STATE_CONN_COUNT] < $this->maxConnCount
            ) {
                $circuitBreakingEnabled = false;
            }
        }

        if ( $circuitBreakingEnabled ) {
            throw new DBConnectionError(
                null, 'Database servers in ' . $this->lb->getClusterName() . ' are overloaded. ' .
                'In order to protect application servers, the circuit breaking to databases of this section ' .
                'have been activated. Please try again a few seconds.'
            );
        }

        // Loop-invariant. This is zero when the adjustment is configured as 0 and no
        // connections were counted (e.g. non-MySQL servers always report 0 connections).
        $connDenominator = $totalConnections + $this->totalConnectionsAdjustment;
        $hasConnDenominator = ( (float)$connDenominator !== 0.0 );

        foreach ( $weightByServer as $i => $weight ) {
            if (
                // host is down or
                !$stateByServerIndex[$i][self::STATE_UP] ||
                // host is primary or explicitly set to zero
                $weight <= 0
            ) {
                $weightByServer[$i] = 0;
                continue;
            }
            // Avoid DivisionByZeroError; with nothing to compare against, assume no skew
            $connRatio = $hasConnDenominator
                ? $stateByServerIndex[$i][self::STATE_CONN_COUNT] / $connDenominator
                : 0.0;
            $weightRatio = $weight / $totalWeights;
            $diffRatio = $connRatio - $weightRatio;
            $adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
            $weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
        }
    }

    /**
     * Get the state of each of the given servers, preferring the local server cache
     *
     * A fresh local cache entry is used as-is. Otherwise, the state is reloaded through
     * the WAN cache if either the local cache lock for the key was acquired or there is
     * no local entry at all; a stale local entry is reused when the lock is unavailable.
     *
     * @param int[] $serverIndexes
     * @return array[] Map of (server index => state array)
     */
    protected function getServerStates( array $serverIndexes ): array {
        $stateByServerIndex = array_fill_keys( $serverIndexes, null );
        // Perform any cache regenerations in randomized order so that the
        // DB servers will each have similarly up-to-date state cache entries.
        $shuffledServerIndexes = $serverIndexes;
        shuffle( $shuffledServerIndexes );

        foreach ( $shuffledServerIndexes as $i ) {
            $key = $this->makeStateKey( $this->srvCache, $i );
            $state = $this->srvCache->get( $key ) ?: null;
            if ( $this->isStateFresh( $state ) ) {
                $this->logger->debug(
                    __METHOD__ . ": fresh local cache hit for '{db_server}'",
                    [ 'db_server' => $this->lb->getServerName( $i ) ]
                );
            } else {
                // Non-blocking lock attempt (timeout of 0)
                $lock = $this->srvCache->getScopedLock( $key, 0, 10 );
                if ( $lock || !$state ) {
                    $state = $this->getStateFromWanCache( $i, $state );
                    $this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
                }
            }
            $stateByServerIndex[$i] = $state;
        }

        return $stateByServerIndex;
    }

    /**
     * Get the state of a server from the WAN cache, regenerating it on cache miss
     *
     * @param int $i Server index
     * @param array|null $srvPrevState Previous state from the local server cache, used as
     *  the previous state for regeneration when the WAN cache has none
     * @return array
     */
    protected function getStateFromWanCache( int $i, ?array $srvPrevState ): array {
        // Cleared by the callback, which only runs when the value is regenerated
        $hit = true;
        $key = $this->makeStateKey( $this->wanCache, $i );
        $state = $this->wanCache->getWithSetCallback(
            $key,
            self::STATE_PRESERVE_TTL,
            function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
                $prevState = $wanPrevState ?: $srvPrevState ?: null;
                $hit = false;
                return $this->computeServerState( $i, $prevState );
            },
            [ 'lockTSE' => 30 ]
        );
        if ( $hit ) {
            $this->logger->debug(
                __METHOD__ . ": WAN cache hit for '{db_server}'",
                [ 'db_server' => $this->lb->getServerName( $i ) ]
            );
        } else {
            $this->logger->info(
                __METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
                [ 'db_server' => $this->lb->getServerName( $i ) ]
            );
        }

        return $state;
    }

    /**
     * Build the cache key for the state of a server
     *
     * The key includes the key version, the cluster name, the name of the writer
     * server, and the name of the server itself.
     *
     * @param IStoreKeyEncoder $cache Cache used to encode the key
     * @param int $i Server index
     * @return string
     */
    protected function makeStateKey( IStoreKeyEncoder $cache, int $i ): string {
        return $cache->makeGlobalKey(
            'rdbms-gauge',
            self::VERSION,
            $this->lb->getClusterName(),
            $this->lb->getServerName( ServerInfo::WRITER_INDEX ),
            $this->lb->getServerName( $i )
        );
    }

    /**
     * Probe a server and build its new state
     *
     * Servers with a configured load of 0 or less are not probed; they get the initial
     * state with the current timestamp. Otherwise, a gauge connection is opened in order
     * to count the open connections on the server, and then closed again.
     *
     * @param int $i
     * @param array|null $previousState If given, the new connection count is the average
     *  of the previous count and the count that was just measured
     * @return array<string,mixed>
     * @phan-return array{up:float,conn_count:float|int|false,time:float}
     * @throws DBAccessError
     */
    protected function computeServerState( int $i, ?array $previousState ) {
        // Guard against circular recursion into this method. Mainly, connection attempts
        // should use LoadBalancer::getServerConnection() rather than something that will
        // pick a server based on the server states. The guard is held for as long as
        // $scope stays referenced, which is until this method returns.
        $scope = $this->acquireServerStatesLoopGuard();

        $cluster = $this->lb->getClusterName();
        $serverName = $this->lb->getServerName( $i );

        $newState = $this->newInitialServerState();

        if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
            // Callers only use this server when they have *no choice* anyway (e.g. primary)
            $newState[self::STATE_AS_OF] = $this->getCurrentTime();
            // Avoid connecting, especially since it might reside in a remote datacenter
            return $newState;
        }

        // Get a new, untracked, connection in order to gauge server health
        $flags = ILoadBalancer::CONN_UNTRACKED_GAUGE | ILoadBalancer::CONN_SILENCE_ERRORS;
        // Get a connection to this server without triggering other server connections
        $conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
        // Determine the number of open connections in this server (false if unknown)
        $connCount = false;
        if ( $conn ) {
            try {
                $connCount = $this->getConnCountForDb( $conn );
            } catch ( DBError ) {
                // Leave the count as unknown
            } finally {
                // Only keep one connection open at a time
                $conn->close( __METHOD__ );
            }
        }

        $newState[self::STATE_AS_OF] = $this->getCurrentTime();
        $newState[self::STATE_CONN_COUNT] = $connCount;
        $newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
        if ( $previousState ) {
            // Smooth out the count by averaging it with the previous one
            $newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
        }

        if ( $connCount === false ) {
            $this->logger->error(
                __METHOD__ . ": host {db_server} is not up?",
                [ 'db_server' => $serverName ]
            );
        } else {
            $this->statsFactory->getGauge( 'rdbms_open_connection_total' )
                ->setLabel( 'db_cluster', $cluster )
                ->setLabel( 'db_server', $serverName )
                ->set( (int)$connCount );

            // Same threshold as the circuit breaker in scaleLoads(), where a server can
            // only take more connections while its count is below the maximum
            if ( $connCount >= $this->maxConnCount ) {
                $this->logger->warning(
                    "Server {db_server} has {conn_count} open connections (>= {max_conn})",
                    [
                        'db_server' => $serverName,
                        'conn_count' => $connCount,
                        'max_conn' => $this->maxConnCount
                    ]
                );
            }
        }

        return $newState;
    }

    /**
     * Get the state to assume for a server that has not been probed
     *
     * @return array<string,mixed>
     * @phan-return array{up:float,conn_count:float|int|false,time:float}
     */
    protected function newInitialServerState() {
        return [
            // Moving average of connectivity; treat as good
            self::STATE_UP => 1.0,
            // Number of connections to that replica; treat as none
            self::STATE_CONN_COUNT => 0,
            // UNIX timestamp of state generation completion; treat as "outdated"
            self::STATE_AS_OF => 0.0,
        ];
    }

    /**
     * Check whether a server state is recent enough to be used without a refresh
     *
     * @param array|null $state
     * @return bool True if the state exists and is younger than STATE_TARGET_TTL seconds
     */
    private function isStateFresh( $state ): bool {
        if ( !$state ) {
            return false;
        }

        $age = $this->getCurrentTime() - $state[self::STATE_AS_OF];

        return $age < self::STATE_TARGET_TTL;
    }

    /**
     * Mark server state regeneration as in progress until the returned object is destroyed
     *
     * @return ScopedCallback Callback that clears the "in progress" flag
     * @throws RuntimeException If server state regeneration is already in progress
     */
    #[\NoDiscard]
    private function acquireServerStatesLoopGuard(): ScopedCallback {
        if ( $this->serverStatesKeyLocked ) {
            throw new RuntimeException(
                "Circular recursion detected while regenerating server states cache. " .
                "This may indicate improper connection handling in " . get_class( $this )
            );
        }

        $this->serverStatesKeyLocked = true; // lock

        return new ScopedCallback( function () {
            $this->serverStatesKeyLocked = false; // unlock
        } );
    }

    /**
     * @return float UNIX timestamp; the mock time, if one is set and non-zero
     * @codeCoverageIgnore
     */
    protected function getCurrentTime() {
        return $this->wallClockOverride ?: microtime( true );
    }

    /**
     * @param float|null &$time Mock UNIX timestamp for testing; bound by reference, so
     *  later changes to the caller's variable take effect here as well
     * @codeCoverageIgnore
     */
    public function setMockTime( &$time ) {
        $this->wallClockOverride =& $time;
    }

    /**
     * Count the open connections on the server of the given handle
     *
     * @param IDatabase $conn
     * @return int Number of rows in INFORMATION_SCHEMA.PROCESSLIST; 0 if the handle is
     *  not of type "mysql" or if no result row could be fetched
     */
    private function getConnCountForDb( IDatabase $conn ): int {
        if ( $conn->getType() !== 'mysql' ) {
            return 0;
        }

        $sql = 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST';
        $query = new Query(
            $sql,
            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
            'SELECT',
            null,
            $sql
        );
        $res = $conn->query( $query, __METHOD__ );
        $row = $res ? $res->fetchObject() : false;

        return $row ? (int)$row->pcount : 0;
    }
}