Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
58.84% |
163 / 277 |
|
26.19% |
11 / 42 |
CRAP | |
0.00% |
0 / 1 |
LBFactory | |
58.84% |
163 / 277 |
|
26.19% |
11 / 42 |
1123.78 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
configure | |
88.89% |
24 / 27 |
|
0.00% |
0 / 1 |
7.07 | |||
destroy | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
autoReconfigure | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
3.07 | |||
reconfigure | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
3 | |||
getLocalDomainID | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
shutdown | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
4 | |||
getAllLBs | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
getLBsForOwner | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
flushReplicaSnapshots | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
beginPrimaryChanges | |
71.43% |
10 / 14 |
|
0.00% |
0 / 1 |
4.37 | |||
commitPrimaryChanges | |
80.00% |
20 / 25 |
|
0.00% |
0 / 1 |
8.51 | |||
rollbackPrimaryChanges | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
flushPrimarySessions | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
executePostTransactionCallbacks | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
5 | |||
hasTransactionRound | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
isReadyForRoundOperations | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
logIfMultiDbTransaction | |
50.00% |
6 / 12 |
|
0.00% |
0 / 1 |
8.12 | |||
hasPrimaryChanges | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
3.14 | |||
laggedReplicaUsed | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
hasOrMadeRecentPrimaryChanges | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
waitForReplication | |
86.96% |
20 / 23 |
|
0.00% |
0 / 1 |
11.27 | |||
setWaitForReplicationListener | |
66.67% |
2 / 3 |
|
0.00% |
0 / 1 |
2.15 | |||
getEmptyTransactionTicket | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
getPrimaryDatabase | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getReplicaDatabase | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getLoadBalancer | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
5 | |||
getMappedDatabase | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
3.14 | |||
commitAndWaitForReplication | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
30 | |||
disableChronologyProtection | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
baseLoadBalancerParams | |
95.65% |
22 / 23 |
|
0.00% |
0 / 1 |
3 | |||
initLoadBalancer | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
setTableAliases | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setIndexAliases | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setDomainAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getTransactionProfiler | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setLocalDomainPrefix | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
redefineLocalDomain | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
closeAll | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
setAgentName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
hasStreamingReplicaServers | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
setDefaultReplicationWaitTimeout | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
assertTransactionRoundStage | |
20.00% |
1 / 5 |
|
0.00% |
0 / 1 |
4.05 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use Exception; |
23 | use Generator; |
24 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
25 | use Psr\Log\LoggerInterface; |
26 | use Psr\Log\NullLogger; |
27 | use RuntimeException; |
28 | use Throwable; |
29 | use Wikimedia\ObjectCache\BagOStuff; |
30 | use Wikimedia\ObjectCache\EmptyBagOStuff; |
31 | use Wikimedia\ObjectCache\WANObjectCache; |
32 | use Wikimedia\RequestTimeout\CriticalSectionProvider; |
33 | use Wikimedia\ScopedCallback; |
34 | use Wikimedia\Stats\NullStatsdDataFactory; |
35 | |
36 | /** |
37 | * @see ILBFactory |
38 | * @ingroup Database |
39 | */ |
40 | abstract class LBFactory implements ILBFactory { |
41 | /** @var CriticalSectionProvider|null */ |
42 | private $csProvider; |
43 | /** |
44 | * @var callable|null An optional callback that returns a ScopedCallback instance, |
45 | * meant to profile the actual query execution in {@see Database::doQuery} |
46 | */ |
47 | private $profiler; |
48 | /** @var TransactionProfiler */ |
49 | private $trxProfiler; |
50 | /** @var StatsdDataFactoryInterface */ |
51 | private $statsd; |
52 | /** @var LoggerInterface */ |
53 | private $logger; |
54 | /** @var callable Error logger */ |
55 | private $errorLogger; |
56 | /** @var callable Deprecation logger */ |
57 | private $deprecationLogger; |
58 | |
59 | /** @var ChronologyProtector */ |
60 | protected $chronologyProtector; |
61 | /** @var BagOStuff */ |
62 | protected $srvCache; |
63 | /** @var WANObjectCache */ |
64 | protected $wanCache; |
65 | /** @var DatabaseDomain Local domain */ |
66 | protected $localDomain; |
67 | |
68 | /** @var bool Whether this PHP instance is for a CLI script */ |
69 | private $cliMode; |
70 | /** @var string Agent name for query profiling */ |
71 | private $agent; |
72 | |
73 | /** @var array[] Map of (table => (dbname, schema, prefix) map) */ |
74 | private $tableAliases = []; |
75 | /** @var string[] Map of (index alias => index) */ |
76 | private $indexAliases = []; |
77 | /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */ |
78 | protected $domainAliases = []; |
79 | /** @var array[] Map of virtual domain to array of cluster and domain */ |
80 | protected array $virtualDomainsMapping = []; |
81 | /** @var string[] List of registered virtual domains */ |
82 | protected array $virtualDomains = []; |
83 | /** @var callable[] */ |
84 | private $replicationWaitCallbacks = []; |
85 | |
86 | /** @var int|null Ticket used to delegate transaction ownership */ |
87 | private $ticket; |
88 | /** @var string|false String if a requested DBO_TRX transaction round is active */ |
89 | private $trxRoundId = false; |
90 | /** @var string One of the ROUND_* class constants */ |
91 | private $trxRoundStage = self::ROUND_CURSORY; |
92 | /** @var int Default replication wait timeout */ |
93 | private $replicationWaitTimeout; |
94 | |
95 | /** @var string|false Reason all LBs are read-only or false if not */ |
96 | protected $readOnlyReason = false; |
97 | |
98 | /** @var string|null */ |
99 | private $defaultGroup = null; |
100 | |
101 | private const ROUND_CURSORY = 'cursory'; |
102 | private const ROUND_BEGINNING = 'within-begin'; |
103 | private const ROUND_COMMITTING = 'within-commit'; |
104 | private const ROUND_ROLLING_BACK = 'within-rollback'; |
105 | private const ROUND_COMMIT_CALLBACKS = 'within-commit-callbacks'; |
106 | private const ROUND_ROLLBACK_CALLBACKS = 'within-rollback-callbacks'; |
107 | private const ROUND_ROLLBACK_SESSIONS = 'within-rollback-session'; |
108 | |
109 | /** |
110 | * @var callable|null Optional callback that returns a configuration array; invoked by autoReconfigure() |
111 | */ |
112 | private $configCallback = null; |
113 | |
/**
 * Build the factory from a configuration map.
 *
 * @param array $conf Settings handed to configure(). The optional 'configCallback'
 *  entry is remembered so that autoReconfigure() can invoke it later.
 */
public function __construct( array $conf ) {
	$this->configure( $conf );

	$reloadCallback = $conf['configCallback'] ?? null;
	if ( $reloadCallback !== null ) {
		$this->configCallback = $reloadCallback;
	}
}
121 | |
/**
 * Initialize the factory state from a configuration map.
 *
 * Every key is optional. The keys read here are: localDomain, readOnlyReason,
 * chronologyProtector, srvCache, wanCache, logger, errorLogger, deprecationLogger,
 * profiler, trxProfiler, statsdDataFactory, criticalSectionProvider, cliMode, agent,
 * defaultGroup, virtualDomainsMapping and virtualDomains.
 *
 * The default replication wait timeout is set to 60 seconds in CLI mode and to
 * 1 second otherwise, and a transaction ticket is assigned to this instance.
 *
 * @param array $conf
 * @return void
 */
protected function configure( array $conf ): void {
	$this->localDomain = isset( $conf['localDomain'] )
		? DatabaseDomain::newFromId( $conf['localDomain'] )
		: DatabaseDomain::newUnspecified();

	// Values that are not strings are ignored
	if ( isset( $conf['readOnlyReason'] ) && is_string( $conf['readOnlyReason'] ) ) {
		$this->readOnlyReason = $conf['readOnlyReason'];
	}

	$this->chronologyProtector = $conf['chronologyProtector'] ?? new ChronologyProtector();
	$this->srvCache = $conf['srvCache'] ?? new EmptyBagOStuff();
	$this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();

	$this->logger = $conf['logger'] ?? new NullLogger();
	// Fallback loggers surface problems through the PHP error mechanism
	$this->errorLogger = $conf['errorLogger'] ?? static function ( Throwable $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->deprecationLogger = $conf['deprecationLogger'] ?? static function ( $msg ) {
		trigger_error( $msg, E_USER_DEPRECATED );
	};

	$this->profiler = $conf['profiler'] ?? null;
	$this->trxProfiler = $conf['trxProfiler'] ?? new TransactionProfiler();
	$this->statsd = $conf['statsdDataFactory'] ?? new NullStatsdDataFactory();

	$this->csProvider = $conf['criticalSectionProvider'] ?? null;

	$this->cliMode = $conf['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	$this->agent = $conf['agent'] ?? '';
	$this->defaultGroup = $conf['defaultGroup'] ?? null;
	$this->replicationWaitTimeout = $this->cliMode ? 60 : 1;
	$this->virtualDomainsMapping = $conf['virtualDomainsMapping'] ?? [];
	$this->virtualDomains = $conf['virtualDomains'] ?? [];

	// Give each configure() call its own ticket. The process-wide counter is seeded
	// randomly on first use and advanced afterwards. The previous form,
	// "$nextTicket = ( ... ? $nextTicket++ : mt_rand() )", wrote the pre-increment
	// value straight back into $nextTicket, so the counter never moved and every
	// factory shared one ticket.
	static $nextTicket;
	$nextTicket = ( is_int( $nextTicket ) && $nextTicket < PHP_INT_MAX )
		? $nextTicket + 1
		: mt_rand();
	$this->ticket = $nextTicket;
}
163 | |
/**
 * Disable every load balancer tracked by this factory.
 */
public function destroy() {
	// Kept alive until this method returns; presumably makes PHP ignore a client
	// disconnect for that duration (see ScopedCallback::newScopedIgnoreUserAbort)
	/** @noinspection PhpUnusedLocalVariableInspection */
	$abortGuard = ScopedCallback::newScopedIgnoreUserAbort();

	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$loadBalancer->disable( __METHOD__ );
	}
}
172 | |
/**
 * Reload the configuration through the callback given as $conf['configCallback'].
 *
 * Does nothing when no callback was configured. Otherwise the callback is invoked
 * and, if it returns a non-empty value, that value is handed to reconfigure(),
 * which forwards it to all tracked load balancers.
 *
 * Long-running processes should call this from time to time
 * (but not too often, because it is somewhat expensive),
 * preferably after each batch.
 *
 * @note waitForReplication() of this class does not call this method itself;
 *  callers that rely on periodic reloads must invoke it explicitly.
 */
public function autoReconfigure(): void {
	$reload = $this->configCallback;
	if ( $reload ) {
		$freshConf = $reload();
		if ( $freshConf ) {
			$this->reconfigure( $freshConf );
		}
	}
}
196 | |
/**
 * Push a configuration array to every tracked load balancer.
 *
 * An empty array is a no-op. Otherwise reconfigure() is called with $conf on each
 * load balancer returned by getLBsForOwner(); what a load balancer does with it
 * (e.g. closing existing connections) is up to that load balancer.
 *
 * @warning This must only be called in top level code such as the execute()
 * method of a maintenance script. Any database connection in use when this
 * method is called may become defunct.
 *
 * @since 1.39
 *
 * @param array $conf A configuration array, using the same structure as
 *   the one passed to the constructor (see also $wgLBFactoryConf).
 */
public function reconfigure( array $conf ): void {
	if ( $conf !== [] ) {
		foreach ( $this->getLBsForOwner() as $loadBalancer ) {
			$loadBalancer->reconfigure( $conf );
		}
	}
}
224 | |
/**
 * @return string ID of the local database domain
 */
public function getLocalDomainID(): string {
	$local = $this->localDomain;

	return $local->getId();
}
228 | |
/**
 * Persist replication positions (unless suppressed) and commit primary changes.
 *
 * @param int $flags Bit field; SHUTDOWN_NO_CHRONPROT skips the ChronologyProtector step
 * @param callable|null $workCallback Not used by this implementation
 * @param mixed &$cpIndex Passed by reference to
 *  ChronologyProtector::persistSessionReplicationPositions()
 * @param mixed &$cpClientId Set to the value of ChronologyProtector::getClientId()
 */
public function shutdown(
	$flags = self::SHUTDOWN_NORMAL,
	?callable $workCallback = null,
	&$cpIndex = null,
	&$cpClientId = null
) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$abortGuard = ScopedCallback::newScopedIgnoreUserAbort();

	$skipChronProt = ( ( $flags & self::SHUTDOWN_NO_CHRONPROT ) == self::SHUTDOWN_NO_CHRONPROT );
	if ( !$skipChronProt ) {
		// Stage the primary position of each load balancer that has a primary connection
		foreach ( $this->getLBsForOwner() as $loadBalancer ) {
			if ( $loadBalancer->hasPrimaryConnection() ) {
				$this->chronologyProtector->stageSessionPrimaryPos( $loadBalancer );
			}
		}
		// Then write the staged positions out
		$this->chronologyProtector->persistSessionReplicationPositions( $cpIndex );
		$this->logger->debug( __METHOD__ . ': finished ChronologyProtector shutdown' );
	}
	// Reported to the caller whether or not positions were persisted
	$cpClientId = $this->chronologyProtector->getClientId();

	$this->commitPrimaryChanges( __METHOD__ );

	$this->logger->debug( 'LBFactory shutdown completed' );
}
255 | |
/**
 * Iterate over every load balancer tracked by this factory.
 *
 * Each item from getLBsForOwner() is re-yielded under a fresh sequential key.
 *
 * @return Generator
 */
public function getAllLBs() {
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		yield $loadBalancer;
	}
}
261 | |
/**
 * Get all tracked load balancers with the internal "for owner" interface.
 *
 * Subclasses supply the actual set. Every bulk operation in this class
 * (commit, rollback, flush, close, reconfigure, ...) iterates over the result
 * with foreach, so any iterable of load balancers is acceptable.
 *
 * @return Generator|ILoadBalancerForOwner[]
 */
