Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
57.18% covered (warning)
57.18%
502 / 878
34.18% covered (danger)
34.18%
27 / 79
CRAP
0.00% covered (danger)
0.00%
0 / 1
LoadBalancer
57.18% covered (warning)
57.18%
502 / 878
34.18% covered (danger)
34.18%
27 / 79
8260.08
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 configure
86.67% covered (warning)
86.67%
39 / 45
0.00% covered (danger)
0.00%
0 / 1
13.40
 newTrackedConnectionsArray
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 getClusterName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainInstance
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
6
 resolveGroups
73.33% covered (warning)
73.33%
11 / 15
0.00% covered (danger)
0.00%
0 / 1
14.73
 sanitizeConnectionFlags
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
4.18
 enforceConnectionFlags
50.00% covered (danger)
50.00%
4 / 8
0.00% covered (danger)
0.00%
0 / 1
6.00
 getRandomNonLagged
45.45% covered (danger)
45.45%
10 / 22
0.00% covered (danger)
0.00%
0 / 1
14.95
 getReaderIndex
78.26% covered (warning)
78.26%
18 / 23
0.00% covered (danger)
0.00%
0 / 1
8.66
 getExistingReaderIndex
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 pickReaderIndex
55.56% covered (warning)
55.56%
15 / 27
0.00% covered (danger)
0.00%
0 / 1
18.78
 waitForAll
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
72
 serverHasLoadInAnyGroup
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 getAnyOpenConnection
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
7.01
 pickAnyOpenConnection
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
6.60
 awaitSessionPrimaryPos
7.14% covered (danger)
7.14%
3 / 42
0.00% covered (danger)
0.00%
0 / 1
127.29
 getConnection
66.67% covered (warning)
66.67%
6 / 9
0.00% covered (danger)
0.00%
0 / 1
4.59
 getConnectionInternal
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
6.01
 getServerConnection
73.68% covered (warning)
73.68%
14 / 19
0.00% covered (danger)
0.00%
0 / 1
7.89
 getConnectionRef
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getMaintenanceConnectionRef
55.56% covered (warning)
55.56%
5 / 9
0.00% covered (danger)
0.00%
0 / 1
5.40
 reuseOrOpenConnectionForNewRef
92.68% covered (success)
92.68%
38 / 41
0.00% covered (danger)
0.00%
0 / 1
15.09
 assertConnectionDomain
25.00% covered (danger)
25.00%
1 / 4
0.00% covered (danger)
0.00%
0 / 1
3.69
 getServerAttributes
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 reallyOpenConnection
78.79% covered (warning)
78.79%
52 / 66
0.00% covered (danger)
0.00%
0 / 1
14.61
 loadSessionPrimaryPos
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
6.84
 reportConnectionError
60.61% covered (warning)
60.61%
20 / 33
0.00% covered (danger)
0.00%
0 / 1
10.00
 getServerCount
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasStreamingReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerInfo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerType
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPrimaryPos
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 reconfigure
89.47% covered (warning)
89.47%
17 / 19
0.00% covered (danger)
0.00%
0 / 1
8.07
 disable
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 closeAll
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 closeConnection
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
42
 finalizePrimaryChanges
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
3
 approvePrimaryChanges
28.57% covered (danger)
28.57%
10 / 35
0.00% covered (danger)
0.00%
0 / 1
46.44
 beginPrimaryChanges
69.23% covered (warning)
69.23%
9 / 13
0.00% covered (danger)
0.00%
0 / 1
3.26
 commitPrimaryChanges
61.11% covered (warning)
61.11%
11 / 18
0.00% covered (danger)
0.00%
0 / 1
6.47
 runPrimaryTransactionIdleCallbacks
52.63% covered (warning)
52.63%
20 / 38
0.00% covered (danger)
0.00%
0 / 1
20.63
 runPrimaryTransactionListenerCallbacks
60.00% covered (warning)
60.00%
9 / 15
0.00% covered (danger)
0.00%
0 / 1
5.02
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 flushPrimarySessions
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 assertTransactionRoundStage
16.67% covered (danger)
16.67%
2 / 12
0.00% covered (danger)
0.00%
0 / 1
4.31
 syncConnectionRoundState
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
7
 flushReplicaSnapshots
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
5
 flushPrimarySnapshots
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 hasPrimaryConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 lastPrimaryChangeTimestamp
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 pendingPrimaryChangeCallers
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setLaggedReplicaMode
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getReadOnlyReason
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 isPrimaryRunningReadOnly
92.31% covered (success)
92.31%
24 / 26
0.00% covered (danger)
0.00%
0 / 1
4.01
 pingAll
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 getOpenConnections
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
4
 getOpenPrimaryConnections
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 getMaxLag
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 getLagTimes
90.91% covered (success)
90.91%
20 / 22
0.00% covered (danger)
0.00%
0 / 1
4.01
 waitForPrimaryPos
