Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
56.07% covered (warning)
56.07%
494 / 881
30.86% covered (danger)
30.86%
25 / 81
CRAP
0.00% covered (danger)
0.00%
0 / 1
LoadBalancer
56.07% covered (warning)
56.07%
494 / 881
30.86% covered (danger)
30.86%
25 / 81
9166.22
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 configure
86.05% covered (warning)
86.05%
37 / 43
0.00% covered (danger)
0.00%
0 / 1
12.39
 newTrackedConnectionsArray
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 getClusterName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainInstance
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
6
 resolveGroups
73.33% covered (warning)
73.33%
11 / 15
0.00% covered (danger)
0.00%
0 / 1
14.73
 sanitizeConnectionFlags
81.82% covered (warning)
81.82%
9 / 11
0.00% covered (danger)
0.00%
0 / 1
6.22
 enforceConnectionFlags
50.00% covered (danger)
50.00%
4 / 8
0.00% covered (danger)
0.00%
0 / 1
6.00
 getRandomNonLagged
45.45% covered (danger)
45.45%
10 / 22
0.00% covered (danger)
0.00%
0 / 1
14.95
 getReaderIndex
78.26% covered (warning)
78.26%
18 / 23
0.00% covered (danger)
0.00%
0 / 1
8.66
 getExistingReaderIndex
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 pickReaderIndex
55.56% covered (warning)
55.56%
15 / 27
0.00% covered (danger)
0.00%
0 / 1
18.78
 waitForAll
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
72
 serverHasLoadInAnyGroup
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 getAnyOpenConnection
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
7.01
 pickAnyOpenConnection
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
6.60
 awaitSessionPrimaryPos
7.14% covered (danger)
7.14%
3 / 42
0.00% covered (danger)
0.00%
0 / 1
127.29
 getConnection
66.67% covered (warning)
66.67%
6 / 9
0.00% covered (danger)
0.00%
0 / 1
4.59
 getConnectionInternal
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
6.01
 getServerConnection
73.68% covered (warning)
73.68%
14 / 19
0.00% covered (danger)
0.00%
0 / 1
6.66
 reuseConnection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getConnectionRef
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getMaintenanceConnectionRef
55.56% covered (warning)
55.56%
5 / 9
0.00% covered (danger)
0.00%
0 / 1
5.40
 reuseOrOpenConnectionForNewRef
92.68% covered (success)
92.68%
38 / 41
0.00% covered (danger)
0.00%
0 / 1
15.09
 assertConnectionDomain
25.00% covered (danger)
25.00%
1 / 4
0.00% covered (danger)
0.00%
0 / 1
3.69
 getServerAttributes
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 reallyOpenConnection
79.10% covered (warning)
79.10%
53 / 67
0.00% covered (danger)
0.00%
0 / 1
15.79
 loadSessionPrimaryPos
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
6.84
 reportConnectionError
60.61% covered (warning)
60.61%
20 / 33
0.00% covered (danger)
0.00%
0 / 1
10.00
 getServerCount
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasStreamingReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerInfo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerType
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPrimaryPos
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 reconfigure
89.47% covered (warning)
89.47%
17 / 19
0.00% covered (danger)
0.00%
0 / 1
8.07
 disable
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 closeAll
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 closeConnection
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
42
 finalizePrimaryChanges
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
3
 approvePrimaryChanges
28.57% covered (danger)
28.57%
10 / 35
0.00% covered (danger)
0.00%
0 / 1
46.44
 beginPrimaryChanges
69.23% covered (warning)
69.23%
9 / 13
0.00% covered (danger)
0.00%
0 / 1
3.26
 commitPrimaryChanges
65.00% covered (warning)
65.00%
13 / 20
0.00% covered (danger)
0.00%
0 / 1
7.54
 runPrimaryTransactionIdleCallbacks
52.63% covered (warning)
52.63%
20 / 38
0.00% covered (danger)
0.00%
0 / 1
20.63
 runPrimaryTransactionListenerCallbacks
60.00% covered (warning)
60.00%
9 / 15
0.00% covered (danger)
0.00%
0 / 1
5.02
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 flushPrimarySessions
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 assertTransactionRoundStage
16.67% covered (danger)
16.67%
2 / 12
0.00% covered (danger)
0.00%
0 / 1
4.31
 applyTransactionRoundFlags
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
4.07
 undoTransactionRoundFlags
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
4.07
 flushReplicaSnapshots
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
30
 flushPrimarySnapshots
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 hasPrimaryConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 lastPrimaryChangeTimestamp
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 pendingPrimaryChangeCallers
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setLaggedReplicaMode
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getReadOnlyReason
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 isPrimaryRunningReadOnly
92.31% covered (success)
92.31%
24 / 26
0.00% covered (danger)
0.00%
0 / 1
4.01
 pingAll
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 getOpenConnections
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
4
 getOpenPrimaryConnections
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 getMaxLag
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 getLagTimes
90.48% covered (success)
90.48%
19 / 21
0.00% covered (danger)
0.00%
0 / 1
4.01
 waitForPrimaryPos