abstract protected function getLBsForOwner();
268 | |
/**
 * Ask every tracked load balancer to flush its replica snapshots.
 *
 * A warning is logged first if a transaction round owned by a different
 * caller is still active; the flush is performed regardless.
 *
 * @param string $fname Caller name
 */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
	$roundOwnedByOther = ( $this->trxRoundId !== false && $this->trxRoundId !== $fname );
	if ( $roundOwnedByOther ) {
		$context = [ 'exception' => new RuntimeException() ];
		$this->logger->warning(
			"$fname: transaction round '{$this->trxRoundId}' still running",
			$context
		);
	}

	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$loadBalancer->flushReplicaSnapshots( $fname );
	}
}
280 | |
/**
 * Start a transaction round owned by $fname on all tracked load balancers.
 *
 * Replica snapshots are flushed first, then the round ID is recorded and
 * beginPrimaryChanges() is called on each load balancer.
 *
 * @param string $fname Caller name; becomes the round ID
 * @throws DBTransactionError If the round stage is not ROUND_CURSORY, or if a
 *  round is already active. In the latter case the stage has already been
 *  switched to ROUND_BEGINNING and is not reset by this method.
 */
final public function beginPrimaryChanges( $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	// Drop replica snapshots before the round starts
	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->flushReplicaSnapshots( $fname );
	}

	$this->trxRoundStage = self::ROUND_BEGINNING;
	if ( $this->trxRoundId !== false ) {
		throw new DBTransactionError(
			null,
			"$fname: transaction round '{$this->trxRoundId}' already started"
		);
	}
	$this->trxRoundId = $fname;
	// Flush snapshots and appropriately set DBO_TRX on primary connections
	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->beginPrimaryChanges( $fname );
	}
	// Round is open; the factory is ready for further round operations
	$this->trxRoundStage = self::ROUND_CURSORY;
}
304 | |
/**
 * Commit the pending primary changes of all tracked load balancers.
 *
 * Steps, in order: finalize (repeated until no more callbacks run), approve,
 * log multi-DB transactions, commit, run post-transaction callbacks, and
 * finally flush replica snapshots.
 *
 * @param string $fname Caller name; must match the active round ID, if any
 * @param int $maxWriteDuration Passed to approvePrimaryChanges() of each load balancer
 * @throws DBTransactionError If the round stage is not ROUND_CURSORY, or if a
 *  round owned by another caller is active (the stage is then left at ROUND_COMMITTING)
 * @throws Exception The first error returned by the post-transaction callbacks
 */
