MediaWiki REL1_35
LoadBalancer.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
24use ArrayUtils;
25use BagOStuff;
27use InvalidArgumentException;
28use LogicException;
29use Psr\Log\LoggerInterface;
30use Psr\Log\NullLogger;
31use RuntimeException;
32use Throwable;
33use UnexpectedValueException;
35use Wikimedia\ScopedCallback;
36
42class LoadBalancer implements ILoadBalancer {
44 private $loadMonitor;
48 private $srvCache;
50 private $wanCache;
55 private $profiler;
57 private $trxProfiler;
59 private $connLogger;
61 private $queryLogger;
63 private $replLogger;
65 private $perfLogger;
67 private $errorLogger;
70
72 private $localDomain;
73
77 private $conns;
78
80 private $servers;
82 private $groupLoads;
86 private $waitTimeout;
90 private $maxLag;
93
95 private $hostname;
97 private $cliMode;
99 private $agent;
100
102 private $tableAliases = [];
104 private $indexAliases = [];
106 private $domainAliases = [];
111
113 private $trxRoundId = false;
115 private $trxRoundStage = self::ROUND_CURSORY;
119 private $readIndexByGroup = [];
121 private $waitForPos;
123 private $allowLagged = false;
125 private $laggedReplicaMode = false;
127 private $lastError = 'Unknown error';
129 private $readOnlyReason = false;
133 private $disabled = false;
135 private $connectionAttempted = false;
136
138 private $id;
140 private $ownerId;
141
144
145 private static $INFO_SERVER_INDEX = 'serverIndex';
146 private static $INFO_AUTOCOMMIT_ONLY = 'autoCommitOnly';
147 private static $INFO_FORIEGN = 'foreign';
148 private static $INFO_FOREIGN_REF_COUNT = 'foreignPoolRefCount';
149
151 private const CONN_HELD_WARN_THRESHOLD = 10;
152
154 private const MAX_LAG_DEFAULT = 6;
156 private const MAX_WAIT_DEFAULT = 10;
158 private const TTL_CACHE_READONLY = 5;
159
160 private const KEY_LOCAL = 'local';
161 private const KEY_FOREIGN_FREE = 'foreignFree';
162 private const KEY_FOREIGN_INUSE = 'foreignInUse';
163
164 private const KEY_LOCAL_NOROUND = 'localAutoCommit';
165 private const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
166 private const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
167
169 private const ROUND_CURSORY = 'cursory';
171 private const ROUND_FINALIZED = 'finalized';
173 private const ROUND_APPROVED = 'approved';
175 private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
177 private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
179 private const ROUND_ERROR = 'error';
180
/**
 * Build a load balancer from a configuration map.
 *
 * @param array $params Configuration map. Keys read by this constructor:
 *  - servers: (required) non-empty list, keyed 0..N-1, of server configuration maps.
 *     Each map must provide "load" and may provide "groupLoads" (group => ratio)
 *     and "max lag".
 *  - localDomain: local domain ID; parsed with DatabaseDomain::newFromId() when set
 *  - maxLag: default lag limit for servers [default: self::MAX_LAG_DEFAULT]
 *  - waitTimeout: default replication wait timeout [default: self::MAX_WAIT_DEFAULT]
 *  - readOnlyReason: read-only reason; only honored when it is a string
 *  - loadMonitor: load monitor configuration map with a "class" key
 *     [default: the "LoadMonitorNull" class]
 *  - srvCache, wanCache: cache instances [default: empty caches]
 *  - profiler, trxProfiler: profiler instances
 *  - errorLogger, deprecationLogger: callbacks [default: trigger_error() wrappers]
 *  - replLogger, connLogger, queryLogger, perfLogger: loggers [default: NullLogger]
 *  - hostname, cliMode, agent: environment details [default: auto-detected or empty]
 *  - chronologyCallback: callback stored for later use
 *  - roundStage: STAGE_POSTCOMMIT_CALLBACKS or STAGE_POSTROLLBACK_CALLBACKS
 *  - defaultGroup: default query group; ignored unless some server has loads for it
 *  - ownerId: owner ID [default: null]
 * @throws InvalidArgumentException If "servers" is missing or empty
 * @throws UnexpectedValueException If "servers" is not keyed as a list
 */
public function __construct( array $params ) {
	if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
		throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
	}

	// NOTE(review): the fallback branch and the setLocalDomain() call were restored from
	// the upstream class; confirm both against the rest of this file.
	$localDomain = isset( $params['localDomain'] )
		? DatabaseDomain::newFromId( $params['localDomain'] )
		: DatabaseDomain::newUnspecified();
	$this->setLocalDomain( $localDomain );

	$this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;

	$listKey = -1;
	$this->servers = [];
	$this->groupLoads = [ self::GROUP_GENERIC => [] ];
	foreach ( $params['servers'] as $i => $server ) {
		// Server indexes must be exactly 0, 1, 2, ... in iteration order
		if ( ++$listKey !== $i ) {
			throw new UnexpectedValueException( 'List expected for "servers" parameter' );
		}
		$this->servers[$i] = $server;
		foreach ( ( $server['groupLoads'] ?? [] ) as $group => $ratio ) {
			$this->groupLoads[$group][$i] = $ratio;
		}
		$this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
		$this->maxLagByIndex[$i] = $server['max lag'] ?? $this->maxLag;
	}

	$this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;

	$this->conns = self::newTrackedConnectionsArray();

	if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	$this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
	$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];

	$this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	$this->profiler = $params['profiler'] ?? null;
	$this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();

	$this->errorLogger = $params['errorLogger'] ?? function ( Throwable $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
		trigger_error( $msg, E_USER_DEPRECATED );
	};
	foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
		$this->$key = $params[$key] ?? new NullLogger();
	}

	$this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
	$this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	$this->agent = $params['agent'] ?? '';

	if ( isset( $params['chronologyCallback'] ) ) {
		$this->chronologyCallback = $params['chronologyCallback'];
	}

	if ( isset( $params['roundStage'] ) ) {
		if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
		} elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
		}
	}

	// Only accept a default group that at least one server has loads for
	$group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
	$this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;

	// Give every instance created in this process a distinct ID: the first instance draws
	// a random seed and each later one takes the next integer. The counter must be advanced
	// before it is stored; assigning the result of a post-increment back to $nextId would
	// leave it unchanged and hand the same ID to every instance.
	static $nextId;
	$nextId = is_int( $nextId ) ? $nextId + 1 : mt_rand();
	$this->id = $nextId;
	$this->ownerId = $params['ownerId'] ?? null;
}
257
/**
 * Build the empty skeleton used to track connection handles by pool.
 *
 * @return array[] Map of (pool key => empty array), one entry per connection pool
 */
private static function newTrackedConnectionsArray() {
	$poolKeys = [
		// Connections where transaction rounds may be applied
		self::KEY_LOCAL,
		self::KEY_FOREIGN_INUSE,
		self::KEY_FOREIGN_FREE,
		// Auto-committing counterpart connections that ignore transaction rounds
		self::KEY_LOCAL_NOROUND,
		self::KEY_FOREIGN_INUSE_NOROUND,
		self::KEY_FOREIGN_FREE_NOROUND
	];

	return array_fill_keys( $poolKeys, [] );
}
270
/**
 * Get the ID of the local domain of this load balancer.
 *
 * @return mixed Whatever getId() of the local domain object returns
 */
public function getLocalDomainID() {
	$localDomain = $this->localDomain;

	return $localDomain->getId();
}
274
/**
 * Resolve a domain specifier to the ID of its domain instance.
 *
 * @param DatabaseDomain|string|bool $domain Domain instance, domain ID/alias, or false
 *   for the local domain
 * @return mixed Whatever getId() of the resolved domain instance returns
 */
public function resolveDomainID( $domain ) {
	$instance = $this->resolveDomainInstance( $domain );

	return $instance->getId();
}
278
/**
 * Resolve a domain specifier to a DatabaseDomain instance.
 *
 * @param DatabaseDomain|string|bool $domain Domain instance, domain ID/alias, or false
 *   for the local domain
 * @return DatabaseDomain
 */
final protected function resolveDomainInstance( $domain ) {
	if ( $domain instanceof DatabaseDomain ) {
		// Nothing to resolve
		return $domain;
	}

	if ( $domain === false || $domain === $this->localDomain->getId() ) {
		return $this->localDomain;
	}

	if ( isset( $this->domainAliases[$domain] ) ) {
		// Store the parsed result back into the alias map so it can be reused
		$aliasedDomain = DatabaseDomain::newFromId( $this->domainAliases[$domain] );
		$this->domainAliases[$domain] = $aliasedDomain;

		return $aliasedDomain;
	}

	if ( !isset( $this->nonLocalDomainCache[$domain] ) ) {
		// The cache only ever holds the most recently resolved non-local domain
		$this->nonLocalDomainCache = [ $domain => DatabaseDomain::newFromId( $domain ) ];
	}

	return $this->nonLocalDomainCache[$domain];
}
303
/**
 * Normalize a query group specifier into an ordered list of groups to try.
 *
 * The configured default group is always the last fallback, whichever form
 * the specifier takes.
 *
 * @param string[]|string|bool $groups Query group(s); [] or false for the default group
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @return string[] Non-empty list of query groups, most preferred first
 * @throws LogicException If groups are given together with a specific replica index
 */
private function resolveGroups( $groups, $i ) {
	// If a specific replica server was specified, then $groups makes no sense
	if ( $i > 0 && $groups !== [] && $groups !== false ) {
		$list = implode( ', ', (array)$groups );
		throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
	}

	if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
		$resolvedGroups = [ $this->defaultGroup ]; // common case
	} elseif ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
		$resolvedGroups = [ $groups, $this->defaultGroup ];
	} elseif ( is_array( $groups ) ) {
		// De-duplicate while keeping first-seen order, then append the default group
		// (not unconditionally the generic group) unless it is already listed
		$resolvedGroups = array_keys( array_flip( $groups ) + [ $this->defaultGroup => 1 ] );
	} else {
		// Unknown group name or unsupported type
		$resolvedGroups = [ $this->defaultGroup ];
	}

	return $resolvedGroups;
}
330
/**
 * Adjust connection flags before they are used to fetch a handle.
 *
 * Adds CONN_INTENT_WRITABLE when the master is explicitly requested, and drops
 * CONN_TRX_AUTOCOMMIT where a separate auto-commit connection cannot be used.
 *
 * @param int $flags Bitfield of class CONN_* constants
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string $domain Database domain ID
 * @return int Sanitized bitfield
 */
private function sanitizeConnectionFlags( $flags, $i, $domain ) {
	// An outside caller is explicitly requesting the master database server
	$explicitMaster = ( $i === self::DB_MASTER || $i === $this->getWriterIndex() );
	if ( $explicitMaster ) {
		$flags |= self::CONN_INTENT_WRITABLE;
	}

	if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) != self::CONN_TRX_AUTOCOMMIT ) {
		return $flags; // nothing more to check
	}

	// Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
	// resorting to row locks (e.g. FOR UPDATE), or to make small out-of-band commits
	// during larger transactions. This is useful for avoiding lock contention.

	// Master DB server attributes (should match those of the replica DB servers)
	$attributes = $this->getServerAttributes( $this->getWriterIndex() );
	if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// The RDBMS does not support concurrent writes (e.g. SQLite), so separate
		// connections would only cause self-deadlocks. REPEATABLE-READ staleness is
		// not a concern since DB-level locking already serializes transactions.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
		$type = $this->getServerType( $this->getWriterIndex() );
		$this->connLogger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );

		return $flags;
	}

	if ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
		// T202116: integration tests are active and all queries should be using
		// temporary clone tables (via prefix). Such tables are not visible across
		// different connections, nor can there be REPEATABLE-READ snapshot staleness,
		// so use the same connection for everything.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
	}

	return $flags;
}
369
/**
 * Make a handle comply with the given connection flags.
 *
 * @param IDatabase $conn
 * @param int $flags Bitfield of class CONN_* constants
 * @throws DBUnexpectedError If auto-commit mode is requested while a transaction is open
 */
