MediaWiki REL1_37
LoadBalancer.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
24use ArrayUtils;
25use BagOStuff;
27use InvalidArgumentException;
28use LogicException;
29use Psr\Log\LoggerInterface;
30use Psr\Log\NullLogger;
31use RuntimeException;
32use Throwable;
33use UnexpectedValueException;
35use Wikimedia\RequestTimeout\CriticalSectionProvider;
36use Wikimedia\ScopedCallback;
37
43class LoadBalancer implements ILoadBalancer {
45 private $loadMonitor;
47 private $csProvider;
51 private $srvCache;
53 private $wanCache;
58 private $profiler;
60 private $trxProfiler;
62 private $connLogger;
64 private $queryLogger;
66 private $replLogger;
68 private $perfLogger;
70 private $errorLogger;
73
75 private $localDomain;
76
78 private $conns;
79
81 private $clusterName;
83 private $servers;
85 private $groupLoads;
89 private $waitTimeout;
93 private $maxLag;
96
98 private $cliMode;
100 private $agent;
101
103 private $tableAliases = [];
105 private $indexAliases = [];
107 private $domainAliases = [];
112
114 private $trxRoundId = false;
120 private $readIndexByGroup = [];
122 private $waitForPos;
124 private $allowLagged = false;
126 private $laggedReplicaMode = false;
128 private $lastError = 'Unknown error';
130 private $readOnlyReason = false;
134 private $disabled = false;
137
139 private $id;
141 private $ownerId;
142
145
146 private const INFO_SERVER_INDEX = 'serverIndex';
147 private const INFO_AUTOCOMMIT_ONLY = 'autoCommitOnly';
148 private const INFO_FORIEGN = 'foreign';
149 private const INFO_FOREIGN_REF_COUNT = 'foreignPoolRefCount';
150
155 public const MAX_LAG_DEFAULT = 6;
156
158 private const CONN_HELD_WARN_THRESHOLD = 10;
159
161 private const MAX_WAIT_DEFAULT = 10;
163 private const TTL_CACHE_READONLY = 5;
164
165 private const KEY_LOCAL = 'local';
166 private const KEY_FOREIGN_FREE = 'foreignFree';
167 private const KEY_FOREIGN_INUSE = 'foreignInUse';
168
169 private const KEY_LOCAL_NOROUND = 'localAutoCommit';
170 private const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
171 private const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
172
173 private const KEY_LOCAL_DOMAIN = '__local__';
174
176 private const ROUND_CURSORY = 'cursory';
178 private const ROUND_FINALIZED = 'finalized';
180 private const ROUND_APPROVED = 'approved';
182 private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
184 private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
186 private const ROUND_ERROR = 'error';
187
189 private const READER_INDEX_NONE = -1;
190
191 public function __construct( array $params ) {
192 if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
193 throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
194 }
195
196 $localDomain = isset( $params['localDomain'] )
197 ? DatabaseDomain::newFromId( $params['localDomain'] )
200
201 $this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;
202
203 $listKey = -1;
204 $this->servers = [];
205 $this->groupLoads = [ self::GROUP_GENERIC => [] ];
206 foreach ( $params['servers'] as $i => $server ) {
207 if ( ++$listKey !== $i ) {
208 throw new UnexpectedValueException( 'List expected for "servers" parameter' );
209 }
210 $this->servers[$i] = $server;
211 foreach ( ( $server['groupLoads'] ?? [] ) as $group => $ratio ) {
212 $this->groupLoads[$group][$i] = $ratio;
213 }
214 $this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
215 $this->maxLagByIndex[$i] = $server['max lag'] ?? $this->maxLag;
216 }
217
218 $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
219
220 $this->conns = self::newTrackedConnectionsArray();
221
222 if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
223 $this->readOnlyReason = $params['readOnlyReason'];
224 }
225
226 $this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
227 $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
228
229 $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
230 $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
231 $this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
232 trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
233 };
234 $this->deprecationLogger = $params['deprecationLogger'] ?? static function ( $msg ) {
235 trigger_error( $msg, E_USER_DEPRECATED );
236 };
237 $this->replLogger = $params['replLogger'] ?? new NullLogger();
238 $this->connLogger = $params['connLogger'] ?? new NullLogger();
239 $this->queryLogger = $params['queryLogger'] ?? new NullLogger();
240 $this->perfLogger = $params['perfLogger'] ?? new NullLogger();
241
242 $this->clusterName = $params['clusterName'] ?? null;
243 $this->profiler = $params['profiler'] ?? null;
244 $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
245
246 $this->csProvider = $params['criticalSectionProvider'] ?? null;
247
248 $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
249 $this->agent = $params['agent'] ?? '';
250
251 if ( isset( $params['chronologyCallback'] ) ) {
252 $this->chronologyCallback = $params['chronologyCallback'];
253 }
254
255 if ( isset( $params['roundStage'] ) ) {
256 if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
257 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
258 } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
259 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
260 }
261 }
262
263 $group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
264 $this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;
265
266 static $nextId;
267 $this->id = $nextId = ( is_int( $nextId ) ? $nextId++ : mt_rand() );
268 $this->ownerId = $params['ownerId'] ?? null;
269 }
270
271 private static function newTrackedConnectionsArray() {
272 return [
273 // Connection were transaction rounds may be applied
274 self::KEY_LOCAL => [],
275 self::KEY_FOREIGN_INUSE => [],
276 self::KEY_FOREIGN_FREE => [],
277 // Auto-committing counterpart connections that ignore transaction rounds
278 self::KEY_LOCAL_NOROUND => [],
279 self::KEY_FOREIGN_INUSE_NOROUND => [],
280 self::KEY_FOREIGN_FREE_NOROUND => []
281 ];
282 }
283
284 public function getClusterName() {
285 if ( $this->clusterName !== null ) {
286 $name = $this->clusterName;
287 } else {
288 // Fallback to the current primary name if not specified
289 $name = $this->getServerName( $this->getWriterIndex() );
290 }
291
292 return $name;
293 }
294
295 public function getLocalDomainID() {
296 return $this->localDomain->getId();
297 }
298
299 public function resolveDomainID( $domain ) {
300 return $this->resolveDomainInstance( $domain )->getId();
301 }
302
307 final protected function resolveDomainInstance( $domain ) {
308 if ( $domain instanceof DatabaseDomain ) {
309 return $domain; // already a domain instance
310 } elseif ( $domain === false || $domain === $this->localDomain->getId() ) {
311 return $this->localDomain;
312 } elseif ( isset( $this->domainAliases[$domain] ) ) {
313 $this->domainAliases[$domain] =
314 DatabaseDomain::newFromId( $this->domainAliases[$domain] );
315
316 return $this->domainAliases[$domain];
317 }
318
319 $cachedDomain = $this->nonLocalDomainCache[$domain] ?? null;
320 if ( $cachedDomain === null ) {
321 $cachedDomain = DatabaseDomain::newFromId( $domain );
322 $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
323 }
324
325 return $cachedDomain;
326 }
327
335 private function resolveGroups( $groups, $i ) {
336 // If a specific replica server was specified, then $groups makes no sense
337 if ( $i > 0 && $groups !== [] && $groups !== false ) {
338 $list = implode( ', ', (array)$groups );
339 throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
340 }
341
342 if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
343 $resolvedGroups = [ $this->defaultGroup ]; // common case
344 } elseif ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
345 $resolvedGroups = [ $groups, $this->defaultGroup ];
346 } elseif ( is_array( $groups ) ) {
347 $resolvedGroups = $groups;
348 if ( array_search( $this->defaultGroup, $resolvedGroups ) === false ) {
349 $resolvedGroups[] = $this->defaultGroup;
350 }
351 } else {
352 $resolvedGroups = [ $this->defaultGroup ];
353 }
354
355 return $resolvedGroups;
356 }
357
364 private function sanitizeConnectionFlags( $flags, $i, $domain ) {
365 // Whether an outside caller is explicitly requesting the primary database server
366 if ( $i === self::DB_PRIMARY || $i === $this->getWriterIndex() ) {
368 }
369
370 if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
371 // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
372 // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
373 // during larger transactions. This is useful for avoiding lock contention.
374
375 // Primary DB server attributes (should match those of the replica DB servers)
376 $attributes = $this->getServerAttributes( $this->getWriterIndex() );
377 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
378 // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
379 // to use separate connections would just cause self-deadlocks. Note that
380 // REPEATABLE-READ staleness is not an issue since DB-level locking means
381 // that transactions are Strict Serializable anyway.
382 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
383 $type = $this->getServerType( $this->getWriterIndex() );
384 $this->connLogger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
385 } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
386 // T202116: integration tests are active and queries should be all be using
387 // temporary clone tables (via prefix). Such tables are not visible accross
388 // different connections nor can there be REPEATABLE-READ snapshot staleness,
389 // so use the same connection for everything.
390 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
391 }
392 }
393
394 return $flags;
395 }
396
402 private function enforceConnectionFlags( IDatabase $conn, $flags ) {
403 if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
404 if ( $conn->trxLevel() ) { // sanity
405 throw new DBUnexpectedError(
406 $conn,
407 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
408 );
409 }
410
411 $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
412 }
413 }
414
420 private function getLoadMonitor() {
421 if ( !isset( $this->loadMonitor ) ) {
422 $compat = [
423 'LoadMonitor' => LoadMonitor::class,
424 'LoadMonitorNull' => LoadMonitorNull::class,
425 'LoadMonitorMySQL' => LoadMonitorMySQL::class,
426 ];
427
428 $class = $this->loadMonitorConfig['class'];
429 if ( isset( $compat[$class] ) ) {
430 $class = $compat[$class];
431 }
432
433 $this->loadMonitor = new $class(
434 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
435 $this->loadMonitor->setLogger( $this->replLogger );
436 }
437
438 return $this->loadMonitor;
439 }
440
447 private function getRandomNonLagged( array $loads, string $domain, $maxLag = INF ) {
448 $lags = $this->getLagTimes( $domain );
449
450 # Unset excessively lagged servers
451 foreach ( $lags as $i => $lag ) {
452 if ( $i !== $this->getWriterIndex() ) {
453 # How much lag this server nominally is allowed to have
454 $maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
455 # Constrain that futher by $maxLag argument
456 $maxServerLag = min( $maxServerLag, $maxLag );
457
458 $srvName = $this->getServerName( $i );
459 if ( $lag === false && !is_infinite( $maxServerLag ) ) {
460 $this->replLogger->debug(
461 __METHOD__ . ": server {db_server} is not replicating?",
462 [ 'db_server' => $srvName ]
463 );
464 unset( $loads[$i] );
465 } elseif ( $lag > $maxServerLag ) {
466 $this->replLogger->debug(
467 __METHOD__ .
468 ": server {db_server} has {lag} seconds of lag (>= {maxlag})",
469 [ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $maxServerLag ]
470 );
471 unset( $loads[$i] );
472 }
473 }
474 }
475
476 # Find out if all the replica DBs with non-zero load are lagged
477 $sum = 0;
478 foreach ( $loads as $load ) {
479 $sum += $load;
480 }
481 if ( $sum == 0 ) {
482 # No appropriate DB servers except maybe the primary and some replica DBs with zero load
483 # Do NOT use the primary DB
484 # Instead, this function will return false, triggering read-only mode,
485 # and a lagged replica DB will be used instead.
486 return false;
487 }
488
489 if ( count( $loads ) == 0 ) {
490 return false;
491 }
492
493 # Return a random representative of the remainder
494 return ArrayUtils::pickRandom( $loads );
495 }
496
505 private function getConnectionIndex( $i, array $groups, $domain ) {
506 if ( $i === self::DB_PRIMARY ) {
507 $i = $this->getWriterIndex();
508 } elseif ( $i === self::DB_REPLICA ) {
509 foreach ( $groups as $group ) {
510 $groupIndex = $this->getReaderIndex( $group, $domain );
511 if ( $groupIndex !== false ) {
512 $i = $groupIndex; // group connection succeeded
513 break;
514 }
515 }
516 } elseif ( !isset( $this->servers[$i] ) ) {
517 throw new UnexpectedValueException( "Invalid server index index #$i" );
518 }
519
520 if ( $i === self::DB_REPLICA ) {
521 $this->lastError = 'Unknown error'; // set here in case of worse failure
522 $this->lastError = 'No working replica DB server: ' . $this->lastError;
523 $this->reportConnectionError();
524 }
525
526 return $i;
527 }
528
529 public function getReaderIndex( $group = false, $domain = false ) {
530 $domain = $this->resolveDomainID( $domain );
531 $group = is_string( $group ) ? $group : self::GROUP_GENERIC;
532
533 if ( $this->getServerCount() == 1 ) {
534 // Skip the load balancing if there's only one server
535 return $this->getWriterIndex();
536 }
537
538 $index = $this->getExistingReaderIndex( $group );
539 if ( $index !== self::READER_INDEX_NONE ) {
540 // A reader index was already selected and "waitForPos" was handled
541 return $index;
542 }
543
544 // Use the server weight array for this load group
545 if ( isset( $this->groupLoads[$group] ) ) {
546 $loads = $this->groupLoads[$group];
547 } else {
548 $this->connLogger->info( __METHOD__ . ": no loads for group $group" );
549
550 return false;
551 }
552
553 // Scale the configured load ratios according to each server's load and state
554 $this->getLoadMonitor()->scaleLoads( $loads, $domain );
555
556 // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
557 $this->lazyLoadReplicationPositions(); // optimizes server candidate selection
558 list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
559 if ( $i === false ) {
560 // DB connection unsuccessful
561 return false;
562 }
563
564 // If data seen by queries is expected to reflect the transactions committed as of
565 // or after a given replication position then wait for the DB to apply those changes
566 if ( $this->waitForPos && $i !== $this->getWriterIndex() && !$this->doWait( $i ) ) {
567 // Data will be outdated compared to what was expected
568 $laggedReplicaMode = true;
569 }
570
571 // Cache the reader index for future DB_REPLICA handles
572 $this->setExistingReaderIndex( $group, $i );
573 // Record whether the generic reader index is in "lagged replica DB" mode
574 if ( $group === self::GROUP_GENERIC && $laggedReplicaMode ) {
575 $this->laggedReplicaMode = true;
576 $this->replLogger->debug( __METHOD__ . ": setting lagged replica mode" );
577 }
578
579 $serverName = $this->getServerName( $i );
580 $this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
581
582 return $i;
583 }
584
591 protected function getExistingReaderIndex( $group ) {
592 return $this->readIndexByGroup[$group] ?? self::READER_INDEX_NONE;
593 }
594
601 private function setExistingReaderIndex( $group, $index ) {
602 if ( $index < 0 ) {
603 throw new UnexpectedValueException( "Cannot set a negative read server index" );
604 }
605 $this->readIndexByGroup[$group] = $index;
606 }
607
617 private function pickReaderIndex( array $loads, string $domain ) {
618 if ( $loads === [] ) {
619 throw new InvalidArgumentException( "Server configuration array is empty" );
620 }
621
623 $i = false;
625 $laggedReplicaMode = false;
626
627 // Quickly look through the available servers for a server that meets criteria...
628 $currentLoads = $loads;
629 while ( count( $currentLoads ) ) {
630 if ( $this->allowLagged || $laggedReplicaMode ) {
631 $i = ArrayUtils::pickRandom( $currentLoads );
632 } else {
633 $i = false;
634 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
635 $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
636 // "chronologyCallback" sets "waitForPos" for session consistency.
637 // This triggers doWait() after connect, so it's especially good to
638 // avoid lagged servers so as to avoid excessive delay in that method.
639 $ago = microtime( true ) - $this->waitForPos->asOfTime();
640 // Aim for <= 1 second of waiting (being too picky can backfire)
641 $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
642 }
643 if ( $i === false ) {
644 // Any server with less lag than it's 'max lag' param is preferable
645 $i = $this->getRandomNonLagged( $currentLoads, $domain );
646 }
647 if ( $i === false && count( $currentLoads ) ) {
648 // All replica DBs lagged. Switch to read-only mode
649 $this->replLogger->error(
650 __METHOD__ . ": all replica DBs lagged. Switch to read-only mode",
651 [ 'db_domain' => $domain ]
652 );
653 $i = ArrayUtils::pickRandom( $currentLoads );
654 $laggedReplicaMode = true;
655 }
656 }
657
658 if ( $i === false ) {
659 // pickRandom() returned false.
660 // This is permanent and means the configuration or the load monitor
661 // wants us to return false.
662 $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
663
664 return [ false, false ];
665 }
666
667 $serverName = $this->getServerName( $i );
668 $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
669
670 // Get a connection to this server without triggering complementary connections
671 // to other servers (due to things like lag or read-only checks). We want to avoid
672 // the risk of overhead and recursion here.
673 $conn = $this->getServerConnection( $i, $domain, self::CONN_SILENCE_ERRORS );
674 if ( !$conn ) {
675 $this->connLogger->warning(
676 __METHOD__ . ": failed connecting to $i/{db_domain}",
677 [ 'db_domain' => $domain ]
678 );
679 unset( $currentLoads[$i] ); // avoid this server next iteration
680 $i = false;
681 continue;
682 }
683
684 // Decrement reference counter, we are finished with this connection.
685 // It will be incremented for the caller later.
686 if ( !$this->localDomain->equals( $domain ) ) {
687 $this->reuseConnection( $conn );
688 }
689
690 // Return this server
691 break;
692 }
693
694 // If all servers were down, quit now
695 if ( $currentLoads === [] ) {
696 $this->connLogger->error(
697 __METHOD__ . ": all servers down",
698 [ 'db_domain' => $domain ]
699 );
700 }
701
702 return [ $i, $laggedReplicaMode ];
703 }
704
705 public function waitFor( $pos ) {
706 $oldPos = $this->waitForPos;
707 try {
708 $this->waitForPos = $pos;
709
710 $genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
711 // If a generic reader connection was already established, then wait now.
712 // Otherwise, wait until a connection is established in getReaderIndex().
713 if ( $genericIndex > $this->getWriterIndex() && !$this->doWait( $genericIndex ) ) {
714 $this->laggedReplicaMode = true;
715 $this->replLogger->debug( __METHOD__ . ": setting lagged replica mode" );
716 }
717 } finally {
718 // Restore the older position if it was higher since this is used for lag-protection
719 $this->setWaitForPositionIfHigher( $oldPos );
720 }
721 }
722
723 public function waitForOne( $pos, $timeout = null ) {
724 $oldPos = $this->waitForPos;
725 try {
726 $this->waitForPos = $pos;
727
728 $genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
729 // If a generic reader connection was already established to a replica server,
730 // then use that server. Otherwise, just pick a random replica server.
731 if ( $genericIndex > $this->getWriterIndex() ) {
732 $i = $genericIndex;
733 } else {
734 $readLoads = $this->groupLoads[self::GROUP_GENERIC];
735 unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
736 $readLoads = array_filter( $readLoads ); // with non-zero load
737 $i = ArrayUtils::pickRandom( $readLoads );
738 }
739
740 if ( $i !== false ) {
741 $ok = $this->doWait( $i, $timeout );
742 } else {
743 $ok = true; // no applicable loads
744 }
745
746 return $ok;
747 } finally {
748 // Restore the old position; this is used for throttling, not lag-protection
749 $this->waitForPos = $oldPos;
750 }
751 }
752
753 public function waitForAll( $pos, $timeout = null ) {
754 $timeout = $timeout ?: $this->waitTimeout;
755
756 $oldPos = $this->waitForPos;
757 try {
758 $this->waitForPos = $pos;
759
760 $ok = true;
761 foreach ( $this->getStreamingReplicaIndexes() as $i ) {
762 if ( $this->serverHasLoadInAnyGroup( $i ) ) {
763 $start = microtime( true );
764 $ok = $this->doWait( $i, $timeout ) && $ok;
765 $timeout -= intval( microtime( true ) - $start );
766 if ( $timeout <= 0 ) {
767 break; // timeout reached
768 }
769 }
770 }
771
772 return $ok;
773 } finally {
774 // Restore the old position; this is used for throttling, not lag-protection
775 $this->waitForPos = $oldPos;
776 }
777 }
778
783 private function serverHasLoadInAnyGroup( $i ) {
784 foreach ( $this->groupLoads as $loadsByIndex ) {
785 if ( ( $loadsByIndex[$i] ?? 0 ) > 0 ) {
786 return true;
787 }
788 }
789
790 return false;
791 }
792
796 private function setWaitForPositionIfHigher( $pos ) {
797 if ( !$pos ) {
798 return;
799 }
800
801 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
802 $this->waitForPos = $pos;
803 }
804 }
805
806 public function getAnyOpenConnection( $i, $flags = 0 ) {
807 $i = ( $i === self::DB_PRIMARY ) ? $this->getWriterIndex() : $i;
808 // Connection handles required to be in auto-commit mode use a separate connection
809 // pool since the main pool is effected by implicit and explicit transaction rounds
810 $autoCommitOnly = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
811
812 $conn = false;
813 foreach ( $this->conns as $type => $connsByServer ) {
814 if ( $i === self::DB_REPLICA ) {
815 // Consider all existing connections to any server
816 $applicableConnsByServer = $connsByServer;
817 } else {
818 // Consider all existing connections to a specific server
819 $applicableConnsByServer = isset( $connsByServer[$i] )
820 ? [ $i => $connsByServer[$i] ]
821 : [];
822 }
823
824 $conn = $this->pickAnyOpenConnection( $applicableConnsByServer, $autoCommitOnly );
825 if ( $conn ) {
826 $this->connLogger->debug( __METHOD__ . ": found '$type' connection to #$i." );
827 break;
828 }
829 }
830
831 if ( $conn ) {
832 $this->enforceConnectionFlags( $conn, $flags );
833 }
834
835 return $conn;
836 }
837
843 private function pickAnyOpenConnection( array $connsByServer, $autoCommitOnly ) {
844 foreach ( $connsByServer as $i => $conns ) {
845 foreach ( $conns as $conn ) {
846 if ( !$conn->isOpen() ) {
847 $this->connLogger->warning(
848 __METHOD__ .
849 ": pooled DB handle for {db_server} (#$i) has no open connection.",
850 $this->getConnLogContext( $conn )
851 );
852
853 continue; // some sort of error occurred?
854 }
855
856 if ( $autoCommitOnly ) {
857 // Only accept CONN_TRX_AUTOCOMMIT connections
858 if ( !$conn->getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
859 // Connection is aware of transaction rounds
860 continue;
861 }
862
863 if ( $conn->trxLevel() ) {
864 // Some sort of bug left a transaction open
865 $this->connLogger->warning(
866 __METHOD__ .
867 ": pooled DB handle for {db_server} (#$i) has a pending transaction.",
868 $this->getConnLogContext( $conn )
869 );
870
871 continue;
872 }
873 }
874
875 return $conn;
876 }
877 }
878
879 return false;
880 }
881
889 private function doWait( $index, $timeout = null ) {
890 $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
891
892 // Check if we already know that the DB has reached this point
893 $srvName = $this->getServerName( $index );
894 $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v1' );
896 $knownReachedPos = $this->srvCache->get( $key );
897 if (
898 $knownReachedPos instanceof DBPrimaryPos &&
899 $knownReachedPos->hasReached( $this->waitForPos )
900 ) {
901 $this->replLogger->debug(
902 __METHOD__ .
903 ": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
904 [ 'db_server' => $srvName ]
905 );
906
907 return true;
908 }
909
910 $close = false; // close the connection afterwards
912 // Check if there is an existing connection that can be used
913 $conn = $this->getAnyOpenConnection( $index, $flags );
914 if ( !$conn ) {
915 // Get a connection to this server without triggering complementary connections
916 // to other servers (due to things like lag or read-only checks). We want to avoid
917 // the risk of overhead and recursion here.
918 $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
919 if ( !$conn ) {
920 $this->replLogger->warning(
921 __METHOD__ . ': failed to connect to {db_server}',
922 [ 'db_server' => $srvName ]
923 );
924
925 return false;
926 }
927 // Avoid connection spam in waitForAll() when connections
928 // are made just for the sake of doing this lag check.
929 $close = true;
930 }
931
932 $this->replLogger->info(
933 __METHOD__ .
934 ': waiting for replica DB {db_server} to catch up...',
935 $this->getConnLogContext( $conn )
936 );
937
938 $result = $conn->primaryPosWait( $this->waitForPos, $timeout );
939
940 $ok = ( $result !== null && $result != -1 );
941 if ( $ok ) {
942 // Remember that the DB reached this point
943 $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
944 }
945
946 if ( $close ) {
947 $this->closeConnection( $conn );
948 }
949
950 return $ok;
951 }
952
953 public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
954 $domain = $this->resolveDomainID( $domain );
955 $groups = $this->resolveGroups( $groups, $i );
956 $flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
957 // If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
958 // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
959 // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
960 // The use of getServerConnection() instead of getConnection() avoids infinite loops.
961 $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
962 // Get an open connection to that server (might trigger a new connection)
963 $conn = $this->getServerConnection( $serverIndex, $domain, $flags );
964 // Set primary DB handles as read-only if there is high replication lag
965 if (
966 $conn &&
967 $serverIndex === $this->getWriterIndex() &&
968 $this->getLaggedReplicaMode( $domain ) &&
969 !is_string( $conn->getLBInfo( $conn::LB_READ_ONLY_REASON ) )
970 ) {
971 $genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
972 $reason = ( $genericIndex !== self::READER_INDEX_NONE )
973 ? 'The database is read-only until replication lag decreases.'
974 : 'The database is read-only until replica database servers becomes reachable.';
975 $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $reason );
976 }
977
978 return $conn;
979 }
980
981 public function getServerConnection( $i, $domain, $flags = 0 ) {
982 // Number of connections made before getting the server index and handle
983 $priorConnectionsMade = $this->connectionCounter;
984 // Get an open connection to this server (might trigger a new connection)
985 $conn = $this->localDomain->equals( $domain )
986 ? $this->getLocalConnection( $i, $flags )
987 : $this->getForeignConnection( $i, $domain, $flags );
988 // Throw an error or otherwise bail out if the connection attempt failed
989 if ( !( $conn instanceof IDatabase ) ) {
990 if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
991 $this->reportConnectionError();
992 }
993
994 return false;
995 }
996
997 // Profile any new connections caused by this method
998 if ( $this->connectionCounter > $priorConnectionsMade ) {
999 $this->trxProfiler->recordConnection(
1000 $conn->getServerName(),
1001 $conn->getDBname(),
1002 self::fieldHasBit( $flags, self::CONN_INTENT_WRITABLE )
1003 );
1004 }
1005
1006 if ( !$conn->isOpen() ) {
1007 $this->errorConnection = $conn;
1008 // Connection was made but later unrecoverably lost for some reason.
1009 // Do not return a handle that will just throw exceptions on use, but
1010 // let the calling code, e.g. getReaderIndex(), try another server.
1011 // FIXME: report an error here unless CONN_SILENCE_ERRORS was set
1012 return false;
1013 }
1014
1015 // Make sure that flags like CONN_TRX_AUTOCOMMIT are respected by this handle
1016 $this->enforceConnectionFlags( $conn, $flags );
1017 // Set primary DB handles as read-only if the load balancer is configured as read-only
1018 // or the primary database server is running in server-side read-only mode. Note that
1019 // replica DB handles are always read-only via Database::assertIsWritablePrimary().
1020 // Read-only mode due to replication lag is *avoided* here to avoid recursion.
1021 if ( $i === $this->getWriterIndex() ) {
1022 if ( $this->readOnlyReason !== false ) {
1024 } elseif ( $this->isPrimaryConnectionReadOnly( $conn, $flags ) ) {
1025 $readOnlyReason = 'The primary database server is running in read-only mode.';
1026 } else {
1027 $readOnlyReason = false;
1028 }
1029 $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $readOnlyReason );
1030 }
1031
1032 return $conn;
1033 }
1034
1035 public function reuseConnection( IDatabase $conn ) {
1036 $serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
1037 $refCount = $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT );
1038 if ( $serverIndex === null || $refCount === null ) {
1039 return; // non-foreign connection; no domain-use tracking to update
1040 } elseif ( $conn instanceof DBConnRef ) {
1041 // DBConnRef already handles calling reuseConnection() and only passes the live
1042 // Database instance to this method. Any caller passing in a DBConnRef is broken.
1043 $this->connLogger->error(
1044 __METHOD__ . ": got DBConnRef instance",
1045 [ 'db_domain' => $conn->getDomainID(), 'exception' => new RuntimeException() ]
1046 );
1047
1048 return;
1049 }
1050
1051 if ( $this->disabled ) {
1052 return; // DBConnRef handle probably survived longer than the LoadBalancer
1053 }
1054
1055 if ( $conn->getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
1056 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
1057 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
1058 } else {
1059 $connFreeKey = self::KEY_FOREIGN_FREE;
1060 $connInUseKey = self::KEY_FOREIGN_INUSE;
1061 }
1062
1063 $domain = $conn->getDomainID();
1064 $existingDomainConn = $this->conns[$connInUseKey][$serverIndex][$domain] ?? null;
1065 if ( !$existingDomainConn ) {
1066 throw new InvalidArgumentException(
1067 "Connection $serverIndex/$domain not found; it may have already been freed" );
1068 } elseif ( $existingDomainConn !== $conn ) {
1069 throw new InvalidArgumentException(
1070 "Connection $serverIndex/$domain mismatched; it may have already been freed" );
1071 }
1072
1073 $existingDomainConn->setLBInfo( self::INFO_FOREIGN_REF_COUNT, --$refCount );
1074 if ( $refCount <= 0 ) {
1075 $this->conns[$connFreeKey][$serverIndex][$domain] = $existingDomainConn;
1076 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
1077 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
1078 unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
1079 }
1080 $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
1081 } else {
1082 $this->connLogger->debug( __METHOD__ .
1083 ": reference count for $serverIndex/$domain reduced to $refCount" );
1084 }
1085 }
1086
1087 public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
1088 if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
1089 throw new UnexpectedValueException(
1090 __METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
1091 );
1092 }
1093
1094 $domain = $this->resolveDomainID( $domain );
1095 $role = $this->getRoleFromIndex( $i );
1096 $conn = $this->getConnection( $i, $groups, $domain, $flags );
1097
1098 return new DBConnRef( $this, $conn, $role );
1099 }
1100
1101 public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): DBConnRef {
1102 if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
1103 throw new UnexpectedValueException(
1104 __METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
1105 );
1106 }
1107
1108 $domain = $this->resolveDomainID( $domain );
1109 $role = $this->getRoleFromIndex( $i );
1110
1111 return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
1112 }
1113
1115 $i,
1116 $groups = [],
1117 $domain = false,
1118 $flags = 0
1120 if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
1121 throw new UnexpectedValueException(
1122 __METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
1123 );
1124 }
1125
1126 $domain = $this->resolveDomainID( $domain );
1127 $role = $this->getRoleFromIndex( $i );
1128 $conn = $this->getConnection( $i, $groups, $domain, $flags );
1129
1130 return new MaintainableDBConnRef( $this, $conn, $role );
1131 }
1132
1137 private function getRoleFromIndex( $i ) {
1138 return ( $i === self::DB_PRIMARY || $i === $this->getWriterIndex() )
1141 }
1142
1150 public function openConnection( $i, $domain = false, $flags = 0 ) {
1151 return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
1152 }
1153
1168 private function getLocalConnection( $i, $flags = 0 ) {
1169 $autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
1170 // Connection handles required to be in auto-commit mode use a separate connection
1171 // pool since the main pool is effected by implicit and explicit transaction rounds
1172 $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
1173
1174 if ( isset( $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN] ) ) {
1175 $conn = $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN];
1176 $this->connLogger->debug( __METHOD__ . ": reused a connection for $connKey/$i" );
1177 } else {
1178 $conn = $this->reallyOpenConnection(
1179 $i,
1180 $this->localDomain,
1181 [ self::INFO_AUTOCOMMIT_ONLY => $autoCommit ]
1182 );
1183 if ( $conn->isOpen() ) {
1184 $this->connLogger->debug( __METHOD__ . ": opened new connection for $connKey/$i" );
1185 $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN] = $conn;
1186 } else {
1187 $this->connLogger->warning( __METHOD__ . ": connection error for $connKey/$i" );
1188 $this->errorConnection = $conn;
1189 $conn = false;
1190 }
1191 }
1192
1193 // Sanity check to make sure that the right domain is selected
1194 if (
1195 $conn instanceof IDatabase &&
1196 !$this->localDomain->isCompatible( $conn->getDomainID() )
1197 ) {
1198 throw new UnexpectedValueException(
1199 "Got connection to '{$conn->getDomainID()}', " .
1200 "but expected local domain ('{$this->localDomain}')"
1201 );
1202 }
1203
1204 return $conn;
1205 }
1206
1231 private function getForeignConnection( $i, $domain, $flags = 0 ) {
1232 $domainInstance = DatabaseDomain::newFromId( $domain );
1233 $autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
1234 // Connection handles required to be in auto-commit mode use a separate connection
1235 // pool since the main pool is effected by implicit and explicit transaction rounds
1236 if ( $autoCommit ) {
1237 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
1238 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
1239 } else {
1240 $connFreeKey = self::KEY_FOREIGN_FREE;
1241 $connInUseKey = self::KEY_FOREIGN_INUSE;
1242 }
1243
1245 $conn = null;
1246
1247 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1248 // Reuse an in-use connection for the same domain
1249 $conn = $this->conns[$connInUseKey][$i][$domain];
1250 $this->connLogger->debug( __METHOD__ . ": reusing connection $connInUseKey/$i/$domain" );
1251 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1252 // Reuse a free connection for the same domain
1253 $conn = $this->conns[$connFreeKey][$i][$domain];
1254 unset( $this->conns[$connFreeKey][$i][$domain] );
1255 $this->conns[$connInUseKey][$i][$domain] = $conn;
1256 $this->connLogger->debug( __METHOD__ . ": reusing free connection $connInUseKey/$i/$domain" );
1257 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1258 // Reuse a free connection from another domain if possible
1259 foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $oldConn ) {
1260 if ( $domainInstance->getDatabase() !== null ) {
1261 // Check if changing the database will require a new connection.
1262 // In that case, leave the connection handle alone and keep looking.
1263 // This prevents connections from being closed mid-transaction and can
1264 // also avoid overhead if the same database will later be requested.
1265 if (
1266 $oldConn->databasesAreIndependent() &&
1267 $oldConn->getDBname() !== $domainInstance->getDatabase()
1268 ) {
1269 continue;
1270 }
1271 // Select the new database, schema, and prefix
1272 $conn = $oldConn;
1273 $conn->selectDomain( $domainInstance );
1274 } else {
1275 // Stay on the current database, but update the schema/prefix
1276 $conn = $oldConn;
1277 $conn->dbSchema( $domainInstance->getSchema() );
1278 $conn->tablePrefix( $domainInstance->getTablePrefix() );
1279 }
1280 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1281 // Note that if $domain is an empty string, getDomainID() might not match it
1282 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1283 $this->connLogger->debug( __METHOD__ .
1284 ": reusing free connection from $oldDomain for $domain" );
1285 break;
1286 }
1287 }
1288
1289 if ( !$conn ) {
1290 $conn = $this->reallyOpenConnection(
1291 $i,
1292 $domainInstance,
1293 [
1294 self::INFO_AUTOCOMMIT_ONLY => $autoCommit,
1295 self::INFO_FORIEGN => true,
1296 self::INFO_FOREIGN_REF_COUNT => 0
1297 ]
1298 );
1299 if ( $conn->isOpen() ) {
1300 // Note that if $domain is an empty string, getDomainID() might not match it
1301 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1302 $this->connLogger->debug( __METHOD__ . ": opened new connection for $connInUseKey/$i/$domain" );
1303 } else {
1304 $this->connLogger->warning(
1305 __METHOD__ . ": connection error for $connInUseKey/$i/{db_domain}",
1306 [ 'db_domain' => $domain ]
1307 );
1308 $this->errorConnection = $conn;
1309 $conn = false;
1310 }
1311 }
1312
1313 if ( $conn instanceof IDatabase ) {
1314 // Sanity check to make sure that the right domain is selected
1315 if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1316 throw new UnexpectedValueException(
1317 "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
1318 }
1319 // Increment reference count
1320 $refCount = $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT );
1321 $conn->setLBInfo( self::INFO_FOREIGN_REF_COUNT, $refCount + 1 );
1322 }
1323
1324 return $conn;
1325 }
1326
1327 public function getServerAttributes( $i ) {
1328 return Database::attributesFromType(
1329 $this->getServerType( $i ),
1330 $this->servers[$i]['driver'] ?? null
1331 );
1332 }
1333
1340 private function isOpen( $index ) {
1341 return (bool)$this->getAnyOpenConnection( $index );
1342 }
1343
1356 protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
1357 if ( $this->disabled ) {
1358 throw new DBAccessError();
1359 }
1360
1361 $server = $this->getServerInfoStrict( $i );
1362
1363 $conn = Database::factory(
1364 $server['type'],
1365 array_merge( $server, [
1366 // Basic replication role information
1367 'topologyRole' => $this->getTopologyRole( $i, $server ),
1368 'topologicalMaster' => $this->getPrimaryServerName(),
1369 // Use the database specified in $domain (null means "none or entrypoint DB");
1370 // fallback to the $server default if the RDBMs is an embedded library using a
1371 // file on disk since there would be nothing to access to without a DB/file name.
1372 'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1373 ? ( $domain->getDatabase() ?? $server['dbname'] ?? null )
1374 : $domain->getDatabase(),
1375 // Override the $server default schema with that of $domain if specified
1376 'schema' => $domain->getSchema() ?? $server['schema'] ?? null,
1377 // Use the table prefix specified in $domain
1378 'tablePrefix' => $domain->getTablePrefix(),
1379 // Participate in transaction rounds if $server does not specify otherwise
1380 'flags' => $this->initConnFlags( $server['flags'] ?? IDatabase::DBO_DEFAULT ),
1381 // Inject the PHP execution mode and the agent string
1382 'cliMode' => $this->cliMode,
1383 'agent' => $this->agent,
1384 'ownerId' => $this->id,
1385 // Inject object and callback dependencies
1386 'lazyMasterHandle' => $this->getLazyConnectionRef(
1387 self::DB_PRIMARY,
1388 [],
1389 $domain->getId()
1390 ),
1391 'srvCache' => $this->srvCache,
1392 'connLogger' => $this->connLogger,
1393 'queryLogger' => $this->queryLogger,
1394 'replLogger' => $this->replLogger,
1395 'errorLogger' => $this->errorLogger,
1396 'deprecationLogger' => $this->deprecationLogger,
1397 'profiler' => $this->profiler,
1398 'trxProfiler' => $this->trxProfiler,
1399 'criticalSectionProvider' => $this->csProvider
1400 ] ),
1401 Database::NEW_UNCONNECTED
1402 );
1403 // Attach load balancer information to the handle
1404 $conn->setLBInfo( [ self::INFO_SERVER_INDEX => $i ] + $lbInfo );
1405 // Set alternative table/index names before any queries can be issued
1406 $conn->setTableAliases( $this->tableAliases );
1407 $conn->setIndexAliases( $this->indexAliases );
1408 // Account for any active transaction round and listeners
1409 if ( $i === $this->getWriterIndex() ) {
1410 if ( $this->trxRoundId !== false ) {
1411 $this->applyTransactionRoundFlags( $conn );
1412 }
1413 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1414 $conn->setTransactionListener( $name, $callback );
1415 }
1416 }
1417
1418 // Make the connection handle live
1419 try {
1420 $conn->initConnection();
1421 ++$this->connectionCounter;
1422 } catch ( DBConnectionError $e ) {
1423 // ignore; let the DB handle the logging
1424 }
1425
1426 // Try to maintain session consistency for clients that trigger write transactions
1427 // in a request or script and then return soon after in another request or script.
1428 // This requires cooperation with ChronologyProtector and the application wiring.
1429 if ( $conn->isOpen() ) {
1430 $this->lazyLoadReplicationPositions();
1431 }
1432
1433 // Log when many connection are made during a single request/script
1434 $count = $this->getCurrentConnectionCount();
1435 if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1436 $this->perfLogger->warning(
1437 __METHOD__ . ": {connections}+ connections made (primary={primarydb})",
1438 $this->getConnLogContext(
1439 $conn,
1440 [
1441 'connections' => $count,
1442 'primarydb' => $this->getPrimaryServerName(),
1443 'db_domain' => $domain->getId()
1444 ]
1445 )
1446 );
1447 }
1448
1449 return $conn;
1450 }
1451
1457 private function getTopologyRole( $i, array $server ) {
1458 if ( !empty( $server['is static'] ) ) {
1459 return IDatabase::ROLE_STATIC_CLONE;
1460 }
1461
1462 return ( $i === $this->getWriterIndex() )
1463 ? IDatabase::ROLE_STREAMING_MASTER
1464 : IDatabase::ROLE_STREAMING_REPLICA;
1465 }
1466
1472 private function initConnFlags( $flags ) {
1473 if ( self::fieldHasBit( $flags, IDatabase::DBO_DEFAULT ) ) {
1474 if ( $this->cliMode ) {
1475 $flags &= ~IDatabase::DBO_TRX;
1476 } else {
1477 $flags |= IDatabase::DBO_TRX;
1478 }
1479 }
1480
1481 return $flags;
1482 }
1483
1487 private function lazyLoadReplicationPositions() {
1488 if ( !$this->chronologyCallbackTriggered && $this->chronologyCallback ) {
1489 $this->chronologyCallbackTriggered = true;
1490 ( $this->chronologyCallback )( $this ); // generally calls waitFor()
1491 $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
1492 }
1493 }
1494
1499 private function reportConnectionError() {
1500 $conn = $this->errorConnection; // the connection which caused the error
1501 $context = [
1502 'method' => __METHOD__,
1503 'last_error' => $this->lastError,
1504 ];
1505
1506 if ( $conn instanceof IDatabase ) {
1507 $srvName = $conn->getServerName();
1508 $this->connLogger->warning(
1509 __METHOD__ . ": connection error: {last_error} ({db_server})",
1510 $this->getConnLogContext( $conn, $context )
1511 );
1512 $error = $conn->lastError() ?: $this->lastError;
1513 throw new DBConnectionError( $conn, "{$error} ($srvName)" );
1514 } else {
1515 // No last connection, probably due to all servers being too busy
1516 $this->connLogger->error(
1517 __METHOD__ .
1518 ": LB failure with no last connection. Connection error: {last_error}",
1519 $context
1520 );
1521
1522 // If all servers were busy, "lastError" will contain something sensible
1523 throw new DBConnectionError( null, $this->lastError );
1524 }
1525 }
1526
1527 public function getWriterIndex() {
1528 return 0;
1529 }
1530
1538 public function haveIndex( $i ) {
1539 return array_key_exists( $i, $this->servers );
1540 }
1541
1549 public function isNonZeroLoad( $i ) {
1550 return ( isset( $this->servers[$i] ) && $this->groupLoads[self::GROUP_GENERIC][$i] > 0 );
1551 }
1552
1553 public function getServerCount() {
1554 return count( $this->servers );
1555 }
1556
1557 public function hasReplicaServers() {
1558 return ( $this->getServerCount() > 1 );
1559 }
1560
1564 private function getStreamingReplicaIndexes() {
1565 $indexes = [];
1566 foreach ( $this->servers as $i => $server ) {
1567 if ( $i !== $this->getWriterIndex() && empty( $server['is static'] ) ) {
1568 $indexes[] = $i;
1569 }
1570 }
1571
1572 return $indexes;
1573 }
1574
1575 public function hasStreamingReplicaServers() {
1576 return (bool)$this->getStreamingReplicaIndexes();
1577 }
1578
1579 public function getServerName( $i ) {
1580 $name = $this->servers[$i]['serverName'] ?? ( $this->servers[$i]['host'] ?? '' );
1581
1582 return ( $name != '' ) ? $name : 'localhost';
1583 }
1584
1585 public function getServerInfo( $i ) {
1586 return $this->servers[$i] ?? false;
1587 }
1588
1589 public function getServerType( $i ) {
1590 return $this->servers[$i]['type'] ?? 'unknown';
1591 }
1592
1593 public function getPrimaryPos() {
1594 $index = $this->getWriterIndex();
1595
1596 $conn = $this->getAnyOpenConnection( $index );
1597 if ( $conn ) {
1598 return $conn->getPrimaryPos();
1599 }
1600
1601 $conn = $this->getConnection( $index, self::CONN_SILENCE_ERRORS );
1602 if ( !$conn ) {
1603 $this->reportConnectionError();
1604 }
1605
1606 try {
1607 return $conn->getPrimaryPos();
1608 } finally {
1609 $this->closeConnection( $conn );
1610 }
1611 }
1612
1613 public function getMasterPos() {
1614 wfDeprecated( __METHOD__, '1.37' );
1615 return $this->getPrimaryPos();
1616 }
1617
1618 public function getReplicaResumePos() {
1619 // Get the position of any existing primary DB server connection
1620 $primaryConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1621 if ( $primaryConn ) {
1622 return $primaryConn->getPrimaryPos();
1623 }
1624
1625 // Get the highest position of any existing replica server connection
1626 $highestPos = false;
1627 foreach ( $this->getStreamingReplicaIndexes() as $i ) {
1628 $conn = $this->getAnyOpenConnection( $i );
1629 $pos = $conn ? $conn->getReplicaPos() : false;
1630 if ( !$pos ) {
1631 continue; // no open connection or could not get position
1632 }
1633
1634 $highestPos = $highestPos ?: $pos;
1635 if ( $pos->hasReached( $highestPos ) ) {
1636 $highestPos = $pos;
1637 }
1638 }
1639
1640 return $highestPos;
1641 }
1642
1643 public function disable( $fname = __METHOD__, $owner = null ) {
1644 $this->assertOwnership( $fname, $owner );
1645 $this->closeAll( $fname, $owner );
1646 $this->disabled = true;
1647 }
1648
1649 public function closeAll( $fname = __METHOD__, $owner = null ) {
1650 $this->assertOwnership( $fname, $owner );
1651 if ( $this->ownerId === null ) {
1653 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1654 }
1655 $this->forEachOpenConnection( function ( IDatabase $conn ) use ( $fname ) {
1656 $srvName = $conn->getServerName();
1657 $this->connLogger->debug( "$fname: closing connection to database '$srvName'." );
1658 $conn->close( $fname, $this->id );
1659 } );
1660
1661 $this->conns = self::newTrackedConnectionsArray();
1662 }
1663
1664 public function closeConnection( IDatabase $conn ) {
1665 if ( $conn instanceof DBConnRef ) {
1666 // Avoid calling close() but still leaving the handle in the pool
1667 throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
1668 }
1669
1670 $serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
1671 if ( $serverIndex === null ) {
1672 throw new RuntimeException( 'Database handle is missing server index' );
1673 }
1674
1675 $srvName = $this->getServerName( $serverIndex );
1676 $domain = $conn->getDomainID();
1677
1678 $found = false;
1679 foreach ( $this->conns as $type => $connsByServer ) {
1680 $key = array_search( $conn, $connsByServer[$serverIndex] ?? [], true );
1681 if ( $key !== false ) {
1682 $found = true;
1683 unset( $this->conns[$type][$serverIndex][$key] );
1684 }
1685 }
1686
1687 if ( !$found ) {
1688 $this->connLogger->warning(
1689 __METHOD__ .
1690 ": got orphaned connection to database $serverIndex/$domain at '$srvName'."
1691 );
1692 }
1693
1694 $this->connLogger->debug(
1695 __METHOD__ .
1696 ": closing connection to database $serverIndex/$domain at '$srvName'."
1697 );
1698
1699 $conn->close( __METHOD__ );
1700 }
1701
1702 public function commitAll( $fname = __METHOD__, $owner = null ) {
1703 $this->commitPrimaryChanges( $fname, $owner );
1704 $this->flushPrimarySnapshots( $fname, $owner );
1705 $this->flushReplicaSnapshots( $fname, $owner );
1706 }
1707
1708 public function finalizePrimaryChanges( $fname = __METHOD__, $owner = null ) {
1709 $this->assertOwnership( $fname, $owner );
1710 $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1711 if ( $this->ownerId === null ) {
1713 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1714 }
1715
1716 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1717 // Loop until callbacks stop adding callbacks on other connections
1718 $total = 0;
1719 do {
1720 $count = 0; // callbacks execution attempts
1721 $this->forEachOpenPrimaryConnection( static function ( Database $conn ) use ( &$count ) {
1722 // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1723 // Any error should cause all (peer) transactions to be rolled back together.
1724 $count += $conn->runOnTransactionPreCommitCallbacks();
1725 } );
1726 $total += $count;
1727 } while ( $count > 0 );
1728 // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1729 $this->forEachOpenPrimaryConnection( static function ( Database $conn ) {
1730 $conn->setTrxEndCallbackSuppression( true );
1731 } );
1732 $this->trxRoundStage = self::ROUND_FINALIZED;
1733
1734 return $total;
1735 }
1736
1737 public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
1738 wfDeprecated( __METHOD__, '1.37' );
1739 return $this->finalizePrimaryChanges( $fname, $owner );
1740 }
1741
1742 public function approvePrimaryChanges( array $options, $fname = __METHOD__, $owner = null ) {
1743 $this->assertOwnership( $fname, $owner );
1744 $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1745 if ( $this->ownerId === null ) {
1747 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1748 }
1749
1750 $limit = $options['maxWriteDuration'] ?? 0;
1751
1752 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1753 $this->forEachOpenPrimaryConnection( function ( IDatabase $conn ) use ( $limit ) {
1754 // If atomic sections or explicit transactions are still open, some caller must have
1755 // caught an exception but failed to properly rollback any changes. Detect that and
1756 // throw an error (causing rollback).
1757 $conn->assertNoOpenTransactions();
1758 // Assert that the time to replicate the transaction will be sane.
1759 // If this fails, then all DB transactions will be rollback back together.
1760 $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1761 if ( $limit > 0 ) {
1762 if ( $time > $limit ) {
1763 throw new DBTransactionSizeError(
1764 $conn,
1765 "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
1766 [ $time, $limit ]
1767 );
1768 } elseif ( $time > 0 ) {
1769 $this->perfLogger->debug( "Transaction spent $time second(s) in writes, " .
1770 "less than the limit of $limit" );
1771 }
1772 }
1773 // If a connection sits idle while slow queries execute on another, that connection
1774 // may end up dropped before the commit round is reached. Ping servers to detect this.
1775 if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1776 throw new DBTransactionError(
1777 $conn,
1778 "A connection to the {$conn->getDBname()} database was lost before commit"
1779 );
1780 }
1781 } );
1782 $this->trxRoundStage = self::ROUND_APPROVED;
1783 }
1784
1785 public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
1786 wfDeprecated( __METHOD__, '1.37' );
1787 $this->approvePrimaryChanges( $options, $fname, $owner );
1788 }
1789
1790 public function beginPrimaryChanges( $fname = __METHOD__, $owner = null ) {
1791 $this->assertOwnership( $fname, $owner );
1792 if ( $this->trxRoundId !== false ) {
1793 throw new DBTransactionError(
1794 null,
1795 "$fname: Transaction round '{$this->trxRoundId}' already started"
1796 );
1797 }
1798 $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1799 if ( $this->ownerId === null ) {
1801 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1802 }
1803
1804 // Clear any empty transactions (no writes/callbacks) from the implicit round
1805 $this->flushPrimarySnapshots( $fname, $owner );
1806
1807 $this->trxRoundId = $fname;
1808 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1809 // Mark applicable handles as participating in this explicit transaction round.
1810 // For each of these handles, any writes and callbacks will be tied to a single
1811 // transaction. The (peer) handles will reject begin()/commit() calls unless they
1812 // are part of an en masse commit or an en masse rollback.
1813 $this->forEachOpenPrimaryConnection( function ( Database $conn ) {
1814 $this->applyTransactionRoundFlags( $conn );
1815 } );
1816 $this->trxRoundStage = self::ROUND_CURSORY;
1817 }
1818
1819 public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
1820 wfDeprecated( __METHOD__, '1.37' );
1821 $this->beginPrimaryChanges( $fname, $owner );
1822 }
1823
1824 public function commitPrimaryChanges( $fname = __METHOD__, $owner = null ) {
1825 $this->assertOwnership( $fname, $owner );
1826 $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1827 if ( $this->ownerId === null ) {
1829 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1830 }
1831
1832 $failures = [];
1833
1834 $restore = ( $this->trxRoundId !== false );
1835 $this->trxRoundId = false;
1836 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1837 // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1838 // Note that callbacks should already be suppressed due to finalizePrimaryChanges().
1839 $this->forEachOpenPrimaryConnection(
1840 function ( IDatabase $conn ) use ( $fname, &$failures ) {
1841 try {
1842 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1843 } catch ( DBError $e ) {
1844 ( $this->errorLogger )( $e );
1845 $failures[] = "{$conn->getServerName()}: {$e->getMessage()}";
1846 }
1847 }
1848 );
1849 if ( $failures ) {
1850 throw new DBTransactionError(
1851 null,
1852 "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1853 );
1854 }
1855 if ( $restore ) {
1856 // Unmark handles as participating in this explicit transaction round
1857 $this->forEachOpenPrimaryConnection( function ( Database $conn ) {
1858 $this->undoTransactionRoundFlags( $conn );
1859 } );
1860 }
1861 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1862 }
1863
1864 public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
1865 wfDeprecated( __METHOD__, '1.37' );
1866 $this->commitPrimaryChanges( $fname, $owner );
1867 }
1868
1869 public function runPrimaryTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
1870 $this->assertOwnership( $fname, $owner );
1871 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1872 $type = IDatabase::TRIGGER_COMMIT;
1873 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1874 $type = IDatabase::TRIGGER_ROLLBACK;
1875 } else {
1876 throw new DBTransactionError(
1877 null,
1878 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1879 );
1880 }
1881 if ( $this->ownerId === null ) {
1883 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1884 }
1885
1886 $oldStage = $this->trxRoundStage;
1887 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1888
1889 // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1890 $this->forEachOpenPrimaryConnection( static function ( Database $conn ) {
1891 $conn->setTrxEndCallbackSuppression( false );
1892 } );
1893
1894 $errors = [];
1895 $fname = __METHOD__;
1896 // Loop until callbacks stop adding callbacks on other connections
1897 do {
1898 // Run any pending callbacks for each connection...
1899 $count = 0; // callback execution attempts
1900 $this->forEachOpenPrimaryConnection(
1901 static function ( Database $conn ) use ( $type, &$errors, &$count ) {
1902 if ( $conn->trxLevel() ) {
1903 return; // retry in the next iteration, after commit() is called
1904 }
1905 $count += $conn->runOnTransactionIdleCallbacks( $type, $errors );
1906 }
1907 );
1908 // Clear out any active transactions left over from callbacks...
1909 $this->forEachOpenPrimaryConnection(
1910 function ( Database $conn ) use ( &$errors, $fname ) {
1911 if ( $conn->writesPending() ) {
1912 // A callback from another handle wrote to this one and DBO_TRX is set
1913 $this->queryLogger->warning( $fname . ": found writes pending." );
1914 $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
1915 $this->queryLogger->warning(
1916 "$fname: found writes pending ($fnames).",
1917 $this->getConnLogContext(
1918 $conn,
1919 [ 'exception' => new RuntimeException() ]
1920 )
1921 );
1922 } elseif ( $conn->trxLevel() ) {
1923 // A callback from another handle read from this one and DBO_TRX is set,
1924 // which can easily happen if there is only one DB (no replicas)
1925 $this->queryLogger->debug( "$fname: found empty transaction." );
1926 }
1927 try {
1928 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1929 } catch ( DBError $ex ) {
1930 $errors[] = $ex;
1931 }
1932 }
1933 );
1934 } while ( $count > 0 );
1935
1936 $this->trxRoundStage = $oldStage;
1937
1938 return $errors ? $errors[0] : null;
1939 }
1940
1941 public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
1942 wfDeprecated( __METHOD__, '1.37' );
1943 $this->runPrimaryTransactionIdleCallbacks( $fname, $owner );
1944 }
1945
1946 public function runPrimaryTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
1947 $this->assertOwnership( $fname, $owner );
1948 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1949 $type = IDatabase::TRIGGER_COMMIT;
1950 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1951 $type = IDatabase::TRIGGER_ROLLBACK;
1952 } else {
1953 throw new DBTransactionError(
1954 null,
1955 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1956 );
1957 }
1958 if ( $this->ownerId === null ) {
1960 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1961 }
1962
1963 $errors = [];
1964 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1965 $this->forEachOpenPrimaryConnection(
1966 static function ( Database $conn ) use ( $type, &$errors ) {
1967 $conn->runTransactionListenerCallbacks( $type, $errors );
1968 }
1969 );
1970 $this->trxRoundStage = self::ROUND_CURSORY;
1971
1972 return $errors ? $errors[0] : null;
1973 }
1974
1975 public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
1976 wfDeprecated( __METHOD__, '1.37' );
1977 $this->runPrimaryTransactionListenerCallbacks( $fname, $owner );
1978 }
1979
1980 public function rollbackPrimaryChanges( $fname = __METHOD__, $owner = null ) {
1981 $this->assertOwnership( $fname, $owner );
1982 if ( $this->ownerId === null ) {
1984 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1985 }
1986
1987 $restore = ( $this->trxRoundId !== false );
1988 $this->trxRoundId = false;
1989 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1990 $this->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( $fname ) {
1991 $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1992 } );
1993 if ( $restore ) {
1994 // Unmark handles as participating in this explicit transaction round
1995 $this->forEachOpenPrimaryConnection( function ( Database $conn ) {
1996 $this->undoTransactionRoundFlags( $conn );
1997 } );
1998 }
1999 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
2000 }
2001
2002 public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
2003 wfDeprecated( __METHOD__, '1.37' );
2004 $this->rollbackPrimaryChanges( $fname, $owner );
2005 }
2006
2011 private function assertTransactionRoundStage( $stage ) {
2012 $stages = (array)$stage;
2013
2014 if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
2015 $stageList = implode(
2016 '/',
2017 array_map( static function ( $v ) {
2018 return "'$v'";
2019 }, $stages )
2020 );
2021 throw new DBTransactionError(
2022 null,
2023 "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
2024 );
2025 }
2026 }
2027
2040 private function assertOwnership( $fname, $owner ) {
2041 if ( $this->ownerId !== null && $owner !== $this->ownerId && $owner !== $this->id ) {
2042 throw new DBTransactionError(
2043 null,
2044 "$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
2045 );
2046 }
2047 }
2048
2058 private function applyTransactionRoundFlags( Database $conn ) {
2059 if ( $conn->getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
2060 return; // transaction rounds do not apply to these connections
2061 }
2062
2063 if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
2064 // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
2065 // Force DBO_TRX even in CLI mode since a commit round is expected soon.
2066 $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
2067 }
2068
2069 if ( $conn->getFlag( $conn::DBO_TRX ) ) {
2070 $conn->setLBInfo( $conn::LB_TRX_ROUND_ID, $this->trxRoundId );
2071 }
2072 }
2073
2077 private function undoTransactionRoundFlags( Database $conn ) {
2078 if ( $conn->getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
2079 return; // transaction rounds do not apply to these connections
2080 }
2081
2082 if ( $conn->getFlag( $conn::DBO_TRX ) ) {
2083 $conn->setLBInfo( $conn::LB_TRX_ROUND_ID, null ); // remove the round ID
2084 }
2085
2086 if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
2087 $conn->restoreFlags( $conn::RESTORE_PRIOR );
2088 }
2089 }
2090
2091 public function flushReplicaSnapshots( $fname = __METHOD__, $owner = null ) {
2092 $this->assertOwnership( $fname, $owner );
2093 $this->forEachOpenReplicaConnection( static function ( IDatabase $conn ) use ( $fname ) {
2094 $conn->flushSnapshot( $fname );
2095 } );
2096 }
2097
2098 public function flushPrimarySnapshots( $fname = __METHOD__, $owner = null ) {
2099 $this->assertOwnership( $fname, $owner );
2100 $this->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( $fname ) {
2101 $conn->flushSnapshot( $fname );
2102 } );
2103 }
2104
2105 public function flushMasterSnapshots( $fname = __METHOD__, $owner = null ) {
2106 wfDeprecated( __METHOD__, '1.37' );
2107 $this->flushPrimarySnapshots( $fname, $owner );
2108 }
2109
2114 public function getTransactionRoundStage() {
2115 return $this->trxRoundStage;
2116 }
2117
2118 public function hasPrimaryConnection() {
2119 return $this->isOpen( $this->getWriterIndex() );
2120 }
2121
2122 public function hasMasterConnection() {
2123 wfDeprecated( __METHOD__, '1.37' );
2124 return $this->hasPrimaryConnection();
2125 }
2126
2127 public function hasPrimaryChanges() {
2128 $pending = false;
2129 $this->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( &$pending ) {
2130 $pending = $pending || $conn->writesOrCallbacksPending();
2131 } );
2132
2133 return $pending;
2134 }
2135
2136 public function hasMasterChanges() {
2137 wfDeprecated( __METHOD__, '1.37' );
2138 return $this->hasPrimaryChanges();
2139 }
2140
2141 public function lastPrimaryChangeTimestamp() {
2142 $lastTime = false;
2143 $this->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( &$lastTime ) {
2144 $lastTime = max( $lastTime, $conn->lastDoneWrites() );
2145 } );
2146
2147 return $lastTime;
2148 }
2149
2150 public function lastMasterChangeTimestamp() {
2151 wfDeprecated( __METHOD__, '1.37' );
2152 return $this->lastPrimaryChangeTimestamp();
2153 }
2154
2155 public function hasOrMadeRecentPrimaryChanges( $age = null ) {
2156 $age = $age ?? $this->waitTimeout;
2157
2158 return ( $this->hasPrimaryChanges()
2159 || $this->lastPrimaryChangeTimestamp() > microtime( true ) - $age );
2160 }
2161
2162 public function hasOrMadeRecentMasterChanges( $age = null ) {
2163 wfDeprecated( __METHOD__, '1.37' );
2164 return $this->hasOrMadeRecentPrimaryChanges( $age );
2165 }
2166
2168 $fnames = [];
2169 $this->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( &$fnames ) {
2170 $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
2171 } );
2172
2173 return $fnames;
2174 }
2175
2176 public function pendingMasterChangeCallers() {
2177 wfDeprecated( __METHOD__, '1.37' );
2178 return $this->pendingPrimaryChangeCallers();
2179 }
2180
2181 public function getLaggedReplicaMode( $domain = false ) {
2182 $domain = $this->resolveDomainID( $domain );
2183
2184 if ( $this->laggedReplicaMode ) {
2185 // Stay in lagged replica mode once it is observed on any domain
2186 return true;
2187 }
2188
2189 if ( $this->hasStreamingReplicaServers() ) {
2190 // This will set "laggedReplicaMode" as needed
2191 $this->getReaderIndex( self::GROUP_GENERIC, $domain );
2192 }
2193
2194 return $this->laggedReplicaMode;
2195 }
2196
2197 public function laggedReplicaUsed() {
2198 return $this->laggedReplicaMode;
2199 }
2200
2201 public function getReadOnlyReason( $domain = false ) {
2202 $domainInstance = DatabaseDomain::newFromId( $this->resolveDomainID( $domain ) );
2203
2204 if ( $this->readOnlyReason !== false ) {
2205 return $this->readOnlyReason;
2206 } elseif ( $this->isPrimaryRunningReadOnly( $domainInstance ) ) {
2207 return 'The primary database server is running in read-only mode.';
2208 } elseif ( $this->getLaggedReplicaMode( $domain ) ) {
2209 $genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
2210
2211 return ( $genericIndex !== self::READER_INDEX_NONE )
2212 ? 'The database is read-only until replication lag decreases.'
2213 : 'The database is read-only until a replica database server becomes reachable.';
2214 }
2215
2216 return false;
2217 }
2218
2225 private function isPrimaryConnectionReadOnly( IDatabase $conn, $flags = 0 ) {
2226 // Note that table prefixes are not related to server-side read-only mode
2227 $key = $this->srvCache->makeGlobalKey(
2228 'rdbms-server-readonly',
2229 $conn->getServerName(),
2230 (string)$conn->getDBname(),
2231 (string)$conn->dbSchema()
2232 );
2233
2234 if ( self::fieldHasBit( $flags, self::CONN_REFRESH_READ_ONLY ) ) {
2235 // Refresh the local server cache. This is useful when the caller is
2236 // currently in the process of updating a corresponding WANCache key.
2237 try {
2238 $readOnly = (int)$conn->serverIsReadOnly();
2239 } catch ( DBError $e ) {
2240 $readOnly = 0;
2241 }
2242 $this->srvCache->set( $key, $readOnly, BagOStuff::TTL_PROC_SHORT );
2243 } else {
2244 $readOnly = $this->srvCache->getWithSetCallback(
2245 $key,
2246 BagOStuff::TTL_PROC_SHORT,
2247 static function () use ( $conn ) {
2248 try {
2249 $readOnly = (int)$conn->serverIsReadOnly();
2250 } catch ( DBError $e ) {
2251 $readOnly = 0;
2252 }
2253
2254 return $readOnly;
2255 }
2256 );
2257 }
2258
2259 return (bool)$readOnly;
2260 }
2261
2267 private function isPrimaryRunningReadOnly( DatabaseDomain $domain ) {
2268 // Context will often be HTTP GET/HEAD; heavily cache the results
2269 return (bool)$this->wanCache->getWithSetCallback(
2270 // Note that table prefixes are not related to server-side read-only mode
2271 $this->wanCache->makeGlobalKey(
2272 'rdbms-server-readonly',
2273 $this->getPrimaryServerName(),
2274 $domain->getDatabase(),
2275 (string)$domain->getSchema()
2276 ),
2277 self::TTL_CACHE_READONLY,
2278 function () use ( $domain ) {
2279 $scope = $this->trxProfiler->silenceForScope();
2280
2281 $index = $this->getWriterIndex();
2282 // Refresh the local server cache as well. This is done in order to avoid
2283 // backfilling the WANCache with data that is already significantly stale
2284 $flags = self::CONN_SILENCE_ERRORS | self::CONN_REFRESH_READ_ONLY;
2285 $conn = $this->getServerConnection( $index, $domain->getId(), $flags );
2286 if ( $conn ) {
2287 try {
2288 $readOnly = (int)$this->isPrimaryConnectionReadOnly( $conn );
2289 } catch ( DBError $e ) {
2290 $readOnly = 0;
2291 }
2292 $this->reuseConnection( $conn );
2293 } else {
2294 $readOnly = 0;
2295 }
2296
2297 ScopedCallback::consume( $scope );
2298
2299 return $readOnly;
2300 },
2301 [
2302 'busyValue' => 0,
2303 'pcTTL' => WANObjectCache::TTL_PROC_LONG
2304 ]
2305 );
2306 }
2307
2308 public function allowLagged( $mode = null ) {
2309 if ( $mode === null ) {
2310 return $this->allowLagged;
2311 }
2312 $this->allowLagged = $mode;
2313
2314 return $this->allowLagged;
2315 }
2316
2317 public function pingAll() {
2318 $success = true;
2319 $this->forEachOpenConnection( static function ( IDatabase $conn ) use ( &$success ) {
2320 if ( !$conn->ping() ) {
2321 $success = false;
2322 }
2323 } );
2324
2325 return $success;
2326 }
2327
2328 public function forEachOpenConnection( $callback, array $params = [] ) {
2329 foreach ( $this->conns as $connsByServer ) {
2330 foreach ( $connsByServer as $serverConns ) {
2331 foreach ( $serverConns as $conn ) {
2332 $callback( $conn, ...$params );
2333 }
2334 }
2335 }
2336 }
2337
2338 public function forEachOpenPrimaryConnection( $callback, array $params = [] ) {
2339 $primaryIndex = $this->getWriterIndex();
2340 foreach ( $this->conns as $connsByServer ) {
2341 if ( isset( $connsByServer[$primaryIndex] ) ) {
2343 foreach ( $connsByServer[$primaryIndex] as $conn ) {
2344 $callback( $conn, ...$params );
2345 }
2346 }
2347 }
2348 }
2349
2350 public function forEachOpenMasterConnection( $callback, array $params = [] ) {
2351 wfDeprecated( __METHOD__, '1.37' );
2352 $this->forEachOpenPrimaryConnection( $callback, $params );
2353 }
2354
2355 public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
2356 foreach ( $this->conns as $connsByServer ) {
2357 foreach ( $connsByServer as $i => $serverConns ) {
2358 if ( $i === $this->getWriterIndex() ) {
2359 continue; // skip primary DB
2360 }
2361 foreach ( $serverConns as $conn ) {
2362 $callback( $conn, ...$params );
2363 }
2364 }
2365 }
2366 }
2367
2371 private function getCurrentConnectionCount() {
2372 $count = 0;
2373 foreach ( $this->conns as $connsByServer ) {
2374 foreach ( $connsByServer as $serverConns ) {
2375 $count += count( $serverConns );
2376 }
2377 }
2378
2379 return $count;
2380 }
2381
2382 public function getMaxLag( $domain = false ) {
2383 $domain = $this->resolveDomainID( $domain );
2384
2385 $host = '';
2386 $maxLag = -1;
2387 $maxIndex = 0;
2388
2389 if ( $this->hasReplicaServers() ) {
2390 $lagTimes = $this->getLagTimes( $domain );
2391 foreach ( $lagTimes as $i => $lag ) {
2392 if ( $this->groupLoads[self::GROUP_GENERIC][$i] > 0 && $lag > $maxLag ) {
2393 $maxLag = $lag;
2394 $host = $this->getServerInfoStrict( $i, 'host' );
2395 $maxIndex = $i;
2396 }
2397 }
2398 }
2399
2400 return [ $host, $maxLag, $maxIndex ];
2401 }
2402
2403 public function getLagTimes( $domain = false ) {
2404 $domain = $this->resolveDomainID( $domain );
2405
2406 if ( !$this->hasReplicaServers() ) {
2407 return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
2408 }
2409
2410 $knownLagTimes = []; // map of (server index => 0 seconds)
2411 $indexesWithLag = [];
2412 foreach ( $this->servers as $i => $server ) {
2413 if ( empty( $server['is static'] ) ) {
2414 $indexesWithLag[] = $i; // DB server might have replication lag
2415 } else {
2416 $knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
2417 }
2418 }
2419
2420 return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
2421 }
2422
2438 public function safeGetLag( IDatabase $conn ) {
2439 return $conn->getLag();
2440 }
2441
2442 public function waitForPrimaryPos( IDatabase $conn, $pos = false, $timeout = null ) {
2443 $timeout = max( 1, $timeout ?: $this->waitTimeout );
2444
2445 if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === $this->getWriterIndex() ) {
2446 return true; // not a replica DB server
2447 }
2448
2449 if ( !$pos ) {
2450 // Get the current primary DB position, opening a connection only if needed
2451 $this->replLogger->debug( __METHOD__ . ': no position passed; using current' );
2452 $index = $this->getWriterIndex();
2453 $flags = self::CONN_SILENCE_ERRORS;
2454 $primaryConn = $this->getAnyOpenConnection( $index, $flags );
2455 if ( $primaryConn ) {
2456 $pos = $primaryConn->getPrimaryPos();
2457 } else {
2458 $primaryConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
2459 if ( !$primaryConn ) {
2460 throw new DBReplicationWaitError(
2461 null,
2462 "Could not obtain a primary database connection to get the position"
2463 );
2464 }
2465 $pos = $primaryConn->getPrimaryPos();
2466 $this->closeConnection( $primaryConn );
2467 }
2468 }
2469
2470 if ( $pos instanceof DBPrimaryPos ) {
2471 $this->replLogger->debug( __METHOD__ . ': waiting' );
2472 $result = $conn->primaryPosWait( $pos, $timeout );
2473 $ok = ( $result !== null && $result != -1 );
2474 if ( $ok ) {
2475 $this->replLogger->debug( __METHOD__ . ': done waiting (success)' );
2476 } else {
2477 $this->replLogger->debug( __METHOD__ . ': done waiting (failure)' );
2478 }
2479 } else {
2480 $ok = false; // something is misconfigured
2481 $this->replLogger->error(
2482 __METHOD__ . ': could not get primary pos for {db_server}',
2483 $this->getConnLogContext( $conn, [ 'exception' => new RuntimeException() ] )
2484 );
2485 }
2486
2487 return $ok;
2488 }
2489
2490 public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
2491 wfDeprecated( __METHOD__, '1.37' );
2492 return $this->waitForPrimaryPos( $conn, $pos, $timeout );
2493 }
2494
2495 public function setTransactionListener( $name, callable $callback = null ) {
2496 if ( $callback ) {
2497 $this->trxRecurringCallbacks[$name] = $callback;
2498 } else {
2499 unset( $this->trxRecurringCallbacks[$name] );
2500 }
2501 $this->forEachOpenPrimaryConnection(
2502 static function ( IDatabase $conn ) use ( $name, $callback ) {
2503 $conn->setTransactionListener( $name, $callback );
2504 }
2505 );
2506 }
2507
2508 public function setTableAliases( array $aliases ) {
2509 $this->tableAliases = $aliases;
2510 }
2511
2512 public function setIndexAliases( array $aliases ) {
2513 $this->indexAliases = $aliases;
2514 }
2515
2516 public function setDomainAliases( array $aliases ) {
2517 $this->domainAliases = $aliases;
2518 }
2519
2520 public function setLocalDomainPrefix( $prefix ) {
2521 // Find connections to explicit foreign domains still marked as in-use...
2522 $domainsInUse = [];
2523 $this->forEachOpenConnection( static function ( IDatabase $conn ) use ( &$domainsInUse ) {
2524 // Once reuseConnection() is called on a handle, its reference count goes from 1 to 0.
2525 // Until then, it is still in use by the caller (explicitly or via DBConnRef scope).
2526 if ( $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT ) > 0 ) {
2527 $domainsInUse[] = $conn->getDomainID();
2528 }
2529 } );
2530
2531 // Do not switch connections to explicit foreign domains unless marked as safe
2532 if ( $domainsInUse ) {
2533 $domains = implode( ', ', $domainsInUse );
2534 throw new DBUnexpectedError( null,
2535 "Foreign domain connections are still in use ($domains)" );
2536 }
2537
2538 $this->setLocalDomain( new DatabaseDomain(
2539 $this->localDomain->getDatabase(),
2540 $this->localDomain->getSchema(),
2541 $prefix
2542 ) );
2543
2544 // Update the prefix for all local connections...
2545 $this->forEachOpenConnection( static function ( IDatabase $conn ) use ( $prefix ) {
2546 if ( !$conn->getLBInfo( self::INFO_FORIEGN ) ) {
2547 $conn->tablePrefix( $prefix );
2548 }
2549 } );
2550 }
2551
2552 public function redefineLocalDomain( $domain ) {
2553 $this->closeAll( __METHOD__, $this->id );
2554
2555 $this->setLocalDomain( DatabaseDomain::newFromId( $domain ) );
2556 }
2557
2558 public function setTempTablesOnlyMode( $value, $domain ) {
2559 $old = $this->tempTablesOnlyMode[$domain] ?? false;
2560 if ( $value ) {
2561 $this->tempTablesOnlyMode[$domain] = true;
2562 } else {
2563 unset( $this->tempTablesOnlyMode[$domain] );
2564 }
2565
2566 return $old;
2567 }
2568
2572 private function setLocalDomain( DatabaseDomain $domain ) {
2573 $this->localDomain = $domain;
2574 }
2575
2582 private function getServerInfoStrict( $i, $field = null ) {
2583 if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
2584 throw new InvalidArgumentException( "No server with index '$i'" );
2585 }
2586
2587 if ( $field !== null ) {
2588 if ( !array_key_exists( $field, $this->servers[$i] ) ) {
2589 throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
2590 }
2591
2592 return $this->servers[$i][$field];
2593 }
2594
2595 return $this->servers[$i];
2596 }
2597
2601 private function getPrimaryServerName() {
2602 return $this->getServerName( $this->getWriterIndex() );
2603 }
2604
2610 private function fieldHasBit( int $flags, int $bit ) {
2611 return ( ( $flags & $bit ) === $bit );
2612 }
2613
2621 protected function getConnLogContext( IDatabase $conn, array $extras = [] ) {
2622 return array_merge(
2623 [
2624 'db_server' => $conn->getServerName(),
2625 'db_domain' => $conn->getDomainID()
2626 ],
2627 $extras
2628 );
2629 }
2630
2631 public function __destruct() {
2632 // Avoid connection leaks for sanity
2633 $this->disable( __METHOD__, $this->ownerId );
2634 }
2635}
2636
2640class_alias( LoadBalancer::class, 'LoadBalancer' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
if(ini_get('mbstring.func_overload')) if(!defined('MW_ENTRY_POINT'))
Pre-config setup: Before loading LocalSettings.php.
Definition Setup.php:88
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:86
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Exception class for attempted DB access @newable.
Helper class used for automatically marking an IDatabase connection as reusable (once it no longer ma...
Definition DBConnRef.php:29
Database error base class @newable.
Definition DBError.php:32
Exception class for replica DB wait errors @newable.
Class to handle database/schema/prefix specifications for IDatabase.
Relational database abstraction object.
Definition Database.php:52
runOnTransactionPreCommitCallbacks()
Consume and run any "on transaction pre-commit" callbacks.
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
Definition Database.php:671
runTransactionListenerCallbacks( $trigger, array &$errors=[])
Actually run any "transaction listener" callbacks.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition Database.php:851
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition Database.php:659
trxLevel()
Gets the current transaction level.
Definition Database.php:599
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition Database.php:821
runOnTransactionIdleCallbacks( $trigger, array &$errors=[])
Consume and run any "on transaction idle/resolution" callbacks.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition Database.php:864
Database connection, tracking, load balancing, and transaction manager for a cluster.
array[] $servers
Map of (server index => server config array)
undoTransactionRoundFlags(Database $conn)
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
getAnyOpenConnection( $i, $flags=0)
Get an existing live handle to the given server index (on any domain)
callable $errorLogger
Exception logger.
forEachOpenPrimaryConnection( $callback, array $params=[])
Call a function with each open connection object to a primary.
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
callable $deprecationLogger
Deprecation logger.
setLocalDomain(DatabaseDomain $domain)
$id
var int An identifier for this class instance
getServerAttributes( $i)
Get basic attributes of the server with the specified index without connecting.
const CONN_HELD_WARN_THRESHOLD
Warn when this many connection are held.
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
hasReplicaServers()
Whether there are any replica servers configured.
getConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle reference for a server index.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
const MAX_LAG_DEFAULT
Default 'maxLag' when unspecified.
getServerType( $i)
Get the RDBMS type of the server with the specified index (e.g.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
commitPrimaryChanges( $fname=__METHOD__, $owner=null)
Issue COMMIT on all open primary connections to flush changes and view snapshots.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
flushMasterSnapshots( $fname=__METHOD__, $owner=null)
DatabaseDomain[] $nonLocalDomainCache
Map of (domain ID => domain instance)
int[] $maxLagByIndex
Map of (server index => seconds of lag considered "high")
flushReplicaSnapshots( $fname=__METHOD__, $owner=null)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
bool $chronologyCallbackTriggered
Whether the session consistency callback already executed.
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
lastPrimaryChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
DatabaseDomain[] string[] $domainAliases
Map of (domain alias => DB domain)
string $agent
Agent name for query profiling.
waitFor( $pos)
Set the primary position to reach before the next generic group DB handle query.
getForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
beginMasterChanges( $fname=__METHOD__, $owner=null)
const MAX_WAIT_DEFAULT
Default 'waitTimeout' when unspecified.
getServerConnection( $i, $domain, $flags=0)
Get a live handle for a specific server index.
rollbackPrimaryChanges( $fname=__METHOD__, $owner=null)
Issue ROLLBACK only on primary, only if queries were done on connection.
hasPrimaryChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
assertOwnership( $fname, $owner)
Assure that if this instance is owned, the caller is either the owner or is internal.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
forEachOpenMasterConnection( $callback, array $params=[])
getConnLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
redefineLocalDomain( $domain)
Close all connection and redefine the local domain for testing or schema creation.
callable null $profiler
An optional callback that returns a ScopedCallback instance, meant to profile the actual query execut...
enforceConnectionFlags(IDatabase $conn, $flags)
pickReaderIndex(array $loads, string $domain)
Pick a server that is reachable, preferably non-lagged, and return its server index.
const ROUND_ROLLBACK_CALLBACKS
Transaction round was rolled back and post-rollback callbacks must be run.
const ROUND_FINALIZED
Transaction round writes are complete and ready for pre-commit checks.
setDomainAliases(array $aliases)
Convert certain database domains to alternative ones.
isPrimaryConnectionReadOnly(IDatabase $conn, $flags=0)
const ROUND_COMMIT_CALLBACKS
Transaction round was committed and post-commit callbacks must be run.
bool DBPrimaryPos $waitForPos
Replication sync position or false if not set.
IDatabase[][][] $conns
Map of (pool category => server index => domain => IDatabase)
sanitizeConnectionFlags( $flags, $i, $domain)
runMasterTransactionListenerCallbacks( $fname=__METHOD__, $owner=null)
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
int null $ownerId
Integer ID of the managing LBFactory instance or null if none.
string[] $indexAliases
Map of (index alias => index)
flushPrimarySnapshots( $fname=__METHOD__, $owner=null)
Commit all primary DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
const ROUND_ERROR
Transaction round encountered an error.
isOpen( $index)
Test if the specified index represents an open connection.
array $loadMonitorConfig
The LoadMonitor configuration.
array[] $groupLoads
Map of (group => server index => weight)
callable[] $trxRecurringCallbacks
Map of (name => callable)
commitAll( $fname=__METHOD__, $owner=null)
Commit transactions on all open connections.
string null $defaultGroup
Default query group to use with getConnection()
getConnectionIndex( $i, array $groups, $domain)
Get the server index to use for a specified server index and query group list.
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
waitForPrimaryPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified primary position.
getRandomNonLagged(array $loads, string $domain, $maxLag=INF)
int[] $readIndexByGroup
The group replica server indexes keyed by group.
getLoadMonitor()
Get a LoadMonitor instance.
isPrimaryRunningReadOnly(DatabaseDomain $domain)
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future primary connections ...
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
finalizePrimaryChanges( $fname=__METHOD__, $owner=null)
Run pre-commit callbacks and defer execution of post-commit callbacks.
setTempTablesOnlyMode( $value, $domain)
Indicate whether the tables on this domain are only temporary tables for testing.
approvePrimaryChanges(array $options, $fname=__METHOD__, $owner=null)
Perform all pre-commit checks for things like replication safety.
CriticalSectionProvider null $csProvider
reuseConnection(IDatabase $conn)
Mark a live handle as being available for reuse under a different database domain.
const ROUND_APPROVED
Transaction round passed final pre-commit checks.
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
string bool $trxRoundId
Explicit DBO_TRX transaction round active or false if none.
closeConnection(IDatabase $conn)
Close a connection.
int $connectionCounter
Total number of new connections ever made with this instance.
getExistingReaderIndex( $group)
Get the server index chosen by the load balancer for use with the given query group.
const ROUND_CURSORY
Transaction round, explicit or implicit, has not finished writing.
getLazyConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a lazy-connecting database handle reference for a server index.
closeAll( $fname=__METHOD__, $owner=null)
Close all open connections.
setLocalDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
bool $cliMode
Whether this PHP instance is for a CLI script.
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle, suitable for migrations and schema changes, for a server index.
commitMasterChanges( $fname=__METHOD__, $owner=null)
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
openConnection( $i, $domain=false, $flags=0)
runPrimaryTransactionListenerCallbacks( $fname=__METHOD__, $owner=null)
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
const TTL_CACHE_READONLY
Seconds to cache primary DB server read-only status.
getServerInfoStrict( $i, $field=null)
setExistingReaderIndex( $group, $index)
Set the server index chosen by the load balancer for use with the given query group.
fieldHasBit(int $flags, int $bit)
runPrimaryTransactionIdleCallbacks( $fname=__METHOD__, $owner=null)
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
getReaderIndex( $group=false, $domain=false)
Get the specific server index of the reader connection for a given group.
getLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
waitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
DatabaseDomain $localDomain
Local DB domain ID and default for selectDB() calls.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a live handle for a specific or virtual (DB_PRIMARY/DB_REPLICA) server index.
TransactionProfiler $trxProfiler
pendingPrimaryChangeCallers()
Get the list of callers that have pending primary changes.
disable( $fname=__METHOD__, $owner=null)
Close all connections and disable this load balancer.
getServerName( $i)
Get the readable name of the server with the specified index.
approveMasterChanges(array $options, $fname=__METHOD__, $owner=null)
getLaggedReplicaMode( $domain=false)
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo)
Open a new network connection to a server (uncached)
haveIndex( $i)
Returns true if the specified index is a valid server index.
getWriterIndex()
Get the specific server index of the primary server.
array[] $tableAliases
$aliases Map of (table => (dbname, schema, prefix) map)
beginPrimaryChanges( $fname=__METHOD__, $owner=null)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
callable null $chronologyCallback
Callback to run before the first connection attempt.
getMaxLag( $domain=false)
Get the name and lag time of the most-lagged replica server.
Database $errorConnection
Connection handle that caused a problem.
string null $clusterName
The name of the DB cluster.
string $lastError
The last DB selection or connection error.
doWait( $index, $timeout=null)
Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos".
lazyLoadReplicationPositions()
Make sure that any "waitForPos" positions are loaded and available to doWait()
finalizeMasterChanges( $fname=__METHOD__, $owner=null)
getTopologyRole( $i, array $server)
getServerInfo( $i)
Return the server configuration map for the server with the specified index.
getPrimaryPos()
Get the current primary replication position.
allowLagged( $mode=null)
Disables/enables lag checks.
getServerCount()
Get the number of servers defined in configuration.
getClusterName()
Get the logical name of the database cluster.
runMasterTransactionIdleCallbacks( $fname=__METHOD__, $owner=null)
resolveGroups( $groups, $i)
Resolve $groups into a list of query groups defining as having database servers.
pickAnyOpenConnection(array $connsByServer, $autoCommitOnly)
string bool $readOnlyReason
Reason this instance is read-only or false if not.
waitForOne( $pos, $timeout=null)
Set the primary wait position and wait for a generic replica DB to catch up to it.
waitForAll( $pos, $timeout=null)
Set the primary wait position and wait for ALL replica DBs to catch up to it.
bool[] $tempTablesOnlyMode
Map of (domain => whether to use "temp tables only" mode)
__construct(array $params)
Construct a manager of IDatabase connection objects.
rollbackMasterChanges( $fname=__METHOD__, $owner=null)
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Detect high-contention DB queries via profiling calls.
An object representing a primary or replica DB position in a replicated setup.
hasReached(DBPrimaryPos $pos)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
flushSnapshot( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commit any transaction but error out if writes or callbacks are pending.
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
lastDoneWrites()
Get the last time the connection may have been used for a write query.
primaryPosWait(DBPrimaryPos $pos, $timeout)
Wait for the replica DB to catch up to a given primary DB position.
getDomainID()
Return the currently selected domain ID.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spend running write queries for this transaction.
commit( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commits a transaction previously started using begin()
getLag()
Get the amount of replication lag for this database server.
ping(&$rtt=null)
Ping the server and try to reconnect if it there is no connection.
close( $fname=__METHOD__, $owner=null)
Close the database connection.
trxLevel()
Gets the current transaction level.
getDBname()
Get the current database name; null if there isn't one.
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
getServerName()
Get the readable name for the server.
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
dbSchema( $schema=null)
Get/set the db schema.
Database cluster connection, tracking, load balancing, and transaction manager interface.
const CONN_INTENT_WRITABLE
Caller is requesting the primary DB server for possibly writes.
const GROUP_GENERIC
The generic query group.
const CONN_TRX_AUTOCOMMIT
DB handle should have DBO_TRX disabled and the caller will leave it as such.
const CONN_SILENCE_ERRORS
Return null on connection failure instead of throwing an exception.
const DB_PRIMARY
Request a primary, write-enabled DB connection.
An interface for database load monitoring.
const DB_REPLICA
Definition defines.php:25
const DB_PRIMARY
Definition defines.php:27
const DBO_TRX
Definition defines.php:12