final public function commitPrimaryChanges( $fname = __METHOD__, int $maxWriteDuration = 0 ) {
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$this->trxRoundStage = self::ROUND_COMMITTING;
	if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
		throw new DBTransactionError(
			null,
			"$fname: transaction round '{$this->trxRoundId}' still running"
		);
	}
	// Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
	do {
		$count = 0; // number of callbacks executed this iteration
		foreach ( $this->getLBsForOwner() as $lb ) {
			$count += $lb->finalizePrimaryChanges( $fname );
		}
	} while ( $count > 0 );
	// The round is considered over from here on
	$this->trxRoundId = false;
	// Perform pre-commit checks, aborting on failure
	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->approvePrimaryChanges( $maxWriteDuration, $fname );
	}
	// Log the DBs and methods involved in multi-DB transactions
	$this->logIfMultiDbTransaction();
	// Actually perform the commit on all primary DB connections and revert DBO_TRX
	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->commitPrimaryChanges( $fname );
	}
	// Run all post-commit callbacks in a separate step
	$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
	$e = $this->executePostTransactionCallbacks();
	$this->trxRoundStage = self::ROUND_CURSORY;
	// Throw any last post-commit callback error
	// (only Exception instances are rethrown; the final snapshot flush is skipped then)
	if ( $e instanceof Exception ) {
		throw $e;
	}

	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->flushReplicaSnapshots( $fname );
	}
}
348 | |
/**
 * Roll back the pending primary changes of all tracked load balancers.
 *
 * Unlike begin/commit, this does not assert the current round stage, so it can
 * be used regardless of the stage the factory was left in. Any error returned
 * by the post-transaction callbacks is discarded.
 *
 * @param string $fname Caller name
 */
