Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
50.56% covered (warning)
50.56%
136 / 269
26.83% covered (danger)
26.83%
11 / 41
CRAP
0.00% covered (danger)
0.00%
0 / 1
LBFactory
50.56% covered (warning)
50.56%
136 / 269
26.83% covered (danger)
26.83%
11 / 41
1713.43
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 configure
88.89% covered (warning)
88.89%
24 / 27
0.00% covered (danger)
0.00%
0 / 1
7.07
 destroy
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 autoReconfigure
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
3.07
 reconfigure
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 getLocalDomainID
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 shutdown
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
4
 getAllLBs
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 getLBsForOwner
n/a
0 / 0
n/a
0 / 0
0
 flushReplicaSnapshots
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 beginPrimaryChanges
66.67% covered (warning)
66.67%
8 / 12
0.00% covered (danger)
0.00%
0 / 1
3.33
 commitPrimaryChanges
76.19% covered (warning)
76.19%
16 / 21
0.00% covered (danger)
0.00%
0 / 1
7.66
 rollbackPrimaryChanges
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 flushPrimarySessions
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 executePostTransactionCallbacks
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
5
 hasTransactionRound
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isReadyForRoundOperations
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 logIfMultiDbTransaction
50.00% covered (danger)
50.00%
6 / 12
0.00% covered (danger)
0.00%
0 / 1
8.12
 hasPrimaryChanges
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 laggedReplicaUsed
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 hasOrMadeRecentPrimaryChanges
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 waitForReplication
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
156
 setWaitForReplicationListener
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getEmptyTransactionTicket
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 getPrimaryDatabase
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getReplicaDatabase
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 getMappedDatabase
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
5
 commitAndWaitForReplication
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
30
 disableChronologyProtection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 baseLoadBalancerParams
