24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
31 use UnexpectedValueException;
32 use InvalidArgumentException;
134 const CONN_HELD_WARN_THRESHOLD = 10;
137 const MAX_LAG_DEFAULT = 6;
139 const MAX_WAIT_DEFAULT = 10;
141 const TTL_CACHE_READONLY = 5;
152 const ROUND_CURSORY =
'cursory';
154 const ROUND_FINALIZED =
'finalized';
156 const ROUND_APPROVED =
'approved';
158 const ROUND_COMMIT_CALLBACKS =
'commit-callbacks';
160 const ROUND_ROLLBACK_CALLBACKS =
'rollback-callbacks';
162 const ROUND_ERROR =
'error';
165 if ( !isset(
$params[
'servers'] ) ) {
166 throw new InvalidArgumentException( __CLASS__ .
': missing servers parameter' );
168 $this->
servers = $params[
'servers'];
169 foreach ( $this->
servers as $i => $server ) {
171 $this->
servers[$i][
'master'] =
true;
173 $this->
servers[$i][
'replica'] =
true;
182 $this->waitTimeout =
$params[
'waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
184 $this->readIndex = -1;
187 self::KEY_LOCAL => [],
188 self::KEY_FOREIGN_INUSE => [],
189 self::KEY_FOREIGN_FREE => [],
191 self::KEY_LOCAL_NOROUND => [],
192 self::KEY_FOREIGN_INUSE_NOROUND => [],
193 self::KEY_FOREIGN_FREE_NOROUND => []
196 $this->waitForPos =
false;
199 if ( isset(
$params[
'readOnlyReason'] ) && is_string(
$params[
'readOnlyReason'] ) ) {
200 $this->readOnlyReason =
$params[
'readOnlyReason'];
203 $this->maxLag =
$params[
'maxLag'] ?? self::MAX_LAG_DEFAULT;
205 $this->loadMonitorConfig =
$params[
'loadMonitor'] ?? [
'class' =>
'LoadMonitorNull' ];
206 $this->loadMonitorConfig += [
'lagWarnThreshold' =>
$this->maxLag ];
208 foreach (
$params[
'servers']
as $i => $server ) {
209 $this->loads[$i] = $server[
'load'];
210 if ( isset( $server[
'groupLoads'] ) ) {
211 foreach ( $server[
'groupLoads']
as $group => $ratio ) {
212 if ( !isset( $this->groupLoads[$group] ) ) {
213 $this->groupLoads[$group] = [];
215 $this->groupLoads[$group][$i] = $ratio;
222 $this->profiler =
$params[
'profiler'] ??
null;
225 $this->errorLogger =
$params[
'errorLogger'] ??
function ( Exception
$e ) {
226 trigger_error( get_class(
$e ) .
': ' .
$e->getMessage(), E_USER_WARNING );
228 $this->deprecationLogger =
$params[
'deprecationLogger'] ??
function ( $msg ) {
229 trigger_error( $msg, E_USER_DEPRECATED );
232 foreach ( [
'replLogger',
'connLogger',
'queryLogger',
'perfLogger' ]
as $key ) {
233 $this->$key =
$params[$key] ??
new NullLogger();
236 $this->hostname =
$params[
'hostname'] ?? ( gethostname() ?:
'unknown' );
237 $this->cliMode =
$params[
'cliMode'] ?? ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' );
238 $this->agent =
$params[
'agent'] ??
'';
240 if ( isset(
$params[
'chronologyCallback'] ) ) {
241 $this->chronologyCallback =
$params[
'chronologyCallback'];
244 if ( isset(
$params[
'roundStage'] ) ) {
245 if (
$params[
'roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
246 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
247 } elseif (
$params[
'roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
248 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
252 $this->defaultGroup =
$params[
'defaultGroup'] ??
null;
256 return $this->localDomain->getId();
269 if ( !isset( $this->loadMonitor ) ) {
276 $class = $this->loadMonitorConfig[
'class'];
277 if ( isset( $compat[$class] ) ) {
278 $class = $compat[$class];
281 $this->loadMonitor =
new $class(
282 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
283 $this->loadMonitor->setLogger( $this->replLogger );
298 # Unset excessively lagged servers
299 foreach ( $lags
as $i => $lag ) {
301 # How much lag this server nominally is allowed to have
303 # Constrain that futher by $maxLag argument
304 $maxServerLag = min( $maxServerLag,
$maxLag );
307 if ( $lag ===
false && !is_infinite( $maxServerLag ) ) {
308 $this->replLogger->debug(
310 ": server {host} is not replicating?", [
'host' => $host ] );
312 } elseif ( $lag > $maxServerLag ) {
313 $this->replLogger->debug(
315 ": server {host} has {lag} seconds of lag (>= {maxlag})",
316 [
'host' => $host,
'lag' => $lag,
'maxlag' => $maxServerLag ]
323 # Find out if all the replica DBs with non-zero load are lagged
329 # No appropriate DB servers except maybe the master and some replica DBs with zero load
330 # Do NOT use the master
331 # Instead, this function will return false, triggering read-only mode,
332 # and a lagged replica DB will be used instead.
340 # Return a random representative of the remainder
348 } elseif ( $group ===
false && $this->readIndex >= 0 ) {
353 if ( $group !==
false ) {
355 if ( isset( $this->groupLoads[$group] ) ) {
356 $loads = $this->groupLoads[$group];
359 $this->connLogger->info( __METHOD__ .
": no loads for group $group" );
373 if ( $i ===
false ) {
383 if ( !$this->
doWait( $i ) ) {
388 if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group ===
false ) {
390 $this->readIndex = $i;
393 $this->laggedReplicaMode =
true;
398 $this->connLogger->debug( __METHOD__ .
": using server $serverName for group '$group'" );
410 throw new InvalidArgumentException(
"Empty server array given to LoadBalancer" );
420 while (
count( $currentLoads ) ) {
425 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
429 $ago = microtime(
true ) - $this->waitForPos->asOfTime();
433 if ( $i ===
false ) {
437 if ( $i ===
false &&
count( $currentLoads ) != 0 ) {
439 $this->replLogger->error(
440 __METHOD__ .
": all replica DBs lagged. Switch to read-only mode" );
446 if ( $i ===
false ) {
450 $this->connLogger->debug( __METHOD__ .
": pickRandom() returned false" );
452 return [
false,
false ];
456 $this->connLogger->debug( __METHOD__ .
": Using reader #$i: $serverName..." );
460 $this->connLogger->warning( __METHOD__ .
": Failed connecting to $i/$domain" );
461 unset( $currentLoads[$i] );
468 if ( $domain !==
false ) {
477 if ( $currentLoads === [] ) {
478 $this->connLogger->error( __METHOD__ .
": all servers down" );
487 $this->waitForPos = $pos;
490 if ( ( $i > 0 ) && !$this->
doWait( $i ) ) {
491 $this->laggedReplicaMode =
true;
502 $this->waitForPos = $pos;
509 $readLoads = array_filter( $readLoads );
514 $ok = $this->
doWait( $i,
true, $timeout );
519 # Restore the old position, as this is not used for lag-protection but for throttling
520 $this->waitForPos = $oldPos;
531 $this->waitForPos = $pos;
535 for ( $i = 1; $i < $serverCount; $i++ ) {
536 if ( $this->loads[$i] > 0 ) {
537 $start = microtime(
true );
538 $ok = $this->
doWait( $i,
true, $timeout ) && $ok;
539 $timeout -= intval( microtime(
true ) - $start );
540 if ( $timeout <= 0 ) {
546 # Restore the old position, as this is not used for lag-protection but for throttling
547 $this->waitForPos = $oldPos;
561 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
562 $this->waitForPos = $pos;
568 $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
570 foreach ( $this->conns
as $connsByServer ) {
572 $indexes = array_keys( $connsByServer );
574 $indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
577 foreach ( $indexes
as $index ) {
578 foreach ( $connsByServer[$index]
as $conn ) {
579 if ( !$conn->isOpen() ) {
582 if ( !$autocommit || $conn->getLBInfo(
'autoCommitOnly' ) ) {
599 protected function doWait( $index, $open =
false, $timeout =
null ) {
600 $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
604 $key = $this->srvCache->makeGlobalKey( __CLASS__,
'last-known-pos', $server,
'v1' );
606 $knownReachedPos = $this->srvCache->get( $key );
609 $knownReachedPos->
hasReached( $this->waitForPos )
611 $this->replLogger->debug(
613 ': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
614 [
'dbserver' => $server ]
624 $this->replLogger->debug(
625 __METHOD__ .
': no connection open for {dbserver}',
626 [
'dbserver' => $server ]
633 $this->replLogger->warning(
634 __METHOD__ .
': failed to connect to {dbserver}',
635 [
'dbserver' => $server ]
646 $this->replLogger->info(
648 ': waiting for replica DB {dbserver} to catch up...',
649 [
'dbserver' => $server ]
652 $result = $conn->masterPosWait( $this->waitForPos, $timeout );
655 $this->replLogger->warning(
656 __METHOD__ .
': Errored out waiting on {host} pos {pos}',
659 'pos' => $this->waitForPos,
660 'trace' => (
new RuntimeException() )->getTraceAsString()
665 $this->replLogger->warning(
666 __METHOD__ .
': Timed out waiting on {host} pos {pos}',
669 'pos' => $this->waitForPos,
670 'trace' => (
new RuntimeException() )->getTraceAsString()
675 $this->replLogger->debug( __METHOD__ .
": done waiting" );
688 public function getConnection( $i, $groups = [], $domain =
false, $flags = 0 ) {
689 if ( $i ===
null || $i ===
false ) {
690 throw new InvalidArgumentException(
'Attempt to call ' . __METHOD__ .
691 ' with invalid server index' );
698 if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
704 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
709 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
710 $this->connLogger->info( __METHOD__ .
711 ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
718 $groups = ( $groups ===
false || $groups === [] )
728 # Try to find an available server in any the query groups (in order)
729 foreach ( $groups
as $group ) {
731 if ( $groupIndex !==
false ) {
738 # Operation-based index
740 $this->lastError =
'Unknown error';
741 # Try the general server pool if $groups are unavailable.
742 $i = ( $groups === [
false ] )
745 # Couldn't find a working server in getReaderIndex()?
746 if ( $i ===
false ) {
754 # Now we have an explicit index into the servers array
762 # Profile any new connections that happen
763 if ( $this->connsOpened > $oldConnsOpened ) {
764 $host = $conn->getServer();
765 $dbname = $conn->getDBname();
766 $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
770 # Make master-requested DB handles inherit any read-only mode setting
771 $conn->setLBInfo(
'readOnlyReason', $this->
getReadOnlyReason( $domain, $conn ) );
778 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
779 $refCount = $conn->
getLBInfo(
'foreignPoolRefCount' );
780 if ( $serverIndex ===
null || $refCount ===
null ) {
792 } elseif ( $conn instanceof
DBConnRef ) {
795 $this->connLogger->error(
796 __METHOD__ .
": got DBConnRef instance.\n" .
797 (
new RuntimeException() )->getTraceAsString() );
802 if ( $this->disabled ) {
806 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
815 if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
816 throw new InvalidArgumentException( __METHOD__ .
817 ": connection $serverIndex/$domain not found; it may have already been freed." );
818 } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
819 throw new InvalidArgumentException( __METHOD__ .
820 ": connection $serverIndex/$domain mismatched; it may have already been freed." );
823 $conn->
setLBInfo(
'foreignPoolRefCount', --$refCount );
824 if ( $refCount <= 0 ) {
825 $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
826 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
827 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
828 unset( $this->conns[$connInUseKey][$serverIndex] );
830 $this->connLogger->debug( __METHOD__ .
": freed connection $serverIndex/$domain" );
832 $this->connLogger->debug( __METHOD__ .
833 ": reference count for $serverIndex/$domain reduced to $refCount" );
848 return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
856 $this, $this->
getConnection( $i, $groups, $domain, $flags ), $role );
874 if ( !$this->connectionAttempted && $this->chronologyCallback ) {
875 $this->connLogger->debug( __METHOD__ .
': calling initLB() before first connection.' );
877 $this->connectionAttempted =
true;
885 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
887 if ( $domain !==
false ) {
900 $this->errorConnection = $conn;
904 if ( $autoCommit && $conn instanceof
IDatabase ) {
905 if ( $conn->trxLevel() ) {
908 __METHOD__ .
': CONN_TRX_AUTOCOMMIT handle has a transaction.'
931 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
934 if ( isset( $this->conns[$connKey][$i][0] ) ) {
935 $conn = $this->conns[$connKey][$i][0];
937 if ( !isset( $this->
servers[$i] ) || !is_array( $this->
servers[$i] ) ) {
938 throw new InvalidArgumentException(
"No server with index '$i'." );
942 $server[
'serverIndex'] = $i;
943 $server[
'autoCommitOnly'] = $autoCommit;
946 if ( $conn->isOpen() ) {
947 $this->connLogger->debug(
948 __METHOD__ .
": connected to database $i at '$host'." );
949 $this->conns[$connKey][$i][0] = $conn;
951 $this->connLogger->warning(
952 __METHOD__ .
": failed to connect to database $i at '$host'." );
953 $this->errorConnection = $conn;
961 !$this->localDomain->isCompatible( $conn->getDomainID() )
963 throw new UnexpectedValueException(
964 "Got connection to '{$conn->getDomainID()}', " .
965 "but expected local domain ('{$this->localDomain}')." );
995 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1008 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1010 $conn = $this->conns[$connInUseKey][$i][$domain];
1011 $this->connLogger->debug( __METHOD__ .
": reusing connection $i/$domain" );
1012 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1014 $conn = $this->conns[$connFreeKey][$i][$domain];
1015 unset( $this->conns[$connFreeKey][$i][$domain] );
1016 $this->conns[$connInUseKey][$i][$domain] = $conn;
1017 $this->connLogger->debug( __METHOD__ .
": reusing free connection $i/$domain" );
1018 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1020 foreach ( $this->conns[$connFreeKey][$i]
as $oldDomain => $conn ) {
1021 if ( $domainInstance->getDatabase() !==
null ) {
1027 $conn->databasesAreIndependent() &&
1028 $conn->getDBname() !== $domainInstance->getDatabase()
1033 $conn->selectDomain( $domainInstance );
1036 $conn->dbSchema( $domainInstance->getSchema() );
1037 $conn->tablePrefix( $domainInstance->getTablePrefix() );
1039 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1041 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1042 $this->connLogger->debug( __METHOD__ .
1043 ": reusing free connection from $oldDomain for $domain" );
1049 if ( !isset( $this->
servers[$i] ) || !is_array( $this->
servers[$i] ) ) {
1050 throw new InvalidArgumentException(
"No server with index '$i'." );
1054 $server[
'serverIndex'] = $i;
1055 $server[
'foreignPoolRefCount'] = 0;
1056 $server[
'foreign'] =
true;
1057 $server[
'autoCommitOnly'] = $autoCommit;
1059 if ( !$conn->isOpen() ) {
1060 $this->connLogger->warning( __METHOD__ .
": connection error for $i/$domain" );
1061 $this->errorConnection = $conn;
1065 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1066 $this->connLogger->debug( __METHOD__ .
": opened new connection for $i/$domain" );
1072 if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1073 throw new UnexpectedValueException(
1074 "Got connection to '{$conn->getDomainID()}', but expected '$domain'." );
1077 $refCount = $conn->getLBInfo(
'foreignPoolRefCount' );
1078 $conn->setLBInfo(
'foreignPoolRefCount', $refCount + 1 );
1087 $this->
servers[$i][
'driver'] ??
null
1099 if ( !is_int( $index ) ) {
1118 if ( $this->disabled ) {
1126 if ( $server[
'type'] ===
'mysql' ) {
1130 $server[
'dbname'] =
null;
1137 $server[
'schema'] = $domain->
getSchema();
1145 $server[
'clusterMasterHost'] = $masterName;
1148 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
1149 $this->perfLogger->warning( __METHOD__ .
": " .
1150 "{$this->connsOpened}+ connections made (master=$masterName)" );
1177 $db->setLBInfo( $server );
1178 $db->setLazyMasterHandle(
1181 $db->setTableAliases( $this->tableAliases );
1182 $db->setIndexAliases( $this->indexAliases );
1185 if ( $this->trxRoundId !==
false ) {
1188 foreach ( $this->trxRecurringCallbacks
as $name => $callback ) {
1189 $db->setTransactionListener(
$name, $callback );
1202 'method' => __METHOD__,
1207 $context[
'db_server'] = $conn->getServer();
1208 $this->connLogger->warning(
1209 __METHOD__ .
": connection error: {last_error} ({db_server})",
1213 throw new DBConnectionError( $conn,
"{$this->lastError} ({$context['db_server']})" );
1216 $this->connLogger->error(
1218 ": LB failure with no last connection. Connection error: {last_error}",
1232 return array_key_exists( $i, $this->
servers );
1236 return array_key_exists( $i, $this->
servers ) && $this->loads[$i] != 0;
1246 return (
$name !=
'' ) ?
$name :
'localhost';
1250 return $this->
servers[$i] ??
false;
1254 return $this->
servers[$i][
'type'] ??
'unknown';
1258 # If this entire request was served from a replica DB without opening a connection to the
1259 # master (however unlikely that may be), then we can fetch the position from the replica DB.
1261 if ( !$masterConn ) {
1263 for ( $i = 1; $i < $serverCount; $i++ ) {
1266 return $conn->getReplicaPos();
1270 return $masterConn->getMasterPos();
1278 $this->disabled =
true;
1285 $this->connLogger->debug(
1286 $fname .
": closing connection to database '$host'." );
1291 self::KEY_LOCAL => [],
1292 self::KEY_FOREIGN_INUSE => [],
1293 self::KEY_FOREIGN_FREE => [],
1294 self::KEY_LOCAL_NOROUND => [],
1295 self::KEY_FOREIGN_INUSE_NOROUND => [],
1296 self::KEY_FOREIGN_FREE_NOROUND => []
1298 $this->connsOpened = 0;
1304 throw new RuntimeException( __METHOD__ .
': got DBConnRef instance.' );
1307 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
1308 foreach ( $this->conns
as $type => $connsByServer ) {
1309 if ( !isset( $connsByServer[$serverIndex] ) ) {
1313 foreach ( $connsByServer[$serverIndex]
as $i => $trackedConn ) {
1314 if ( $conn === $trackedConn ) {
1316 $this->connLogger->debug(
1317 __METHOD__ .
": closing connection to database $i at '$host'." );
1318 unset( $this->conns[
$type][$serverIndex][$i] );
1337 $this->trxRoundStage = self::ROUND_ERROR;
1348 }
while ( $count > 0 );
1353 $this->trxRoundStage = self::ROUND_FINALIZED;
1361 $limit =
$options[
'maxWriteDuration'] ?? 0;
1363 $this->trxRoundStage = self::ROUND_ERROR;
1372 if ( $limit > 0 &&
$time > $limit ) {
1375 "Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
1384 "A connection to the {$conn->getDBname()} database was lost before commit."
1388 $this->trxRoundStage = self::ROUND_APPROVED;
1392 if ( $this->trxRoundId !==
false ) {
1395 "$fname: Transaction round '{$this->trxRoundId}' already started."
1403 $this->trxRoundId =
$fname;
1404 $this->trxRoundStage = self::ROUND_ERROR;
1412 $this->trxRoundStage = self::ROUND_CURSORY;
1421 $scope = ScopedCallback::newScopedIgnoreUserAbort();
1423 $restore = ( $this->trxRoundId !==
false );
1424 $this->trxRoundId =
false;
1425 $this->trxRoundStage = self::ROUND_ERROR;
1434 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1441 "$fname: Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1450 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1454 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1455 $type = IDatabase::TRIGGER_COMMIT;
1456 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1457 $type = IDatabase::TRIGGER_ROLLBACK;
1461 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1466 $this->trxRoundStage = self::ROUND_ERROR;
1486 }
catch ( Exception $ex ) {
1495 $this->queryLogger->warning(
$fname .
": found writes pending." );
1497 $this->queryLogger->warning(
1498 $fname .
": found writes pending ($fnames).",
1507 $this->queryLogger->debug(
$fname .
": found empty transaction." );
1511 }
catch ( Exception $ex ) {
1515 }
while ( $count > 0 );
1517 $this->trxRoundStage = $oldStage;
1523 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1524 $type = IDatabase::TRIGGER_COMMIT;
1525 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1526 $type = IDatabase::TRIGGER_ROLLBACK;
1530 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1536 $this->trxRoundStage = self::ROUND_ERROR;
1540 }
catch ( Exception $ex ) {
1544 $this->trxRoundStage = self::ROUND_CURSORY;
1550 $restore = ( $this->trxRoundId !==
false );
1551 $this->trxRoundId =
false;
1552 $this->trxRoundStage = self::ROUND_ERROR;
1562 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1569 $stages = (
array)$stage;
1571 if ( !in_array( $this->trxRoundStage, $stages,
true ) ) {
1572 $stageList = implode(
1574 array_map(
function ( $v ) {
1580 "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1595 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
1606 $conn->
setLBInfo(
'trxRoundId', $this->trxRoundId );
1614 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
1619 $conn->
setLBInfo(
'trxRoundId',
false );
1657 return (
bool)$pending;
1670 $age = ( $age ===
null ) ? $this->waitTimeout : $age;
1687 if ( !$this->laggedReplicaMode && $this->
getServerCount() > 1 ) {
1694 $this->allReplicasDownMode =
true;
1695 $this->laggedReplicaMode =
true;
1716 if ( $this->readOnlyReason !==
false ) {
1719 if ( $this->allReplicasDownMode ) {
1720 return 'The database has been automatically locked ' .
1721 'until the replica database servers become available';
1723 return 'The database has been automatically locked ' .
1724 'while the replica database servers catch up to the master.';
1727 return 'The database master is running in read-only mode.';
1742 return (
bool)
$cache->getWithSetCallback(
1743 $cache->makeGlobalKey( __CLASS__,
'server-read-only', $masterServer ),
1744 self::TTL_CACHE_READONLY,
1745 function ()
use ( $domain, $conn ) {
1746 $old = $this->trxProfiler->setSilenced(
true );
1749 $readOnly = (int)$dbw->serverIsReadOnly();
1756 $this->trxProfiler->setSilenced( $old );
1759 [
'pcTTL' => $cache::TTL_PROC_LONG,
'busyValue' => 0 ]
1764 if ( $mode ===
null ) {
1775 if ( !$conn->
ping() ) {
1784 foreach ( $this->conns
as $connsByServer ) {
1785 foreach ( $connsByServer
as $serverConns ) {
1786 foreach ( $serverConns
as $conn ) {
1787 $callback( $conn, ...
$params );
1795 foreach ( $this->conns
as $connsByServer ) {
1796 if ( isset( $connsByServer[$masterIndex] ) ) {
1798 foreach ( $connsByServer[$masterIndex]
as $conn ) {
1799 $callback( $conn, ...
$params );
1806 foreach ( $this->conns
as $connsByServer ) {
1807 foreach ( $connsByServer
as $i => $serverConns ) {
1811 foreach ( $serverConns
as $conn ) {
1812 $callback( $conn, ...
$params );
1824 return [ $host,
$maxLag, $maxIndex ];
1828 foreach ( $lagTimes
as $i => $lag ) {
1829 if ( $this->loads[$i] > 0 && $lag >
$maxLag ) {
1831 $host = $this->
servers[$i][
'host'];
1836 return [ $host,
$maxLag, $maxIndex ];
1844 $knownLagTimes = [];
1845 $indexesWithLag = [];
1846 foreach ( $this->
servers as $i => $server ) {
1847 if ( empty( $server[
'is static'] ) ) {
1848 $indexesWithLag[] = $i;
1850 $knownLagTimes[$i] = 0;
1854 return $this->
getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1866 $timeout = max( 1, $timeout ?: $this->waitTimeout );
1875 if ( $masterConn ) {
1876 $pos = $masterConn->getMasterPos();
1879 if ( !$masterConn ) {
1882 "Could not obtain a master database connection to get the position"
1885 $pos = $masterConn->getMasterPos();
1893 $msg = __METHOD__ .
': timed out waiting on {host} pos {pos}';
1894 $this->replLogger->warning( $msg, [
1897 'trace' => (
new RuntimeException() )->getTraceAsString()
1901 $this->replLogger->debug( __METHOD__ .
': done waiting' );
1906 $this->replLogger->error(
1907 __METHOD__ .
': could not get master pos for {host}',
1910 'trace' => (
new RuntimeException() )->getTraceAsString()
1920 $this->trxRecurringCallbacks[
$name] = $callback;
1922 unset( $this->trxRecurringCallbacks[
$name] );
1932 $this->tableAliases = $aliases;
1936 $this->indexAliases = $aliases;
1953 if ( $conn->
getLBInfo(
'foreignPoolRefCount' ) > 0 ) {
1959 if ( $domainsInUse ) {
1960 $domains = implode(
', ', $domainsInUse );
1962 "Foreign domain connections are still in use ($domains)." );
1966 $this->localDomain->getDatabase(),
1967 $this->localDomain->getSchema(),
1989 $this->localDomain = $domain;
1992 if ( $this->localDomain->getTablePrefix() !=
'' ) {
1993 $this->localDomainIdAlias =
1994 $this->localDomain->
getDatabase() .
'-' . $this->localDomain->getTablePrefix();
1996 $this->localDomainIdAlias = $this->localDomain->getDatabase();