0.00% covered (danger)
0.00%
0 / 27
0.00% covered (danger)
0.00%
0 / 1
72
 setTransactionListener
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 setTableAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setIndexAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 setTempTablesOnlyMode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 stringifyConn
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 fieldHasBit
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getConnLogContext
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 setMockTime
n/a
0 / 0
n/a
0 / 0
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use ArrayUtils;
23use InvalidArgumentException;
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use LogicException;
26use NullStatsdDataFactory;
27use Psr\Log\LoggerInterface;
28use Psr\Log\NullLogger;
29use RuntimeException;
30use Throwable;
31use UnexpectedValueException;
32use WANObjectCache;
33use Wikimedia\ObjectCache\BagOStuff;
34use Wikimedia\ObjectCache\EmptyBagOStuff;
35use Wikimedia\ScopedCallback;
36
37/**
38 * @see ILoadBalancer
39 * @ingroup Database
40 */
41class LoadBalancer implements ILoadBalancerForOwner {
42    /** @var ILoadMonitor */
43    private $loadMonitor;
44    /** @var BagOStuff */
45    private $srvCache;
46    /** @var WANObjectCache */
47    private $wanCache;
48    /** @var DatabaseFactory */
49    private $databaseFactory;
50
51    /** @var TransactionProfiler */
52    private $trxProfiler;
53    /** @var StatsdDataFactoryInterface */
54    private $statsd;
55    /** @var LoggerInterface */
56    private $logger;
57    /** @var callable Exception logger */
58    private $errorLogger;
59    /** @var DatabaseDomain Local DB domain ID and default for new connections */
60    private $localDomain;
61
62    /** @var array<string,array<int,Database[]>> Map of (connection pool => server index => Database[]) */
63    private $conns;
64
65    /** @var string|null The name of the DB cluster */
66    private $clusterName;
67    /** @var ServerInfo */
68    private $serverInfo;
69    /** @var array[] Map of (group => server index => weight) */
70    private $groupLoads;
71    /** @var string|null Default query group to use with getConnection() */
72    private $defaultGroup;
73
74    /** @var array[] Map of (table => (dbname, schema, prefix) map) */
75    private $tableAliases = [];
76    /** @var string[] Map of (index alias => index) */
77    private $indexAliases = [];
78    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
79    private $domainAliases = [];
80    /** @var callable[] Map of (name => callable) */
81    private $trxRecurringCallbacks = [];
82    /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */
83    private $tempTablesOnlyMode = [];
84
85    /** @var string|false Explicit DBO_TRX transaction round active or false if none */
86    private $trxRoundId = false;
87    /** @var string Stage of the current transaction round in the transaction round life-cycle */
88    private $trxRoundStage = self::ROUND_CURSORY;
89    /** @var int[] The group replica server indexes keyed by group */
90    private $readIndexByGroup = [];
91    /** @var DBPrimaryPos|null Replication sync position or null if not set */
92    private $waitForPos;
93    /** @var bool Whether a lagged replica DB was used */
94    private $laggedReplicaMode = false;
95    /** @var string|false Reason this instance is read-only or false if not */
96    private $readOnlyReason = false;
97    /** @var int Total number of new connections ever made with this instance */
98    private $connectionCounter = 0;
99    /** @var bool */
100    private $disabled = false;
101    private ?ChronologyProtector $chronologyProtector = null;
102    /** @var bool Whether the session consistency callback already executed */
103    private $chronologyProtectorCalled = false;
104
105    /** @var Database|null The last connection handle that caused a problem */
106    private $lastErrorConn;
107
108    /** @var DatabaseDomain[] Map of (domain ID => domain instance) */
109    private $nonLocalDomainCache = [];
110
111    /**
112     * @var int Modification counter for invalidating connections held by
113     *      DBConnRef instances. This is bumped by reconfigure().
114     */
115    private $modcount = 0;
116
117    /** The "server index" LB info key; see {@link IDatabase::getLBInfo()} */
118    private const INFO_SERVER_INDEX = 'serverIndex';
119    /** The "connection category" LB info key; see {@link IDatabase::getLBInfo()} */
120    private const INFO_CONN_CATEGORY = 'connCategory';
121
122    /**
123     * Default 'maxLag' when unspecified
124     * @internal Only for use within LoadBalancer/LoadMonitor
125     */
126    public const MAX_LAG_DEFAULT = ServerInfo::MAX_LAG_DEFAULT;
127
128    /** Warn when this many connection are held */
129    private const CONN_HELD_WARN_THRESHOLD = 10;
130
131    /** Default 'waitTimeout' when unspecified */
132    private const MAX_WAIT_DEFAULT = 10;
133    /** Seconds to cache primary DB server read-only status */
134    private const TTL_CACHE_READONLY = 5;
135
136    /** A category of connections that are tracked and transaction round aware */
137    private const CATEGORY_ROUND = 'round';
138    /** A category of connections that are tracked and in autocommit-mode */
139    private const CATEGORY_AUTOCOMMIT = 'auto-commit';
140    /** A category of connections that are untracked and in gauge-mode */
141    private const CATEGORY_GAUGE = 'gauge';
142
143    /** Transaction round, explicit or implicit, has not finished writing */
144    private const ROUND_CURSORY = 'cursory';
145    /** Transaction round writes are complete and ready for pre-commit checks */
146    private const ROUND_FINALIZED = 'finalized';
147    /** Transaction round passed final pre-commit checks */
148    private const ROUND_APPROVED = 'approved';
149    /** Transaction round was committed and post-commit callbacks must be run */
150    private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
151    /** Transaction round was rolled back and post-rollback callbacks must be run */
152    private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
153    /** Transaction round encountered an error */
154    private const ROUND_ERROR = 'error';
155
156    /** @var int Idiom for getExistingReaderIndex() meaning "no index selected" */
157    private const READER_INDEX_NONE = -1;
158
159    public function __construct( array $params ) {
160        $this->configure( $params );
161
162        $this->conns = self::newTrackedConnectionsArray();
163    }
164
165    /**
166     * @param array $params A database configuration array, see $wgLBFactoryConf.
167     *
168     * @return void
169     */
170    protected function configure( array $params ): void {
171        $this->localDomain = isset( $params['localDomain'] )
172            ? DatabaseDomain::newFromId( $params['localDomain'] )
173            : DatabaseDomain::newUnspecified();
174
175        $this->serverInfo = new ServerInfo();
176        $this->groupLoads = [ self::GROUP_GENERIC => [] ];
177        foreach ( $this->serverInfo->normalizeServerMaps( $params['servers'] ?? [] ) as $i => $server ) {
178            $this->serverInfo->addServer( $i, $server );
179            foreach ( $server['groupLoads'] as $group => $weight ) {
180                $this->groupLoads[$group][$i] = $weight;
181            }
182            $this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
183        }
184
185        if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
186            $this->readOnlyReason = $params['readOnlyReason'];
187        }
188
189        $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
190        $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
191
192        // Note: this parameter is normally absent. It is injectable for testing purposes only.
193        $this->databaseFactory = $params['databaseFactory'] ?? new DatabaseFactory( $params );
194
195        $this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
196                trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
197        };
198        $this->logger = $params['logger'] ?? new NullLogger();
199
200        $this->clusterName = $params['clusterName'] ?? null;
201        $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
202        $this->statsd = $params['statsdDataFactory'] ?? new NullStatsdDataFactory();
203
204        // Set up LoadMonitor
205        $loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => LoadMonitorNull::class ];
206        $compat = [
207            'LoadMonitor' => LoadMonitor::class,
208            'LoadMonitorNull' => LoadMonitorNull::class
209        ];
210        $class = $loadMonitorConfig['class'];
211        // @phan-suppress-next-line PhanImpossibleCondition
212        if ( isset( $compat[$class] ) ) {
213            $class = $compat[$class];
214        }
215        $this->loadMonitor = new $class(
216            $this, $this->srvCache, $this->wanCache, $loadMonitorConfig );
217        $this->loadMonitor->setLogger( $this->logger );
218        $this->loadMonitor->setStatsdDataFactory( $this->statsd );
219
220        if ( isset( $params['chronologyProtector'] ) ) {
221            $this->chronologyProtector = $params['chronologyProtector'];
222        }
223
224        if ( isset( $params['roundStage'] ) ) {
225            if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
226                $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
227            } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
228                $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
229            }
230        }
231
232        $group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
233        $this->defaultGroup = isset( $this->groupLoads[ $group ] ) ? $group : self::GROUP_GENERIC;
234    }
235
236    private static function newTrackedConnectionsArray() {
237        // Note that CATEGORY_GAUGE connections are untracked
238        return [
239            self::CATEGORY_ROUND => [],
240            self::CATEGORY_AUTOCOMMIT => []
241        ];
242    }
243
244    public function getClusterName(): string {
245        // Fallback to the current primary name if not specified
246        return $this->clusterName ?? $this->getServerName( ServerInfo::WRITER_INDEX );
247    }
248
249    public function getLocalDomainID(): string {
250        return $this->localDomain->getId();
251    }
252
253    public function resolveDomainID( $domain ): string {
254        return $this->resolveDomainInstance( $domain )->getId();
255    }
256
257    /**
258     * @param DatabaseDomain|string|false $domain
259     * @return DatabaseDomain
260     */
261    final protected function resolveDomainInstance( $domain ): DatabaseDomain {
262        if ( $domain instanceof DatabaseDomain ) {
263            return $domain; // already a domain instance
264        } elseif ( $domain === false || $domain === $this->localDomain->getId() ) {
265            return $this->localDomain;
266        } elseif ( isset( $this->domainAliases[$domain] ) ) {
267            $this->domainAliases[$domain] =
268                DatabaseDomain::newFromId( $this->domainAliases[$domain] );
269
270            return $this->domainAliases[$domain];
271        }
272
273        $cachedDomain = $this->nonLocalDomainCache[$domain] ?? null;
274        if ( $cachedDomain === null ) {
275            $cachedDomain = DatabaseDomain::newFromId( $domain );
276            $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
277        }
278
279        return $cachedDomain;
280    }
281
282    /**
283     * Get the first group in $groups with assigned servers, falling back to the default group
284     *
285     * @param string[]|string|false $groups Query group(s) in preference order, [], or false
286     * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
287     * @return string Query group
288     */
289    private function resolveGroups( $groups, $i ) {
290        // If a specific replica server was specified, then $groups makes no sense
291        if ( $i > 0 && $groups !== [] && $groups !== false ) {
292            $list = implode( ', ', (array)$groups );
293            throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
294        }
295
296        if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
297            $resolvedGroup = $this->defaultGroup;
298        } elseif ( is_string( $groups ) ) {
299            $resolvedGroup = isset( $this->groupLoads[$groups] ) ? $groups : $this->defaultGroup;
300        } elseif ( is_array( $groups ) ) {
301            $resolvedGroup = $this->defaultGroup;
302            foreach ( $groups as $group ) {
303                if ( isset( $this->groupLoads[$group] ) ) {
304                    $resolvedGroup = $group;
305                    break;
306                }
307            }
308        } else {
309            $resolvedGroup = $this->defaultGroup;
310        }
311
312        return $resolvedGroup;
313    }
314
315    /**
316     * Sanitize connection flags provided by a call to getConnection()
317     *
318     * @param int $flags Bitfield of class CONN_* constants
319     * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
320     * @param string $domain Database domain
321     * @return int Sanitized bitfield
322     */
323    private function sanitizeConnectionFlags( $flags, $i, $domain ) {
324        // Whether an outside caller is explicitly requesting the primary database server
325        if ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX ) {
326            $flags |= self::CONN_INTENT_WRITABLE;
327        }
328
329        if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
330            // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
331            // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
332            // during larger transactions. This is useful for avoiding lock contention.
333            // Assuming all servers are of the same type (or similar), which is overwhelmingly
334            // the case, use the primary server information to get the attributes. The information
335            // for $i cannot be used since it might be DB_REPLICA, which might require connection
336            // attempts in order to be resolved into a real server index.
337            $attributes = $this->getServerAttributes( ServerInfo::WRITER_INDEX );
338            if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
339                // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
340                // to use separate connections would just cause self-deadlocks. Note that
341                // REPEATABLE-READ staleness is not an issue since DB-level locking means
342                // that transactions are Strict Serializable anyway.
343                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
344                $type = $this->serverInfo->getServerType( ServerInfo::WRITER_INDEX );
345                $this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
346            } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
347                // T202116: integration tests are active and queries should be all be using
348                // temporary clone tables (via prefix). Such tables are not visible across
349                // different connections nor can there be REPEATABLE-READ snapshot staleness,
350                // so use the same connection for everything.
351                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
352            }
353        }
354
355        return $flags;
356    }
357
358    /**
359     * @param IDatabase $conn
360     * @param int $flags
361     * @throws DBUnexpectedError
362     */
363    private function enforceConnectionFlags( IDatabase $conn, $flags ) {
364        if (
365            self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ||
366            // Handles with open transactions are avoided since they might be subject
367            // to REPEATABLE-READ snapshots, which could affect the lag estimate query.
368            self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE )
369        ) {
370            if ( $conn->trxLevel() ) {
371                throw new DBUnexpectedError(
372                    $conn,
373                    'Handle requested with autocommit-mode yet it has a transaction'
374                );
375            }
376
377            $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
378        }
379    }
380
381    /**
382     * @param array $loads
383     * @param int|float $maxLag Restrict the maximum allowed lag to this many seconds, or INF for no max
384     * @return int|string|false
385     */
386    private function getRandomNonLagged( array $loads, $maxLag = INF ) {
387        $lags = $this->getLagTimes();
388
389        # Unset excessively lagged servers
390        foreach ( $lags as $i => $lag ) {
391            if ( $i !== ServerInfo::WRITER_INDEX ) {
392                # How much lag this server nominally is allowed to have
393                $maxServerLag = $this->serverInfo->getServerMaxLag( $i ); // default
394                # Constrain that further by $maxLag argument
395                $maxServerLag = min( $maxServerLag, $maxLag );
396
397                $srvName = $this->serverInfo->getServerName( $i );
398                if ( $lag === false && !is_infinite( $maxServerLag ) ) {
399                    $this->logger->debug(
400                        __METHOD__ . ": server {db_server} is not replicating?",
401                        [ 'db_server' => $srvName ]
402                    );
403                    unset( $loads[$i] );
404                } elseif ( $lag > $maxServerLag ) {
405                    $this->logger->debug(
406                        __METHOD__ .
407                            ": server {db_server} has {lag} seconds of lag (>= {maxlag})",
408                        [ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $maxServerLag ]
409                    );
410                    unset( $loads[$i] );
411                }
412            }
413        }
414
415        if ( array_sum( $loads ) == 0 ) {
416            // All the replicas with non-zero load are lagged and the primary has zero load.
417            // Inform caller so that it can use switch to read-only mode and use a lagged replica.
418            return false;
419        }
420
421        # Return a random representative of the remainder
422        return ArrayUtils::pickRandom( $loads );
423    }
424
425    public function getReaderIndex( $group = false ) {
426        $group = is_string( $group ) ? $group : self::GROUP_GENERIC;
427
428        if ( !$this->serverInfo->hasReplicaServers() ) {
429            // There is only one possible server to use (the primary)
430            return ServerInfo::WRITER_INDEX;
431        }
432
433        $index = $this->getExistingReaderIndex( $group );
434        if ( $index !== self::READER_INDEX_NONE ) {
435            // A reader index was already selected for this query group. Keep using it,
436            // since any session replication position was already waited on and any
437            // active transaction will be reused (e.g. for point-in-time snapshots).
438            return $index;
439        }
440
441        // Get the server weight array for this load group
442        $loads = $this->groupLoads[$group] ?? [];
443        if ( !$loads ) {
444            $this->logger->info( __METHOD__ . ": no loads for group $group" );
445            return false;
446        }
447
448        // Load any session replication positions, before any connection attempts,
449        // since reading them afterwards can only cause more delay due to possibly
450        // seeing even higher replication positions (e.g. from concurrent requests).
451        $this->loadSessionPrimaryPos();
452
453        // Scale the configured load ratios according to each server's load/state.
454        // This can sometimes trigger server connections due to cache regeneration.
455        $this->loadMonitor->scaleLoads( $loads );
456
457        // Pick a server, accounting for weight, load, lag, and session consistency
458        $i = $this->pickReaderIndex( $loads );
459        if ( $i === false ) {
460            // Connection attempts failed
461            return false;
462        }
463
464        // If data seen by queries is expected to reflect writes from a prior transaction,
465        // then wait for the chosen server to apply those changes. This is used to improve
466        // session consistency.
467        if ( !$this->awaitSessionPrimaryPos( $i ) ) {
468            // Data will be outdated compared to what was expected
469            $this->setLaggedReplicaMode();
470        }
471
472        // Keep using this server for DB_REPLICA handles for this group
473        if ( $i < 0 ) {
474            throw new UnexpectedValueException( "Cannot set a negative read server index" );
475        }
476        $this->readIndexByGroup[$group] = $i;
477
478        $serverName = $this->getServerName( $i );
479        $this->logger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
480
481        return $i;
482    }
483
484    /**
485     * Get the server index chosen for DB_REPLICA connections for the given query group
486     *
487     * @param string $group Query group; use false for the generic group
488     * @return int Specific server index or LoadBalancer::READER_INDEX_NONE if none was chosen
489     */
490    protected function getExistingReaderIndex( $group ) {
491        return $this->readIndexByGroup[$group] ?? self::READER_INDEX_NONE;
492    }
493
/**
 * Pick a server that is reachable, preferably non-lagged, and return its server index
 *
 * This will leave the server connection open within the pool for reuse
 *
 * @param array $loads List of server weights
 * @return int|false Reader index, or false if no server was picked or none could be reached
 * @throws InvalidArgumentException If $loads is empty
 */