95.65% covered (success)
95.65%
22 / 23
0.00% covered (danger)
0.00%
0 / 1
3
 initLoadBalancer
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 setTableAliases
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setIndexAliases
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setDomainAliases
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTransactionProfiler
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setLocalDomainPrefix
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 redefineLocalDomain
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 closeAll
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 setAgentName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 hasStreamingReplicaServers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 setDefaultReplicationWaitTimeout
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionRoundStage
20.00% covered (danger)
20.00%
1 / 5
0.00% covered (danger)
0.00%
0 / 1
4.05
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use BagOStuff;
23use EmptyBagOStuff;
24use Exception;
25use Generator;
26use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27use NullStatsdDataFactory;
28use Psr\Log\LoggerInterface;
29use Psr\Log\NullLogger;
30use RuntimeException;
31use Throwable;
32use WANObjectCache;
33use Wikimedia\RequestTimeout\CriticalSectionProvider;
34use Wikimedia\ScopedCallback;
35
36/**
37 * @see ILBFactory
38 * @ingroup Database
39 */
40abstract class LBFactory implements ILBFactory {
41    /** @var CriticalSectionProvider|null */
42    private $csProvider;
43    /**
44     * @var callable|null An optional callback that returns a ScopedCallback instance,
45     * meant to profile the actual query execution in {@see Database::doQuery}
46     */
47    private $profiler;
48    /** @var TransactionProfiler */
49    private $trxProfiler;
50    /** @var StatsdDataFactoryInterface */
51    private $statsd;
52    /** @var LoggerInterface */
53    private $logger;
54    /** @var callable Error logger */
55    private $errorLogger;
56    /** @var callable Deprecation logger */
57    private $deprecationLogger;
58
59    /** @var ChronologyProtector */
60    protected $chronologyProtector;
61    /** @var BagOStuff */
62    protected $srvCache;
63    /** @var WANObjectCache */
64    protected $wanCache;
65    /** @var DatabaseDomain Local domain */
66    protected $localDomain;
67
68    /** @var bool Whether this PHP instance is for a CLI script */
69    private $cliMode;
70    /** @var string Agent name for query profiling */
71    private $agent;
72
73    /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
74    private $tableAliases = [];
75    /** @var string[] Map of (index alias => index) */
76    private $indexAliases = [];
77    /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */
78    protected $domainAliases = [];
79    /** @var array[] Map of virtual domain to array of cluster and domain */
80    protected array $virtualDomainsMapping = [];
81    /** @var string[] List of registered virtual domains */
82    protected array $virtualDomains = [];
83    /** @var callable[] */
84    private $replicationWaitCallbacks = [];
85
86    /** @var int|null Ticket used to delegate transaction ownership */
87    private $ticket;
88    /** @var string|false String if a requested DBO_TRX transaction round is active */
89    private $trxRoundId = false;
90    /** @var string One of the ROUND_* class constants */
91    private $trxRoundStage = self::ROUND_CURSORY;
92    /** @var int Default replication wait timeout */
93    private $replicationWaitTimeout;
94
95    /** @var string|false Reason all LBs are read-only or false if not */
96    protected $readOnlyReason = false;
97
98    /** @var string|null */
99    private $defaultGroup = null;
100
101    private const ROUND_CURSORY = 'cursory';
102    private const ROUND_BEGINNING = 'within-begin';
103    private const ROUND_COMMITTING = 'within-commit';
104    private const ROUND_ROLLING_BACK = 'within-rollback';
105    private const ROUND_COMMIT_CALLBACKS = 'within-commit-callbacks';
106    private const ROUND_ROLLBACK_CALLBACKS = 'within-rollback-callbacks';
107    private const ROUND_ROLLBACK_SESSIONS = 'within-rollback-session';
108
109    /**
110     * @var callable
111     */
112    private $configCallback = null;
113
114    public function __construct( array $conf ) {
115        $this->configure( $conf );
116
117        if ( isset( $conf['configCallback'] ) ) {
118            $this->configCallback = $conf['configCallback'];
119        }
120    }
121
122    /**
123     * @param array $conf
124     * @return void
125     */
126    protected function configure( array $conf ): void {
127        $this->localDomain = isset( $conf['localDomain'] )
128            ? DatabaseDomain::newFromId( $conf['localDomain'] )
129            : DatabaseDomain::newUnspecified();
130
131        if ( isset( $conf['readOnlyReason'] ) && is_string( $conf['readOnlyReason'] ) ) {
132            $this->readOnlyReason = $conf['readOnlyReason'];
133        }
134
135        $this->chronologyProtector = $conf['chronologyProtector'] ?? new ChronologyProtector();
136        $this->srvCache = $conf['srvCache'] ?? new EmptyBagOStuff();
137        $this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();
138
139        $this->logger = $conf['logger'] ?? new NullLogger();
140        $this->errorLogger = $conf['errorLogger'] ?? static function ( Throwable $e ) {
141                trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
142        };
143        $this->deprecationLogger = $conf['deprecationLogger'] ?? static function ( $msg ) {
144                trigger_error( $msg, E_USER_DEPRECATED );
145        };
146
147        $this->profiler = $conf['profiler'] ?? null;
148        $this->trxProfiler = $conf['trxProfiler'] ?? new TransactionProfiler();
149        $this->statsd = $conf['statsdDataFactory'] ?? new NullStatsdDataFactory();
150
151        $this->csProvider = $conf['criticalSectionProvider'] ?? null;
152
153        $this->cliMode = $conf['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
154        $this->agent = $conf['agent'] ?? '';
155        $this->defaultGroup = $conf['defaultGroup'] ?? null;
156        $this->replicationWaitTimeout = $this->cliMode ? 60 : 1;
157        $this->virtualDomainsMapping = $conf['virtualDomainsMapping'] ?? [];
158        $this->virtualDomains = $conf['virtualDomains'] ?? [];
159
160        static $nextTicket;
161        $this->ticket = $nextTicket = ( is_int( $nextTicket ) ? $nextTicket++ : mt_rand() );
162    }
163
164    public function destroy() {
165        /** @noinspection PhpUnusedLocalVariableInspection */
166        $scope = ScopedCallback::newScopedIgnoreUserAbort();
167
168        foreach ( $this->getLBsForOwner() as $lb ) {
169            $lb->disable( __METHOD__ );
170        }
171    }
172
173    /**
174     * Reload config using the callback passed defined $config['configCallback'].
175     *
176     * If the config returned by the callback is different from the existing config,
177     * this calls reconfigure() on all load balancers, which causes them to invalidate
178     * any existing connections and re-connect using the new configuration.
179     *
180     * Long-running processes should call this from time to time
181     * (but not too often, because it is somewhat expensive),
182     * preferably after each batch.
183     * Maintenance scripts can do that by calling $this->waitForReplication(),
184     * which calls this method.
185     */
186    public function autoReconfigure(): void {
187        if ( !$this->configCallback ) {
188            return;
189        }
190
191        $conf = ( $this->configCallback )();
192        if ( $conf ) {
193            $this->reconfigure( $conf );
194        }
195    }
196
197    /**
198     * Reconfigure using the given config array.
199     * Any fields omitted from $conf will be taken from the current config.
200     *
201     * If the config changed, this calls reconfigure() on all load balancers,
202     * which causes them to close all existing connections.
203     *
204     * @note This invalidates the current transaction ticket.
205     *
206     * @warning This must only be called in top level code such as the execute()
207     * method of a maintenance script. Any database connection in use when this
208     * method is called will become defunct.
209     *
210     * @since 1.39
211     *
212     * @param array $conf A configuration array, using the same structure as
213     *        the one passed to the constructor (see also $wgLBFactoryConf).
214     */
215    public function reconfigure( array $conf ): void {
216        if ( !$conf ) {
217            return;
218        }
219
220        foreach ( $this->getLBsForOwner() as $lb ) {
221            $lb->reconfigure( $conf );
222        }
223    }
224
225    public function getLocalDomainID() {
226        return $this->localDomain->getId();
227    }
228
229    public function shutdown(
230        $flags = self::SHUTDOWN_NORMAL,
231        callable $workCallback = null,
232        &$cpIndex = null,
233        &$cpClientId = null
234    ) {
235        /** @noinspection PhpUnusedLocalVariableInspection */
236        $scope = ScopedCallback::newScopedIgnoreUserAbort();
237
238        if ( ( $flags & self::SHUTDOWN_NO_CHRONPROT ) != self::SHUTDOWN_NO_CHRONPROT ) {
239            // Remark all of the relevant DB primary positions
240            foreach ( $this->getLBsForOwner() as $lb ) {
241                if ( $lb->hasPrimaryConnection() ) {
242                    $this->chronologyProtector->stageSessionPrimaryPos( $lb );
243                }
244            }
245            // Write the positions to the persistent stash
246            $this->chronologyProtector->persistSessionReplicationPositions( $cpIndex );
247            $this->logger->debug( __METHOD__ . ': finished ChronologyProtector shutdown' );
248        }
249        $cpClientId = $this->chronologyProtector->getClientId();
250
251        $this->commitPrimaryChanges( __METHOD__ );
252
253        $this->logger->debug( 'LBFactory shutdown completed' );
254    }
255
256    public function getAllLBs() {
257        foreach ( $this->getLBsForOwner() as $lb ) {
258            yield $lb;
259        }
260    }
261
262    /**
263     * Get all tracked load balancers with the internal "for owner" interface.
264     * Most subclasses override this, we just provide an implementation here
265     * for the benefit of Wikibase's FakeLBFactory.
266     *
267     * @return Generator|ILoadBalancerForOwner[]
268     */
269    abstract protected function getLBsForOwner();
270
271    public function flushReplicaSnapshots( $fname = __METHOD__ ) {
272        if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
273            $this->logger->warning(
274                "$fname: transaction round '{$this->trxRoundId}' still running",
275                [ 'exception' => new RuntimeException() ]
276            );
277        }
278        foreach ( $this->getLBsForOwner() as $lb ) {
279            $lb->flushReplicaSnapshots( $fname );
280        }
281    }
282
283    final public function beginPrimaryChanges( $fname = __METHOD__ ) {
284        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
285        /** @noinspection PhpUnusedLocalVariableInspection */
286        $scope = ScopedCallback::newScopedIgnoreUserAbort();
287
288        $this->trxRoundStage = self::ROUND_BEGINNING;
289        if ( $this->trxRoundId !== false ) {
290            throw new DBTransactionError(
291                null,
292                "$fname: transaction round '{$this->trxRoundId}' already started"
293            );
294        }
295        $this->trxRoundId = $fname;
296        // Set DBO_TRX flags on all appropriate DBs
297        foreach ( $this->getLBsForOwner() as $lb ) {
298            $lb->beginPrimaryChanges( $fname );
299        }
300        $this->trxRoundStage = self::ROUND_CURSORY;
301    }
302
303    final public function commitPrimaryChanges( $fname = __METHOD__, int $maxWriteDuration = 0 ) {
304        $this->assertTransactionRoundStage( self::ROUND_CURSORY );
305        /** @noinspection PhpUnusedLocalVariableInspection */
306        $scope = ScopedCallback::newScopedIgnoreUserAbort();
307
308        $this->trxRoundStage = self::ROUND_COMMITTING;
309        if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
310            throw new DBTransactionError(
311                null,
312                "$fname: transaction round '{$this->trxRoundId}' still running"
313            );
314        }
315        // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
316        do {
317            $count = 0; // number of callbacks executed this iteration
318            foreach ( $this->getLBsForOwner() as $lb ) {
319                $count += $lb->finalizePrimaryChanges( $fname );
320            }
321        } while ( $count > 0 );
322        $this->trxRoundId = false;
323        // Perform pre-commit checks, aborting on failure
324        foreach ( $this->getLBsForOwner() as $lb ) {
325            $lb->approvePrimaryChanges( $maxWriteDuration, $fname );
326        }
327        // Log the DBs and methods involved in multi-DB transactions
328        $this->logIfMultiDbTransaction();
329        // Actually perform the commit on all primary DB connections and revert DBO_TRX
330        foreach ( $this->getLBsForOwner() as $lb ) {
331            $lb->commitPrimaryChanges( $fname );
332        }
333        // Run all post-commit callbacks in a separate step
334        $e = $this->executePostTransactionCallbacks();
335        // Throw any last post-commit callback error
336        if ( $e instanceof Exception ) {
337            throw $e;
338        }
339    }
340
341    final public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
342        /** @noinspection PhpUnusedLocalVariableInspection */
343        $scope = ScopedCallback::newScopedIgnoreUserAbort();
344
345        $this->trxRoundStage = self::ROUND_ROLLING_BACK;
346        $this->trxRoundId = false;
347        // Actually perform the rollback on all primary DB connections and revert DBO_TRX
348        foreach ( $this->getLBsForOwner() as $lb ) {
349            $lb->rollbackPrimaryChanges( $fname );
350        }
351        // Run all post-commit callbacks in a separate step
352        $this->executePostTransactionCallbacks();
353    }
354
355    final public function flushPrimarySessions( $fname = __METHOD__ ) {
356        /** @noinspection PhpUnusedLocalVariableInspection */
357        $scope = ScopedCallback::newScopedIgnoreUserAbort();
358
359        // Release named locks and table locks on all primary DB connections
360        $this->trxRoundStage = self::ROUND_ROLLBACK_SESSIONS;
361        foreach ( $this->getLBsForOwner() as $lb ) {
362            $lb->flushPrimarySessions( $fname );
363        }
364        $this->trxRoundStage = self::ROUND_CURSORY;
365    }
366
367    /**
368     * @return Exception|null
369     */
370    private function executePostTransactionCallbacks() {
371        $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
372        $fname = __METHOD__;
373        // Run all post-commit callbacks until new ones stop getting added
374        $e = null; // first callback exception
375        do {
376            foreach ( $this->getLBsForOwner() as $lb ) {
377                $ex = $lb->runPrimaryTransactionIdleCallbacks( $fname );
378                $e = $e ?: $ex;
379            }
380        } while ( $this->hasPrimaryChanges() );
381        // Run all listener callbacks once
382        foreach ( $this->getLBsForOwner() as $lb ) {
383            $ex = $lb->runPrimaryTransactionListenerCallbacks( $fname );
384            $e = $e ?: $ex;
385        }
386        $this->trxRoundStage = self::ROUND_CURSORY;
387
388        return $e;
389    }
390
391    public function hasTransactionRound() {
392        return ( $this->trxRoundId !== false );
393    }
394
395    public function isReadyForRoundOperations() {
396        return ( $this->trxRoundStage === self::ROUND_CURSORY );
397    }
398
399    /**
400     * Log query info if multi DB transactions are going to be committed now
401     */
402    private function logIfMultiDbTransaction() {
403        $callersByDB = [];
404        foreach ( $this->getLBsForOwner() as $lb ) {
405            $primaryName = $lb->getServerName( $lb->getWriterIndex() );
406            $callers = $lb->pendingPrimaryChangeCallers();
407            if ( $callers ) {
408                $callersByDB[$primaryName] = $callers;
409            }
410        }
411
412        if ( count( $callersByDB ) >= 2 ) {
413            $dbs = implode( ', ', array_keys( $callersByDB ) );
414            $msg = "Multi-DB transaction [{$dbs}]:\n";
415            foreach ( $callersByDB as $db => $callers ) {
416                $msg .= "$db" . implode( '; ', $callers ) . "\n";
417            }
418            $this->logger->info( $msg );
419        }
420    }
421
422    public function hasPrimaryChanges() {
423        foreach ( $this->getLBsForOwner() as $lb ) {
424            if ( $lb->hasPrimaryChanges() ) {
425                return true;
426            }
427        }
428        return false;
429    }
430
431    public function laggedReplicaUsed() {
432        foreach ( $this->getLBsForOwner() as $lb ) {
433            if ( $lb->laggedReplicaUsed() ) {
434                return true;
435            }
436        }
437        return false;
438    }
439
440    public function hasOrMadeRecentPrimaryChanges( $age = null ) {
441        foreach ( $this->getLBsForOwner() as $lb ) {
442            if ( $lb->hasOrMadeRecentPrimaryChanges( $age ) ) {
443                return true;
444            }
445        }
446        return false;
447    }
448
449    public function waitForReplication( array $opts = [] ) {
450        $opts += [
451            'timeout' => $this->replicationWaitTimeout,
452            'ifWritesSince' => null
453        ];
454
455        $lbs = [];
456        foreach ( $this->getLBsForOwner() as $lb ) {
457            $lbs[] = $lb;
458        }
459        if ( !$lbs ) {
460            return true; // nothing actually used
461        }
462
463        // Get all the primary DB positions of applicable DBs right now.
464        // This can be faster since waiting on one cluster reduces the
465        // time needed to wait on the next clusters.
466        $primaryPositions = array_fill( 0, count( $lbs ), false );
467        foreach ( $lbs as $i => $lb ) {
468            if (
469                // No writes to wait on getting replicated
470                !$lb->hasPrimaryConnection() ||
471                // No replication; avoid getPrimaryPos() permissions errors (T29975)
472                !$lb->hasStreamingReplicaServers() ||
473                // No writes since the last replication wait
474                (
475                    $opts['ifWritesSince'] &&
476                    $lb->lastPrimaryChangeTimestamp() < $opts['ifWritesSince']
477                )
478            ) {
479                continue; // no need to wait
480            }
481
482            $primaryPositions[$i] = $lb->getPrimaryPos();
483        }
484
485        // Run any listener callbacks *after* getting the DB positions. The more
486        // time spent in the callbacks, the less time is spent in waitForAll().
487        foreach ( $this->replicationWaitCallbacks as $callback ) {
488            $callback();
489        }
490
491        $failed = [];
492        foreach ( $lbs as $i => $lb ) {
493            if ( $primaryPositions[$i] ) {
494                // The RDBMS may not support getPrimaryPos()
495                if ( !$lb->waitForAll( $primaryPositions[$i], $opts['timeout'] ) ) {
496                    $failed[] = $lb->getServerName( $lb->getWriterIndex() );
497                }
498            }
499        }
500
501        return !$failed;
502    }
503
504    public function setWaitForReplicationListener( $name, callable $callback = null ) {
505        if ( $callback ) {
506            $this->replicationWaitCallbacks[$name] = $callback;
507        } else {
508            unset( $this->replicationWaitCallbacks[$name] );
509        }
510    }
511
512    public function getEmptyTransactionTicket( $fname ) {
513        if ( $this->hasPrimaryChanges() ) {
514            $this->logger->error(
515                __METHOD__ . "$fname does not have outer scope",
516                [ 'exception' => new RuntimeException() ]
517            );
518
519            return null;
520        }
521
522        return $this->ticket;
523    }
524
525    public function getPrimaryDatabase( $domain = false ): IDatabase {
526        return $this->getMappedDatabase( DB_PRIMARY, [], $domain );
527    }
528
529    public function getReplicaDatabase( $domain = false, $group = null ): IReadableDatabase {
530        if ( $group === null ) {
531            $groups = [];
532        } else {
533            $groups = [ $group ];
534        }
535        return $this->getMappedDatabase( DB_REPLICA, $groups, $domain );
536    }
537
538    /**
539     * Helper for getPrimaryDatabase and getReplicaDatabase() providing virtual
540     * domain mapping.
541     *
542     * @param int $index
543     * @param array $groups
544     * @param string|false $domain
545     * @return IDatabase
546     */
547    private function getMappedDatabase( $index, $groups, $domain ) {
548        if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
549            if ( isset( $this->virtualDomainsMapping[$domain] ) ) {
550                $config = $this->virtualDomainsMapping[$domain];
551                if ( isset( $config['cluster'] ) ) {
552                    return $this
553                        ->getExternalLB( $config['cluster'] )
554                        ->getConnection( $index, $groups, $config['db'] );
555                }
556                $domain = $config['db'];
557            } else {
558                // It's not configured, assume local db.
559                $domain = false;
560            }
561        }
562        return $this->getMainLB( $domain )->getConnection( $index, $groups, $domain );
563    }
564
565    final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
566        if ( $ticket !== $this->ticket ) {
567            $this->logger->error(
568                __METHOD__ . "$fname does not have outer scope ($ticket vs {$this->ticket})",
569                [ 'exception' => new RuntimeException() ]
570            );
571
572            return false;
573        }
574
575        // The transaction owner and any caller with the empty transaction ticket can commit
576        // so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
577        if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) {
578            $this->logger->info( "$fname: committing on behalf of {$this->trxRoundId}" );
579            $fnameEffective = $this->trxRoundId;
580        } else {
581            $fnameEffective = $fname;
582        }
583
584        $this->commitPrimaryChanges( $fnameEffective );
585        $waitSucceeded = $this->waitForReplication( $opts );
586        // If a nested caller committed on behalf of $fname, start another empty $fname
587        // transaction, leaving the caller with the same empty transaction state as before.
588        if ( $fnameEffective !== $fname ) {
589            $this->beginPrimaryChanges( $fnameEffective );
590        }
591
592        return $waitSucceeded;
593    }
594
595    public function disableChronologyProtection() {
596        $this->chronologyProtector->setEnabled( false );
597    }
598
599    /**
600     * Get parameters to ILoadBalancer::__construct()
601     *
602     * @return array
603     */
604    final protected function baseLoadBalancerParams() {
605        if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
606            $initStage = ILoadBalancerForOwner::STAGE_POSTCOMMIT_CALLBACKS;
607        } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
608            $initStage = ILoadBalancerForOwner::STAGE_POSTROLLBACK_CALLBACKS;
609        } else {
610            $initStage = null;
611        }
612
613        return [
614            'localDomain' => $this->localDomain,
615            'readOnlyReason' => $this->readOnlyReason,
616            'srvCache' => $this->srvCache,
617            'wanCache' => $this->wanCache,
618            'profiler' => $this->profiler,
619            'trxProfiler' => $this->trxProfiler,
620            'logger' => $this->logger,
621            'errorLogger' => $this->errorLogger,
622            'deprecationLogger' => $this->deprecationLogger,
623            'statsdDataFactory' => $this->statsd,
624            'cliMode' => $this->cliMode,
625            'agent' => $this->agent,
626            'defaultGroup' => $this->defaultGroup,
627            'chronologyProtector' => $this->chronologyProtector,
628            'roundStage' => $initStage,
629            'criticalSectionProvider' => $this->csProvider
630        ];
631    }
632
633    /**
634     * @param ILoadBalancerForOwner $lb
635     */
636    protected function initLoadBalancer( ILoadBalancerForOwner $lb ) {
637        if ( $this->trxRoundId !== false ) {
638            $lb->beginPrimaryChanges( $this->trxRoundId ); // set DBO_TRX
639        }
640
641        $lb->setTableAliases( $this->tableAliases );
642        $lb->setIndexAliases( $this->indexAliases );
643        $lb->setDomainAliases( $this->domainAliases );
644    }
645
646    public function setTableAliases( array $aliases ) {
647        $this->tableAliases = $aliases;
648    }
649
650    public function setIndexAliases( array $aliases ) {
651        $this->indexAliases = $aliases;
652    }
653
654    public function setDomainAliases( array $aliases ) {
655        $this->domainAliases = $aliases;
656    }
657
658    public function getTransactionProfiler(): TransactionProfiler {
659        return $this->trxProfiler;
660    }
661
662    public function setLocalDomainPrefix( $prefix ) {
663        $this->localDomain = new DatabaseDomain(
664            $this->localDomain->getDatabase(),
665            $this->localDomain->getSchema(),
666            $prefix
667        );
668
669        foreach ( $this->getLBsForOwner() as $lb ) {
670            $lb->setLocalDomainPrefix( $prefix );
671        }
672    }
673
674    public function redefineLocalDomain( $domain ) {
675        $this->closeAll( __METHOD__ );
676
677        $this->localDomain = DatabaseDomain::newFromId( $domain );
678
679        foreach ( $this->getLBsForOwner() as $lb ) {
680            $lb->redefineLocalDomain( $this->localDomain );
681        }
682    }
683
684    public function closeAll( $fname = __METHOD__ ) {
685        /** @noinspection PhpUnusedLocalVariableInspection */
686        $scope = ScopedCallback::newScopedIgnoreUserAbort();
687
688        foreach ( $this->getLBsForOwner() as $lb ) {
689            $lb->closeAll( $fname );
690        }
691    }
692
693    public function setAgentName( $agent ) {
694        $this->agent = $agent;
695    }
696
697    public function hasStreamingReplicaServers() {
698        foreach ( $this->getLBsForOwner() as $lb ) {
699            if ( $lb->hasStreamingReplicaServers() ) {
700                return true;
701            }
702        }
703        return false;
704    }
705
706    public function setDefaultReplicationWaitTimeout( $seconds ) {
707        $old = $this->replicationWaitTimeout;
708        $this->replicationWaitTimeout = max( 1, (int)$seconds );
709
710        return $old;
711    }
712
713    /**
714     * @param string $stage
715     */
716    private function assertTransactionRoundStage( $stage ) {
717        if ( $this->trxRoundStage !== $stage ) {
718            throw new DBTransactionError(
719                null,
720                "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
721            );
722        }
723    }
724}