private function enforceConnectionFlags( IDatabase $conn, $flags ) {
	$autoCommitRequested =
		( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
	if ( !$autoCommitRequested ) {
		return;
	}

	// An auto-commit handle must not already be inside a transaction
	if ( $conn->trxLevel() ) {
		throw new DBUnexpectedError(
			$conn,
			'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
		);
	}

	// Switch the handle to auto-commit mode
	$conn->clearFlag( $conn::DBO_TRX );
}
387
/**
 * Get the load monitor of this load balancer, creating it on first use.
 *
 * @return object Instance of the class named by the "class" key of the monitor config
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	// Short class names accepted for backwards compatibility
	$legacyClassNames = [
		'LoadMonitor' => LoadMonitor::class,
		'LoadMonitorNull' => LoadMonitorNull::class,
		'LoadMonitorMySQL' => LoadMonitorMySQL::class,
	];

	$configuredClass = $this->loadMonitorConfig['class'];
	$class = $legacyClassNames[$configuredClass] ?? $configuredClass;

	$this->loadMonitor = new $class(
		$this,
		$this->srvCache,
		$this->wanCache,
		$this->loadMonitorConfig
	);
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
413
/**
 * Pick a random server, by load weight, among those that are not excessively lagged.
 *
 * @param array $loads Map of (server index => load weight)
 * @param string|bool $domain Domain passed to getLagTimes() [default: false]
 * @param int|float $maxLag Additional cap on the lag allowed per server [default: INF]
 * @return int|string|bool Picked server index; false if no server with load remains
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	# Unset excessively lagged servers
	foreach ( $this->getLagTimes( $domain ) as $i => $lag ) {
		if ( $i === $this->getWriterIndex() ) {
			continue; // lag limits are not applied to the writer index
		}

		# How much lag this server is nominally allowed to have, constrained
		# further by the $maxLag argument
		$allowedLag = min( $this->servers[$i]['max lag'] ?? $this->maxLag, $maxLag );

		$host = $this->getServerName( $i );
		if ( $lag === false && !is_infinite( $allowedLag ) ) {
			$this->replLogger->debug(
				__METHOD__ . ": server {dbserver} is not replicating?",
				[ 'dbserver' => $host ]
			);
			unset( $loads[$i] );
		} elseif ( $lag > $allowedLag ) {
			$this->replLogger->debug(
				__METHOD__ . ": server {dbserver} has {lag} seconds of lag (>= {maxlag})",
				[ 'dbserver' => $host, 'lag' => $lag, 'maxlag' => $allowedLag ]
			);
			unset( $loads[$i] );
		}
	}

	# Find out if all the replica DBs with non-zero load are lagged. An empty load
	# map also sums to zero, so that case is handled here as well.
	if ( array_sum( $loads ) == 0 ) {
		# No appropriate DB servers except maybe the master and some replica DBs
		# with zero load. Do NOT use the master. Instead, return false so that the
		# caller falls back to a lagged replica DB in read-only mode.
		return false;
	}

	# Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
468
/**
 * Resolve DB_MASTER/DB_REPLICA or a specific index to a concrete server index.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string[] $groups Query groups to try, in order, when $i is DB_REPLICA
 * @param string|bool $domain Domain passed to getReaderIndex()
 * @return int|null Server index; null is only a formal return after
 *   reportConnectionError(), which is expected to throw
 * @throws UnexpectedValueException If $i is a specific index that is not configured
 */
private function getConnectionIndex( $i, array $groups, $domain ) {
	$resolvedIndex = $i;

	if ( $i === self::DB_MASTER ) {
		$resolvedIndex = $this->getWriterIndex();
	} elseif ( $i === self::DB_REPLICA ) {
		// Use the first group that yields a usable reader index
		foreach ( $groups as $group ) {
			$candidateIndex = $this->getReaderIndex( $group, $domain );
			if ( $candidateIndex !== false ) {
				$resolvedIndex = $candidateIndex; // group connection succeeded
				break;
			}
		}
	} elseif ( !isset( $this->servers[$i] ) ) {
		throw new UnexpectedValueException( "Invalid server index index #$i" );
	}

	if ( $resolvedIndex !== self::DB_REPLICA ) {
		return $resolvedIndex;
	}

	// No group produced a reader index. Note that the reset below discards whatever
	// error text was stored previously before the prefix is applied.
	$this->lastError = 'Unknown error'; // set here in case of worse failure
	$this->lastError = 'No working replica DB server: ' . $this->lastError;
	$this->reportConnectionError();

	return null; // unreachable due to exception
}
501
/**
 * Get the server index to use for reads of a query group, picking one if needed.
 *
 * The chosen index is remembered per group and reused by later calls.
 *
 * @param string|bool $group Query group; anything but a string means the generic group
 * @param string|bool $domain Domain passed on to the load and connection helpers
 * @return int|bool Server index; false if the group has no loads or no server was usable
 */
public function getReaderIndex( $group = false, $domain = false ) {
	if ( $this->getServerCount() == 1 ) {
		// Skip the load balancing if there's only one server
		return $this->getWriterIndex();
	}

	if ( !is_string( $group ) ) {
		$group = self::GROUP_GENERIC;
	}

	$existingIndex = $this->getExistingReaderIndex( $group );
	if ( $existingIndex >= 0 ) {
		// A reader index was already selected and "waitForPos" was handled
		return $existingIndex;
	}

	// Use the server weight array for this load group
	if ( !isset( $this->groupLoads[$group] ) ) {
		$this->connLogger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}
	$loads = $this->groupLoads[$group];

	// Scale the configured load ratios according to each server's load and state
	$this->getLoadMonitor()->scaleLoads( $loads, $domain );

	// Pick a server to use, accounting for weights, load, lag, and "waitForPos"
	$this->lazyLoadReplicationPositions(); // optimizes server candidate selection
	[ $readerIndex, $isLagged ] = $this->pickReaderIndex( $loads, $domain );
	if ( $readerIndex === false ) {
		// DB connection unsuccessful
		return false;
	}

	// If data seen by queries is expected to reflect the transactions committed as of
	// or after a given replication position then wait for the DB to apply those changes
	$mustWait = ( $this->waitForPos && $readerIndex !== $this->getWriterIndex() );
	if ( $mustWait && !$this->doWait( $readerIndex ) ) {
		// Data will be outdated compared to what was expected
		$isLagged = true;
	}

	// Cache the reader index for future DB_REPLICA handles
	$this->setExistingReaderIndex( $group, $readerIndex );
	// Record whether the generic reader index is in "lagged replica DB" mode
	if ( $isLagged && $group === self::GROUP_GENERIC ) {
		$this->laggedReplicaMode = true;
	}

	$serverName = $this->getServerName( $readerIndex );
	$this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

	return $readerIndex;
}
555
/**
 * Get the reader index already chosen for a query group, if any.
 *
 * @param string $group Query group
 * @return int Server index, or -1 if none was chosen yet
 */
protected function getExistingReaderIndex( $group ) {
	if ( isset( $this->readIndexByGroup[$group] ) ) {
		return $this->readIndexByGroup[$group];
	}

	return -1;
}
565
/**
 * Remember the reader index chosen for a query group.
 *
 * @param string $group Query group
 * @param int $index Non-negative server index
 * @throws UnexpectedValueException If $index is negative
 */
private function setExistingReaderIndex( $group, $index ) {
	$isNegative = ( $index < 0 );
	if ( $isNegative ) {
		throw new UnexpectedValueException( "Cannot set a negative read server index" );
	}

	$this->readIndexByGroup[$group] = $index;
}
578
/**
 * Pick a reachable server from a load map, preferring servers within their lag limits.
 *
 * Servers that cannot be connected to are dropped from the candidates and another
 * pick is attempted until a connection succeeds or no candidates remain.
 *
 * @param array $loads Map of (server index => load weight); must not be empty
 * @param string|bool $domain Domain passed on to the lag and connection helpers
 * @return array (server index or false, whether a lagged server had to be accepted)
 * @throws InvalidArgumentException If $loads is empty
 */
private function pickReaderIndex( array $loads, $domain = false ) {
	if ( $loads === [] ) {
		throw new InvalidArgumentException( "Server configuration array is empty" );
	}

	$pick = false; // chosen server index; false while there is none
	$laggedReplicaMode = false; // whether lag limits had to be given up on

	// Quickly look through the available servers for a server that meets criteria...
	$candidates = $loads;
	while ( $candidates !== [] ) {
		if ( $this->allowLagged || $laggedReplicaMode ) {
			$pick = ArrayUtils::pickRandom( $candidates );
		} else {
			$pick = false;
			if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
				$this->replLogger->debug( __METHOD__ . ": replication positions detected" );
				// "chronologyCallback" sets "waitForPos" for session consistency.
				// This triggers doWait() after connect, so it's especially good to
				// avoid lagged servers so as to avoid excessive delay in that method.
				$ago = microtime( true ) - $this->waitForPos->asOfTime();
				// Aim for <= 1 second of waiting (being too picky can backfire)
				$pick = $this->getRandomNonLagged( $candidates, $domain, $ago + 1 );
			}
			if ( $pick === false ) {
				// Any server with less lag than its 'max lag' param is preferable
				$pick = $this->getRandomNonLagged( $candidates, $domain );
			}
			if ( $pick === false && count( $candidates ) ) {
				// All replica DBs lagged. Switch to read-only mode
				$this->replLogger->error(
					__METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
				$pick = ArrayUtils::pickRandom( $candidates );
				$laggedReplicaMode = true;
			}
		}

		if ( $pick === false ) {
			// pickRandom() returned false.
			// This is permanent and means the configuration or the load monitor
			// wants us to return false.
			$this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );

			return [ false, false ];
		}

		$serverName = $this->getServerName( $pick );
		$this->connLogger->debug( __METHOD__ . ": Using reader #$pick: $serverName..." );

		// Get a connection to this server without triggering other server connections
		$conn = $this->getServerConnection( $pick, $domain, self::CONN_SILENCE_ERRORS );
		if ( $conn ) {
			// Decrement reference counter, we are finished with this connection.
			// It will be incremented for the caller later.
			if ( $domain !== false ) {
				$this->reuseConnection( $conn );
			}

			// Use this server
			break;
		}

		$this->connLogger->warning( __METHOD__ . ": Failed connecting to $pick/$domain" );
		unset( $candidates[$pick] ); // avoid this server next iteration
		$pick = false;
	}

	// Every candidate was removed after a failed connection attempt
	if ( $candidates === [] ) {
		$this->connLogger->error( __METHOD__ . ": all servers down" );
	}

	return [ $pick, $laggedReplicaMode ];
}
661
/**
 * Set the replication position to wait for, waiting right away on the generic
 * reader if one was already chosen.
 *
 * @param DBMasterPos|bool $pos Replication position
 */
public function waitFor( $pos ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;
		// If a generic reader connection was already established, then wait now.
		// Otherwise, the wait happens once getReaderIndex() establishes one.
		$genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
		if ( $genericIndex > 0 ) {
			$caughtUp = $this->doWait( $genericIndex );
			if ( !$caughtUp ) {
				$this->laggedReplicaMode = true;
			}
		}
	} finally {
		// Restore the older position if it was higher since this is used for lag-protection
		$this->setWaitForPositionIfHigher( $previousPos );
	}
}
677
/**
 * Wait for one replica DB to reach a replication position.
 *
 * Uses the generic reader already chosen, or else a random replica with non-zero
 * generic load. The prior wait position is restored afterwards.
 *
 * @param DBMasterPos|bool $pos Replication position
 * @param int|null $timeout Maximum seconds to wait; null for the default
 * @return bool Result of the wait; true if no replica was applicable
 */