private function pickReaderIndex( array $loads ) {
    if ( $loads === [] ) {
        throw new InvalidArgumentException( "Server configuration array is empty" );
    }

    // Quickly look through the available servers for a server that meets criteria...
    $currentLoads = $loads;
    $i = false;
    while ( count( $currentLoads ) ) {
        if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
            $this->logger->debug( __METHOD__ . ": session has replication position" );
            // ChronologyProtector::getSessionPrimaryPos called in $this->loadSessionPrimaryPos()
            // sets "waitForPos" for session consistency.
            // This triggers doWait() after connect, so it's especially good to
            // avoid lagged servers so as to avoid excessive delay in that method.
            $ago = microtime( true ) - $this->waitForPos->asOfTime();
            // Aim for <= 1 second of waiting (being too picky can backfire)
            $i = $this->getRandomNonLagged( $currentLoads, $ago + 1 );
        } else {
            // Any server with less lag than its 'max lag' param is preferable
            $i = $this->getRandomNonLagged( $currentLoads );
        }

        if ( $i === false && count( $currentLoads ) ) {
            $this->setLaggedReplicaMode();
            // All replica DBs lagged, just pick anything.
            $i = ArrayUtils::pickRandom( $currentLoads );
        }

        if ( $i === false ) {
            // pickRandom() returned false.
            // This is permanent and means the configuration or LoadMonitor
            // wants us to return false.
            $this->logger->debug( __METHOD__ . ": no suitable server found" );
            return false;
        }

        $serverName = $this->getServerName( $i );
        $this->logger->debug( __METHOD__ . ": connecting to $serverName..." );

        // Get a connection to this server without triggering complementary connections
        // to other servers (due to things like lag or read-only checks). We want to avoid
        // the risk of overhead and recursion here.
        $conn = $this->getServerConnection( $i, self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
        if ( !$conn ) {
            $this->logger->warning( __METHOD__ . ": failed connecting to $serverName" );
            unset( $currentLoads[$i] ); // avoid this server next iteration
            // Forget the failed candidate; otherwise, if this was the last remaining
            // server, the index of an unreachable server would be returned below.
            $i = false;
            continue;
        }

        // Return this server
        break;
    }

    // If all servers were down, quit now ($i is false in that case)
    if ( $currentLoads === [] ) {
        $this->logger->error( __METHOD__ . ": all servers down" );
    }

    return $i;
}
563
/**
 * Wait for every streaming replica that has load in some group to reach a primary position
 *
 * "waitForPos" is temporarily set to $pos while waiting and is restored afterwards.
 * The timeout is a budget shared by all replicas: whole seconds spent waiting on one
 * replica are deducted from what is left for the following ones.
 *
 * @param DBPrimaryPos $pos Replication position to wait for
 * @param int|null $timeout Max seconds to wait in total; default is self::MAX_WAIT_DEFAULT
 * @return bool False if more than one replica failed to catch up, or if any replica
 *  failed and there are fewer than 5 servers; true otherwise
 */
