Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
77.93% covered (warning)
77.93%
113 / 145
25.00% covered (danger)
25.00%
3 / 12
CRAP
0.00% covered (danger)
0.00%
0 / 1
LoadMonitor
77.93% covered (warning)
77.93%
113 / 145
25.00% covered (danger)
25.00%
3 / 12
59.07
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
 setLogger
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setStatsdDataFactory
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 scaleLoads
95.24% covered (success)
95.24%
20 / 21
0.00% covered (danger)
0.00%
0 / 1
6
 getServerStates
76.47% covered (warning)
76.47%
13 / 17
0.00% covered (danger)
0.00%
0 / 1
6.47
 getStateFromWanCache
81.82% covered (warning)
81.82%
18 / 22
0.00% covered (danger)
0.00%
0 / 1
4.10
 makeStateKey
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
 computeServerState
62.50% covered (warning)
62.50%
25 / 40
0.00% covered (danger)
0.00%
0 / 1
13.27
 newInitialServerState
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 isStateFresh
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 acquireServerStatesLoopGuard
55.56% covered (warning)
55.56%
5 / 9
0.00% covered (danger)
0.00%
0 / 1
2.35
 getCurrentTime
n/a
0 / 0
n/a
0 / 0
2
 setMockTime
n/a
0 / 0
n/a
0 / 0
1
 getConnCountForDb
91.67% covered (success)
91.67%
11 / 12
0.00% covered (danger)
0.00%
0 / 1
4.01
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use BagOStuff;
23use IStoreKeyEncoder;
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use NullStatsdDataFactory;
26use Psr\Log\LoggerInterface;
27use Psr\Log\NullLogger;
28use RuntimeException;
29use WANObjectCache;
30use Wikimedia\Rdbms\Platform\ISQLPlatform;
31use Wikimedia\ScopedCallback;
32
/**
 * Basic DB load monitor with no external dependencies
 *
 * This uses both local server and local datacenter caches for DB server state information.
 *
 * A "server state" is an array keyed by the STATE_UP, STATE_CONN_COUNT and STATE_AS_OF
 * constants, which this class inherits through ILoadMonitor.
 *
 * @ingroup Database
 */
class LoadMonitor implements ILoadMonitor {
    /** @var ILoadBalancer */
    protected $lb;
    /** @var BagOStuff Server-local cache, consulted first */
    protected $srvCache;
    /** @var WANObjectCache Consulted when the server-local entry is missing or stale */
    protected $wanCache;
    /** @var LoggerInterface */
    protected $logger;
    /** @var StatsdDataFactoryInterface */
    protected $statsd;
    /** Connection count above which a warning is logged for a server */
    private int $maxConnCount;
    /** Constant added to the total connection count when computing connection ratios */
    private int $totalConnectionsAdjustment;

    /** @var float|null Mock UNIX timestamp (bound by reference via setMockTime()) */
    private $wallClockOverride;

    /** @var bool Whether the "server states" cache key is in the process of being updated */
    private bool $serverStatesKeyLocked = false;

    /** Cache key version */
    private const VERSION = 3;

    /** Target time-till-refresh for DB server states */
    private const STATE_TARGET_TTL = 10;
    /** Seconds to persist DB server states on cache (fresh or stale) */
    private const STATE_PRESERVE_TTL = 60;