final public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$this->trxRoundStage = self::ROUND_ROLLING_BACK;
	$this->trxRoundId = false;
	// Actually perform the rollback on all primary DB connections and revert DBO_TRX
	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->rollbackPrimaryChanges( $fname );
	}
	// Run all post-rollback callbacks in a separate step
	$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
	$this->executePostTransactionCallbacks();
	$this->trxRoundStage = self::ROUND_CURSORY;

	foreach ( $this->getLBsForOwner() as $lb ) {
		$lb->flushReplicaSnapshots( $fname );
	}
}
368 | |
/**
 * Flush the primary sessions of all tracked load balancers.
 *
 * The round stage is ROUND_ROLLBACK_SESSIONS while the load balancers are
 * processed and ROUND_CURSORY once all of them are done.
 *
 * @param string $fname Caller name
 */
final public function flushPrimarySessions( $fname = __METHOD__ ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$abortGuard = ScopedCallback::newScopedIgnoreUserAbort();

	$this->trxRoundStage = self::ROUND_ROLLBACK_SESSIONS;
	// Named locks and table locks on primary connections get released here
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$loadBalancer->flushPrimarySessions( $fname );
	}
	$this->trxRoundStage = self::ROUND_CURSORY;
}
380 | |
/**
 * Run the transaction-idle callbacks and then the listener callbacks of all
 * tracked load balancers.
 *
 * Idle callbacks are run repeatedly for as long as hasPrimaryChanges() reports
 * pending changes; listener callbacks are run once afterwards.
 *
 * @return Exception|null The first error reported by any callback run, if any
 */