public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
    $timeout = $timeout ?: self::MAX_WAIT_DEFAULT;
    // Time budget left for the replicas that have not been checked yet
    $remaining = $timeout;

    $oldPos = $this->waitForPos;
    try {
        $this->waitForPos = $pos;

        $failedReplicas = [];
        foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
            if ( !$this->serverHasLoadInAnyGroup( $i ) ) {
                continue;
            }

            $start = microtime( true );
            // Never pass a value below 1: awaitSessionPrimaryPos() treats 0 as "use the
            // default timeout", which would restart the full wait once the budget is spent.
            $ok = $this->awaitSessionPrimaryPos( $i, max( 1, $remaining ) );
            if ( !$ok ) {
                $failedReplicas[] = $this->getServerName( $i );
            }
            $remaining -= intval( microtime( true ) - $start );
        }

        // Stop spamming logs when only one replica is lagging and we have 5+ replicas.
        // Mediawiki automatically stops sending queries to the lagged one.
        $failed = $failedReplicas && ( count( $failedReplicas ) > 1 || $this->getServerCount() < 5 );
        if ( $failed ) {
            $this->logger->error(
                "Timed out waiting for replication to reach {raw_pos}",
                [
                    'raw_pos' => $pos->__toString(),
                    'failed_hosts' => $failedReplicas,
                    // Report the configured budget, not whatever was left of it
                    'timeout' => $timeout,
                    'exception' => new RuntimeException()
                ]
            );
        }

        return !$failed;
    } finally {
        // Restore the old position; this is used for throttling, not lag-protection
        $this->waitForPos = $oldPos;
    }
}
604
/**
 * Check whether at least one query group assigns a load above zero to a server
 *
 * @param int $i Specific server index
 * @return bool True if some entry of "groupLoads" has a positive load for $i
 */
