Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
58.84% covered (warning)
58.84%
163 / 277
26.19% covered (danger)
26.19%
11 / 42
CRAP
0.00% covered (danger)
0.00%
0 / 1
LBFactory
58.84% covered (warning)
58.84%
163 / 277
26.19% covered (danger)
26.19%
11 / 42
1123.78
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 configure
88.89% covered (warning)
88.89%
24 / 27
0.00% covered (danger)
0.00%
0 / 1
7.07
 destroy
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 autoReconfigure
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 reconfigure
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 shutdown
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
4
 getAllLBs
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 getLBsForOwner
n/a
0 / 0
n/a
0 / 0
0
 flushReplicaSnapshots
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 beginPrimaryChanges
71.43% covered (warning)
71.43%
10 / 14
0.00% covered (danger)
0.00%
0 / 1
4.37
 commitPrimaryChanges
80.00% covered (warning)
80.00%
20 / 25
0.00% covered (danger)
0.00%
0 / 1
8.51
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 flushPrimarySessions
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 executePostTransactionCallbacks
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
5
 hasTransactionRound
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isReadyForRoundOperations
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 logIfMultiDbTransaction
50.00% covered (danger)
50.00%
6 / 12
0.00% covered (danger)
0.00%
0 / 1
8.12
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 waitForReplication
86.96% covered (warning)
86.96%
20 / 23
0.00% covered (danger)
0.00%
0 / 1
11.27
 setWaitForReplicationListener
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 getEmptyTransactionTicket
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 getPrimaryDatabase
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getReplicaDatabase
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 getLoadBalancer
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
5
 getMappedDatabase
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 commitAndWaitForReplication
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
30
 disableChronologyProtection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 baseLoadBalancerParams
95.65% covered (success)
95.65%
22 / 23
0.00% covered (danger)
0.00%
0 / 1
3
 initLoadBalancer
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 setTableAliases
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setIndexAliases
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTransactionProfiler
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 closeAll
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 setAgentName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 hasStreamingReplicaServers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setDefaultReplicationWaitTimeout
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionRoundStage
20.00% covered (danger)
20.00%
1 / 5
0.00% covered (danger)
0.00%
0 / 1
4.05
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use Exception;
23use Generator;
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
27use RuntimeException;
28use Throwable;
29use Wikimedia\ObjectCache\BagOStuff;
30use Wikimedia\ObjectCache\EmptyBagOStuff;
31use Wikimedia\ObjectCache\WANObjectCache;
32use Wikimedia\RequestTimeout\CriticalSectionProvider;
33use Wikimedia\ScopedCallback;
34use Wikimedia\Stats\NullStatsdDataFactory;
35
/**
 * Base class for factories that track and coordinate database load balancers.
 *
 * It holds the configuration shared by all load balancers (see baseLoadBalancerParams()),
 * drives "transaction rounds" (begin/commit/rollback across every tracked load balancer),
 * waits for replication, and stages ChronologyProtector positions on shutdown.
 *
 * Subclasses supply the tracked load balancers through getLBsForOwner().
 *
 * @see ILBFactory
 * @ingroup Database
 */
abstract class LBFactory implements ILBFactory {
    /** @var CriticalSectionProvider|null */
    private $csProvider;
    /**
     * @var callable|null An optional callback that returns a ScopedCallback instance,
     * meant to profile the actual query execution in {@see Database::doQuery}
     */
    private $profiler;
    /** @var TransactionProfiler */
    private $trxProfiler;
    /** @var StatsdDataFactoryInterface */
    private $statsd;
    /** @var LoggerInterface */
    private $logger;
    /** @var callable Error logger */
    private $errorLogger;
    /** @var callable Deprecation logger */
    private $deprecationLogger;

    /** @var ChronologyProtector */
    protected $chronologyProtector;
    /** @var BagOStuff */
    protected $srvCache;
    /** @var WANObjectCache */
    protected $wanCache;
    /** @var DatabaseDomain Local domain */
    protected $localDomain;

    /** @var bool Whether this PHP instance is for a CLI script */
    private $cliMode;
    /** @var string Agent name for query profiling */
    private $agent;