public function waitForOne( $pos, $timeout = null ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$index = $this->getExistingReaderIndex( self::GROUP_GENERIC );
		if ( $index <= 0 ) {
			// Pick a generic replica DB if there isn't one yet: replica DBs only,
			// and only those with non-zero load
			$replicaLoads = $this->groupLoads[self::GROUP_GENERIC];
			unset( $replicaLoads[$this->getWriterIndex()] );
			$replicaLoads = array_filter( $replicaLoads );
			$index = ArrayUtils::pickRandom( $replicaLoads );
		}

		// With no applicable loads there is nothing to wait for
		$ok = ( $index > 0 ) ? $this->doWait( $index, true, $timeout ) : true;
	} finally {
		// Restore the old position; this is used for throttling, not lag-protection
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
704
/**
 * Wait for every replica DB that has load in some group to reach a position.
 *
 * The timeout is a shared budget: whole seconds spent on each server are deducted,
 * and the loop stops once the budget is used up. The prior wait position is
 * restored afterwards.
 *
 * @param DBMasterPos|bool $pos Replication position
 * @param int|null $timeout Maximum total seconds to wait; falsy for the default
 * @return bool Whether every wait that was attempted succeeded
 */
public function waitForAll( $pos, $timeout = null ) {
	$remaining = $timeout ?: $this->waitTimeout;

	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;
		$serverCount = $this->getServerCount();

		$ok = true;
		// Index 0 is skipped; only the servers after it are waited on
		for ( $i = 1; $i < $serverCount; $i++ ) {
			if ( !$this->serverHasLoadInAnyGroup( $i ) ) {
				continue;
			}

			$start = microtime( true );
			$ok = $this->doWait( $i, true, $remaining ) && $ok;
			$remaining -= intval( microtime( true ) - $start );
			if ( $remaining <= 0 ) {
				break; // timeout reached
			}
		}
	} finally {
		// Restore the old position; this is used for throttling, not lag-protection
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
731
/**
 * Check whether a server has a positive load weight in at least one query group.
 *
 * @param int $i Server index
 * @return bool
 */
private function serverHasLoadInAnyGroup( $i ) {
	foreach ( $this->groupLoads as $loadsByIndex ) {
		if ( isset( $loadsByIndex[$i] ) && $loadsByIndex[$i] > 0 ) {
			return true;
		}
	}

	return false;
}
745
/**
 * Adopt a replication position as the one to wait for, unless the current one is
 * set and has not been reached by it.
 *
 * @param DBMasterPos|bool $pos Candidate position; falsy values are ignored
 */
private function setWaitForPositionIfHigher( $pos ) {
	if ( $pos && ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) ) {
		$this->waitForPos = $pos;
	}
}
758
/**
 * Get an already-open connection handle to a server, without opening a new one.
 *
 * The tracked connection pools are searched in order and the search stops at the
 * first pool that yields a usable handle.
 *
 * @param int $i Specific server index, or DB_MASTER/DB_REPLICA (DB_REPLICA matches
 *   any server index present in a pool)
 * @param int $flags Bitfield of class CONN_* constants
 * @return IDatabase|bool Open handle; false if none was found
 * @throws DBUnexpectedError Via enforceConnectionFlags() if the handle violates $flags
 */
public function getAnyOpenConnection( $i, $flags = 0 ) {
	$i = ( $i === self::DB_MASTER ) ? $this->getWriterIndex() : $i;
	// Connection handles required to be in auto-commit mode use a separate connection
	// pool since the main pool is affected by implicit and explicit transaction rounds
	$autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

	$conn = false;
	foreach ( $this->conns as $connsByServer ) {
		// Get the connection array server indexes to inspect
		if ( $i === self::DB_REPLICA ) {
			$indexes = array_keys( $connsByServer );
		} else {
			$indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
		}

		foreach ( $indexes as $index ) {
			$conn = $this->pickAnyOpenConnection( $connsByServer[$index], $autocommit );
			if ( $conn ) {
				// Leave both loops; continuing with the remaining pools could
				// replace the handle just found with false
				break 2;
			}
		}
	}

	if ( $conn ) {
		$this->enforceConnectionFlags( $conn, $flags );
	}

	return $conn;
}
788
/**
 * Pick a usable handle from a list of candidate connections.
 *
 * All candidates are inspected, so the last usable one is the one returned.
 *
 * @param IDatabase[] $candidateConns
 * @param bool $autocommit Whether the handle must be an idle auto-commit-only one
 * @return IDatabase|bool Usable handle; false if there is none
 */
private function pickAnyOpenConnection( $candidateConns, $autocommit ) {
	$picked = false;

	foreach ( $candidateConns as $candidateConn ) {
		if ( !$candidateConn->isOpen() ) {
			continue; // some sort of error occurred?
		}

		if ( $autocommit ) {
			if ( !$candidateConn->getLBInfo( self::$INFO_AUTOCOMMIT_ONLY ) ) {
				continue; // connection is transaction round aware
			}
			if ( $candidateConn->trxLevel() ) {
				continue; // some sort of error left a transaction open?
			}
		}

		$picked = $candidateConn;
	}

	return $picked;
}
817
/**
 * Wait for a server to reach the replication position stored in "waitForPos".
 *
 * A position known from the server cache to have been reached skips the wait.
 * A connection opened only for this check is closed again afterwards.
 *
 * @param int $index Server index
 * @param bool $open Whether a new connection may be opened if none is open
 * @param int|null $timeout Maximum seconds to wait (at least 1); falsy for the default
 * @return bool Whether the position is known to have been reached
 */
protected function doWait( $index, $open = false, $timeout = null ) {
	$timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );

	// Check if we already know that the DB has reached this point
	$server = $this->getServerName( $index );
	$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
	$knownReachedPos = $this->srvCache->get( $key );
	$alreadyReached = (
		$knownReachedPos instanceof DBMasterPos &&
		$knownReachedPos->hasReached( $this->waitForPos )
	);
	if ( $alreadyReached ) {
		$this->replLogger->debug(
			__METHOD__ .
			": replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).",
			[ 'dbserver' => $server ]
		);

		return true;
	}

	// Find a connection to wait on, creating one if needed and allowed
	$flags = self::CONN_SILENCE_ERRORS;
	$conn = $this->getAnyOpenConnection( $index, $flags );
	$closeAfterwards = false;
	if ( !$conn ) {
		if ( !$open ) {
			$this->replLogger->debug(
				__METHOD__ . ': no connection open for {dbserver}',
				[ 'dbserver' => $server ]
			);

			return false;
		}

		// Get a connection to this server without triggering other server connections
		$conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
		if ( !$conn ) {
			$this->replLogger->warning(
				__METHOD__ . ': failed to connect to {dbserver}',
				[ 'dbserver' => $server ]
			);

			return false;
		}

		// Avoid connection spam in waitForAll() when connections
		// are made just for the sake of doing this lag check.
		$closeAfterwards = true;
	}

	$this->replLogger->info(
		__METHOD__ . ': waiting for replica DB {dbserver} to catch up...',
		[ 'dbserver' => $server ]
	);

	$result = $conn->masterPosWait( $this->waitForPos, $timeout );

	// Both null and -1 are treated as a failed wait
	$ok = ( $result !== null && $result != -1 );
	if ( $ok ) {
		// Remember that the DB reached this point
		$this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
	}

	if ( $closeAfterwards ) {
		$this->closeConnection( $conn );
	}

	return $ok;
}
893
/**
 * Get a connection handle for a server index or role.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s); [] or false for the default group
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of class CONN_* constants
 * @return IDatabase|bool Connection handle; false if the connection attempt failed
 *   while CONN_SILENCE_ERRORS was set
 * @throws LogicException If groups are given together with a specific replica index
 * @throws UnexpectedValueException If $i is a specific index that is not configured
 */
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$groups = $this->resolveGroups( $groups, $i );
	$flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
	// If given DB_MASTER/DB_REPLICA, resolve it to a specific server index. Resolving
	// DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
	// connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
	// The use of getServerConnection() instead of getConnection() avoids infinite loops.
	$serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
	// Get an open connection to that server (might trigger a new connection)
	$conn = $this->getServerConnection( $serverIndex, $domain, $flags );
	// Set master DB handles as read-only if there is high replication lag. The handle is
	// false when the connection failed under CONN_SILENCE_ERRORS, in which case there
	// is nothing to annotate and no method may be called on it.
	if (
		$conn instanceof IDatabase &&
		$serverIndex === $this->getWriterIndex() &&
		$this->getLaggedReplicaMode( $domain ) &&
		!is_string( $conn->getLBInfo( $conn::LB_READ_ONLY_REASON ) )
	) {
		$reason = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 )
			? 'The database is read-only until replication lag decreases.'
			: 'The database is read-only until replica database servers becomes reachable.';
		$conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $reason );
	}

	return $conn;
}
919
/**
 * Get a connection handle to a specific server index, opening one if needed.
 *
 * @param int $i Specific server index
 * @param string $domain Resolved domain ID
 * @param int $flags Bitfield of class CONN_* constants
 * @return IDatabase|bool Open connection handle; false on failure. A failed attempt is
 *   passed to reportConnectionError() unless CONN_SILENCE_ERRORS is set.
 */
public function getServerConnection( $i, $domain, $flags = 0 ) {
	// Number of connections made before getting the server index and handle
	$priorConnectionsMade = $this->connectionCounter;
	// Get an open connection to this server (might trigger a new connection)
	$conn = $this->localDomain->equals( $domain )
		? $this->getLocalConnection( $i, $flags )
		: $this->getForeignConnection( $i, $domain, $flags );
	// Throw an error or otherwise bail out if the connection attempt failed
	if ( !( $conn instanceof IDatabase ) ) {
		if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
			$this->reportConnectionError();
		}

		return false;
	}

	// Profile any new connections caused by this method
	if ( $this->connectionCounter > $priorConnectionsMade ) {
		$this->trxProfiler->recordConnection(
			$conn->getServer(),
			$conn->getDBname(),
			( ( $flags & self::CONN_INTENT_WRITABLE ) == self::CONN_INTENT_WRITABLE )
		);
	}

	if ( !$conn->isOpen() ) {
		$this->errorConnection = $conn;
		// Connection was made but later unrecoverably lost for some reason.
		// Do not return a handle that will just throw exceptions on use, but
		// let the calling code, e.g. getReaderIndex(), try another server.
		return false;
	}

	// Make sure that flags like CONN_TRX_AUTOCOMMIT are respected by this handle
	$this->enforceConnectionFlags( $conn, $flags );
	// Set master DB handles as read-only if the load balancer is configured as read-only
	// or the master database server is running in server-side read-only mode. Note that
	// replica DB handles are always read-only via Database::assertIsWritableMaster().
	// Read-only mode due to replication lag is *avoided* here to avoid recursion.
	if ( $i === $this->getWriterIndex() ) {
		if ( $this->readOnlyReason !== false ) {
			// Pass the configured reason on to the handle; without this assignment
			// the variable would be undefined and the configuration ignored
			$readOnlyReason = $this->readOnlyReason;
		} elseif ( $this->isMasterConnectionReadOnly( $conn, $flags ) ) {
			$readOnlyReason = 'The master database server is running in read-only mode.';
		} else {
			$readOnlyReason = false;
		}
		$conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $readOnlyReason );
	}

	return $conn;
}
979
/**
 * Release one reference to a foreign-domain connection handle.
 *
 * The reference count of the handle is decremented; once it reaches zero the
 * handle moves from the in-use pool to the matching free pool. Handles without
 * a server index or reference count are left alone.
 *
 * @param IDatabase $conn Live handle obtained from this load balancer
 * @throws InvalidArgumentException If the handle is not the in-use handle tracked
 *   for its server index and domain
 */
public function reuseConnection( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( self::$INFO_SERVER_INDEX );
	$refCount = $conn->getLBInfo( self::$INFO_FOREIGN_REF_COUNT );
	if ( $serverIndex === null || $refCount === null ) {
		return; // non-foreign connection; no domain-use tracking to update
	}

	if ( $conn instanceof DBConnRef ) {
		// DBConnRef already handles calling reuseConnection() and only passes the live
		// Database instance to this method. Any caller passing in a DBConnRef is broken.
		$this->connLogger->error(
			__METHOD__ . ": got DBConnRef instance.\n" .
			( new LogicException() )->getTraceAsString() );

		return;
	}

	if ( $this->disabled ) {
		return; // DBConnRef handle probably survived longer than the LoadBalancer
	}

	// Auto-commit-only handles live in their own pair of pools
	$autoCommitOnly = (bool)$conn->getLBInfo( self::$INFO_AUTOCOMMIT_ONLY );
	$connFreeKey = $autoCommitOnly ? self::KEY_FOREIGN_FREE_NOROUND : self::KEY_FOREIGN_FREE;
	$connInUseKey = $autoCommitOnly ? self::KEY_FOREIGN_INUSE_NOROUND : self::KEY_FOREIGN_INUSE;

	$domain = $conn->getDomainID();
	$trackedConn = $this->conns[$connInUseKey][$serverIndex][$domain] ?? null;
	if ( $trackedConn === null ) {
		throw new InvalidArgumentException(
			"Connection $serverIndex/$domain not found; it may have already been freed" );
	}
	if ( $trackedConn !== $conn ) {
		throw new InvalidArgumentException(
			"Connection $serverIndex/$domain mismatched; it may have already been freed" );
	}

	--$refCount;
	$conn->setLBInfo( self::$INFO_FOREIGN_REF_COUNT, $refCount );
	if ( $refCount > 0 ) {
		$this->connLogger->debug( __METHOD__ .
			": reference count for $serverIndex/$domain reduced to $refCount" );

		return;
	}

	// Last reference released; move the handle over to the free pool
	$this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
	unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
	if ( !$this->conns[$connInUseKey][$serverIndex] ) {
		unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
	}
	$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
}
1029
/**
 * Get a connection handle wrapped in a DBConnRef.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of class CONN_* constants
 * @return DBConnRef
 */
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$conn = $this->getConnection( $i, $groups, $domain, $flags );

	return new DBConnRef( $this, $conn, $role );
}
1036
/**
 * Get a DBConnRef built from connection parameters rather than a live handle.
 *
 * No connection is requested here; the reference only receives the parameters.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of class CONN_* constants
 * @return DBConnRef
 */
