Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
56.39% covered (warning)
56.39%
172 / 305
26.09% covered (danger)
26.09%
12 / 46
CRAP
0.00% covered (danger)
0.00%
0 / 1
LBFactory
56.39% covered (warning)
56.39%
172 / 305
26.09% covered (danger)
26.09%
12 / 46
1622.90
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 configure
89.66% covered (warning)
89.66%
26 / 29
0.00% covered (danger)
0.00%
0 / 1
7.05
 destroy
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 autoReconfigure
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 reconfigure
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 shutdown
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
4
 getAllLBs
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 getLBsForOwner
n/a
0 / 0
n/a
0 / 0
0
 flushReplicaSnapshots
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 beginPrimaryChanges
71.43% covered (warning)
71.43%
10 / 14
0.00% covered (danger)
0.00%
0 / 1
4.37
 commitPrimaryChanges
80.00% covered (warning)
80.00%
20 / 25
0.00% covered (danger)
0.00%
0 / 1
8.51
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 flushPrimarySessions
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 executePostTransactionCallbacks
75.00% covered (warning)
75.00%
12 / 16
0.00% covered (danger)
0.00%
0 / 1
6.56
 hasTransactionRound
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isReadyForRoundOperations
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 logIfMultiDbTransaction
50.00% covered (danger)
50.00%
6 / 12
0.00% covered (danger)
0.00%
0 / 1
8.12
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 waitForReplication
86.96% covered (warning)
86.96%
20 / 23
0.00% covered (danger)
0.00%
0 / 1
11.27
 setWaitForReplicationListener
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 getEmptyTransactionTicket
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 getPrimaryDatabase
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getAutoCommitPrimaryConnection
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getReplicaDatabase
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 getLoadBalancer
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
5
 getMappedDatabase
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getMappedDomain
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
3.33
 isLocalDomain
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
42
 isSharedVirtualDomain
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
20
 commitAndWaitForReplication
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
30
 disableChronologyProtection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setDefaultGroupName
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 baseLoadBalancerParams
96.15% covered (success)
96.15%
25 / 26
0.00% covered (danger)
0.00%
0 / 1
3
 initLoadBalancer
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 setTableAliases
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTransactionProfiler
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 closeAll
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 setAgentName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 hasStreamingReplicaServers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setDefaultReplicationWaitTimeout
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionRoundStage
20.00% covered (danger)
20.00%
1 / 5
0.00% covered (danger)
0.00%
0 / 1
4.05
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6namespace Wikimedia\Rdbms;
7
8use Exception;
9use Generator;
10use Psr\Log\LoggerInterface;
11use Psr\Log\NullLogger;
12use RuntimeException;
13use Throwable;
14use Wikimedia\ObjectCache\BagOStuff;
15use Wikimedia\ObjectCache\EmptyBagOStuff;
16use Wikimedia\ObjectCache\WANObjectCache;
17use Wikimedia\RequestTimeout\CriticalSectionProvider;
18use Wikimedia\ScopedCallback;
19use Wikimedia\Stats\StatsFactory;
20use Wikimedia\Telemetry\NoopTracer;
21use Wikimedia\Telemetry\TracerInterface;
22
23/**
24 * @see ILBFactory
25 * @ingroup Database
26 */
27abstract class LBFactory implements ILBFactory {
28    /** @var CriticalSectionProvider|null */
29    private $csProvider;
30    /**
31     * @var callable|null An optional callback that returns a ScopedCallback instance,
32     * meant to profile the actual query execution in {@see Database::doQuery}
33     */
34    private $profiler;
35    /** @var TransactionProfiler */
36    private $trxProfiler;
37    /** @var TracerInterface */
38    private $tracer;
39    /** @var StatsFactory */
40    private $statsFactory;
41    /** @var LoggerInterface */
42    private $logger;
43    /** @var callable Error logger */
44    private $errorLogger;
45    /** @var callable Deprecation logger */
46    private $deprecationLogger;
47
48    /** @var ChronologyProtector */
49    protected $chronologyProtector;
50    /** @var BagOStuff */
51    protected $srvCache;
52    /** @var WANObjectCache */
53    protected $wanCache;
54    /** @var DatabaseDomain Local domain */
55    protected $localDomain;
56
57    /** @var bool Whether this PHP instance is for a CLI script */
58    private $cliMode;
59    /** @var string Agent name for query profiling */
60    private $agent;
61
62    /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
63    private $tableAliases = [];
64    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
65    protected $domainAliases = [];
66    /** @var array[] Map of virtual domain to array of cluster and domain */
67    protected array $virtualDomainsMapping = [];
68    /** @var string[] List of registered virtual domains */
69    protected array $virtualDomains = [];
70    /** @var callable[] */
71    private $replicationWaitCallbacks = [];
72
73    /** @var int|null Ticket used to delegate transaction ownership */
74    private $ticket;
75    /** @var string|null Active explicit transaction round owner or null if none */
76    private $trxRoundFname = null;
77    /** @var string One of the ROUND_* class constants */
78    private $trxRoundStage = self::ROUND_CURSORY;
79    /** @var int Default replication wait timeout */
80    private $replicationWaitTimeout;
81
82    /** @var string|false Reason all LBs are read-only or false if not */
83    protected $readOnlyReason = false;
84
85    /** @var string|null */
86    private $defaultGroup = null;
87    private bool $shuffleSharding = false;
88    private ?string $uniqueIdentifier = null;
89
90    private const ROUND_CURSORY = 'cursory';
91    private const ROUND_BEGINNING = 'within-begin';
92    private const ROUND_COMMITTING = 'within-commit';
93    private const ROUND_ROLLING_BACK = 'within-rollback';
94    private const ROUND_COMMIT_CALLBACKS = 'within-commit-callbacks';
95    private const ROUND_ROLLBACK_CALLBACKS = 'within-rollback-callbacks';
96    private const ROUND_ROLLBACK_SESSIONS = 'within-rollback-session';
97
98    /**
99     * @var callable
100     */
101    private $configCallback = null;
102
103    public function __construct( array $conf ) {
104        $this->configure( $conf );
105
106        if ( isset( $conf['configCallback'] ) ) {
107            $this->configCallback = $conf['configCallback'];
108        }
109    }
110
111    protected function configure( array $conf ): void {
112        $this->localDomain = isset( $conf['localDomain'] )
113            ? DatabaseDomain::newFromId( $conf['localDomain'] )
114            : DatabaseDomain::newUnspecified();
115
116        if ( isset( $conf['readOnlyReason'] ) && is_string( $conf['readOnlyReason'] ) ) {
117            $this->readOnlyReason = $conf['readOnlyReason'];
118        }
119
120        $this->chronologyProtector = $conf['chronologyProtector'] ?? new ChronologyProtector();
121        $this->srvCache = $conf['srvCache'] ?? new EmptyBagOStuff();
122        $this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();
123
124        $this->logger = $conf['logger'] ?? new NullLogger();
125        $this->errorLogger = $conf['errorLogger'] ?? static function ( Throwable $e ) {
126                trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
127        };
128        $this->deprecationLogger = $conf['deprecationLogger'] ?? static function ( $msg ) {
129                trigger_error( $msg, E_USER_DEPRECATED );
130        };
131
132        $this->profiler = $conf['profiler'] ?? null;
133        $this->trxProfiler = $conf['trxProfiler'] ?? new TransactionProfiler();
134        $this->statsFactory = $conf['statsFactory'] ?? StatsFactory::newNull();
135        $this->tracer = $conf['tracer'] ?? new NoopTracer();
136
137        $this->csProvider = $conf['criticalSectionProvider'] ?? null;
138
139        $this->cliMode = $conf['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
140        $this->agent = $conf['agent'] ?? '';
141        $this->replicationWaitTimeout = $this->cliMode ? 60 : 1;
142        $this->virtualDomainsMapping = $conf['virtualDomainsMapping'] ?? [];
143        $this->virtualDomains = $conf['virtualDomains'] ?? [];
144
145        $this->shuffleSharding = $conf['shuffleSharding'] ?? false;
146        $this->uniqueIdentifier = $conf['uniqueIdentifier'] ?? null;
147
148        static $nextTicket;
149        $this->ticket = $nextTicket = ( is_int( $nextTicket ) ? $nextTicket++ : mt_rand() );
150    }
151
152    public function destroy() {
153        /** @noinspection PhpUnusedLocalVariableInspection */
154        $scope = ScopedCallback::newScopedIgnoreUserAbort();
155
156        foreach ( $this->getLBsForOwner() as $lb ) {
157            $lb->disable( __METHOD__ );
158        }
159    }
160
161    /**
162     * Reload config using the callback passed defined $config['configCallback'].
163     *
164     * If the config returned by the callback is different from the existing config,
165     * this calls reconfigure() on all load balancers, which causes them to invalidate
166     * any existing connections and re-connect using the new configuration.
167     *
168     * Long-running processes should call this from time to time
169     * (but not too often, because it is somewhat expensive),
170     * preferably after each batch.
171     * Maintenance scripts can do that by calling $this->waitForReplication(),
172     * which calls this method.
173     */
174    public function autoReconfigure(): void {
175        if ( !$this->configCallback ) {
176            return;
177        }
178
179        $conf = ( $this->configCallback )();
180        if ( $conf ) {
181            $this->reconfigure( $conf );
182        }
183    }
184
185    /**
186     * Reconfigure using the given config array.
187     * Any fields omitted from $conf will be taken from the current config.
188     *
189     * If the config changed, this calls reconfigure() on all load balancers,
190     * which causes them to close all existing connections.
191     *
192     * @note This invalidates the current transaction ticket.
193     *
194     * @warning This must only be called in top level code such as the execute()
195     * method of a maintenance script. Any database connection in use when this
196     * method is called will become defunct.
197     *
198     * @since 1.39
199     *
200     * @param array $conf A configuration array, using the same structure as
201     *        the one passed to the constructor (see also $wgLBFactoryConf).
202     */
203    public function reconfigure( array $conf ): void {
204        if ( !$conf ) {
205            return;
206        }
207
208        foreach ( $this->getLBsForOwner() as $lb ) {
209            $lb->reconfigure( $conf );
210        }
211    }
212
213    public function getLocalDomainID(): string {
214        return $this->localDomain->getId();
215    }
216
217    /** @inheritDoc */
218    public function shutdown(
219        $flags = self::SHUTDOWN_NORMAL,
220        ?callable $workCallback = null,
221        &$cpIndex = null,
222        &$cpClientId = null
223    ) {
224        /** @noinspection PhpUnusedLocalVariableInspection */
225        $scope = ScopedCallback::newScopedIgnoreUserAbort();
226
227        if ( ( $flags & self::SHUTDOWN_NO_CHRONPROT ) != self::SHUTDOWN_NO_CHRONPROT ) {
228            // Remark all of the relevant DB primary positions
229            foreach ( $this->getLBsForOwner() as $lb ) {
230                if ( $lb->hasPrimaryConnection() ) {
231                    $this->chronologyProtector->stageSessionPrimaryPos( $lb );
232                }
233            }
234            // Write the positions to the persistent stash
235            $this->chronologyProtector->persistSessionReplicationPositions( $cpIndex );
236            $this->logger->debug( __METHOD__ . ': finished ChronologyProtector shutdown' );
237        }
238        $cpClientId = $this->chronologyProtector->getClientId();
239
240        $this->commitPrimaryChanges( __METHOD__ );
241
242        $this->logger->debug( 'LBFactory shutdown completed' );
243    }
244
245    /** @inheritDoc */
246    public function getAllLBs() {
247        foreach ( $this->getLBsForOwner() as $lb ) {
248            yield $lb;
249        }
250    }
251
252    /**
253     * Get all tracked load balancers with the internal "for owner" interface.
254     *
255     * @return Generator|ILoadBalancerForOwner[]
256     */
257    abstract protected function getLBsForOwner();
258
259    /** @inheritDoc */
260    public function flushReplicaSnapshots( $fname = __METHOD__ ) {
261        if ( $this->trxRoundFname !== null && $this->trxRoundFname !== $fname ) {
262            $this->logger->warning(
263                "$fname: transaction round '{$this->trxRoundFname}' still running",
264                [ 'exception' => new RuntimeException() ]
265            );
266        }
267        foreach ( $this->getLBsForOwner() as $lb ) {
268            $lb->flushReplicaSnapshots( $fname );
269        }
270    }
271
272    /** @inheritDoc */
273    final public function beginPrimaryChanges( $fname = __METHOD__ ) {
274        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
275        /** @noinspection PhpUnusedLocalVariableInspection */
276        $scope = ScopedCallback::newScopedIgnoreUserAbort();
277
278        foreach ( $this->getLBsForOwner() as $lb ) {
279            $lb->flushReplicaSnapshots( $fname );
280        }
281
282        $this->trxRoundStage = self::ROUND_BEGINNING;
283        if ( $this->trxRoundFname !== null ) {
284            throw new DBTransactionError(
285                null,
286                "$fname: transaction round '{$this->trxRoundFname}' already started"
287            );
288        }
289        $this->trxRoundFname = $fname;
290        // Flush snapshots and appropriately set DBO_TRX on primary connections
291        foreach ( $this->getLBsForOwner() as $lb ) {
292            $lb->beginPrimaryChanges( $fname );
293        }
294        $this->trxRoundStage = self::ROUND_CURSORY;
295    }
296
297    /** @inheritDoc */
298    final public function commitPrimaryChanges( $fname = __METHOD__, int $maxWriteDuration = 0 ) {
299        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
300        /** @noinspection PhpUnusedLocalVariableInspection */
301        $scope = ScopedCallback::newScopedIgnoreUserAbort();
302
303        $this->trxRoundStage = self::ROUND_COMMITTING;
304        if ( $this->trxRoundFname !== null && $this->trxRoundFname !== $fname ) {
305            throw new DBTransactionError(
306                null,
307                "$fname: transaction round '{$this->trxRoundFname}' still running"
308            );
309        }
310        // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
311        do {
312            $count = 0; // number of callbacks executed this iteration
313            foreach ( $this->getLBsForOwner() as $lb ) {
314                $count += $lb->finalizePrimaryChanges( $fname );
315            }
316        } while ( $count > 0 );
317        $this->trxRoundFname = null;
318        // Perform pre-commit checks, aborting on failure
319        foreach ( $this->getLBsForOwner() as $lb ) {
320            $lb->approvePrimaryChanges( $maxWriteDuration, $fname );
321        }
322        // Log the DBs and methods involved in multi-DB transactions
323        $this->logIfMultiDbTransaction();
324        // Actually perform the commit on all primary DB connections and revert DBO_TRX
325        foreach ( $this->getLBsForOwner() as $lb ) {
326            $lb->commitPrimaryChanges( $fname );
327        }
328        // Run all post-commit callbacks in a separate step
329        $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
330        $e = $this->executePostTransactionCallbacks();
331        $this->trxRoundStage = self::ROUND_CURSORY;
332        // Throw any last post-commit callback error
333        if ( $e instanceof Exception ) {
334            throw $e;
335        }
336
337        foreach ( $this->getLBsForOwner() as $lb ) {
338            $lb->flushReplicaSnapshots( $fname );
339        }
340    }
341
342    /** @inheritDoc */
343    final public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
344        /** @noinspection PhpUnusedLocalVariableInspection */
345        $scope = ScopedCallback::newScopedIgnoreUserAbort();
346
347        $this->trxRoundStage = self::ROUND_ROLLING_BACK;
348        $this->trxRoundFname = null;
349        // Actually perform the rollback on all primary DB connections and revert DBO_TRX
350        foreach ( $this->getLBsForOwner() as $lb ) {
351            $lb->rollbackPrimaryChanges( $fname );
352        }
353        // Run all post-commit callbacks in a separate step
354        $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
355        $this->executePostTransactionCallbacks();
356        $this->trxRoundStage = self::ROUND_CURSORY;
357
358        foreach ( $this->getLBsForOwner() as $lb ) {
359            $lb->flushReplicaSnapshots( $fname );
360        }
361    }
362
363    /** @inheritDoc */
364    final public function flushPrimarySessions( $fname = __METHOD__ ) {
365        /** @noinspection PhpUnusedLocalVariableInspection */
366        $scope = ScopedCallback::newScopedIgnoreUserAbort();
367
368        // Release named locks and table locks on all primary DB connections
369        $this->trxRoundStage = self::ROUND_ROLLBACK_SESSIONS;
370        foreach ( $this->getLBsForOwner() as $lb ) {
371            $lb->flushPrimarySessions( $fname );
372        }
373        $this->trxRoundStage = self::ROUND_CURSORY;
374    }
375
376    /**
377     * @return Exception|null
378     */
379    private function executePostTransactionCallbacks() {
380        $fname = __METHOD__;
381        // Run all post-commit callbacks until new ones stop getting added
382        $e = null; // first callback exception
383        $iterations = 0;
384        do {
385            // Run any callbacks on tracked load balancers
386            foreach ( $this->getLBsForOwner() as $lb ) {
387                $ex = $lb->runPrimaryTransactionIdleCallbacks( $fname );
388                $e = $e ?: $ex;
389            }
390            // T392913: log and break when this method seems to be in an obviously broken loop
391            if ( ++$iterations >= 32 ) {
392                throw new DBTransactionError(
393                    null,
394                    "Aborting likely infinite callback loop due to unresolvable pending writes"
395                );
396            }
397        } while ( $this->hasPrimaryChanges() );
398        // Run all listener callbacks once
399        foreach ( $this->getLBsForOwner() as $lb ) {
400            $ex = $lb->runPrimaryTransactionListenerCallbacks( $fname );
401            $e = $e ?: $ex;
402        }
403
404        return $e;
405    }
406
407    /** @inheritDoc */
408    public function hasTransactionRound() {
409        // TODO: check for implicit rounds or rename and check for implicit rounds with writes?
410        return ( $this->trxRoundFname !== null );
411    }
412
413    /** @inheritDoc */
414    public function isReadyForRoundOperations() {
415        return ( $this->trxRoundStage === self::ROUND_CURSORY );
416    }
417
418    /**
419     * Log query info if multi DB transactions are going to be committed now
420     */
421    private function logIfMultiDbTransaction() {
422        $callersByDB = [];
423        foreach ( $this->getLBsForOwner() as $lb ) {
424            $primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );
425            $callers = $lb->pendingPrimaryChangeCallers();
426            if ( $callers ) {
427                $callersByDB[$primaryName] = $callers;
428            }
429        }
430
431        if ( count( $callersByDB ) >= 2 ) {
432            $dbs = implode( ', ', array_keys( $callersByDB ) );
433            $msg = "Multi-DB transaction [{$dbs}]:\n";
434            foreach ( $callersByDB as $db => $callers ) {
435                $msg .= "$db" . implode( '; ', $callers ) . "\n";
436            }
437            $this->logger->info( $msg );
438        }
439    }
440
441    /** @inheritDoc */
442    public function hasPrimaryChanges() {
443        foreach ( $this->getLBsForOwner() as $lb ) {
444            if ( $lb->hasPrimaryChanges() ) {
445                return true;
446            }
447        }
448        return false;
449    }
450
451    /** @inheritDoc */
452    public function laggedReplicaUsed() {
453        foreach ( $this->getLBsForOwner() as $lb ) {
454            if ( $lb->laggedReplicaUsed() ) {
455                return true;
456            }
457        }
458        return false;
459    }
460
461    /** @inheritDoc */
462    public function hasOrMadeRecentPrimaryChanges( $age = null ) {
463        foreach ( $this->getLBsForOwner() as $lb ) {
464            if ( $lb->hasOrMadeRecentPrimaryChanges( $age ) ) {
465                return true;
466            }
467        }
468        return false;
469    }
470
471    /** @inheritDoc */
472    public function waitForReplication( array $opts = [] ) {
473        $opts += [
474            'timeout' => $this->replicationWaitTimeout,
475            'ifWritesSince' => null
476        ];
477
478        $lbs = [];
479        foreach ( $this->getLBsForOwner() as $lb ) {
480            $lbs[] = $lb;
481        }
482
483        // Get all the primary DB positions of applicable DBs right now.
484        // This can be faster since waiting on one cluster reduces the
485        // time needed to wait on the next clusters.
486        $primaryPositions = array_fill( 0, count( $lbs ), false );
487        foreach ( $lbs as $i => $lb ) {
488            if (
489                // No writes to wait on getting replicated
490                !$lb->hasPrimaryConnection() ||
491                // No replication; avoid getPrimaryPos() permissions errors (T29975)
492                !$lb->hasStreamingReplicaServers() ||
493                // No writes since the last replication wait
494                (
495                    $opts['ifWritesSince'] &&
496                    $lb->lastPrimaryChangeTimestamp() < $opts['ifWritesSince']
497                )
498            ) {
499                continue; // no need to wait
500            }
501
502            $primaryPositions[$i] = $lb->getPrimaryPos();
503        }
504
505        // Run any listener callbacks *after* getting the DB positions. The more
506        // time spent in the callbacks, the less time is spent in waitForAll().
507        foreach ( $this->replicationWaitCallbacks as $callback ) {
508            $callback();
509        }
510
511        $failed = [];
512        foreach ( $lbs as $i => $lb ) {
513            if ( $primaryPositions[$i] ) {
514                // The RDBMS may not support getPrimaryPos()
515                if ( !$lb->waitForAll( $primaryPositions[$i], $opts['timeout'] ) ) {
516                    $failed[] = $lb->getServerName( ServerInfo::WRITER_INDEX );
517                }
518            }
519        }
520
521        return !$failed;
522    }
523
524    /** @inheritDoc */
525    public function setWaitForReplicationListener( $name, ?callable $callback = null ) {
526        if ( $callback ) {
527            $this->replicationWaitCallbacks[$name] = $callback;
528        } else {
529            unset( $this->replicationWaitCallbacks[$name] );
530        }
531    }
532
533    /** @inheritDoc */
534    public function getEmptyTransactionTicket( $fname ) {
535        if ( $this->hasPrimaryChanges() ) {
536            $this->logger->error(
537                __METHOD__ . "$fname does not have outer scope",
538                [ 'exception' => new RuntimeException() ]
539            );
540
541            return null;
542        }
543
544        return $this->ticket;
545    }
546
547    /** @inheritDoc */
548    public function getPrimaryDatabase( $domain = false ): IDatabase {
549        return $this->getMappedDatabase( DB_PRIMARY, [], $domain );
550    }
551
552    /** @inheritDoc */
553    public function getAutoCommitPrimaryConnection( $domain = false ): IDatabase {
554        return $this->getLoadBalancer( $domain )
555            ->getConnection( DB_PRIMARY, [], $this->getMappedDomain( $domain ), ILoadBalancer::CONN_TRX_AUTOCOMMIT );
556    }
557
558    /** @inheritDoc */
559    public function getReplicaDatabase( string|false $domain = false, $group = null ): IReadableDatabase {
560        if ( $group === null ) {
561            $groups = [];
562        } else {
563            $groups = [ $group ];
564        }
565        return $this->getMappedDatabase( DB_REPLICA, $groups, $domain );
566    }
567
568    /** @inheritDoc */
569    public function getLoadBalancer( $domain = false ): ILoadBalancer {
570        if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
571            if ( isset( $this->virtualDomainsMapping[$domain] ) ) {
572                $config = $this->virtualDomainsMapping[$domain];
573                if ( isset( $config['cluster'] ) ) {
574                    return $this->getExternalLB( $config['cluster'] );
575                }
576                $domain = $config['db'];
577            } else {
578                // It's not configured, assume local db.
579                $domain = false;
580            }
581        }
582        return $this->getMainLB( $domain );
583    }
584
585    /**
586     * Helper for getPrimaryDatabase and getReplicaDatabase() providing virtual
587     * domain mapping.
588     *
589     * @param int $index
590     * @param array $groups
591     * @param string|false $domain
592     * @return IDatabase
593     */
594    private function getMappedDatabase( $index, $groups, string|false $domain ) {
595        return $this->getLoadBalancer( $domain )
596            ->getConnection( $index, $groups, $this->getMappedDomain( $domain ) );
597    }
598
599    /**
600     * @internal For installer and getMappedDatabase
601     */
602    public function getMappedDomain( string|false $domain ): string|false {
603        if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
604            return $this->virtualDomainsMapping[$domain]['db'] ?? false;
605        } else {
606            return $domain;
607        }
608    }
609
610    /**
611     * Determine whether, after mapping, the domain refers to the main domain
612     * of the local wiki.
613     *
614     * @internal for installer
615     * @param string|false $domain
616     * @return bool
617     */
618    public function isLocalDomain( $domain ) {
619        if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
620            if ( isset( $this->virtualDomainsMapping[$domain] ) ) {
621                $config = $this->virtualDomainsMapping[$domain];
622                if ( isset( $config['cluster'] ) ) {
623                    // In an external cluster
624                    return false;
625                }
626                $domain = $config['db'];
627            } else {
628                // Unconfigured virtual domain is always local
629                return true;
630            }
631        }
632        return $domain === false || $domain === $this->getLocalDomainID();
633    }
634
635    /**
636     * Is the domain a virtual domain with a statically configured database name?
637     *
638     * @internal for installer
639     * @param string|false $domain
640     * @return bool
641     */
642    public function isSharedVirtualDomain( $domain ) {
643        if ( $domain !== false
644            && in_array( $domain, $this->virtualDomains )
645            && isset( $this->virtualDomainsMapping[$domain] )
646        ) {
647            return $this->virtualDomainsMapping[$domain]['db'] !== false;
648        }
649        return false;
650    }
651
652    /** @inheritDoc */
653    final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
654        if ( $ticket !== $this->ticket ) {
655            $this->logger->error(
656                __METHOD__ . "$fname does not have outer scope ($ticket vs {$this->ticket})",
657                [ 'exception' => new RuntimeException() ]
658            );
659
660            return false;
661        }
662
663        // The transaction owner and any caller with the empty transaction ticket can commit
664        // so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
665        if ( $this->trxRoundFname !== null && $fname !== $this->trxRoundFname ) {
666            $this->logger->info( "$fname: committing on behalf of {$this->trxRoundFname}" );
667            $fnameEffective = $this->trxRoundFname;
668        } else {
669            $fnameEffective = $fname;
670        }
671
672        $this->commitPrimaryChanges( $fnameEffective );
673        $waitSucceeded = $this->waitForReplication( $opts );
674        // If a nested caller committed on behalf of $fname, start another empty $fname
675        // transaction, leaving the caller with the same empty transaction state as before.
676        if ( $fnameEffective !== $fname ) {
677            $this->beginPrimaryChanges( $fnameEffective );
678        }
679
680        return $waitSucceeded;
681    }
682
683    public function disableChronologyProtection() {
684        $this->chronologyProtector->setEnabled( false );
685    }
686
687    public function setDefaultGroupName( string $defaultGroup ): void {
688        // for future LBs
689        $this->defaultGroup = $defaultGroup;
690
691        // For existing LBs
692        foreach ( $this->getLBsForOwner() as $lb ) {
693            $lb->setDefaultGroupName( $defaultGroup );
694        }
695    }
696
697    /**
698     * Get parameters to ILoadBalancer::__construct()
699     *
700     * @return array
701     */
702    final protected function baseLoadBalancerParams() {
703        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
704            $initStage = ILoadBalancerForOwner::STAGE_POSTCOMMIT_CALLBACKS;
705        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
706            $initStage = ILoadBalancerForOwner::STAGE_POSTROLLBACK_CALLBACKS;
707        } else {
708            $initStage = null;
709        }
710
711        return [
712            'localDomain' => $this->localDomain,
713            'readOnlyReason' => $this->readOnlyReason,
714            'srvCache' => $this->srvCache,
715            'wanCache' => $this->wanCache,
716            'profiler' => $this->profiler,
717            'trxProfiler' => $this->trxProfiler,
718            'tracer' => $this->tracer,
719            'logger' => $this->logger,
720            'errorLogger' => $this->errorLogger,
721            'deprecationLogger' => $this->deprecationLogger,
722            'statsFactory' => $this->statsFactory,
723            'cliMode' => $this->cliMode,
724            'agent' => $this->agent,
725            'defaultGroup' => $this->defaultGroup,
726            'chronologyProtector' => $this->chronologyProtector,
727            'roundStage' => $initStage,
728            'criticalSectionProvider' => $this->csProvider,
729            'shuffleSharding' => $this->shuffleSharding,
730            'uniqueIdentifier' => $this->uniqueIdentifier,
731        ];
732    }
733
734    protected function initLoadBalancer( ILoadBalancerForOwner $lb ) {
735        if ( $this->trxRoundFname !== null ) {
736            $lb->beginPrimaryChanges( $this->trxRoundFname ); // set DBO_TRX
737        }
738
739        $lb->setTableAliases( $this->tableAliases );
740        $lb->setDomainAliases( $this->domainAliases );
741    }
742
743    /** @inheritDoc */
744    public function setTableAliases( array $aliases ) {
745        $this->tableAliases = $aliases;
746    }
747
748    /** @inheritDoc */
749    public function setDomainAliases( array $aliases ) {
750        $this->domainAliases = $aliases;
751    }
752
753    /** @inheritDoc */
754    public function getTransactionProfiler(): TransactionProfiler {
755        return $this->trxProfiler;
756    }
757
758    /** @inheritDoc */
759    public function setLocalDomainPrefix( $prefix ) {
760        $this->localDomain = new DatabaseDomain(
761            $this->localDomain->getDatabase(),
762            $this->localDomain->getSchema(),
763            $prefix
764        );
765
766        foreach ( $this->getLBsForOwner() as $lb ) {
767            $lb->setLocalDomainPrefix( $prefix );
768        }
769    }
770
771    /** @inheritDoc */
772    public function redefineLocalDomain( $domain ) {
773        $this->closeAll( __METHOD__ );
774
775        $this->localDomain = DatabaseDomain::newFromId( $domain );
776
777        foreach ( $this->getLBsForOwner() as $lb ) {
778            $lb->redefineLocalDomain( $this->localDomain );
779        }
780    }
781
782    /** @inheritDoc */
783    public function closeAll( $fname = __METHOD__ ) {
784        /** @noinspection PhpUnusedLocalVariableInspection */
785        $scope = ScopedCallback::newScopedIgnoreUserAbort();
786
787        foreach ( $this->getLBsForOwner() as $lb ) {
788            $lb->closeAll( $fname );
789        }
790    }
791
792    /** @inheritDoc */
793    public function setAgentName( $agent ) {
794        $this->agent = $agent;
795    }
796
797    /** @inheritDoc */
798    public function hasStreamingReplicaServers() {
799        foreach ( $this->getLBsForOwner() as $lb ) {
800            if ( $lb->hasStreamingReplicaServers() ) {
801                return true;
802            }
803        }
804        return false;
805    }
806
807    /** @inheritDoc */
808    public function setDefaultReplicationWaitTimeout( $seconds ) {
809        $old = $this->replicationWaitTimeout;
810        $this->replicationWaitTimeout = max( 1, (int)$seconds );
811
812        return $old;
813    }
814
815    /**
816     * @param string $stage
817     */
818    private function assertTransactionRoundStage( $stage ) {
819        if ( $this->trxRoundStage !== $stage ) {
820            throw new DBTransactionError(
821                null,
822                "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
823            );
824        }
825    }
826}