    /** @var array[] Map of (table => (dbname, schema, prefix) map) */
    private $tableAliases = [];
    /** @var string[] Map of (index alias => index) */
    private $indexAliases = [];
    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
    protected $domainAliases = [];
    /** @var array[] Map of virtual domain to array of cluster and domain */
    protected array $virtualDomainsMapping = [];
    /** @var string[] List of registered virtual domains */
    protected array $virtualDomains = [];
    /** @var callable[] Map of (listener name => callback) run by waitForReplication() */
    private $replicationWaitCallbacks = [];

    /** @var int|null Ticket used to delegate transaction ownership */
    private $ticket;
    /** @var string|false String if a requested DBO_TRX transaction round is active */
    private $trxRoundId = false;
    /** @var string One of the ROUND_* class constants */
    private $trxRoundStage = self::ROUND_CURSORY;
    /** @var int Default replication wait timeout */
    private $replicationWaitTimeout;

    /** @var string|false Reason all LBs are read-only or false if not */
    protected $readOnlyReason = false;

    /** @var string|null */
    private $defaultGroup = null;

    private const ROUND_CURSORY = 'cursory';
    private const ROUND_BEGINNING = 'within-begin';
    private const ROUND_COMMITTING = 'within-commit';
    private const ROUND_ROLLING_BACK = 'within-rollback';
    private const ROUND_COMMIT_CALLBACKS = 'within-commit-callbacks';
    private const ROUND_ROLLBACK_CALLBACKS = 'within-rollback-callbacks';
    private const ROUND_ROLLBACK_SESSIONS = 'within-rollback-session';

    /**
     * @var callable|null Callback returning a fresh config array; used by autoReconfigure()
     */
    private $configCallback = null;

    /**
     * @param array $conf Configuration map; see configure() for the keys that are read.
     *  An optional 'configCallback' entry is kept for later use by autoReconfigure().
     */
    public function __construct( array $conf ) {
        $this->configure( $conf );

        if ( isset( $conf['configCallback'] ) ) {
            $this->configCallback = $conf['configCallback'];
        }
    }

    /**
     * Initialise the factory state from a configuration array.
     *
     * Keys read (all optional): localDomain, readOnlyReason, chronologyProtector, srvCache,
     * wanCache, logger, errorLogger, deprecationLogger, profiler, trxProfiler,
     * statsdDataFactory, criticalSectionProvider, cliMode, agent, defaultGroup,
     * virtualDomainsMapping, virtualDomains. Missing keys fall back to the defaults below.
     *
     * Also assigns this instance a transaction ticket that differs from the tickets of
     * the other factories configured earlier in the same process.
     *
     * @param array $conf
     * @return void
     */
    protected function configure( array $conf ): void {
        $this->localDomain = isset( $conf['localDomain'] )
            ? DatabaseDomain::newFromId( $conf['localDomain'] )
            : DatabaseDomain::newUnspecified();

        // Only a string reason is honoured; anything else leaves the default (false)
        if ( isset( $conf['readOnlyReason'] ) && is_string( $conf['readOnlyReason'] ) ) {
            $this->readOnlyReason = $conf['readOnlyReason'];
        }

        $this->chronologyProtector = $conf['chronologyProtector'] ?? new ChronologyProtector();
        $this->srvCache = $conf['srvCache'] ?? new EmptyBagOStuff();
        $this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();

        $this->logger = $conf['logger'] ?? new NullLogger();
        $this->errorLogger = $conf['errorLogger'] ?? static function ( Throwable $e ) {
                trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
        };
        $this->deprecationLogger = $conf['deprecationLogger'] ?? static function ( $msg ) {
                trigger_error( $msg, E_USER_DEPRECATED );
        };

        $this->profiler = $conf['profiler'] ?? null;
        $this->trxProfiler = $conf['trxProfiler'] ?? new TransactionProfiler();
        $this->statsd = $conf['statsdDataFactory'] ?? new NullStatsdDataFactory();

        $this->csProvider = $conf['criticalSectionProvider'] ?? null;

        $this->cliMode = $conf['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
        $this->agent = $conf['agent'] ?? '';
        $this->defaultGroup = $conf['defaultGroup'] ?? null;
        // CLI scripts get a much longer default replication wait than web requests
        $this->replicationWaitTimeout = $this->cliMode ? 60 : 1;
        $this->virtualDomainsMapping = $conf['virtualDomainsMapping'] ?? [];
        $this->virtualDomains = $conf['virtualDomains'] ?? [];

        // Seed the per-process counter randomly once, then advance it for each later call.
        // (A post-increment inside the assignment would write the old value back and
        // leave every factory with the same ticket.)
        static $nextTicket;
        $nextTicket = is_int( $nextTicket ) ? $nextTicket + 1 : mt_rand();
        $this->ticket = $nextTicket;
    }

    /**
     * Disable every tracked load balancer.
     */
    public function destroy() {
        // Held until this method returns (presumably guards against user aborts)
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->disable( __METHOD__ );
        }
    }

