27use InvalidArgumentException;
29use Psr\Log\LoggerInterface;
30use Psr\Log\NullLogger;
33use UnexpectedValueException;
35use Wikimedia\RequestTimeout\CriticalSectionProvider;
36use Wikimedia\ScopedCallback;
189 private const READER_INDEX_NONE = -1;
192 if ( !isset( $params[
'servers'] ) || !count( $params[
'servers'] ) ) {
193 throw new InvalidArgumentException(
'Missing or empty "servers" parameter' );
205 $this->groupLoads = [ self::GROUP_GENERIC => [] ];
206 foreach ( $params[
'servers'] as $i => $server ) {
207 if ( ++$listKey !== $i ) {
208 throw new UnexpectedValueException(
'List expected for "servers" parameter' );
210 $this->servers[$i] = $server;
211 foreach ( ( $server[
'groupLoads'] ?? [] ) as $group => $ratio ) {
212 $this->groupLoads[$group][$i] = $ratio;
215 $this->maxLagByIndex[$i] = $server[
'max lag'] ??
$this->maxLag;
222 if ( isset( $params[
'readOnlyReason'] ) && is_string( $params[
'readOnlyReason'] ) ) {
223 $this->readOnlyReason = $params[
'readOnlyReason'];
226 $this->loadMonitorConfig = $params[
'loadMonitor'] ?? [
'class' =>
'LoadMonitorNull' ];
227 $this->loadMonitorConfig += [
'lagWarnThreshold' =>
$this->maxLag ];
230 $this->wanCache = $params[
'wanCache'] ?? WANObjectCache::newEmpty();
231 $this->errorLogger = $params[
'errorLogger'] ??
static function ( Throwable $e ) {
232 trigger_error( get_class( $e ) .
': ' . $e->getMessage(), E_USER_WARNING );
234 $this->deprecationLogger = $params[
'deprecationLogger'] ??
static function ( $msg ) {
235 trigger_error( $msg, E_USER_DEPRECATED );
237 $this->replLogger = $params[
'replLogger'] ??
new NullLogger();
238 $this->connLogger = $params[
'connLogger'] ??
new NullLogger();
239 $this->queryLogger = $params[
'queryLogger'] ??
new NullLogger();
240 $this->perfLogger = $params[
'perfLogger'] ??
new NullLogger();
242 $this->clusterName = $params[
'clusterName'] ??
null;
243 $this->profiler = $params[
'profiler'] ??
null;
246 $this->csProvider = $params[
'criticalSectionProvider'] ??
null;
248 $this->cliMode = $params[
'cliMode'] ?? ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' );
249 $this->agent = $params[
'agent'] ??
'';
251 if ( isset( $params[
'chronologyCallback'] ) ) {
252 $this->chronologyCallback = $params[
'chronologyCallback'];
255 if ( isset( $params[
'roundStage'] ) ) {
256 if ( $params[
'roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
258 } elseif ( $params[
'roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
267 $this->
id = $nextId = ( is_int( $nextId ) ? $nextId++ : mt_rand() );
268 $this->ownerId = $params[
'ownerId'] ??
null;
274 self::KEY_LOCAL => [],
275 self::KEY_FOREIGN_INUSE => [],
276 self::KEY_FOREIGN_FREE => [],
278 self::KEY_LOCAL_NOROUND => [],
279 self::KEY_FOREIGN_INUSE_NOROUND => [],
280 self::KEY_FOREIGN_FREE_NOROUND => []
285 if ( $this->clusterName !==
null ) {
296 return $this->localDomain->getId();
310 } elseif ( $domain ===
false || $domain === $this->localDomain->getId() ) {
312 } elseif ( isset( $this->domainAliases[$domain] ) ) {
313 $this->domainAliases[$domain] =
316 return $this->domainAliases[$domain];
319 $cachedDomain = $this->nonLocalDomainCache[$domain] ??
null;
320 if ( $cachedDomain ===
null ) {
322 $this->nonLocalDomainCache = [ $domain => $cachedDomain ];
325 return $cachedDomain;
337 if ( $i > 0 && $groups !== [] && $groups !==
false ) {
338 $list = implode(
', ', (array)$groups );
339 throw new LogicException(
"Query group(s) ($list) given with server index (#$i)" );
342 if ( $groups === [] || $groups ===
false || $groups === $this->defaultGroup ) {
344 } elseif ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
346 } elseif ( is_array( $groups ) ) {
347 $resolvedGroups = $groups;
348 if ( array_search( $this->defaultGroup, $resolvedGroups ) ===
false ) {
355 return $resolvedGroups;
366 if ( $i === self::DB_PRIMARY || $i === $this->
getWriterIndex() ) {
370 if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
377 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
384 $this->connLogger->info( __METHOD__ .
": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
385 } elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
403 if ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
407 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
421 if ( !isset( $this->loadMonitor ) ) {
423 'LoadMonitor' => LoadMonitor::class,
424 'LoadMonitorNull' => LoadMonitorNull::class,
425 'LoadMonitorMySQL' => LoadMonitorMySQL::class,
428 $class = $this->loadMonitorConfig[
'class'];
429 if ( isset( $compat[$class] ) ) {
430 $class = $compat[$class];
433 $this->loadMonitor =
new $class(
434 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
435 $this->loadMonitor->setLogger( $this->replLogger );
450 # Unset excessively lagged servers
451 foreach ( $lags as $i => $lag ) {
453 # How much lag this server nominally is allowed to have
454 $maxServerLag = $this->servers[$i][
'max lag'] ??
$this->maxLag;
455 # Constrain that further by $maxLag argument
456 $maxServerLag = min( $maxServerLag,
$maxLag );
459 if ( $lag ===
false && !is_infinite( $maxServerLag ) ) {
460 $this->replLogger->debug(
461 __METHOD__ .
": server {db_server} is not replicating?",
462 [
'db_server' => $srvName ]
465 } elseif ( $lag > $maxServerLag ) {
466 $this->replLogger->debug(
468 ": server {db_server} has {lag} seconds of lag (>= {maxlag})",
469 [
'db_server' => $srvName,
'lag' => $lag,
'maxlag' => $maxServerLag ]
476 # Find out if all the replica DBs with non-zero load are lagged
478 foreach ( $loads as $load ) {
482 # No appropriate DB servers except maybe the primary and some replica DBs with zero load
483 # Do NOT use the primary DB
484 # Instead, this function will return false, triggering read-only mode,
485 # and a lagged replica DB will be used instead.
489 if ( count( $loads ) == 0 ) {
493 # Return a random representative of the remainder
506 if ( $i === self::DB_PRIMARY ) {
508 } elseif ( $i === self::DB_REPLICA ) {
509 foreach ( $groups as $group ) {
511 if ( $groupIndex !==
false ) {
516 } elseif ( !isset( $this->servers[$i] ) ) {
517 throw new UnexpectedValueException(
"Invalid server index index #$i" );
520 if ( $i === self::DB_REPLICA ) {
521 $this->lastError =
'Unknown error';
539 if ( $index !== self::READER_INDEX_NONE ) {
545 if ( isset( $this->groupLoads[$group] ) ) {
546 $loads = $this->groupLoads[$group];
548 $this->connLogger->info( __METHOD__ .
": no loads for group $group" );
559 if ( $i ===
false ) {
575 $this->laggedReplicaMode =
true;
576 $this->replLogger->debug( __METHOD__ .
": setting lagged replica mode" );
580 $this->connLogger->debug( __METHOD__ .
": using server $serverName for group '$group'" );
592 return $this->readIndexByGroup[$group] ?? self::READER_INDEX_NONE;
603 throw new UnexpectedValueException(
"Cannot set a negative read server index" );
605 $this->readIndexByGroup[$group] = $index;
618 if ( $loads === [] ) {
619 throw new InvalidArgumentException(
"Server configuration array is empty" );
628 $currentLoads = $loads;
629 while ( count( $currentLoads ) ) {
634 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
635 $this->replLogger->debug( __METHOD__ .
": replication positions detected" );
639 $ago = microtime(
true ) - $this->waitForPos->asOfTime();
643 if ( $i ===
false ) {
647 if ( $i ===
false && count( $currentLoads ) ) {
649 $this->replLogger->error(
650 __METHOD__ .
": all replica DBs lagged. Switch to read-only mode",
651 [
'db_domain' => $domain ]
658 if ( $i ===
false ) {
662 $this->connLogger->debug( __METHOD__ .
": pickRandom() returned false" );
664 return [
false, false ];
668 $this->connLogger->debug( __METHOD__ .
": Using reader #$i: $serverName..." );
675 $this->connLogger->warning(
676 __METHOD__ .
": failed connecting to $i/{db_domain}",
677 [
'db_domain' => $domain ]
679 unset( $currentLoads[$i] );
686 if ( !$this->localDomain->equals( $domain ) ) {
695 if ( $currentLoads === [] ) {
696 $this->connLogger->error(
697 __METHOD__ .
": all servers down",
698 [
'db_domain' => $domain ]
708 $this->waitForPos = $pos;
714 $this->laggedReplicaMode =
true;
715 $this->replLogger->debug( __METHOD__ .
": setting lagged replica mode" );
726 $this->waitForPos = $pos;
736 $readLoads = array_filter( $readLoads );
740 if ( $i !==
false ) {
741 $ok = $this->
doWait( $i, $timeout );
749 $this->waitForPos = $oldPos;
758 $this->waitForPos = $pos;
763 $start = microtime(
true );
764 $ok = $this->
doWait( $i, $timeout ) && $ok;
765 $timeout -= intval( microtime(
true ) - $start );
766 if ( $timeout <= 0 ) {
775 $this->waitForPos = $oldPos;
784 foreach ( $this->groupLoads as $loadsByIndex ) {
785 if ( ( $loadsByIndex[$i] ?? 0 ) > 0 ) {
801 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
802 $this->waitForPos = $pos;
813 foreach ( $this->conns as
$type => $connsByServer ) {
814 if ( $i === self::DB_REPLICA ) {
816 $applicableConnsByServer = $connsByServer;
819 $applicableConnsByServer = isset( $connsByServer[$i] )
820 ? [ $i => $connsByServer[$i] ]
826 $this->connLogger->debug( __METHOD__ .
": found '$type' connection to #$i." );
844 foreach ( $connsByServer as $i =>
$conns ) {
845 foreach (
$conns as $conn ) {
846 if ( !$conn->isOpen() ) {
847 $this->connLogger->warning(
849 ": pooled DB handle for {db_server} (#$i) has no open connection.",
856 if ( $autoCommitOnly ) {
858 if ( !$conn->getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
863 if ( $conn->trxLevel() ) {
865 $this->connLogger->warning(
867 ": pooled DB handle for {db_server} (#$i) has a pending transaction.",
889 private function doWait( $index, $timeout =
null ) {
894 $key = $this->srvCache->makeGlobalKey( __CLASS__,
'last-known-pos', $srvName,
'v1' );
896 $knownReachedPos = $this->srvCache->get( $key );
899 $knownReachedPos->
hasReached( $this->waitForPos )
901 $this->replLogger->debug(
903 ": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
904 [
'db_server' => $srvName ]
920 $this->replLogger->warning(
921 __METHOD__ .
': failed to connect to {db_server}',
922 [
'db_server' => $srvName ]
932 $this->replLogger->info(
934 ': waiting for replica DB {db_server} to catch up...',
938 $result = $conn->primaryPosWait( $this->waitForPos, $timeout );
940 $ok = ( $result !==
null && $result != -1 );
943 $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
953 public function getConnection( $i, $groups = [], $domain =
false, $flags = 0 ) {
969 !is_string( $conn->getLBInfo( $conn::LB_READ_ONLY_REASON ) )
972 $reason = ( $genericIndex !== self::READER_INDEX_NONE )
973 ?
'The database is read-only until replication lag decreases.'
974 :
'The database is read-only until replica database servers becomes reachable.';
975 $conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $reason );
985 $conn = $this->localDomain->equals( $domain )
990 if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
998 if ( $this->connectionCounter > $priorConnectionsMade ) {
999 $this->trxProfiler->recordConnection(
1000 $conn->getServerName(),
1002 self::fieldHasBit( $flags, self::CONN_INTENT_WRITABLE )
1006 if ( !$conn->isOpen() ) {
1007 $this->errorConnection = $conn;
1022 if ( $this->readOnlyReason !==
false ) {
1025 $readOnlyReason =
'The primary database server is running in read-only mode.';
1036 $serverIndex = $conn->
getLBInfo( self::INFO_SERVER_INDEX );
1037 $refCount = $conn->
getLBInfo( self::INFO_FOREIGN_REF_COUNT );
1038 if ( $serverIndex ===
null || $refCount ===
null ) {
1040 } elseif ( $conn instanceof
DBConnRef ) {
1043 $this->connLogger->error(
1044 __METHOD__ .
": got DBConnRef instance",
1045 [
'db_domain' => $conn->
getDomainID(),
'exception' =>
new RuntimeException() ]
1051 if ( $this->disabled ) {
1055 if ( $conn->
getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
1064 $existingDomainConn = $this->conns[$connInUseKey][$serverIndex][$domain] ??
null;
1065 if ( !$existingDomainConn ) {
1066 throw new InvalidArgumentException(
1067 "Connection $serverIndex/$domain not found; it may have already been freed" );
1068 } elseif ( $existingDomainConn !== $conn ) {
1069 throw new InvalidArgumentException(
1070 "Connection $serverIndex/$domain mismatched; it may have already been freed" );
1073 $existingDomainConn->setLBInfo( self::INFO_FOREIGN_REF_COUNT, --$refCount );
1074 if ( $refCount <= 0 ) {
1075 $this->conns[$connFreeKey][$serverIndex][$domain] = $existingDomainConn;
1076 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
1077 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
1078 unset( $this->conns[$connInUseKey][$serverIndex] );
1080 $this->connLogger->debug( __METHOD__ .
": freed connection $serverIndex/$domain" );
1082 $this->connLogger->debug( __METHOD__ .
1083 ": reference count for $serverIndex/$domain reduced to $refCount" );
1088 if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
1089 throw new UnexpectedValueException(
1090 __METHOD__ .
' CONN_SILENCE_ERRORS is not supported'
1096 $conn = $this->
getConnection( $i, $groups, $domain, $flags );
1098 return new DBConnRef( $this, $conn, $role );
1103 throw new UnexpectedValueException(
1104 __METHOD__ .
' got CONN_SILENCE_ERRORS; connection is already deferred'
1111 return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
1120 if ( self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
1121 throw new UnexpectedValueException(
1122 __METHOD__ .
' CONN_SILENCE_ERRORS is not supported'
1126 $domain = $this->resolveDomainID( $domain );
1127 $role = $this->getRoleFromIndex( $i );
1128 $conn = $this->getConnection( $i, $groups, $domain, $flags );
1138 return ( $i === self::DB_PRIMARY || $i === $this->getWriterIndex() )
1151 return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
1169 $autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
1172 $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
1174 if ( isset( $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN] ) ) {
1175 $conn = $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN];
1176 $this->connLogger->debug( __METHOD__ .
": reused a connection for $connKey/$i" );
1178 $conn = $this->reallyOpenConnection(
1181 [ self::INFO_AUTOCOMMIT_ONLY => $autoCommit ]
1183 if ( $conn->isOpen() ) {
1184 $this->connLogger->debug( __METHOD__ .
": opened new connection for $connKey/$i" );
1185 $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN] = $conn;
1187 $this->connLogger->warning( __METHOD__ .
": connection error for $connKey/$i" );
1188 $this->errorConnection = $conn;
1196 !$this->localDomain->isCompatible( $conn->getDomainID() )
1198 throw new UnexpectedValueException(
1199 "Got connection to '{$conn->getDomainID()}', " .
1200 "but expected local domain ('{$this->localDomain}')"
1232 $domainInstance = DatabaseDomain::newFromId( $domain );
1233 $autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
1236 if ( $autoCommit ) {
1237 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
1238 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
1240 $connFreeKey = self::KEY_FOREIGN_FREE;
1241 $connInUseKey = self::KEY_FOREIGN_INUSE;
1247 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1249 $conn = $this->conns[$connInUseKey][$i][$domain];
1250 $this->connLogger->debug( __METHOD__ .
": reusing connection $connInUseKey/$i/$domain" );
1251 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1253 $conn = $this->conns[$connFreeKey][$i][$domain];
1254 unset( $this->conns[$connFreeKey][$i][$domain] );
1255 $this->conns[$connInUseKey][$i][$domain] = $conn;
1256 $this->connLogger->debug( __METHOD__ .
": reusing free connection $connInUseKey/$i/$domain" );
1257 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1259 foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $oldConn ) {
1260 if ( $domainInstance->getDatabase() !==
null ) {
1266 $oldConn->databasesAreIndependent() &&
1267 $oldConn->getDBname() !== $domainInstance->getDatabase()
1273 $conn->selectDomain( $domainInstance );
1277 $conn->dbSchema( $domainInstance->getSchema() );
1278 $conn->tablePrefix( $domainInstance->getTablePrefix() );
1280 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1282 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1283 $this->connLogger->debug( __METHOD__ .
1284 ": reusing free connection from $oldDomain for $domain" );
1290 $conn = $this->reallyOpenConnection(
1294 self::INFO_AUTOCOMMIT_ONLY => $autoCommit,
1295 self::INFO_FORIEGN =>
true,
1296 self::INFO_FOREIGN_REF_COUNT => 0
1299 if ( $conn->isOpen() ) {
1301 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1302 $this->connLogger->debug( __METHOD__ .
": opened new connection for $connInUseKey/$i/$domain" );
1304 $this->connLogger->warning(
1305 __METHOD__ .
": connection error for $connInUseKey/$i/{db_domain}",
1306 [
'db_domain' => $domain ]
1308 $this->errorConnection = $conn;
1315 if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1316 throw new UnexpectedValueException(
1317 "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
1320 $refCount = $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT );
1321 $conn->setLBInfo( self::INFO_FOREIGN_REF_COUNT, $refCount + 1 );
1328 return Database::attributesFromType(
1329 $this->getServerType( $i ),
1330 $this->servers[$i][
'driver'] ??
null
1341 return (
bool)$this->getAnyOpenConnection( $index );
1357 if ( $this->disabled ) {
1361 $server = $this->getServerInfoStrict( $i );
1363 $conn = Database::factory(
1365 array_merge( $server, [
1367 'topologyRole' => $this->getTopologyRole( $i, $server ),
1368 'topologicalMaster' => $this->getPrimaryServerName(),
1372 'dbname' => $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE]
1373 ? ( $domain->
getDatabase() ?? $server[
'dbname'] ??
null )
1376 'schema' => $domain->
getSchema() ?? $server[
'schema'] ??
null,
1380 'flags' => $this->initConnFlags( $server[
'flags'] ?? IDatabase::DBO_DEFAULT ),
1382 'cliMode' => $this->cliMode,
1383 'agent' => $this->agent,
1384 'ownerId' => $this->id,
1386 'lazyMasterHandle' => $this->getLazyConnectionRef(
1391 'srvCache' => $this->srvCache,
1392 'connLogger' => $this->connLogger,
1393 'queryLogger' => $this->queryLogger,
1394 'replLogger' => $this->replLogger,
1395 'errorLogger' => $this->errorLogger,
1396 'deprecationLogger' => $this->deprecationLogger,
1397 'profiler' => $this->profiler,
1398 'trxProfiler' => $this->trxProfiler,
1399 'criticalSectionProvider' => $this->csProvider
1401 Database::NEW_UNCONNECTED
1404 $conn->setLBInfo( [ self::INFO_SERVER_INDEX => $i ] + $lbInfo );
1406 $conn->setTableAliases( $this->tableAliases );
1407 $conn->setIndexAliases( $this->indexAliases );
1409 if ( $i === $this->getWriterIndex() ) {
1410 if ( $this->trxRoundId !==
false ) {
1411 $this->applyTransactionRoundFlags( $conn );
1413 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1414 $conn->setTransactionListener( $name, $callback );
1420 $conn->initConnection();
1421 ++$this->connectionCounter;
1429 if ( $conn->isOpen() ) {
1430 $this->lazyLoadReplicationPositions();
1434 $count = $this->getCurrentConnectionCount();
1435 if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
1436 $this->perfLogger->warning(
1437 __METHOD__ .
": {connections}+ connections made (primary={primarydb})",
1438 $this->getConnLogContext(
1441 'connections' => $count,
1442 'primarydb' => $this->getPrimaryServerName(),
1443 'db_domain' => $domain->
getId()
1458 if ( !empty( $server[
'is static'] ) ) {
1459 return IDatabase::ROLE_STATIC_CLONE;
1462 return ( $i === $this->getWriterIndex() )
1463 ? IDatabase::ROLE_STREAMING_MASTER
1464 : IDatabase::ROLE_STREAMING_REPLICA;
1473 if ( self::fieldHasBit( $flags, IDatabase::DBO_DEFAULT ) ) {
1474 if ( $this->cliMode ) {
1477 $flags |= IDatabase::DBO_TRX;
1488 if ( !$this->chronologyCallbackTriggered && $this->chronologyCallback ) {
1489 $this->chronologyCallbackTriggered =
true;
1490 ( $this->chronologyCallback )( $this );
1491 $this->connLogger->debug( __METHOD__ .
': executed chronology callback.' );
1500 $conn = $this->errorConnection;
1502 'method' => __METHOD__,
1503 'last_error' => $this->lastError,
1507 $srvName = $conn->getServerName();
1508 $this->connLogger->warning(
1509 __METHOD__ .
": connection error: {last_error} ({db_server})",
1510 $this->getConnLogContext( $conn, $context )
1512 $error = $conn->lastError() ?: $this->lastError;
1516 $this->connLogger->error(
1518 ": LB failure with no last connection. Connection error: {last_error}",
1539 return array_key_exists( $i, $this->servers );
1550 return ( isset( $this->servers[$i] ) && $this->groupLoads[self::GROUP_GENERIC][$i] > 0 );
1554 return count( $this->servers );
1558 return ( $this->getServerCount() > 1 );
1566 foreach ( $this->servers as $i => $server ) {
1567 if ( $i !== $this->getWriterIndex() && empty( $server[
'is static'] ) ) {
1576 return (
bool)$this->getStreamingReplicaIndexes();
1580 $name = $this->servers[$i][
'serverName'] ?? ( $this->servers[$i][
'host'] ??
'' );
1582 return ( $name !=
'' ) ? $name :
'localhost';
1586 return $this->servers[$i] ??
false;
1590 return $this->servers[$i][
'type'] ??
'unknown';
1594 $index = $this->getWriterIndex();
1596 $conn = $this->getAnyOpenConnection( $index );
1598 return $conn->getPrimaryPos();
1601 $conn = $this->getConnection( $index, self::CONN_SILENCE_ERRORS );
1603 $this->reportConnectionError();
1607 return $conn->getPrimaryPos();
1609 $this->closeConnection( $conn );
1615 return $this->getPrimaryPos();
1620 $primaryConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1621 if ( $primaryConn ) {
1622 return $primaryConn->getPrimaryPos();
1626 $highestPos =
false;
1627 foreach ( $this->getStreamingReplicaIndexes() as $i ) {
1628 $conn = $this->getAnyOpenConnection( $i );
1629 $pos = $conn ? $conn->getReplicaPos() :
false;
1634 $highestPos = $highestPos ?: $pos;
1635 if ( $pos->hasReached( $highestPos ) ) {
1643 public function disable( $fname = __METHOD__, $owner =
null ) {
1644 $this->assertOwnership( $fname, $owner );
1645 $this->closeAll( $fname, $owner );
1646 $this->disabled =
true;
1649 public function closeAll( $fname = __METHOD__, $owner =
null ) {
1650 $this->assertOwnership( $fname, $owner );
1651 if ( $this->ownerId ===
null ) {
1653 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1655 $this->forEachOpenConnection(
function (
IDatabase $conn ) use ( $fname ) {
1657 $this->connLogger->debug(
"$fname: closing connection to database '$srvName'." );
1658 $conn->
close( $fname, $this->
id );
1661 $this->conns = self::newTrackedConnectionsArray();
1667 throw new RuntimeException(
'Cannot close DBConnRef instance; it must be shareable' );
1670 $serverIndex = $conn->
getLBInfo( self::INFO_SERVER_INDEX );
1671 if ( $serverIndex ===
null ) {
1672 throw new RuntimeException(
'Database handle is missing server index' );
1675 $srvName = $this->getServerName( $serverIndex );
1679 foreach ( $this->conns as
$type => $connsByServer ) {
1680 $key = array_search( $conn, $connsByServer[$serverIndex] ?? [],
true );
1681 if ( $key !==
false ) {
1683 unset( $this->conns[
$type][$serverIndex][$key] );
1688 $this->connLogger->warning(
1690 ": got orphaned connection to database $serverIndex/$domain at '$srvName'."
1694 $this->connLogger->debug(
1696 ": closing connection to database $serverIndex/$domain at '$srvName'."
1699 $conn->
close( __METHOD__ );
1702 public function commitAll( $fname = __METHOD__, $owner =
null ) {
1703 $this->commitPrimaryChanges( $fname, $owner );
1704 $this->flushPrimarySnapshots( $fname, $owner );
1705 $this->flushReplicaSnapshots( $fname, $owner );
1709 $this->assertOwnership( $fname, $owner );
1710 $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1711 if ( $this->ownerId ===
null ) {
1713 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1716 $this->trxRoundStage = self::ROUND_ERROR;
1721 $this->forEachOpenPrimaryConnection(
static function (
Database $conn ) use ( &$count ) {
1727 }
while ( $count > 0 );
1729 $this->forEachOpenPrimaryConnection(
static function (
Database $conn ) {
1732 $this->trxRoundStage = self::ROUND_FINALIZED;
1739 return $this->finalizePrimaryChanges( $fname, $owner );
1743 $this->assertOwnership( $fname, $owner );
1744 $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1745 if ( $this->ownerId ===
null ) {
1747 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1750 $limit = $options[
'maxWriteDuration'] ?? 0;
1752 $this->trxRoundStage = self::ROUND_ERROR;
1753 $this->forEachOpenPrimaryConnection(
function (
IDatabase $conn ) use ( $limit ) {
1762 if ( $time > $limit ) {
1765 "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
1768 } elseif ( $time > 0 ) {
1769 $this->perfLogger->debug(
"Transaction spent $time second(s) in writes, " .
1770 "less than the limit of $limit" );
1778 "A connection to the {$conn->getDBname()} database was lost before commit"
1782 $this->trxRoundStage = self::ROUND_APPROVED;
1787 $this->approvePrimaryChanges( $options, $fname, $owner );
1791 $this->assertOwnership( $fname, $owner );
1792 if ( $this->trxRoundId !==
false ) {
1795 "$fname: Transaction round '{$this->trxRoundId}' already started"
1798 $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1799 if ( $this->ownerId ===
null ) {
1801 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1805 $this->flushPrimarySnapshots( $fname, $owner );
1807 $this->trxRoundId = $fname;
1808 $this->trxRoundStage = self::ROUND_ERROR;
1813 $this->forEachOpenPrimaryConnection(
function (
Database $conn ) {
1814 $this->applyTransactionRoundFlags( $conn );
1816 $this->trxRoundStage = self::ROUND_CURSORY;
1821 $this->beginPrimaryChanges( $fname, $owner );
1825 $this->assertOwnership( $fname, $owner );
1826 $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1827 if ( $this->ownerId ===
null ) {
1829 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1834 $restore = ( $this->trxRoundId !== false );
1835 $this->trxRoundId =
false;
1836 $this->trxRoundStage = self::ROUND_ERROR;
1839 $this->forEachOpenPrimaryConnection(
1840 function (
IDatabase $conn ) use ( $fname, &$failures ) {
1842 $conn->
commit( $fname, $conn::FLUSHING_ALL_PEERS );
1844 ( $this->errorLogger )( $e );
1845 $failures[] =
"{$conn->getServerName()}: {$e->getMessage()}";
1852 "$fname: Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1857 $this->forEachOpenPrimaryConnection(
function (
Database $conn ) {
1858 $this->undoTransactionRoundFlags( $conn );
1861 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1866 $this->commitPrimaryChanges( $fname, $owner );
1870 $this->assertOwnership( $fname, $owner );
1871 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1872 $type = IDatabase::TRIGGER_COMMIT;
1873 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1874 $type = IDatabase::TRIGGER_ROLLBACK;
1878 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1881 if ( $this->ownerId ===
null ) {
1883 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1886 $oldStage = $this->trxRoundStage;
1887 $this->trxRoundStage = self::ROUND_ERROR;
1890 $this->forEachOpenPrimaryConnection(
static function (
Database $conn ) {
1895 $fname = __METHOD__;
1900 $this->forEachOpenPrimaryConnection(
1901 static function (
Database $conn ) use (
$type, &$errors, &$count ) {
1909 $this->forEachOpenPrimaryConnection(
1910 function (
Database $conn ) use ( &$errors, $fname ) {
1913 $this->queryLogger->warning( $fname .
": found writes pending." );
1914 $fnames = implode(
', ', $conn->pendingWriteAndCallbackCallers() );
1915 $this->queryLogger->warning(
1916 "$fname: found writes pending ($fnames).",
1917 $this->getConnLogContext(
1919 [
'exception' => new RuntimeException() ]
1925 $this->queryLogger->debug(
"$fname: found empty transaction." );
1928 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1934 }
while ( $count > 0 );
1936 $this->trxRoundStage = $oldStage;
1938 return $errors ? $errors[0] :
null;
1943 $this->runPrimaryTransactionIdleCallbacks( $fname, $owner );
1947 $this->assertOwnership( $fname, $owner );
1948 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1949 $type = IDatabase::TRIGGER_COMMIT;
1950 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1951 $type = IDatabase::TRIGGER_ROLLBACK;
1955 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1958 if ( $this->ownerId ===
null ) {
1960 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1964 $this->trxRoundStage = self::ROUND_ERROR;
1965 $this->forEachOpenPrimaryConnection(
1966 static function (
Database $conn ) use (
$type, &$errors ) {
1970 $this->trxRoundStage = self::ROUND_CURSORY;
1972 return $errors ? $errors[0] :
null;
1977 $this->runPrimaryTransactionListenerCallbacks( $fname, $owner );
1981 $this->assertOwnership( $fname, $owner );
1982 if ( $this->ownerId ===
null ) {
1984 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1987 $restore = ( $this->trxRoundId !== false );
1988 $this->trxRoundId =
false;
1989 $this->trxRoundStage = self::ROUND_ERROR;
1990 $this->forEachOpenPrimaryConnection(
static function (
IDatabase $conn ) use ( $fname ) {
1991 $conn->
rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1995 $this->forEachOpenPrimaryConnection(
function (
Database $conn ) {
1996 $this->undoTransactionRoundFlags( $conn );
1999 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
2004 $this->rollbackPrimaryChanges( $fname, $owner );
2012 $stages = (array)$stage;
2014 if ( !in_array( $this->trxRoundStage, $stages,
true ) ) {
2015 $stageList = implode(
2017 array_map(
static function ( $v ) {
2023 "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
2041 if ( $this->ownerId !==
null && $owner !== $this->ownerId && $owner !== $this->
id ) {
2044 "$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
2059 if ( $conn->
getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
2063 if ( $conn->
getFlag( $conn::DBO_DEFAULT ) ) {
2066 $conn->
setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
2069 if ( $conn->
getFlag( $conn::DBO_TRX ) ) {
2070 $conn->
setLBInfo( $conn::LB_TRX_ROUND_ID, $this->trxRoundId );
2078 if ( $conn->
getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
2082 if ( $conn->
getFlag( $conn::DBO_TRX ) ) {
2083 $conn->
setLBInfo( $conn::LB_TRX_ROUND_ID,
null );
2086 if ( $conn->
getFlag( $conn::DBO_DEFAULT ) ) {
2092 $this->assertOwnership( $fname, $owner );
2093 $this->forEachOpenReplicaConnection(
static function (
IDatabase $conn ) use ( $fname ) {
2099 $this->assertOwnership( $fname, $owner );
2100 $this->forEachOpenPrimaryConnection(
static function (
IDatabase $conn ) use ( $fname ) {
2107 $this->flushPrimarySnapshots( $fname, $owner );
2115 return $this->trxRoundStage;
2119 return $this->isOpen( $this->getWriterIndex() );
2124 return $this->hasPrimaryConnection();
2129 $this->forEachOpenPrimaryConnection(
static function (
IDatabase $conn ) use ( &$pending ) {
2138 return $this->hasPrimaryChanges();
2143 $this->forEachOpenPrimaryConnection(
static function (
IDatabase $conn ) use ( &$lastTime ) {
2152 return $this->lastPrimaryChangeTimestamp();
2156 $age = $age ?? $this->waitTimeout;
2158 return ( $this->hasPrimaryChanges()
2159 || $this->lastPrimaryChangeTimestamp() > microtime(
true ) - $age );
2164 return $this->hasOrMadeRecentPrimaryChanges( $age );
2169 $this->forEachOpenPrimaryConnection(
static function (
IDatabase $conn ) use ( &$fnames ) {
2178 return $this->pendingPrimaryChangeCallers();
2182 $domain = $this->resolveDomainID( $domain );
2184 if ( $this->laggedReplicaMode ) {
2189 if ( $this->hasStreamingReplicaServers() ) {
2191 $this->getReaderIndex( self::GROUP_GENERIC, $domain );
2194 return $this->laggedReplicaMode;
2198 return $this->laggedReplicaMode;
2202 $domainInstance = DatabaseDomain::newFromId( $this->resolveDomainID( $domain ) );
2204 if ( $this->readOnlyReason !==
false ) {
2205 return $this->readOnlyReason;
2206 } elseif ( $this->isPrimaryRunningReadOnly( $domainInstance ) ) {
2207 return 'The primary database server is running in read-only mode.';
2208 } elseif ( $this->getLaggedReplicaMode( $domain ) ) {
2209 $genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
2211 return ( $genericIndex !== self::READER_INDEX_NONE )
2212 ?
'The database is read-only until replication lag decreases.'
2213 :
'The database is read-only until a replica database server becomes reachable.';
2227 $key = $this->srvCache->makeGlobalKey(
2228 'rdbms-server-readonly',
2234 if ( self::fieldHasBit( $flags, self::CONN_REFRESH_READ_ONLY ) ) {
2242 $this->srvCache->set( $key, $readOnly, BagOStuff::TTL_PROC_SHORT );
2244 $readOnly = $this->srvCache->getWithSetCallback(
2246 BagOStuff::TTL_PROC_SHORT,
2247 static function () use ( $conn ) {
2259 return (
bool)$readOnly;
2269 return (
bool)$this->wanCache->getWithSetCallback(
2271 $this->wanCache->makeGlobalKey(
2272 'rdbms-server-readonly',
2273 $this->getPrimaryServerName(),
2277 self::TTL_CACHE_READONLY,
2278 function () use ( $domain ) {
2279 $scope = $this->trxProfiler->silenceForScope();
2281 $index = $this->getWriterIndex();
2284 $flags = self::CONN_SILENCE_ERRORS | self::CONN_REFRESH_READ_ONLY;
2285 $conn = $this->getServerConnection( $index, $domain->
getId(), $flags );
2288 $readOnly = (int)$this->isPrimaryConnectionReadOnly( $conn );
2292 $this->reuseConnection( $conn );
2297 ScopedCallback::consume( $scope );
2303 'pcTTL' => WANObjectCache::TTL_PROC_LONG
2309 if ( $mode ===
null ) {
2310 return $this->allowLagged;
2312 $this->allowLagged = $mode;
2314 return $this->allowLagged;
2319 $this->forEachOpenConnection(
static function (
IDatabase $conn ) use ( &
$success ) {
2320 if ( !$conn->
ping() ) {
2329 foreach ( $this->conns as $connsByServer ) {
2330 foreach ( $connsByServer as $serverConns ) {
2331 foreach ( $serverConns as $conn ) {
2332 $callback( $conn, ...$params );
2339 $primaryIndex = $this->getWriterIndex();
2340 foreach ( $this->conns as $connsByServer ) {
2341 if ( isset( $connsByServer[$primaryIndex] ) ) {
2343 foreach ( $connsByServer[$primaryIndex] as $conn ) {
2344 $callback( $conn, ...$params );
2352 $this->forEachOpenPrimaryConnection( $callback, $params );
2356 foreach ( $this->conns as $connsByServer ) {
2357 foreach ( $connsByServer as $i => $serverConns ) {
2358 if ( $i === $this->getWriterIndex() ) {
2361 foreach ( $serverConns as $conn ) {
2362 $callback( $conn, ...$params );
2373 foreach ( $this->conns as $connsByServer ) {
2374 foreach ( $connsByServer as $serverConns ) {
2375 $count += count( $serverConns );
2383 $domain = $this->resolveDomainID( $domain );
2389 if ( $this->hasReplicaServers() ) {
2390 $lagTimes = $this->getLagTimes( $domain );
2391 foreach ( $lagTimes as $i => $lag ) {
2392 if ( $this->groupLoads[self::GROUP_GENERIC][$i] > 0 && $lag > $maxLag ) {
2394 $host = $this->getServerInfoStrict( $i,
'host' );
2400 return [ $host, $maxLag, $maxIndex ];
2404 $domain = $this->resolveDomainID( $domain );
2406 if ( !$this->hasReplicaServers() ) {
2407 return [ $this->getWriterIndex() => 0 ];
2410 $knownLagTimes = [];
2411 $indexesWithLag = [];
2412 foreach ( $this->servers as $i => $server ) {
2413 if ( empty( $server[
'is static'] ) ) {
2414 $indexesWithLag[] = $i;
2416 $knownLagTimes[$i] = 0;
2420 return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
2443 $timeout = max( 1, $timeout ?: $this->waitTimeout );
2445 if ( $conn->
getLBInfo( self::INFO_SERVER_INDEX ) === $this->getWriterIndex() ) {
2451 $this->replLogger->debug( __METHOD__ .
': no position passed; using current' );
2452 $index = $this->getWriterIndex();
2453 $flags = self::CONN_SILENCE_ERRORS;
2454 $primaryConn = $this->getAnyOpenConnection( $index, $flags );
2455 if ( $primaryConn ) {
2456 $pos = $primaryConn->getPrimaryPos();
2458 $primaryConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
2459 if ( !$primaryConn ) {
2462 "Could not obtain a primary database connection to get the position"
2465 $pos = $primaryConn->getPrimaryPos();
2466 $this->closeConnection( $primaryConn );
2471 $this->replLogger->debug( __METHOD__ .
': waiting' );
2473 $ok = ( $result !==
null && $result != -1 );
2475 $this->replLogger->debug( __METHOD__ .
': done waiting (success)' );
2477 $this->replLogger->debug( __METHOD__ .
': done waiting (failure)' );
2481 $this->replLogger->error(
2482 __METHOD__ .
': could not get primary pos for {db_server}',
2483 $this->getConnLogContext( $conn, [
'exception' =>
new RuntimeException() ] )
2492 return $this->waitForPrimaryPos( $conn, $pos, $timeout );
2497 $this->trxRecurringCallbacks[$name] = $callback;
2499 unset( $this->trxRecurringCallbacks[$name] );
2501 $this->forEachOpenPrimaryConnection(
2502 static function (
IDatabase $conn ) use ( $name, $callback ) {
2509 $this->tableAliases = $aliases;
2513 $this->indexAliases = $aliases;
2517 $this->domainAliases = $aliases;
2523 $this->forEachOpenConnection(
static function (
IDatabase $conn ) use ( &$domainsInUse ) {
2526 if ( $conn->
getLBInfo( self::INFO_FOREIGN_REF_COUNT ) > 0 ) {
2527 $domainsInUse[] = $conn->getDomainID();
2532 if ( $domainsInUse ) {
2533 $domains = implode(
', ', $domainsInUse );
2535 "Foreign domain connections are still in use ($domains)" );
2539 $this->localDomain->getDatabase(),
2540 $this->localDomain->getSchema(),
2545 $this->forEachOpenConnection(
static function (
IDatabase $conn ) use ( $prefix ) {
2546 if ( !$conn->
getLBInfo( self::INFO_FORIEGN ) ) {
2547 $conn->tablePrefix( $prefix );
2553 $this->closeAll( __METHOD__, $this->
id );
2555 $this->setLocalDomain( DatabaseDomain::newFromId( $domain ) );
2559 $old = $this->tempTablesOnlyMode[$domain] ??
false;
2561 $this->tempTablesOnlyMode[$domain] =
true;
2563 unset( $this->tempTablesOnlyMode[$domain] );
2573 $this->localDomain = $domain;
2583 if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
2584 throw new InvalidArgumentException(
"No server with index '$i'" );
2587 if ( $field !==
null ) {
2588 if ( !array_key_exists( $field, $this->servers[$i] ) ) {
2589 throw new InvalidArgumentException(
"No field '$field' in server index '$i'" );
2592 return $this->servers[$i][$field];
2595 return $this->servers[$i];
2602 return $this->getServerName( $this->getWriterIndex() );
2611 return ( ( $flags & $bit ) === $bit );
2633 $this->disable( __METHOD__, $this->ownerId );
2640class_alias( LoadBalancer::class,
'LoadBalancer' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
if(ini_get('mbstring.func_overload')) if(!defined('MW_ENTRY_POINT'))
Pre-config setup: Before loading LocalSettings.php.
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Class representing a cache/ephemeral data store.
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Class to handle database/schema/prefix specifications for IDatabase.
static newFromId( $domain)
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...