private function executePostTransactionCallbacks() {
	$fname = __METHOD__;
	$firstError = null;

	// Idle callbacks may themselves queue new work, hence the loop
	do {
		foreach ( $this->getLBsForOwner() as $loadBalancer ) {
			$error = $loadBalancer->runPrimaryTransactionIdleCallbacks( $fname );
			if ( !$firstError ) {
				$firstError = $error;
			}
		}
	} while ( $this->hasPrimaryChanges() );

	// Listener callbacks: a single pass
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$error = $loadBalancer->runPrimaryTransactionListenerCallbacks( $fname );
		if ( !$firstError ) {
			$firstError = $error;
		}
	}

	return $firstError;
}
402 | |
/**
 * @return bool Whether a transaction round ID is currently set
 */
public function hasTransactionRound() {
	$roundId = $this->trxRoundId;

	return $roundId !== false;
}
406 | |
/**
 * @return bool Whether the round stage is ROUND_CURSORY, i.e. no begin, commit,
 *  rollback or callback step is in progress
 */
public function isReadyForRoundOperations() {
	$stage = $this->trxRoundStage;

	return $stage === self::ROUND_CURSORY;
}
410 | |
/**
 * Write an info log entry when two or more primary servers have pending changes.
 *
 * The entry lists each affected server together with the callers reported by
 * pendingPrimaryChangeCallers() of its load balancer.
 */
private function logIfMultiDbTransaction() {
	$callersByDB = [];
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$serverName = $loadBalancer->getServerName( ServerInfo::WRITER_INDEX );
		$pendingCallers = $loadBalancer->pendingPrimaryChangeCallers();
		if ( $pendingCallers ) {
			$callersByDB[$serverName] = $pendingCallers;
		}
	}

	if ( count( $callersByDB ) < 2 ) {
		// Zero or one server involved: nothing worth reporting
		return;
	}

	$dbs = implode( ', ', array_keys( $callersByDB ) );
	$msg = "Multi-DB transaction [{$dbs}]:\n";
	foreach ( $callersByDB as $db => $callers ) {
		$msg .= "$db: " . implode( '; ', $callers ) . "\n";
	}
	$this->logger->info( $msg );
}
433 | |
/**
 * @return bool Whether at least one tracked load balancer reports primary changes
 */
public function hasPrimaryChanges() {
	$found = false;
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		if ( $loadBalancer->hasPrimaryChanges() ) {
			$found = true;
			break;
		}
	}

	return $found;
}
442 | |
/**
 * @return bool Whether at least one tracked load balancer reports that a lagged
 *  replica was used
 */
public function laggedReplicaUsed() {
	$found = false;
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		if ( $loadBalancer->laggedReplicaUsed() ) {
			$found = true;
			break;
		}
	}

	return $found;
}
451 | |
/**
 * @param mixed $age Forwarded unchanged to each load balancer
 * @return bool Whether at least one tracked load balancer has, or recently made,
 *  primary changes
 */
