Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
56.46% covered (warning)
56.46%
498 / 882
32.93% covered (danger)
32.93%
27 / 82
CRAP
0.00% covered (danger)
0.00%
0 / 1
LoadBalancer
56.46% covered (warning)
56.46%
498 / 882
32.93% covered (danger)
32.93%
27 / 82
8932.80
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 configure
88.64% covered (warning)
88.64%
39 / 44
0.00% covered (danger)
0.00%
0 / 1
12.21
 newTrackedConnectionsArray
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 getClusterName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 resolveDomainInstance
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
6
 resolveGroups
73.33% covered (warning)
73.33%
11 / 15
0.00% covered (danger)
0.00%
0 / 1
14.73
 sanitizeConnectionFlags
81.82% covered (warning)
81.82%
9 / 11
0.00% covered (danger)
0.00%
0 / 1
6.22
 enforceConnectionFlags
50.00% covered (danger)
50.00%
4 / 8
0.00% covered (danger)
0.00%
0 / 1
6.00
 getRandomNonLagged
45.45% covered (danger)
45.45%
10 / 22
0.00% covered (danger)
0.00%
0 / 1
14.95
 getReaderIndex
78.26% covered (warning)
78.26%
18 / 23
0.00% covered (danger)
0.00%
0 / 1
8.66
 getExistingReaderIndex
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 pickReaderIndex
55.56% covered (warning)
55.56%
15 / 27
0.00% covered (danger)
0.00%
0 / 1
18.78
 waitForAll
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
72
 serverHasLoadInAnyGroup
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 getAnyOpenConnection
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
7.01
 pickAnyOpenConnection
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
6.60
 awaitSessionPrimaryPos
7.14% covered (danger)
7.14%
3 / 42
0.00% covered (danger)
0.00%
0 / 1
127.29
 getConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getConnectionInternal
93.33% covered (success)
93.33%
14 / 15
0.00% covered (danger)
0.00%
0 / 1
6.01
 getServerConnection
73.68% covered (warning)
73.68%
14 / 19
0.00% covered (danger)
0.00%
0 / 1
6.66
 reuseConnection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getConnectionRef
66.67% covered (warning)
66.67%
6 / 9
0.00% covered (danger)
0.00%
0 / 1
4.59
 getMaintenanceConnectionRef
55.56% covered (warning)
55.56%
5 / 9
0.00% covered (danger)
0.00%
0 / 1
5.40
 reuseOrOpenConnectionForNewRef
92.68% covered (success)
92.68%
38 / 41
0.00% covered (danger)
0.00%
0 / 1
15.09
 assertConnectionDomain
25.00% covered (danger)
25.00%
1 / 4
0.00% covered (danger)
0.00%
0 / 1
3.69
 getServerAttributes
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 reallyOpenConnection
79.10% covered (warning)
79.10%
53 / 67
0.00% covered (danger)
0.00%
0 / 1
15.79
 loadSessionPrimaryPos
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
6.84
 reportConnectionError
60.61% covered (warning)
60.61%
20 / 33
0.00% covered (danger)
0.00%
0 / 1
10.00
 getWriterIndex
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerCount
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasStreamingReplicaServers
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerInfo
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getServerType
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPrimaryPos
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 reconfigure
89.47% covered (warning)
89.47%
17 / 19
0.00% covered (danger)
0.00%
0 / 1
8.07
 disable
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 closeAll
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 closeConnection
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
42
 finalizePrimaryChanges
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
3
 approvePrimaryChanges
28.57% covered (danger)
28.57%
10 / 35
0.00% covered (danger)
0.00%
0 / 1
46.44
 beginPrimaryChanges
69.23% covered (warning)
69.23%
9 / 13
0.00% covered (danger)
0.00%
0 / 1
3.26
 commitPrimaryChanges
65.00% covered (warning)
65.00%
13 / 20
0.00% covered (danger)
0.00%
0 / 1
7.54
 runPrimaryTransactionIdleCallbacks
52.63% covered (warning)
52.63%
20 / 38
0.00% covered (danger)
0.00%
0 / 1
20.63
 runPrimaryTransactionListenerCallbacks
60.00% covered (warning)
60.00%
9 / 15
0.00% covered (danger)
0.00%
0 / 1
5.02
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 flushPrimarySessions
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 assertTransactionRoundStage
16.67% covered (danger)
16.67%
2 / 12
0.00% covered (danger)
0.00%
0 / 1
4.31
 applyTransactionRoundFlags
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
4.07
 undoTransactionRoundFlags
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
4.07
 flushReplicaSnapshots
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
30
 flushPrimarySnapshots
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 hasPrimaryConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 lastPrimaryChangeTimestamp
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 pendingPrimaryChangeCallers
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setLaggedReplicaMode
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getReadOnlyReason
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 isPrimaryRunningReadOnly
92.31% covered (success)
92.31%
24 / 26
0.00% covered (danger)
0.00%
0 / 1
4.01
 pingAll
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 getOpenConnections
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
4
 getOpenPrimaryConnections
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 getMaxLag
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 getLagTimes
90.48% covered (success)
90.48%
19 / 21
0.00% covered (danger)
0.00%
0 / 1
4.01
 waitForPrimaryPos