public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$connParams = [ $i, $groups, $domain, $flags ];

	return new DBConnRef( $this, $connParams, $role );
}
1043
/**
 * Get a connection handle wrapped in a MaintainableDBConnRef.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of class CONN_* constants
 * @return MaintainableDBConnRef
 */
public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$conn = $this->getConnection( $i, $groups, $domain, $flags );

	return new MaintainableDBConnRef( $this, $conn, $role );
}
1051
/**
 * Get the role that a server index or role constant stands for.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @return int DB_MASTER for DB_MASTER or the writer index; DB_REPLICA otherwise
 */
private function getRoleFromIndex( $i ) {
	// NOTE(review): the two result branches were restored from the upstream class;
	// confirm against how DBConnRef interprets its role argument.
	return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() )
		? self::DB_MASTER
		: self::DB_REPLICA;
}
1061
/**
 * Get a connection handle with connection errors silenced.
 *
 * Same as getConnection() with no query groups and CONN_SILENCE_ERRORS added.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of class CONN_* constants
 * @return IDatabase|bool Whatever getConnection() returns for these arguments
 */
public function openConnection( $i, $domain = false, $flags = 0 ) {
	$silencedFlags = $flags | self::CONN_SILENCE_ERRORS;

	return $this->getConnection( $i, [], $domain, $silencedFlags );
}
1072
/**
 * Get a connection handle to a server for the local domain, opening one if needed.
 *
 * Handles are tracked per server index, separately for the regular pool and
 * the auto-commit pool.
 *
 * @param int $i Specific server index
 * @param int $flags Bitfield of class CONN_* constants
 * @return IDatabase|bool Connection handle; false if a new connection failed to open
 * @throws UnexpectedValueException If the handle is on a domain that is not compatible
 *   with the local domain
 */
private function getLocalConnection( $i, $flags = 0 ) {
	// Connection handles required to be in auto-commit mode use a separate connection
	// pool since the main pool is affected by implicit and explicit transaction rounds
	$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
	$connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;

	$conn = $this->conns[$connKey][$i][0] ?? null;
	if ( $conn === null ) {
		$conn = $this->reallyOpenConnection(
			$i,
			$this->localDomain,
			[ self::$INFO_AUTOCOMMIT_ONLY => $autoCommit ]
		);
		if ( !$conn->isOpen() ) {
			// Keep the failed handle around for error reporting and give up
			$this->connLogger->warning( __METHOD__ . ": connection error for $i" );
			$this->errorConnection = $conn;
			$conn = false;
		} else {
			$this->connLogger->debug( __METHOD__ . ": opened new connection for $i" );
			$this->conns[$connKey][$i][0] = $conn;
		}
	}

	// Sanity check to make sure that the right domain is selected
	$wrongDomain = (
		$conn instanceof IDatabase &&
		!$this->localDomain->isCompatible( $conn->getDomainID() )
	);
	if ( $wrongDomain ) {
		throw new UnexpectedValueException(
			"Got connection to '{$conn->getDomainID()}', " .
			"but expected local domain ('{$this->localDomain}')"
		);
	}

	return $conn;
}
1124
/**
 * Get (or open) a reference-counted connection to server $i for a given domain.
 *
 * Lookup order: an in-use handle for the same domain, a free handle for the
 * same domain, a free handle for another domain that can be switched over,
 * and finally a brand new handle. Any handle returned has its
 * INFO_FOREIGN_REF_COUNT incremented by one.
 *
 * @param int $i Server index
 * @param string $domain Domain ID
 * @param int $flags Bitfield; only CONN_TRX_AUTOCOMMIT is inspected here
 * @return Database|bool The handle, or false if a newly opened handle is not open
 * @throws UnexpectedValueException If the handle's domain is incompatible with $domain
 */
private function getForeignConnection( $i, $domain, $flags = 0 ) {
	$wantedDomain = DatabaseDomain::newFromId( $domain );
	$autoCommit = ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT;
	// Auto-commit handles live in their own pools, since handles in the main pools
	// are affected by implicit and explicit transaction rounds
	[ $freePool, $busyPool ] = $autoCommit
		? [ self::KEY_FOREIGN_FREE_NOROUND, self::KEY_FOREIGN_INUSE_NOROUND ]
		: [ self::KEY_FOREIGN_FREE, self::KEY_FOREIGN_INUSE ];

	$conn = null;

	if ( isset( $this->conns[$busyPool][$i][$domain] ) ) {
		// An in-use handle for this exact domain can be shared
		$conn = $this->conns[$busyPool][$i][$domain];
		$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
	} elseif ( isset( $this->conns[$freePool][$i][$domain] ) ) {
		// Move a free handle for this exact domain over to the in-use pool
		$conn = $this->conns[$freePool][$i][$domain];
		unset( $this->conns[$freePool][$i][$domain] );
		$this->conns[$busyPool][$i][$domain] = $conn;
		$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
	} elseif ( !empty( $this->conns[$freePool][$i] ) ) {
		// Try to repurpose a free handle that currently points at another domain
		$wantedDb = $wantedDomain->getDatabase();
		foreach ( $this->conns[$freePool][$i] as $oldDomain => $candidate ) {
			if ( $wantedDb === null ) {
				// Stay on the current database, but update the schema/prefix
				$candidate->dbSchema( $wantedDomain->getSchema() );
				$candidate->tablePrefix( $wantedDomain->getTablePrefix() );
			} elseif (
				$candidate->databasesAreIndependent() &&
				$candidate->getDBname() !== $wantedDb
			) {
				// Switching databases would need a new connection; leave this
				// handle alone and keep looking. This avoids closing a handle
				// mid-transaction and keeps it around for its own database.
				continue;
			} else {
				// Select the new database, schema, and prefix
				$candidate->selectDomain( $wantedDomain );
			}
			$conn = $candidate;
			unset( $this->conns[$freePool][$i][$oldDomain] );
			// Note that if $domain is an empty string, getDomainID() might not match it
			$this->conns[$busyPool][$i][$conn->getDomainID()] = $conn;
			$this->connLogger->debug( __METHOD__ .
				": reusing free connection from $oldDomain for $domain" );
			break;
		}
	}

	if ( !$conn ) {
		// Nothing reusable was found; open a fresh handle
		$conn = $this->reallyOpenConnection(
			$i,
			$wantedDomain,
			[
				self::$INFO_AUTOCOMMIT_ONLY => $autoCommit,
				self::$INFO_FORIEGN => true,
				self::$INFO_FOREIGN_REF_COUNT => 0
			]
		);
		if ( $conn->isOpen() ) {
			// Note that if $domain is an empty string, getDomainID() might not match it
			$this->conns[$busyPool][$i][$conn->getDomainID()] = $conn;
			$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
		} else {
			$this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
			// Remember the failed handle for later error reporting
			$this->errorConnection = $conn;
			$conn = false;
		}
	}

	if ( !( $conn instanceof IDatabase ) ) {
		return $conn;
	}

	// Sanity check to make sure that the right domain is selected
	if ( !$wantedDomain->isCompatible( $conn->getDomainID() ) ) {
		throw new UnexpectedValueException(
			"Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
	}
	// One more holder of this handle
	$holders = $conn->getLBInfo( self::$INFO_FOREIGN_REF_COUNT );
	$conn->setLBInfo( self::$INFO_FOREIGN_REF_COUNT, $holders + 1 );

	return $conn;
}
1241
/**
 * Get the database attribute map for the type/driver of server $i.
 *
 * The listing this block came from had lost the opening of the return
 * statement (only the two arguments and the closing parenthesis remained);
 * the call to Database::attributesFromType() is restored here.
 *
 * @param int $i Server index
 * @return array Attribute map; indexed elsewhere in this class by Database::ATTR_* keys
 */
public function getServerAttributes( $i ) {
	return Database::attributesFromType(
		$this->getServerType( $i ),
		$this->servers[$i]['driver'] ?? null
	);
}
1248
/**
 * Check whether getAnyOpenConnection() yields a handle for the given server index.
 *
 * @param int $index Server index
 * @return bool
 */
private function isOpen( $index ) {
	$conn = $this->getAnyOpenConnection( $index );

	return $conn ? true : false;
}
1258
/**
 * Create a Database handle for server $i and domain $domain and try to connect it.
 *
 * The handle is built unconnected via Database::factory(), tagged with the
 * load balancer info, given the table/index aliases, and then connected with
 * initConnection(). A DBConnectionError from initConnection() is swallowed,
 * so the returned handle may not be open; callers check isOpen().
 *
 * NOTE(review): this listing is missing source lines 1335 and 1344 (see the
 * notes inline); the code below is reproduced exactly as it appears here.
 *
 * @param int $i Server index
 * @param DatabaseDomain $domain Domain the new handle should use
 * @param array $lbInfo Extra load balancer info merged into the handle's LB info
 * @return Database The new handle (open or not)
 * @throws DBAccessError If this load balancer is disabled
 */
1271 protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
1272 if ( $this->disabled ) {
1273 throw new DBAccessError();
1274 }
1275
1276 $server = $this->getServerInfoStrict( $i );
1277
1278 $conn = Database::factory(
1279 $server['type'],
1280 array_merge( $server, [
1281 // Basic replication role information
1282 'topologyRole' => $this->getTopologyRole( $i, $server ),
1283 'topologicalMaster' => $this->getMasterServerName(),
1284 // Use the database specified in $domain (null means "none or entrypoint DB");
1285 // fallback to the $server default if the RDBMs is an embedded library using a
1286 // file on disk since there would be nothing to access to without a DB/file name.
1287 'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1288 ? ( $domain->getDatabase() ?? $server['dbname'] ?? null )
1289 : $domain->getDatabase(),
1290 // Override the $server default schema with that of $domain if specified
1291 'schema' => $domain->getSchema() ?? $server['schema'] ?? null,
1292 // Use the table prefix specified in $domain
1293 'tablePrefix' => $domain->getTablePrefix(),
1294 // Participate in transaction rounds if $server does not specify otherwise
1295 'flags' => $this->initConnFlags( $server['flags'] ?? IDatabase::DBO_DEFAULT ),
1296 // Inject the PHP execution mode and the agent string
1297 'cliMode' => $this->cliMode,
1298 'agent' => $this->agent,
1299 'ownerId' => $this->id,
1300 // Inject object and callback dependencies
1301 'lazyMasterHandle' => $this->getLazyConnectionRef(
1302 self::DB_MASTER,
1303 [],
1304 $domain->getId()
1305 ),
1306 'srvCache' => $this->srvCache,
1307 'connLogger' => $this->connLogger,
1308 'queryLogger' => $this->queryLogger,
1309 'replLogger' => $this->replLogger,
1310 'errorLogger' => $this->errorLogger,
1311 'deprecationLogger' => $this->deprecationLogger,
1312 'profiler' => $this->profiler,
1313 'trxProfiler' => $this->trxProfiler
1314 ] ),
1315 Database::NEW_UNCONNECTED
1316 );
1317 // Attach load balancer information to the handle
1318 $conn->setLBInfo( [ self::$INFO_SERVER_INDEX => $i ] + $lbInfo );
1319 // Set alternative table/index names before any queries can be issued
1320 $conn->setTableAliases( $this->tableAliases );
1321 $conn->setIndexAliases( $this->indexAliases );
1322 // Account for any active transaction round and listeners
1323 if ( $i === $this->getWriterIndex() ) {
1324 if ( $this->trxRoundId !== false ) {
1325 $this->applyTransactionRoundFlags( $conn );
1326 }
1327 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1328 $conn->setTransactionListener( $name, $callback );
1329 }
1330 }
1331
1332 // Make the connection handle live
1333 try {
1334 $conn->initConnection();
// NOTE(review): source line 1335 is absent from this listing; whether a
// statement belongs between initConnection() and the catch is not visible here.
1336 } catch ( DBConnectionError $e ) {
1337 // ignore; let the DB handle the logging
1338 }
1339
1340 // Try to maintain session consistency for clients that trigger write transactions
1341 // in a request or script and then return soon after in another request or script.
1342 // This requires cooperation with ChronologyProtector and the application wiring.
1343 if ( $conn->isOpen() ) {
// NOTE(review): source line 1344 is absent from this listing, so this branch
// appears empty; presumably it triggers lazyLoadReplicationPositions() - TODO confirm.
1345 }
1346
1347 // Log when many connection are made during a single request/script
1348 $count = $this->getCurrentConnectionCount();
1349 if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1350 $this->perfLogger->warning(
1351 __METHOD__ . ": {connections}+ connections made (master={masterdb})",
1352 [
1353 'connections' => $count,
1354 'dbserver' => $conn->getServer(),
1355 'masterdb' => $this->getMasterServerName()
1356 ]
1357 );
1358 }
1359
1360 return $conn;
1361 }
1362
/**
 * Pick the IDatabase::ROLE_* constant describing server $i.
 *
 * @param int $i Server index
 * @param array $server Server config map; only the 'is static' key is read
 * @return string One of the IDatabase::ROLE_* constants
 */