public function hasOrMadeRecentPrimaryChanges( $age = null ) {
	$found = false;
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		if ( $loadBalancer->hasOrMadeRecentPrimaryChanges( $age ) ) {
			$found = true;
			break;
		}
	}

	return $found;
}
460 | |
/**
 * Wait for the replicas of all relevant load balancers to catch up.
 *
 * @param array $opts Optional settings:
 *  - timeout: passed to waitForAll(); defaults to the factory-wide wait timeout
 *  - ifWritesSince: if truthy, load balancers whose last primary change is older
 *    than this value are skipped
 * @return bool True if no load balancer failed its wait
 */
public function waitForReplication( array $opts = [] ) {
	$opts += [
		'timeout' => $this->replicationWaitTimeout,
		'ifWritesSince' => null
	];

	// Materialize the set once so both passes below see the same load balancers
	$lbs = [];
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$lbs[] = $loadBalancer;
	}

	// Collect the primary positions of all applicable DBs up front. Waiting on one
	// cluster then shortens the time needed to wait on the next ones.
	$positions = array_fill( 0, count( $lbs ), false );
	foreach ( $lbs as $i => $loadBalancer ) {
		$mustWait =
			// Without a primary connection there are no writes to wait for
			$loadBalancer->hasPrimaryConnection() &&
			// Without replication, avoid getPrimaryPos() permissions errors (T29975)
			$loadBalancer->hasStreamingReplicaServers() &&
			// Skip if there were no writes since the last replication wait
			!(
				$opts['ifWritesSince'] &&
				$loadBalancer->lastPrimaryChangeTimestamp() < $opts['ifWritesSince']
			);

		if ( $mustWait ) {
			$positions[$i] = $loadBalancer->getPrimaryPos();
		}
	}

	// Listener callbacks run *after* the positions were taken: time spent in
	// them is time not spent inside waitForAll().
	foreach ( $this->replicationWaitCallbacks as $listener ) {
		$listener();
	}

	$failed = [];
	foreach ( $lbs as $i => $loadBalancer ) {
		$position = $positions[$i];
		// A falsy position means "skipped" or that the RDBMS has no getPrimaryPos() support
		if ( $position && !$loadBalancer->waitForAll( $position, $opts['timeout'] ) ) {
			$failed[] = $loadBalancer->getServerName( ServerInfo::WRITER_INDEX );
		}
	}

	return $failed === [];
}
512 | |
/**
 * Register or remove a callback that waitForReplication() runs on each call.
 *
 * @param string $name Key identifying the listener
 * @param callable|null $callback Callback to register; a falsy value removes
 *  the listener stored under $name
 */
public function setWaitForReplicationListener( $name, ?callable $callback = null ) {
	if ( !$callback ) {
		unset( $this->replicationWaitCallbacks[$name] );

		return;
	}

	$this->replicationWaitCallbacks[$name] = $callback;
}
520 | |
/**
 * Get the ticket of this factory, provided no primary changes are pending.
 *
 * @param string $fname Caller name, used in the error log entry
 * @return int|null The ticket, or null (after logging an error) when primary
 *  changes are pending
 */
public function getEmptyTransactionTicket( $fname ) {
	if ( !$this->hasPrimaryChanges() ) {
		return $this->ticket;
	}

	$context = [ 'exception' => new RuntimeException() ];
	$this->logger->error( __METHOD__ . ": $fname does not have outer scope", $context );

	return null;
}
533 | |
/**
 * Get a DB_PRIMARY connection handle for a (possibly virtual) domain.
 *
 * @param string|false $domain Domain ID, or false for the default
 * @return IDatabase
 */
public function getPrimaryDatabase( $domain = false ): IDatabase {
	$noGroups = [];

	return $this->getMappedDatabase( DB_PRIMARY, $noGroups, $domain );
}
537 | |
/**
 * Get a DB_REPLICA connection handle for a (possibly virtual) domain.
 *
 * @param string|false $domain Domain ID, or false for the default
 * @param string|null $group Single query group to request, or null for none
 * @return IReadableDatabase
 */
public function getReplicaDatabase( $domain = false, $group = null ): IReadableDatabase {
	$groups = ( $group === null ) ? [] : [ $group ];

	return $this->getMappedDatabase( DB_REPLICA, $groups, $domain );
}
546 | |
/**
 * Get the load balancer serving a (possibly virtual) domain.
 *
 * For a registered virtual domain the mapping decides: a 'cluster' entry selects
 * the external load balancer of that cluster; otherwise the 'db' entry names the
 * real domain on the main load balancer. A virtual domain without a mapping, or
 * with a mapping that has no 'db' entry, resolves to the local domain (false).
 *
 * @param string|false $domain Domain ID or virtual domain, or false for the local domain
 * @return ILoadBalancer
 */
