Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
57.48% covered (warning)
57.48%
507 / 882
37.66% covered (danger)
37.66%
29 / 77
CRAP
0.00% covered (danger)
0.00%
0 / 1
LoadBalancer
57.48% covered (warning)
57.48%
507 / 882
37.66% covered (danger)
37.66%
29 / 77
7793.65
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 configure
87.76% covered (warning)
87.76%
43 / 49
0.00% covered (danger)
0.00%
0 / 1
12.26
 newTrackedConnectionsArray
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 getClusterName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainInstance
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
6
 sanitizeConnectionFlags
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
4.18
 enforceConnectionFlags
50.00% covered (danger)
50.00%
4 / 8
0.00% covered (danger)
0.00%
0 / 1
6.00
 getRandomNonLagged
50.00% covered (danger)
50.00%
14 / 28
0.00% covered (danger)
0.00%
0 / 1
16.00
 shuffleSharding
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
3
 getReaderIndex
86.49% covered (warning)
86.49%
32 / 37
0.00% covered (danger)
0.00%
0 / 1
17.71
 getExistingReaderIndex
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 pickReaderIndex
62.96% covered (warning)
62.96%
17 / 27
0.00% covered (danger)
0.00%
0 / 1
15.08
 waitForAll
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
90
 getAnyOpenConnection
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
7.01
 pickAnyOpenConnection
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
6.60
 awaitSessionPrimaryPos
7.14% covered (danger)
7.14%
3 / 42
0.00% covered (danger)
0.00%
0 / 1
127.29
 getConnection
66.67% covered (warning)
66.67%
6 / 9
0.00% covered (danger)
0.00%
0 / 1
4.59
 getConnectionInternal
85.71% covered (warning)
85.71%
12 / 14
0.00% covered (danger)
0.00%
0 / 1
6.10
 getServerConnection
73.68% covered (warning)
73.68%
14 / 19
0.00% covered (danger)
0.00%
0 / 1
7.89
 getMaintenanceConnectionRef
55.56% covered (warning)
55.56%
5 / 9
0.00% covered (danger)
0.00%
0 / 1
5.40
 reuseOrOpenConnectionForNewRef
92.68% covered (success)
92.68%
38 / 41
0.00% covered (danger)
0.00%
0 / 1
15.09
 assertConnectionDomain
25.00% covered (danger)
25.00%
1 / 4
0.00% covered (danger)
0.00%
0 / 1
3.69
 getServerAttributes
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 reallyOpenConnection
80.00% covered (warning)
80.00%
52 / 65
0.00% covered (danger)
0.00%
0 / 1
14.35
 loadSessionPrimaryPos
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
6.84
 reportConnectionError
30.30% covered (danger)
30.30%
10 / 33
0.00% covered (danger)
0.00%
0 / 1
23.59
 getServerCount
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasStreamingReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerInfo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerType
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPrimaryPos
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 reconfigure
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
4
 disable
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 closeAll
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 closeConnection
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
42
 finalizePrimaryChanges
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
3
 approvePrimaryChanges
28.57% covered (danger)
28.57%
10 / 35
0.00% covered (danger)
0.00%
0 / 1
46.44
 beginPrimaryChanges
69.23% covered (warning)
69.23%
9 / 13
0.00% covered (danger)
0.00%
0 / 1
3.26
 commitPrimaryChanges
61.11% covered (warning)
61.11%
11 / 18
0.00% covered (danger)
0.00%
0 / 1
6.47
 runPrimaryTransactionIdleCallbacks
47.62% covered (danger)
47.62%
20 / 42
0.00% covered (danger)
0.00%
0 / 1
24.37
 runPrimaryTransactionListenerCallbacks
60.00% covered (warning)
60.00%
9 / 15
0.00% covered (danger)
0.00%
0 / 1
5.02
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 flushPrimarySessions
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 assertTransactionRoundStage
16.67% covered (danger)
16.67%
2 / 12
0.00% covered (danger)
0.00%
0 / 1
4.31
 syncConnectionRoundState
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
7
 flushReplicaSnapshots
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
5
 flushPrimarySnapshots
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 hasPrimaryConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 lastPrimaryChangeTimestamp
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 pendingPrimaryChangeCallers
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setLaggedReplicaMode
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getReadOnlyReason
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 isPrimaryRunningReadOnly
88.46% covered (warning)
88.46%
23 / 26
0.00% covered (danger)
0.00%
0 / 1
4.02
 pingAll
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 getOpenConnections
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
4
 getOpenPrimaryConnections
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 getMaxLag
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 getLagTimes
90.91% covered (success)
90.91%
20 / 22
0.00% covered (danger)
0.00%
0 / 1
4.01
 waitForPrimaryPos
0.00% covered (danger)
0.00%
0 / 27
0.00% covered (danger)
0.00%
0 / 1
72
 setTransactionListener
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 setTableAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 setTempTablesOnlyMode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 stringifyConn
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 fieldHasBit
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getConnLogContext
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 setMockTime
n/a
0 / 0
n/a
0 / 0
2
 setDefaultGroupName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6namespace Wikimedia\Rdbms;