0.00% covered (danger)
0.00%
0 / 27
0.00% covered (danger)
0.00%
0 / 1
72
 setTransactionListener
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 setTableAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setIndexAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 setTempTablesOnlyMode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 stringifyConn
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 fieldHasBit
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getConnLogContext
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 setMockTime
n/a
0 / 0
n/a
0 / 0
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use ArrayUtils;
23use InvalidArgumentException;
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use LogicException;
26use Psr\Log\LoggerInterface;
27use Psr\Log\NullLogger;
28use RuntimeException;
29use Throwable;
30use UnexpectedValueException;
31use Wikimedia\ObjectCache\BagOStuff;
32use Wikimedia\ObjectCache\EmptyBagOStuff;
33use Wikimedia\ObjectCache\WANObjectCache;
34use Wikimedia\ScopedCallback;
35use Wikimedia\Stats\NullStatsdDataFactory;
36
37/**
38 * @see ILoadBalancer
39 * @ingroup Database
40 */
41class LoadBalancer implements ILoadBalancerForOwner {
42    /** @var ILoadMonitor */
43    private $loadMonitor;
44    /** @var BagOStuff */
45    private $srvCache;
46    /** @var WANObjectCache */
47    private $wanCache;
48    /** @var DatabaseFactory */
49    private $databaseFactory;
50
51    /** @var TransactionProfiler */
52    private $trxProfiler;
53    /** @var StatsdDataFactoryInterface */
54    private $statsd;
55    /** @var LoggerInterface */
56    private $logger;
57    /** @var callable Exception logger */
58    private $errorLogger;
59    /** @var DatabaseDomain Local DB domain ID and default for new connections */
60    private $localDomain;
61    /** @var bool Whether this PHP instance is for a CLI script */
62    private $cliMode;
63
64    /** @var array<string,array<int,Database[]>> Map of (connection pool => server index => Database[]) */
65    private $conns;
66
67    /** @var string The name of the DB cluster */
68    private $clusterName;
69    /** @var ServerInfo */
70    private $serverInfo;
71    /** @var array<string,array<int,int|float>> Map of (group => server index => weight) */
72    private $groupLoads;
73    /** @var string|null Default query group to use with getConnection() */
74    private $defaultGroup;
75
76    /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
77    private $tableAliases = [];
78    /** @var string[] Map of (index alias => index) */
79    private $indexAliases = [];
80    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
81    private $domainAliases = [];
82    /** @var callable[] Map of (name => callable) */
83    private $trxRecurringCallbacks = [];
84    /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */
85    private $tempTablesOnlyMode = [];
86
87    /** @var string|null Active explicit transaction round owner or false if none */
88    private $trxRoundFname = null;
89    /** @var string Stage of the current transaction round in the transaction round life-cycle */
90    private $trxRoundStage = self::ROUND_CURSORY;
91    /** @var int[] The group replica server indexes keyed by group */
92    private $readIndexByGroup = [];
93    /** @var DBPrimaryPos|null Replication sync position or false if not set */
94    private $waitForPos;
95    /** @var bool Whether a lagged replica DB was used */
96    private $laggedReplicaMode = false;
97    /** @var string|false Reason this instance is read-only or false if not */
98    private $readOnlyReason = false;
99    /** @var int Total number of new connections ever made with this instance */
100    private $connectionCounter = 0;
101    /** @var bool */
102    private $disabled = false;
103    private ?ChronologyProtector $chronologyProtector = null;
104    /** @var bool Whether the session consistency callback already executed */
105    private $chronologyProtectorCalled = false;
106
107    /** @var Database|null The last connection handle that caused a problem */
108    private $lastErrorConn;
109
110    /** @var DatabaseDomain[] Map of (domain ID => domain instance) */
111    private $nonLocalDomainCache = [];
112
113    /**
114     * @var int Modification counter for invalidating connections held by
115     *      DBConnRef instances. This is bumped by reconfigure().
116     */
117    private $modcount = 0;
118
119    /** The "server index" LB info key; see {@link IDatabase::getLBInfo()} */
120    private const INFO_SERVER_INDEX = 'serverIndex';
121    /** The "connection category" LB info key; see {@link IDatabase::getLBInfo()} */
122    private const INFO_CONN_CATEGORY = 'connCategory';
123
124    /** Warn when this many connection are held */
125    private const CONN_HELD_WARN_THRESHOLD = 10;
126
127    /** Default 'waitTimeout' when unspecified */
128    private const MAX_WAIT_DEFAULT = 10;
129    /** Seconds to cache primary DB server read-only status */
130    private const TTL_CACHE_READONLY = 5;
131
132    /** A category of connections that are tracked and transaction round aware */
133    private const CATEGORY_ROUND = 'round';
134    /** A category of connections that are tracked and in autocommit-mode */
135    private const CATEGORY_AUTOCOMMIT = 'auto-commit';
136    /** A category of connections that are untracked and in gauge-mode */
137    private const CATEGORY_GAUGE = 'gauge';
138
139    /** Transaction round, explicit or implicit, has not finished writing */
140    private const ROUND_CURSORY = 'cursory';
141    /** Transaction round writes are complete and ready for pre-commit checks */
142    private const ROUND_FINALIZED = 'finalized';
143    /** Transaction round passed final pre-commit checks */
144    private const ROUND_APPROVED = 'approved';
145    /** Transaction round was committed and post-commit callbacks must be run */
146    private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
147    /** Transaction round was rolled back and post-rollback callbacks must be run */
148    private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
149    /** Transaction round encountered an error */
150    private const ROUND_ERROR = 'error';
151
152    /** @var int Idiom for getExistingReaderIndex() meaning "no index selected" */
153    private const READER_INDEX_NONE = -1;
154
155    public function __construct( array $params ) {
156        $this->configure( $params );
157
158        $this->conns = self::newTrackedConnectionsArray();
159    }
160
161    /**
162     * @param array $params A database configuration array, see $wgLBFactoryConf.
163     *
164     * @return void
165     */
166    protected function configure( array $params ): void {
167        $this->localDomain = isset( $params['localDomain'] )
168            ? DatabaseDomain::newFromId( $params['localDomain'] )
169            : DatabaseDomain::newUnspecified();
170
171        $this->serverInfo = new ServerInfo();
172        $this->groupLoads = [ self::GROUP_GENERIC => [] ];
173        foreach ( $this->serverInfo->normalizeServerMaps( $params['servers'] ?? [] ) as $i => $server ) {
174            $this->serverInfo->addServer( $i, $server );
175            foreach ( $server['groupLoads'] as $group => $weight ) {
176                $this->groupLoads[$group][$i] = $weight;
177            }
178            $this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
179        }
180        // If the cluster name is not specified, fallback to the current primary name
181        $this->clusterName = $params['clusterName']
182            ?? $this->serverInfo->getServerName( ServerInfo::WRITER_INDEX );
183
184        if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
185            $this->readOnlyReason = $params['readOnlyReason'];
186        }
187
188        $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
189        $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
190
191        // Note: this parameter is normally absent. It is injectable for testing purposes only.
192        $this->databaseFactory = $params['databaseFactory'] ?? new DatabaseFactory( $params );
193
194        $this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
195            trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
196        };
197        $this->logger = $params['logger'] ?? new NullLogger();
198
199        $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
200        $this->statsd = $params['statsdDataFactory'] ?? new NullStatsdDataFactory();
201
202        // Set up LoadMonitor
203        $loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => LoadMonitorNull::class ];
204        $compat = [
205            'LoadMonitor' => LoadMonitor::class,
206            'LoadMonitorNull' => LoadMonitorNull::class
207        ];
208        $class = $loadMonitorConfig['class'];
209        // @phan-suppress-next-line PhanImpossibleCondition
210        if ( isset( $compat[$class] ) ) {
211            $class = $compat[$class];
212        }
213        $this->loadMonitor = new $class(
214            $this, $this->srvCache, $this->wanCache, $loadMonitorConfig );
215        $this->loadMonitor->setLogger( $this->logger );
216        $this->loadMonitor->setStatsdDataFactory( $this->statsd );
217
218        if ( isset( $params['chronologyProtector'] ) ) {
219            $this->chronologyProtector = $params['chronologyProtector'];
220        }
221
222        if ( isset( $params['roundStage'] ) ) {
223            if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
224                $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
225            } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
226                $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
227            }
228        }
229
230        $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
231
232        $group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
233        $this->defaultGroup = isset( $this->groupLoads[ $group ] ) ? $group : self::GROUP_GENERIC;
234    }
235
236    private static function newTrackedConnectionsArray() {
237        // Note that CATEGORY_GAUGE connections are untracked
238        return [
239            self::CATEGORY_ROUND => [],
240            self::CATEGORY_AUTOCOMMIT => []
241        ];
242    }
243
244    public function getClusterName(): string {
245        return $this->clusterName;
246    }
247
248    public function getLocalDomainID(): string {
249        return $this->localDomain->getId();
250    }
251
252    public function resolveDomainID( $domain ): string {
253        return $this->resolveDomainInstance( $domain )->getId();
254    }
255
256    /**
257     * @param DatabaseDomain|string|false $domain
258     * @return DatabaseDomain
259     */
260    final protected function resolveDomainInstance( $domain ): DatabaseDomain {
261        if ( $domain instanceof DatabaseDomain ) {
262            return $domain; // already a domain instance
263        } elseif ( $domain === false || $domain === $this->localDomain->getId() ) {
264            return $this->localDomain;
265        } elseif ( isset( $this->domainAliases[$domain] ) ) {
266            $this->domainAliases[$domain] =
267                DatabaseDomain::newFromId( $this->domainAliases[$domain] );
268
269            return $this->domainAliases[$domain];
270        }
271
272        $cachedDomain = $this->nonLocalDomainCache[$domain] ?? null;
273        if ( $cachedDomain === null ) {
274            $cachedDomain = DatabaseDomain::newFromId( $domain );
275            $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
276        }
277
278        return $cachedDomain;
279    }
280
281    /**
282     * Get the first group in $groups with assigned servers, falling back to the default group
283     *
284     * @param string[]|string|false $groups Query group(s) in preference order, [], or false
285     * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
286     * @return string Query group
287     */
288    private function resolveGroups( $groups, $i ) {
289        // If a specific replica server was specified, then $groups makes no sense
290        if ( $i > 0 && $groups !== [] && $groups !== false ) {
291            $list = implode( ', ', (array)$groups );
292            throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
293        }
294
295        if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
296            $resolvedGroup = $this->defaultGroup;
297        } elseif ( is_string( $groups ) ) {
298            $resolvedGroup = isset( $this->groupLoads[$groups] ) ? $groups : $this->defaultGroup;
299        } elseif ( is_array( $groups ) ) {
300            $resolvedGroup = $this->defaultGroup;
301            foreach ( $groups as $group ) {
302                if ( isset( $this->groupLoads[$group] ) ) {
303                    $resolvedGroup = $group;
304                    break;
305                }
306            }
307        } else {
308            $resolvedGroup = $this->defaultGroup;
309        }
310
311        return $resolvedGroup;
312    }
313
314    /**
315     * Sanitize connection flags provided by a call to getConnection()
316     *
317     * @param int $flags Bitfield of class CONN_* constants
318     * @param string $domain Database domain
319     * @return int Sanitized bitfield
320     */
321    protected function sanitizeConnectionFlags( $flags, $domain ) {
322        if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
323            // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
324            // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
325            // during larger transactions. This is useful for avoiding lock contention.
326            // Assuming all servers are of the same type (or similar), which is overwhelmingly
327            // the case, use the primary server information to get the attributes. The information
328            // for $i cannot be used since it might be DB_REPLICA, which might require connection
329            // attempts in order to be resolved into a real server index.
330            $attributes = $this->getServerAttributes( ServerInfo::WRITER_INDEX );
331            if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
332                // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
333                // to use separate connections would just cause self-deadlocks. Note that
334                // REPEATABLE-READ staleness is not an issue since DB-level locking means
335                // that transactions are Strict Serializable anyway.
336                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
337                $type = $this->serverInfo->getServerType( ServerInfo::WRITER_INDEX );
338                $this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
339            } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
340                // T202116: integration tests are active and queries should be all be using
341                // temporary clone tables (via prefix). Such tables are not visible across
342                // different connections nor can there be REPEATABLE-READ snapshot staleness,
343                // so use the same connection for everything.
344                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
345            }
346        }
347
348        return $flags;
349    }
350
351    /**
352     * @param IDatabase $conn
353     * @param int $flags
354     * @throws DBUnexpectedError
355     */
356    private function enforceConnectionFlags( IDatabase $conn, $flags ) {
357        if (
358            self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ||
359            // Handles with open transactions are avoided since they might be subject
360            // to REPEATABLE-READ snapshots, which could affect the lag estimate query.
361            self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE )
362        ) {
363            if ( $conn->trxLevel() ) {
364                throw new DBUnexpectedError(
365                    $conn,
366                    'Handle requested with autocommit-mode yet it has a transaction'
367                );
368            }
369
370            $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
371        }
372    }
373
374    /**
375     * @param array<int,int|float> $loads Map of (server index => weight) for a load group
376     * @param int|float $sessionLagLimit Additional maximum lag threshold imposed by the session;
377     *  use INF if none applies. Servers will count as lagged if their lag exceeds either this
378     *  value or the configured "max lag" value.
379     * @return int|false Index of a non-lagged server with non-zero weight; false if none
380     */
381    private function getRandomNonLagged( array $loads, $sessionLagLimit = INF ) {
382        $lags = $this->getLagTimes();
383
384        // Unset excessively lagged servers from the load group
385        foreach ( $lags as $i => $lag ) {
386            if ( $i !== ServerInfo::WRITER_INDEX ) {
387                // How much lag normally counts as "excessive" for this server
388                $maxServerLag = $this->serverInfo->getServerMaxLag( $i );
389                // How much lag counts as "excessive" for this server given the session
390                $maxServerLag = min( $maxServerLag, $sessionLagLimit );
391
392                $srvName = $this->serverInfo->getServerName( $i );
393                if ( $lag === false && !is_infinite( $maxServerLag ) ) {
394                    $this->logger->debug(
395                        __METHOD__ . ": server {db_server} is not replicating?",
396                        [ 'db_server' => $srvName ]
397                    );
398                    unset( $loads[$i] );
399                } elseif ( $lag > $maxServerLag ) {
400                    $this->logger->debug(
401                        __METHOD__ .
402                            ": server {db_server} has {lag} seconds of lag (>= {maxlag})",
403                        [ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $maxServerLag ]
404                    );
405                    unset( $loads[$i] );
406                }
407            }
408        }
409
410        if ( array_sum( $loads ) == 0 ) {
411            // All the replicas with non-zero weight are lagged and the primary has zero load.
412            // Inform caller so that it can use switch to read-only mode and use a lagged replica.
413            return false;
414        }
415
416        // Return a server index based on weighted random selection
417        return ArrayUtils::pickRandom( $loads );
418    }
419
420    public function getReaderIndex( $group = false ) {
421        $group = is_string( $group ) ? $group : self::GROUP_GENERIC;
422
423        if ( !$this->serverInfo->hasReplicaServers() ) {
424            // There is only one possible server to use (the primary)
425            return ServerInfo::WRITER_INDEX;
426        }
427
428        $index = $this->getExistingReaderIndex( $group );
429        if ( $index !== self::READER_INDEX_NONE ) {
430            // A reader index was already selected for this query group. Keep using it,
431            // since any session replication position was already waited on and any
432            // active transaction will be reused (e.g. for point-in-time snapshots).
433            return $index;
434        }
435
436        // Get the server weight array for this load group
437        $loads = $this->groupLoads[$group] ?? [];
438        if ( !$loads ) {
439            $this->logger->info( __METHOD__ . ": no loads for group $group" );
440            return false;
441        }
442
443        // Load any session replication positions, before any connection attempts,
444        // since reading them afterwards can only cause more delay due to possibly
445        // seeing even higher replication positions (e.g. from concurrent requests).
446        $this->loadSessionPrimaryPos();
447
448        // Scale the configured load weights according to each server's load/state.
449        // This can sometimes trigger server connections due to cache regeneration.
450        $this->loadMonitor->scaleLoads( $loads );
451
452        // Pick a server, accounting for load weights, lag, and session consistency
453        $i = $this->pickReaderIndex( $loads );
454        if ( $i === false ) {
455            // Connection attempts failed
456            return false;
457        }
458
459        // If data seen by queries is expected to reflect writes from a prior transaction,
460        // then wait for the chosen server to apply those changes. This is used to improve
461        // session consistency.
462        if ( !$this->awaitSessionPrimaryPos( $i ) ) {
463            // Data will be outdated compared to what was expected
464            $this->setLaggedReplicaMode();
465        }
466
467        // Keep using this server for DB_REPLICA handles for this group
468        if ( $i < 0 ) {
469            throw new UnexpectedValueException( "Cannot set a negative read server index" );
470        }
471        $this->readIndexByGroup[$group] = $i;
472
473        $serverName = $this->getServerName( $i );
474        $this->logger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
475
476        return $i;
477    }
478
479    /**
480     * Get the server index chosen for DB_REPLICA connections for the given query group
481     *
482     * @param string $group Query group; use false for the generic group
483     * @return int Specific server index or LoadBalancer::READER_INDEX_NONE if none was chosen
484     */
485    protected function getExistingReaderIndex( $group ) {
486        return $this->readIndexByGroup[$group] ?? self::READER_INDEX_NONE;
487    }
488
489    /**
490     * Pick a server that is reachable and preferably non-lagged based on the load weights
491     *
492     * This will leave the server connection open within the pool for reuse
493     *
494     * @param array<int,int|float> $loads Map of (server index => weight) for a load group
495     * @return int|false Server index to use as the load group reader index; false on failure
496     */
497    private function pickReaderIndex( array $loads ) {
498        if ( $loads === [] ) {
499            throw new InvalidArgumentException( "Load group array is empty" );
500        }
501
502        $i = false;
503        // Quickly look through the available servers for a server that meets criteria...
504        $currentLoads = $loads;
505        while ( count( $currentLoads ) ) {
506            if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
507                $this->logger->debug( __METHOD__ . ": session has replication position" );
508                // ChronologyProtector::getSessionPrimaryPos called in loadSessionPrimaryPos()
509                // sets "waitForPos" for session consistency. This triggers doWait() after
510                // connect, so it's especially good to avoid lagged servers so as to avoid
511                // excessive delay in that method.
512                $ago = microtime( true ) - $this->waitForPos->asOfTime();
513                // Aim for <= 1 second of waiting (being too picky can backfire)
514                $i = $this->getRandomNonLagged( $currentLoads, $ago + 1 );
515            } else {
516                // Any server with less lag than it's 'max lag' param is preferable
517                $i = $this->getRandomNonLagged( $currentLoads );
518            }
519
520            if ( $i === false && count( $currentLoads ) ) {
521                $this->setLaggedReplicaMode();
522                // All replica DBs lagged, just pick anything.
523                $i = ArrayUtils::pickRandom( $currentLoads );
524            }
525
526            if ( $i === false ) {
527                // pickRandom() returned false.
528                // This is permanent and means the configuration or LoadMonitor
529                // wants us to return false.
530                $this->logger->debug( __METHOD__ . ": no suitable server found" );
531                return false;
532            }
533
534            $serverName = $this->getServerName( $i );
535            $this->logger->debug( __METHOD__ . ": connecting to $serverName..." );
536
537            // Get a connection to this server without triggering complementary connections
538            // to other servers (due to things like lag or read-only checks). We want to avoid
539            // the risk of overhead and recursion here.
540            $conn = $this->getServerConnection( $i, self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
541            if ( !$conn ) {
542                $this->logger->warning( __METHOD__ . ": failed connecting to $serverName" );
543                unset( $currentLoads[$i] ); // avoid this server next iteration
544                continue;
545            }
546
547            // Return this server
548            break;
549        }
550
551        // If all servers were down, quit now
552        if ( $currentLoads === [] ) {
553            $this->logger->error( __METHOD__ . ": all servers down" );
554        }
555
556        return $i;
557    }
558
559    public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
560        $timeout = $timeout ?: self::MAX_WAIT_DEFAULT;
561
562        $oldPos = $this->waitForPos;
563        try {
564            $this->waitForPos = $pos;
565
566            $failedReplicas = [];
567            foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
568                if ( $this->serverHasLoadInAnyGroup( $i ) ) {
569                    $start = microtime( true );
570                    $ok = $this->awaitSessionPrimaryPos( $i, $timeout );
571                    if ( !$ok ) {
572                        $failedReplicas[] = $this->getServerName( $i );
573                    }
574                    $timeout -= intval( microtime( true ) - $start );
575                }
576            }
577
578            // Stop spamming logs when only one replica is lagging and we have 5+ replicas.
579            // Mediawiki automatically stops sending queries to the lagged one.
580            $failed = $failedReplicas && ( count( $failedReplicas ) > 1 || $this->getServerCount() < 5 );
581            if ( $failed ) {
582                $this->logger->error(
583                    "Timed out waiting for replication to reach {raw_pos}",
584                    [
585                        'raw_pos' => $pos->__toString(),
586                        'failed_hosts' => $failedReplicas,
587                        'timeout' => $timeout,
588                        'exception' => new RuntimeException()
589                    ]
590                );
591            }
592
593            return !$failed;
594        } finally {
595            // Restore the old position; this is used for throttling, not lag-protection
596            $this->waitForPos = $oldPos;
597        }
598    }
599
600    /**
601     * @param int $i Specific server index
602     * @return bool
603     */
604    private function serverHasLoadInAnyGroup( $i ) {
605        foreach ( $this->groupLoads as $loadsByIndex ) {
606            if ( ( $loadsByIndex[$i] ?? 0 ) > 0 ) {
607                return true;
608            }
609        }
610
611        return false;
612    }
613
614    public function getAnyOpenConnection( $i, $flags = 0 ) {
615        $i = ( $i === self::DB_PRIMARY ) ? ServerInfo::WRITER_INDEX : $i;
616        $conn = false;
617        foreach ( $this->conns as $type => $poolConnsByServer ) {
618            if ( $i === self::DB_REPLICA ) {
619                // Consider all existing connections to any server
620                $applicableConnsByServer = $poolConnsByServer;
621            } else {
622                // Consider all existing connections to a specific server
623                $applicableConnsByServer = isset( $poolConnsByServer[$i] )
624                    ? [ $i => $poolConnsByServer[$i] ]
625                    : [];
626            }
627
628            $conn = $this->pickAnyOpenConnection( $applicableConnsByServer );
629            if ( $conn ) {
630                $this->logger->debug( __METHOD__ . ": found '$type' connection to #$i." );
631                break;
632            }
633        }
634
635        if ( $conn ) {
636            $this->enforceConnectionFlags( $conn, $flags );
637        }
638
639        return $conn;
640    }
641
642    /**
643     * @param Database[][] $connsByServer Map of (server index => array of DB handles)
644     * @return IDatabaseForOwner|false An appropriate open connection or false if none found
645     */
646    private function pickAnyOpenConnection( array $connsByServer ) {
647        foreach ( $connsByServer as $i => $conns ) {
648            foreach ( $conns as $conn ) {
649                if ( !$conn->isOpen() ) {
650                    $this->logger->warning(
651                        __METHOD__ .
652                        ": pooled DB handle for {db_server} (#$i) has no open connection.",
653                        $this->getConnLogContext( $conn )
654                    );
655                    continue; // some sort of error occurred?
656                }
657                return $conn;
658            }
659        }
660
661        return false;
662    }
663
664    /**
665     * Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos"
666     *
667     * @see loadSessionPrimaryPos()
668     *
669     * @param int $index Specific server index
670     * @param int|null $timeout Max seconds to wait; default is "waitTimeout"
671     * @return bool Success
672     */
673    private function awaitSessionPrimaryPos( $index, $timeout = null ) {
674        $timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );
675
676        if ( !$this->waitForPos || $index === ServerInfo::WRITER_INDEX ) {
677            return true;
678        }
679
680        $srvName = $this->getServerName( $index );
681
682        // Check if we already know that the DB has reached this point
683        $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v2' );
684
685        /** @var DBPrimaryPos $knownReachedPos */
686        $position = $this->srvCache->get( $key );
687        if ( !is_array( $position ) ) {
688            $knownReachedPos = null;
689        } else {
690            $class = $position['_type_'];
691            $knownReachedPos = $class::newFromArray( $position );
692        }
693        if (
694            $knownReachedPos instanceof DBPrimaryPos &&
695            $knownReachedPos->hasReached( $this->waitForPos )
696        ) {
697            $this->logger->debug(
698                __METHOD__ .
699                ": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
700                [ 'db_server' => $srvName ]
701            );
702
703            return true;
704        }
705
706        $close = false; // close the connection afterwards
707        $flags = self::CONN_SILENCE_ERRORS;
708        // Check if there is an existing connection that can be used
709        $conn = $this->getAnyOpenConnection( $index, $flags );
710        if ( !$conn ) {
711            // Get a connection to this server without triggering complementary connections
712            // to other servers (due to things like lag or read-only checks). We want to avoid
713            // the risk of overhead and recursion here.
714            $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
715            if ( !$conn ) {
716                $this->logger->warning(
717                    __METHOD__ . ': failed to connect to {db_server}',
718                    [ 'db_server' => $srvName ]
719                );
720
721                return false;
722            }
723            // Avoid connection spam in waitForAll() when connections
724            // are made just for the sake of doing this lag check.
725            $close = true;
726        }
727
728        $this->logger->info(
729            __METHOD__ .
730            ': waiting for replica DB {db_server} to catch up...',
731            $this->getConnLogContext( $conn )
732        );
733
734        $result = $conn->primaryPosWait( $this->waitForPos, $timeout );
735
736        $ok = ( $result !== null && $result != -1 );
737        if ( $ok ) {
738            // Remember that the DB reached this point
739            $this->srvCache->set( $key, $this->waitForPos->toArray(), BagOStuff::TTL_DAY );
740        }
741
742        if ( $close ) {
743            $this->closeConnection( $conn );
744        }
745
746        return $ok;
747    }
748
749    public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
750        if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
751            throw new UnexpectedValueException(
752                __METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
753            );
754        }
755
756        $domain = $this->resolveDomainID( $domain );
757        $role = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX )
758            ? self::DB_PRIMARY
759            : self::DB_REPLICA;
760
761        return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role, $this->modcount );
762    }
763
764    public function getConnectionInternal( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
765        $domain = $this->resolveDomainID( $domain );
766        $group = $this->resolveGroups( $groups, $i );
767        $flags = $this->sanitizeConnectionFlags( $flags, $domain );
768        // If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
769        // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
770        // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
771        // The use of getServerConnection() instead of getConnection() avoids infinite loops.
772        $serverIndex = $i;
773        if ( $i === self::DB_PRIMARY ) {
774            $serverIndex = ServerInfo::WRITER_INDEX;
775        } elseif ( $i === self::DB_REPLICA ) {
776            $groupIndex = $this->getReaderIndex( $group );
777            if ( $groupIndex !== false ) {
778                // Group connection succeeded
779                $serverIndex = $groupIndex;
780            }
781            if ( $serverIndex < 0 ) {
782                $this->reportConnectionError( 'could not connect to any replica DB server' );
783            }
784        } elseif ( !$this->serverInfo->hasServerIndex( $i ) ) {
785            throw new UnexpectedValueException( "Invalid server index index #$i" );
786        }
787        // Get an open connection to that server (might trigger a new connection)
788        return $this->getServerConnection( $serverIndex, $domain, $flags );
789    }
790
791    public function getServerConnection( $i, $domain, $flags = 0 ) {
792        $domainInstance = DatabaseDomain::newFromId( $domain );
793        // Number of connections made before getting the server index and handle
794        $priorConnectionsMade = $this->connectionCounter;
795        // Get an open connection to this server (might trigger a new connection)
796        $conn = $this->reuseOrOpenConnectionForNewRef( $i, $domainInstance, $flags );
797        // Throw an error or otherwise bail out if the connection attempt failed
798        if ( !( $conn instanceof IDatabaseForOwner ) ) {
799            if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
800                $this->reportConnectionError();
801            }
802
803            return false;
804        }
805
806        // Profile any new connections caused by this method
807        if ( $this->connectionCounter > $priorConnectionsMade ) {
808            $this->trxProfiler->recordConnection(
809                $conn->getServerName(),
810                $conn->getDBname(),
811                ( $i === ServerInfo::WRITER_INDEX && $this->hasStreamingReplicaServers() )
812            );
813        }
814
815        if ( !$conn->isOpen() ) {
816            $this->lastErrorConn = $conn;
817            // Connection was made but later unrecoverably lost for some reason.
818            // Do not return a handle that will just throw exceptions on use, but
819            // let the calling code, e.g. getReaderIndex(), try another server.
820            if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
821                $this->reportConnectionError();
822            }
823            return false;
824        }
825
826        return $conn;
827    }
828
829    /**
830     * @deprecated since 1.39.
831     */
832    public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): DBConnRef {
833        wfDeprecated( __METHOD__, '1.39' );
834        // @phan-suppress-next-line PhanTypeMismatchReturnSuperType to be removed soon
835        return $this->getConnection( $i, $groups, $domain, $flags );
836    }
837
838    public function getMaintenanceConnectionRef(
839        $i,
840        $groups = [],
841        $domain = false,
842        $flags = 0
843    ): DBConnRef {
844        if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
845            throw new UnexpectedValueException(
846                __METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
847            );
848        }
849
850        $domain = $this->resolveDomainID( $domain );
851        $role = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX )
852            ? self::DB_PRIMARY
853            : self::DB_REPLICA;
854
855        return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role, $this->modcount );
856    }
857
858    /**
859     * Get a live connection handle to the given domain
860     *
861     * This will reuse an existing tracked connection when possible. In some cases, this
862     * involves switching the DB domain of an existing handle in order to reuse it. If no
863     * existing handles can be reused, then a new connection will be made.
864     *
865     * @param int $i Specific server index
866     * @param DatabaseDomain $domain Database domain ID required by the reference
867     * @param int $flags Bit field of class CONN_* constants
868     * @return IDatabase|null Database or null on error
869     * @throws DBError When database selection fails
870     * @throws InvalidArgumentException When the server index is invalid
871     * @throws UnexpectedValueException When the DB domain of the connection is corrupted
872     * @throws DBAccessError If disable() was called
873     */
874    private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
875        // Figure out which connection pool to use based on the flags
876        if ( $this->fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE ) ) {
877            // Use low timeouts, use autocommit mode, ignore transaction rounds
878            $category = self::CATEGORY_GAUGE;
879        } elseif ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
880            // Use autocommit mode, ignore transaction rounds
881            $category = self::CATEGORY_AUTOCOMMIT;
882        } else {
883            // Respect DBO_DEFAULT, respect transaction rounds
884            $category = self::CATEGORY_ROUND;
885        }
886
887        $conn = null;
888        // Reuse a free connection in the pool from any domain if possible. There should only
889        // be one connection in this pool unless either:
890        //  - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
891        //       or more database domains have been used during the load balancer's lifetime
892        //  - b) Two or more nested function calls used getConnection() on different domains.
893        foreach ( ( $this->conns[$category][$i] ?? [] ) as $poolConn ) {
894            // Check if any required DB domain changes for the new reference are possible
895            // Calling selectDomain() would trigger a reconnect, which will break if a
896            // transaction is active or if there is any other meaningful session state.
897            $isShareable = !(
898                $poolConn->databasesAreIndependent() &&
899                $domain->getDatabase() !== null &&
900                $domain->getDatabase() !== $poolConn->getDBname()
901            );
902            if ( $isShareable ) {
903                $conn = $poolConn;
904                // Make any required DB domain changes for the new reference
905                if ( !$domain->isUnspecified() ) {
906                    $conn->selectDomain( $domain );
907                }
908                $this->logger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
909                break;
910            }
911        }
912
913        // If necessary, try to open a new connection and add it to the pool
914        if ( !$conn ) {
915            $conn = $this->reallyOpenConnection(
916                $i,
917                $domain,
918                [ self::INFO_CONN_CATEGORY => $category ]
919            );
920            if ( $conn->isOpen() ) {
921                // Make Database::isReadOnly() respect server-side and configuration-based
922                // read-only mode. Note that replica handles are always seen as read-only
923                // in Database::isReadOnly() and Database::assertIsWritablePrimary().
924                if ( $i === ServerInfo::WRITER_INDEX ) {
925                    if ( $this->readOnlyReason !== false ) {
926                        $readOnlyReason = $this->readOnlyReason;
927                    } elseif ( $this->isPrimaryRunningReadOnly( $conn ) ) {
928                        $readOnlyReason = 'The primary database server is running in read-only mode.';
929                    } else {
930                        $readOnlyReason = false;
931                    }
932                    $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $readOnlyReason );
933                }
934                // Connection obtained; check if it belongs to a tracked connection category
935                if ( isset( $this->conns[$category] ) ) {
936                    // Track this connection for future reuse
937                    $this->conns[$category][$i][] = $conn;
938                }
939            } else {
940                $this->logger->warning( __METHOD__ . ": connection error for $i/$domain" );
941                $this->lastErrorConn = $conn;
942                $conn = null;
943            }
944        }
945
946        if ( $conn instanceof IDatabaseForOwner ) {
947            // Check to make sure that the right domain is selected
948            $this->assertConnectionDomain( $conn, $domain );
949            // Check to make sure that the CONN_* flags are respected
950            $this->enforceConnectionFlags( $conn, $flags );
951        }
952
953        return $conn;
954    }
955
956    /**
957     * Sanity check to make sure that the right domain is selected
958     *
959     * @param Database $conn
960     * @param DatabaseDomain $domain
961     * @throws DBUnexpectedError
962     */
963    private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
964        if ( !$domain->isCompatible( $conn->getDomainID() ) ) {
965            throw new UnexpectedValueException(
966                "Got connection to '{$conn->getDomainID()}', but expected one for '{$domain}'"
967            );
968        }
969    }
970
971    public function getServerAttributes( $i ) {
972        return $this->databaseFactory->attributesFromType(
973            $this->getServerType( $i ),
974            $this->serverInfo->getServerDriver( $i )
975        );
976    }
977
978    /**
979     * Open a new network connection to a server (uncached)
980     *
981     * Returns a Database object whether or not the connection was successful.
982     *
983     * @param int $i Specific server index
984     * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
985     * @param array $lbInfo Additional information for setLBInfo()
986     * @return Database
987     * @throws DBAccessError
988     * @throws InvalidArgumentException
989     */
990    protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
991        if ( $this->disabled ) {
992            throw new DBAccessError();
993        }
994
995        $server = $this->serverInfo->getServerInfoStrict( $i );
996        if ( $lbInfo[self::INFO_CONN_CATEGORY] === self::CATEGORY_GAUGE ) {
997            // Use low connection/read timeouts for connection used for gauging server health.
998            // Gauge information should be cached and used to avoid outages. Indefinite hanging
999            // while gauging servers would do the opposite.
1000            $server['connectTimeout'] = min( 1, $server['connectTimeout'] ?? INF );
1001            $server['receiveTimeout'] = min( 1, $server['receiveTimeout'] ?? INF );
1002            // Avoid implicit transactions and avoid any SET query for session variables during
1003            // Database::open(). If a server becomes slow, every extra query can cause significant
1004            // delays, even with low connect/receive timeouts.
1005            $server['flags'] ??= 0;
1006            $server['flags'] &= ~IDatabase::DBO_DEFAULT;
1007            $server['flags'] |= IDatabase::DBO_GAUGE;
1008        } else {
1009            // Use implicit transactions unless explicitly configured otherwise
1010            $server['flags'] ??= IDatabase::DBO_DEFAULT;
1011        }
1012
1013        if ( !empty( $server['is static'] ) ) {
1014            $topologyRole = IDatabase::ROLE_STATIC_CLONE;
1015        } else {
1016            $topologyRole = ( $i === ServerInfo::WRITER_INDEX )
1017                ? IDatabase::ROLE_STREAMING_MASTER
1018                : IDatabase::ROLE_STREAMING_REPLICA;
1019        }
1020
1021        $conn = $this->databaseFactory->create(
1022            $server['type'],
1023            array_merge( $server, [
1024                // Basic replication role information
1025                'topologyRole' => $topologyRole,
1026                // Use the database specified in $domain (null means "none or entrypoint DB");
1027                // fallback to the $server default if the RDBMs is an embedded library using a
1028                // file on disk since there would be nothing to access to without a DB/file name.
1029                'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1030                    ? ( $domain->getDatabase() ?? $server['dbname'] ?? null )
1031                    : $domain->getDatabase(),
1032                // Override the $server default schema with that of $domain if specified
1033                'schema' => $domain->getSchema(),
1034                // Use the table prefix specified in $domain
1035                'tablePrefix' => $domain->getTablePrefix(),
1036                'srvCache' => $this->srvCache,
1037                'logger' => $this->logger,
1038                'errorLogger' => $this->errorLogger,
1039                'trxProfiler' => $this->trxProfiler,
1040                'lbInfo' => [ self::INFO_SERVER_INDEX => $i ] + $lbInfo
1041            ] ),
1042            Database::NEW_UNCONNECTED
1043        );
1044        // Set alternative table/index names before any queries can be issued
1045        $conn->setTableAliases( $this->tableAliases );
1046        $conn->setIndexAliases( $this->indexAliases );
1047        // Account for any active transaction round and listeners
1048        $this->syncConnectionRoundState( $conn );
1049        if ( $i === ServerInfo::WRITER_INDEX ) {
1050            foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1051                $conn->setTransactionListener( $name, $callback );
1052            }
1053        }
1054
1055        // Make the connection handle live
1056        try {
1057            $conn->initConnection();
1058            ++$this->connectionCounter;
1059        } catch ( DBConnectionError $e ) {
1060            $this->lastErrorConn = $conn;
1061            // ignore; let the DB handle the logging
1062        }
1063
1064        if ( $conn->isOpen() ) {
1065            $this->logger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
1066        } else {
1067            $this->logger->warning(
1068                __METHOD__ . ": connection error for $i/{db_domain}",
1069                [ 'db_domain' => $domain->getId() ]
1070            );
1071        }
1072
1073        // Log when many connection are made during a single request/script
1074        $count = 0;
1075        foreach ( $this->conns as $poolConnsByServer ) {
1076            foreach ( $poolConnsByServer as $serverConns ) {
1077                $count += count( $serverConns );
1078            }
1079        }
1080        if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1081            $this->logger->warning(
1082                __METHOD__ . ": {connections}+ connections made (primary={primarydb})",
1083                $this->getConnLogContext(
1084                    $conn,
1085                    [
1086                        'connections' => $count,
1087                        'primarydb' => $this->serverInfo->getPrimaryServerName(),
1088                        'db_domain' => $domain->getId()
1089                    ]
1090                )
1091            );
1092        }
1093
1094        $this->assertConnectionDomain( $conn, $domain );
1095
1096        return $conn;
1097    }
1098
1099    /**
1100     * Make sure that any "waitForPos" replication positions are loaded and available
1101     *
1102     * Each load balancer cluster has up to one replication position for the session.
1103     * These are used when data read by queries is expected to reflect writes caused
1104     * by a prior request/script from the same client.
1105     *
1106     * @see awaitSessionPrimaryPos()
1107     */
1108    private function loadSessionPrimaryPos() {
1109        if ( !$this->chronologyProtectorCalled && $this->chronologyProtector ) {
1110            $this->chronologyProtectorCalled = true;
1111            $pos = $this->chronologyProtector->getSessionPrimaryPos( $this );
1112            $this->logger->debug( __METHOD__ . ': executed chronology callback.' );
1113            if ( $pos ) {
1114                if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
1115                    $this->waitForPos = $pos;
1116                }
1117            }
1118        }
1119    }
1120
1121    /**
1122     * @param string $extraLbError Separat load balancer error
1123     * @throws DBConnectionError
1124     * @return never
1125     */
1126    private function reportConnectionError( $extraLbError = '' ) {
1127        if ( $this->lastErrorConn instanceof IDatabaseForOwner ) {
1128            $srvName = $this->lastErrorConn->getServerName();
1129            $lastDbError = $this->lastErrorConn->lastError() ?: 'unknown error';
1130
1131            $exception = new DBConnectionError(
1132                $this->lastErrorConn,
1133                $extraLbError
1134                    ? "{$extraLbError}{$lastDbError} ({$srvName})"
1135                    : "{$lastDbError} ({$srvName})"
1136            );
1137
1138            if ( $extraLbError ) {
1139                $this->logger->warning(
1140                    __METHOD__ . "$extraLbError; {last_error} ({db_server})",
1141                    $this->getConnLogContext(
1142                        $this->lastErrorConn,
1143                        [
1144                            'method' => __METHOD__,
1145                            'last_error' => $lastDbError
1146                        ]
1147                    )
1148                );
1149            }
1150        } else {
1151            $exception = new DBConnectionError(
1152                null,
1153                $extraLbError ?: 'could not connect to the DB server'
1154            );
1155
1156            if ( $extraLbError ) {
1157                $this->logger->error(
1158                    __METHOD__ . "$extraLbError",
1159                    [
1160                        'method' => __METHOD__,
1161                        'last_error' => '(last connection error missing)'
1162                    ]
1163                );
1164            }
1165        }
1166
1167        throw $exception;
1168    }
1169
1170    public function getServerCount() {
1171        return $this->serverInfo->getServerCount();
1172    }
1173
1174    public function hasReplicaServers() {
1175        return $this->serverInfo->hasReplicaServers();
1176    }
1177
1178    public function hasStreamingReplicaServers() {
1179        return $this->serverInfo->hasStreamingReplicaServers();
1180    }
1181
1182    public function getServerName( $i ): string {
1183        return $this->serverInfo->getServerName( $i );
1184    }
1185
1186    public function getServerInfo( $i ) {
1187        return $this->serverInfo->getServerInfo( $i );
1188    }
1189
1190    public function getServerType( $i ) {
1191        return $this->serverInfo->getServerType( $i );
1192    }
1193
1194    public function getPrimaryPos() {
1195        $conn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
1196        if ( $conn ) {
1197            return $conn->getPrimaryPos();
1198        }
1199
1200        /** @var IDatabaseForOwner|null $conn */
1201        $conn = $this->getConnectionInternal( ServerInfo::WRITER_INDEX, self::CONN_SILENCE_ERRORS );
1202        // @phan-suppress-next-line PhanRedundantCondition
1203        if ( !$conn ) {
1204            $this->reportConnectionError();
1205        }
1206
1207        // ::getConnectionInternal() should return IDatabaseForOwner but changing signature
1208        // is not straightforward (being implemented in Wikibase)
1209        '@phan-var IDatabaseForOwner $conn';
1210        try {
1211            return $conn->getPrimaryPos();
1212        } finally {
1213            $this->closeConnection( $conn );
1214        }
1215    }
1216
1217    /**
1218     * Apply updated configuration.
1219     *
1220     * This only unregisters servers that were removed in the new configuration.
1221     * It does not register new servers nor update the group load weights.
1222     *
1223     * This invalidates any open connections. However, existing connections may continue to be
1224     * used while they are in an active transaction. In that case, the old connection will be
1225     * discarded on the first operation after the transaction is complete. The next operation
1226     * will use a new connection based on the new configuration.
1227     *
1228     * @internal for use by LBFactory::reconfigure()
1229     *
1230     * @see DBConnRef::ensureConnection()
1231     * @see LBFactory::reconfigure()
1232     *
1233     * @param array $params A database configuration array, see $wgLBFactoryConf.
1234     *
1235     * @return void
1236     */
1237    public function reconfigure( array $params ) {
1238        $anyServerDepooled = false;
1239
1240        $paramServers = $params['servers'];
1241        $newIndexByServerIndex = $this->serverInfo->reconfigureServers( $paramServers );
1242        foreach ( $newIndexByServerIndex as $i => $ni ) {
1243            if ( $ni !== null ) {
1244                // Server still exists in the new config
1245                $newWeightByGroup = $paramServers[$ni]['groupLoads'] ?? [];
1246                $newWeightByGroup[ILoadBalancer::GROUP_GENERIC] = $paramServers[$ni]['load'];
1247                // Check if the server was removed from any load groups
1248                foreach ( $this->groupLoads as $group => $weightByIndex ) {
1249                    if ( isset( $weightByIndex[$i] ) && !isset( $newWeightByGroup[$group] ) ) {
1250                        // Server no longer in this load group in the new config
1251                        $anyServerDepooled = true;
1252                        unset( $this->groupLoads[$group][$i] );
1253                    }
1254                }
1255            } else {
1256                // Server no longer exists in the new config
1257                $anyServerDepooled = true;
1258                // Note that if the primary server is depooled and a replica server promoted
1259                // to new primary, then DB_PRIMARY handles will fail with server index errors
1260                foreach ( $this->groupLoads as $group => $loads ) {
1261                    unset( $this->groupLoads[$group][$i] );
1262                }
1263            }
1264        }
1265
1266        if ( $anyServerDepooled ) {
1267            // NOTE: We could close all connection here, but some may be in the middle of
1268            //       a transaction. So instead, we leave it to DBConnRef to close the
1269            //       connection when it detects that the modcount has changed and no
1270            //       transaction is open.
1271            $this->logger->info( 'Reconfiguring dbs!' );
1272            // Unpin DB_REPLICA connection groups from server indexes
1273            $this->readIndexByGroup = [];
1274            // We could close all connection here, but some may be in the middle of a
1275            // transaction. So instead, we leave it to DBConnRef to close the connection
1276            // when it detects that the modcount has changed and no transaction is open.
1277            $this->conns = self::newTrackedConnectionsArray();
1278            // Bump modification counter to invalidate the connections held by DBConnRef
1279            // instances. This will cause the next call to a method on the DBConnRef
1280            // to get a new connection from getConnectionInternal()
1281            $this->modcount++;
1282        }
1283    }
1284
1285    public function disable( $fname = __METHOD__ ) {
1286        $this->closeAll( $fname );
1287        $this->disabled = true;
1288    }
1289
1290    public function closeAll( $fname = __METHOD__ ) {
1291        /** @noinspection PhpUnusedLocalVariableInspection */
1292        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1293        foreach ( $this->getOpenConnections() as $conn ) {
1294            $conn->close( $fname );
1295        }
1296
1297        $this->conns = self::newTrackedConnectionsArray();
1298    }
1299
1300    /**
1301     * Close a connection
1302     *
1303     * Using this function makes sure the LoadBalancer knows the connection is closed.
1304     * If you use $conn->close() directly, the load balancer won't update its state.
1305     */
1306    private function closeConnection( IDatabaseForOwner $conn ) {
1307        if ( $conn instanceof DBConnRef ) {
1308            // Avoid calling close() but still leaving the handle in the pool
1309            throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
1310        }
1311
1312        $domain = $conn->getDomainID();
1313        $serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
1314        if ( $serverIndex === null ) {
1315            throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
1316        }
1317
1318        $srvName = $this->serverInfo->getServerName( $serverIndex );
1319
1320        $found = false;
1321        foreach ( $this->conns as $type => $poolConnsByServer ) {
1322            $key = array_search( $conn, $poolConnsByServer[$serverIndex] ?? [], true );
1323            if ( $key !== false ) {
1324                $found = true;
1325                unset( $this->conns[$type][$serverIndex][$key] );
1326            }
1327        }
1328
1329        if ( !$found ) {
1330            $this->logger->warning(
1331                __METHOD__ .
1332                ": orphaned connection to database {$this->stringifyConn( $conn )} at '$srvName'."
1333            );
1334        }
1335
1336        $this->logger->debug(
1337            __METHOD__ .
1338            ": closing connection to database {$this->stringifyConn( $conn )} at '$srvName'."
1339        );
1340
1341        $conn->close( __METHOD__ );
1342    }
1343
1344    public function finalizePrimaryChanges( $fname = __METHOD__ ) {
1345        $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1346        /** @noinspection PhpUnusedLocalVariableInspection */
1347        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1348
1349        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1350        // Loop until callbacks stop adding callbacks on other connections
1351        $total = 0;
1352        do {
1353            $count = 0; // callbacks execution attempts
1354            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1355                // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1356                // Any error should cause all (peer) transactions to be rolled back together.
1357                $count += $conn->runOnTransactionPreCommitCallbacks();
1358            }
1359            $total += $count;
1360        } while ( $count > 0 );
1361        // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1362        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1363            $conn->setTrxEndCallbackSuppression( true );
1364        }
1365        $this->trxRoundStage = self::ROUND_FINALIZED;
1366
1367        return $total;
1368    }
1369
1370    public function approvePrimaryChanges( int $maxWriteDuration, $fname = __METHOD__ ) {
1371        $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1372        /** @noinspection PhpUnusedLocalVariableInspection */
1373        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1374
1375        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1376        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1377            // Any atomic sections should have been closed by now and there definitely should
1378            // not be any open transactions started by begin() from callers outside Database.
1379            if ( $conn->explicitTrxActive() ) {
1380                throw new DBTransactionError(
1381                    $conn,
1382                    "Explicit transaction still active; a caller might have failed to call " .
1383                    "endAtomic() or cancelAtomic()."
1384                );
1385            }
1386            // Assert that the time to replicate the transaction will be reasonable.
1387            // If this fails, then all DB transactions will be rollback back together.
1388            $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1389            if ( $maxWriteDuration > 0 ) {
1390                if ( $time > $maxWriteDuration ) {
1391                    $humanTimeSec = round( $time, 3 );
1392                    throw new DBTransactionSizeError(
1393                        $conn,
1394                        "Transaction spent {time}s in writes, exceeding the {$maxWriteDuration}s limit",
1395                        // Message parameters for: transaction-duration-limit-exceeded
1396                        [ $time, $maxWriteDuration ],
1397                        null,
1398                        [ 'time' => $humanTimeSec ]
1399                    );
1400                } elseif ( $time > 0 ) {
1401                    $timeMs = $time * 1000;
1402                    $humanTimeMs = round( $timeMs, $timeMs > 1 ? 0 : 3 );
1403                    $this->logger->debug(
1404                        "Transaction spent {time_ms}ms in writes, under the {$maxWriteDuration}s limit",
1405                        [ 'time_ms' => $humanTimeMs ]
1406                    );
1407                }
1408            }
1409            // If a connection sits idle for too long it might be dropped, causing transaction
1410            // writes and session locks to be lost. Ping all the server connections before making
1411            // any attempt to commit the transactions belonging to the active transaction round.
1412            if ( $conn->writesOrCallbacksPending() || $conn->sessionLocksPending() ) {
1413                if ( !$conn->ping() ) {
1414                    throw new DBTransactionError(
1415                        $conn,
1416                        "Pre-commit ping failed on server {$conn->getServerName()}"
1417                    );
1418                }
1419            }
1420        }
1421        $this->trxRoundStage = self::ROUND_APPROVED;
1422    }
1423
1424    public function beginPrimaryChanges( $fname = __METHOD__ ) {
1425        if ( $this->trxRoundFname !== null ) {
1426            throw new DBTransactionError(
1427                null,
1428                "Transaction round '{$this->trxRoundFname}' already started"
1429            );
1430        }
1431        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1432        /** @noinspection PhpUnusedLocalVariableInspection */
1433        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1434
1435        // Clear any empty transactions (no writes/callbacks) from the implicit round
1436        $this->flushPrimarySnapshots( $fname );
1437
1438        $this->trxRoundFname = $fname;
1439        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1440        // Mark applicable handles as participating in this explicit transaction round.
1441        // For each of these handles, any writes and callbacks will be tied to a single
1442        // transaction. The (peer) handles will reject begin()/commit() calls unless they
1443        // are part of an en masse commit or an en masse rollback.
1444        foreach ( $this->getOpenConnections() as $conn ) {
1445            $this->syncConnectionRoundState( $conn );
1446        }
1447        $this->trxRoundStage = self::ROUND_CURSORY;
1448    }
1449
1450    public function commitPrimaryChanges( $fname = __METHOD__ ) {
1451        $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1452        /** @noinspection PhpUnusedLocalVariableInspection */
1453        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1454
1455        $failures = [];
1456
1457        $this->trxRoundFname = null;
1458        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1459        // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1460        // Note that callbacks should already be suppressed due to finalizePrimaryChanges().
1461        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1462            try {
1463                $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1464            } catch ( DBError $e ) {
1465                ( $this->errorLogger )( $e );
1466                $failures[] = "{$conn->getServerName()}{$e->getMessage()}";
1467            }
1468        }
1469        if ( $failures ) {
1470            throw new DBTransactionError(
1471                null,
1472                "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1473            );
1474        }
1475        // Unmark handles as participating in this explicit transaction round
1476        foreach ( $this->getOpenConnections() as $conn ) {
1477            $this->syncConnectionRoundState( $conn );
1478        }
1479        $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1480    }
1481
1482    public function runPrimaryTransactionIdleCallbacks( $fname = __METHOD__ ) {
1483        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1484            $type = IDatabase::TRIGGER_COMMIT;
1485        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1486            $type = IDatabase::TRIGGER_ROLLBACK;
1487        } else {
1488            throw new DBTransactionError(
1489                null,
1490                "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1491            );
1492        }
1493        /** @noinspection PhpUnusedLocalVariableInspection */
1494        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1495
1496        $oldStage = $this->trxRoundStage;
1497        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1498
1499        // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1500        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1501            $conn->setTrxEndCallbackSuppression( false );
1502        }
1503
1504        $errors = [];
1505        $fname = __METHOD__;
1506        // Loop until callbacks stop adding callbacks on other connections
1507        do {
1508            // Run any pending callbacks for each connection...
1509            $count = 0; // callback execution attempts
1510            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1511                if ( $conn->trxLevel() ) {
1512                    continue; // retry in the next iteration, after commit() is called
1513                }
1514                $count += $conn->runOnTransactionIdleCallbacks( $type, $errors );
1515            }
1516            // Clear out any active transactions left over from callbacks...
1517            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1518                if ( $conn->writesPending() ) {
1519                    // A callback from another handle wrote to this one and DBO_TRX is set
1520                    $fnames = implode( ', ', $conn->pendingWriteCallers() );
1521                    $this->logger->info(
1522                        "$fname: found writes pending ($fnames).",
1523                        $this->getConnLogContext(
1524                            $conn,
1525                            [ 'exception' => new RuntimeException() ]
1526                        )
1527                    );
1528                } elseif ( $conn->trxLevel() ) {
1529                    // A callback from another handle read from this one and DBO_TRX is set,
1530                    // which can easily happen if there is only one DB (no replicas)
1531                    $this->logger->debug( "$fname: found empty transaction." );
1532                }
1533                try {
1534                    $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1535                } catch ( DBError $ex ) {
1536                    $errors[] = $ex;
1537                }
1538            }
1539        } while ( $count > 0 );
1540
1541        $this->trxRoundStage = $oldStage;
1542
1543        return $errors[0] ?? null;
1544    }
1545
1546    public function runPrimaryTransactionListenerCallbacks( $fname = __METHOD__ ) {
1547        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1548            $type = IDatabase::TRIGGER_COMMIT;
1549        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1550            $type = IDatabase::TRIGGER_ROLLBACK;
1551        } else {
1552            throw new DBTransactionError(
1553                null,
1554                "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1555            );
1556        }
1557        /** @noinspection PhpUnusedLocalVariableInspection */
1558        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1559
1560        $errors = [];
1561        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1562        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1563            $conn->runTransactionListenerCallbacks( $type, $errors );
1564        }
1565        $this->trxRoundStage = self::ROUND_CURSORY;
1566
1567        return $errors[0] ?? null;
1568    }
1569
1570    public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
1571        /** @noinspection PhpUnusedLocalVariableInspection */
1572        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1573
1574        $this->trxRoundFname = null;
1575        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1576        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1577            $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1578        }
1579        // Unmark handles as participating in this explicit transaction round
1580        foreach ( $this->getOpenConnections() as $conn ) {
1581            $this->syncConnectionRoundState( $conn );
1582        }
1583        $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1584    }
1585
1586    public function flushPrimarySessions( $fname = __METHOD__ ) {
1587        $this->assertTransactionRoundStage( [ self::ROUND_CURSORY ] );
1588        if ( $this->hasPrimaryChanges() ) {
1589            // Any transaction should have been rolled back beforehand
1590            throw new DBTransactionError( null, "Cannot reset session while writes are pending" );
1591        }
1592
1593        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1594            $conn->flushSession( $fname, $conn::FLUSHING_ALL_PEERS );
1595        }
1596    }
1597
1598    /**
1599     * @param string|string[] $stage
1600     * @throws DBTransactionError
1601     */
1602    private function assertTransactionRoundStage( $stage ) {
1603        $stages = (array)$stage;
1604
1605        if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
1606            $stageList = implode(
1607                '/',
1608                array_map( static function ( $v ) {
1609                    return "'$v'";
1610                }, $stages )
1611            );
1612            throw new DBTransactionError(
1613                null,
1614                "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1615            );
1616        }
1617    }
1618
1619    /**
1620     * Make connections with DBO_DEFAULT/DBO_TRX set join any active transaction round
1621     *
1622     * Some servers may have neither flag enabled, meaning that they opt out of such
1623     * transaction rounds and remain in auto-commit mode. Such behavior might be desired
1624     * when a DB server is used for something like simple key/value storage.
1625     *
1626     * @param Database $conn
1627     */
1628    private function syncConnectionRoundState( Database $conn ) {
1629        if ( $conn->getLBInfo( self::INFO_CONN_CATEGORY ) !== self::CATEGORY_ROUND ) {
1630            return; // transaction rounds do not apply to these connections
1631        }
1632
1633        if ( $this->trxRoundFname !== null ) {
1634            // Explicit transaction round
1635            $trxRoundLevel = 1;
1636        } else {
1637            // Implicit auto-commit round (cli mode) or implicit transaction round (web mode)
1638            $trxRoundLevel = ( $this->cliMode ? 0 : 1 );
1639        }
1640
1641        // CATEGORY_ROUND DB handles with DBO_DEFAULT are considered "round aware" and the
1642        // load balancer will take over the logic of when DBO_TRX is set for such DB handles.
1643        // DBO_TRX will be set only when a transaction round is active (explicit or implicit).
1644        if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1645            if ( $trxRoundLevel ) {
1646                // Wrap queries in a transaction joined to the active transaction round
1647                $conn->setFlag( $conn::DBO_TRX );
1648            } else {
1649                // Do not wrap queries in a transaction (no active transaction round)
1650                $conn->clearFlag( $conn::DBO_TRX );
1651            }
1652        }
1653
1654        // Note that DBO_TRX is normally only set above or by DatabaseFactory applying
1655        // DBO_DEFAULT. However, integration tests might directly force DBO_TRX in the
1656        // server configuration arrays. Such handles will still be flushed during calls
1657        // to {@link LoadBalancer::commitPrimaryChanges()}.
1658        if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1659            // DB handle is participating in the active transction round
1660            $conn->setLBInfo( $conn::LB_TRX_ROUND_LEVEL, $trxRoundLevel );
1661            $conn->setLBInfo( $conn::LB_TRX_ROUND_FNAME, $this->trxRoundFname );
1662        } else {
1663            // DB handle is not participating in any transction rounds
1664            $conn->setLBInfo( $conn::LB_TRX_ROUND_LEVEL, 0 );
1665            $conn->setLBInfo( $conn::LB_TRX_ROUND_FNAME, null );
1666        }
1667    }
1668
1669    public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1670        foreach ( $this->conns as $poolConnsByServer ) {
1671            foreach ( $poolConnsByServer as $serverIndex => $serverConns ) {
1672                if ( $serverIndex === ServerInfo::WRITER_INDEX ) {
1673                    continue; // skip primary
1674                }
1675                foreach ( $serverConns as $conn ) {
1676                    $conn->flushSnapshot( $fname );
1677                }
1678            }
1679        }
1680    }
1681
1682    public function flushPrimarySnapshots( $fname = __METHOD__ ) {
1683        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1684            $conn->flushSnapshot( $fname );
1685        }
1686    }
1687
1688    public function hasPrimaryConnection() {
1689        return (bool)$this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
1690    }
1691
1692    public function hasPrimaryChanges() {
1693        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1694            if ( $conn->writesOrCallbacksPending() ) {
1695                return true;
1696            }
1697        }
1698
1699        return false;
1700    }
1701
1702    public function lastPrimaryChangeTimestamp() {
1703        $lastTime = null;
1704        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1705            $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1706        }
1707
1708        return $lastTime;
1709    }
1710
1711    public function hasOrMadeRecentPrimaryChanges( $age = null ) {
1712        $age ??= self::MAX_WAIT_DEFAULT;
1713
1714        return ( $this->hasPrimaryChanges()
1715            || $this->lastPrimaryChangeTimestamp() > microtime( true ) - $age );
1716    }
1717
1718    public function pendingPrimaryChangeCallers() {
1719        $fnames = [];
1720        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1721            $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1722        }
1723
1724        return $fnames;
1725    }
1726
1727    public function explicitTrxActive() {
1728        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1729            if ( $conn->explicitTrxActive() ) {
1730                return true;
1731            }
1732        }
1733        return false;
1734    }
1735
1736    private function setLaggedReplicaMode(): void {
1737        $this->laggedReplicaMode = true;
1738        $this->logger->warning( __METHOD__ . ": setting lagged replica mode" );
1739    }
1740
1741    public function laggedReplicaUsed() {
1742        return $this->laggedReplicaMode;
1743    }
1744
1745    public function getReadOnlyReason() {
1746        if ( $this->readOnlyReason !== false ) {
1747            return $this->readOnlyReason;
1748        } elseif ( $this->isPrimaryRunningReadOnly() ) {
1749            return 'The primary database server is running in read-only mode.';
1750        }
1751
1752        return false;
1753    }
1754
1755    /**
1756     * @note This method suppresses DBError exceptions in order to avoid severe downtime
1757     * @param IDatabaseForOwner|null $conn Recently acquired primary connection; null if not applicable
1758     * @return bool Whether the entire primary DB server or the local domain DB is read-only
1759     */
1760    private function isPrimaryRunningReadOnly( ?IDatabaseForOwner $conn = null ) {
1761        // Context will often be HTTP GET/HEAD; heavily cache the results
1762        return (bool)$this->wanCache->getWithSetCallback(
1763            // Note that table prefixes are not related to server-side read-only mode
1764            $this->wanCache->makeGlobalKey(
1765                'rdbms-server-readonly',
1766                $this->serverInfo->getPrimaryServerName()
1767            ),
1768            self::TTL_CACHE_READONLY,
1769            function ( $oldValue ) use ( $conn ) {
1770                $scope = $this->trxProfiler->silenceForScope();
1771                $conn ??= $this->getServerConnection(
1772                    ServerInfo::WRITER_INDEX,
1773                    self::DOMAIN_ANY,
1774                    self::CONN_SILENCE_ERRORS
1775                );
1776                if ( $conn ) {
1777                    try {
1778                        $value = (int)$conn->serverIsReadOnly();
1779                    } catch ( DBError $e ) {
1780                        $value = is_int( $oldValue ) ? $oldValue : 0;
1781                    }
1782                } else {
1783                    $value = 0;
1784                }
1785                ScopedCallback::consume( $scope );
1786
1787                return $value;
1788            },
1789            [
1790                'busyValue' => 0,
1791                'pcTTL' => WANObjectCache::TTL_PROC_LONG
1792            ]
1793        );
1794    }
1795
1796    public function pingAll() {
1797        $success = true;
1798        foreach ( $this->getOpenConnections() as $conn ) {
1799            if ( !$conn->ping() ) {
1800                $success = false;
1801            }
1802        }
1803
1804        return $success;
1805    }
1806
1807    /**
1808     * Get all open connections
1809     * @return \Generator|Database[]
1810     */
1811    private function getOpenConnections() {
1812        foreach ( $this->conns as $poolConnsByServer ) {
1813            foreach ( $poolConnsByServer as $serverConns ) {
1814                foreach ( $serverConns as $conn ) {
1815                    yield $conn;
1816                }
1817            }
1818        }
1819    }
1820
1821    /**
1822     * Get all open primary connections
1823     * @return \Generator|Database[]
1824     */
1825    private function getOpenPrimaryConnections() {
1826        foreach ( $this->conns as $poolConnsByServer ) {
1827            /** @var IDatabaseForOwner $conn */
1828            foreach ( ( $poolConnsByServer[ServerInfo::WRITER_INDEX] ?? [] ) as $conn ) {
1829                yield $conn;
1830            }
1831        }
1832    }
1833
1834    public function getMaxLag() {
1835        $host = '';
1836        $maxLag = -1;
1837        $maxIndex = 0;
1838
1839        if ( $this->serverInfo->hasReplicaServers() ) {
1840            $lagTimes = $this->getLagTimes();
1841            foreach ( $lagTimes as $i => $lag ) {
1842                // Allowing the value to be unset due to stale cache (T361824)
1843                $load = $this->groupLoads[self::GROUP_GENERIC][$i] ?? 0;
1844                if ( $load > 0 && $lag > $maxLag ) {
1845                    $maxLag = $lag;
1846                    $host = $this->serverInfo->getServerInfoStrict( $i, 'host' );
1847                    $maxIndex = $i;
1848                }
1849            }
1850        }
1851
1852        return [ $host, $maxLag, $maxIndex ];
1853    }
1854
1855    public function getLagTimes() {
1856        if ( !$this->hasReplicaServers() ) {
1857            return [ ServerInfo::WRITER_INDEX => 0 ]; // no replication = no lag
1858        }
1859        $fname = __METHOD__;
1860        return $this->wanCache->getWithSetCallback(
1861            $this->wanCache->makeGlobalKey( 'rdbms-lags', $this->getClusterName() ),
1862            // Add jitter to avoid stampede
1863            10 + mt_rand( 1, 10 ),
1864            function () use ( $fname ) {
1865                $lags = [];
1866                foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
1867                    $conn = $this->getServerConnection(
1868                        $i,
1869                        self::DOMAIN_ANY,
1870                        self::CONN_SILENCE_ERRORS | self::CONN_UNTRACKED_GAUGE
1871                    );
1872                    if ( $conn ) {
1873                        $lags[$i] = $conn->getLag();
1874                        $conn->close( $fname );
1875                    } else {
1876                        $lags[$i] = false;
1877                    }
1878                }
1879                return $lags;
1880            },
1881            [ 'lockTSE' => 30 ]
1882        );
1883    }
1884
1885    public function waitForPrimaryPos( IDatabase $conn ) {
1886        if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === ServerInfo::WRITER_INDEX ) {
1887            return true; // not a replica DB server
1888        }
1889
1890        // Get the current primary DB position, opening a connection only if needed
1891        $flags = self::CONN_SILENCE_ERRORS;
1892        $primaryConn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX, $flags );
1893        if ( $primaryConn ) {
1894            $pos = $primaryConn->getPrimaryPos();
1895        } else {
1896            $primaryConn = $this->getServerConnection( ServerInfo::WRITER_INDEX, self::DOMAIN_ANY, $flags );
1897            if ( !$primaryConn ) {
1898                throw new DBReplicationWaitError(
1899                    null,
1900                    "Could not obtain a primary database connection to get the position"
1901                );
1902            }
1903            $pos = $primaryConn->getPrimaryPos();
1904            $this->closeConnection( $primaryConn );
1905        }
1906
1907        if ( $pos instanceof DBPrimaryPos && $conn instanceof IDatabaseForOwner ) {
1908            $this->logger->debug( __METHOD__ . ': waiting' );
1909            $result = $conn->primaryPosWait( $pos, self::MAX_WAIT_DEFAULT );
1910            $ok = ( $result !== null && $result != -1 );
1911            if ( $ok ) {
1912                $this->logger->debug( __METHOD__ . ': done waiting (success)' );
1913            } else {
1914                $this->logger->debug( __METHOD__ . ': done waiting (failure)' );
1915            }
1916        } else {
1917            $ok = false; // something is misconfigured
1918            $this->logger->error(
1919                __METHOD__ . ': could not get primary pos for {db_server}',
1920                $this->getConnLogContext( $conn, [ 'exception' => new RuntimeException() ] )
1921            );
1922        }
1923
1924        return $ok;
1925    }
1926
1927    public function setTransactionListener( $name, ?callable $callback = null ) {
1928        if ( $callback ) {
1929            $this->trxRecurringCallbacks[$name] = $callback;
1930        } else {
1931            unset( $this->trxRecurringCallbacks[$name] );
1932        }
1933        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1934            $conn->setTransactionListener( $name, $callback );
1935        }
1936    }
1937
1938    public function setTableAliases( array $aliases ) {
1939        $this->tableAliases = $aliases;
1940    }
1941
1942    public function setIndexAliases( array $aliases ) {
1943        $this->indexAliases = $aliases;
1944    }
1945
1946    public function setDomainAliases( array $aliases ) {
1947        $this->domainAliases = $aliases;
1948    }
1949
1950    public function setLocalDomainPrefix( $prefix ) {
1951        $oldLocalDomain = $this->localDomain;
1952        $this->localDomain = new DatabaseDomain(
1953            $this->localDomain->getDatabase(),
1954            $this->localDomain->getSchema(),
1955            $prefix
1956        );
1957
1958        // Update the prefix for existing connections.
1959        // Existing DBConnRef handles will not be affected.
1960        foreach ( $this->getOpenConnections() as $conn ) {
1961            if ( $oldLocalDomain->equals( $conn->getDomainID() ) ) {
1962                $conn->tablePrefix( $prefix );
1963            }
1964        }
1965    }
1966
1967    public function redefineLocalDomain( $domain ) {
1968        $this->closeAll( __METHOD__ );
1969        $this->localDomain = DatabaseDomain::newFromId( $domain );
1970    }
1971
1972    public function setTempTablesOnlyMode( $value, $domain ) {
1973        $old = $this->tempTablesOnlyMode[$domain] ?? false;
1974        if ( $value ) {
1975            $this->tempTablesOnlyMode[$domain] = true;
1976        } else {
1977            unset( $this->tempTablesOnlyMode[$domain] );
1978        }
1979
1980        return $old;
1981    }
1982
1983    /**
1984     * @param IDatabase $conn
1985     * @return string Description of a connection handle for log messages
1986     * @throws InvalidArgumentException
1987     */
1988    private function stringifyConn( IDatabase $conn ) {
1989        return $conn->getLBInfo( self::INFO_SERVER_INDEX ) . '/' . $conn->getDomainID();
1990    }
1991
1992    /**
1993     * @param int $flags A bitfield of flags
1994     * @param int $bit Bit flag constant
1995     * @return bool Whether the bit field has the specified bit flag set
1996     */
1997    private function fieldHasBit( int $flags, int $bit ) {
1998        return ( ( $flags & $bit ) === $bit );
1999    }
2000
2001    /**
2002     * Create a log context to pass to PSR-3 logger functions.
2003     *
2004     * @param IDatabase $conn
2005     * @param array $extras Additional data to add to context
2006     * @return array
2007     */
2008    protected function getConnLogContext( IDatabase $conn, array $extras = [] ) {
2009        return array_merge(
2010            [
2011                'db_server' => $conn->getServerName(),
2012                'db_domain' => $conn->getDomainID()
2013            ],
2014            $extras
2015        );
2016    }
2017
2018    /**
2019     * @param float|null &$time Mock UNIX timestamp for testing
2020     * @internal
2021     * @codeCoverageIgnore
2022     */
2023    public function setMockTime( &$time ) {
2024        $this->loadMonitor->setMockTime( $time );
2025    }
2026}