private function serverHasLoadInAnyGroup( $i ) {
    foreach ( $this->groupLoads as $loadsByIndex ) {
        // A missing (or null) entry counts as no load
        if ( isset( $loadsByIndex[$i] ) && $loadsByIndex[$i] > 0 ) {
            return true;
        }
    }

    return false;
}
618
/**
 * Find an already pooled, open connection handle without opening a new one
 *
 * All connection pools in "conns" are searched in order. The first open handle
 * found has the given CONN_* flags enforced on it before being returned.
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA; DB_REPLICA matches
 *  connections to any server
 * @param int $flags Bit field of class CONN_* constants
 * @return IDatabaseForOwner|false An open connection or false if none was found
 */
public function getAnyOpenConnection( $i, $flags = 0 ) {
    if ( $i === self::DB_PRIMARY ) {
        $i = ServerInfo::WRITER_INDEX;
    }

    foreach ( $this->conns as $type => $poolConnsByServer ) {
        if ( $i === self::DB_REPLICA ) {
            // Consider all existing connections to any server
            $candidates = $poolConnsByServer;
        } elseif ( isset( $poolConnsByServer[$i] ) ) {
            // Consider all existing connections to a specific server
            $candidates = [ $i => $poolConnsByServer[$i] ];
        } else {
            $candidates = [];
        }

        $conn = $this->pickAnyOpenConnection( $candidates );
        if ( $conn ) {
            $this->logger->debug( __METHOD__ . ": found '$type' connection to #$i." );
            $this->enforceConnectionFlags( $conn, $flags );

            return $conn;
        }
    }

    return false;
}
646
/**
 * Return the first handle in the given map that reports an open connection
 *
 * Handles that are not open are skipped, with a warning logged for each of them.
 *
 * @param Database[][] $connsByServer Map of (server index => array of DB handles)
 * @return IDatabaseForOwner|false An appropriate open connection or false if none found
 */