public function getLoadBalancer( $domain = false ): ILoadBalancer {
	if ( $domain !== false && in_array( $domain, $this->virtualDomains ) ) {
		if ( isset( $this->virtualDomainsMapping[$domain] ) ) {
			$config = $this->virtualDomainsMapping[$domain];
			if ( isset( $config['cluster'] ) ) {
				return $this->getExternalLB( $config['cluster'] );
			}
			// Same fallback as getMappedDatabase(): a missing 'db' key means the
			// local domain, instead of an undefined-key warning and a null domain.
			$domain = $config['db'] ?? false;
		} else {
			// It's not configured, assume local db.
			$domain = false;
		}
	}

	return $this->getMainLB( $domain );
}
562 | |
/**
 * Shared implementation of getPrimaryDatabase() and getReplicaDatabase() that
 * applies the virtual domain mapping.
 *
 * The load balancer is looked up by the domain as given; the connection is
 * requested for the mapped 'db' domain (false if the mapping has none) when
 * the domain is a registered virtual domain, and for the given domain otherwise.
 *
 * @param int $index
 * @param array $groups
 * @param string|false $domain
 * @return IDatabase
 */
private function getMappedDatabase( $index, $groups, $domain ) {
	$isVirtual = ( $domain !== false && in_array( $domain, $this->virtualDomains ) );
	$dbDomain = $isVirtual
		? ( $this->virtualDomainsMapping[$domain]['db'] ?? false )
		: $domain;

	$loadBalancer = $this->getLoadBalancer( $domain );

	return $loadBalancer->getConnection( $index, $groups, $dbDomain );
}
580 | |
/**
 * Commit primary changes and wait for replication, if the caller holds the ticket.
 *
 * @param string $fname Caller name
 * @param mixed $ticket Ticket obtained from getEmptyTransactionTicket()
 * @param array $opts Options for waitForReplication()
 * @return bool False if the ticket does not match (an error is logged and nothing
 *  is committed); otherwise the result of waitForReplication()
 */
final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
	if ( $ticket !== $this->ticket ) {
		$context = [ 'exception' => new RuntimeException() ];
		$this->logger->error(
			__METHOD__ . ": $fname does not have outer scope ($ticket vs {$this->ticket})",
			$context
		);

		return false;
	}

	// Both the round owner and any holder of the empty transaction ticket may commit,
	// so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
	$onBehalfOfOwner = ( $this->trxRoundId !== false && $fname !== $this->trxRoundId );
	if ( $onBehalfOfOwner ) {
		$this->logger->info( "$fname: committing on behalf of {$this->trxRoundId}" );
	}
	// Captured now, because the commit below clears the round ID
	$fnameEffective = $onBehalfOfOwner ? $this->trxRoundId : $fname;

	$this->commitPrimaryChanges( $fnameEffective );
	$waitSucceeded = $this->waitForReplication( $opts );

	// After committing for the round owner, reopen an empty round under the owner's
	// name so that the owner finds the same empty transaction state as before.
	if ( $fnameEffective !== $fname ) {
		$this->beginPrimaryChanges( $fnameEffective );
	}

	return $waitSucceeded;
}
610 | |
/**
 * Switch the ChronologyProtector of this factory off.
 */
public function disableChronologyProtection() {
	$protector = $this->chronologyProtector;
	$protector->setEnabled( false );
}
614 | |
/**
 * Get the parameters shared by every ILoadBalancer::__construct() call.
 *
 * 'roundStage' is only non-null while post-commit or post-rollback callbacks
 * are running, so that a load balancer created at that point starts in the
 * matching stage.
 *
 * @return array
 */
final protected function baseLoadBalancerParams() {
	$stageByRoundStage = [
		self::ROUND_COMMIT_CALLBACKS => ILoadBalancerForOwner::STAGE_POSTCOMMIT_CALLBACKS,
		self::ROUND_ROLLBACK_CALLBACKS => ILoadBalancerForOwner::STAGE_POSTROLLBACK_CALLBACKS,
	];
	$initStage = $stageByRoundStage[$this->trxRoundStage] ?? null;

	$params = [
		'localDomain' => $this->localDomain,
		'readOnlyReason' => $this->readOnlyReason,
		'srvCache' => $this->srvCache,
		'wanCache' => $this->wanCache,
		'profiler' => $this->profiler,
		'trxProfiler' => $this->trxProfiler,
		'logger' => $this->logger,
		'errorLogger' => $this->errorLogger,
		'deprecationLogger' => $this->deprecationLogger,
		'statsdDataFactory' => $this->statsd,
		'cliMode' => $this->cliMode,
		'agent' => $this->agent,
		'defaultGroup' => $this->defaultGroup,
		'chronologyProtector' => $this->chronologyProtector,
		'roundStage' => $initStage,
		'criticalSectionProvider' => $this->csProvider
	];

	return $params;
}
648 | |
/**
 * Bring a newly created load balancer in line with the factory state.
 *
 * If a transaction round is active the load balancer joins it; the table,
 * index and domain aliases of the factory are then applied.
 *
 * @param ILoadBalancerForOwner $lb
 */