7
8use InvalidArgumentException;
9use LogicException;
10use Psr\Log\LoggerInterface;
11use Psr\Log\NullLogger;
12use RuntimeException;
13use Throwable;
14use UnexpectedValueException;
15use Wikimedia\ArrayUtils\ArrayUtils;
16use Wikimedia\ObjectCache\BagOStuff;
17use Wikimedia\ObjectCache\EmptyBagOStuff;
18use Wikimedia\ObjectCache\WANObjectCache;
19use Wikimedia\ScopedCallback;
20use Wikimedia\Stats\StatsFactory;
21
22/**
23 * @see ILoadBalancer
24 * @ingroup Database
25 */
26class LoadBalancer implements ILoadBalancerForOwner {
27    /** @var ILoadMonitor */
28    private $loadMonitor;
29    /** @var BagOStuff */
30    private $srvCache;
31    /** @var WANObjectCache */
32    private $wanCache;
33    /** @var DatabaseFactory */
34    private $databaseFactory;
35
36    /** @var TransactionProfiler */
37    private $trxProfiler;
38    /** @var StatsFactory */
39    private $statsFactory;
40    /** @var LoggerInterface */
41    private $logger;
42    /** @var callable Exception logger */
43    private $errorLogger;
44    /** @var DatabaseDomain Local DB domain ID and default for new connections */
45    private $localDomain;
46    /** @var bool Whether this PHP instance is for a CLI script */
47    private $cliMode;
48
49    /** @var array<string,array<int,Database[]>> Map of (connection pool => server index => Database[]) */
50    private $conns;
51
52    /** @var string The name of the DB cluster */
53    private $clusterName;
54    /** @var ServerInfo */
55    private $serverInfo;
56    /** @var array<int,int|float> Map of server index => weight */
57    private $loads;
58    /** @var string|null Default query group to use with getConnection() */
59    private $defaultGroup;
60
61    /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
62    private $tableAliases = [];
63    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
64    private $domainAliases = [];
65    /** @var callable[] Map of (name => callable) */
66    private $trxRecurringCallbacks = [];
67    /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */
68    private $tempTablesOnlyMode = [];
69
70    /** @var string|null Active explicit transaction round owner or false if none */
71    private $trxRoundFname = null;
72    /** @var string Stage of the current transaction round in the transaction round life-cycle */
73    private $trxRoundStage = self::ROUND_CURSORY;
74    /** @var int[] The group replica server indexes keyed by group */
75    private $readIndexByGroup = [];
76    /** @var DBPrimaryPos|null Replication sync position or false if not set */
77    private $waitForPos;
78    /** @var bool Whether a lagged replica DB was used */
79    private $laggedReplicaMode = false;
80    /** @var string|false Reason this instance is read-only or false if not */
81    private $readOnlyReason = false;
82    /** @var int Total number of new connections ever made with this instance */
83    private $connectionCounter = 0;
84    /** @var bool */
85    private $disabled = false;
86    private ?ChronologyProtector $chronologyProtector = null;
87    /** @var bool Whether the session consistency callback already executed */
88    private $chronologyProtectorCalled = false;
89
90    /** @var IDatabaseForOwner|null The last connection handle that caused a problem */
91    private $lastErrorConn;
92
93    /** @var DatabaseDomain[] Map of (domain ID => domain instance) */
94    private $nonLocalDomainCache = [];
95    private ?string $uniqueIdentifier = null;
96
97    /**
98     * @var int Modification counter for invalidating connections held by
99     *      DBConnRef instances. This is bumped by reconfigure().
100     */
101    private $modcount = 0;
102
103    /** The "server index" LB info key; see {@link IDatabase::getLBInfo()} */
104    private const INFO_SERVER_INDEX = 'serverIndex';
105    /** The "connection category" LB info key; see {@link IDatabase::getLBInfo()} */
106    private const INFO_CONN_CATEGORY = 'connCategory';
107
108    /** Warn when this many connection are held */
109    private const CONN_HELD_WARN_THRESHOLD = 10;
110
111    /** Default 'waitTimeout' when unspecified */
112    private const MAX_WAIT_DEFAULT = 10;
113    /** Seconds to cache primary DB server read-only status */
114    private const TTL_CACHE_READONLY = 5;
115
116    /** A category of connections that are tracked and transaction round aware */
117    private const CATEGORY_ROUND = 'round';
118    /** A category of connections that are tracked and in autocommit-mode */
119    private const CATEGORY_AUTOCOMMIT = 'auto-commit';
120    /** A category of connections that are untracked and in gauge-mode */
121    private const CATEGORY_GAUGE = 'gauge';
122
123    /** Transaction round, explicit or implicit, has not finished writing */
124    private const ROUND_CURSORY = 'cursory';
125    /** Transaction round writes are complete and ready for pre-commit checks */
126    private const ROUND_FINALIZED = 'finalized';
127    /** Transaction round passed final pre-commit checks */
128    private const ROUND_APPROVED = 'approved';
129    /** Transaction round was committed and post-commit callbacks must be run */
130    private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
131    /** Transaction round was rolled back and post-rollback callbacks must be run */
132    private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
133    /** Transaction round encountered an error */
134    private const ROUND_ERROR = 'error';
135
136    /** @var int Idiom for getExistingReaderIndex() meaning "no index selected" */
137    private const READER_INDEX_NONE = -1;
138
139    public function __construct( array $params ) {
140        $this->configure( $params );
141
142        $this->conns = self::newTrackedConnectionsArray();
143    }
144
145    /**
146     * @param array $params A database configuration array, see $wgLBFactoryConf.
147     *
148     * @return void
149     */
150    protected function configure( array $params ): void {
151        $this->localDomain = isset( $params['localDomain'] )
152            ? DatabaseDomain::newFromId( $params['localDomain'] )
153            : DatabaseDomain::newUnspecified();
154
155        $this->serverInfo = new ServerInfo();
156        $this->loads = [];
157        foreach ( $this->serverInfo->normalizeServerMaps( $params['servers'] ?? [] ) as $i => $server ) {
158            $this->serverInfo->addServer( $i, $server );
159            $this->loads[$i] = $server['load'];
160        }
161        // If the cluster name is not specified, fallback to the current primary name
162        $this->clusterName = $params['clusterName']
163            ?? $this->serverInfo->getServerName( ServerInfo::WRITER_INDEX );
164
165        if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
166            $this->readOnlyReason = $params['readOnlyReason'];
167        }
168
169        $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
170        $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
171
172        // Note: this parameter is normally absent. It is injectable for testing purposes only.
173        $this->databaseFactory = $params['databaseFactory'] ?? new DatabaseFactory( $params );
174
175        $this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
176            trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
177        };
178        $this->logger = $params['logger'] ?? new NullLogger();
179
180        $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
181        $this->statsFactory = $params['statsFactory'] ?? StatsFactory::newNull();
182
183        // Set up LoadMonitor
184        $loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => LoadMonitorNull::class ];
185        $compat = [
186            'LoadMonitor' => LoadMonitor::class,
187            'LoadMonitorNull' => LoadMonitorNull::class
188        ];
189        $class = $loadMonitorConfig['class'];
190        // @phan-suppress-next-line PhanImpossibleCondition
191        if ( isset( $compat[$class] ) ) {
192            $class = $compat[$class];
193        }
194        $this->loadMonitor = new $class(
195            $this,
196            $this->srvCache,
197            $this->wanCache,
198            $this->logger,
199            $this->statsFactory,
200            $loadMonitorConfig
201        );
202
203        if ( isset( $params['chronologyProtector'] ) ) {
204            $this->chronologyProtector = $params['chronologyProtector'];
205        }
206
207        if ( isset( $params['roundStage'] ) ) {
208            if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
209                $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
210            } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
211                $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
212            }
213        }
214
215        $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
216
217        $this->defaultGroup = $params['defaultGroup'] ?? self::GROUP_GENERIC;
218        if ( empty( $params['shuffleSharding'] ) ) {
219            $this->uniqueIdentifier = null;
220        } else {
221            $this->uniqueIdentifier = $params['uniqueIdentifier'] ?? null;
222        }
223    }
224
225    private static function newTrackedConnectionsArray(): array {
226        // Note that CATEGORY_GAUGE connections are untracked
227        return [
228            self::CATEGORY_ROUND => [],
229            self::CATEGORY_AUTOCOMMIT => []
230        ];
231    }
232
233    public function getClusterName(): string {
234        return $this->clusterName;
235    }
236
237    public function getLocalDomainID(): string {
238        return $this->localDomain->getId();
239    }
240
241    /** @inheritDoc */
242    public function resolveDomainID( DatabaseDomain|string|false $domain ): string {
243        return $this->resolveDomainInstance( $domain )->getId();
244    }
245
246    final protected function resolveDomainInstance( DatabaseDomain|string|false $domain ): DatabaseDomain {
247        if ( $domain instanceof DatabaseDomain ) {
248            return $domain; // already a domain instance
249        } elseif ( $domain === false || $domain === $this->localDomain->getId() ) {
250            return $this->localDomain;
251        } elseif ( isset( $this->domainAliases[$domain] ) ) {
252            $this->domainAliases[$domain] =
253                DatabaseDomain::newFromId( $this->domainAliases[$domain] );
254
255            return $this->domainAliases[$domain];
256        }
257
258        $cachedDomain = $this->nonLocalDomainCache[$domain] ?? null;
259        if ( $cachedDomain === null ) {
260            $cachedDomain = DatabaseDomain::newFromId( $domain );
261            $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
262        }
263
264        return $cachedDomain;
265    }
266
267    /**
268     * Sanitize connection flags provided by a call to getConnection()
269     *
270     * @param int $flags Bitfield of class CONN_* constants
271     * @param string $domain Database domain
272     * @return int Sanitized bitfield
273     */
274    protected function sanitizeConnectionFlags( $flags, $domain ) {
275        if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
276            // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
277            // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
278            // during larger transactions. This is useful for avoiding lock contention.
279            // Assuming all servers are of the same type (or similar), which is overwhelmingly
280            // the case, use the primary server information to get the attributes. The information
281            // for $i cannot be used since it might be DB_REPLICA, which might require connection
282            // attempts in order to be resolved into a real server index.
283            $attributes = $this->getServerAttributes( ServerInfo::WRITER_INDEX );
284            if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
285                // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
286                // to use separate connections would just cause self-deadlocks. Note that
287                // REPEATABLE-READ staleness is not an issue since DB-level locking means
288                // that transactions are Strict Serializable anyway.
289                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
290                $type = $this->serverInfo->getServerType( ServerInfo::WRITER_INDEX );
291                $this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
292            } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
293                // T202116: integration tests are active and queries should be all be using
294                // temporary clone tables (via prefix). Such tables are not visible across
295                // different connections nor can there be REPEATABLE-READ snapshot staleness,
296                // so use the same connection for everything.
297                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
298            }
299        }
300
301        return $flags;
302    }
303
304    /**
305     * @param IDatabase $conn
306     * @param int $flags
307     * @throws DBUnexpectedError
308     */
309    private function enforceConnectionFlags( IDatabase $conn, $flags ) {
310        if (
311            self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ||
312            // Handles with open transactions are avoided since they might be subject
313            // to REPEATABLE-READ snapshots, which could affect the lag estimate query.
314            self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE )
315        ) {
316            if ( $conn->trxLevel() ) {
317                throw new DBUnexpectedError(
318                    $conn,
319                    'Handle requested with autocommit-mode yet it has a transaction'
320                );
321            }
322
323            $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
324        }
325    }
326
327    /**
328     * @param array<int,int|float> $loads Map of (server index => weight) for a load group
329     * @param int|float $sessionLagLimit Additional maximum lag threshold imposed by the session;
330     *  use INF if none applies. Servers will count as lagged if their lag exceeds either this
331     *  value or the configured "max lag" value.
332     * @return int|false Index of a non-lagged server with non-zero weight; false if none
333     */
334    private function getRandomNonLagged( array $loads, $sessionLagLimit = INF ) {
335        $lags = $this->getLagTimes();
336
337        $dbs = [];
338        foreach ( $loads as $i => $load ) {
339            $srvName = $this->serverInfo->getServerName( $i );
340            $dbs[$srvName] = $i;
341        }
342        // Unset excessively lagged servers from the load group
343        foreach ( $lags as $i => $lag ) {
344            if ( $i !== ServerInfo::WRITER_INDEX ) {
345                // How much lag normally counts as "excessive" for this server
346                $maxServerLag = $this->serverInfo->getServerMaxLag( $i );
347                // How much lag counts as "excessive" for this server given the session
348                $maxServerLag = min( $maxServerLag, $sessionLagLimit );
349
350                $srvName = $this->serverInfo->getServerName( $i );
351                if ( $lag === false && !is_infinite( $maxServerLag ) ) {
352                    $this->logger->debug(
353                        __METHOD__ . ": server {db_server} is not replicating?",
354                        [ 'db_server' => $srvName ]
355                    );
356                    unset( $loads[$i] );
357                    unset( $dbs[$srvName] );
358                } elseif ( $lag > $maxServerLag ) {
359                    $this->logger->debug(
360                        __METHOD__ .
361                            ": server {db_server} has {lag} seconds of lag (>= {maxlag})",
362                        [ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $maxServerLag ]
363                    );
364                    unset( $loads[$i] );
365                    unset( $dbs[$srvName] );
366                }
367            }
368        }
369
370        if ( array_sum( $loads ) == 0 ) {
371            // All the replicas with non-zero weight are lagged and the primary has zero load.
372            // Inform caller so that it can use switch to read-only mode and use a lagged replica.
373            return false;
374        }
375
376        // Return a server index based on weighted random selection
377        return $this->shuffleSharding( $dbs, $loads );
378    }
379
380    /**
381     * Apply shuffle sharding. Idea originally from Amazon's Route 53.
382     *
383     * It basically used a unique identifier from the request (IP is good enough) and uses hashing
384     * to pick a consistent set of 2 or 3 from the pool. Then using randomness, picks one at random.
385     *
386     * It provides many benefits and makes the system more robust against issues or attacks.
387     *
388     * @param array $dbs Map of server name => server index
389     * @param array $loads Map of (server index => weight) for a load group
390     * @return int Index of the server
391     */
392    private function shuffleSharding( $dbs, $loads ) {
393        // shuffle sharding doesn't make sense in small groups
394        if ( count( $loads ) <= 3 || $this->uniqueIdentifier === null ) {
395            return ArrayUtils::pickRandom( $loads );
396        }
397
398        ArrayUtils::consistentHashSort( $dbs, $this->uniqueIdentifier );
399        $keys = array_values( $dbs );
400        return ArrayUtils::pickRandom( [
401            $keys[0] => $loads[$keys[0]],
402            $keys[1] => $loads[$keys[1]],
403            $keys[2] => $loads[$keys[2]],
404        ] );
405    }
406
407    /** @inheritDoc */
408    public function getReaderIndex( $group = false ) {
409        if ( !$this->serverInfo->hasReplicaServers() ) {
410            // There is only one possible server to use (the primary)
411            return ServerInfo::WRITER_INDEX;
412        }
413
414        if (
415            $group === 'dump' ||
416            $group === 'vslow' ||
417            $group === [ 'dump' ] ||
418            $group === [ 'vslow' ] ||
419            $this->defaultGroup === 'dump' ||
420            $this->defaultGroup === 'vslow'
421        ) {
422            $group = 'vslow';
423        } else {
424            $group = self::GROUP_GENERIC;
425        }
426
427        $index = $this->getExistingReaderIndex( $group );
428        if ( $index !== self::READER_INDEX_NONE ) {
429            // A reader index was already selected for this query group. Keep using it,
430            // since any session replication position was already waited on and any
431            // active transaction will be reused (e.g. for point-in-time snapshots).
432            return $index;
433        }
434
435        // Get the server weight array for this load group
436        if ( $group === 'vslow' ) {
437            // pick the replica with the lowest weight above zero as the vslow
438            $lowestWeight = INF;
439            $loads = $this->loads;
440            foreach ( $this->loads as $index => $weight ) {
441                if ( $weight > 0 && $weight < $lowestWeight ) {
442                    $loads = [ $index => $weight ];
443                    $lowestWeight = $weight;
444                }
445            }
446        } else {
447            $loads = $this->loads;
448        }
449
450        if ( !$loads ) {
451            $this->logger->info( __METHOD__ . ": no loads for group $group" );
452            return false;
453        }
454
455        // Load any session replication positions, before any connection attempts,
456        // since reading them afterwards can only cause more delay due to possibly
457        // seeing even higher replication positions (e.g. from concurrent requests).
458        $this->loadSessionPrimaryPos();
459
460        // Scale the configured load weights according to each server's load/state.
461        // This can sometimes trigger server connections due to cache regeneration.
462        $this->loadMonitor->scaleLoads( $loads );
463
464        // Pick a server, accounting for load weights, lag, and session consistency
465        $i = $this->pickReaderIndex( $loads );
466        if ( $i === false ) {
467            // Connection attempts failed
468            return false;
469        }
470
471        // If data seen by queries is expected to reflect writes from a prior transaction,
472        // then wait for the chosen server to apply those changes. This is used to improve
473        // session consistency.
474        if ( !$this->awaitSessionPrimaryPos( $i ) ) {
475            // Data will be outdated compared to what was expected
476            $this->setLaggedReplicaMode();
477        }
478
479        // Keep using this server for DB_REPLICA handles for this group
480        if ( $i < 0 ) {
481            throw new UnexpectedValueException( "Cannot set a negative read server index" );
482        }
483        $this->readIndexByGroup[$group] = $i;
484
485        $serverName = $this->getServerName( $i );
486        $this->logger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
487
488        return $i;
489    }
490
491    /**
492     * Get the server index chosen for DB_REPLICA connections for the given query group
493     *
494     * @param string $group Query group; use false for the generic group
495     * @return int Specific server index or LoadBalancer::READER_INDEX_NONE if none was chosen
496     */
497    protected function getExistingReaderIndex( $group ) {
498        return $this->readIndexByGroup[$group] ?? self::READER_INDEX_NONE;
499    }
500
501    /**
502     * Pick a server that is reachable and preferably non-lagged based on the load weights
503     *
504     * This will leave the server connection open within the pool for reuse
505     *
506     * @param array<int,int|float> $loads Map of (server index => weight) for a load group
507     * @return int|false Server index to use as the load group reader index; false on failure
508     */
509    private function pickReaderIndex( array $loads ) {
510        if ( $loads === [] ) {
511            throw new InvalidArgumentException( "Load group array is empty" );
512        }
513
514        $i = false;
515        // Quickly look through the available servers for a server that meets criteria...
516        $currentLoads = $loads;
517        while ( count( $currentLoads ) ) {
518            if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
519                $this->logger->debug( __METHOD__ . ": session has replication position" );
520                // ChronologyProtector::getSessionPrimaryPos called in loadSessionPrimaryPos()
521                // sets "waitForPos" for session consistency. This triggers doWait() after
522                // connect, so it's especially good to avoid lagged servers so as to avoid
523                // excessive delay in that method.
524                $ago = microtime( true ) - $this->waitForPos->asOfTime();
525                // Aim for <= 1 second of waiting (being too picky can backfire)
526                $i = $this->getRandomNonLagged( $currentLoads, $ago + 1 );
527            } else {
528                // Any server with less lag than it's 'max lag' param is preferable
529                $i = $this->getRandomNonLagged( $currentLoads );
530            }
531
532            if ( $i === false && count( $currentLoads ) ) {
533                $this->setLaggedReplicaMode();
534                // All replica DBs lagged, just pick anything.
535                $i = ArrayUtils::pickRandom( $currentLoads );
536            }
537
538            if ( $i === false ) {
539                // pickRandom() returned false.
540                // This is permanent and means the configuration or LoadMonitor
541                // wants us to return false.
542                $this->logger->debug( __METHOD__ . ": no suitable server found" );
543                return false;
544            }
545
546            $serverName = $this->getServerName( $i );
547            $this->logger->debug( __METHOD__ . ": connecting to $serverName..." );
548
549            // Get a connection to this server without triggering complementary connections
550            // to other servers (due to things like lag or read-only checks). We want to avoid
551            // the risk of overhead and recursion here.
552            $conn = $this->getServerConnection( $i, self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
553            if ( !$conn ) {
554                $this->logger->warning( __METHOD__ . ": failed connecting to $serverName" );
555                unset( $currentLoads[$i] ); // avoid this server next iteration
556                continue;
557            }
558
559            // Return this server
560            break;
561        }
562
563        // If all servers were down, quit now
564        if ( $currentLoads === [] ) {
565            $this->logger->error( __METHOD__ . ": all servers down" );
566        }
567
568        return $i;
569    }
570
571    /** @inheritDoc */
572    public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
573        $timeout = $timeout ?: self::MAX_WAIT_DEFAULT;
574
575        $oldPos = $this->waitForPos;
576        try {
577            $this->waitForPos = $pos;
578
579            $failedReplicas = [];
580            foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
581                if ( isset( $this->loads[$i] ) && $this->loads[$i] > 0 ) {
582                    $start = microtime( true );
583                    $ok = $this->awaitSessionPrimaryPos( $i, $timeout );
584                    if ( !$ok ) {
585                        $failedReplicas[] = $this->getServerName( $i );
586                    }
587                    $timeout -= intval( microtime( true ) - $start );
588                }
589            }
590
591            // Stop spamming logs when only one replica is lagging and we have 5+ replicas.
592            // Mediawiki automatically stops sending queries to the lagged one.
593            $failed = $failedReplicas && ( count( $failedReplicas ) > 1 || $this->getServerCount() < 5 );
594            if ( $failed ) {
595                $this->logger->error(
596                    "Timed out waiting for replication to reach {raw_pos}",
597                    [
598                        'raw_pos' => $pos->__toString(),
599                        'failed_hosts' => $failedReplicas,
600                        'timeout' => $timeout,
601                        'exception' => new RuntimeException()
602                    ]
603                );
604            }
605
606            return !$failed;
607        } finally {
608            // Restore the old position; this is used for throttling, not lag-protection
609            $this->waitForPos = $oldPos;
610        }
611    }
612
613    /** @inheritDoc */
614    public function getAnyOpenConnection( $i, $flags = 0 ) {
615        $i = ( $i === self::DB_PRIMARY ) ? ServerInfo::WRITER_INDEX : $i;
616        $conn = false;
617        foreach ( $this->conns as $type => $poolConnsByServer ) {
618            if ( $i === self::DB_REPLICA ) {
619                // Consider all existing connections to any server
620                $applicableConnsByServer = $poolConnsByServer;
621            } else {
622                // Consider all existing connections to a specific server
623                $applicableConnsByServer = isset( $poolConnsByServer[$i] )
624                    ? [ $i => $poolConnsByServer[$i] ]
625                    : [];
626            }
627
628            $conn = $this->pickAnyOpenConnection( $applicableConnsByServer );
629            if ( $conn ) {
630                $this->logger->debug( __METHOD__ . ": found '$type' connection to #$i." );
631                break;
632            }
633        }
634
635        if ( $conn ) {
636            $this->enforceConnectionFlags( $conn, $flags );
637        }
638
639        return $conn;
640    }
641
642    /**
643     * @param Database[][] $connsByServer Map of (server index => array of DB handles)
644     * @return IDatabaseForOwner|false An appropriate open connection or false if none found
645     */
646    private function pickAnyOpenConnection( array $connsByServer ) {
647        foreach ( $connsByServer as $i => $conns ) {
648            foreach ( $conns as $conn ) {
649                if ( !$conn->isOpen() ) {
650                    $this->logger->warning(
651                        __METHOD__ .
652                        ": pooled DB handle for {db_server} (#$i) has no open connection.",
653                        $this->getConnLogContext( $conn )
654                    );
655                    continue; // some sort of error occurred?
656                }
657                return $conn;
658            }
659        }
660
661        return false;
662    }
663
664    /**
665     * Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos"
666     *
667     * @see loadSessionPrimaryPos()
668     *
669     * @param int $index Specific server index
670     * @param int|null $timeout Max seconds to wait; default is "waitTimeout"
671     * @return bool Success
672     */
673    private function awaitSessionPrimaryPos( $index, $timeout = null ) {
674        $timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );
675
676        if ( !$this->waitForPos || $index === ServerInfo::WRITER_INDEX ) {
677            return true;
678        }
679
680        $srvName = $this->getServerName( $index );
681
682        // Check if we already know that the DB has reached this point
683        $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v2' );
684
685        /** @var DBPrimaryPos $knownReachedPos */
686        $position = $this->srvCache->get( $key );
687        if ( !is_array( $position ) ) {
688            $knownReachedPos = null;
689        } else {
690            $class = $position['_type_'];
691            $knownReachedPos = $class::newFromArray( $position );
692        }
693        if (
694            $knownReachedPos instanceof DBPrimaryPos &&
695            $knownReachedPos->hasReached( $this->waitForPos )
696        ) {
697            $this->logger->debug(
698                __METHOD__ .
699                ": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
700                [ 'db_server' => $srvName ]
701            );
702
703            return true;
704        }
705
706        $close = false; // close the connection afterwards
707        $flags = self::CONN_SILENCE_ERRORS;
708        // Check if there is an existing connection that can be used
709        $conn = $this->getAnyOpenConnection( $index, $flags );
710        if ( !$conn ) {
711            // Get a connection to this server without triggering complementary connections
712            // to other servers (due to things like lag or read-only checks). We want to avoid
713            // the risk of overhead and recursion here.
714            $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
715            if ( !$conn ) {
716                $this->logger->warning(
717                    __METHOD__ . ': failed to connect to {db_server}',
718                    [ 'db_server' => $srvName ]
719                );
720
721                return false;
722            }
723            // Avoid connection spam in waitForAll() when connections
724            // are made just for the sake of doing this lag check.
725            $close = true;
726        }
727
728        $this->logger->info(
729            __METHOD__ .
730            ': waiting for replica DB {db_server} to catch up...',
731            $this->getConnLogContext( $conn )
732        );
733
734        $result = $conn->primaryPosWait( $this->waitForPos, $timeout );
735
736        $ok = ( $result !== null && $result != -1 );
737        if ( $ok ) {
738            // Remember that the DB reached this point
739            $this->srvCache->set( $key, $this->waitForPos->toArray(), BagOStuff::TTL_DAY );
740        }
741
742        if ( $close ) {
743            $this->closeConnection( $conn );
744        }
745
746        return $ok;
747    }
748
749    /** @inheritDoc */
750    public function getConnection( $i, $groups = [], string|false $domain = false, $flags = 0 ) {
751        if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
752            throw new UnexpectedValueException(
753                __METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
754            );
755        }
756
757        $domain = $this->resolveDomainID( $domain );
758        $role = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX )
759            ? self::DB_PRIMARY
760            : self::DB_REPLICA;
761
762        return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role, $this->modcount );
763    }
764
765    /** @inheritDoc */
766    public function getConnectionInternal( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
767        $domain = $this->resolveDomainID( $domain );
768        $flags = $this->sanitizeConnectionFlags( $flags, $domain );
769        // If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
770        // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
771        // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
772        // The use of getServerConnection() instead of getConnection() avoids infinite loops.
773        $serverIndex = $i;
774        if ( $i === self::DB_PRIMARY ) {
775            $serverIndex = ServerInfo::WRITER_INDEX;
776        } elseif ( $i === self::DB_REPLICA ) {
777            $groupIndex = $this->getReaderIndex( $groups );
778            if ( $groupIndex !== false ) {
779                // Group connection succeeded
780                $serverIndex = $groupIndex;
781            }
782            if ( $serverIndex < 0 ) {
783                $this->reportConnectionError( 'could not connect to any replica DB server' );
784            }
785        } elseif ( !$this->serverInfo->hasServerIndex( $i ) ) {
786            throw new UnexpectedValueException( "Invalid server index index #$i" );
787        }
788        // Get an open connection to that server (might trigger a new connection)
789        return $this->getServerConnection( $serverIndex, $domain, $flags );
790    }
791
792    /** @inheritDoc */
793    public function getServerConnection( $i, $domain, $flags = 0 ) {
794        $domainInstance = DatabaseDomain::newFromId( $domain );
795        // Number of connections made before getting the server index and handle
796        $priorConnectionsMade = $this->connectionCounter;
797        // Get an open connection to this server (might trigger a new connection)
798        $conn = $this->reuseOrOpenConnectionForNewRef( $i, $domainInstance, $flags );
799        // Throw an error or otherwise bail out if the connection attempt failed
800        if ( !( $conn instanceof IDatabaseForOwner ) ) {
801            if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
802                $this->reportConnectionError();
803            }
804
805            return false;
806        }
807
808        // Profile any new connections caused by this method
809        if ( $this->connectionCounter > $priorConnectionsMade ) {
810            $this->trxProfiler->recordConnection(
811                $conn->getServerName(),
812                $conn->getDBname(),
813                ( $i === ServerInfo::WRITER_INDEX && $this->hasStreamingReplicaServers() )
814            );
815        }
816
817        if ( !$conn->isOpen() ) {
818            $this->lastErrorConn = $conn;
819            // Connection was made but later unrecoverably lost for some reason.
820            // Do not return a handle that will just throw exceptions on use, but
821            // let the calling code, e.g. getReaderIndex(), try another server.
822            if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
823                $this->reportConnectionError();
824            }
825            return false;
826        }
827
828        return $conn;
829    }
830
831    /** @inheritDoc */
832    public function getMaintenanceConnectionRef(
833        $i,
834        $groups = [],
835        $domain = false,
836        $flags = 0
837    ): DBConnRef {
838        if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
839            throw new UnexpectedValueException(
840                __METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
841            );
842        }
843
844        $domain = $this->resolveDomainID( $domain );
845        $role = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX )
846            ? self::DB_PRIMARY
847            : self::DB_REPLICA;
848
849        return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role, $this->modcount );
850    }
851
852    /**
853     * Get a live connection handle to the given domain
854     *
855     * This will reuse an existing tracked connection when possible. In some cases, this
856     * involves switching the DB domain of an existing handle in order to reuse it. If no
857     * existing handles can be reused, then a new connection will be made.
858     *
859     * @param int $i Specific server index
860     * @param DatabaseDomain $domain Database domain ID required by the reference
861     * @param int $flags Bit field of class CONN_* constants
862     * @return IDatabase|null Database or null on error
863     * @throws DBError When database selection fails
864     * @throws InvalidArgumentException When the server index is invalid
865     * @throws UnexpectedValueException When the DB domain of the connection is corrupted
866     * @throws DBAccessError If disable() was called
867     */
868    private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
869        // Figure out which connection pool to use based on the flags
870        if ( $this->fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE ) ) {
871            // Use low timeouts, use autocommit mode, ignore transaction rounds
872            $category = self::CATEGORY_GAUGE;
873        } elseif ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
874            // Use autocommit mode, ignore transaction rounds
875            $category = self::CATEGORY_AUTOCOMMIT;
876        } else {
877            // Respect DBO_DEFAULT, respect transaction rounds
878            $category = self::CATEGORY_ROUND;
879        }
880
881        $conn = null;
882        // Reuse a free connection in the pool from any domain if possible. There should only
883        // be one connection in this pool unless either:
884        //  - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
885        //       or more database domains have been used during the load balancer's lifetime
886        //  - b) Two or more nested function calls used getConnection() on different domains.
887        foreach ( ( $this->conns[$category][$i] ?? [] ) as $poolConn ) {
888            // Check if any required DB domain changes for the new reference are possible
889            // Calling selectDomain() would trigger a reconnect, which will break if a
890            // transaction is active or if there is any other meaningful session state.
891            $isShareable = !(
892                $poolConn->databasesAreIndependent() &&
893                $domain->getDatabase() !== null &&
894                $domain->getDatabase() !== $poolConn->getDBname()
895            );
896            if ( $isShareable ) {
897                $conn = $poolConn;
898                // Make any required DB domain changes for the new reference
899                if ( !$domain->isUnspecified() ) {
900                    $conn->selectDomain( $domain );
901                }
902                $this->logger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
903                break;
904            }
905        }
906
907        // If necessary, try to open a new connection and add it to the pool
908        if ( !$conn ) {
909            $conn = $this->reallyOpenConnection(
910                $i,
911                $domain,
912                [ self::INFO_CONN_CATEGORY => $category ]
913            );
914            if ( $conn->isOpen() ) {
915                // Make Database::isReadOnly() respect server-side and configuration-based
916                // read-only mode. Note that replica handles are always seen as read-only
917                // in Database::isReadOnly() and Database::assertIsWritablePrimary().
918                if ( $i === ServerInfo::WRITER_INDEX ) {
919                    if ( $this->readOnlyReason !== false ) {
920                        $readOnlyReason = $this->readOnlyReason;
921                    } elseif ( $this->isPrimaryRunningReadOnly( $conn ) ) {
922                        $readOnlyReason = 'The primary database server is running in read-only mode.';
923                    } else {
924                        $readOnlyReason = false;
925                    }
926                    $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $readOnlyReason );
927                }
928                // Connection obtained; check if it belongs to a tracked connection category
929                if ( isset( $this->conns[$category] ) ) {
930                    // Track this connection for future reuse
931                    $this->conns[$category][$i][] = $conn;
932                }
933            } else {
934                $this->logger->warning( __METHOD__ . ": connection error for $i/$domain" );
935                $this->lastErrorConn = $conn;
936                $conn = null;
937            }
938        }
939
940        if ( $conn instanceof IDatabaseForOwner ) {
941            // Check to make sure that the right domain is selected
942            $this->assertConnectionDomain( $conn, $domain );
943            // Check to make sure that the CONN_* flags are respected
944            $this->enforceConnectionFlags( $conn, $flags );
945        }
946
947        return $conn;
948    }
949
950    /**
951     * Sanity check to make sure that the right domain is selected
952     *
953     * @param Database $conn
954     * @param DatabaseDomain $domain
955     * @throws DBUnexpectedError
956     */
957    private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
958        if ( !$domain->isCompatible( $conn->getDomainID() ) ) {
959            throw new UnexpectedValueException(
960                "Got connection to '{$conn->getDomainID()}', but expected one for '{$domain}'"
961            );
962        }
963    }
964
965    /** @inheritDoc */
966    public function getServerAttributes( $i ) {
967        return $this->databaseFactory->attributesFromType(
968            $this->getServerType( $i ),
969            $this->serverInfo->getServerDriver( $i )
970        );
971    }
972
973    /**
974     * Open a new network connection to a server (uncached)
975     *
976     * Returns a Database object whether or not the connection was successful.
977     *
978     * @param int $i Specific server index
979     * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
980     * @param array $lbInfo Additional information for setLBInfo()
981     * @return Database
982     * @throws DBAccessError
983     * @throws InvalidArgumentException
984     */
985    protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
986        if ( $this->disabled ) {
987            throw new DBAccessError();
988        }
989
990        $server = $this->serverInfo->getServerInfoStrict( $i );
991        if ( $lbInfo[self::INFO_CONN_CATEGORY] === self::CATEGORY_GAUGE ) {
992            // Use low connection/read timeouts for connection used for gauging server health.
993            // Gauge information should be cached and used to avoid outages. Indefinite hanging
994            // while gauging servers would do the opposite.
995            $server['connectTimeout'] = min( 1, $server['connectTimeout'] ?? INF );
996            $server['receiveTimeout'] = min( 1, $server['receiveTimeout'] ?? INF );
997            // Avoid implicit transactions and avoid any SET query for session variables during
998            // Database::open(). If a server becomes slow, every extra query can cause significant
999            // delays, even with low connect/receive timeouts.
1000            $server['flags'] ??= 0;
1001            $server['flags'] &= ~IDatabase::DBO_DEFAULT;
1002            $server['flags'] |= IDatabase::DBO_GAUGE;
1003        } else {
1004            // Use implicit transactions unless explicitly configured otherwise
1005            $server['flags'] ??= IDatabase::DBO_DEFAULT;
1006        }
1007
1008        if ( !empty( $server['is static'] ) ) {
1009            $topologyRole = IDatabase::ROLE_STATIC_CLONE;
1010        } else {
1011            $topologyRole = ( $i === ServerInfo::WRITER_INDEX )
1012                ? IDatabase::ROLE_STREAMING_MASTER
1013                : IDatabase::ROLE_STREAMING_REPLICA;
1014        }
1015
1016        $conn = $this->databaseFactory->create(
1017            $server['type'],
1018            array_merge( $server, [
1019                // Basic replication role information
1020                'topologyRole' => $topologyRole,
1021                // Use the database specified in $domain (null means "none or entrypoint DB");
1022                // fallback to the $server default if the RDBMs is an embedded library using a
1023                // file on disk since there would be nothing to access to without a DB/file name.
1024                'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1025                    ? ( $domain->getDatabase() ?? $server['dbname'] ?? null )
1026                    : $domain->getDatabase(),
1027                // Override the $server default schema with that of $domain if specified
1028                'schema' => $domain->getSchema(),
1029                // Use the table prefix specified in $domain
1030                'tablePrefix' => $domain->getTablePrefix(),
1031                'srvCache' => $this->srvCache,
1032                'logger' => $this->logger,
1033                'errorLogger' => $this->errorLogger,
1034                'trxProfiler' => $this->trxProfiler,
1035                'lbInfo' => [ self::INFO_SERVER_INDEX => $i ] + $lbInfo
1036            ] ),
1037            Database::NEW_UNCONNECTED
1038        );
1039        // Set alternative table names before any queries can be issued
1040        $conn->setTableAliases( $this->tableAliases );
1041        // Account for any active transaction round and listeners
1042        $this->syncConnectionRoundState( $conn );
1043        if ( $i === ServerInfo::WRITER_INDEX ) {
1044            foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1045                $conn->setTransactionListener( $name, $callback );
1046            }
1047        }
1048
1049        // Make the connection handle live
1050        try {
1051            $conn->initConnection();
1052            ++$this->connectionCounter;
1053        } catch ( DBConnectionError ) {
1054            $this->lastErrorConn = $conn;
1055            // ignore; let the DB handle the logging
1056        }
1057
1058        if ( $conn->isOpen() ) {
1059            $this->logger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
1060        } else {
1061            $this->logger->warning(
1062                __METHOD__ . ": connection error for $i/{db_domain}",
1063                [ 'db_domain' => $domain->getId() ]
1064            );
1065        }
1066
1067        // Log when many connection are made during a single request/script
1068        $count = 0;
1069        foreach ( $this->conns as $poolConnsByServer ) {
1070            foreach ( $poolConnsByServer as $serverConns ) {
1071                $count += count( $serverConns );
1072            }
1073        }
1074        if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1075            $this->logger->warning(
1076                __METHOD__ . ": {connections}+ connections made (primary={primarydb})",
1077                $this->getConnLogContext(
1078                    $conn,
1079                    [
1080                        'connections' => $count,
1081                        'primarydb' => $this->serverInfo->getPrimaryServerName(),
1082                        'db_domain' => $domain->getId()
1083                    ]
1084                )
1085            );
1086        }
1087
1088        $this->assertConnectionDomain( $conn, $domain );
1089
1090        return $conn;
1091    }
1092
1093    /**
1094     * Make sure that any "waitForPos" replication positions are loaded and available
1095     *
1096     * Each load balancer cluster has up to one replication position for the session.
1097     * These are used when data read by queries is expected to reflect writes caused
1098     * by a prior request/script from the same client.
1099     *
1100     * @see awaitSessionPrimaryPos()
1101     */
1102    private function loadSessionPrimaryPos() {
1103        if ( !$this->chronologyProtectorCalled && $this->chronologyProtector ) {
1104            $this->chronologyProtectorCalled = true;
1105            $pos = $this->chronologyProtector->getSessionPrimaryPos( $this );
1106            $this->logger->debug( __METHOD__ . ': executed chronology callback.' );
1107            if ( $pos ) {
1108                if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
1109                    $this->waitForPos = $pos;
1110                }
1111            }
1112        }
1113    }
1114
1115    /**
1116     * @param string $extraLbError Separat load balancer error
1117     * @throws DBConnectionError
1118     * @return never
1119     */
1120    private function reportConnectionError( $extraLbError = '' ): never {
1121        if ( $this->lastErrorConn instanceof IDatabaseForOwner ) {
1122            $srvName = $this->lastErrorConn->getServerName();
1123            $lastDbError = $this->lastErrorConn->lastError() ?: 'unknown error';
1124
1125            $exception = new DBConnectionError(
1126                $this->lastErrorConn,
1127                $extraLbError
1128                    ? "{$extraLbError}{$lastDbError} ({$srvName})"
1129                    : "{$lastDbError} ({$srvName})"
1130            );
1131
1132            if ( $extraLbError ) {
1133                $this->logger->warning(
1134                    __METHOD__ . "$extraLbError; {last_error} ({db_server})",
1135                    $this->getConnLogContext(
1136                        $this->lastErrorConn,
1137                        [
1138                            'method' => __METHOD__,
1139                            'last_error' => $lastDbError
1140                        ]
1141                    )
1142                );
1143            }
1144        } else {
1145            $exception = new DBConnectionError(
1146                null,
1147                $extraLbError ?: 'could not connect to the DB server'
1148            );
1149
1150            if ( $extraLbError ) {
1151                $this->logger->error(
1152                    __METHOD__ . "$extraLbError",
1153                    [
1154                        'method' => __METHOD__,
1155                        'last_error' => '(last connection error missing)'
1156                    ]
1157                );
1158            }
1159        }
1160
1161        throw $exception;
1162    }
1163
1164    /** @inheritDoc */
1165    public function getServerCount() {
1166        return $this->serverInfo->getServerCount();
1167    }
1168
1169    /** @inheritDoc */
1170    public function hasReplicaServers() {
1171        return $this->serverInfo->hasReplicaServers();
1172    }
1173
1174    /** @inheritDoc */
1175    public function hasStreamingReplicaServers() {
1176        return $this->serverInfo->hasStreamingReplicaServers();
1177    }
1178
1179    /** @inheritDoc */
1180    public function getServerName( $i ): string {
1181        return $this->serverInfo->getServerName( $i );
1182    }
1183
1184    /** @inheritDoc */
1185    public function getServerInfo( $i ) {
1186        return $this->serverInfo->getServerInfo( $i );
1187    }
1188
1189    /** @inheritDoc */
1190    public function getServerType( $i ) {
1191        return $this->serverInfo->getServerType( $i );
1192    }
1193
1194    /** @inheritDoc */
1195    public function getPrimaryPos() {
1196        $conn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
1197        if ( $conn ) {
1198            return $conn->getPrimaryPos();
1199        }
1200
1201        /** @var IDatabaseForOwner|null $conn */
1202        $conn = $this->getConnectionInternal( ServerInfo::WRITER_INDEX, [], false, self::CONN_SILENCE_ERRORS );
1203        // @phan-suppress-next-line PhanRedundantCondition
1204        if ( !$conn ) {
1205            $this->reportConnectionError();
1206        }
1207
1208        // ::getConnectionInternal() should return IDatabaseForOwner but changing signature
1209        // is not straightforward (being implemented in Wikibase)
1210        '@phan-var IDatabaseForOwner $conn';
1211        try {
1212            return $conn->getPrimaryPos();
1213        } finally {
1214            $this->closeConnection( $conn );
1215        }
1216    }
1217
1218    /**
1219     * Apply updated configuration.
1220     *
1221     * This only unregisters servers that were removed in the new configuration.
1222     * It does not register new servers nor update the group load weights.
1223     *
1224     * This invalidates any open connections. However, existing connections may continue to be
1225     * used while they are in an active transaction. In that case, the old connection will be
1226     * discarded on the first operation after the transaction is complete. The next operation
1227     * will use a new connection based on the new configuration.
1228     *
1229     * @internal for use by LBFactory::reconfigure()
1230     *
1231     * @see DBConnRef::ensureConnection()
1232     * @see LBFactory::reconfigure()
1233     *
1234     * @param array $params A database configuration array, see $wgLBFactoryConf.
1235     *
1236     * @return void
1237     */
1238    public function reconfigure( array $params ) {
1239        $anyServerDepooled = false;
1240
1241        $paramServers = $params['servers'];
1242        $newIndexByServerIndex = $this->serverInfo->reconfigureServers( $paramServers );
1243        foreach ( $newIndexByServerIndex as $i => $ni ) {
1244            if ( $ni === null ) {
1245                // Server no longer exists in the new config
1246                $anyServerDepooled = true;
1247                // Note that if the primary server is depooled and a replica server promoted
1248                // to new primary, then DB_PRIMARY handles will fail with server index errors
1249                unset( $this->loads[$i] );
1250            }
1251        }
1252
1253        if ( $anyServerDepooled ) {
1254            // NOTE: We could close all connection here, but some may be in the middle of
1255            //       a transaction. So instead, we leave it to DBConnRef to close the
1256            //       connection when it detects that the modcount has changed and no
1257            //       transaction is open.
1258            $this->logger->info( 'Reconfiguring dbs!' );
1259            // Unpin DB_REPLICA connection groups from server indexes
1260            $this->readIndexByGroup = [];
1261            // We could close all connection here, but some may be in the middle of a
1262            // transaction. So instead, we leave it to DBConnRef to close the connection
1263            // when it detects that the modcount has changed and no transaction is open.
1264            $this->conns = self::newTrackedConnectionsArray();
1265            // Bump modification counter to invalidate the connections held by DBConnRef
1266            // instances. This will cause the next call to a method on the DBConnRef
1267            // to get a new connection from getConnectionInternal()
1268            $this->modcount++;
1269        }
1270    }
1271
1272    /** @inheritDoc */
1273    public function disable( $fname = __METHOD__ ) {
1274        $this->closeAll( $fname );
1275        $this->disabled = true;
1276    }
1277
1278    /** @inheritDoc */
1279    public function closeAll( $fname = __METHOD__ ) {
1280        /** @noinspection PhpUnusedLocalVariableInspection */
1281        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1282        foreach ( $this->getOpenConnections() as $conn ) {
1283            $conn->close( $fname );
1284        }
1285
1286        $this->conns = self::newTrackedConnectionsArray();
1287    }
1288
1289    /**
1290     * Close a connection
1291     *
1292     * Using this function makes sure the LoadBalancer knows the connection is closed.
1293     * If you use $conn->close() directly, the load balancer won't update its state.
1294     */
1295    private function closeConnection( IDatabaseForOwner $conn ) {
1296        if ( $conn instanceof DBConnRef ) {
1297            // Avoid calling close() but still leaving the handle in the pool
1298            throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
1299        }
1300
1301        $domain = $conn->getDomainID();
1302        $serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
1303        if ( $serverIndex === null ) {
1304            throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
1305        }
1306
1307        $srvName = $this->serverInfo->getServerName( $serverIndex );
1308
1309        $found = false;
1310        foreach ( $this->conns as $type => $poolConnsByServer ) {
1311            $key = array_search( $conn, $poolConnsByServer[$serverIndex] ?? [], true );
1312            if ( $key !== false ) {
1313                $found = true;
1314                unset( $this->conns[$type][$serverIndex][$key] );
1315            }
1316        }
1317
1318        if ( !$found ) {
1319            $this->logger->warning(
1320                __METHOD__ .
1321                ": orphaned connection to database {$this->stringifyConn( $conn )} at '$srvName'."
1322            );
1323        }
1324
1325        $this->logger->debug(
1326            __METHOD__ .
1327            ": closing connection to database {$this->stringifyConn( $conn )} at '$srvName'."
1328        );
1329
1330        $conn->close( __METHOD__ );
1331    }
1332
1333    /** @inheritDoc */
1334    public function finalizePrimaryChanges( $fname = __METHOD__ ) {
1335        $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1336        /** @noinspection PhpUnusedLocalVariableInspection */
1337        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1338
1339        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1340        // Loop until callbacks stop adding callbacks on other connections
1341        $total = 0;
1342        do {
1343            $count = 0; // callbacks execution attempts
1344            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1345                // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1346                // Any error should cause all (peer) transactions to be rolled back together.
1347                $count += $conn->runOnTransactionPreCommitCallbacks();
1348            }
1349            $total += $count;
1350        } while ( $count > 0 );
1351        // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1352        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1353            $conn->setTrxEndCallbackSuppression( true );
1354        }
1355        $this->trxRoundStage = self::ROUND_FINALIZED;
1356
1357        return $total;
1358    }
1359
1360    /** @inheritDoc */
1361    public function approvePrimaryChanges( int $maxWriteDuration, $fname = __METHOD__ ) {
1362        $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1363        /** @noinspection PhpUnusedLocalVariableInspection */
1364        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1365
1366        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1367        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1368            // Any atomic sections should have been closed by now and there definitely should
1369            // not be any open transactions started by begin() from callers outside Database.
1370            if ( $conn->explicitTrxActive() ) {
1371                throw new DBTransactionError(
1372                    $conn,
1373                    "Explicit transaction still active; a caller might have failed to call " .
1374                    "endAtomic() or cancelAtomic()."
1375                );
1376            }
1377            // Assert that the time to replicate the transaction will be reasonable.
1378            // If this fails, then all DB transactions will be rollback back together.
1379            $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1380            if ( $maxWriteDuration > 0 ) {
1381                if ( $time > $maxWriteDuration ) {
1382                    $humanTimeSec = round( $time, 3 );
1383                    throw new DBTransactionSizeError(
1384                        $conn,
1385                        "Transaction spent {time}s in writes, exceeding the {$maxWriteDuration}s limit",
1386                        // Message parameters for: transaction-duration-limit-exceeded
1387                        [ $time, $maxWriteDuration ],
1388                        null,
1389                        [ 'time' => $humanTimeSec ]
1390                    );
1391                } elseif ( $time > 0 ) {
1392                    $timeMs = $time * 1000;
1393                    $humanTimeMs = round( $timeMs, $timeMs > 1 ? 0 : 3 );
1394                    $this->logger->debug(
1395                        "Transaction spent {time_ms}ms in writes, under the {$maxWriteDuration}s limit",
1396                        [ 'time_ms' => $humanTimeMs ]
1397                    );
1398                }
1399            }
1400            // If a connection sits idle for too long it might be dropped, causing transaction
1401            // writes and session locks to be lost. Ping all the server connections before making
1402            // any attempt to commit the transactions belonging to the active transaction round.
1403            if ( $conn->writesOrCallbacksPending() || $conn->sessionLocksPending() ) {
1404                if ( !$conn->ping() ) {
1405                    throw new DBTransactionError(
1406                        $conn,
1407                        "Pre-commit ping failed on server {$conn->getServerName()}"
1408                    );
1409                }
1410            }
1411        }
1412        $this->trxRoundStage = self::ROUND_APPROVED;
1413    }
1414
1415    /** @inheritDoc */
1416    public function beginPrimaryChanges( $fname = __METHOD__ ) {
1417        if ( $this->trxRoundFname !== null ) {
1418            throw new DBTransactionError(
1419                null,
1420                "Transaction round '{$this->trxRoundFname}' already started"
1421            );
1422        }
1423        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1424        /** @noinspection PhpUnusedLocalVariableInspection */
1425        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1426
1427        // Clear any empty transactions (no writes/callbacks) from the implicit round
1428        $this->flushPrimarySnapshots( $fname );
1429
1430        $this->trxRoundFname = $fname;
1431        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1432        // Mark applicable handles as participating in this explicit transaction round.
1433        // For each of these handles, any writes and callbacks will be tied to a single
1434        // transaction. The (peer) handles will reject begin()/commit() calls unless they
1435        // are part of an en masse commit or an en masse rollback.
1436        foreach ( $this->getOpenConnections() as $conn ) {
1437            $this->syncConnectionRoundState( $conn );
1438        }
1439        $this->trxRoundStage = self::ROUND_CURSORY;
1440    }
1441
1442    /** @inheritDoc */
1443    public function commitPrimaryChanges( $fname = __METHOD__ ) {
1444        $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1445        /** @noinspection PhpUnusedLocalVariableInspection */
1446        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1447
1448        $failures = [];
1449
1450        $this->trxRoundFname = null;
1451        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1452        // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1453        // Note that callbacks should already be suppressed due to finalizePrimaryChanges().
1454        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1455            try {
1456                $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1457            } catch ( DBError $e ) {
1458                ( $this->errorLogger )( $e );
1459                $failures[] = "{$conn->getServerName()}{$e->getMessage()}";
1460            }
1461        }
1462        if ( $failures ) {
1463            throw new DBTransactionError(
1464                null,
1465                "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1466            );
1467        }
1468        // Unmark handles as participating in this explicit transaction round
1469        foreach ( $this->getOpenConnections() as $conn ) {
1470            $this->syncConnectionRoundState( $conn );
1471        }
1472        $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1473    }
1474
1475    /** @inheritDoc */
1476    public function runPrimaryTransactionIdleCallbacks( $fname = __METHOD__ ) {
1477        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1478            $type = IDatabase::TRIGGER_COMMIT;
1479        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1480            $type = IDatabase::TRIGGER_ROLLBACK;
1481        } else {
1482            throw new DBTransactionError(
1483                null,
1484                "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1485            );
1486        }
1487        /** @noinspection PhpUnusedLocalVariableInspection */
1488        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1489
1490        $oldStage = $this->trxRoundStage;
1491        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1492
1493        // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1494        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1495            $conn->setTrxEndCallbackSuppression( false );
1496        }
1497
1498        $errors = [];
1499        $fname = __METHOD__;
1500        // Loop until callbacks stop adding callbacks on other connections
1501        do {
1502            // Run any pending callbacks for each connection...
1503            $count = 0; // callback execution attempts
1504            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1505                if ( $conn->trxLevel() ) {
1506                    continue; // retry in the next iteration, after commit() is called
1507                }
1508                $count += $conn->runOnTransactionIdleCallbacks( $type, $errors );
1509            }
1510            // Clear out any active transactions left over from callbacks...
1511            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1512                if ( $conn->writesPending() ) {
1513                    // A callback from another handle wrote to this one and DBO_TRX is set
1514                    $fnames = implode( ', ', $conn->pendingWriteCallers() );
1515                    $this->logger->info(
1516                        "$fname: found writes pending ($fnames).",
1517                        $this->getConnLogContext(
1518                            $conn,
1519                            [ 'exception' => new RuntimeException() ]
1520                        )
1521                    );
1522                    $this->statsFactory->getCounter( 'rdbms_callback_writes_found_total' )
1523                        ->setLabel( 'db_cluster', $this->getClusterName() )
1524                        ->setLabel( 'db_server', $conn->getServerName() )
1525                        ->increment();
1526                } elseif ( $conn->trxLevel() ) {
1527                    // A callback from another handle read from this one and DBO_TRX is set,
1528                    // which can easily happen if there is only one DB (no replicas)
1529                    $this->logger->debug( "$fname: found empty transaction." );
1530                }
1531                try {
1532                    $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1533                } catch ( DBError $ex ) {
1534                    $errors[] = $ex;
1535                }
1536            }
1537        } while ( $count > 0 );
1538
1539        $this->trxRoundStage = $oldStage;
1540
1541        return $errors[0] ?? null;
1542    }
1543
1544    /** @inheritDoc */
1545    public function runPrimaryTransactionListenerCallbacks( $fname = __METHOD__ ) {
1546        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1547            $type = IDatabase::TRIGGER_COMMIT;
1548        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1549            $type = IDatabase::TRIGGER_ROLLBACK;
1550        } else {
1551            throw new DBTransactionError(
1552                null,
1553                "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1554            );
1555        }
1556        /** @noinspection PhpUnusedLocalVariableInspection */
1557        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1558
1559        $errors = [];
1560        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1561        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1562            $conn->runTransactionListenerCallbacks( $type, $errors );
1563        }
1564        $this->trxRoundStage = self::ROUND_CURSORY;
1565
1566        return $errors[0] ?? null;
1567    }
1568
1569    /** @inheritDoc */
1570    public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
1571        /** @noinspection PhpUnusedLocalVariableInspection */
1572        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1573
1574        $this->trxRoundFname = null;
1575        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1576        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1577            $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1578        }
1579        // Unmark handles as participating in this explicit transaction round
1580        foreach ( $this->getOpenConnections() as $conn ) {
1581            $this->syncConnectionRoundState( $conn );
1582        }
1583        $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1584    }
1585
1586    /** @inheritDoc */
1587    public function flushPrimarySessions( $fname = __METHOD__ ) {
1588        $this->assertTransactionRoundStage( [ self::ROUND_CURSORY ] );
1589        if ( $this->hasPrimaryChanges() ) {
1590            // Any transaction should have been rolled back beforehand
1591            throw new DBTransactionError( null, "Cannot reset session while writes are pending" );
1592        }
1593
1594        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1595            $conn->flushSession( $fname, $conn::FLUSHING_ALL_PEERS );
1596        }
1597    }
1598
1599    /**
1600     * @param string|string[] $stage
1601     * @throws DBTransactionError
1602     */
1603    private function assertTransactionRoundStage( $stage ) {
1604        $stages = (array)$stage;
1605
1606        if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
1607            $stageList = implode(
1608                '/',
1609                array_map( static function ( $v ) {
1610                    return "'$v'";
1611                }, $stages )
1612            );
1613            throw new DBTransactionError(
1614                null,
1615                "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1616            );
1617        }
1618    }
1619
1620    /**
1621     * Make connections with DBO_DEFAULT/DBO_TRX set join any active transaction round
1622     *
1623     * Some servers may have neither flag enabled, meaning that they opt out of such
1624     * transaction rounds and remain in auto-commit mode. Such behavior might be desired
1625     * when a DB server is used for something like simple key/value storage.
1626     *
1627     * @param Database $conn
1628     */
1629    private function syncConnectionRoundState( Database $conn ) {
1630        if ( $conn->getLBInfo( self::INFO_CONN_CATEGORY ) !== self::CATEGORY_ROUND ) {
1631            return; // transaction rounds do not apply to these connections
1632        }
1633
1634        if ( $this->trxRoundFname !== null ) {
1635            // Explicit transaction round
1636            $trxRoundLevel = 1;
1637        } else {
1638            // Implicit auto-commit round (cli mode) or implicit transaction round (web mode)
1639            $trxRoundLevel = ( $this->cliMode ? 0 : 1 );
1640        }
1641
1642        // CATEGORY_ROUND DB handles with DBO_DEFAULT are considered "round aware" and the
1643        // load balancer will take over the logic of when DBO_TRX is set for such DB handles.
1644        // DBO_TRX will be set only when a transaction round is active (explicit or implicit).
1645        if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1646            if ( $trxRoundLevel ) {
1647                // Wrap queries in a transaction joined to the active transaction round
1648                $conn->setFlag( $conn::DBO_TRX );
1649            } else {
1650                // Do not wrap queries in a transaction (no active transaction round)
1651                $conn->clearFlag( $conn::DBO_TRX );
1652            }
1653        }
1654
1655        // Note that DBO_TRX is normally only set above or by DatabaseFactory applying
1656        // DBO_DEFAULT. However, integration tests might directly force DBO_TRX in the
1657        // server configuration arrays. Such handles will still be flushed during calls
1658        // to {@link LoadBalancer::commitPrimaryChanges()}.
1659        if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1660            // DB handle is participating in the active transaction round
1661            $conn->setLBInfo( $conn::LB_TRX_ROUND_LEVEL, $trxRoundLevel );
1662            $conn->setLBInfo( $conn::LB_TRX_ROUND_FNAME, $this->trxRoundFname );
1663        } else {
1664            // DB handle is not participating in any transaction rounds
1665            $conn->setLBInfo( $conn::LB_TRX_ROUND_LEVEL, 0 );
1666            $conn->setLBInfo( $conn::LB_TRX_ROUND_FNAME, null );
1667        }
1668    }
1669
1670    /** @inheritDoc */
1671    public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1672        foreach ( $this->conns as $poolConnsByServer ) {
1673            foreach ( $poolConnsByServer as $serverIndex => $serverConns ) {
1674                if ( $serverIndex === ServerInfo::WRITER_INDEX ) {
1675                    continue; // skip primary
1676                }
1677                foreach ( $serverConns as $conn ) {
1678                    $conn->flushSnapshot( $fname );
1679                }
1680            }
1681        }
1682    }
1683
1684    /** @inheritDoc */
1685    public function flushPrimarySnapshots( $fname = __METHOD__ ) {
1686        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1687            $conn->flushSnapshot( $fname );
1688        }
1689    }
1690
1691    /** @inheritDoc */
1692    public function hasPrimaryConnection() {
1693        return (bool)$this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
1694    }
1695
1696    /** @inheritDoc */
1697    public function hasPrimaryChanges() {
1698        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1699            if ( $conn->writesOrCallbacksPending() ) {
1700                return true;
1701            }
1702        }
1703
1704        return false;
1705    }
1706
1707    /** @inheritDoc */
1708    public function lastPrimaryChangeTimestamp() {
1709        $lastTime = null;
1710        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1711            $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1712        }
1713
1714        return $lastTime;
1715    }
1716
1717    /** @inheritDoc */
1718    public function hasOrMadeRecentPrimaryChanges( $age = null ) {
1719        $age ??= self::MAX_WAIT_DEFAULT;
1720
1721        return ( $this->hasPrimaryChanges()
1722            || $this->lastPrimaryChangeTimestamp() > microtime( true ) - $age );
1723    }
1724
1725    /** @inheritDoc */
1726    public function pendingPrimaryChangeCallers() {
1727        $fnames = [];
1728        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1729            $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1730        }
1731
1732        return $fnames;
1733    }
1734
1735    /** @inheritDoc */
1736    public function explicitTrxActive() {
1737        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1738            if ( $conn->explicitTrxActive() ) {
1739                return true;
1740            }
1741        }
1742        return false;
1743    }
1744
1745    private function setLaggedReplicaMode(): void {
1746        $this->laggedReplicaMode = true;
1747        $this->logger->warning( __METHOD__ . ": setting lagged replica mode" );
1748    }
1749
1750    /** @inheritDoc */
1751    public function laggedReplicaUsed() {
1752        return $this->laggedReplicaMode;
1753    }
1754
1755    /** @inheritDoc */
1756    public function getReadOnlyReason() {
1757        if ( $this->readOnlyReason !== false ) {
1758            return $this->readOnlyReason;
1759        } elseif ( $this->isPrimaryRunningReadOnly() ) {
1760            return 'The primary database server is running in read-only mode.';
1761        }
1762
1763        return false;
1764    }
1765
1766    /**
1767     * @note This method suppresses DBError exceptions in order to avoid severe downtime
1768     * @param IDatabaseForOwner|null $conn Recently acquired primary connection; null if not applicable
1769     * @return bool Whether the entire primary DB server or the local domain DB is read-only
1770     */
1771    private function isPrimaryRunningReadOnly( ?IDatabaseForOwner $conn = null ) {
1772        // Context will often be HTTP GET/HEAD; heavily cache the results
1773        return (bool)$this->wanCache->getWithSetCallback(
1774            // Note that table prefixes are not related to server-side read-only mode
1775            $this->wanCache->makeGlobalKey(
1776                'rdbms-server-readonly',
1777                $this->serverInfo->getPrimaryServerName()
1778            ),
1779            self::TTL_CACHE_READONLY,
1780            function ( $oldValue ) use ( $conn ) {
1781                $scope = $this->trxProfiler->silenceForScope();
1782                $conn ??= $this->getServerConnection(
1783                    ServerInfo::WRITER_INDEX,
1784                    self::DOMAIN_ANY,
1785                    self::CONN_SILENCE_ERRORS
1786                );
1787                if ( $conn ) {
1788                    try {
1789                        $value = (int)$conn->serverIsReadOnly();
1790                    } catch ( DBError ) {
1791                        $value = is_int( $oldValue ) ? $oldValue : 0;
1792                    }
1793                } else {
1794                    $value = 0;
1795                }
1796                ScopedCallback::consume( $scope );
1797
1798                return $value;
1799            },
1800            [
1801                'busyValue' => 0,
1802                'pcTTL' => WANObjectCache::TTL_PROC_LONG
1803            ]
1804        );
1805    }
1806
1807    /** @inheritDoc */
1808    public function pingAll() {
1809        $success = true;
1810        foreach ( $this->getOpenConnections() as $conn ) {
1811            if ( !$conn->ping() ) {
1812                $success = false;
1813            }
1814        }
1815
1816        return $success;
1817    }
1818
1819    /**
1820     * Get all open connections
1821     * @return \Generator|Database[]
1822     */
1823    private function getOpenConnections() {
1824        foreach ( $this->conns as $poolConnsByServer ) {
1825            foreach ( $poolConnsByServer as $serverConns ) {
1826                foreach ( $serverConns as $conn ) {
1827                    yield $conn;
1828                }
1829            }
1830        }
1831    }
1832
1833    /**
1834     * Get all open primary connections
1835     * @return \Generator|Database[]
1836     */
1837    private function getOpenPrimaryConnections() {
1838        foreach ( $this->conns as $poolConnsByServer ) {
1839            /** @var IDatabaseForOwner $conn */
1840            foreach ( ( $poolConnsByServer[ServerInfo::WRITER_INDEX] ?? [] ) as $conn ) {
1841                yield $conn;
1842            }
1843        }
1844    }
1845
1846    /** @inheritDoc */
1847    public function getMaxLag() {
1848        $host = '';
1849        $maxLag = -1;
1850        $maxIndex = 0;
1851
1852        if ( $this->serverInfo->hasReplicaServers() ) {
1853            $lagTimes = $this->getLagTimes();
1854            foreach ( $lagTimes as $i => $lag ) {
1855                // Allowing the value to be unset due to stale cache (T361824)
1856                $load = $this->loads[$i] ?? 0;
1857                if ( $load > 0 && $lag > $maxLag ) {
1858                    $maxLag = $lag;
1859                    $host = $this->serverInfo->getServerInfoStrict( $i, 'host' );
1860                    $maxIndex = $i;
1861                }
1862            }
1863        }
1864
1865        return [ $host, $maxLag, $maxIndex ];
1866    }
1867
1868    /** @inheritDoc */
1869    public function getLagTimes() {
1870        if ( !$this->hasReplicaServers() ) {
1871            return [ ServerInfo::WRITER_INDEX => 0 ]; // no replication = no lag
1872        }
1873        $fname = __METHOD__;
1874        return $this->wanCache->getWithSetCallback(
1875            $this->wanCache->makeGlobalKey( 'rdbms-lags', $this->getClusterName() ),
1876            // Add jitter to avoid stampede
1877            10 + mt_rand( 1, 10 ),
1878            function () use ( $fname ) {
1879                $lags = [];
1880                foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
1881                    $conn = $this->getServerConnection(
1882                        $i,
1883                        self::DOMAIN_ANY,
1884                        self::CONN_SILENCE_ERRORS | self::CONN_UNTRACKED_GAUGE
1885                    );
1886                    if ( $conn ) {
1887                        $lags[$i] = $conn->getLag();
1888                        $conn->close( $fname );
1889                    } else {
1890                        $lags[$i] = false;
1891                    }
1892                }
1893                return $lags;
1894            },
1895            [ 'lockTSE' => 30 ]
1896        );
1897    }
1898
1899    /** @inheritDoc */
1900    public function waitForPrimaryPos( IDatabase $conn ) {
1901        if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === ServerInfo::WRITER_INDEX ) {
1902            return true; // not a replica DB server
1903        }
1904
1905        // Get the current primary DB position, opening a connection only if needed
1906        $flags = self::CONN_SILENCE_ERRORS;
1907        $primaryConn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX, $flags );
1908        if ( $primaryConn ) {
1909            $pos = $primaryConn->getPrimaryPos();
1910        } else {
1911            $primaryConn = $this->getServerConnection( ServerInfo::WRITER_INDEX, self::DOMAIN_ANY, $flags );
1912            if ( !$primaryConn ) {
1913                throw new DBReplicationWaitError(
1914                    null,
1915                    "Could not obtain a primary database connection to get the position"
1916                );
1917            }
1918            $pos = $primaryConn->getPrimaryPos();
1919            $this->closeConnection( $primaryConn );
1920        }
1921
1922        if ( $pos instanceof DBPrimaryPos && $conn instanceof IDatabaseForOwner ) {
1923            $this->logger->debug( __METHOD__ . ': waiting' );
1924            $result = $conn->primaryPosWait( $pos, self::MAX_WAIT_DEFAULT );
1925            $ok = ( $result !== null && $result != -1 );
1926            if ( $ok ) {
1927                $this->logger->debug( __METHOD__ . ': done waiting (success)' );
1928            } else {
1929                $this->logger->debug( __METHOD__ . ': done waiting (failure)' );
1930            }
1931        } else {
1932            $ok = false; // something is misconfigured
1933            $this->logger->error(
1934                __METHOD__ . ': could not get primary pos for {db_server}',
1935                $this->getConnLogContext( $conn, [ 'exception' => new RuntimeException() ] )
1936            );
1937        }
1938
1939        return $ok;
1940    }
1941
1942    /** @inheritDoc */
1943    public function setTransactionListener( $name, ?callable $callback = null ) {
1944        if ( $callback ) {
1945            $this->trxRecurringCallbacks[$name] = $callback;
1946        } else {
1947            unset( $this->trxRecurringCallbacks[$name] );
1948        }
1949        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1950            $conn->setTransactionListener( $name, $callback );
1951        }
1952    }
1953
1954    public function setTableAliases( array $aliases ) {
1955        $this->tableAliases = $aliases;
1956    }
1957
1958    public function setDomainAliases( array $aliases ) {
1959        $this->domainAliases = $aliases;
1960    }
1961
1962    /** @inheritDoc */
1963    public function setLocalDomainPrefix( $prefix ) {
1964        $oldLocalDomain = $this->localDomain;
1965        $this->localDomain = new DatabaseDomain(
1966            $this->localDomain->getDatabase(),
1967            $this->localDomain->getSchema(),
1968            $prefix
1969        );
1970
1971        // Update the prefix for existing connections.
1972        // Existing DBConnRef handles will not be affected.
1973        foreach ( $this->getOpenConnections() as $conn ) {
1974            if ( $oldLocalDomain->equals( $conn->getDomainID() ) ) {
1975                $conn->tablePrefix( $prefix );
1976            }
1977        }
1978    }
1979
1980    /** @inheritDoc */
1981    public function redefineLocalDomain( $domain ) {
1982        $this->closeAll( __METHOD__ );
1983        $this->localDomain = DatabaseDomain::newFromId( $domain );
1984    }
1985
1986    /** @inheritDoc */
1987    public function setTempTablesOnlyMode( $value, $domain ) {
1988        $old = $this->tempTablesOnlyMode[$domain] ?? false;
1989        if ( $value ) {
1990            $this->tempTablesOnlyMode[$domain] = true;
1991        } else {
1992            unset( $this->tempTablesOnlyMode[$domain] );
1993        }
1994
1995        return $old;
1996    }
1997
1998    /**
1999     * @param IDatabase $conn
2000     * @return string Description of a connection handle for log messages
2001     * @throws InvalidArgumentException
2002     */
2003    private function stringifyConn( IDatabase $conn ) {
2004        return $conn->getLBInfo( self::INFO_SERVER_INDEX ) . '/' . $conn->getDomainID();
2005    }
2006
2007    /**
2008     * @param int $flags A bitfield of flags
2009     * @param int $bit Bit flag constant
2010     * @return bool Whether the bit field has the specified bit flag set
2011     */
2012    private function fieldHasBit( int $flags, int $bit ) {
2013        return ( ( $flags & $bit ) === $bit );
2014    }
2015
2016    /**
2017     * Create a log context to pass to PSR-3 logger functions.
2018     *
2019     * @param IDatabase $conn
2020     * @param array $extras Additional data to add to context
2021     * @return array
2022     */
2023    protected function getConnLogContext( IDatabase $conn, array $extras = [] ) {
2024        return $extras + [
2025            'db_server' => $conn->getServerName(),
2026            'db_domain' => $conn->getDomainID(),
2027        ];
2028    }
2029
2030    /**
2031     * @param float|null &$time Mock UNIX timestamp for testing
2032     * @internal
2033     * @codeCoverageIgnore
2034     */
2035    public function setMockTime( &$time ) {
2036        if ( !$this->loadMonitor instanceof LoadMonitor ) {
2037            throw new LogicException( 'Cannot set mock time on ' . $this::class . ' (consider adding ' .
2038                'setMockTime to ILoadMonitor).' );
2039        }
2040        $this->loadMonitor->setMockTime( $time );
2041    }
2042
2043    public function setDefaultGroupName( string $defaultGroup ): void {
2044        $this->defaultGroup = $defaultGroup;
2045    }
2046}