private function pickAnyOpenConnection( array $connsByServer ) {
    foreach ( $connsByServer as $i => $conns ) {
        foreach ( $conns as $conn ) {
            if ( $conn->isOpen() ) {
                return $conn;
            }

            // some sort of error occurred?
            $this->logger->warning(
                __METHOD__ .
                ": pooled DB handle for {db_server} (#$i) has no open connection.",
                $this->getConnLogContext( $conn )
            );
        }
    }

    return false;
}
668
/**
 * Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos"
 *
 * The last position each server is known to have reached is kept in "srvCache", so
 * that the wait can be skipped when the server is already known to be caught up.
 *
 * @see loadSessionPrimaryPos()
 *
 * @param int $index Specific server index
 * @param int|null $timeout Max seconds to wait; default is self::MAX_WAIT_DEFAULT and
 *  values below 1 are raised to 1
 * @return bool Success
 */
private function awaitSessionPrimaryPos( $index, $timeout = null ) {
    $timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );

    if ( !$this->waitForPos || $index === ServerInfo::WRITER_INDEX ) {
        return true;
    }

    $srvName = $this->getServerName( $index );

    // Check if we already know that the DB has reached this point
    $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v2' );

    /** @var DBPrimaryPos|null $knownReachedPos */
    $knownReachedPos = null;
    $position = $this->srvCache->get( $key );
    if ( is_array( $position ) ) {
        // Only trust the cached class name if it really names a DBPrimaryPos class;
        // anything else is treated as a cache miss instead of causing a fatal error.
        $class = $position['_type_'] ?? null;
        if ( is_string( $class ) && is_a( $class, DBPrimaryPos::class, true ) ) {
            $knownReachedPos = $class::newFromArray( $position );
        }
    }
    if (
        $knownReachedPos instanceof DBPrimaryPos &&
        $knownReachedPos->hasReached( $this->waitForPos )
    ) {
        $this->logger->debug(
            __METHOD__ .
            ": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
            [ 'db_server' => $srvName ]
        );

        return true;
    }

    $close = false; // close the connection afterwards
    $flags = self::CONN_SILENCE_ERRORS;
    // Check if there is an existing connection that can be used
    $conn = $this->getAnyOpenConnection( $index, $flags );
    if ( !$conn ) {
        // Get a connection to this server without triggering complementary connections
        // to other servers (due to things like lag or read-only checks). We want to avoid
        // the risk of overhead and recursion here.
        $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
        if ( !$conn ) {
            $this->logger->warning(
                __METHOD__ . ': failed to connect to {db_server}',
                [ 'db_server' => $srvName ]
            );

            return false;
        }
        // Avoid connection spam in waitForAll() when connections
        // are made just for the sake of doing this lag check.
        $close = true;
    }

    $this->logger->info(
        __METHOD__ .
        ': waiting for replica DB {db_server} to catch up...',
        $this->getConnLogContext( $conn )
    );

    try {
        $result = $conn->primaryPosWait( $this->waitForPos, $timeout );

        $ok = ( $result !== null && $result != -1 );
        if ( $ok ) {
            // Remember that the DB reached this point
            $this->srvCache->set( $key, $this->waitForPos->toArray(), BagOStuff::TTL_DAY );
        }
    } finally {
        // Also release the temporary connection if the wait itself threw
        if ( $close ) {
            $this->closeConnection( $conn );
        }
    }

    return $ok;
}
753
/**
 * Get a DBConnRef for the given server index or DB_PRIMARY/DB_REPLICA
 *
 * The returned reference carries the request parameters ($i, $groups, the resolved
 * domain ID and $flags), the role implied by $i, and the current "modcount".
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID; resolved through resolveDomainID()
 * @param int $flags Bit field of class CONN_* constants; CONN_SILENCE_ERRORS is rejected
 * @return DBConnRef
 * @throws UnexpectedValueException If CONN_SILENCE_ERRORS is set in $flags
 */
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
    if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
        throw new UnexpectedValueException(
            __METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
        );
    }

    $params = [ $i, $groups, $this->resolveDomainID( $domain ), $flags ];
    // Both the DB_PRIMARY constant and the writer server index count as the primary role
    $isPrimary = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX );
    $role = $isPrimary ? self::DB_PRIMARY : self::DB_REPLICA;

    return new DBConnRef( $this, $params, $role, $this->modcount );
}
768
/**
 * Resolve the given server index or DB_PRIMARY/DB_REPLICA to a specific server and
 * get a connection handle to it
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s); resolved through resolveGroups()
 * @param string|false $domain DB domain ID; resolved through resolveDomainID()
 * @param int $flags Bit field of class CONN_* constants
 * @return IDatabase
 * @throws UnexpectedValueException If $i is neither DB_PRIMARY, DB_REPLICA nor a
 *  known server index
 */
public function getConnectionInternal( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
    $domain = $this->resolveDomainID( $domain );
    $group = $this->resolveGroups( $groups, $i );
    $flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
    // If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
    // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
    // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
    // The use of getServerConnection() instead of getConnection() avoids infinite loops.
    if ( $i === self::DB_PRIMARY ) {
        $serverIndex = ServerInfo::WRITER_INDEX;
    } elseif ( $i === self::DB_REPLICA ) {
        $groupIndex = $this->getReaderIndex( $group );
        // Keep the (negative) DB_REPLICA placeholder if no reader index was obtained
        $serverIndex = ( $groupIndex !== false ) ? $groupIndex : $i;
        if ( $serverIndex < 0 ) {
            $this->reportConnectionError( 'could not connect to any replica DB server' );
        }
    } else {
        if ( !$this->serverInfo->hasServerIndex( $i ) ) {
            throw new UnexpectedValueException( "Invalid server index #$i" );
        }
        $serverIndex = $i;
    }

    // Get an open connection to that server (might trigger a new connection)
    return $this->getServerConnection( $serverIndex, $domain, $flags );
}
795
/**
 * Get an open connection handle to a specific server for the given domain
 *
 * An existing pooled handle is reused when possible; otherwise a new connection is
 * attempted. New connections made by this call are recorded with the "trxProfiler".
 *
 * @param int $i Specific server index
 * @param string $domain DB domain ID, as accepted by DatabaseDomain::newFromId()
 * @param int $flags Bit field of class CONN_* constants
 * @return IDatabaseForOwner|false The handle, or false if no usable connection could
 *  be obtained; unless CONN_SILENCE_ERRORS is set, reportConnectionError() is called
 *  before false is returned
 */