    /**
     * Reload the configuration using the 'configCallback' given to the constructor.
     *
     * Does nothing when no callback was configured. Otherwise the callback is invoked and,
     * if it returns a non-empty config, that config is handed to reconfigure(), which
     * forwards it to all tracked load balancers.
     *
     * Long-running processes should call this from time to time
     * (but not too often, because it is somewhat expensive),
     * preferably after each batch.
     * NOTE(review): maintenance scripts are said to trigger this through their own
     * waitForReplication() helper; that call site is not visible in this class.
     */
    public function autoReconfigure(): void {
        if ( !$this->configCallback ) {
            return;
        }

        $conf = ( $this->configCallback )();
        if ( $conf ) {
            $this->reconfigure( $conf );
        }
    }

    /**
     * Reconfigure using the given config array.
     *
     * An empty array is a no-op. Otherwise the array is passed to reconfigure() on every
     * tracked load balancer; how omitted fields are treated and whether connections are
     * closed is decided by the load balancers themselves.
     *
     * @warning This must only be called in top level code such as the execute()
     * method of a maintenance script. Any database connection in use when this
     * method is called may become defunct.
     *
     * @since 1.39
     *
     * @param array $conf A configuration array, using the same structure as
     *        the one passed to the constructor (see also $wgLBFactoryConf).
     */
    public function reconfigure( array $conf ): void {
        if ( !$conf ) {
            return;
        }

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->reconfigure( $conf );
        }
    }

    /**
     * @return string ID of the local database domain
     */
    public function getLocalDomainID(): string {
        return $this->localDomain->getId();
    }

    /**
     * Stage ChronologyProtector positions (unless disabled by flag) and commit all
     * pending primary changes.
     *
     * @param int $flags Bit field of SHUTDOWN_* constants; when SHUTDOWN_NO_CHRONPROT is
     *  set, the ChronologyProtector position staging/persisting step is skipped
     * @param callable|null $workCallback Not used by this implementation
     * @param mixed &$cpIndex Handed to ChronologyProtector::persistSessionReplicationPositions()
     *  (presumably filled in by it; confirm against ChronologyProtector)
     * @param mixed &$cpClientId Set to the ChronologyProtector client ID
     */
    public function shutdown(
        $flags = self::SHUTDOWN_NORMAL,
        ?callable $workCallback = null,
        &$cpIndex = null,
        &$cpClientId = null
    ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        if ( ( $flags & self::SHUTDOWN_NO_CHRONPROT ) != self::SHUTDOWN_NO_CHRONPROT ) {
            // Remark all of the relevant DB primary positions
            foreach ( $this->getLBsForOwner() as $lb ) {
                if ( $lb->hasPrimaryConnection() ) {
                    $this->chronologyProtector->stageSessionPrimaryPos( $lb );
                }
            }
            // Write the positions to the persistent stash
            $this->chronologyProtector->persistSessionReplicationPositions( $cpIndex );
            $this->logger->debug( __METHOD__ . ': finished ChronologyProtector shutdown' );
        }
        $cpClientId = $this->chronologyProtector->getClientId();

        $this->commitPrimaryChanges( __METHOD__ );

        $this->logger->debug( 'LBFactory shutdown completed' );
    }

    /**
     * Iterate over all tracked load balancers.
     *
     * @return Generator Yields each load balancer returned by getLBsForOwner()
     */
    public function getAllLBs() {
        foreach ( $this->getLBsForOwner() as $lb ) {
            yield $lb;
        }
    }

    /**
     * Get all tracked load balancers with the internal "for owner" interface.
     *
     * @return Generator|ILoadBalancerForOwner[]
     */
    abstract protected function getLBsForOwner();

    /**
     * Flush replica snapshots on all tracked load balancers.
     *
     * Logs a warning (but still proceeds) when a transaction round owned by a
     * different caller is active.
     *
     * @param string $fname Caller name
     */
    public function flushReplicaSnapshots( $fname = __METHOD__ ) {
        if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
            $this->logger->warning(
                "$fname: transaction round '{$this->trxRoundId}' still running",
                [ 'exception' => new RuntimeException() ]
            );
        }
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->flushReplicaSnapshots( $fname );
        }
    }

    /**
     * Start a transaction round owned by $fname on all tracked load balancers.
     *
     * @param string $fname Caller name; becomes the ID of the transaction round
     * @throws DBTransactionError If the round stage is not "cursory" or a round is
     *  already active
     */
    final public function beginPrimaryChanges( $fname = __METHOD__ ) {
        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->flushReplicaSnapshots( $fname );
        }

        $this->trxRoundStage = self::ROUND_BEGINNING;
        if ( $this->trxRoundId !== false ) {
            throw new DBTransactionError(
                null,
                "$fname: transaction round '{$this->trxRoundId}' already started"
            );
        }
        $this->trxRoundId = $fname;
        // Flush snapshots and appropriately set DBO_TRX on primary connections
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->beginPrimaryChanges( $fname );
        }
        $this->trxRoundStage = self::ROUND_CURSORY;
    }

    /**
     * Commit pending primary changes on all tracked load balancers and run the
     * post-commit callbacks.
     *
     * @param string $fname Caller name; must match the owner of any active round
     * @param int $maxWriteDuration Forwarded to approvePrimaryChanges() on each load balancer
     * @throws DBTransactionError If the round stage is not "cursory" or a round owned by
     *  a different caller is active
     * @throws Exception The first exception reported by a post-commit callback, if any
     */
    final public function commitPrimaryChanges( $fname = __METHOD__, int $maxWriteDuration = 0 ) {
        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        $this->trxRoundStage = self::ROUND_COMMITTING;
        if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
            throw new DBTransactionError(
                null,
                "$fname: transaction round '{$this->trxRoundId}' still running"
            );
        }
        // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure.
        // Callbacks may queue further callbacks, so repeat until a pass runs none.
        do {
            $count = 0; // number of callbacks executed this iteration
            foreach ( $this->getLBsForOwner() as $lb ) {
                $count += $lb->finalizePrimaryChanges( $fname );
            }
        } while ( $count > 0 );
        $this->trxRoundId = false;
        // Perform pre-commit checks, aborting on failure
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->approvePrimaryChanges( $maxWriteDuration, $fname );
        }
        // Log the DBs and methods involved in multi-DB transactions
        $this->logIfMultiDbTransaction();
        // Actually perform the commit on all primary DB connections and revert DBO_TRX
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->commitPrimaryChanges( $fname );
        }
        // Run all post-commit callbacks in a separate step
        $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
        $e = $this->executePostTransactionCallbacks();
        $this->trxRoundStage = self::ROUND_CURSORY;
        // Throw any last post-commit callback error
        if ( $e instanceof Exception ) {
            throw $e;
        }

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->flushReplicaSnapshots( $fname );
        }
    }

    /**
     * Roll back pending primary changes on all tracked load balancers and run the
     * post-rollback callbacks.
     *
     * Unlike begin/commit, this does not assert the current round stage and it
     * discards any exception returned by the callbacks.
     *
     * @param string $fname Caller name
     */
    final public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        $this->trxRoundStage = self::ROUND_ROLLING_BACK;
        $this->trxRoundId = false;
        // Actually perform the rollback on all primary DB connections and revert DBO_TRX
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->rollbackPrimaryChanges( $fname );
        }
        // Run all post-rollback callbacks in a separate step
        $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
        $this->executePostTransactionCallbacks();
        $this->trxRoundStage = self::ROUND_CURSORY;

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->flushReplicaSnapshots( $fname );
        }
    }

    /**
     * Flush primary sessions on all tracked load balancers.
     *
     * @param string $fname Caller name
     */
    final public function flushPrimarySessions( $fname = __METHOD__ ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        // Release named locks and table locks on all primary DB connections
        $this->trxRoundStage = self::ROUND_ROLLBACK_SESSIONS;
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->flushPrimarySessions( $fname );
        }
        $this->trxRoundStage = self::ROUND_CURSORY;
    }

    /**
     * Run the transaction idle callbacks and then the transaction listener callbacks
     * of all tracked load balancers.
     *
     * @return Exception|null The first exception reported by any callback run, if any
     */
    private function executePostTransactionCallbacks() {
        $fname = __METHOD__;
        // Run all post-commit callbacks until new ones stop getting added
        $e = null; // first callback exception
        do {
            foreach ( $this->getLBsForOwner() as $lb ) {
                $ex = $lb->runPrimaryTransactionIdleCallbacks( $fname );
                $e = $e ?: $ex;
            }
        } while ( $this->hasPrimaryChanges() );
        // Run all listener callbacks once
        foreach ( $this->getLBsForOwner() as $lb ) {
            $ex = $lb->runPrimaryTransactionListenerCallbacks( $fname );
            $e = $e ?: $ex;
        }

        return $e;
    }

    /**
     * @return bool Whether a transaction round is currently active
     */
    public function hasTransactionRound() {
        return ( $this->trxRoundId !== false );
    }

    /**
     * @return bool Whether the round stage is "cursory", i.e. no begin/commit/rollback
     *  step is in progress
     */
    public function isReadyForRoundOperations() {
        return ( $this->trxRoundStage === self::ROUND_CURSORY );
    }

    /**
     * Log query info if multi DB transactions are going to be committed now
     *
     * Collects the pending-change callers per primary server name and, when two or
     * more servers have pending changes, logs one line per server at "info" level.
     */
    private function logIfMultiDbTransaction() {
        $callersByDB = [];
        foreach ( $this->getLBsForOwner() as $lb ) {
            $primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );
            $callers = $lb->pendingPrimaryChangeCallers();
            if ( $callers ) {
                $callersByDB[$primaryName] = $callers;
            }
        }

        if ( count( $callersByDB ) >= 2 ) {
            $dbs = implode( ', ', array_keys( $callersByDB ) );
            $msg = "Multi-DB transaction [{$dbs}]:\n";
            foreach ( $callersByDB as $db => $callers ) {
                // Separate the server name from its caller list
                $msg .= "$db: " . implode( '; ', $callers ) . "\n";
            }
            $this->logger->info( $msg );
        }
    }

    /**
     * @return bool Whether any tracked load balancer reports pending primary changes
     */
    public function hasPrimaryChanges() {
        foreach ( $this->getLBsForOwner() as $lb ) {
            if ( $lb->hasPrimaryChanges() ) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return bool Whether any tracked load balancer reports that a lagged replica was used
     */
    public function laggedReplicaUsed() {
        foreach ( $this->getLBsForOwner() as $lb ) {
            if ( $lb->laggedReplicaUsed() ) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param float|int|null $age Forwarded as-is to each load balancer (null presumably
     *  selects the load balancer's own default; confirm against ILoadBalancer)
     * @return bool Whether any tracked load balancer has or recently made primary changes
     */
    public function hasOrMadeRecentPrimaryChanges( $age = null ) {
        foreach ( $this->getLBsForOwner() as $lb ) {
            if ( $lb->hasOrMadeRecentPrimaryChanges( $age ) ) {
                return true;
            }
        }
        return false;
    }

    /**
     * Wait for the replicas of all applicable load balancers to catch up to the
     * current primary positions.
     *
     * @param array $opts Optional settings:
     *   - timeout: passed to waitForAll(); defaults to the replication wait timeout
     *   - ifWritesSince: if set, load balancers whose last primary change timestamp is
     *     older than this value are skipped
     * @return bool True if no load balancer failed its wait
     */
    public function waitForReplication( array $opts = [] ) {
        $opts += [
            'timeout' => $this->replicationWaitTimeout,
            'ifWritesSince' => null
        ];

        // Materialise the list so that the indexes used below stay aligned
        $lbs = [];
        foreach ( $this->getLBsForOwner() as $lb ) {
            $lbs[] = $lb;
        }

        // Get all the primary DB positions of applicable DBs right now.
        // This can be faster since waiting on one cluster reduces the
        // time needed to wait on the next clusters.
        $primaryPositions = array_fill( 0, count( $lbs ), false );
        foreach ( $lbs as $i => $lb ) {
            if (
                // No writes to wait on getting replicated
                !$lb->hasPrimaryConnection() ||
                // No replication; avoid getPrimaryPos() permissions errors (T29975)
                !$lb->hasStreamingReplicaServers() ||
                // No writes since the last replication wait
                (
                    $opts['ifWritesSince'] &&
                    $lb->lastPrimaryChangeTimestamp() < $opts['ifWritesSince']
                )
            ) {
                continue; // no need to wait
            }

            $primaryPositions[$i] = $lb->getPrimaryPos();
        }

        // Run any listener callbacks *after* getting the DB positions. The more
        // time spent in the callbacks, the less time is spent in waitForAll().
        foreach ( $this->replicationWaitCallbacks as $callback ) {
            $callback();
        }

        $failed = [];
        foreach ( $lbs as $i => $lb ) {
            if ( $primaryPositions[$i] ) {
                // The RDBMS may not support getPrimaryPos()
                if ( !$lb->waitForAll( $primaryPositions[$i], $opts['timeout'] ) ) {
                    $failed[] = $lb->getServerName( ServerInfo::WRITER_INDEX );
                }
            }
        }

        return !$failed;
    }

    /**
     * Register or remove a callback that waitForReplication() runs before waiting.
     *
     * @param string $name Listener name (array key)
     * @param callable|null $callback Callback to register, or null to remove the listener
     */
    public function setWaitForReplicationListener( $name, ?callable $callback = null ) {
        if ( $callback ) {
            $this->replicationWaitCallbacks[$name] = $callback;
        } else {
            unset( $this->replicationWaitCallbacks[$name] );
        }
    }

    /**
     * Get the transaction ticket, provided no primary changes are pending.
     *
     * @param string $fname Caller name, used in the error log entry
     * @return int|null The ticket, or null (after logging an error) if primary changes
     *  are pending
     */
    public function getEmptyTransactionTicket( $fname ) {
        if ( $this->hasPrimaryChanges() ) {
            $this->logger->error(
                __METHOD__ . ": $fname does not have outer scope",
                [ 'exception' => new RuntimeException() ]
            );

            return null;
        }

        return $this->ticket;
    }

    /**
     * @param string|false $domain Domain ID or virtual domain, or false for the local domain
     * @return IDatabase Connection obtained with DB_PRIMARY
     */
    public function getPrimaryDatabase( $domain = false ): IDatabase {
        return $this->getMappedDatabase( DB_PRIMARY, [], $domain );
    }

    /**
     * @param string|false $domain Domain ID or virtual domain, or false for the local domain
     * @param string|null $group Optional query group; null means no group
     * @return IReadableDatabase Connection obtained with DB_REPLICA
     */
    public function getReplicaDatabase( $domain = false, $group = null ): IReadableDatabase {
        if ( $group === null ) {
            $groups = [];
        } else {
            $groups = [ $group ];
        }
        return $this->getMappedDatabase( DB_REPLICA, $groups, $domain );
    }

    /**
     * Get the load balancer serving a domain, resolving registered virtual domains.
     *
     * A registered virtual domain with a 'cluster' mapping resolves to that external
     * load balancer; one with only a 'db' mapping resolves to the main load balancer
     * for that database; one without any mapping resolves to the local domain.
     *
     * @param string|false $domain Domain ID or virtual domain, or false for the local domain
     * @return ILoadBalancer
     */
    public function getLoadBalancer( $domain = false ): ILoadBalancer {
        if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
            if ( isset( $this->virtualDomainsMapping[$domain] ) ) {
                $config = $this->virtualDomainsMapping[$domain];
                if ( isset( $config['cluster'] ) ) {
                    return $this->getExternalLB( $config['cluster'] );
                }
                // Same fallback as getMappedDatabase(): no 'db' entry means local domain
                $domain = $config['db'] ?? false;
            } else {
                // It's not configured, assume local db.
                $domain = false;
            }
        }
        return $this->getMainLB( $domain );
    }

    /**
     * Helper for getPrimaryDatabase and getReplicaDatabase() providing virtual
     * domain mapping.
     *
     * @param int $index
     * @param array $groups
     * @param string|false $domain
     * @return IDatabase
     */
    private function getMappedDatabase( $index, $groups, $domain ) {
        if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
            $dbDomain = $this->virtualDomainsMapping[$domain]['db'] ?? false;
        } else {
            $dbDomain = $domain;
        }
        return $this->getLoadBalancer( $domain )->getConnection( $index, $groups, $dbDomain );
    }

    /**
     * Commit primary changes and wait for replication, if the caller holds the ticket.
     *
     * @param string $fname Caller name
     * @param mixed $ticket Value previously obtained from getEmptyTransactionTicket()
     * @param array $opts Options for waitForReplication()
     * @return bool False (after logging an error) if the ticket does not match;
     *  otherwise the result of waitForReplication()
     */
    final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
        if ( $ticket !== $this->ticket ) {
            $this->logger->error(
                __METHOD__ . ": $fname does not have outer scope ($ticket vs {$this->ticket})",
                [ 'exception' => new RuntimeException() ]
            );

            return false;
        }

        // The transaction owner and any caller with the empty transaction ticket can commit
        // so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
        if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) {
            $this->logger->info( "$fname: committing on behalf of {$this->trxRoundId}" );
            $fnameEffective = $this->trxRoundId;
        } else {
            $fnameEffective = $fname;
        }

        $this->commitPrimaryChanges( $fnameEffective );
        $waitSucceeded = $this->waitForReplication( $opts );
        // If a nested caller committed on behalf of $fname, start another empty $fname
        // transaction, leaving the caller with the same empty transaction state as before.
        if ( $fnameEffective !== $fname ) {
            $this->beginPrimaryChanges( $fnameEffective );
        }

        return $waitSucceeded;
    }

    /**
     * Disable the ChronologyProtector used by this factory.
     */
    public function disableChronologyProtection() {
        $this->chronologyProtector->setEnabled( false );
    }

    /**
     * Get parameters to ILoadBalancer::__construct()
     *
     * The 'roundStage' entry reflects the current round stage when callbacks are being
     * run (post-commit or post-rollback) and is null otherwise.
     *
     * @return array
     */
    final protected function baseLoadBalancerParams() {
        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
            $initStage = ILoadBalancerForOwner::STAGE_POSTCOMMIT_CALLBACKS;
        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
            $initStage = ILoadBalancerForOwner::STAGE_POSTROLLBACK_CALLBACKS;
        } else {
            $initStage = null;
        }

        return [
            'localDomain' => $this->localDomain,
            'readOnlyReason' => $this->readOnlyReason,
            'srvCache' => $this->srvCache,
            'wanCache' => $this->wanCache,
            'profiler' => $this->profiler,
            'trxProfiler' => $this->trxProfiler,
            'logger' => $this->logger,
            'errorLogger' => $this->errorLogger,
            'deprecationLogger' => $this->deprecationLogger,
            'statsdDataFactory' => $this->statsd,
            'cliMode' => $this->cliMode,
            'agent' => $this->agent,
            'defaultGroup' => $this->defaultGroup,
            'chronologyProtector' => $this->chronologyProtector,
            'roundStage' => $initStage,
            'criticalSectionProvider' => $this->csProvider
        ];
    }

    /**
     * Bring a load balancer in line with the factory's current state: join any active
     * transaction round and apply the table, index and domain aliases.
     *
     * @param ILoadBalancerForOwner $lb
     */
    protected function initLoadBalancer( ILoadBalancerForOwner $lb ) {
        if ( $this->trxRoundId !== false ) {
            $lb->beginPrimaryChanges( $this->trxRoundId ); // set DBO_TRX
        }

        $lb->setTableAliases( $this->tableAliases );
        $lb->setIndexAliases( $this->indexAliases );
        $lb->setDomainAliases( $this->domainAliases );
    }

    /**
     * Store the table aliases that initLoadBalancer() applies to load balancers.
     *
     * @param array[] $aliases Map of (table => (dbname, schema, prefix) map)
     */
    public function setTableAliases( array $aliases ) {
        $this->tableAliases = $aliases;
    }

    /**
     * Store the index aliases that initLoadBalancer() applies to load balancers.
     *
     * @param string[] $aliases Map of (index alias => index)
     */
    public function setIndexAliases( array $aliases ) {
        $this->indexAliases = $aliases;
    }

    /**
     * Store the domain aliases that initLoadBalancer() applies to load balancers.
     *
     * @param DatabaseDomain[]|string[] $aliases Map of (domain alias => DB domain)
     */
    public function setDomainAliases( array $aliases ) {
        $this->domainAliases = $aliases;
    }

    /**
     * @return TransactionProfiler The transaction profiler given to load balancers
     */
    public function getTransactionProfiler(): TransactionProfiler {
        return $this->trxProfiler;
    }

    /**
     * Change the table prefix of the local domain, here and on all tracked load balancers.
     *
     * @param string $prefix
     */
    public function setLocalDomainPrefix( $prefix ) {
        $this->localDomain = new DatabaseDomain(
            $this->localDomain->getDatabase(),
            $this->localDomain->getSchema(),
            $prefix
        );

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->setLocalDomainPrefix( $prefix );
        }
    }

    /**
     * Close all connections, then switch the local domain here and on all tracked
     * load balancers.
     *
     * @param string|DatabaseDomain $domain Value accepted by DatabaseDomain::newFromId()
     */
    public function redefineLocalDomain( $domain ) {
        $this->closeAll( __METHOD__ );

        $this->localDomain = DatabaseDomain::newFromId( $domain );

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->redefineLocalDomain( $this->localDomain );
        }
    }

    /**
     * Call closeAll() on every tracked load balancer.
     *
     * @param string $fname Caller name
     */
    public function closeAll( $fname = __METHOD__ ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $scope = ScopedCallback::newScopedIgnoreUserAbort();

        foreach ( $this->getLBsForOwner() as $lb ) {
            $lb->closeAll( $fname );
        }
    }

    /**
     * Set the agent name included in the parameters built by baseLoadBalancerParams().
     *
     * @param string $agent
     */
    public function setAgentName( $agent ) {
        $this->agent = $agent;
    }

    /**
     * @return bool Whether any tracked load balancer has streaming replica servers
     */
    public function hasStreamingReplicaServers() {
        foreach ( $this->getLBsForOwner() as $lb ) {
            if ( $lb->hasStreamingReplicaServers() ) {
                return true;
            }
        }
        return false;
    }

    /**
     * Set the default timeout used by waitForReplication().
     *
     * @param int $seconds New timeout; values below 1 are raised to 1
     * @return int The previous timeout
     */
    public function setDefaultReplicationWaitTimeout( $seconds ) {
        $old = $this->replicationWaitTimeout;
        $this->replicationWaitTimeout = max( 1, (int)$seconds );

        return $old;
    }

    /**
     * Assert that the transaction round is in the expected stage.
     *
     * @param string $stage One of the ROUND_* class constants
     * @throws DBTransactionError If the current stage differs from $stage
     */
    private function assertTransactionRoundStage( $stage ) {
        if ( $this->trxRoundStage !== $stage ) {
            throw new DBTransactionError(
                null,
                "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
            );
        }
    }
}