0.00% covered (danger)
0.00%
0 / 27
0.00% covered (danger)
0.00%
0 / 1
56
 setTransactionListener
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 setTableAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setIndexAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 setTempTablesOnlyMode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 stringifyConn
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 fieldHasBit
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getConnLogContext
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 setMockTime
n/a
0 / 0
n/a
0 / 0
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use ArrayUtils;
23use BagOStuff;
24use EmptyBagOStuff;
25use InvalidArgumentException;
26use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27use LogicException;
28use NullStatsdDataFactory;
29use Psr\Log\LoggerInterface;
30use Psr\Log\NullLogger;
31use RuntimeException;
32use Throwable;
33use UnexpectedValueException;
34use WANObjectCache;
35use Wikimedia\ScopedCallback;
36
37/**
38 * @see ILoadBalancer
39 * @ingroup Database
40 */
41class LoadBalancer implements ILoadBalancerForOwner {
42    /** @var ILoadMonitor */
43    private $loadMonitor;
44    /** @var BagOStuff */
45    private $srvCache;
46    /** @var WANObjectCache */
47    private $wanCache;
48    /** @var DatabaseFactory */
49    private $databaseFactory;
50
51    /** @var TransactionProfiler */
52    private $trxProfiler;
53    /** @var StatsdDataFactoryInterface */
54    private $statsd;
55    /** @var LoggerInterface */
56    private $logger;
57    /** @var callable Exception logger */
58    private $errorLogger;
59    /** @var DatabaseDomain Local DB domain ID and default for new connections */
60    private $localDomain;
61
62    /** @var array<string,array<int,Database[]>> Map of (connection pool => server index => Database[]) */
63    private $conns;
64
65    /** @var string|null The name of the DB cluster */
66    private $clusterName;
67    /** @var ServerInfo */
68    private $serverInfo;
69    /** @var array[] Map of (group => server index => weight) */
70    private $groupLoads;
71    /** @var string|null Default query group to use with getConnection() */
72    private $defaultGroup;
73
74    /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
75    private $tableAliases = [];
76    /** @var string[] Map of (index alias => index) */
77    private $indexAliases = [];
78    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
79    private $domainAliases = [];
80    /** @var callable[] Map of (name => callable) */
81    private $trxRecurringCallbacks = [];
82    /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */
83    private $tempTablesOnlyMode = [];
84
85    /** @var string|false Explicit DBO_TRX transaction round active or false if none */
86    private $trxRoundId = false;
87    /** @var string Stage of the current transaction round in the transaction round life-cycle */
88    private $trxRoundStage = self::ROUND_CURSORY;
89    /** @var int[] The group replica server indexes keyed by group */
90    private $readIndexByGroup = [];
91    /** @var DBPrimaryPos|null Replication sync position or false if not set */
92    private $waitForPos;
93    /** @var bool Whether a lagged replica DB was used */
94    private $laggedReplicaMode = false;
95    /** @var string|false Reason this instance is read-only or false if not */
96    private $readOnlyReason = false;
97    /** @var int Total number of new connections ever made with this instance */
98    private $connectionCounter = 0;
99    /** @var bool */
100    private $disabled = false;
101    private ?ChronologyProtector $chronologyProtector = null;
102    /** @var bool Whether the session consistency callback already executed */
103    private $chronologyProtectorCalled = false;
104
105    /** @var Database|null The last connection handle that caused a problem */
106    private $lastErrorConn;
107
108    /** @var DatabaseDomain[] Map of (domain ID => domain instance) */
109    private $nonLocalDomainCache = [];
110
111    /**
112     * @var int Modification counter for invalidating connections held by
113     *      DBConnRef instances. This is bumped by reconfigure().
114     */
115    private $modcount = 0;
116
117    /** The "server index" LB info key; see {@link IDatabase::getLBInfo()} */
118    private const INFO_SERVER_INDEX = 'serverIndex';
119    /** The "connection category" LB info key; see {@link IDatabase::getLBInfo()} */
120    private const INFO_CONN_CATEGORY = 'connCategory';
121
122    /**
123     * Default 'maxLag' when unspecified
124     * @internal Only for use within LoadBalancer/LoadMonitor
125     */
126    public const MAX_LAG_DEFAULT = ServerInfo::MAX_LAG_DEFAULT;
127
128    /** Warn when this many connection are held */
129    private const CONN_HELD_WARN_THRESHOLD = 10;
130
131    /** Default 'waitTimeout' when unspecified */
132    private const MAX_WAIT_DEFAULT = 10;
133    /** Seconds to cache primary DB server read-only status */
134    private const TTL_CACHE_READONLY = 5;
135
136    /** A category of connections that are tracked and transaction round aware */
137    private const CATEGORY_ROUND = 'round';
138    /** A category of connections that are tracked and in autocommit-mode */
139    private const CATEGORY_AUTOCOMMIT = 'auto-commit';
140    /** A category of connections that are untracked and in gauge-mode */
141    private const CATEGORY_GAUGE = 'gauge';
142
143    /** Transaction round, explicit or implicit, has not finished writing */
144    private const ROUND_CURSORY = 'cursory';
145    /** Transaction round writes are complete and ready for pre-commit checks */
146    private const ROUND_FINALIZED = 'finalized';
147    /** Transaction round passed final pre-commit checks */
148    private const ROUND_APPROVED = 'approved';
149    /** Transaction round was committed and post-commit callbacks must be run */
150    private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
151    /** Transaction round was rolled back and post-rollback callbacks must be run */
152    private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
153    /** Transaction round encountered an error */
154    private const ROUND_ERROR = 'error';
155
156    /** @var int Idiom for getExistingReaderIndex() meaning "no index selected" */
157    private const READER_INDEX_NONE = -1;
158
159    public function __construct( array $params ) {
160        $this->configure( $params );
161
162        $this->conns = self::newTrackedConnectionsArray();
163    }
164
165    /**
166     * @param array $params A database configuration array, see $wgLBFactoryConf.
167     *
168     * @return void
169     */
170    protected function configure( array $params ): void {
171        $this->localDomain = isset( $params['localDomain'] )
172            ? DatabaseDomain::newFromId( $params['localDomain'] )
173            : DatabaseDomain::newUnspecified();
174
175        $this->serverInfo = new ServerInfo();
176        $this->groupLoads = [ self::GROUP_GENERIC => [] ];
177        foreach ( $this->serverInfo->normalizeServerMaps( $params['servers'] ?? [] ) as $i => $server ) {
178            $this->serverInfo->addServer( $i, $server );
179            foreach ( $server['groupLoads'] as $group => $weight ) {
180                $this->groupLoads[$group][$i] = $weight;
181            }
182            $this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
183        }
184
185        if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
186            $this->readOnlyReason = $params['readOnlyReason'];
187        }
188
189        $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
190        $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
191
192        // Note: this parameter is normally absent. It is injectable for testing purposes only.
193        $this->databaseFactory = $params['databaseFactory'] ?? new DatabaseFactory( $params );
194
195        $this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
196                trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
197        };
198        $this->logger = $params['logger'] ?? new NullLogger();
199
200        $this->clusterName = $params['clusterName'] ?? null;
201        $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
202        $this->statsd = $params['statsdDataFactory'] ?? new NullStatsdDataFactory();
203
204        // Set up LoadMonitor
205        $loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
206        $loadMonitorConfig += [ 'maxConnCount' => 500 ];
207        $compat = [
208            'LoadMonitor' => LoadMonitor::class,
209            'LoadMonitorNull' => LoadMonitorNull::class
210        ];
211        $class = $loadMonitorConfig['class'];
212        if ( isset( $compat[$class] ) ) {
213            $class = $compat[$class];
214        }
215        $this->loadMonitor = new $class(
216            $this, $this->srvCache, $this->wanCache, $loadMonitorConfig );
217        $this->loadMonitor->setLogger( $this->logger );
218        $this->loadMonitor->setStatsdDataFactory( $this->statsd );
219
220        if ( isset( $params['chronologyProtector'] ) ) {
221            $this->chronologyProtector = $params['chronologyProtector'];
222        }
223
224        if ( isset( $params['roundStage'] ) ) {
225            if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
226                $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
227            } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
228                $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
229            }
230        }
231
232        $group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
233        $this->defaultGroup = isset( $this->groupLoads[ $group ] ) ? $group : self::GROUP_GENERIC;
234    }
235
236    private static function newTrackedConnectionsArray() {
237        // Note that CATEGORY_GAUGE connections are untracked
238        return [
239            self::CATEGORY_ROUND => [],
240            self::CATEGORY_AUTOCOMMIT => []
241        ];
242    }
243
244    public function getClusterName(): string {
245        // Fallback to the current primary name if not specified
246        return $this->clusterName ?? $this->getServerName( ServerInfo::WRITER_INDEX );
247    }
248
249    public function getLocalDomainID(): string {
250        return $this->localDomain->getId();
251    }
252
253    public function resolveDomainID( $domain ): string {
254        return $this->resolveDomainInstance( $domain )->getId();
255    }
256
257    /**
258     * @param DatabaseDomain|string|false $domain
259     * @return DatabaseDomain
260     */
261    final protected function resolveDomainInstance( $domain ): DatabaseDomain {
262        if ( $domain instanceof DatabaseDomain ) {
263            return $domain; // already a domain instance
264        } elseif ( $domain === false || $domain === $this->localDomain->getId() ) {
265            return $this->localDomain;
266        } elseif ( isset( $this->domainAliases[$domain] ) ) {
267            $this->domainAliases[$domain] =
268                DatabaseDomain::newFromId( $this->domainAliases[$domain] );
269
270            return $this->domainAliases[$domain];
271        }
272
273        $cachedDomain = $this->nonLocalDomainCache[$domain] ?? null;
274        if ( $cachedDomain === null ) {
275            $cachedDomain = DatabaseDomain::newFromId( $domain );
276            $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
277        }
278
279        return $cachedDomain;
280    }
281
282    /**
283     * Get the first group in $groups with assigned servers, falling back to the default group
284     *
285     * @param string[]|string|false $groups Query group(s) in preference order, [], or false
286     * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
287     * @return string Query group
288     */
289    private function resolveGroups( $groups, $i ) {
290        // If a specific replica server was specified, then $groups makes no sense
291        if ( $i > 0 && $groups !== [] && $groups !== false ) {
292            $list = implode( ', ', (array)$groups );
293            throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
294        }
295
296        if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
297            $resolvedGroup = $this->defaultGroup;
298        } elseif ( is_string( $groups ) ) {
299            $resolvedGroup = isset( $this->groupLoads[$groups] ) ? $groups : $this->defaultGroup;
300        } elseif ( is_array( $groups ) ) {
301            $resolvedGroup = $this->defaultGroup;
302            foreach ( $groups as $group ) {
303                if ( isset( $this->groupLoads[$group] ) ) {
304                    $resolvedGroup = $group;
305                    break;
306                }
307            }
308        } else {
309            $resolvedGroup = $this->defaultGroup;
310        }
311
312        return $resolvedGroup;
313    }
314
315    /**
316     * Sanitize connection flags provided by a call to getConnection()
317     *
318     * @param int $flags Bitfield of class CONN_* constants
319     * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
320     * @param string $domain Database domain
321     * @return int Sanitized bitfield
322     */
323    private function sanitizeConnectionFlags( $flags, $i, $domain ) {
324        // Whether an outside caller is explicitly requesting the primary database server
325        if ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX ) {
326            $flags |= self::CONN_INTENT_WRITABLE;
327        }
328
329        if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
330            // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
331            // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
332            // during larger transactions. This is useful for avoiding lock contention.
333            // Assuming all servers are of the same type (or similar), which is overwhelmingly
334            // the case, use the primary server information to get the attributes. The information
335            // for $i cannot be used since it might be DB_REPLICA, which might require connection
336            // attempts in order to be resolved into a real server index.
337            $attributes = $this->getServerAttributes( ServerInfo::WRITER_INDEX );
338            if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
339                // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
340                // to use separate connections would just cause self-deadlocks. Note that
341                // REPEATABLE-READ staleness is not an issue since DB-level locking means
342                // that transactions are Strict Serializable anyway.
343                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
344                $type = $this->serverInfo->getServerType( ServerInfo::WRITER_INDEX );
345                $this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
346            } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
347                // T202116: integration tests are active and queries should be all be using
348                // temporary clone tables (via prefix). Such tables are not visible across
349                // different connections nor can there be REPEATABLE-READ snapshot staleness,
350                // so use the same connection for everything.
351                $flags &= ~self::CONN_TRX_AUTOCOMMIT;
352            }
353        }
354
355        return $flags;
356    }
357
358    /**
359     * @param IDatabase $conn
360     * @param int $flags
361     * @throws DBUnexpectedError
362     */
363    private function enforceConnectionFlags( IDatabase $conn, $flags ) {
364        if (
365            self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ||
366            // Handles with open transactions are avoided since they might be subject
367            // to REPEATABLE-READ snapshots, which could affect the lag estimate query.
368            self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE )
369        ) {
370            if ( $conn->trxLevel() ) {
371                throw new DBUnexpectedError(
372                    $conn,
373                    'Handle requested with autocommit-mode yet it has a transaction'
374                );
375            }
376
377            $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
378        }
379    }
380
381    /**
382     * @param array $loads
383     * @param int|float $maxLag Restrict the maximum allowed lag to this many seconds, or INF for no max
384     * @return int|string|false
385     */
386    private function getRandomNonLagged( array $loads, $maxLag = INF ) {
387        $lags = $this->getLagTimes();
388
389        # Unset excessively lagged servers
390        foreach ( $lags as $i => $lag ) {
391            if ( $i !== ServerInfo::WRITER_INDEX ) {
392                # How much lag this server nominally is allowed to have
393                $maxServerLag = $this->serverInfo->getServerMaxLag( $i ); // default
394                # Constrain that further by $maxLag argument
395                $maxServerLag = min( $maxServerLag, $maxLag );
396
397                $srvName = $this->serverInfo->getServerName( $i );
398                if ( $lag === false && !is_infinite( $maxServerLag ) ) {
399                    $this->logger->debug(
400                        __METHOD__ . ": server {db_server} is not replicating?",
401                        [ 'db_server' => $srvName ]
402                    );
403                    unset( $loads[$i] );
404                } elseif ( $lag > $maxServerLag ) {
405                    $this->logger->debug(
406                        __METHOD__ .
407                            ": server {db_server} has {lag} seconds of lag (>= {maxlag})",
408                        [ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $maxServerLag ]
409                    );
410                    unset( $loads[$i] );
411                }
412            }
413        }
414
415        if ( array_sum( $loads ) == 0 ) {
416            // All the replicas with non-zero load are lagged and the primary has zero load.
417            // Inform caller so that it can use switch to read-only mode and use a lagged replica.
418            return false;
419        }
420
421        # Return a random representative of the remainder
422        return ArrayUtils::pickRandom( $loads );
423    }
424
425    public function getReaderIndex( $group = false ) {
426        $group = is_string( $group ) ? $group : self::GROUP_GENERIC;
427
428        if ( !$this->serverInfo->hasReplicaServers() ) {
429            // There is only one possible server to use (the primary)
430            return ServerInfo::WRITER_INDEX;
431        }
432
433        $index = $this->getExistingReaderIndex( $group );
434        if ( $index !== self::READER_INDEX_NONE ) {
435            // A reader index was already selected for this query group. Keep using it,
436            // since any session replication position was already waited on and any
437            // active transaction will be reused (e.g. for point-in-time snapshots).
438            return $index;
439        }
440
441        // Get the server weight array for this load group
442        $loads = $this->groupLoads[$group] ?? [];
443        if ( !$loads ) {
444            $this->logger->info( __METHOD__ . ": no loads for group $group" );
445            return false;
446        }
447
448        // Load any session replication positions, before any connection attempts,
449        // since reading them afterwards can only cause more delay due to possibly
450        // seeing even higher replication positions (e.g. from concurrent requests).
451        $this->loadSessionPrimaryPos();
452
453        // Scale the configured load ratios according to each server's load/state.
454        // This can sometimes trigger server connections due to cache regeneration.
455        $this->loadMonitor->scaleLoads( $loads );
456
457        // Pick a server, accounting for weight, load, lag, and session consistency
458        $i = $this->pickReaderIndex( $loads );
459        if ( $i === false ) {
460            // Connection attempts failed
461            return false;
462        }
463
464        // If data seen by queries is expected to reflect writes from a prior transaction,
465        // then wait for the chosen server to apply those changes. This is used to improve
466        // session consistency.
467        if ( !$this->awaitSessionPrimaryPos( $i ) ) {
468            // Data will be outdated compared to what was expected
469            $this->setLaggedReplicaMode();
470        }
471
472        // Keep using this server for DB_REPLICA handles for this group
473        if ( $i < 0 ) {
474            throw new UnexpectedValueException( "Cannot set a negative read server index" );
475        }
476        $this->readIndexByGroup[$group] = $i;
477
478        $serverName = $this->getServerName( $i );
479        $this->logger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
480
481        return $i;
482    }
483
484    /**
485     * Get the server index chosen for DB_REPLICA connections for the given query group
486     *
487     * @param string $group Query group; use false for the generic group
488     * @return int Specific server index or LoadBalancer::READER_INDEX_NONE if none was chosen
489     */
490    protected function getExistingReaderIndex( $group ) {
491        return $this->readIndexByGroup[$group] ?? self::READER_INDEX_NONE;
492    }
493
494    /**
495     * Pick a server that is reachable, preferably non-lagged, and return its server index
496     *
497     * This will leave the server connection open within the pool for reuse
498     *
499     * @param array $loads List of server weights
500     * @return int|false reader index or false
501     */
502    private function pickReaderIndex( array $loads ) {
503        if ( $loads === [] ) {
504            throw new InvalidArgumentException( "Server configuration array is empty" );
505        }
506
507        // Quickly look through the available servers for a server that meets criteria...
508        $currentLoads = $loads;
509        $i = false;
510        while ( count( $currentLoads ) ) {
511            if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
512                $this->logger->debug( __METHOD__ . ": session has replication position" );
513                // ChronologyProtector::getSessionPrimaryPos called in $this->loadSessionPrimaryPos()
514                // sets "waitForPos" for session consistency.
515                // This triggers doWait() after connect, so it's especially good to
516                // avoid lagged servers so as to avoid excessive delay in that method.
517                $ago = microtime( true ) - $this->waitForPos->asOfTime();
518                // Aim for <= 1 second of waiting (being too picky can backfire)
519                $i = $this->getRandomNonLagged( $currentLoads, $ago + 1 );
520            } else {
521                // Any server with less lag than it's 'max lag' param is preferable
522                $i = $this->getRandomNonLagged( $currentLoads );
523            }
524
525            if ( $i === false && count( $currentLoads ) ) {
526                $this->setLaggedReplicaMode();
527                // All replica DBs lagged, just pick anything.
528                $i = ArrayUtils::pickRandom( $currentLoads );
529            }
530
531            if ( $i === false ) {
532                // pickRandom() returned false.
533                // This is permanent and means the configuration or LoadMonitor
534                // wants us to return false.
535                $this->logger->debug( __METHOD__ . ": no suitable server found" );
536                return false;
537            }
538
539            $serverName = $this->getServerName( $i );
540            $this->logger->debug( __METHOD__ . ": connecting to $serverName..." );
541
542            // Get a connection to this server without triggering complementary connections
543            // to other servers (due to things like lag or read-only checks). We want to avoid
544            // the risk of overhead and recursion here.
545            $conn = $this->getServerConnection( $i, self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
546            if ( !$conn ) {
547                $this->logger->warning( __METHOD__ . ": failed connecting to $serverName" );
548                unset( $currentLoads[$i] ); // avoid this server next iteration
549                continue;
550            }
551
552            // Return this server
553            break;
554        }
555
556        // If all servers were down, quit now
557        if ( $currentLoads === [] ) {
558            $this->logger->error( __METHOD__ . ": all servers down" );
559        }
560
561        return $i;
562    }
563
564    public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
565        $timeout = $timeout ?: self::MAX_WAIT_DEFAULT;
566
567        $oldPos = $this->waitForPos;
568        try {
569            $this->waitForPos = $pos;
570
571            $failedReplicas = [];
572            foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
573                if ( $this->serverHasLoadInAnyGroup( $i ) ) {
574                    $start = microtime( true );
575                    $ok = $this->awaitSessionPrimaryPos( $i, $timeout );
576                    if ( !$ok ) {
577                        $failedReplicas[] = $this->getServerName( $i );
578                    }
579                    $timeout -= intval( microtime( true ) - $start );
580                }
581            }
582
583            // Stop spamming logs when only one replica is lagging and we have 5+ replicas.
584            // Mediawiki automatically stops sending queries to the lagged one.
585            $failed = $failedReplicas && ( count( $failedReplicas ) > 1 || $this->getServerCount() < 5 );
586            if ( $failed ) {
587                $this->logger->error(
588                    "Timed out waiting for replication to reach {raw_pos}",
589                    [
590                        'raw_pos' => $pos->__toString(),
591                        'failed_hosts' => $failedReplicas,
592                        'timeout' => $timeout,
593                        'exception' => new RuntimeException()
594                    ]
595                );
596            }
597
598            return !$failed;
599        } finally {
600            // Restore the old position; this is used for throttling, not lag-protection
601            $this->waitForPos = $oldPos;
602        }
603    }
604
605    /**
606     * @param int $i Specific server index
607     * @return bool
608     */
609    private function serverHasLoadInAnyGroup( $i ) {
610        foreach ( $this->groupLoads as $loadsByIndex ) {
611            if ( ( $loadsByIndex[$i] ?? 0 ) > 0 ) {
612                return true;
613            }
614        }
615
616        return false;
617    }
618
619    public function getAnyOpenConnection( $i, $flags = 0 ) {
620        $i = ( $i === self::DB_PRIMARY ) ? ServerInfo::WRITER_INDEX : $i;
621        $conn = false;
622        foreach ( $this->conns as $type => $poolConnsByServer ) {
623            if ( $i === self::DB_REPLICA ) {
624                // Consider all existing connections to any server
625                $applicableConnsByServer = $poolConnsByServer;
626            } else {
627                // Consider all existing connections to a specific server
628                $applicableConnsByServer = isset( $poolConnsByServer[$i] )
629                    ? [ $i => $poolConnsByServer[$i] ]
630                    : [];
631            }
632
633            $conn = $this->pickAnyOpenConnection( $applicableConnsByServer );
634            if ( $conn ) {
635                $this->logger->debug( __METHOD__ . ": found '$type' connection to #$i." );
636                break;
637            }
638        }
639
640        if ( $conn ) {
641            $this->enforceConnectionFlags( $conn, $flags );
642        }
643
644        return $conn;
645    }
646
647    /**
648     * @param Database[][] $connsByServer Map of (server index => array of DB handles)
649     * @return IDatabase|false An appropriate open connection or false if none found
650     */
651    private function pickAnyOpenConnection( array $connsByServer ) {
652        foreach ( $connsByServer as $i => $conns ) {
653            foreach ( $conns as $conn ) {
654                if ( !$conn->isOpen() ) {
655                    $this->logger->warning(
656                        __METHOD__ .
657                        ": pooled DB handle for {db_server} (#$i) has no open connection.",
658                        $this->getConnLogContext( $conn )
659                    );
660                    continue; // some sort of error occurred?
661                }
662                return $conn;
663            }
664        }
665
666        return false;
667    }
668
669    /**
670     * Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos"
671     *
672     * @see loadSessionPrimaryPos()
673     *
674     * @param int $index Specific server index
675     * @param int|null $timeout Max seconds to wait; default is "waitTimeout"
676     * @return bool Success
677     */
678    private function awaitSessionPrimaryPos( $index, $timeout = null ) {
679        $timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );
680
681        if ( !$this->waitForPos || $index === ServerInfo::WRITER_INDEX ) {
682            return true;
683        }
684
685        $srvName = $this->getServerName( $index );
686
687        // Check if we already know that the DB has reached this point
688        $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v2' );
689
690        /** @var DBPrimaryPos $knownReachedPos */
691        $position = $this->srvCache->get( $key );
692        if ( !is_array( $position ) ) {
693            $knownReachedPos = null;
694        } else {
695            $class = $position['_type_'];
696            $knownReachedPos = $class::newFromArray( $position );
697        }
698        if (
699            $knownReachedPos instanceof DBPrimaryPos &&
700            $knownReachedPos->hasReached( $this->waitForPos )
701        ) {
702            $this->logger->debug(
703                __METHOD__ .
704                ": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
705                [ 'db_server' => $srvName ]
706            );
707
708            return true;
709        }
710
711        $close = false; // close the connection afterwards
712        $flags = self::CONN_SILENCE_ERRORS;
713        // Check if there is an existing connection that can be used
714        $conn = $this->getAnyOpenConnection( $index, $flags );
715        if ( !$conn ) {
716            // Get a connection to this server without triggering complementary connections
717            // to other servers (due to things like lag or read-only checks). We want to avoid
718            // the risk of overhead and recursion here.
719            $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
720            if ( !$conn ) {
721                $this->logger->warning(
722                    __METHOD__ . ': failed to connect to {db_server}',
723                    [ 'db_server' => $srvName ]
724                );
725
726                return false;
727            }
728            // Avoid connection spam in waitForAll() when connections
729            // are made just for the sake of doing this lag check.
730            $close = true;
731        }
732
733        $this->logger->info(
734            __METHOD__ .
735            ': waiting for replica DB {db_server} to catch up...',
736            $this->getConnLogContext( $conn )
737        );
738
739        $result = $conn->primaryPosWait( $this->waitForPos, $timeout );
740
741        $ok = ( $result !== null && $result != -1 );
742        if ( $ok ) {
743            // Remember that the DB reached this point
744            $this->srvCache->set( $key, $this->waitForPos->toArray(), BagOStuff::TTL_DAY );
745        }
746
747        if ( $close ) {
748            $this->closeConnection( $conn );
749        }
750
751        return $ok;
752    }
753
754    public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
755        return $this->getConnectionRef( $i, $groups, $domain, $flags );
756    }
757
758    public function getConnectionInternal( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
759        $domain = $this->resolveDomainID( $domain );
760        $group = $this->resolveGroups( $groups, $i );
761        $flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
762        // If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
763        // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
764        // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
765        // The use of getServerConnection() instead of getConnection() avoids infinite loops.
766        $serverIndex = $i;
767        if ( $i === self::DB_PRIMARY ) {
768            $serverIndex = ServerInfo::WRITER_INDEX;
769        } elseif ( $i === self::DB_REPLICA ) {
770            $groupIndex = $this->getReaderIndex( $group );
771            if ( $groupIndex !== false ) {
772                // Group connection succeeded
773                $serverIndex = $groupIndex;
774            }
775            if ( $serverIndex < 0 ) {
776                $this->reportConnectionError( 'could not connect to any replica DB server' );
777            }
778        } elseif ( !$this->serverInfo->hasServerIndex( $i ) ) {
779            throw new UnexpectedValueException( "Invalid server index index #$i" );
780        }
781        // Get an open connection to that server (might trigger a new connection)
782        return $this->getServerConnection( $serverIndex, $domain, $flags );
783    }
784
785    public function getServerConnection( $i, $domain, $flags = 0 ) {
786        $domainInstance = DatabaseDomain::newFromId( $domain );
787        // Number of connections made before getting the server index and handle
788        $priorConnectionsMade = $this->connectionCounter;
789        // Get an open connection to this server (might trigger a new connection)
790        $conn = $this->reuseOrOpenConnectionForNewRef( $i, $domainInstance, $flags );
791        // Throw an error or otherwise bail out if the connection attempt failed
792        if ( !( $conn instanceof IDatabase ) ) {
793            if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
794                $this->reportConnectionError();
795            }
796
797            return false;
798        }
799
800        // Profile any new connections caused by this method
801        if ( $this->connectionCounter > $priorConnectionsMade ) {
802            $this->trxProfiler->recordConnection(
803                $conn->getServerName(),
804                $conn->getDBname(),
805                self::fieldHasBit( $flags, self::CONN_INTENT_WRITABLE )
806            );
807        }
808
809        if ( !$conn->isOpen() ) {
810            $this->lastErrorConn = $conn;
811            // Connection was made but later unrecoverably lost for some reason.
812            // Do not return a handle that will just throw exceptions on use, but
813            // let the calling code, e.g. getReaderIndex(), try another server.
814            if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
815                $this->reportConnectionError();
816            }
817            return false;
818        }
819
820        return $conn;
821    }
822
823    public function reuseConnection( IDatabase $conn ) {
824        // no-op
825    }
826
827    public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
828        if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
829            throw new UnexpectedValueException(
830                __METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
831            );
832        }
833
834        $domain = $this->resolveDomainID( $domain );
835        $role = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX )
836            ? self::DB_PRIMARY
837            : self::DB_REPLICA;
838
839        return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role, $this->modcount );
840    }
841
842    public function getMaintenanceConnectionRef(
843        $i,
844        $groups = [],
845        $domain = false,
846        $flags = 0
847    ): DBConnRef {
848        if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
849            throw new UnexpectedValueException(
850                __METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
851            );
852        }
853
854        $domain = $this->resolveDomainID( $domain );
855        $role = ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX )
856            ? self::DB_PRIMARY
857            : self::DB_REPLICA;
858
859        return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role, $this->modcount );
860    }
861
862    /**
863     * Get a live connection handle to the given domain
864     *
865     * This will reuse an existing tracked connection when possible. In some cases, this
866     * involves switching the DB domain of an existing handle in order to reuse it. If no
867     * existing handles can be reused, then a new connection will be made.
868     *
869     * @param int $i Specific server index
870     * @param DatabaseDomain $domain Database domain ID required by the reference
871     * @param int $flags Bit field of class CONN_* constants
872     * @return IDatabase|null Database or null on error
873     * @throws DBError When database selection fails
874     * @throws InvalidArgumentException When the server index is invalid
875     * @throws UnexpectedValueException When the DB domain of the connection is corrupted
876     * @throws DBAccessError If disable() was called
877     */
878    private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
879        // Figure out which connection pool to use based on the flags
880        if ( $this->fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE ) ) {
881            // Use low timeouts, use autocommit mode, ignore transaction rounds
882            $category = self::CATEGORY_GAUGE;
883        } elseif ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
884            // Use autocommit mode, ignore transaction rounds
885            $category = self::CATEGORY_AUTOCOMMIT;
886        } else {
887            // Respect DBO_DEFAULT, respect transaction rounds
888            $category = self::CATEGORY_ROUND;
889        }
890
891        $conn = null;
892        // Reuse a free connection in the pool from any domain if possible. There should only
893        // be one connection in this pool unless either:
894        //  - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
895        //       or more database domains have been used during the load balancer's lifetime
896        //  - b) Two or more nested function calls used getConnection() on different domains.
897        foreach ( ( $this->conns[$category][$i] ?? [] ) as $poolConn ) {
898            // Check if any required DB domain changes for the new reference are possible
899            // Calling selectDomain() would trigger a reconnect, which will break if a
900            // transaction is active or if there is any other meaningful session state.
901            $isShareable = !(
902                $poolConn->databasesAreIndependent() &&
903                $domain->getDatabase() !== null &&
904                $domain->getDatabase() !== $poolConn->getDBname()
905            );
906            if ( $isShareable ) {
907                $conn = $poolConn;
908                // Make any required DB domain changes for the new reference
909                if ( !$domain->isUnspecified() ) {
910                    $conn->selectDomain( $domain );
911                }
912                $this->logger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
913                break;
914            }
915        }
916
917        // If necessary, try to open a new connection and add it to the pool
918        if ( !$conn ) {
919            $conn = $this->reallyOpenConnection(
920                $i,
921                $domain,
922                [ self::INFO_CONN_CATEGORY => $category ]
923            );
924            if ( $conn->isOpen() ) {
925                // Make Database::isReadOnly() respect server-side and configuration-based
926                // read-only mode. Note that replica handles are always seen as read-only
927                // in Database::isReadOnly() and Database::assertIsWritablePrimary().
928                if ( $i === ServerInfo::WRITER_INDEX ) {
929                    if ( $this->readOnlyReason !== false ) {
930                        $readOnlyReason = $this->readOnlyReason;
931                    } elseif ( $this->isPrimaryRunningReadOnly( $conn ) ) {
932                        $readOnlyReason = 'The primary database server is running in read-only mode.';
933                    } else {
934                        $readOnlyReason = false;
935                    }
936                    $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $readOnlyReason );
937                }
938                // Connection obtained; check if it belongs to a tracked connection category
939                if ( isset( $this->conns[$category] ) ) {
940                    // Track this connection for future reuse
941                    $this->conns[$category][$i][] = $conn;
942                }
943            } else {
944                $this->logger->warning( __METHOD__ . ": connection error for $i/$domain" );
945                $this->lastErrorConn = $conn;
946                $conn = null;
947            }
948        }
949
950        if ( $conn instanceof IDatabase ) {
951            // Check to make sure that the right domain is selected
952            $this->assertConnectionDomain( $conn, $domain );
953            // Check to make sure that the CONN_* flags are respected
954            $this->enforceConnectionFlags( $conn, $flags );
955        }
956
957        return $conn;
958    }
959
960    /**
961     * Sanity check to make sure that the right domain is selected
962     *
963     * @param Database $conn
964     * @param DatabaseDomain $domain
965     * @throws DBUnexpectedError
966     */
967    private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
968        if ( !$domain->isCompatible( $conn->getDomainID() ) ) {
969            throw new UnexpectedValueException(
970                "Got connection to '{$conn->getDomainID()}', but expected one for '{$domain}'"
971            );
972        }
973    }
974
975    public function getServerAttributes( $i ) {
976        return $this->databaseFactory->attributesFromType(
977            $this->getServerType( $i ),
978            $this->serverInfo->getServerDriver( $i )
979        );
980    }
981
982    /**
983     * Open a new network connection to a server (uncached)
984     *
985     * Returns a Database object whether or not the connection was successful.
986     *
987     * @param int $i Specific server index
988     * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
989     * @param array $lbInfo Additional information for setLBInfo()
990     * @return Database
991     * @throws DBAccessError
992     * @throws InvalidArgumentException
993     */
994    protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
995        if ( $this->disabled ) {
996            throw new DBAccessError();
997        }
998
999        $server = $this->serverInfo->getServerInfoStrict( $i );
1000        if ( $lbInfo[self::INFO_CONN_CATEGORY] === self::CATEGORY_GAUGE ) {
1001            // Use low connection/read timeouts for connection used for gauging server health.
1002            // Gauge information should be cached and used to avoid outages. Indefinite hanging
1003            // while gauging servers would do the opposite.
1004            $server['connectTimeout'] = min( 1, $server['connectTimeout'] ?? INF );
1005            $server['receiveTimeout'] = min( 1, $server['receiveTimeout'] ?? INF );
1006            // Avoid implicit transactions and avoid any SET query for session variables during
1007            // Database::open(). If a server becomes slow, every extra query can cause significant
1008            // delays, even with low connect/receive timeouts.
1009            $server['flags'] ??= 0;
1010            $server['flags'] &= ~IDatabase::DBO_DEFAULT;
1011            $server['flags'] |= IDatabase::DBO_GAUGE;
1012        } else {
1013            // Use implicit transactions unless explicitly configured otherwise
1014            $server['flags'] ??= IDatabase::DBO_DEFAULT;
1015        }
1016
1017        if ( !empty( $server['is static'] ) ) {
1018            $topologyRole = IDatabase::ROLE_STATIC_CLONE;
1019        } else {
1020            $topologyRole = ( $i === ServerInfo::WRITER_INDEX )
1021                ? IDatabase::ROLE_STREAMING_MASTER
1022                : IDatabase::ROLE_STREAMING_REPLICA;
1023        }
1024
1025        $conn = $this->databaseFactory->create(
1026            $server['type'],
1027            array_merge( $server, [
1028                // Basic replication role information
1029                'topologyRole' => $topologyRole,
1030                // Use the database specified in $domain (null means "none or entrypoint DB");
1031                // fallback to the $server default if the RDBMs is an embedded library using a
1032                // file on disk since there would be nothing to access to without a DB/file name.
1033                'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1034                    ? ( $domain->getDatabase() ?? $server['dbname'] ?? null )
1035                    : $domain->getDatabase(),
1036                // Override the $server default schema with that of $domain if specified
1037                'schema' => $domain->getSchema(),
1038                // Use the table prefix specified in $domain
1039                'tablePrefix' => $domain->getTablePrefix(),
1040                'srvCache' => $this->srvCache,
1041                'logger' => $this->logger,
1042                'errorLogger' => $this->errorLogger,
1043                'trxProfiler' => $this->trxProfiler,
1044                'lbInfo' => [ self::INFO_SERVER_INDEX => $i ] + $lbInfo
1045            ] ),
1046            Database::NEW_UNCONNECTED
1047        );
1048        // Set alternative table/index names before any queries can be issued
1049        $conn->setTableAliases( $this->tableAliases );
1050        $conn->setIndexAliases( $this->indexAliases );
1051        // Account for any active transaction round and listeners
1052        if ( $i === ServerInfo::WRITER_INDEX ) {
1053            if ( $this->trxRoundId !== false ) {
1054                $this->applyTransactionRoundFlags( $conn );
1055            }
1056            foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1057                $conn->setTransactionListener( $name, $callback );
1058            }
1059        }
1060
1061        // Make the connection handle live
1062        try {
1063            $conn->initConnection();
1064            ++$this->connectionCounter;
1065        } catch ( DBConnectionError $e ) {
1066            $this->lastErrorConn = $conn;
1067            // ignore; let the DB handle the logging
1068        }
1069
1070        if ( $conn->isOpen() ) {
1071            $this->logger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
1072        } else {
1073            $this->logger->warning(
1074                __METHOD__ . ": connection error for $i/{db_domain}",
1075                [ 'db_domain' => $domain->getId() ]
1076            );
1077        }
1078
1079        // Log when many connection are made during a single request/script
1080        $count = 0;
1081        foreach ( $this->conns as $poolConnsByServer ) {
1082            foreach ( $poolConnsByServer as $serverConns ) {
1083                $count += count( $serverConns );
1084            }
1085        }
1086        if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1087            $this->logger->warning(
1088                __METHOD__ . ": {connections}+ connections made (primary={primarydb})",
1089                $this->getConnLogContext(
1090                    $conn,
1091                    [
1092                        'connections' => $count,
1093                        'primarydb' => $this->serverInfo->getPrimaryServerName(),
1094                        'db_domain' => $domain->getId()
1095                    ]
1096                )
1097            );
1098        }
1099
1100        $this->assertConnectionDomain( $conn, $domain );
1101
1102        return $conn;
1103    }
1104
1105    /**
1106     * Make sure that any "waitForPos" replication positions are loaded and available
1107     *
1108     * Each load balancer cluster has up to one replication position for the session.
1109     * These are used when data read by queries is expected to reflect writes caused
1110     * by a prior request/script from the same client.
1111     *
1112     * @see awaitSessionPrimaryPos()
1113     */
1114    private function loadSessionPrimaryPos() {
1115        if ( !$this->chronologyProtectorCalled && $this->chronologyProtector ) {
1116            $this->chronologyProtectorCalled = true;
1117            $pos = $this->chronologyProtector->getSessionPrimaryPos( $this );
1118            $this->logger->debug( __METHOD__ . ': executed chronology callback.' );
1119            if ( $pos ) {
1120                if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
1121                    $this->waitForPos = $pos;
1122                }
1123            }
1124        }
1125    }
1126
1127    /**
1128     * @param string $extraLbError Separat load balancer error
1129     * @throws DBConnectionError
1130     * @return never
1131     */
1132    private function reportConnectionError( $extraLbError = '' ) {
1133        if ( $this->lastErrorConn instanceof IDatabase ) {
1134            $srvName = $this->lastErrorConn->getServerName();
1135            $lastDbError = $this->lastErrorConn->lastError() ?: 'unknown error';
1136
1137            $exception = new DBConnectionError(
1138                $this->lastErrorConn,
1139                $extraLbError
1140                    ? "{$extraLbError}{$lastDbError} ({$srvName})"
1141                    : "{$lastDbError} ({$srvName})"
1142            );
1143
1144            if ( $extraLbError ) {
1145                $this->logger->warning(
1146                    __METHOD__ . "$extraLbError; {last_error} ({db_server})",
1147                    $this->getConnLogContext(
1148                        $this->lastErrorConn,
1149                        [
1150                            'method' => __METHOD__,
1151                            'last_error' => $lastDbError
1152                        ]
1153                    )
1154                );
1155            }
1156        } else {
1157            $exception = new DBConnectionError(
1158                null,
1159                $extraLbError ?: 'could not connect to the DB server'
1160            );
1161
1162            if ( $extraLbError ) {
1163                $this->logger->error(
1164                    __METHOD__ . "$extraLbError",
1165                    [
1166                        'method' => __METHOD__,
1167                        'last_error' => '(last connection error missing)'
1168                    ]
1169                );
1170            }
1171        }
1172
1173        throw $exception;
1174    }
1175
1176    public function getWriterIndex() {
1177        return ServerInfo::WRITER_INDEX;
1178    }
1179
1180    public function getServerCount() {
1181        return $this->serverInfo->getServerCount();
1182    }
1183
1184    public function hasReplicaServers() {
1185        return $this->serverInfo->hasReplicaServers();
1186    }
1187
1188    public function hasStreamingReplicaServers() {
1189        return $this->serverInfo->hasStreamingReplicaServers();
1190    }
1191
1192    public function getServerName( $i ): string {
1193        return $this->serverInfo->getServerName( $i );
1194    }
1195
1196    public function getServerInfo( $i ) {
1197        return $this->serverInfo->getServerInfo( $i );
1198    }
1199
1200    public function getServerType( $i ) {
1201        return $this->serverInfo->getServerType( $i );
1202    }
1203
1204    public function getPrimaryPos() {
1205        $conn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
1206        if ( $conn ) {
1207            return $conn->getPrimaryPos();
1208        }
1209
1210        $conn = $this->getConnectionInternal( ServerInfo::WRITER_INDEX, self::CONN_SILENCE_ERRORS );
1211        // @phan-suppress-next-line PhanRedundantCondition
1212        if ( !$conn ) {
1213            $this->reportConnectionError();
1214        }
1215
1216        try {
1217            return $conn->getPrimaryPos();
1218        } finally {
1219            $this->closeConnection( $conn );
1220        }
1221    }
1222
1223    /**
1224     * Apply updated configuration.
1225     *
1226     * This only unregisters servers that were removed in the new configuration.
1227     * It does not register new servers nor update the group load weights.
1228     *
1229     * This invalidates any open connections. However, existing connections may continue to be
1230     * used while they are in an active transaction. In that case, the old connection will be
1231     * discarded on the first operation after the transaction is complete. The next operation
1232     * will use a new connection based on the new configuration.
1233     *
1234     * @internal for use by LBFactory::reconfigure()
1235     *
1236     * @see DBConnRef::ensureConnection()
1237     * @see LBFactory::reconfigure()
1238     *
1239     * @param array $params A database configuration array, see $wgLBFactoryConf.
1240     *
1241     * @return void
1242     */
1243    public function reconfigure( array $params ) {
1244        $anyServerDepooled = false;
1245
1246        $paramServers = $params['servers'];
1247        $newIndexByServerIndex = $this->serverInfo->reconfigureServers( $paramServers );
1248        foreach ( $newIndexByServerIndex as $i => $ni ) {
1249            if ( $ni !== null ) {
1250                // Server still exists in the new config
1251                $newWeightByGroup = $paramServers[$ni]['groupLoads'] ?? [];
1252                $newWeightByGroup[ILoadBalancer::GROUP_GENERIC] = $paramServers[$ni]['load'];
1253                // Check if the server was removed from any load groups
1254                foreach ( $this->groupLoads as $group => $weightByIndex ) {
1255                    if ( isset( $weightByIndex[$i] ) && !isset( $newWeightByGroup[$group] ) ) {
1256                        // Server no longer in this load group in the new config
1257                        $anyServerDepooled = true;
1258                        unset( $this->groupLoads[$group][$i] );
1259                    }
1260                }
1261            } else {
1262                // Server no longer exists in the new config
1263                $anyServerDepooled = true;
1264                // Note that if the primary server is depooled and a replica server promoted
1265                // to new primary, then DB_PRIMARY handles will fail with server index errors
1266                foreach ( $this->groupLoads as $group => $loads ) {
1267                    unset( $this->groupLoads[$group][$i] );
1268                }
1269            }
1270        }
1271
1272        if ( $anyServerDepooled ) {
1273            // NOTE: We could close all connection here, but some may be in the middle of
1274            //       a transaction. So instead, we leave it to DBConnRef to close the
1275            //       connection when it detects that the modcount has changed and no
1276            //       transaction is open.
1277            $this->logger->info( 'Reconfiguring dbs!' );
1278            // Unpin DB_REPLICA connection groups from server indexes
1279            $this->readIndexByGroup = [];
1280            // We could close all connection here, but some may be in the middle of a
1281            // transaction. So instead, we leave it to DBConnRef to close the connection
1282            // when it detects that the modcount has changed and no transaction is open.
1283            $this->conns = self::newTrackedConnectionsArray();
1284            // Bump modification counter to invalidate the connections held by DBConnRef
1285            // instances. This will cause the next call to a method on the DBConnRef
1286            // to get a new connection from getConnectionInternal()
1287            $this->modcount++;
1288        }
1289    }
1290
1291    public function disable( $fname = __METHOD__ ) {
1292        $this->closeAll( $fname );
1293        $this->disabled = true;
1294    }
1295
1296    public function closeAll( $fname = __METHOD__ ) {
1297        /** @noinspection PhpUnusedLocalVariableInspection */
1298        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1299        foreach ( $this->getOpenConnections() as $conn ) {
1300            $conn->close( $fname );
1301        }
1302
1303        $this->conns = self::newTrackedConnectionsArray();
1304    }
1305
1306    /**
1307     * Close a connection
1308     *
1309     * Using this function makes sure the LoadBalancer knows the connection is closed.
1310     * If you use $conn->close() directly, the load balancer won't update its state.
1311     *
1312     * @param IDatabase $conn
1313     */
1314    private function closeConnection( IDatabase $conn ) {
1315        if ( $conn instanceof DBConnRef ) {
1316            // Avoid calling close() but still leaving the handle in the pool
1317            throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
1318        }
1319
1320        $domain = $conn->getDomainID();
1321        $serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
1322        if ( $serverIndex === null ) {
1323            throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
1324        }
1325
1326        $srvName = $this->serverInfo->getServerName( $serverIndex );
1327
1328        $found = false;
1329        foreach ( $this->conns as $type => $poolConnsByServer ) {
1330            $key = array_search( $conn, $poolConnsByServer[$serverIndex] ?? [], true );
1331            if ( $key !== false ) {
1332                $found = true;
1333                unset( $this->conns[$type][$serverIndex][$key] );
1334            }
1335        }
1336
1337        if ( !$found ) {
1338            $this->logger->warning(
1339                __METHOD__ .
1340                ": orphaned connection to database {$this->stringifyConn( $conn )} at '$srvName'."
1341            );
1342        }
1343
1344        $this->logger->debug(
1345            __METHOD__ .
1346            ": closing connection to database {$this->stringifyConn( $conn )} at '$srvName'."
1347        );
1348
1349        $conn->close( __METHOD__ );
1350    }
1351
1352    public function finalizePrimaryChanges( $fname = __METHOD__ ) {
1353        $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1354        /** @noinspection PhpUnusedLocalVariableInspection */
1355        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1356
1357        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1358        // Loop until callbacks stop adding callbacks on other connections
1359        $total = 0;
1360        do {
1361            $count = 0; // callbacks execution attempts
1362            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1363                // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1364                // Any error should cause all (peer) transactions to be rolled back together.
1365                $count += $conn->runOnTransactionPreCommitCallbacks();
1366            }
1367            $total += $count;
1368        } while ( $count > 0 );
1369        // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1370        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1371            $conn->setTrxEndCallbackSuppression( true );
1372        }
1373        $this->trxRoundStage = self::ROUND_FINALIZED;
1374
1375        return $total;
1376    }
1377
1378    public function approvePrimaryChanges( int $maxWriteDuration, $fname = __METHOD__ ) {
1379        $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1380        /** @noinspection PhpUnusedLocalVariableInspection */
1381        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1382
1383        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1384        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1385            // Any atomic sections should have been closed by now and there definitely should
1386            // not be any open transactions started by begin() from callers outside Database.
1387            if ( $conn->explicitTrxActive() ) {
1388                throw new DBTransactionError(
1389                    $conn,
1390                    "Explicit transaction still active; a caller might have failed to call " .
1391                    "endAtomic() or cancelAtomic()."
1392                );
1393            }
1394            // Assert that the time to replicate the transaction will be reasonable.
1395            // If this fails, then all DB transactions will be rollback back together.
1396            $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1397            if ( $maxWriteDuration > 0 ) {
1398                if ( $time > $maxWriteDuration ) {
1399                    $humanTimeSec = round( $time, 3 );
1400                    throw new DBTransactionSizeError(
1401                        $conn,
1402                        "Transaction spent {time}s in writes, exceeding the {$maxWriteDuration}s limit",
1403                        // Message parameters for: transaction-duration-limit-exceeded
1404                        [ $time, $maxWriteDuration ],
1405                        null,
1406                        [ 'time' => $humanTimeSec ]
1407                    );
1408                } elseif ( $time > 0 ) {
1409                    $timeMs = $time * 1000;
1410                    $humanTimeMs = round( $timeMs, $timeMs > 1 ? 0 : 3 );
1411                    $this->logger->debug(
1412                        "Transaction spent {time_ms}ms in writes, under the {$maxWriteDuration}s limit",
1413                        [ 'time_ms' => $humanTimeMs ]
1414                    );
1415                }
1416            }
1417            // If a connection sits idle for too long it might be dropped, causing transaction
1418            // writes and session locks to be lost. Ping all the server connections before making
1419            // any attempt to commit the transactions belonging to the active transaction round.
1420            if ( $conn->writesOrCallbacksPending() || $conn->sessionLocksPending() ) {
1421                if ( !$conn->ping() ) {
1422                    throw new DBTransactionError(
1423                        $conn,
1424                        "Pre-commit ping failed on server {$conn->getServerName()}"
1425                    );
1426                }
1427            }
1428        }
1429        $this->trxRoundStage = self::ROUND_APPROVED;
1430    }
1431
1432    public function beginPrimaryChanges( $fname = __METHOD__ ) {
1433        if ( $this->trxRoundId !== false ) {
1434            throw new DBTransactionError(
1435                null,
1436                "Transaction round '{$this->trxRoundId}' already started"
1437            );
1438        }
1439        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1440        /** @noinspection PhpUnusedLocalVariableInspection */
1441        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1442
1443        // Clear any empty transactions (no writes/callbacks) from the implicit round
1444        $this->flushPrimarySnapshots( $fname );
1445
1446        $this->trxRoundId = $fname;
1447        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1448        // Mark applicable handles as participating in this explicit transaction round.
1449        // For each of these handles, any writes and callbacks will be tied to a single
1450        // transaction. The (peer) handles will reject begin()/commit() calls unless they
1451        // are part of an en masse commit or an en masse rollback.
1452        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1453            $this->applyTransactionRoundFlags( $conn );
1454        }
1455        $this->trxRoundStage = self::ROUND_CURSORY;
1456    }
1457
1458    public function commitPrimaryChanges( $fname = __METHOD__ ) {
1459        $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1460        /** @noinspection PhpUnusedLocalVariableInspection */
1461        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1462
1463        $failures = [];
1464
1465        $restore = ( $this->trxRoundId !== false );
1466        $this->trxRoundId = false;
1467        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1468        // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1469        // Note that callbacks should already be suppressed due to finalizePrimaryChanges().
1470        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1471            try {
1472                $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1473            } catch ( DBError $e ) {
1474                ( $this->errorLogger )( $e );
1475                $failures[] = "{$conn->getServerName()}{$e->getMessage()}";
1476            }
1477        }
1478        if ( $failures ) {
1479            throw new DBTransactionError(
1480                null,
1481                "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1482            );
1483        }
1484        if ( $restore ) {
1485            // Unmark handles as participating in this explicit transaction round
1486            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1487                $this->undoTransactionRoundFlags( $conn );
1488            }
1489        }
1490        $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1491    }
1492
1493    public function runPrimaryTransactionIdleCallbacks( $fname = __METHOD__ ) {
1494        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1495            $type = IDatabase::TRIGGER_COMMIT;
1496        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1497            $type = IDatabase::TRIGGER_ROLLBACK;
1498        } else {
1499            throw new DBTransactionError(
1500                null,
1501                "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1502            );
1503        }
1504        /** @noinspection PhpUnusedLocalVariableInspection */
1505        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1506
1507        $oldStage = $this->trxRoundStage;
1508        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1509
1510        // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1511        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1512            $conn->setTrxEndCallbackSuppression( false );
1513        }
1514
1515        $errors = [];
1516        $fname = __METHOD__;
1517        // Loop until callbacks stop adding callbacks on other connections
1518        do {
1519            // Run any pending callbacks for each connection...
1520            $count = 0; // callback execution attempts
1521            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1522                if ( $conn->trxLevel() ) {
1523                    continue; // retry in the next iteration, after commit() is called
1524                }
1525                $count += $conn->runOnTransactionIdleCallbacks( $type, $errors );
1526            }
1527            // Clear out any active transactions left over from callbacks...
1528            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1529                if ( $conn->writesPending() ) {
1530                    // A callback from another handle wrote to this one and DBO_TRX is set
1531                    $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
1532                    $this->logger->warning(
1533                        "$fname: found writes pending ($fnames).",
1534                        $this->getConnLogContext(
1535                            $conn,
1536                            [ 'exception' => new RuntimeException() ]
1537                        )
1538                    );
1539                } elseif ( $conn->trxLevel() ) {
1540                    // A callback from another handle read from this one and DBO_TRX is set,
1541                    // which can easily happen if there is only one DB (no replicas)
1542                    $this->logger->debug( "$fname: found empty transaction." );
1543                }
1544                try {
1545                    $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1546                } catch ( DBError $ex ) {
1547                    $errors[] = $ex;
1548                }
1549            }
1550        } while ( $count > 0 );
1551
1552        $this->trxRoundStage = $oldStage;
1553
1554        return $errors[0] ?? null;
1555    }
1556
1557    public function runPrimaryTransactionListenerCallbacks( $fname = __METHOD__ ) {
1558        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1559            $type = IDatabase::TRIGGER_COMMIT;
1560        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1561            $type = IDatabase::TRIGGER_ROLLBACK;
1562        } else {
1563            throw new DBTransactionError(
1564                null,
1565                "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1566            );
1567        }
1568        /** @noinspection PhpUnusedLocalVariableInspection */
1569        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1570
1571        $errors = [];
1572        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1573        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1574            $conn->runTransactionListenerCallbacks( $type, $errors );
1575        }
1576        $this->trxRoundStage = self::ROUND_CURSORY;
1577
1578        return $errors[0] ?? null;
1579    }
1580
1581    public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
1582        /** @noinspection PhpUnusedLocalVariableInspection */
1583        $scope = ScopedCallback::newScopedIgnoreUserAbort();
1584
1585        $restore = ( $this->trxRoundId !== false );
1586        $this->trxRoundId = false;
1587        $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1588        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1589            $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1590        }
1591        if ( $restore ) {
1592            // Unmark handles as participating in this explicit transaction round
1593            foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1594                $this->undoTransactionRoundFlags( $conn );
1595            }
1596        }
1597        $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1598    }
1599
1600    public function flushPrimarySessions( $fname = __METHOD__ ) {
1601        $this->assertTransactionRoundStage( [ self::ROUND_CURSORY ] );
1602        if ( $this->hasPrimaryChanges() ) {
1603            // Any transaction should have been rolled back beforehand
1604            throw new DBTransactionError( null, "Cannot reset session while writes are pending" );
1605        }
1606
1607        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1608            $conn->flushSession( $fname, $conn::FLUSHING_ALL_PEERS );
1609        }
1610    }
1611
1612    /**
1613     * @param string|string[] $stage
1614     * @throws DBTransactionError
1615     */
1616    private function assertTransactionRoundStage( $stage ) {
1617        $stages = (array)$stage;
1618
1619        if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
1620            $stageList = implode(
1621                '/',
1622                array_map( static function ( $v ) {
1623                    return "'$v'";
1624                }, $stages )
1625            );
1626            throw new DBTransactionError(
1627                null,
1628                "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1629            );
1630        }
1631    }
1632
1633    /**
1634     * Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round
1635     *
1636     * Some servers may have neither flag enabled, meaning that they opt out of such
1637     * transaction rounds and remain in auto-commit mode. Such behavior might be desired
1638     * when a DB server is used for something like simple key/value storage.
1639     *
1640     * @param Database $conn
1641     */
1642    private function applyTransactionRoundFlags( Database $conn ) {
1643        if ( $conn->getLBInfo( self::INFO_CONN_CATEGORY ) !== self::CATEGORY_ROUND ) {
1644            return; // transaction rounds do not apply to these connections
1645        }
1646
1647        if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1648            // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1649            // Force DBO_TRX even in CLI mode since a commit round is expected soon.
1650            $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
1651        }
1652
1653        if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1654            $conn->setLBInfo( $conn::LB_TRX_ROUND_ID, $this->trxRoundId );
1655        }
1656    }
1657
1658    /**
1659     * @param Database $conn
1660     */
1661    private function undoTransactionRoundFlags( Database $conn ) {
1662        if ( $conn->getLBInfo( self::INFO_CONN_CATEGORY ) !== self::CATEGORY_ROUND ) {
1663            return; // transaction rounds do not apply to these connections
1664        }
1665
1666        if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1667            $conn->setLBInfo( $conn::LB_TRX_ROUND_ID, null ); // remove the round ID
1668        }
1669
1670        if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1671            $conn->restoreFlags( $conn::RESTORE_PRIOR );
1672        }
1673    }
1674
1675    public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1676        foreach ( $this->conns as $poolConnsByServer ) {
1677            foreach ( $poolConnsByServer as $serverIndex => $serverConns ) {
1678                if ( $serverIndex === ServerInfo::WRITER_INDEX ) {
1679                    continue; // skip primary
1680                }
1681                foreach ( $serverConns as $conn ) {
1682                    $conn->flushSnapshot( $fname );
1683                }
1684            }
1685        }
1686    }
1687
1688    public function flushPrimarySnapshots( $fname = __METHOD__ ) {
1689        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1690            $conn->flushSnapshot( $fname );
1691        }
1692    }
1693
1694    public function hasPrimaryConnection() {
1695        return (bool)$this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
1696    }
1697
1698    public function hasPrimaryChanges() {
1699        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1700            if ( $conn->writesOrCallbacksPending() ) {
1701                return true;
1702            }
1703        }
1704
1705        return false;
1706    }
1707
1708    public function lastPrimaryChangeTimestamp() {
1709        $lastTime = false;
1710        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1711            $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1712        }
1713
1714        return $lastTime;
1715    }
1716
1717    public function hasOrMadeRecentPrimaryChanges( $age = null ) {
1718        $age ??= self::MAX_WAIT_DEFAULT;
1719
1720        return ( $this->hasPrimaryChanges()
1721            || $this->lastPrimaryChangeTimestamp() > microtime( true ) - $age );
1722    }
1723
1724    public function pendingPrimaryChangeCallers() {
1725        $fnames = [];
1726        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1727            $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1728        }
1729
1730        return $fnames;
1731    }
1732
1733    public function explicitTrxActive() {
1734        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1735            if ( $conn->explicitTrxActive() ) {
1736                return true;
1737            }
1738        }
1739        return false;
1740    }
1741
1742    private function setLaggedReplicaMode(): void {
1743        $this->laggedReplicaMode = true;
1744        $this->logger->warning( __METHOD__ . ": setting lagged replica mode" );
1745    }
1746
1747    public function laggedReplicaUsed() {
1748        return $this->laggedReplicaMode;
1749    }
1750
1751    public function getReadOnlyReason( $domain = false ) {
1752        if ( $this->readOnlyReason !== false ) {
1753            return $this->readOnlyReason;
1754        } elseif ( $this->isPrimaryRunningReadOnly() ) {
1755            return 'The primary database server is running in read-only mode.';
1756        }
1757
1758        return false;
1759    }
1760
1761    /**
1762     * @note This method suppresses DBError exceptions in order to avoid severe downtime
1763     * @param IDatabase|null $conn Recently acquired primary connection; null if not applicable
1764     * @return bool Whether the entire primary DB server or the local domain DB is read-only
1765     */
1766    private function isPrimaryRunningReadOnly( IDatabase $conn = null ) {
1767        // Context will often be HTTP GET/HEAD; heavily cache the results
1768        return (bool)$this->wanCache->getWithSetCallback(
1769            // Note that table prefixes are not related to server-side read-only mode
1770            $this->wanCache->makeGlobalKey(
1771                'rdbms-server-readonly',
1772                $this->serverInfo->getPrimaryServerName()
1773            ),
1774            self::TTL_CACHE_READONLY,
1775            function ( $oldValue ) use ( $conn ) {
1776                $scope = $this->trxProfiler->silenceForScope();
1777                $conn ??= $this->getServerConnection(
1778                    ServerInfo::WRITER_INDEX,
1779                    self::DOMAIN_ANY,
1780                    self::CONN_SILENCE_ERRORS
1781                );
1782                if ( $conn ) {
1783                    try {
1784                        $value = (int)$conn->serverIsReadOnly();
1785                    } catch ( DBError $e ) {
1786                        $value = is_int( $oldValue ) ? $oldValue : 0;
1787                    }
1788                } else {
1789                    $value = 0;
1790                }
1791                ScopedCallback::consume( $scope );
1792
1793                return $value;
1794            },
1795            [
1796                'busyValue' => 0,
1797                'pcTTL' => WANObjectCache::TTL_PROC_LONG
1798            ]
1799        );
1800    }
1801
1802    public function pingAll() {
1803        $success = true;
1804        foreach ( $this->getOpenConnections() as $conn ) {
1805            if ( !$conn->ping() ) {
1806                $success = false;
1807            }
1808        }
1809
1810        return $success;
1811    }
1812
1813    /**
1814     * Get all open connections
1815     * @return \Generator|Database[]
1816     */
1817    private function getOpenConnections() {
1818        foreach ( $this->conns as $poolConnsByServer ) {
1819            foreach ( $poolConnsByServer as $serverConns ) {
1820                foreach ( $serverConns as $conn ) {
1821                    yield $conn;
1822                }
1823            }
1824        }
1825    }
1826
1827    /**
1828     * Get all open primary connections
1829     * @return \Generator|Database[]
1830     */
1831    private function getOpenPrimaryConnections() {
1832        foreach ( $this->conns as $poolConnsByServer ) {
1833            /** @var IDatabase $conn */
1834            foreach ( ( $poolConnsByServer[ServerInfo::WRITER_INDEX] ?? [] ) as $conn ) {
1835                yield $conn;
1836            }
1837        }
1838    }
1839
1840    public function getMaxLag() {
1841        $host = '';
1842        $maxLag = -1;
1843        $maxIndex = 0;
1844
1845        if ( $this->serverInfo->hasReplicaServers() ) {
1846            $lagTimes = $this->getLagTimes();
1847            foreach ( $lagTimes as $i => $lag ) {
1848                // Allowing the value to be unset due to stale cache (T361824)
1849                $load = $this->groupLoads[self::GROUP_GENERIC][$i] ?? 0;
1850                if ( $load > 0 && $lag > $maxLag ) {
1851                    $maxLag = $lag;
1852                    $host = $this->serverInfo->getServerInfoStrict( $i, 'host' );
1853                    $maxIndex = $i;
1854                }
1855            }
1856        }
1857
1858        return [ $host, $maxLag, $maxIndex ];
1859    }
1860
1861    public function getLagTimes() {
1862        if ( !$this->hasReplicaServers() ) {
1863            return [ ServerInfo::WRITER_INDEX => 0 ]; // no replication = no lag
1864        }
1865        return $this->wanCache->getWithSetCallback(
1866            $this->wanCache->makeGlobalKey( 'rdbms-lags', $this->clusterName ?? '' ),
1867            // Add jitter to avoid stampede
1868            10 + mt_rand( 1, 10 ),
1869            function () {
1870                $lags = [];
1871                foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
1872                    $conn = $this->getServerConnection(
1873                        $i,
1874                        self::DOMAIN_ANY,
1875                        self::CONN_SILENCE_ERRORS | self::CONN_UNTRACKED_GAUGE
1876                    );
1877                    if ( $conn ) {
1878                        $lags[$i] = $conn->getLag();
1879                        $conn->close();
1880                    } else {
1881                        $lags[$i] = false;
1882                    }
1883                }
1884                return $lags;
1885            },
1886            [ 'lockTSE' => 30 ]
1887        );
1888    }
1889
1890    public function waitForPrimaryPos( IDatabase $conn ) {
1891        if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === ServerInfo::WRITER_INDEX ) {
1892            return true; // not a replica DB server
1893        }
1894
1895        // Get the current primary DB position, opening a connection only if needed
1896        $flags = self::CONN_SILENCE_ERRORS;
1897        $primaryConn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX, $flags );
1898        if ( $primaryConn ) {
1899            $pos = $primaryConn->getPrimaryPos();
1900        } else {
1901            $primaryConn = $this->getServerConnection( ServerInfo::WRITER_INDEX, self::DOMAIN_ANY, $flags );
1902            if ( !$primaryConn ) {
1903                throw new DBReplicationWaitError(
1904                    null,
1905                    "Could not obtain a primary database connection to get the position"
1906                );
1907            }
1908            $pos = $primaryConn->getPrimaryPos();
1909            $this->closeConnection( $primaryConn );
1910        }
1911
1912        if ( $pos instanceof DBPrimaryPos ) {
1913            $this->logger->debug( __METHOD__ . ': waiting' );
1914            $result = $conn->primaryPosWait( $pos, self::MAX_WAIT_DEFAULT );
1915            $ok = ( $result !== null && $result != -1 );
1916            if ( $ok ) {
1917                $this->logger->debug( __METHOD__ . ': done waiting (success)' );
1918            } else {
1919                $this->logger->debug( __METHOD__ . ': done waiting (failure)' );
1920            }
1921        } else {
1922            $ok = false; // something is misconfigured
1923            $this->logger->error(
1924                __METHOD__ . ': could not get primary pos for {db_server}',
1925                $this->getConnLogContext( $conn, [ 'exception' => new RuntimeException() ] )
1926            );
1927        }
1928
1929        return $ok;
1930    }
1931
1932    public function setTransactionListener( $name, callable $callback = null ) {
1933        if ( $callback ) {
1934            $this->trxRecurringCallbacks[$name] = $callback;
1935        } else {
1936            unset( $this->trxRecurringCallbacks[$name] );
1937        }
1938        foreach ( $this->getOpenPrimaryConnections() as $conn ) {
1939            $conn->setTransactionListener( $name, $callback );
1940        }
1941    }
1942
1943    public function setTableAliases( array $aliases ) {
1944        $this->tableAliases = $aliases;
1945    }
1946
1947    public function setIndexAliases( array $aliases ) {
1948        $this->indexAliases = $aliases;
1949    }
1950
1951    public function setDomainAliases( array $aliases ) {
1952        $this->domainAliases = $aliases;
1953    }
1954
1955    public function setLocalDomainPrefix( $prefix ) {
1956        $oldLocalDomain = $this->localDomain;
1957        $this->localDomain = new DatabaseDomain(
1958            $this->localDomain->getDatabase(),
1959            $this->localDomain->getSchema(),
1960            $prefix
1961        );
1962
1963        // Update the prefix for existing connections.
1964        // Existing DBConnRef handles will not be affected.
1965        foreach ( $this->getOpenConnections() as $conn ) {
1966            if ( $oldLocalDomain->equals( $conn->getDomainID() ) ) {
1967                $conn->tablePrefix( $prefix );
1968            }
1969        }
1970    }
1971
1972    public function redefineLocalDomain( $domain ) {
1973        $this->closeAll( __METHOD__ );
1974        $this->localDomain = DatabaseDomain::newFromId( $domain );
1975    }
1976
1977    public function setTempTablesOnlyMode( $value, $domain ) {
1978        $old = $this->tempTablesOnlyMode[$domain] ?? false;
1979        if ( $value ) {
1980            $this->tempTablesOnlyMode[$domain] = true;
1981        } else {
1982            unset( $this->tempTablesOnlyMode[$domain] );
1983        }
1984
1985        return $old;
1986    }
1987
1988    /**
1989     * @param IDatabase $conn
1990     * @return string Description of a connection handle for log messages
1991     * @throws InvalidArgumentException
1992     */
1993    private function stringifyConn( IDatabase $conn ) {
1994        return $conn->getLBInfo( self::INFO_SERVER_INDEX ) . '/' . $conn->getDomainID();
1995    }
1996
1997    /**
1998     * @param int $flags A bitfield of flags
1999     * @param int $bit Bit flag constant
2000     * @return bool Whether the bit field has the specified bit flag set
2001     */
2002    private function fieldHasBit( int $flags, int $bit ) {
2003        return ( ( $flags & $bit ) === $bit );
2004    }
2005
2006    /**
2007     * Create a log context to pass to PSR-3 logger functions.
2008     *
2009     * @param IDatabase $conn
2010     * @param array $extras Additional data to add to context
2011     * @return array
2012     */
2013    protected function getConnLogContext( IDatabase $conn, array $extras = [] ) {
2014        return array_merge(
2015            [
2016                'db_server' => $conn->getServerName(),
2017                'db_domain' => $conn->getDomainID()
2018            ],
2019            $extras
2020        );
2021    }
2022
2023    /**
2024     * @param float|null &$time Mock UNIX timestamp for testing
2025     * @internal
2026     * @codeCoverageIgnore
2027     */
2028    public function setMockTime( &$time ) {
2029        $this->loadMonitor->setMockTime( $time );
2030    }
2031}