public function getServerConnection( $i, $domain, $flags = 0 ) {
    $domainInstance = DatabaseDomain::newFromId( $domain );
    // Number of connections made before getting the server index and handle
    $priorConnectionsMade = $this->connectionCounter;
    // Get an open connection to this server (might trigger a new connection)
    $conn = $this->reuseOrOpenConnectionForNewRef( $i, $domainInstance, $flags );

    $usable = ( $conn instanceof IDatabaseForOwner );
    if ( $usable ) {
        // Profile any new connections caused by this method
        if ( $this->connectionCounter > $priorConnectionsMade ) {
            $this->trxProfiler->recordConnection(
                $conn->getServerName(),
                $conn->getDBname(),
                self::fieldHasBit( $flags, self::CONN_INTENT_WRITABLE )
            );
        }

        if ( !$conn->isOpen() ) {
            // Connection was made but later unrecoverably lost for some reason.
            // Do not return a handle that will just throw exceptions on use, but
            // let the calling code, e.g. getReaderIndex(), try another server.
            $this->lastErrorConn = $conn;
            $usable = false;
        }
    }

    if ( $usable ) {
        return $conn;
    }

    // Throw an error or otherwise bail out if the connection attempt failed
    if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
        $this->reportConnectionError();
    }

    return false;
}
833
/**
 * Accept a connection handle back from a caller; currently does nothing with it
 *
 * @param IDatabase $conn Unused
 */
public function reuseConnection( IDatabase $conn ) {
    // Intentionally a no-op
}
837
/**
 * Deprecated wrapper that emits a deprecation notice and forwards to getConnection()
 *
 * @deprecated since 1.39.
 * @param int $i Passed through to getConnection()
 * @param string[]|string $groups Passed through to getConnection()
 * @param string|false $domain Passed through to getConnection()
 * @param int $flags Passed through to getConnection()
 * @return DBConnRef
 */
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): DBConnRef {
    wfDeprecated( __METHOD__, '1.39' );
    $ref = $this->getConnection( $i, $groups, $domain, $flags );

    // @phan-suppress-next-line PhanTypeMismatchReturnSuperType to be removed soon
    return $ref;
}
846
/**
 * Get a DBConnRef for the given server index or DB_PRIMARY/DB_REPLICA
 *
 * The returned reference carries the request parameters ($i, $groups, the resolved
 * domain ID and $flags), the role implied by $i, and the current "modcount".
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID; resolved through resolveDomainID()
 * @param int $flags Bit field of class CONN_* constants; CONN_SILENCE_ERRORS is rejected
 * @return DBConnRef
 * @throws UnexpectedValueException If CONN_SILENCE_ERRORS is set in $flags
 */
public function getMaintenanceConnectionRef(
    $i,
    $groups = [],
    $domain = false,
    $flags = 0
): DBConnRef {
    $silenced = self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS );
    if ( $silenced ) {
        throw new UnexpectedValueException(
            __METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
        );
    }

    $params = [ $i, $groups, $this->resolveDomainID( $domain ), $flags ];
    if ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX ) {
        $role = self::DB_PRIMARY;
    } else {
        $role = self::DB_REPLICA;
    }

    return new DBConnRef( $this, $params, $role, $this->modcount );
}
866
/**
 * Get a live connection handle to the given domain
 *
 * A tracked connection from the pool matching the flags is reused when possible,
 * which may involve switching the DB domain of that handle. When no pooled handle
 * can be shared, a new connection is opened and, if its category is tracked, pooled.
 *
 * @param int $i Specific server index
 * @param DatabaseDomain $domain Database domain ID required by the reference
 * @param int $flags Bit field of class CONN_* constants
 * @return IDatabase|null Database or null on error
 * @throws DBError When database selection fails
 * @throws InvalidArgumentException When the server index is invalid
 * @throws UnexpectedValueException When the DB domain of the connection is corrupted
 * @throws DBAccessError If disable() was called
 */
private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
    // Figure out which connection pool to use based on the flags
    if ( $this->fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE ) ) {
        // Use low timeouts, use autocommit mode, ignore transaction rounds
        $category = self::CATEGORY_GAUGE;
    } elseif ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
        // Use autocommit mode, ignore transaction rounds
        $category = self::CATEGORY_AUTOCOMMIT;
    } else {
        // Respect DBO_DEFAULT, respect transaction rounds
        $category = self::CATEGORY_ROUND;
    }

    $conn = null;
    // Reuse a free connection in the pool from any domain if possible. There should only
    // be one connection in this pool unless either:
    //  - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
    //       or more database domains have been used during the load balancer's lifetime
    //  - b) Two or more nested function calls used getConnection() on different domains.
    $pooledConns = $this->conns[$category][$i] ?? [];
    foreach ( $pooledConns as $poolConn ) {
        // Check if any required DB domain changes for the new reference are possible
        // Calling selectDomain() would trigger a reconnect, which will break if a
        // transaction is active or if there is any other meaningful session state.
        $needsOtherDatabase = $poolConn->databasesAreIndependent()
            && $domain->getDatabase() !== null
            && $domain->getDatabase() !== $poolConn->getDBname();
        if ( $needsOtherDatabase ) {
            continue;
        }

        $conn = $poolConn;
        // Make any required DB domain changes for the new reference
        if ( !$domain->isUnspecified() ) {
            $conn->selectDomain( $domain );
        }
        $this->logger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
        break;
    }

    // If necessary, try to open a new connection and add it to the pool
    if ( $conn === null ) {
        $newConn = $this->reallyOpenConnection(
            $i,
            $domain,
            [ self::INFO_CONN_CATEGORY => $category ]
        );
        if ( !$newConn->isOpen() ) {
            $this->logger->warning( __METHOD__ . ": connection error for $i/$domain" );
            $this->lastErrorConn = $newConn;
        } else {
            // Make Database::isReadOnly() respect server-side and configuration-based
            // read-only mode. Note that replica handles are always seen as read-only
            // in Database::isReadOnly() and Database::assertIsWritablePrimary().
            if ( $i === ServerInfo::WRITER_INDEX ) {
                $readOnlyReason = false;
                if ( $this->readOnlyReason !== false ) {
                    $readOnlyReason = $this->readOnlyReason;
                } elseif ( $this->isPrimaryRunningReadOnly( $newConn ) ) {
                    $readOnlyReason = 'The primary database server is running in read-only mode.';
                }
                $newConn->setLBInfo( $newConn::LB_READ_ONLY_REASON, $readOnlyReason );
            }
            // Connection obtained; check if it belongs to a tracked connection category
            if ( isset( $this->conns[$category] ) ) {
                // Track this connection for future reuse
                $this->conns[$category][$i][] = $newConn;
            }
            $conn = $newConn;
        }
    }

    if ( $conn instanceof IDatabaseForOwner ) {
        // Check to make sure that the right domain is selected
        $this->assertConnectionDomain( $conn, $domain );
        // Check to make sure that the CONN_* flags are respected
        $this->enforceConnectionFlags( $conn, $flags );
    }

    return $conn;
}
964
/**
 * Verify that the domain selected on a connection is compatible with the expected one
 *
 * @param Database $conn Connection handle to check
 * @param DatabaseDomain $domain Expected domain
 * @throws UnexpectedValueException If the connection's domain ID is not compatible
 */