protected function initLoadBalancer( ILoadBalancerForOwner $lb ) {
	$activeRound = $this->trxRoundId;
	if ( $activeRound !== false ) {
		// Sets DBO_TRX
		$lb->beginPrimaryChanges( $activeRound );
	}

	$lb->setTableAliases( $this->tableAliases );
	$lb->setIndexAliases( $this->indexAliases );
	$lb->setDomainAliases( $this->domainAliases );
}
661 | |
/**
 * Replace the table alias map that initLoadBalancer() hands to load balancers.
 *
 * Only the stored map is replaced; this method does not notify any load balancer.
 *
 * @param array[] $aliases Map of (table => (dbname, schema, prefix) map)
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
665 | |
/**
 * Replace the index alias map that initLoadBalancer() hands to load balancers.
 *
 * Only the stored map is replaced; this method does not notify any load balancer.
 *
 * @param string[] $aliases Map of (index alias => index)
 */
public function setIndexAliases( array $aliases ) {
	$this->indexAliases = $aliases;
}
669 | |
/**
 * Replace the domain alias map that initLoadBalancer() hands to load balancers.
 *
 * Only the stored map is replaced; this method does not notify any load balancer.
 *
 * @param DatabaseDomain[]|string[] $aliases Map of (domain alias => DB domain)
 */
public function setDomainAliases( array $aliases ) {
	$this->domainAliases = $aliases;
}
673 | |
/**
 * @return TransactionProfiler The profiler instance set up by configure()
 */
public function getTransactionProfiler(): TransactionProfiler {
	$profiler = $this->trxProfiler;

	return $profiler;
}
677 | |
/**
 * Change the table prefix of the local domain, here and on all load balancers.
 *
 * The database and schema of the local domain are kept as they are.
 *
 * @param string $prefix New table prefix
 */
public function setLocalDomainPrefix( $prefix ) {
	$current = $this->localDomain;
	$database = $current->getDatabase();
	$schema = $current->getSchema();
	$this->localDomain = new DatabaseDomain( $database, $schema, $prefix );

	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$loadBalancer->setLocalDomainPrefix( $prefix );
	}
}
689 | |
/**
 * Close all connections and switch the local domain, here and on all load balancers.
 *
 * @param DatabaseDomain|string $domain Value accepted by DatabaseDomain::newFromId()
 */
public function redefineLocalDomain( $domain ) {
	// Connections are closed before the domain changes
	$this->closeAll( __METHOD__ );

	$newLocalDomain = DatabaseDomain::newFromId( $domain );
	$this->localDomain = $newLocalDomain;

	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$loadBalancer->redefineLocalDomain( $this->localDomain );
	}
}
699 | |
/**
 * Call closeAll() on every tracked load balancer.
 *
 * @param string $fname Caller name
 */
public function closeAll( $fname = __METHOD__ ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$abortGuard = ScopedCallback::newScopedIgnoreUserAbort();

	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		$loadBalancer->closeAll( $fname );
	}
}
708 | |
/**
 * Set the agent name included in the parameters for new load balancers.
 *
 * Only the stored value is replaced; this method does not notify any load balancer.
 *
 * @param string $agent Agent name for query profiling
 */
public function setAgentName( $agent ) {
	$this->agent = $agent;
}
712 | |
/**
 * @return bool Whether at least one tracked load balancer reports streaming
 *  replica servers
 */
public function hasStreamingReplicaServers() {
	$found = false;
	foreach ( $this->getLBsForOwner() as $loadBalancer ) {
		if ( $loadBalancer->hasStreamingReplicaServers() ) {
			$found = true;
			break;
		}
	}

	return $found;
}
721 | |
/**
 * Set the default timeout used by waitForReplication().
 *
 * @param int $seconds New timeout; cast to int and raised to 1 if lower
 * @return int The previous timeout
 */
public function setDefaultReplicationWaitTimeout( $seconds ) {
	$previous = $this->replicationWaitTimeout;

	$requested = (int)$seconds;
	$this->replicationWaitTimeout = ( $requested < 1 ) ? 1 : $requested;

	return $previous;
}
728 | |
/**
 * Ensure that the factory is in the expected transaction round stage.
 *
 * @param string $stage One of the ROUND_* class constants
 * @throws DBTransactionError If the current stage differs from $stage
 */
private function assertTransactionRoundStage( $stage ) {
	if ( $this->trxRoundStage === $stage ) {
		return;
	}

	throw new DBTransactionError(
		null,
		"Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
	);
}
740 | } |