private function getTopologyRole( $i, array $server ) {
	$isStatic = !empty( $server['is static'] );
	if ( $isStatic ) {
		return IDatabase::ROLE_STATIC_CLONE;
	}

	if ( $i === $this->getWriterIndex() ) {
		return IDatabase::ROLE_STREAMING_MASTER;
	}

	return IDatabase::ROLE_STREAMING_REPLICA;
}
1377
/**
 * Resolve DBO_DEFAULT into a concrete DBO_TRX setting.
 *
 * When DBO_DEFAULT is set, DBO_TRX is cleared in CLI mode and set otherwise.
 * Flags without DBO_DEFAULT are returned untouched.
 *
 * @param int $flags Bitfield of IDatabase::DBO_* flags
 * @return int Adjusted bitfield
 */
private function initConnFlags( $flags ) {
	$usesDefault = ( $flags & IDatabase::DBO_DEFAULT ) === IDatabase::DBO_DEFAULT;
	if ( !$usesDefault ) {
		return $flags;
	}

	return $this->cliMode
		? ( $flags & ~IDatabase::DBO_TRX )
		: ( $flags | IDatabase::DBO_TRX );
}
1394
/**
 * Run the chronology callback once, the first time this is called with a callback set.
 *
 * The "connectionAttempted" flag is raised before the callback runs, so the
 * callback is not invoked again by later calls to this method.
 */
private function lazyLoadReplicationPositions() {
	if ( $this->connectionAttempted || !$this->chronologyCallback ) {
		return;
	}

	$this->connectionAttempted = true;
	$callback = $this->chronologyCallback;
	$callback( $this ); // generally calls waitFor()
	$this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
}
1405
/**
 * Log the last connection failure and throw a DBConnectionError for it.
 *
 * This method never returns normally.
 *
 * @throws DBConnectionError Always
 */
private function reportConnectionError() {
	$failedConn = $this->errorConnection; // the connection which caused the error
	$context = [
		'method' => __METHOD__,
		'last_error' => $this->lastError,
	];

	if ( !( $failedConn instanceof IDatabase ) ) {
		// No last connection, probably due to all servers being too busy
		$this->connLogger->error(
			__METHOD__ .
			": LB failure with no last connection. Connection error: {last_error}",
			$context
		);

		// If all servers were busy, "lastError" will contain something sensible
		throw new DBConnectionError( null, $this->lastError );
	}

	$context['db_server'] = $failedConn->getServer();
	$this->connLogger->warning(
		__METHOD__ . ": connection error: {last_error} ({db_server})",
		$context
	);

	throw new DBConnectionError( $failedConn, "{$this->lastError} ({$context['db_server']})" );
}
1436
/**
 * Get the server index treated as the writer (master) by this class.
 *
 * @return int Always 0 in this implementation
 */
public function getWriterIndex() {
	return 0;
}
1440
/**
 * Check whether a server index is a key of the configured server list.
 *
 * @param int $i Server index
 * @return bool
 */
public function haveIndex( $i ) {
	$known = array_key_exists( $i, $this->servers );

	return $known;
}
1451
/**
 * Check whether server $i exists and has a positive load in the generic group.
 *
 * @param int $i Server index
 * @return bool
 */
public function isNonZeroLoad( $i ) {
	if ( !isset( $this->servers[$i] ) ) {
		return false;
	}

	return $this->groupLoads[self::GROUP_GENERIC][$i] > 0;
}
1462
/**
 * Count the configured servers.
 *
 * @return int
 */
public function getServerCount() {
	$total = count( $this->servers );

	return $total;
}
1466
/**
 * Check whether more than one server is configured.
 *
 * @return bool
 */
public function hasReplicaServers() {
	return $this->getServerCount() > 1;
}
1470
/**
 * Check whether any non-writer server lacks the 'is static' marker.
 *
 * @return bool
 */
public function hasStreamingReplicaServers() {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->servers as $index => $info ) {
		if ( $index === $writerIndex ) {
			continue; // the writer itself does not count
		}
		if ( empty( $info['is static'] ) ) {
			return true;
		}
	}

	return false;
}
1480
/**
 * Get a display name for server $i.
 *
 * Prefers the 'hostName' setting, then 'host', and falls back to 'localhost'
 * when neither yields a non-empty value.
 *
 * @param int $i Server index
 * @return string
 */
public function getServerName( $i ) {
	$name = $this->servers[$i]['hostName'] ?? $this->servers[$i]['host'] ?? '';
	if ( $name == '' ) {
		return 'localhost';
	}

	return $name;
}
1486
/**
 * Get the configuration entry for server $i.
 *
 * @param int $i Server index
 * @return array|bool The entry, or false if none is set
 */
public function getServerInfo( $i ) {
	return isset( $this->servers[$i] ) ? $this->servers[$i] : false;
}
1490
/**
 * Get the 'type' setting of server $i.
 *
 * @param int $i Server index
 * @return string The configured type, or 'unknown' if none is set
 */
public function getServerType( $i ) {
	return isset( $this->servers[$i]['type'] ) ? $this->servers[$i]['type'] : 'unknown';
}
1494
/**
 * Get the current replication position of the writer (master) server.
 *
 * An already-open writer connection is used if there is one. Otherwise a
 * connection is opened just for this call and closed again afterwards.
 *
 * @return mixed Whatever getMasterPos() returns on the connection handle
 * @throws DBConnectionError Via reportConnectionError() if no connection can be made
 */
public function getMasterPos() {
	$index = $this->getWriterIndex();

	$conn = $this->getAnyOpenConnection( $index );
	if ( $conn ) {
		return $conn->getMasterPos();
	}

	// CONN_SILENCE_ERRORS is a flag and so belongs in the fourth ($flags) argument
	// of getConnection(); passing it second would make it the query group list.
	$conn = $this->getConnection( $index, [], false, self::CONN_SILENCE_ERRORS );
	if ( !$conn ) {
		$this->reportConnectionError();
		return null; // unreachable due to exception
	}

	try {
		$pos = $conn->getMasterPos();
	} finally {
		// The connection was opened only for this lookup; close it on every path
		$this->closeConnection( $conn );
	}

	return $pos;
}
1517
/**
 * Get the position from which replication reads can resume.
 *
 * If a writer connection is open, its master position is returned. Otherwise
 * the furthest-along replica position among the open, non-static replica
 * connections is returned.
 *
 * @return mixed A position object, or false if no open connection yields one
 */
public function getReplicaResumePos() {
	// Prefer the position of any existing master server connection
	$writerConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
	if ( $writerConn ) {
		return $writerConn->getMasterPos();
	}

	// Otherwise take the highest position of any existing replica server connection
	$best = false;
	for ( $i = 1, $total = $this->getServerCount(); $i < $total; $i++ ) {
		if ( !empty( $this->servers[$i]['is static'] ) ) {
			continue; // server does not use replication
		}

		$replicaConn = $this->getAnyOpenConnection( $i );
		if ( !$replicaConn ) {
			continue; // no open connection
		}
		$pos = $replicaConn->getReplicaPos();
		if ( !$pos ) {
			continue; // could not get position
		}

		if ( !$best || $pos->hasReached( $best ) ) {
			$best = $pos;
		}
	}

	return $best;
}
1547
/**
 * Close every tracked connection and mark this load balancer as disabled.
 *
 * @param string $fname Caller name, used for the ownership check and logging
 * @param mixed $owner Owner ID checked by assertOwnership()
 */
public function disable( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$this->closeAll( $fname, $owner );

	$this->disabled = true;
}
1553
/**
 * Close every tracked connection and reset the connection pools.
 *
 * @param string $fname Caller name, used for the ownership check and logging
 * @param mixed $owner Owner ID checked by assertOwnership()
 */
public function closeAll( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$closeOne = function ( IDatabase $conn ) use ( $fname ) {
		$host = $conn->getServer();
		$this->connLogger->debug( "$fname: closing connection to database '$host'." );
		$conn->close( $fname, $this->id );
	};
	$this->forEachOpenConnection( $closeOne );

	$this->conns = self::newTrackedConnectionsArray();
}
1568
/**
 * Close a single connection and stop tracking it in the pools.
 *
 * @param IDatabase $conn Connection handle; must not be a DBConnRef
 * @throws RuntimeException If $conn is a DBConnRef
 */
public function closeConnection( IDatabase $conn ) {
	if ( $conn instanceof DBConnRef ) {
		// Avoid calling close() but still leaving the handle in the pool
		throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
	}

	$serverIndex = $conn->getLBInfo( self::$INFO_SERVER_INDEX );
	foreach ( $this->conns as $type => $connsByServer ) {
		if ( !isset( $connsByServer[$serverIndex] ) ) {
			continue;
		}

		foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
			if ( $conn === $trackedConn ) {
				// $i is the pool key under this server (0 or a domain ID), not a
				// server index, so the host name must be looked up by $serverIndex
				$host = $this->getServerName( $serverIndex );
				$this->connLogger->debug(
					__METHOD__ . ": closing connection to database $i at '$host'." );
				unset( $this->conns[$type][$serverIndex][$i] );
				break 2;
			}
		}
	}

	$conn->close( __METHOD__ );
}
1594
/**
 * Commit master changes, then flush master and replica snapshots.
 *
 * @param string $fname Caller name, forwarded to each step
 * @param mixed $owner Owner ID, forwarded to each step
 */
public function commitAll( $fname = __METHOD__, $owner = null ) {
	// Writes first...
	$this->commitMasterChanges( $fname, $owner );

	// ...then drop snapshots on the master and replica handles
	$this->flushMasterSnapshots( $fname, $owner );
	$this->flushReplicaSnapshots( $fname, $owner );
}
1600
/**
 * Run pre-commit callbacks on all open master connections until none remain.
 *
 * The round stage is ROUND_ERROR while this runs and ROUND_FINALIZED on
 * success. Afterwards, transaction-end callbacks are suppressed on every
 * open master connection.
 *
 * @param string $fname Caller name, used for the ownership check
 * @param mixed $owner Owner ID checked by assertOwnership()
 * @return int Total number of callback execution attempts
 */
public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$grandTotal = 0;
	// Keep going for as long as callbacks add callbacks on other connections
	do {
		$ranThisPass = 0; // callbacks execution attempts
		$runPreCommit = function ( Database $conn ) use ( &$ranThisPass ) {
			// Run any pre-commit callbacks while leaving the post-commit ones suppressed.
			// Any error should cause all (peer) transactions to be rolled back together.
			$ranThisPass += $conn->runOnTransactionPreCommitCallbacks();
		};
		$this->forEachOpenMasterConnection( $runPreCommit );
		$grandTotal += $ranThisPass;
	} while ( $ranThisPass > 0 );

	// Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	} );

	$this->trxRoundStage = self::ROUND_FINALIZED;

	return $grandTotal;
}
1629
/**
 * Check every open master connection before the commit step.
 *
 * The round stage is ROUND_ERROR while this runs and ROUND_APPROVED on success.
 *
 * @param array $options Only 'maxWriteDuration' is read (defaults to 0, meaning no limit)
 * @param string $fname Caller name, used for the ownership check
 * @param mixed $owner Owner ID checked by assertOwnership()
 * @throws DBTransactionSizeError If the estimated write time exceeds the limit
 * @throws DBTransactionError If a connection with pending work fails to ping
 */
public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( self::ROUND_FINALIZED );
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$limit = $options['maxWriteDuration'] ?? 0;

	$checkHandle = function ( IDatabase $conn ) use ( $limit ) {
		// If atomic sections or explicit transactions are still open, some caller must have
		// caught an exception but failed to properly rollback any changes. Detect that and
		// throw an error (causing rollback).
		$conn->assertNoOpenTransactions();

		// Assert that the time to replicate the transaction will be sane.
		// If this fails, then all DB transactions will be rolled back together.
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		$overLimit = ( $limit > 0 && $time > $limit );
		if ( $overLimit ) {
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent $time second(s) in writes, exceeding the limit of $limit",
				[ $time, $limit ]
			);
		}

		// If a connection sits idle while slow queries execute on another, that connection
		// may end up dropped before the commit round is reached. Ping servers to detect this.
		if ( !$conn->writesOrCallbacksPending() ) {
			return;
		}
		if ( !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"A connection to the {$conn->getDBname()} database was lost before commit"
			);
		}
	};

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$this->forEachOpenMasterConnection( $checkHandle );
	$this->trxRoundStage = self::ROUND_APPROVED;
}
1667
/**
 * Start an explicit transaction round named after the caller.
 *
 * @param string $fname Caller name; becomes the transaction round ID
 * @param mixed $owner Owner ID checked by assertOwnership()
 * @throws DBTransactionError If a round is already active or the stage is not ROUND_CURSORY
 */