private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
    if ( $domain->isCompatible( $conn->getDomainID() ) ) {
        return;
    }

    $actualDomainId = $conn->getDomainID();
    throw new UnexpectedValueException(
        "Got connection to '{$actualDomainId}', but expected one for '{$domain}'"
    );
}
979
/**
 * Get the attributes that the database factory reports for a server's type and driver
 *
 * @param int $i Specific server index
 * @return array Result of the database factory's attributesFromType() for this server
 */
public function getServerAttributes( $i ) {
    $factory = $this->databaseFactory;
    $type = $this->getServerType( $i );
    $driver = $this->serverInfo->getServerDriver( $i );

    return $factory->attributesFromType( $type, $driver );
}
986
987    /**
988     * Open a new network connection to a server (uncached)
989     *
990     * Returns a Database object whether or not the connection was successful.
991     *
992     * @param int $i Specific server index
993     * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
994     * @param array $lbInfo Additional information for setLBInfo()
995     * @return Database
996     * @throws DBAccessError
997     * @throws InvalidArgumentException
998     */
999    protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
1000        if ( $this->disabled ) {
1001            throw new DBAccessError();
1002        }
1003
1004        $server = $this->serverInfo->getServerInfoStrict( $i );
1005        if ( $lbInfo[self::INFO_CONN_CATEGORY] === self::CATEGORY_GAUGE ) {
1006            // Use low connection/read timeouts for connection used for gauging server health.
1007            // Gauge information should be cached and used to avoid outages. Indefinite hanging
1008            // while gauging servers would do the opposite.
1009            $server['connectTimeout'] = min( 1, $server['connectTimeout'] ?? INF );
1010            $server['receiveTimeout'] = min( 1, $server['receiveTimeout'] ?? INF );
1011            // Avoid implicit transactions and avoid any SET query for session variables during
1012            // Database::open(). If a server becomes slow, every extra query can cause significant
1013            // delays, even with low connect/receive timeouts.
1014            $server['flags'] ??= 0;
1015            $server['flags'] &= ~IDatabase::DBO_DEFAULT;
1016            $server['flags'] |= IDatabase::DBO_GAUGE;
1017        } else {
1018            // Use implicit transactions unless explicitly configured otherwise
1019            $server['flags'] ??= IDatabase::DBO_DEFAULT;
1020        }
1021
1022        if ( !empty( $server['is static'] ) ) {
1023            $topologyRole = IDatabase::ROLE_STATIC_CLONE;
1024        } else {
1025            $topologyRole = ( $i === ServerInfo::WRITER_INDEX )
1026                ? IDatabase::ROLE_STREAMING_MASTER
1027                : IDatabase::ROLE_STREAMING_REPLICA;
1028        }
1029
1030        $conn = $this->databaseFactory->create(
1031            $server['type'],
1032            array_merge( $server, [
1033                // Basic replication role information
1034                'topologyRole' => $topologyRole,
1035                // Use the database specified in $domain (null means "none or entrypoint DB");
1036                // fallback to the $server default if the RDBMs is an embedded library using a
1037                // file on disk since there would be nothing to access to without a DB/file name.
1038                'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1039                    ? ( $domain->getDatabase() ?? $server['dbname'] ?? null )
1040                    : $domain->getDatabase(),
1041                // Override the $server default schema with that of $domain if specified
1042                'schema' => $domain->getSchema(),
1043                // Use the table prefix specified in $domain
1044                'tablePrefix' => $domain->getTablePrefix(),
1045                'srvCache' => $this->srvCache,
1046                'logger' => $this->logger,
1047                'errorLogger' => $this->errorLogger,
1048                'trxProfiler' => $this->trxProfiler,
1049                'lbInfo' => [ self::INFO_SERVER_INDEX => $i ] + $lbInfo
1050            ] ),
1051            Database::NEW_UNCONNECTED
1052        );
1053        // Set alternative table/index names before any queries can be issued
1054        $conn->setTableAliases( $this->tableAliases );
1055        $conn->setIndexAliases( $this->indexAliases );
1056        // Account for any active transaction round and listeners
1057        if ( $i === ServerInfo::WRITER_INDEX ) {
1058            if ( $this->trxRoundId !== false ) {
1059                $this->applyTransactionRoundFlags( $conn );
1060            }
1061            foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1062                $conn->setTransactionListener( $name, $callback );
1063            }
1064        }
1065
1066        // Make the connection handle live
1067        try {
1068            $conn->initConnection();
1069            ++$this->connectionCounter;
1070        } catch ( DBConnectionError $e ) {
1071            $this->lastErrorConn = $conn;
1072            // ignore; let the DB handle the logging
1073        }
1074
1075        if ( $conn->isOpen() ) {
1076 &nb