24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
31 use UnexpectedValueException;
32 use InvalidArgumentException;
134 const CONN_HELD_WARN_THRESHOLD = 10;
137 const MAX_LAG_DEFAULT = 10;
139 const MAX_WAIT_DEFAULT = 10;
141 const TTL_CACHE_READONLY = 5;
152 const ROUND_CURSORY =
'cursory';
154 const ROUND_FINALIZED =
'finalized';
156 const ROUND_APPROVED =
'approved';
158 const ROUND_COMMIT_CALLBACKS =
'commit-callbacks';
160 const ROUND_ROLLBACK_CALLBACKS =
'rollback-callbacks';
162 const ROUND_ERROR =
'error';
165 if ( !isset(
$params[
'servers'] ) ) {
166 throw new InvalidArgumentException( __CLASS__ .
': missing servers parameter' );
168 $this->
servers = $params[
'servers'];
169 foreach ( $this->
servers as $i => $server ) {
171 $this->
servers[$i][
'master'] =
true;
173 $this->
servers[$i][
'replica'] =
true;
182 $this->waitTimeout =
$params[
'waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
184 $this->readIndex = -1;
187 self::KEY_LOCAL => [],
188 self::KEY_FOREIGN_INUSE => [],
189 self::KEY_FOREIGN_FREE => [],
191 self::KEY_LOCAL_NOROUND => [],
192 self::KEY_FOREIGN_INUSE_NOROUND => [],
193 self::KEY_FOREIGN_FREE_NOROUND => []
196 $this->waitForPos =
false;
199 if ( isset(
$params[
'readOnlyReason'] ) && is_string(
$params[
'readOnlyReason'] ) ) {
200 $this->readOnlyReason =
$params[
'readOnlyReason'];
203 if ( isset(
$params[
'maxLag'] ) ) {
204 $this->maxLag =
$params[
'maxLag'];
207 if ( isset(
$params[
'loadMonitor'] ) ) {
208 $this->loadMonitorConfig =
$params[
'loadMonitor'];
210 $this->loadMonitorConfig = [
'class' =>
'LoadMonitorNull' ];
212 $this->loadMonitorConfig += [
'lagWarnThreshold' =>
$this->maxLag ];
214 foreach (
$params[
'servers']
as $i => $server ) {
215 $this->loads[$i] = $server[
'load'];
216 if ( isset( $server[
'groupLoads'] ) ) {
217 foreach ( $server[
'groupLoads']
as $group => $ratio ) {
218 if ( !isset( $this->groupLoads[$group] ) ) {
219 $this->groupLoads[$group] = [];
221 $this->groupLoads[$group][$i] = $ratio;
226 if ( isset(
$params[
'srvCache'] ) ) {
227 $this->srvCache =
$params[
'srvCache'];
231 if ( isset(
$params[
'wanCache'] ) ) {
232 $this->wanCache =
$params[
'wanCache'];
236 $this->profiler =
$params[
'profiler'] ??
null;
237 if ( isset(
$params[
'trxProfiler'] ) ) {
238 $this->trxProfiler =
$params[
'trxProfiler'];
243 $this->errorLogger =
$params[
'errorLogger'] ??
function ( Exception
$e ) {
244 trigger_error( get_class(
$e ) .
': ' .
$e->getMessage(), E_USER_WARNING );
246 $this->deprecationLogger =
$params[
'deprecationLogger'] ??
function ( $msg ) {
247 trigger_error( $msg, E_USER_DEPRECATED );
250 foreach ( [
'replLogger',
'connLogger',
'queryLogger',
'perfLogger' ]
as $key ) {
251 $this->$key =
$params[$key] ??
new NullLogger();
254 $this->hostname =
$params[
'hostname'] ?? ( gethostname() ?:
'unknown' );
255 $this->cliMode =
$params[
'cliMode'] ?? ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' );
256 $this->agent =
$params[
'agent'] ??
'';
258 if ( isset(
$params[
'chronologyCallback'] ) ) {
259 $this->chronologyCallback =
$params[
'chronologyCallback'];
262 if ( isset(
$params[
'roundStage'] ) ) {
263 if (
$params[
'roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
264 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
265 } elseif (
$params[
'roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
266 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
270 $this->defaultGroup =
$params[
'defaultGroup'] ??
null;
274 return $this->localDomain->getId();
287 if ( !isset( $this->loadMonitor ) ) {
294 $class = $this->loadMonitorConfig[
'class'];
295 if ( isset( $compat[$class] ) ) {
296 $class = $compat[$class];
299 $this->loadMonitor =
new $class(
300 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
301 $this->loadMonitor->setLogger( $this->replLogger );
316 # Unset excessively lagged servers
317 foreach ( $lags
as $i => $lag ) {
319 # How much lag this server nominally is allowed to have
321 # Constrain that futher by $maxLag argument
322 $maxServerLag = min( $maxServerLag,
$maxLag );
325 if ( $lag ===
false && !is_infinite( $maxServerLag ) ) {
326 $this->replLogger->error(
328 ": server {host} is not replicating?", [
'host' => $host ] );
330 } elseif ( $lag > $maxServerLag ) {
331 $this->replLogger->info(
333 ": server {host} has {lag} seconds of lag (>= {maxlag})",
334 [
'host' => $host,
'lag' => $lag,
'maxlag' => $maxServerLag ]
341 # Find out if all the replica DBs with non-zero load are lagged
347 # No appropriate DB servers except maybe the master and some replica DBs with zero load
348 # Do NOT use the master
349 # Instead, this function will return false, triggering read-only mode,
350 # and a lagged replica DB will be used instead.
358 # Return a random representative of the remainder
366 } elseif ( $group ===
false && $this->readIndex >= 0 ) {
371 if ( $group !==
false ) {
373 if ( isset( $this->groupLoads[$group] ) ) {
374 $loads = $this->groupLoads[$group];
377 $this->connLogger->info( __METHOD__ .
": no loads for group $group" );
391 if ( $i ===
false ) {
401 if ( !$this->
doWait( $i ) ) {
406 if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group ===
false ) {
408 $this->readIndex = $i;
411 $this->laggedReplicaMode =
true;
416 $this->connLogger->debug( __METHOD__ .
": using server $serverName for group '$group'" );
428 throw new InvalidArgumentException(
"Empty server array given to LoadBalancer" );
438 while (
count( $currentLoads ) ) {
443 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
447 $ago = microtime(
true ) - $this->waitForPos->asOfTime();
451 if ( $i ===
false ) {
455 if ( $i ===
false &&
count( $currentLoads ) != 0 ) {
457 $this->replLogger->error(
458 __METHOD__ .
": all replica DBs lagged. Switch to read-only mode" );
464 if ( $i ===
false ) {
468 $this->connLogger->debug( __METHOD__ .
": pickRandom() returned false" );
470 return [
false,
false ];
474 $this->connLogger->debug( __METHOD__ .
": Using reader #$i: $serverName..." );
478 $this->connLogger->warning( __METHOD__ .
": Failed connecting to $i/$domain" );
479 unset( $currentLoads[$i] );
486 if ( $domain !==
false ) {
495 if ( !
count( $currentLoads ) ) {
496 $this->connLogger->error( __METHOD__ .
": all servers down" );
505 $this->waitForPos = $pos;
509 if ( !$this->
doWait( $i ) ) {
510 $this->laggedReplicaMode =
true;
522 $this->waitForPos = $pos;
529 $readLoads = array_filter( $readLoads );
534 $ok = $this->
doWait( $i,
true, $timeout );
539 # Restore the old position, as this is not used for lag-protection but for throttling
540 $this->waitForPos = $oldPos;
551 $this->waitForPos = $pos;
555 for ( $i = 1; $i < $serverCount; $i++ ) {
556 if ( $this->loads[$i] > 0 ) {
557 $start = microtime(
true );
558 $ok = $this->
doWait( $i,
true, $timeout ) && $ok;
559 $timeout -= intval( microtime(
true ) - $start );
560 if ( $timeout <= 0 ) {
566 # Restore the old position, as this is not used for lag-protection but for throttling
567 $this->waitForPos = $oldPos;
581 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
582 $this->waitForPos = $pos;
587 $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
588 foreach ( $this->conns
as $connsByServer ) {
589 if ( !isset( $connsByServer[$i] ) ) {
593 foreach ( $connsByServer[$i]
as $conn ) {
594 if ( !$autocommit || $conn->getLBInfo(
'autoCommitOnly' ) ) {
610 protected function doWait( $index, $open =
false, $timeout =
null ) {
611 $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
615 $key = $this->srvCache->makeGlobalKey( __CLASS__,
'last-known-pos', $server,
'v1' );
617 $knownReachedPos = $this->srvCache->get( $key );
620 $knownReachedPos->
hasReached( $this->waitForPos )
622 $this->replLogger->debug(
624 ': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
625 [
'dbserver' => $server ]
635 $this->replLogger->debug(
636 __METHOD__ .
': no connection open for {dbserver}',
637 [
'dbserver' => $server ]
644 $this->replLogger->warning(
645 __METHOD__ .
': failed to connect to {dbserver}',
646 [
'dbserver' => $server ]
657 $this->replLogger->info(
659 ': waiting for replica DB {dbserver} to catch up...',
660 [
'dbserver' => $server ]
663 $result = $conn->masterPosWait( $this->waitForPos, $timeout );
666 $this->replLogger->warning(
667 __METHOD__ .
': Errored out waiting on {host} pos {pos}',
670 'pos' => $this->waitForPos,
671 'trace' => (
new RuntimeException() )->getTraceAsString()
676 $this->replLogger->warning(
677 __METHOD__ .
': Timed out waiting on {host} pos {pos}',
680 'pos' => $this->waitForPos,
681 'trace' => (
new RuntimeException() )->getTraceAsString()
686 $this->replLogger->debug( __METHOD__ .
": done waiting" );
699 public function getConnection( $i, $groups = [], $domain =
false, $flags = 0 ) {
700 if ( $i ===
null || $i ===
false ) {
701 throw new InvalidArgumentException(
'Attempt to call ' . __METHOD__ .
702 ' with invalid server index' );
709 if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
715 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
720 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
721 $this->connLogger->info( __METHOD__ .
722 ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
729 $groups = ( $groups ===
false || $groups === [] )
739 # Try to find an available server in any the query groups (in order)
740 foreach ( $groups
as $group ) {
742 if ( $groupIndex !==
false ) {
749 # Operation-based index
751 $this->lastError =
'Unknown error';
752 # Try the general server pool if $groups are unavailable.
753 $i = ( $groups === [
false ] )
756 # Couldn't find a working server in getReaderIndex()?
757 if ( $i ===
false ) {
765 # Now we have an explicit index into the servers array
773 # Profile any new connections that happen
774 if ( $this->connsOpened > $oldConnsOpened ) {
775 $host = $conn->getServer();
776 $dbname = $conn->getDBname();
777 $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
781 # Make master-requested DB handles inherit any read-only mode setting
782 $conn->setLBInfo(
'readOnlyReason', $this->
getReadOnlyReason( $domain, $conn ) );
789 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
790 $refCount = $conn->
getLBInfo(
'foreignPoolRefCount' );
791 if ( $serverIndex ===
null || $refCount ===
null ) {
803 } elseif ( $conn instanceof
DBConnRef ) {
806 $this->connLogger->error(
807 __METHOD__ .
": got DBConnRef instance.\n" .
808 (
new RuntimeException() )->getTraceAsString() );
813 if ( $this->disabled ) {
817 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
826 if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
827 throw new InvalidArgumentException( __METHOD__ .
828 ": connection $serverIndex/$domain not found; it may have already been freed." );
829 } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
830 throw new InvalidArgumentException( __METHOD__ .
831 ": connection $serverIndex/$domain mismatched; it may have already been freed." );
834 $conn->
setLBInfo(
'foreignPoolRefCount', --$refCount );
835 if ( $refCount <= 0 ) {
836 $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
837 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
838 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
839 unset( $this->conns[$connInUseKey][$serverIndex] );
841 $this->connLogger->debug( __METHOD__ .
": freed connection $serverIndex/$domain" );
843 $this->connLogger->debug( __METHOD__ .
844 ": reference count for $serverIndex/$domain reduced to $refCount" );
857 return new DBConnRef( $this, [ $db, $groups, $domain, $flags ] );
864 $this, $this->
getConnection( $db, $groups, $domain, $flags ) );
872 if ( !$this->connectionAttempted && $this->chronologyCallback ) {
873 $this->connLogger->debug( __METHOD__ .
': calling initLB() before first connection.' );
875 $this->connectionAttempted =
true;
883 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
885 if ( $domain !==
false ) {
898 $this->errorConnection = $conn;
902 if ( $autoCommit && $conn instanceof
IDatabase ) {
903 if ( $conn->trxLevel() ) {
906 __METHOD__ .
': CONN_TRX_AUTOCOMMIT handle has a transaction.'
929 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
932 if ( isset( $this->conns[$connKey][$i][0] ) ) {
933 $conn = $this->conns[$connKey][$i][0];
935 if ( !isset( $this->
servers[$i] ) || !is_array( $this->
servers[$i] ) ) {
936 throw new InvalidArgumentException(
"No server with index '$i'." );
940 $server[
'serverIndex'] = $i;
941 $server[
'autoCommitOnly'] = $autoCommit;
944 if ( $conn->isOpen() ) {
945 $this->connLogger->debug(
946 __METHOD__ .
": connected to database $i at '$host'." );
947 $this->conns[$connKey][$i][0] = $conn;
949 $this->connLogger->warning(
950 __METHOD__ .
": failed to connect to database $i at '$host'." );
951 $this->errorConnection = $conn;
959 !$this->localDomain->isCompatible( $conn->getDomainID() )
961 throw new UnexpectedValueException(
962 "Got connection to '{$conn->getDomainID()}', " .
963 "but expected local domain ('{$this->localDomain}')." );
993 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1004 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1006 $conn = $this->conns[$connInUseKey][$i][$domain];
1007 $this->connLogger->debug( __METHOD__ .
": reusing connection $i/$domain" );
1008 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1010 $conn = $this->conns[$connFreeKey][$i][$domain];
1011 unset( $this->conns[$connFreeKey][$i][$domain] );
1012 $this->conns[$connInUseKey][$i][$domain] = $conn;
1013 $this->connLogger->debug( __METHOD__ .
": reusing free connection $i/$domain" );
1014 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1016 $conn = reset( $this->conns[$connFreeKey][$i] );
1017 $oldDomain =
key( $this->conns[$connFreeKey][$i] );
1018 if ( $domainInstance->getDatabase() !== null ) {
1019 $conn->selectDomain( $domainInstance );
1022 $conn->dbSchema( $domainInstance->getSchema() );
1023 $conn->tablePrefix( $domainInstance->getTablePrefix() );
1025 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1027 $this->conns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
1028 $this->connLogger->debug( __METHOD__ .
1029 ": reusing free connection from $oldDomain for $domain" );
1031 if ( !isset( $this->
servers[$i] ) || !is_array( $this->
servers[$i] ) ) {
1032 throw new InvalidArgumentException(
"No server with index '$i'." );
1036 $server[
'serverIndex'] = $i;
1037 $server[
'foreignPoolRefCount'] = 0;
1038 $server[
'foreign'] =
true;
1039 $server[
'autoCommitOnly'] = $autoCommit;
1041 if ( !$conn->isOpen() ) {
1042 $this->connLogger->warning( __METHOD__ .
": connection error for $i/$domain" );
1043 $this->errorConnection = $conn;
1047 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1048 $this->connLogger->debug( __METHOD__ .
": opened new connection for $i/$domain" );
1054 if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1055 throw new UnexpectedValueException(
1056 "Got connection to '{$conn->getDomainID()}', but expected '$domain'." );
1059 $refCount = $conn->getLBInfo(
'foreignPoolRefCount' );
1060 $conn->setLBInfo(
'foreignPoolRefCount', $refCount + 1 );
1069 $this->
servers[$i][
'driver'] ??
null
1081 if ( !is_int( $index ) ) {
1100 if ( $this->disabled ) {
1108 if ( $server[
'type'] ===
'mysql' ) {
1112 $server[
'dbname'] =
null;
1119 $server[
'schema'] = $domain->
getSchema();
1127 $server[
'clusterMasterHost'] = $masterName;
1130 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
1131 $this->perfLogger->warning( __METHOD__ .
": " .
1132 "{$this->connsOpened}+ connections made (master=$masterName)" );
1159 $db->setLBInfo( $server );
1160 $db->setLazyMasterHandle(
1163 $db->setTableAliases( $this->tableAliases );
1164 $db->setIndexAliases( $this->indexAliases );
1167 if ( $this->trxRoundId !==
false ) {
1170 foreach ( $this->trxRecurringCallbacks
as $name => $callback ) {
1171 $db->setTransactionListener(
$name, $callback );
1184 'method' => __METHOD__,
1189 $context[
'db_server'] = $conn->getServer();
1190 $this->connLogger->warning(
1191 __METHOD__ .
": connection error: {last_error} ({db_server})",
1195 throw new DBConnectionError( $conn,
"{$this->lastError} ({$context['db_server']})" );
1198 $this->connLogger->error(
1200 ": LB failure with no last connection. Connection error: {last_error}",
1214 return array_key_exists( $i, $this->
servers );
1218 return array_key_exists( $i, $this->
servers ) && $this->loads[$i] != 0;
1226 if ( isset( $this->
servers[$i][
'hostName'] ) ) {
1228 } elseif ( isset( $this->
servers[$i][
'host'] ) ) {
1234 return (
$name !=
'' ) ?
$name :
'localhost';
1238 if ( isset( $this->
servers[$i] ) ) {
1246 return $this->
servers[$i][
'type'] ??
'unknown';
1250 # If this entire request was served from a replica DB without opening a connection to the
1251 # master (however unlikely that may be), then we can fetch the position from the replica DB.
1253 if ( !$masterConn ) {
1255 for ( $i = 1; $i < $serverCount; $i++ ) {
1258 return $conn->getReplicaPos();
1262 return $masterConn->getMasterPos();
1270 $this->disabled =
true;
1277 $this->connLogger->debug(
1278 $fname .
": closing connection to database '$host'." );
1283 self::KEY_LOCAL => [],
1284 self::KEY_FOREIGN_INUSE => [],
1285 self::KEY_FOREIGN_FREE => [],
1286 self::KEY_LOCAL_NOROUND => [],
1287 self::KEY_FOREIGN_INUSE_NOROUND => [],
1288 self::KEY_FOREIGN_FREE_NOROUND => []
1290 $this->connsOpened = 0;
1294 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
1295 foreach ( $this->conns
as $type => $connsByServer ) {
1296 if ( !isset( $connsByServer[$serverIndex] ) ) {
1300 foreach ( $connsByServer[$serverIndex]
as $i => $trackedConn ) {
1301 if ( $conn === $trackedConn ) {
1303 $this->connLogger->debug(
1304 __METHOD__ .
": closing connection to database $i at '$host'." );
1305 unset( $this->conns[
$type][$serverIndex][$i] );
1324 $this->trxRoundStage = self::ROUND_ERROR;
1335 }
while ( $count > 0 );
1340 $this->trxRoundStage = self::ROUND_FINALIZED;
1348 $limit =
$options[
'maxWriteDuration'] ?? 0;
1350 $this->trxRoundStage = self::ROUND_ERROR;
1359 if ( $limit > 0 &&
$time > $limit ) {
1362 "Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
1371 "A connection to the {$conn->getDBname()} database was lost before commit."
1375 $this->trxRoundStage = self::ROUND_APPROVED;
1379 if ( $this->trxRoundId !==
false ) {
1382 "$fname: Transaction round '{$this->trxRoundId}' already started."
1390 $this->trxRoundId =
$fname;
1391 $this->trxRoundStage = self::ROUND_ERROR;
1399 $this->trxRoundStage = self::ROUND_CURSORY;
1410 $restore = ( $this->trxRoundId !==
false );
1411 $this->trxRoundId =
false;
1412 $this->trxRoundStage = self::ROUND_ERROR;
1421 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1428 "$fname: Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1437 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1441 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1442 $type = IDatabase::TRIGGER_COMMIT;
1443 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1444 $type = IDatabase::TRIGGER_ROLLBACK;
1448 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1453 $this->trxRoundStage = self::ROUND_ERROR;
1473 }
catch ( Exception $ex ) {
1482 $this->queryLogger->warning(
$fname .
": found writes pending." );
1484 $this->queryLogger->warning(
1485 $fname .
": found writes pending ($fnames).",
1494 $this->queryLogger->debug(
$fname .
": found empty transaction." );
1498 }
catch ( Exception $ex ) {
1502 }
while ( $count > 0 );
1504 $this->trxRoundStage = $oldStage;
1510 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1511 $type = IDatabase::TRIGGER_COMMIT;
1512 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1513 $type = IDatabase::TRIGGER_ROLLBACK;
1517 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1523 $this->trxRoundStage = self::ROUND_ERROR;
1527 }
catch ( Exception $ex ) {
1531 $this->trxRoundStage = self::ROUND_CURSORY;
1537 $restore = ( $this->trxRoundId !==
false );
1538 $this->trxRoundId =
false;
1539 $this->trxRoundStage = self::ROUND_ERROR;
1549 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1556 $stages = (
array)$stage;
1558 if ( !in_array( $this->trxRoundStage, $stages,
true ) ) {
1559 $stageList = implode(
1561 array_map(
function ( $v ) {
1567 "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1582 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
1593 $conn->
setLBInfo(
'trxRoundId', $this->trxRoundId );
1601 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
1606 $conn->
setLBInfo(
'trxRoundId',
false );
1644 return (
bool)$pending;
1657 $age = ( $age === null ) ? $this->waitTimeout : $age;
1674 if ( !$this->laggedReplicaMode && $this->
getServerCount() > 1 ) {
1681 $this->allReplicasDownMode =
true;
1682 $this->laggedReplicaMode =
true;
1703 if ( $this->readOnlyReason !==
false ) {
1706 if ( $this->allReplicasDownMode ) {
1707 return 'The database has been automatically locked ' .
1708 'until the replica database servers become available';
1710 return 'The database has been automatically locked ' .
1711 'while the replica database servers catch up to the master.';
1714 return 'The database master is running in read-only mode.';
1729 return (
bool)
$cache->getWithSetCallback(
1730 $cache->makeGlobalKey( __CLASS__,
'server-read-only', $masterServer ),
1731 self::TTL_CACHE_READONLY,
1732 function ()
use ( $domain, $conn ) {
1733 $old = $this->trxProfiler->setSilenced(
true );
1736 $readOnly = (int)$dbw->serverIsReadOnly();
1743 $this->trxProfiler->setSilenced( $old );
1746 [
'pcTTL' => $cache::TTL_PROC_LONG,
'busyValue' => 0 ]
1751 if ( $mode ===
null ) {
1762 if ( !$conn->
ping() ) {
1771 foreach ( $this->conns
as $connsByServer ) {
1772 foreach ( $connsByServer
as $serverConns ) {
1773 foreach ( $serverConns
as $conn ) {
1774 $callback( $conn, ...
$params );
1782 foreach ( $this->conns
as $connsByServer ) {
1783 if ( isset( $connsByServer[$masterIndex] ) ) {
1785 foreach ( $connsByServer[$masterIndex]
as $conn ) {
1786 $callback( $conn, ...
$params );
1793 foreach ( $this->conns
as $connsByServer ) {
1794 foreach ( $connsByServer
as $i => $serverConns ) {
1798 foreach ( $serverConns
as $conn ) {
1799 $callback( $conn, ...
$params );
1811 return [ $host,
$maxLag, $maxIndex ];
1815 foreach ( $lagTimes
as $i => $lag ) {
1816 if ( $this->loads[$i] > 0 && $lag >
$maxLag ) {
1818 $host = $this->
servers[$i][
'host'];
1823 return [ $host,
$maxLag, $maxIndex ];
1831 $knownLagTimes = [];
1832 $indexesWithLag = [];
1833 foreach ( $this->
servers as $i => $server ) {
1834 if ( empty( $server[
'is static'] ) ) {
1835 $indexesWithLag[] = $i;
1837 $knownLagTimes[$i] = 0;
1841 return $this->
getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1859 $timeout = max( 1, $timeout ?: $this->waitTimeout );
1868 if ( $masterConn ) {
1869 $pos = $masterConn->getMasterPos();
1872 $pos = $masterConn->getMasterPos();
1880 $msg = __METHOD__ .
': timed out waiting on {host} pos {pos}';
1881 $this->replLogger->warning( $msg, [
1884 'trace' => (
new RuntimeException() )->getTraceAsString()
1888 $this->replLogger->debug( __METHOD__ .
': done waiting' );
1893 $this->replLogger->error(
1894 __METHOD__ .
': could not get master pos for {host}',
1897 'trace' => (
new RuntimeException() )->getTraceAsString()
1907 $this->trxRecurringCallbacks[
$name] = $callback;
1909 unset( $this->trxRecurringCallbacks[
$name] );
1919 $this->tableAliases = $aliases;
1923 $this->indexAliases = $aliases;
1932 if ( $conn->
getLBInfo(
'foreignPoolRefCount' ) > 0 ) {
1938 if ( $domainsInUse ) {
1939 $domains = implode(
', ', $domainsInUse );
1941 "Foreign domain connections are still in use ($domains)." );
1945 $this->localDomain->getDatabase(),
1946 $this->localDomain->getSchema(),
1962 $this->localDomain = $domain;
1965 if ( $this->localDomain->getTablePrefix() !=
'' ) {
1966 $this->localDomainIdAlias =
1967 $this->localDomain->
getDatabase() .
'-' . $this->localDomain->getTablePrefix();
1969 $this->localDomainIdAlias = $this->localDomain->getDatabase();
1980 if ( PHP_SAPI !=
'cli' ) {
1981 $old = ignore_user_abort(
true );
1982 return new ScopedCallback(
function ()
use ( $old ) {
1983 ignore_user_abort( $old );