public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$roundActive = ( $this->trxRoundId !== false );
	if ( $roundActive ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$this->trxRoundId}' already started"
		);
	}
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );

	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	// Clear any empty transactions (no writes/callbacks) from the implicit round
	$this->flushMasterSnapshots( $fname, $owner );

	$this->trxRoundId = $fname;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Mark applicable handles as participating in this explicit transaction round.
	// For each of these handles, any writes and callbacks will be tied to a single
	// transaction. The (peer) handles will reject begin()/commit() calls unless they
	// are part of an en masse commit or an en masse rollback.
	$markHandle = function ( Database $conn ) {
		$this->applyTransactionRoundFlags( $conn );
	};
	$this->forEachOpenMasterConnection( $markHandle );

	$this->trxRoundStage = self::ROUND_CURSORY;
}
1696
/**
 * Commit on every open master connection and end any explicit transaction round.
 *
 * The listing this block came from had lost the line that invokes
 * forEachOpenMasterConnection() with the commit closure, leaving the closure
 * dangling; the call is restored here.
 *
 * The round stage is ROUND_ERROR while this runs and ROUND_COMMIT_CALLBACKS
 * on success.
 *
 * @param string $fname Caller name, used for the ownership check, commits and errors
 * @param mixed $owner Owner ID checked by assertOwnership()
 * @throws DBTransactionError If the commit failed with a DBError on any connection
 */
public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( self::ROUND_APPROVED );
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$failures = [];

	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	// Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
	// Note that callbacks should already be suppressed due to finalizeMasterChanges().
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, &$failures ) {
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( DBError $e ) {
				// Record the failure and keep going so every handle gets a commit attempt
				( $this->errorLogger )( $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
		}
	);
	if ( $failures ) {
		throw new DBTransactionError(
			null,
			"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
	if ( $restore ) {
		// Unmark handles as participating in this explicit transaction round
		$this->forEachOpenMasterConnection( function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		} );
	}
	$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
1736
/**
 * Run the pending post-commit/post-rollback callbacks on all open master connections.
 *
 * The listing this block came from had lost the line that invokes
 * forEachOpenMasterConnection() with the callback-running closure, leaving the
 * closure dangling; the call is restored here.
 *
 * The round stage is ROUND_ERROR while this runs and is put back to its
 * previous value afterwards. Exceptions from callbacks and from the follow-up
 * commits are caught; only the first one is kept and returned.
 *
 * @param string $fname Caller name, used for the ownership check only
 * @param mixed $owner Owner ID checked by assertOwnership()
 * @return Throwable|null The first exception caught, if any
 * @throws DBTransactionError If the round is not in one of the callback stages
 */
public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$oldStage = $this->trxRoundStage;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( false );
	} );

	$e = null; // first exception
	// From here on, log messages and commits are attributed to this method
	$fname = __METHOD__;
	// Loop until callbacks stop adding callbacks on other connections
	do {
		// Run any pending callbacks for each connection...
		$count = 0; // callback execution attempts
		$this->forEachOpenMasterConnection(
			function ( Database $conn ) use ( $type, &$e, &$count ) {
				if ( $conn->trxLevel() ) {
					return; // retry in the next iteration, after commit() is called
				}
				try {
					$count += $conn->runOnTransactionIdleCallbacks( $type );
				} catch ( Throwable $ex ) {
					$e = $e ?: $ex;
				}
			}
		);
		// Clear out any active transactions left over from callbacks...
		$this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
			if ( $conn->writesPending() ) {
				// A callback from another handle wrote to this one and DBO_TRX is set
				$this->queryLogger->warning( $fname . ": found writes pending." );
				$fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
				$this->queryLogger->warning(
					"$fname: found writes pending ($fnames).",
					[
						'db_server' => $conn->getServer(),
						'db_name' => $conn->getDBname(),
						'exception' => new RuntimeException()
					]
				);
			} elseif ( $conn->trxLevel() ) {
				// A callback from another handle read from this one and DBO_TRX is set,
				// which can easily happen if there is only one DB (no replicas)
				$this->queryLogger->debug( "$fname: found empty transaction." );
			}
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( Throwable $ex ) {
				$e = $e ?: $ex;
			}
		} );
	} while ( $count > 0 );

	$this->trxRoundStage = $oldStage;

	return $e;
}
1811
/**
 * Run the transaction listener callbacks on all open master connections.
 *
 * The listing this block came from had lost the body of the try block, which
 * left $type captured but unused; the call to runTransactionListenerCallbacks()
 * is restored here.
 *
 * The round stage is ROUND_ERROR while this runs and ROUND_CURSORY afterwards.
 * Exceptions from callbacks are caught; only the first one is kept and returned.
 *
 * @param string $fname Caller name, used for the ownership check only
 * @param mixed $owner Owner ID checked by assertOwnership()
 * @return Throwable|null The first exception caught, if any
 * @throws DBTransactionError If the round is not in one of the callback stages
 */
public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$e = null;

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
		try {
			$conn->runTransactionListenerCallbacks( $type );
		} catch ( Throwable $ex ) {
			$e = $e ?: $ex;
		}
	} );
	$this->trxRoundStage = self::ROUND_CURSORY;

	return $e;
}
1843
/**
 * Roll back on every open master connection and end any explicit transaction round.
 *
 * The round stage is ROUND_ERROR while this runs and ROUND_ROLLBACK_CALLBACKS
 * on success.
 *
 * @param string $fname Caller name, used for the ownership check and rollbacks
 * @param mixed $owner Owner ID checked by assertOwnership()
 */
public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->ownerId === null ) {
		// $scope is intentionally unused; presumably it keeps user aborts
		// ignored until this method returns - TODO confirm
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$hadExplicitRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$rollbackHandle = function ( IDatabase $conn ) use ( $fname ) {
		$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
	};
	$this->forEachOpenMasterConnection( $rollbackHandle );

	if ( $hadExplicitRound ) {
		// Unmark handles as participating in this explicit transaction round
		$unmarkHandle = function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		};
		$this->forEachOpenMasterConnection( $unmarkHandle );
	}

	$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
1865
/**
 * Require the current transaction round stage to be one of the given stages.
 *
 * @param string|string[] $stage Allowed stage, or list of allowed stages
 * @throws DBTransactionError If the current stage is not allowed
 */
private function assertTransactionRoundStage( $stage ) {
	$allowed = (array)$stage;
	if ( in_array( $this->trxRoundStage, $allowed, true ) ) {
		return;
	}

	// Quote each allowed stage for the error message
	$quoted = [];
	foreach ( $allowed as $name ) {
		$quoted[] = "'$name'";
	}
	$stageList = implode( '/', $quoted );

	throw new DBTransactionError(
		null,
		"Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
	);
}
1886
/**
 * Require the caller to be the owner of this load balancer, if it has one.
 *
 * The check passes when no owner ID is set, or when $owner equals either the
 * owner ID or this instance's own ID.
 *
 * @param string $fname Caller name, used in the error message
 * @param mixed $owner Owner ID supplied by the caller
 * @throws DBTransactionError If the check fails
 */
private function assertOwnership( $fname, $owner ) {
	if ( $this->ownerId === null ) {
		return;
	}
	if ( $owner === $this->ownerId || $owner === $this->id ) {
		return;
	}

	throw new DBTransactionError(
		null,
		"$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
	);
}
1907
/**
 * Mark a handle as taking part in the current transaction round.
 *
 * Handles flagged as auto-commit-only are left alone.
 *
 * @param Database $conn
 */
private function applyTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( self::$INFO_AUTOCOMMIT_ONLY );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$usesDefault = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefault ) {
		// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
		// Force DBO_TRX even in CLI mode since a commit round is expected soon.
		$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
	}

	$usesTrx = $conn->getFlag( $conn::DBO_TRX );
	if ( $usesTrx ) {
		$conn->setLBInfo( $conn::LB_TRX_ROUND_ID, $this->trxRoundId );
	}
}
1932
/**
 * Undo what applyTransactionRoundFlags() did to a handle.
 *
 * Handles flagged as auto-commit-only are left alone.
 *
 * @param Database $conn
 */
private function undoTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( self::$INFO_AUTOCOMMIT_ONLY );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$usesTrx = $conn->getFlag( $conn::DBO_TRX );
	if ( $usesTrx ) {
		$conn->setLBInfo( $conn::LB_TRX_ROUND_ID, null ); // remove the round ID
	}

	$usesDefault = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefault ) {
		$conn->restoreFlags( $conn::RESTORE_PRIOR );
	}
}
1949
/**
 * Call flushSnapshot() on every open replica connection.
 *
 * @param string $fname Caller name, used for the ownership check and the flush
 * @param mixed $owner Owner ID checked by assertOwnership()
 */
public function flushReplicaSnapshots( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$flush = function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	};
	$this->forEachOpenReplicaConnection( $flush );
}
1956
/**
 * Call flushSnapshot() on every open master connection.
 *
 * @param string $fname Caller name, used for the ownership check and the flush
 * @param mixed $owner Owner ID checked by assertOwnership()
 */
public function flushMasterSnapshots( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$flush = function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	};
	$this->forEachOpenMasterConnection( $flush );
}
1963
/**
 * Get the current transaction round stage.
 *
 * @return string One of the ROUND_* stage values held in "trxRoundStage"
 */
public function getTransactionRoundStage() {
	$stage = $this->trxRoundStage;

	return $stage;
}
1971
/**
 * Check whether a connection to the writer (master) server is open.
 *
 * @return bool
 */
public function hasMasterConnection() {
	$writerIndex = $this->getWriterIndex();

	return $this->isOpen( $writerIndex );
}
1975
/**
 * Check whether any open master connection has pending writes or callbacks.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$found = false;
	$probe = function ( IDatabase $conn ) use ( &$found ) {
		// Once a pending handle is found, the remaining handles are not asked
		if ( !$found && $conn->writesOrCallbacksPending() ) {
			$found = true;
		}
	};
	$this->forEachOpenMasterConnection( $probe );

	return $found;
}
1984
/**
 * Get the largest lastDoneWrites() value among the open master connections.
 *
 * @return mixed The largest value, or false when there are no open master connections
 */
public function lastMasterChangeTimestamp() {
	$latest = false;
	$track = function ( IDatabase $conn ) use ( &$latest ) {
		$latest = max( $latest, $conn->lastDoneWrites() );
	};
	$this->forEachOpenMasterConnection( $track );

	return $latest;
}
1993
/**
 * Check for pending master changes, or for master writes made within the last $age seconds.
 *
 * @param float|null $age Age window in seconds; null means use "waitTimeout"
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	$window = $age ?? $this->waitTimeout;

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	return $this->lastMasterChangeTimestamp() > microtime( true ) - $window;
}
2000
/**
 * Collect the pendingWriteCallers() lists of all open master connections.
 *
 * @return array Merged list of caller names
 */
public function pendingMasterChangeCallers() {
	$lists = [];
	$collect = function ( IDatabase $conn ) use ( &$lists ) {
		$lists[] = $conn->pendingWriteCallers();
	};
	$this->forEachOpenMasterConnection( $collect );

	// The leading empty array keeps array_merge() valid when nothing was collected
	return array_merge( [], ...$lists );
}
2009
/**
 * Check whether this load balancer is in lagged replica mode.
 *
 * The listing this block came from had lost the final return statement, so the
 * method fell off the end after probing for a reader index; the return of the
 * "laggedReplicaMode" flag is restored here.
 *
 * @param string|bool $domain Domain ID, forwarded to getReaderIndex()
 * @return bool
 */
public function getLaggedReplicaMode( $domain = false ) {
	if ( $this->laggedReplicaMode ) {
		return true; // stay in lagged replica mode
	}

	if ( $this->hasStreamingReplicaServers() ) {
		// This will set "laggedReplicaMode" as needed
		$this->getReaderIndex( self::GROUP_GENERIC, $domain );
	}

	return $this->laggedReplicaMode;
}
2022
/**
 * Check whether lagged replica mode has been entered so far.
 *
 * The listing this block came from had lost the method body entirely; it is
 * restored here as a plain read of the "laggedReplicaMode" flag, with no
 * probing for a reader index.
 *
 * @return bool
 */