    /**
     * @inheritDoc
     *
     * Options read by this class:
     *  - maxConnCount: connection count above which a warning is logged (default: 500)
     *  - totalConnectionsAdjustment: constant added to the total connection count
     *    when computing per-server connection ratios (default: 10)
     */
    public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options ) {
        $this->lb = $lb;
        $this->srvCache = $srvCache;
        $this->wanCache = $wCache;
        // No-op collaborators until setLogger()/setStatsdDataFactory() are called
        $this->logger = new NullLogger();
        $this->statsd = new NullStatsdDataFactory();
        $this->maxConnCount = (int)( $options['maxConnCount'] ?? 500 );
        $this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
    }

    /**
     * @param LoggerInterface $logger Logger that replaces the default NullLogger
     */
    public function setLogger( LoggerInterface $logger ) {
        $this->logger = $logger;
    }

    /**
     * @param StatsdDataFactoryInterface $statsFactory Replaces the default NullStatsdDataFactory
     */
    public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
        $this->statsd = $statsFactory;
    }

    /**
     * Adjust the given server weights, in place, based on the cached server states
     *
     * Servers that are down or have a non-positive weight get weight 0. Every other
     * server has its share of the total weight reduced by half of the amount by which
     * its share of open connections exceeds that share (or increased, if it is below).
     *
     * @param array<int,int|float> &$weightByServer Map of (server index => weight)
     */
    public function scaleLoads( array &$weightByServer ) {
        if ( count( $weightByServer ) <= 1 ) {
            // Single-server group; relative adjustments are pointless
            return;
        }

        $serverIndexes = array_keys( $weightByServer );
        $stateByServerIndex = $this->getServerStates( $serverIndexes );
        $totalConnections = 0;
        $totalWeights = 0;
        foreach ( $weightByServer as $i => $weight ) {
            $serverState = $stateByServerIndex[$i];
            // A false connection count casts to 0; servers that are down contribute nothing
            $totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
            $totalWeights += $weight;
        }
        // This can be zero (e.g. adjustment configured as 0 and no connections counted),
        // so it must be checked before being used as a divisor
        $connDenominator = $totalConnections + $this->totalConnectionsAdjustment;

        foreach ( $weightByServer as $i => $weight ) {
            if (
                // host is down or
                !$stateByServerIndex[$i][self::STATE_UP] ||
                // host is primary or explicitly set to zero
                $weight <= 0
            ) {
                $weightByServer[$i] = 0;
                continue;
            }
            $connRatio = $connDenominator > 0
                ? $stateByServerIndex[$i][self::STATE_CONN_COUNT] / $connDenominator
                : 0.0;
            $weightRatio = $weight / $totalWeights;
            $diffRatio = $connRatio - $weightRatio;
            // Move half way towards cancelling out the imbalance; never go negative
            $adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
            $weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
        }
    }

    /**
     * Get the state of each of the given servers, preferring the server-local cache
     *
     * A fresh server-local entry is used as-is. Otherwise the state is fetched through
     * getStateFromWanCache() and written back to the server-local cache, but only if
     * this caller obtained the server-local lock for the key or there is no state to
     * fall back on; in the remaining case the stale state is returned unchanged.
     *
     * @param int[] $serverIndexes
     * @return array<int,array|null> Map of (server index => server state)
     */
    protected function getServerStates( array $serverIndexes ): array {
        $stateByServerIndex = array_fill_keys( $serverIndexes, null );
        // Perform any cache regenerations in randomized order so that the
        // DB servers will each have similarly up-to-date state cache entries.
        $shuffledServerIndexes = $serverIndexes;
        shuffle( $shuffledServerIndexes );

        foreach ( $shuffledServerIndexes as $i ) {
            $key = $this->makeStateKey( $this->srvCache, $i );
            $state = $this->srvCache->get( $key ) ?: null;
            if ( $this->isStateFresh( $state ) ) {
                $this->logger->debug(
                    __METHOD__ . ": fresh local cache hit for '{db_server}'",
                    [ 'db_server' => $this->lb->getServerName( $i ) ]
                );
            } else {
                // Timeout of 0: do not wait if the lock is already held
                $lock = $this->srvCache->getScopedLock( $key, 0, 10 );
                if ( $lock || !$state ) {
                    $state = $this->getStateFromWanCache( $i, $state );
                    $this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
                }
            }
            $stateByServerIndex[$i] = $state;
        }
        return $stateByServerIndex;
    }

    /**
     * Get the state of a server from the WAN cache, regenerating it when the cache asks for it
     *
     * @param int $i Server index
     * @param array|null $srvPrevState Stale state from the server-local cache, if any; used as
     *  the previous state for regeneration when the WAN cache has no previous value
     * @return mixed Value returned by WANObjectCache::getWithSetCallback(); normally a state array
     */
    protected function getStateFromWanCache( $i, ?array $srvPrevState ) {
        // Flipped to false by the callback, i.e. when the value had to be regenerated
        $hit = true;
        $key = $this->makeStateKey( $this->wanCache, $i );
        $state = $this->wanCache->getWithSetCallback(
            $key,
            self::STATE_PRESERVE_TTL,
            function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
                $prevState = $wanPrevState ?: $srvPrevState ?: null;
                $hit = false;
                return $this->computeServerState( $i, $prevState );
            },
            [ 'lockTSE' => 30 ]
        );
        if ( $hit ) {
            $this->logger->debug(
                __METHOD__ . ": WAN cache hit for '{db_server}'",
                [ 'db_server' => $this->lb->getServerName( $i ) ]
            );
        } else {
            $this->logger->info(
                __METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
                [ 'db_server' => $this->lb->getServerName( $i ) ]
            );
        }
        return $state;
    }

    /**
     * Build the cache key under which the state of a server is stored
     *
     * The key is made of the key version, the cluster name, the name of the writer
     * server and the name of the server itself.
     *
     * @param IStoreKeyEncoder $cache Cache whose key format should be used
     * @param int $i Server index
     * @return string
     */
    protected function makeStateKey( IStoreKeyEncoder $cache, int $i ) {
        return $cache->makeGlobalKey(
            'rdbms-gauge',
            self::VERSION,
            $this->lb->getClusterName(),
            $this->lb->getServerName( $this->lb->getWriterIndex() ),
            $this->lb->getServerName( $i )
        );
    }

    /**
     * Gauge a server by connecting to it and counting its open connections
     *
     * Servers with a configured load of zero or less are not contacted; they get the
     * initial state with only the timestamp updated.
     *
     * @param int $i Server index
     * @param array|null $previousState Previously cached state; if given, the stored
     *  connection count is the mean of the previous and the newly measured count
     * @return array<string,mixed>
     * @phan-return array{up:float,conn_count:float|int|false,time:float}
     * @throws DBAccessError
     * @throws RuntimeException If called while another computation is still in progress
     */
    protected function computeServerState( int $i, ?array $previousState ) {
        // Double check for circular recursion in computeServerStates()/getWeightScale().
        // Mainly, connection attempts should use LoadBalancer::getServerConnection()
        // rather than something that will pick a server based on the server states.
        // The guard has to be kept in a variable: it unlocks as soon as it is destroyed,
        // so discarding the return value would release the lock immediately.
        $loopGuard = $this->acquireServerStatesLoopGuard();

        $cluster = $this->lb->getClusterName();
        $serverName = $this->lb->getServerName( $i );
        $statServerName = str_replace( '.', '_', $serverName );

        $newState = $this->newInitialServerState();

        if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
            // Callers only use this server when they have *no choice* anyway (e.g. primary)
            $newState[self::STATE_AS_OF] = $this->getCurrentTime();
            // Avoid connecting, especially since it might reside in a remote datacenter
            return $newState;
        }

        // Get a new, untracked, connection in order to gauge server health
        $flags = ILoadBalancer::CONN_UNTRACKED_GAUGE | ILoadBalancer::CONN_SILENCE_ERRORS;
        // Get a connection to this server without triggering other server connections
        $conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
        // Determine the number of open connections in this server (false if unknown)
        $connCount = false;
        if ( $conn ) {
            try {
                $connCount = $this->getConnCountForDb( $conn );
            } catch ( DBError $e ) {
                // Leave the count as false; this is logged below
            } finally {
                // Only keep one connection open at a time
                $conn->close( __METHOD__ );
            }
        }

        $endTime = $this->getCurrentTime();
        $newState[self::STATE_AS_OF] = $endTime;
        $newState[self::STATE_CONN_COUNT] = $connCount;
        $newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
        if ( $previousState ) {
            // Smooth the count; a false count (old or new) takes part as zero
            $newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
        }

        if ( $connCount === false ) {
            $this->logger->error(
                __METHOD__ . ": host {db_server} is not up?",
                [ 'db_server' => $serverName ]
            );
        } else {
            $this->statsd->timing( "loadbalancer.connCount.$cluster.$statServerName", $connCount );
            if ( $connCount > $this->maxConnCount ) {
                $this->logger->warning(
                    "Server {db_server} has {conn_count} open connections (> {max_conn})",
                    [
                        'db_server' => $serverName,
                        'conn_count' => $connCount,
                        'max_conn' => $this->maxConnCount
                    ]
                );
            }
        }

        return $newState;
    }

    /**
     * Get the placeholder state used before anything is known about a server
     *
     * @return array<string,mixed>
     * @phan-return array{up:float,conn_count:float|int|false,time:float}
     */
    protected function newInitialServerState() {
        return [
            // Moving average of connectivity; treat as good
            self::STATE_UP => 1.0,
            // Number of connections to that replica; treat as none
            self::STATE_CONN_COUNT => 0,
            // UNIX timestamp of state generation completion; treat as "outdated"
            self::STATE_AS_OF => 0.0,
        ];
    }

    /**
     * Check whether a cached state is recent enough to be used without a refresh
     *
     * @param array|null $state
     * @return bool True if the state exists and is younger than STATE_TARGET_TTL seconds
     */
    private function isStateFresh( $state ) {
        if ( !$state ) {
            return false;
        }
        $age = $this->getCurrentTime() - $state[self::STATE_AS_OF];
        return $age < self::STATE_TARGET_TTL;
    }

    /**
     * Mark the server state computation as being in progress
     *
     * The flag is cleared by the callback wrapped in the returned object, so the
     * caller must hold on to that object for as long as the guard should be active.
     *
     * @return ScopedCallback
     * @throws RuntimeException If the computation is already in progress
     */
    private function acquireServerStatesLoopGuard() {
        if ( $this->serverStatesKeyLocked ) {
            throw new RuntimeException(
                "Circular recursion detected while regenerating server states cache. " .
                "This may indicate improper connection handling in " . get_class( $this )
            );
        }

        $this->serverStatesKeyLocked = true; // lock

        return new ScopedCallback( function () {
            $this->serverStatesKeyLocked = false; // unlock
        } );
    }

    /**
     * @return float UNIX timestamp; the mock time if one is set and non-zero
     * @codeCoverageIgnore
     */
    protected function getCurrentTime() {
        return $this->wallClockOverride ?: microtime( true );
    }

    /**
     * @param float|null &$time Mock UNIX timestamp for testing; bound by reference, so
     *  later changes to the caller's variable are seen by this object
     * @codeCoverageIgnore
     */
    public function setMockTime( &$time ) {
        $this->wallClockOverride =& $time;
    }

    /**
     * Count the entries in the process list of a server
     *
     * @param IDatabase $conn
     * @return int Number of process list entries; 0 for non-MySQL connections or when
     *  the query yields no result
     */
    private function getConnCountForDb( IDatabase $conn ): int {
        if ( $conn->getType() !== 'mysql' ) {
            return 0;
        }
        $sql = 'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST';
        $query = new Query(
            $sql,
            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
            'SELECT',
            null,
            $sql
        );
        $res = $conn->query( $query, __METHOD__ );
        $row = $res ? $res->fetchObject() : false;
        return $row ? (int)$row->pcount : 0;
    }
}