27use InvalidArgumentException;
29use Psr\Log\LoggerInterface;
30use Psr\Log\NullLogger;
33use UnexpectedValueException;
35use Wikimedia\ScopedCallback;
151 private const CONN_HELD_WARN_THRESHOLD = 10;
154 private const MAX_LAG_DEFAULT = 6;
156 private const MAX_WAIT_DEFAULT = 10;
158 private const TTL_CACHE_READONLY = 5;
169 private const ROUND_CURSORY =
'cursory';
171 private const ROUND_FINALIZED =
'finalized';
173 private const ROUND_APPROVED =
'approved';
175 private const ROUND_COMMIT_CALLBACKS =
'commit-callbacks';
177 private const ROUND_ROLLBACK_CALLBACKS =
'rollback-callbacks';
179 private const ROUND_ERROR =
'error';
182 if ( !isset( $params[
'servers'] ) || !count( $params[
'servers'] ) ) {
183 throw new InvalidArgumentException(
'Missing or empty "servers" parameter' );
191 $this->maxLag = $params[
'maxLag'] ?? self::MAX_LAG_DEFAULT;
195 $this->groupLoads = [ self::GROUP_GENERIC => [] ];
196 foreach ( $params[
'servers'] as $i => $server ) {
197 if ( ++$listKey !== $i ) {
198 throw new UnexpectedValueException(
'List expected for "servers" parameter' );
200 $this->servers[$i] = $server;
201 foreach ( ( $server[
'groupLoads'] ?? [] ) as $group => $ratio ) {
202 $this->groupLoads[$group][$i] = $ratio;
204 $this->groupLoads[self::GROUP_GENERIC][$i] = $server[
'load'];
205 $this->maxLagByIndex[$i] = $server[
'max lag'] ??
$this->maxLag;
208 $this->waitTimeout = $params[
'waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
212 if ( isset( $params[
'readOnlyReason'] ) && is_string( $params[
'readOnlyReason'] ) ) {
213 $this->readOnlyReason = $params[
'readOnlyReason'];
216 $this->loadMonitorConfig = $params[
'loadMonitor'] ?? [
'class' =>
'LoadMonitorNull' ];
217 $this->loadMonitorConfig += [
'lagWarnThreshold' =>
$this->maxLag ];
220 $this->wanCache = $params[
'wanCache'] ?? WANObjectCache::newEmpty();
221 $this->profiler = $params[
'profiler'] ??
null;
224 $this->errorLogger = $params[
'errorLogger'] ??
function ( Throwable $e ) {
225 trigger_error( get_class( $e ) .
': ' . $e->getMessage(), E_USER_WARNING );
227 $this->deprecationLogger = $params[
'deprecationLogger'] ??
function ( $msg ) {
228 trigger_error( $msg, E_USER_DEPRECATED );
230 foreach ( [
'replLogger',
'connLogger',
'queryLogger',
'perfLogger' ] as $key ) {
231 $this->$key = $params[$key] ??
new NullLogger();
234 $this->hostname = $params[
'hostname'] ?? ( gethostname() ?:
'unknown' );
235 $this->cliMode = $params[
'cliMode'] ?? ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' );
236 $this->agent = $params[
'agent'] ??
'';
238 if ( isset( $params[
'chronologyCallback'] ) ) {
239 $this->chronologyCallback = $params[
'chronologyCallback'];
242 if ( isset( $params[
'roundStage'] ) ) {
243 if ( $params[
'roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
244 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
245 } elseif ( $params[
'roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
246 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
250 $group = $params[
'defaultGroup'] ?? self::GROUP_GENERIC;
251 $this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;
254 $this->
id = $nextId = ( is_int( $nextId ) ? $nextId++ : mt_rand() );
255 $this->ownerId = $params[
'ownerId'] ??
null;
261 self::KEY_LOCAL => [],
262 self::KEY_FOREIGN_INUSE => [],
263 self::KEY_FOREIGN_FREE => [],
265 self::KEY_LOCAL_NOROUND => [],
266 self::KEY_FOREIGN_INUSE_NOROUND => [],
267 self::KEY_FOREIGN_FREE_NOROUND => []
272 return $this->localDomain->getId();
286 } elseif ( $domain ===
false || $domain === $this->localDomain->getId() ) {
288 } elseif ( isset( $this->domainAliases[$domain] ) ) {
289 $this->domainAliases[$domain] =
292 return $this->domainAliases[$domain];
295 $cachedDomain = $this->nonLocalDomainCache[$domain] ??
null;
296 if ( $cachedDomain ===
null ) {
298 $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
301 return $cachedDomain;
313 if ( $i > 0 && $groups !== [] && $groups !==
false ) {
314 $list = implode(
', ', (array)$groups );
315 throw new LogicException(
"Query group(s) ($list) given with server index (#$i)" );
318 if ( $groups === [] || $groups ===
false || $groups === $this->defaultGroup ) {
320 } elseif ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
322 } elseif ( is_array( $groups ) ) {
323 $resolvedGroups = array_keys( array_flip( $groups ) + [ self::GROUP_GENERIC => 1 ] );
328 return $resolvedGroups;
340 $flags |= self::CONN_INTENT_WRITABLE;
343 if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
350 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
355 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
357 $this->connLogger->info( __METHOD__ .
": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
358 } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
363 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
376 if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
380 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
394 if ( !isset( $this->loadMonitor ) ) {
396 'LoadMonitor' => LoadMonitor::class,
397 'LoadMonitorNull' => LoadMonitorNull::class,
398 'LoadMonitorMySQL' => LoadMonitorMySQL::class,
401 $class = $this->loadMonitorConfig[
'class'];
402 if ( isset( $compat[$class] ) ) {
403 $class = $compat[$class];
406 $this->loadMonitor =
new $class(
407 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
408 $this->loadMonitor->setLogger( $this->replLogger );
423 # Unset excessively lagged servers
424 foreach ( $lags as $i => $lag ) {
426 # How much lag this server nominally is allowed to have
427 $maxServerLag = $this->servers[$i][
'max lag'] ??
$this->maxLag;
428 # Constrain that further by $maxLag argument
429 $maxServerLag = min( $maxServerLag,
$maxLag );
432 if ( $lag ===
false && !is_infinite( $maxServerLag ) ) {
433 $this->replLogger->debug(
435 ": server {dbserver} is not replicating?", [
'dbserver' => $host ] );
437 } elseif ( $lag > $maxServerLag ) {
438 $this->replLogger->debug(
440 ": server {dbserver} has {lag} seconds of lag (>= {maxlag})",
441 [
'dbserver' => $host,
'lag' => $lag,
'maxlag' => $maxServerLag ]
448 # Find out if all the replica DBs with non-zero load are lagged
450 foreach ( $loads as $load ) {
454 # No appropriate DB servers except maybe the master and some replica DBs with zero load
455 # Do NOT use the master
456 # Instead, this function will return false, triggering read-only mode,
457 # and a lagged replica DB will be used instead.
461 if ( count( $loads ) == 0 ) {
465 # Return a random representative of the remainder
478 if ( $i === self::DB_MASTER ) {
480 } elseif ( $i === self::DB_REPLICA ) {
481 foreach ( $groups as $group ) {
483 if ( $groupIndex !==
false ) {
488 } elseif ( !isset( $this->servers[$i] ) ) {
489 throw new UnexpectedValueException(
"Invalid server index index #$i" );
492 if ( $i === self::DB_REPLICA ) {
493 $this->lastError =
'Unknown error';
508 $group = is_string( $group ) ? $group : self::GROUP_GENERIC;
517 if ( isset( $this->groupLoads[$group] ) ) {
518 $loads = $this->groupLoads[$group];
520 $this->connLogger->info( __METHOD__ .
": no loads for group $group" );
531 if ( $i ===
false ) {
547 $this->laggedReplicaMode =
true;
551 $this->connLogger->debug( __METHOD__ .
": using server $serverName for group '$group'" );
563 return $this->readIndexByGroup[$group] ?? -1;
574 throw new UnexpectedValueException(
"Cannot set a negative read server index" );
576 $this->readIndexByGroup[$group] = $index;
585 if ( $loads === [] ) {
586 throw new InvalidArgumentException(
"Server configuration array is empty" );
595 $currentLoads = $loads;
596 while ( count( $currentLoads ) ) {
601 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
602 $this->replLogger->debug( __METHOD__ .
": replication positions detected" );
606 $ago = microtime(
true ) - $this->waitForPos->asOfTime();
610 if ( $i ===
false ) {
614 if ( $i ===
false && count( $currentLoads ) ) {
616 $this->replLogger->error(
617 __METHOD__ .
": all replica DBs lagged. Switch to read-only mode" );
623 if ( $i ===
false ) {
627 $this->connLogger->debug( __METHOD__ .
": pickRandom() returned false" );
629 return [
false, false ];
633 $this->connLogger->debug( __METHOD__ .
": Using reader #$i: $serverName..." );
638 $this->connLogger->warning( __METHOD__ .
": Failed connecting to $i/$domain" );
639 unset( $currentLoads[$i] );
646 if ( $domain !==
false ) {
655 if ( $currentLoads === [] ) {
656 $this->connLogger->error( __METHOD__ .
": all servers down" );
665 $this->waitForPos = $pos;
668 if ( $i > 0 && !$this->
doWait( $i ) ) {
669 $this->laggedReplicaMode =
true;
681 $this->waitForPos = $pos;
686 $readLoads = $this->groupLoads[self::GROUP_GENERIC];
688 $readLoads = array_filter( $readLoads );
693 $ok = $this->
doWait( $i,
true, $timeout );
699 $this->waitForPos = $oldPos;
710 $this->waitForPos = $pos;
714 for ( $i = 1; $i < $serverCount; $i++ ) {
716 $start = microtime(
true );
717 $ok = $this->
doWait( $i,
true, $timeout ) && $ok;
718 $timeout -= intval( microtime(
true ) - $start );
719 if ( $timeout <= 0 ) {
726 $this->waitForPos = $oldPos;
737 foreach ( $this->groupLoads as $loadsByIndex ) {
738 if ( ( $loadsByIndex[$i] ?? 0 ) > 0 ) {
754 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
755 $this->waitForPos = $pos;
763 $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
766 foreach ( $this->conns as $connsByServer ) {
768 if ( $i === self::DB_REPLICA ) {
769 $indexes = array_keys( $connsByServer );
771 $indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
774 foreach ( $indexes as $index ) {
797 foreach ( $candidateConns as $candidateConn ) {
798 if ( !$candidateConn->isOpen() ) {
804 !$candidateConn->getLBInfo( self::$INFO_AUTOCOMMIT_ONLY ) ||
806 $candidateConn->trxLevel()
812 $conn = $candidateConn;
825 protected function doWait( $index, $open =
false, $timeout =
null ) {
830 $key = $this->srvCache->makeGlobalKey( __CLASS__,
'last-known-pos', $server,
'v1' );
832 $knownReachedPos = $this->srvCache->get( $key );
835 $knownReachedPos->
hasReached( $this->waitForPos )
837 $this->replLogger->debug(
839 ": replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).",
840 [
'dbserver' => $server ]
847 $flags = self::CONN_SILENCE_ERRORS;
851 $this->replLogger->debug(
852 __METHOD__ .
': no connection open for {dbserver}',
853 [
'dbserver' => $server ]
861 $this->replLogger->warning(
862 __METHOD__ .
': failed to connect to {dbserver}',
863 [
'dbserver' => $server ]
873 $this->replLogger->info(
875 ': waiting for replica DB {dbserver} to catch up...',
876 [
'dbserver' => $server ]
879 $result = $conn->masterPosWait( $this->waitForPos, $timeout );
881 $ok = ( $result !==
null && $result != -1 );
884 $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
894 public function getConnection( $i, $groups = [], $domain =
false, $flags = 0 ) {
909 !is_string( $conn->getLBInfo( $conn::LB_READ_ONLY_REASON ) )
912 ?
'The database is read-only until replication lag decreases.'
913 :
'The database is read-only until replica database servers becomes reachable.';
914 $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $reason );
931 $conn = $this->localDomain->equals( $domain )
936 if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
944 if ( $this->connectionCounter > $priorConnectionsMade ) {
945 $this->trxProfiler->recordConnection(
948 ( ( $flags & self::CONN_INTENT_WRITABLE ) == self::CONN_INTENT_WRITABLE )
952 if ( !$conn->isOpen() ) {
953 $this->errorConnection = $conn;
967 if ( $this->readOnlyReason !==
false ) {
970 $readOnlyReason =
'The master database server is running in read-only mode.';
981 $serverIndex = $conn->
getLBInfo( self::$INFO_SERVER_INDEX );
982 $refCount = $conn->
getLBInfo( self::$INFO_FOREIGN_REF_COUNT );
983 if ( $serverIndex ===
null || $refCount ===
null ) {
985 } elseif ( $conn instanceof
DBConnRef ) {
988 $this->connLogger->error(
989 __METHOD__ .
": got DBConnRef instance.\n" .
990 (
new LogicException() )->getTraceAsString() );
995 if ( $this->disabled ) {
999 if ( $conn->
getLBInfo( self::$INFO_AUTOCOMMIT_ONLY ) ) {
1008 if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
1009 throw new InvalidArgumentException(
1010 "Connection $serverIndex/$domain not found; it may have already been freed" );
1011 } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
1012 throw new InvalidArgumentException(
1013 "Connection $serverIndex/$domain mismatched; it may have already been freed" );
1016 $conn->
setLBInfo( self::$INFO_FOREIGN_REF_COUNT, --$refCount );
1017 if ( $refCount <= 0 ) {
1018 $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
1019 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
1020 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
1021 unset( $this->conns[$connInUseKey][$serverIndex] );
1023 $this->connLogger->debug( __METHOD__ .
": freed connection $serverIndex/$domain" );
1025 $this->connLogger->debug( __METHOD__ .
1026 ": reference count for $serverIndex/$domain reduced to $refCount" );
1041 return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
1049 $this, $this->
getConnection( $i, $groups, $domain, $flags ), $role );
1057 return ( $i === self::DB_MASTER || $i === $this->
getWriterIndex() )
1070 return $this->
getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
1088 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1093 if ( isset( $this->conns[$connKey][$i][0] ) ) {
1094 $conn = $this->conns[$connKey][$i][0];
1099 [ self::$INFO_AUTOCOMMIT_ONLY => $autoCommit ]
1101 if ( $conn->isOpen() ) {
1102 $this->connLogger->debug( __METHOD__ .
": opened new connection for $i" );
1103 $this->conns[$connKey][$i][0] = $conn;
1105 $this->connLogger->warning( __METHOD__ .
": connection error for $i" );
1106 $this->errorConnection = $conn;
1114 !$this->localDomain->isCompatible( $conn->getDomainID() )
1116 throw new UnexpectedValueException(
1117 "Got connection to '{$conn->getDomainID()}', " .
1118 "but expected local domain ('{$this->localDomain}')"
1151 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1154 if ( $autoCommit ) {
1165 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1167 $conn = $this->conns[$connInUseKey][$i][$domain];
1168 $this->connLogger->debug( __METHOD__ .
": reusing connection $i/$domain" );
1169 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1171 $conn = $this->conns[$connFreeKey][$i][$domain];
1172 unset( $this->conns[$connFreeKey][$i][$domain] );
1173 $this->conns[$connInUseKey][$i][$domain] = $conn;
1174 $this->connLogger->debug( __METHOD__ .
": reusing free connection $i/$domain" );
1175 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1177 foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $oldConn ) {
1178 if ( $domainInstance->getDatabase() !==
null ) {
1184 $oldConn->databasesAreIndependent() &&
1185 $oldConn->getDBname() !== $domainInstance->getDatabase()
1191 $conn->selectDomain( $domainInstance );
1195 $conn->dbSchema( $domainInstance->getSchema() );
1196 $conn->tablePrefix( $domainInstance->getTablePrefix() );
1198 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1200 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1201 $this->connLogger->debug( __METHOD__ .
1202 ": reusing free connection from $oldDomain for $domain" );
1212 self::$INFO_AUTOCOMMIT_ONLY => $autoCommit,
1213 self::$INFO_FORIEGN =>
true,
1214 self::$INFO_FOREIGN_REF_COUNT => 0
1217 if ( $conn->isOpen() ) {
1219 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1220 $this->connLogger->debug( __METHOD__ .
": opened new connection for $i/$domain" );
1222 $this->connLogger->warning( __METHOD__ .
": connection error for $i/$domain" );
1223 $this->errorConnection = $conn;
1230 if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1231 throw new UnexpectedValueException(
1232 "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
1235 $refCount = $conn->getLBInfo( self::$INFO_FOREIGN_REF_COUNT );
1236 $conn->setLBInfo( self::$INFO_FOREIGN_REF_COUNT, $refCount + 1 );
1245 $this->servers[$i][
'driver'] ??
null
1272 if ( $this->disabled ) {
1280 array_merge( $server, [
1288 ? ( $domain->
getDatabase() ?? $server[
'dbname'] ??
null )
1291 'schema' => $domain->
getSchema() ?? $server[
'schema'] ??
null,
1295 'flags' => $this->
initConnFlags( $server[
'flags'] ?? IDatabase::DBO_DEFAULT ),
1315 Database::NEW_UNCONNECTED
1318 $conn->setLBInfo( [ self::$INFO_SERVER_INDEX => $i ] + $lbInfo );
1320 $conn->setTableAliases( $this->tableAliases );
1321 $conn->setIndexAliases( $this->indexAliases );
1324 if ( $this->trxRoundId !==
false ) {
1327 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1328 $conn->setTransactionListener( $name, $callback );
1334 $conn->initConnection();
1343 if ( $conn->isOpen() ) {
1349 if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1350 $this->perfLogger->warning(
1351 __METHOD__ .
": {connections}+ connections made (master={masterdb})",
1353 'connections' => $count,
1354 'dbserver' => $conn->getServer(),
1355 'masterdb' => $this->getMasterServerName()
1369 if ( !empty( $server[
'is static'] ) ) {
1370 return IDatabase::ROLE_STATIC_CLONE;
1374 ? IDatabase::ROLE_STREAMING_MASTER
1375 : IDatabase::ROLE_STREAMING_REPLICA;
1384 if ( ( $flags & IDatabase::DBO_DEFAULT ) === IDatabase::DBO_DEFAULT ) {
1385 if ( $this->cliMode ) {
1388 $flags |= IDatabase::DBO_TRX;
1399 if ( !$this->connectionAttempted && $this->chronologyCallback ) {
1400 $this->connectionAttempted =
true;
1402 $this->connLogger->debug( __METHOD__ .
': executed chronology callback.' );
1412 'method' => __METHOD__,
1417 $context[
'db_server'] = $conn->
getServer();
1418 $this->connLogger->warning(
1419 __METHOD__ .
": connection error: {last_error} ({db_server})",
1423 throw new DBConnectionError( $conn,
"{$this->lastError} ({$context['db_server']})" );
1426 $this->connLogger->error(
1428 ": LB failure with no last connection. Connection error: {last_error}",
1449 return array_key_exists( $i, $this->servers );
1460 return ( isset( $this->servers[$i] ) && $this->groupLoads[self::GROUP_GENERIC][$i] > 0 );
1464 return count( $this->servers );
1472 foreach ( $this->servers as $i => $server ) {
1473 if ( $i !== $this->
getWriterIndex() && empty( $server[
'is static'] ) ) {
1482 $name = $this->servers[$i][
'hostName'] ?? ( $this->servers[$i][
'host'] ??
'' );
1484 return ( $name !=
'' ) ? $name :
'localhost';
1488 return $this->servers[$i] ??
false;
1492 return $this->servers[$i][
'type'] ??
'unknown';
1500 return $conn->getMasterPos();
1503 $conn = $this->
getConnection( $index, self::CONN_SILENCE_ERRORS );
1510 $pos = $conn->getMasterPos();
1521 if ( $masterConn ) {
1522 return $masterConn->getMasterPos();
1526 $highestPos =
false;
1528 for ( $i = 1; $i < $serverCount; $i++ ) {
1529 if ( !empty( $this->servers[$i][
'is static'] ) ) {
1534 $pos = $conn ? $conn->getReplicaPos() :
false;
1539 $highestPos = $highestPos ?: $pos;
1540 if ( $pos->hasReached( $highestPos ) ) {
1548 public function disable( $fname = __METHOD__, $owner =
null ) {
1551 $this->disabled =
true;
1554 public function closeAll( $fname = __METHOD__, $owner =
null ) {
1556 if ( $this->ownerId ===
null ) {
1558 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1562 $this->connLogger->debug(
"$fname: closing connection to database '$host'." );
1563 $conn->
close( $fname, $this->
id );
1572 throw new RuntimeException(
'Cannot close DBConnRef instance; it must be shareable' );
1575 $serverIndex = $conn->
getLBInfo( self::$INFO_SERVER_INDEX );
1576 foreach ( $this->conns as
$type => $connsByServer ) {
1577 if ( !isset( $connsByServer[$serverIndex] ) ) {
1581 foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
1582 if ( $conn === $trackedConn ) {
1584 $this->connLogger->debug(
1585 __METHOD__ .
": closing connection to database $i at '$host'." );
1586 unset( $this->conns[
$type][$serverIndex][$i] );
1592 $conn->
close( __METHOD__ );
1595 public function commitAll( $fname = __METHOD__, $owner =
null ) {
1604 if ( $this->ownerId ===
null ) {
1606 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1609 $this->trxRoundStage = self::ROUND_ERROR;
1620 }
while ( $count > 0 );
1625 $this->trxRoundStage = self::ROUND_FINALIZED;
1633 if ( $this->ownerId ===
null ) {
1635 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1638 $limit = $options[
'maxWriteDuration'] ?? 0;
1640 $this->trxRoundStage = self::ROUND_ERROR;
1649 if ( $limit > 0 && $time > $limit ) {
1652 "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
1661 "A connection to the {$conn->getDBname()} database was lost before commit"
1665 $this->trxRoundStage = self::ROUND_APPROVED;
1670 if ( $this->trxRoundId !==
false ) {
1673 "$fname: Transaction round '{$this->trxRoundId}' already started"
1677 if ( $this->ownerId ===
null ) {
1679 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1685 $this->trxRoundId = $fname;
1686 $this->trxRoundStage = self::ROUND_ERROR;
1694 $this->trxRoundStage = self::ROUND_CURSORY;
1700 if ( $this->ownerId ===
null ) {
1702 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1707 $restore = ( $this->trxRoundId !== false );
1708 $this->trxRoundId =
false;
1709 $this->trxRoundStage = self::ROUND_ERROR;
1713 function (
IDatabase $conn ) use ( $fname, &$failures ) {
1715 $conn->
commit( $fname, $conn::FLUSHING_ALL_PEERS );
1718 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1725 "$fname: Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1734 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1739 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1740 $type = IDatabase::TRIGGER_COMMIT;
1741 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1742 $type = IDatabase::TRIGGER_ROLLBACK;
1746 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1749 if ( $this->ownerId ===
null ) {
1751 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1755 $this->trxRoundStage = self::ROUND_ERROR;
1763 $fname = __METHOD__;
1774 $count += $conn->runOnTransactionIdleCallbacks( $type );
1775 } catch ( Throwable $ex ) {
1784 $this->queryLogger->warning( $fname .
": found writes pending." );
1785 $fnames = implode(
', ', $conn->pendingWriteAndCallbackCallers() );
1786 $this->queryLogger->warning(
1787 "$fname: found writes pending ($fnames).",
1789 'db_server' => $conn->getServer(),
1790 'db_name' => $conn->getDBname(),
1791 'exception' => new RuntimeException()
1797 $this->queryLogger->debug(
"$fname: found empty transaction." );
1800 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1801 } catch ( Throwable $ex ) {
1805 }
while ( $count > 0 );
1807 $this->trxRoundStage = $oldStage;
1814 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1815 $type = IDatabase::TRIGGER_COMMIT;
1816 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1817 $type = IDatabase::TRIGGER_ROLLBACK;
1821 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1824 if ( $this->ownerId ===
null ) {
1826 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1831 $this->trxRoundStage = self::ROUND_ERROR;
1835 }
catch ( Throwable $ex ) {
1839 $this->trxRoundStage = self::ROUND_CURSORY;
1846 if ( $this->ownerId ===
null ) {
1848 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1851 $restore = ( $this->trxRoundId !== false );
1852 $this->trxRoundId =
false;
1853 $this->trxRoundStage = self::ROUND_ERROR;
1855 $conn->
rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1863 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1871 $stages = (array)$stage;
1873 if ( !in_array( $this->trxRoundStage, $stages,
true ) ) {
1874 $stageList = implode(
1876 array_map(
function ( $v ) {
1882 "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1900 if ( $this->ownerId !==
null && $owner !== $this->ownerId && $owner !== $this->
id ) {
1903 "$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
1918 if ( $conn->
getLBInfo( self::$INFO_AUTOCOMMIT_ONLY ) ) {
1922 if ( $conn->
getFlag( $conn::DBO_DEFAULT ) ) {
1925 $conn->
setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
1928 if ( $conn->
getFlag( $conn::DBO_TRX ) ) {
1929 $conn->
setLBInfo( $conn::LB_TRX_ROUND_ID, $this->trxRoundId );
1937 if ( $conn->
getLBInfo( self::$INFO_AUTOCOMMIT_ONLY ) ) {
1941 if ( $conn->
getFlag( $conn::DBO_TRX ) ) {
1942 $conn->
setLBInfo( $conn::LB_TRX_ROUND_ID,
null );
1945 if ( $conn->
getFlag( $conn::DBO_DEFAULT ) ) {
1995 $age = ( $age === null ) ? $this->waitTimeout : $age;
2011 if ( $this->laggedReplicaMode ) {
2030 if ( $this->readOnlyReason !==
false ) {
2033 return 'The master database server is running in read-only mode.';
2036 ?
'The database is read-only until replication lag decreases.'
2037 :
'The database is read-only until a replica database server becomes reachable.';
2050 $key = $this->srvCache->makeGlobalKey(
2051 'rdbms-server-readonly',
2057 if ( ( $flags & self::CONN_REFRESH_READ_ONLY ) == self::CONN_REFRESH_READ_ONLY ) {
2063 $this->srvCache->set( $key, $readOnly, BagOStuff::TTL_PROC_SHORT );
2065 $readOnly = $this->srvCache->getWithSetCallback(
2067 BagOStuff::TTL_PROC_SHORT,
2068 function () use ( $conn ) {
2078 return (
bool)$readOnly;
2087 return (
bool)$this->wanCache->getWithSetCallback(
2089 $this->wanCache->makeGlobalKey(
2090 'rdbms-server-readonly',
2091 $this->getMasterServerName(),
2095 self::TTL_CACHE_READONLY,
2096 function () use ( $domain ) {
2097 $old = $this->trxProfiler->setSilenced(
true );
2101 $flags = self::CONN_REFRESH_READ_ONLY;
2109 $this->trxProfiler->setSilenced( $old );
2113 [
'pcTTL' => WANObjectCache::TTL_PROC_LONG,
'lockTSE' => 10,
'busyValue' => 0 ]
2118 if ( $mode ===
null ) {
2129 if ( !$conn->
ping() ) {
2138 foreach ( $this->conns as $connsByServer ) {
2139 foreach ( $connsByServer as $serverConns ) {
2140 foreach ( $serverConns as $conn ) {
2141 $callback( $conn, ...$params );
2149 foreach ( $this->conns as $connsByServer ) {
2150 if ( isset( $connsByServer[$masterIndex] ) ) {
2152 foreach ( $connsByServer[$masterIndex] as $conn ) {
2153 $callback( $conn, ...$params );
2160 foreach ( $this->conns as $connsByServer ) {
2161 foreach ( $connsByServer as $i => $serverConns ) {
2165 foreach ( $serverConns as $conn ) {
2166 $callback( $conn, ...$params );
2177 foreach ( $this->conns as $connsByServer ) {
2178 foreach ( $connsByServer as $serverConns ) {
2179 $count += count( $serverConns );
2193 foreach ( $lagTimes as $i => $lag ) {
2194 if ( $this->groupLoads[self::GROUP_GENERIC][$i] > 0 && $lag >
$maxLag ) {
2202 return [ $host,
$maxLag, $maxIndex ];
2210 $knownLagTimes = [];
2211 $indexesWithLag = [];
2212 foreach ( $this->servers as $i => $server ) {
2213 if ( empty( $server[
'is static'] ) ) {
2214 $indexesWithLag[] = $i;
2216 $knownLagTimes[$i] = 0;
2220 return $this->
getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
2252 $flags = self::CONN_SILENCE_ERRORS;
2254 if ( $masterConn ) {
2255 $pos = $masterConn->getMasterPos();
2258 if ( !$masterConn ) {
2261 "Could not obtain a master database connection to get the position"
2264 $pos = $masterConn->getMasterPos();
2270 $start = microtime(
true );
2272 $seconds = max( microtime(
true ) - $start, 0 );
2274 $ok = ( $result !==
null && $result != -1 );
2276 $this->replLogger->warning(
2277 __METHOD__ .
': timed out waiting on {dbserver} pos {pos} [{seconds}s]',
2281 'seconds' => round( $seconds, 6 ),
2282 'trace' => (
new RuntimeException() )->getTraceAsString()
2286 $this->replLogger->debug( __METHOD__ .
': done waiting' );
2290 $this->replLogger->error(
2291 __METHOD__ .
': could not get master pos for {dbserver}',
2294 'trace' => (
new RuntimeException() )->getTraceAsString()
2320 $this->trxRecurringCallbacks[$name] = $callback;
2322 unset( $this->trxRecurringCallbacks[$name] );
2325 function (
IDatabase $conn ) use ( $name, $callback ) {
2332 $this->tableAliases = $aliases;
2336 $this->indexAliases = $aliases;
2340 $this->domainAliases = $aliases;
2349 if ( $conn->
getLBInfo( self::$INFO_FOREIGN_REF_COUNT ) > 0 ) {
2350 $domainsInUse[] = $conn->getDomainID();
2355 if ( $domainsInUse ) {
2356 $domains = implode(
', ', $domainsInUse );
2358 "Foreign domain connections are still in use ($domains)" );
2362 $this->localDomain->getDatabase(),
2363 $this->localDomain->getSchema(),
2369 if ( !$conn->
getLBInfo( self::$INFO_FORIEGN ) ) {
2370 $conn->tablePrefix( $prefix );
2376 $this->
closeAll( __METHOD__, $this->
id );
2382 $old = $this->tempTablesOnlyMode[$domain] ??
false;
2384 $this->tempTablesOnlyMode[$domain] =
true;
2386 unset( $this->tempTablesOnlyMode[$domain] );
2396 $this->localDomain = $domain;
2406 if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
2407 throw new InvalidArgumentException(
"No server with index '$i'" );
2410 if ( $field !==
null ) {
2411 if ( !array_key_exists( $field, $this->servers[$i] ) ) {
2412 throw new InvalidArgumentException(
"No field '$field' in server index '$i'" );
2415 return $this->servers[$i][$field];
2418 return $this->servers[$i];
2430 $this->
disable( __METHOD__, $this->ownerId );
2437class_alias( LoadBalancer::class,
'LoadBalancer' );
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the appropriate key.
Class representing a cache/ephemeral data store.
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Class to handle database/schema/prefix specifications for IDatabase.
static newFromId( $domain)
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well as deferring the actual network connection until the handle is used.