public function laggedReplicaUsed() {
	return $this->laggedReplicaMode;
}
2026
/**
 * Get the reason why writes should not be attempted, if there is one.
 *
 * Checked in order: the configured read-only reason, the master running in
 * read-only mode, and lagged replica mode.
 *
 * @param string|bool $domain Domain ID; resolved through resolveDomainID()
 * @return string|bool Reason text, or false if none applies
 */
public function getReadOnlyReason( $domain = false ) {
	$domainInstance = DatabaseDomain::newFromId( $this->resolveDomainID( $domain ) );

	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->isMasterRunningReadOnly( $domainInstance ) ) {
		return 'The master database server is running in read-only mode.';
	}

	if ( $this->getLaggedReplicaMode( $domain ) ) {
		$haveReader = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 );
		if ( $haveReader ) {
			return 'The database is read-only until replication lag decreases.';
		}

		return 'The database is read-only until a replica database server becomes reachable.';
	}

	return false;
}
2042
/**
 * Check whether the server behind a master connection reports itself as read-only.
 *
 * The answer is kept in the server cache. With CONN_REFRESH_READ_ONLY the
 * server is asked again and the cached value overwritten; otherwise the
 * cached value is used when present. A DBError while asking counts as
 * "not read-only".
 *
 * @param IDatabase $conn Master connection handle
 * @param int $flags Bitfield; only CONN_REFRESH_READ_ONLY is inspected here
 * @return bool
 */
private function isMasterConnectionReadOnly( IDatabase $conn, $flags = 0 ) {
	// Note that table prefixes are not related to server-side read-only mode
	$cacheKey = $this->srvCache->makeGlobalKey(
		'rdbms-server-readonly',
		$conn->getServer(),
		$conn->getDBname(),
		$conn->dbSchema()
	);

	// Ask the server directly, mapping failures to 0
	$askServer = static function () use ( $conn ) {
		try {
			return (int)$conn->serverIsReadOnly();
		} catch ( DBError $e ) {
			return 0;
		}
	};

	$refresh = ( $flags & self::CONN_REFRESH_READ_ONLY ) == self::CONN_REFRESH_READ_ONLY;
	if ( $refresh ) {
		$readOnly = $askServer();
		$this->srvCache->set( $cacheKey, $readOnly, BagOStuff::TTL_PROC_SHORT );
	} else {
		$readOnly = $this->srvCache->getWithSetCallback(
			$cacheKey,
			BagOStuff::TTL_PROC_SHORT,
			$askServer
		);
	}

	return (bool)$readOnly;
}
2080
/**
 * Check whether the master server for a domain is running in read-only mode.
 *
 * The answer is cached in the WAN cache. On a cache miss, a master connection
 * is fetched with CONN_REFRESH_READ_ONLY and then queried through
 * isMasterConnectionReadOnly(). A DBError on that path counts as
 * "not read-only".
 *
 * The transaction profiler is silenced for the duration of the check. Its
 * previous state is restored in a finally block so that it is put back even
 * when something other than a DBError is thrown.
 *
 * @param DatabaseDomain $domain
 * @return bool
 */
private function isMasterRunningReadOnly( DatabaseDomain $domain ) {
	// Context will often be HTTP GET/HEAD; heavily cache the results
	return (bool)$this->wanCache->getWithSetCallback(
		// Note that table prefixes are not related to server-side read-only mode
		$this->wanCache->makeGlobalKey(
			'rdbms-server-readonly',
			$this->getMasterServerName(),
			$domain->getDatabase(),
			(string)$domain->getSchema()
		),
		self::TTL_CACHE_READONLY,
		function () use ( $domain ) {
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$index = $this->getWriterIndex();
				// Reset the cache for isMasterConnectionReadOnly()
				$flags = self::CONN_REFRESH_READ_ONLY;
				$conn = $this->getServerConnection( $index, $domain->getId(), $flags );
				// Reuse the process cache set above
				$readOnly = (int)$this->isMasterConnectionReadOnly( $conn );
				$this->reuseConnection( $conn );
			} catch ( DBError $e ) {
				$readOnly = 0;
			} finally {
				$this->trxProfiler->setSilenced( $old );
			}

			return $readOnly;
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG, 'lockTSE' => 10, 'busyValue' => 0 ]
	);
}
2116
/**
 * Get, and optionally set, the "allowLagged" setting.
 *
 * @param mixed $mode New value, or null to leave the setting unchanged
 * @return mixed The value of the setting after any update
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->allowLagged = $mode;
	}

	return $this->allowLagged;
}
2125
/**
 * Ping every open connection handle.
 *
 * @return bool False if any ping failed; true otherwise (including when nothing is open)
 */
public function pingAll() {
	$allAlive = true;
	$this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$allAlive ) {
		// ping() is the left operand so every handle is pinged, even after a failure
		$allAlive = $conn->ping() && $allAlive;
	} );

	return $allAlive;
}
2136
/**
 * Call a function with each open connection object.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection handle
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	// $this->conns is nested as (connection category => server index => handles)
	foreach ( $this->conns as $handlesByServer ) {
		foreach ( $handlesByServer as $handles ) {
			foreach ( $handles as $handle ) {
				$callback( $handle, ...$params );
			}
		}
	}
}
2146
/**
 * Call a function with each open connection object to a master.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection handle
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->conns as $handlesByServer ) {
		// Categories without any handle for the writer index contribute nothing
		foreach ( $handlesByServer[$writerIndex] ?? [] as $handle ) {
			$callback( $handle, ...$params );
		}
	}
}
2158
/**
 * Call a function with each open replica DB connection object.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection handle
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	foreach ( $this->conns as $handlesByServer ) {
		foreach ( $handlesByServer as $serverIndex => $handles ) {
			// Only visit handles whose server index is not the writer (master) index
			if ( $serverIndex !== $this->getWriterIndex() ) {
				foreach ( $handles as $handle ) {
					$callback( $handle, ...$params );
				}
			}
		}
	}
}
2171
/**
 * Count the connection handles currently tracked in $this->conns.
 *
 * @return int Total number of handles across all categories and server indexes
 */
private function getCurrentConnectionCount() {
	$total = 0;
	foreach ( $this->conns as $handlesByServer ) {
		// Sum the sizes of the per-server handle lists of this category
		$total += array_sum( array_map( 'count', $handlesByServer ) );
	}

	return $total;
}
2185
/**
 * Get the hostname and lag time of the most-lagged replica server.
 *
 * Only servers with a positive load in the generic group are considered.
 *
 * @param string|bool $domain Passed through to getLagTimes()
 * @return array Tuple of (host, lag, server index); ( '', -1, 0 ) when there are
 *  no replica servers or no server qualifies
 */
public function getMaxLag( $domain = false ) {
	$worst = [ '', -1, 0 ];

	if ( !$this->hasReplicaServers() ) {
		return $worst;
	}

	foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
		$hasLoad = $this->groupLoads[self::GROUP_GENERIC][$index] > 0;
		// $worst[1] holds the highest lag seen so far
		if ( $hasLoad && $lag > $worst[1] ) {
			$worst = [ $this->getServerInfoStrict( $index, 'host' ), $lag, $index ];
		}
	}

	return $worst;
}
2204
/**
 * Get an estimate of replication lag (in seconds) for each server.
 *
 * Servers flagged 'is static' are reported with a lag of 0 without consulting the
 * load monitor; all other servers are looked up through it.
 *
 * @param string|bool $domain Passed through to the load monitor
 * @return array Map of (server index => lag)
 */
public function getLagTimes( $domain = false ) {
	if ( !$this->hasReplicaServers() ) {
		// no replication = no lag
		return [ $this->getWriterIndex() => 0 ];
	}

	$staticLagTimes = []; // map of (server index => 0 seconds)
	$replicatingIndexes = [];
	foreach ( $this->servers as $index => $info ) {
		if ( !empty( $info['is static'] ) ) {
			// DB server is a non-replicating and read-only archive
			$staticLagTimes[$index] = 0;
			continue;
		}
		// DB server might have replication lag
		$replicatingIndexes[] = $index;
	}

	$measuredLagTimes = $this->getLoadMonitor()->getLagTimes( $replicatingIndexes, $domain );

	return $measuredLagTimes + $staticLagTimes;
}
2222
/**
 * Get the replication lag reported by a given connection handle.
 *
 * This simply delegates to IDatabase::getLag() on the handle.
 *
 * @param IDatabase $conn
 * @return mixed Whatever $conn->getLag() returns
 */
public function safeGetLag( IDatabase $conn ) {
	$lag = $conn->getLag();

	return $lag;
}
2241
/**
 * Wait for a replica DB to reach a specified master position.
 *
 * If no position is given, the current master position is fetched first, reusing
 * an open master handle when there is one and otherwise opening (and then closing)
 * a temporary one.
 *
 * @param IDatabase $conn Replica DB handle to wait on
 * @param DBMasterPos|bool $pos Position to wait for, or false to use the current master position
 * @param int|null $timeout Seconds to wait (at least 1); falls back to $this->waitTimeout
 * @return bool True if the position was reached or $conn is a handle to the master itself;
 *  false on timeout or when no usable position could be determined
 * @throws DBReplicationWaitError If no master connection could be obtained to read the position
 */
public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	$timeout = max( 1, $timeout ?: $this->waitTimeout );

	if ( $conn->getLBInfo( self::$INFO_SERVER_INDEX ) === $this->getWriterIndex() ) {
		return true; // not a replica DB server
	}

	if ( !$pos ) {
		// Get the current master position, opening a connection if needed
		$index = $this->getWriterIndex();
		$flags = self::CONN_SILENCE_ERRORS;
		$masterConn = $this->getAnyOpenConnection( $index, $flags );
		if ( $masterConn ) {
			$pos = $masterConn->getMasterPos();
		} else {
			$masterConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
			if ( !$masterConn ) {
				throw new DBReplicationWaitError(
					null,
					"Could not obtain a master database connection to get the position"
				);
			}
			$pos = $masterConn->getMasterPos();
			// This handle was opened only to read the position; do not keep it around
			$this->closeConnection( $masterConn );
		}
	}

	if ( $pos instanceof DBMasterPos ) {
		$start = microtime( true );
		$result = $conn->masterPosWait( $pos, $timeout );
		$seconds = max( microtime( true ) - $start, 0 );

		// A null or -1 result from masterPosWait() counts as a failed wait
		$ok = ( $result !== null && $result != -1 );
		if ( $ok ) {
			$this->replLogger->debug( __METHOD__ . ': done waiting' );
		} else {
			$this->replLogger->warning(
				__METHOD__ . ': timed out waiting on {dbserver} pos {pos} [{seconds}s]',
				[
					'dbserver' => $conn->getServer(),
					'pos' => $pos,
					'seconds' => round( $seconds, 6 ),
					'trace' => ( new RuntimeException() )->getTraceAsString()
				]
			);
		}
	} else {
		$ok = false; // something is misconfigured
		$this->replLogger->error(
			__METHOD__ . ': could not get master pos for {dbserver}',
			[
				'dbserver' => $conn->getServer(),
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);
	}

	return $ok;
}
2301
/**
 * Wait for a replica DB to reach a specified master position.
 *
 * Thin wrapper that forwards all arguments to waitForMasterPos().
 *
 * @param IDatabase $conn Replica DB handle to wait on
 * @param DBMasterPos|bool $pos Position to wait for, or false
 * @param int|null $timeout Seconds to wait, or null
 * @return bool The result of waitForMasterPos()
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	$reached = $this->waitForMasterPos( $conn, $pos, $timeout );

	return $reached;
}
2317
/**
 * Set (or clear) a named transaction listener callback.
 *
 * The callback is recorded in $this->trxRecurringCallbacks and is also applied
 * right away, via IDatabase::setTransactionListener(), to every master connection
 * that is currently open.
 *
 * @param string $name Callback name
 * @param callable|null $callback Listener to register, or null to remove the one named $name
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}

	// Propagate the change (including removal, when $callback is null) to open master handles
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $name, $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	);
}
2330
/**
 * Replace the table alias map stored on this load balancer.
 *
 * @param array[] $aliases Map of (table => (dbname, schema, prefix) map)
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
2334
/**
 * Replace the index alias map stored on this load balancer.
 *
 * @param string[] $aliases Map of (index alias => index)
 */
public function setIndexAliases( array $aliases ) {
	$this->indexAliases = $aliases;
}
2338
/**
 * Replace the domain alias map stored on this load balancer.
 *
 * @param array $aliases Map of (domain alias => DB domain)
 */
public function setDomainAliases( array $aliases ) {
	$this->domainAliases = $aliases;
}
2342
/**
 * Set a new table prefix for the existing local domain.
 *
 * The local domain keeps its database and schema; only the prefix changes. Open
 * handles that are not marked as foreign get the new prefix applied as well.
 *
 * @param string $prefix New table prefix
 * @throws DBUnexpectedError If any open handle still has a positive foreign reference count
 */
public function setLocalDomainPrefix( $prefix ) {
	// Collect the domains of foreign handles that have not been released yet.
	// Once reuseConnection() is called on a handle, its reference count goes from 1 to 0;
	// until then, it is still in use by the caller (explicitly or via DBConnRef scope).
	$busyDomains = [];
	$this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$busyDomains ) {
		$refCount = $conn->getLBInfo( self::$INFO_FOREIGN_REF_COUNT );
		if ( $refCount > 0 ) {
			$busyDomains[] = $conn->getDomainID();
		}
	} );

	// Do not switch connections to explicit foreign domains unless marked as safe
	if ( $busyDomains ) {
		$domains = implode( ', ', $busyDomains );
		throw new DBUnexpectedError(
			null,
			"Foreign domain connections are still in use ($domains)"
		);
	}

	$current = $this->localDomain;
	$this->setLocalDomain(
		new DatabaseDomain( $current->getDatabase(), $current->getSchema(), $prefix )
	);

	// Update the prefix for all local connections...
	$this->forEachOpenConnection( function ( IDatabase $conn ) use ( $prefix ) {
		if ( $conn->getLBInfo( self::$INFO_FORIEGN ) ) {
			return; // handle is marked as foreign; leave its prefix alone
		}
		$conn->tablePrefix( $prefix );
	} );
}
2374
/**
 * Close all connections and redefine the local domain.
 *
 * @param string $domain Domain ID, parsed with DatabaseDomain::newFromId()
 */
public function redefineLocalDomain( $domain ) {
	// Close every handle first, then switch the local domain
	$this->closeAll( __METHOD__, $this->id );

	$newDomain = DatabaseDomain::newFromId( $domain );
	$this->setLocalDomain( $newDomain );
}
2380
/**
 * Turn "temp tables only" mode on or off for a domain.
 *
 * @param bool $value Truthy to enable the mode, falsy to clear it
 * @param string $domain Key into the $this->tempTablesOnlyMode map
 * @return bool The mode that was in effect before this call (false if none was recorded)
 */
public function setTempTablesOnlyMode( $value, $domain ) {
	$previous = $this->tempTablesOnlyMode[$domain] ?? false;

	if ( !$value ) {
		// Disabled domains are not kept in the map at all
		unset( $this->tempTablesOnlyMode[$domain] );
	} else {
		$this->tempTablesOnlyMode[$domain] = true;
	}

	return $previous;
}
2391
/**
 * Store the given domain as this load balancer's local domain.
 *
 * @param DatabaseDomain $domain
 */
private function setLocalDomain( DatabaseDomain $domain ) {
	$this->localDomain = $domain;
}
2398
/**
 * Get the config array of a server, or one field of it, failing loudly if absent.
 *
 * @param int $i Server index
 * @param string|null $field Field name, or null to get the whole server config array
 * @return mixed The whole config array when $field is null, else the value of that field
 * @throws InvalidArgumentException If the index has no config array or the field is not in it
 */
private function getServerInfoStrict( $i, $field = null ) {
	$info = $this->servers[$i] ?? null;
	if ( !is_array( $info ) ) {
		throw new InvalidArgumentException( "No server with index '$i'" );
	}

	if ( $field === null ) {
		return $info;
	}

	// array_key_exists() rather than isset(): a field explicitly set to null still counts
	if ( !array_key_exists( $field, $info ) ) {
		throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
	}

	return $info[$field];
}
2420
/**
 * Get the server name of the master, i.e. getServerName() for the writer index.
 *
 * @return string
 */
private function getMasterServerName() {
	$writerIndex = $this->getWriterIndex();

	return $this->getServerName( $writerIndex );
}
2427
/**
 * Disable this load balancer when the object is destroyed.
 */
public function __destruct() {
	// Avoid connection leaks: make sure nothing stays open past this object's lifetime
	$this->disable( __METHOD__, $this->ownerId );
}
2432}
2433
// Also make the class reachable under the un-namespaced name 'LoadBalancer'
class_alias( LoadBalancer::class, 'LoadBalancer' );
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:71
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Exception class for attempted DB access @newable.
Helper class used for automatically marking an IDatabase connection as reusable (once it no longer ma...
Definition DBConnRef.php:29
Database error base class @newable Stable to extend.
Definition DBError.php:32
Exception class for replica DB wait errors @newable.
Class to handle database/schema/prefix specifications for IDatabase.
Relational database abstraction object.
Definition Database.php:50
runOnTransactionPreCommitCallbacks()
Actually consume and run any "on transaction pre-commit" callbacks.
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
Definition Database.php:629
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition Database.php:806
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition Database.php:617
trxLevel()
Gets the current transaction level.
Definition Database.php:557
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition Database.php:776
static attributesFromType( $dbType, $driver=null)
Definition Database.php:460
getServer()
Get the server hostname or IP address.
static factory( $type, $params=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition Database.php:405
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition Database.php:819
Database connection, tracking, load balancing, and transaction manager for a cluster.
array[] $servers
Map of (server index => server config array)
undoTransactionRoundFlags(Database $conn)
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
getAnyOpenConnection( $i, $flags=0)
Get any open connection to a given server index, local or foreign.
callable $errorLogger
Exception logger.
IDatabase[][][] Database[][][] $conns
Map of (connection category => server index => IDatabase[])
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
bool DBMasterPos $waitForPos
Replication sync position or false if not set.
callable $deprecationLogger
Deprecation logger.
setLocalDomain(DatabaseDomain $domain)
$id
var int An identifier for this class instance
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
hasReplicaServers()
Whether there are any replica servers configured.
getConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle reference for a real or virtual (DB_MASTER/DB_REPLICA) server index.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
getServerType( $i)
Get DB type of the server with the specified index.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
flushMasterSnapshots( $fname=__METHOD__, $owner=null)
Commit all master DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
DatabaseDomain[] $nonLocalDomainCache
Map of (domain ID => domain instance)
int[] $maxLagByIndex
Map of (server index => seconds of lag considered "high")
flushReplicaSnapshots( $fname=__METHOD__, $owner=null)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
DatabaseDomain[] string[] $domainAliases
Map of (domain alias => DB domain)
string $agent
Agent name for query profiling.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
getMasterPos()
Get the current master replication position.
getForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
beginMasterChanges( $fname=__METHOD__, $owner=null)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
pickAnyOpenConnection( $candidateConns, $autocommit)
getServerConnection( $i, $domain, $flags=0)
assertOwnership( $fname, $owner)
Assure that if this instance is owned, the caller is either the owner or is internal.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
redefineLocalDomain( $domain)
Close all connections and redefine the local domain for testing or schema creation.
callable null $profiler
An optional callback that returns a ScopedCallback instance, meant to profile the actual query execut...
enforceConnectionFlags(IDatabase $conn, $flags)
pickReaderIndex(array $loads, $domain=false)
setDomainAliases(array $aliases)
Convert certain database domains to alternative ones.
sanitizeConnectionFlags( $flags, $i, $domain)
runMasterTransactionListenerCallbacks( $fname=__METHOD__, $owner=null)
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in "waitForPos".
int null $ownerId
Integer ID of the managing LBFactory instance or null if none.
string[] $indexAliases
Map of (index alias => index)
isOpen( $index)
Test if the specified index represents an open connection.
array $loadMonitorConfig
The LoadMonitor configuration.
array[] $groupLoads
Map of (group => server index => weight)
callable[] $trxRecurringCallbacks
Map of (name => callable)
commitAll( $fname=__METHOD__, $owner=null)
Commit transactions on all open connections.
string null $defaultGroup
Default query group to use with getConnection()
getConnectionIndex( $i, array $groups, $domain)
Get the server index to use for a specified server index and query group list.
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
int[] $readIndexByGroup
The group replica server indexes keyed by group.
getLoadMonitor()
Get a LoadMonitor instance.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
setTempTablesOnlyMode( $value, $domain)
Indicate whether the tables on this domain are only temporary tables for testing.
isMasterRunningReadOnly(DatabaseDomain $domain)
reuseConnection(IDatabase $conn)
Mark a live handle as being available for reuse under a different database domain.
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
string bool $trxRoundId
Explicit DBO_TRX transaction round active or false if none.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
closeConnection(IDatabase $conn)
Close a connection.
int $connectionCounter
Total number of new connections ever made with this instance.
getExistingReaderIndex( $group)
Get the server index chosen by the load balancer for use with the given query group.
getLazyConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database handle reference for a real or virtual (DB_MASTER/DB_REPLICA) server index.
closeAll( $fname=__METHOD__, $owner=null)
Close all open connections.
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
setLocalDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
bool $cliMode
Whether this PHP instance is for a CLI script.
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle for a real or virtual (DB_MASTER/DB_REPLICA) server index that can be used...
commitMasterChanges( $fname=__METHOD__, $owner=null)
Issue COMMIT on all open master connections to flush changes and view snapshots.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
openConnection( $i, $domain=false, $flags=0)
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
isMasterConnectionReadOnly(IDatabase $conn, $flags=0)
string $hostname
Current server name.
getServerInfoStrict( $i, $field=null)
setExistingReaderIndex( $group, $index)
Set the server index chosen by the load balancer for use with the given query group.
getReaderIndex( $group=false, $domain=false)
Get the server index of the reader connection for a given group.
getLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
waitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
DatabaseDomain $localDomain
Local DB domain ID and default for selectDB() calls.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a live handle for a real or virtual (DB_MASTER/DB_REPLICA) server index.
TransactionProfiler $trxProfiler
disable( $fname=__METHOD__, $owner=null)
Close all connections and disable this load balancer.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
approveMasterChanges(array $options, $fname=__METHOD__, $owner=null)
Perform all pre-commit checks for things like replication safety.
getLaggedReplicaMode( $domain=false)
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo)
Open a new network connection to a server (uncached)
haveIndex( $i)
Returns true if the specified index is a valid server index.
getWriterIndex()
Get the server index of the master server.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
bool $connectionAttempted
Whether any connection has been attempted yet.
hasMasterChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
array[] $tableAliases
$aliases Map of (table => (dbname, schema, prefix) map)
callable null $chronologyCallback
Callback to run before the first connection attempt.
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica server.
Database $errorConnection
Connection handle that caused a problem.
string $lastError
The last DB selection or connection error.
lazyLoadReplicationPositions()
Make sure that any "waitForPos" positions are loaded and available to doWait()
finalizeMasterChanges( $fname=__METHOD__, $owner=null)
Run pre-commit callbacks and defer execution of post-commit callbacks.
getTopologyRole( $i, array $server)
getServerInfo( $i)
Return the server info structure for a given index or false if the index is invalid.
allowLagged( $mode=null)
Disables/enables lag checks.
getServerCount()
Get the number of servers defined in configuration.
runMasterTransactionIdleCallbacks( $fname=__METHOD__, $owner=null)
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
resolveGroups( $groups, $i)
Resolve $groups into a list of query groups defined as having database servers.
string bool $readOnlyReason
Reason this instance is read-only or false if not.
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a generic replica DB to catch up to it.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
bool[] $tempTablesOnlyMode
Map of (domain => whether to use "temp tables only" mode)
__construct(array $params)
Construct a manager of IDatabase connection objects.
rollbackMasterChanges( $fname=__METHOD__, $owner=null)
Issue ROLLBACK only on master, only if queries were done on connection.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Detect high-contention DB queries via profiling calls.
An object representing a master or replica DB position in a replicated setup.
hasReached(DBMasterPos $pos)
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:38
flushSnapshot( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commit any transaction but error out if writes or callbacks are pending.
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
lastDoneWrites()
Get the last time the connection may have been used for a write query.
getDomainID()
Return the currently selected domain ID.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
getServer()
Get the server hostname or IP address.
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spent running write queries for this transaction.
commit( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commits a transaction previously started using begin()
getLag()
Get the amount of replication lag for this database server.
ping(&$rtt=null)
Ping the server and try to reconnect if there is no connection.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
close( $fname=__METHOD__, $owner=null)
Close the database connection.
trxLevel()
Gets the current transaction level.
getDBname()
Get the current DB name.
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
dbSchema( $schema=null)
Get/set the db schema.
Database cluster connection, tracking, load balancing, and transaction manager interface.
const DB_MASTER
Request a primary, write-enabled DB connection.
An interface for database load monitoring.
const DB_REPLICA
Definition defines.php:25
const DBO_TRX
Definition defines.php:12