24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 use Wikimedia\ScopedCallback;
31 use InvalidArgumentException;
125 const CONN_HELD_WARN_THRESHOLD = 10;
128 const MAX_LAG_DEFAULT = 10;
130 const MAX_WAIT_DEFAULT = 10;
132 const TTL_CACHE_READONLY = 5;
143 if ( !isset(
$params[
'servers'] ) ) {
144 throw new InvalidArgumentException( __CLASS__ .
': missing servers parameter' );
146 $this->
servers = $params[
'servers'];
147 foreach ( $this->
servers as $i => $server ) {
149 $this->
servers[$i][
'master'] =
true;
151 $this->
servers[$i][
'replica'] =
true;
160 $this->waitTimeout = isset(
$params[
'waitTimeout'] )
162 : self::MAX_WAIT_DEFAULT;
164 $this->readIndex = -1;
167 self::KEY_LOCAL => [],
168 self::KEY_FOREIGN_INUSE => [],
169 self::KEY_FOREIGN_FREE => [],
171 self::KEY_LOCAL_NOROUND => [],
172 self::KEY_FOREIGN_INUSE_NOROUND => [],
173 self::KEY_FOREIGN_FREE_NOROUND => []
176 $this->waitForPos =
false;
179 if ( isset(
$params[
'readOnlyReason'] ) && is_string(
$params[
'readOnlyReason'] ) ) {
180 $this->readOnlyReason =
$params[
'readOnlyReason'];
183 if ( isset(
$params[
'maxLag'] ) ) {
184 $this->maxLag =
$params[
'maxLag'];
187 if ( isset(
$params[
'loadMonitor'] ) ) {
188 $this->loadMonitorConfig =
$params[
'loadMonitor'];
190 $this->loadMonitorConfig = [
'class' =>
'LoadMonitorNull' ];
192 $this->loadMonitorConfig += [
'lagWarnThreshold' =>
$this->maxLag ];
194 foreach (
$params[
'servers']
as $i => $server ) {
195 $this->loads[$i] = $server[
'load'];
196 if ( isset( $server[
'groupLoads'] ) ) {
197 foreach ( $server[
'groupLoads']
as $group => $ratio ) {
198 if ( !isset( $this->groupLoads[$group] ) ) {
199 $this->groupLoads[$group] = [];
201 $this->groupLoads[$group][$i] = $ratio;
206 if ( isset(
$params[
'srvCache'] ) ) {
207 $this->srvCache =
$params[
'srvCache'];
211 if ( isset(
$params[
'wanCache'] ) ) {
212 $this->wanCache =
$params[
'wanCache'];
216 $this->profiler = isset(
$params[
'profiler'] ) ?
$params[
'profiler'] :
null;
217 if ( isset(
$params[
'trxProfiler'] ) ) {
218 $this->trxProfiler =
$params[
'trxProfiler'];
223 $this->errorLogger = isset(
$params[
'errorLogger'] )
225 :
function ( Exception
$e ) {
226 trigger_error( get_class(
$e ) .
': ' .
$e->getMessage(), E_USER_WARNING );
228 $this->deprecationLogger = isset(
$params[
'deprecationLogger'] )
230 :
function ( $msg ) {
231 trigger_error( $msg, E_USER_DEPRECATED );
234 foreach ( [
'replLogger',
'connLogger',
'queryLogger',
'perfLogger' ]
as $key ) {
235 $this->$key = isset(
$params[$key] ) ?
$params[$key] :
new NullLogger();
238 $this->host = isset(
$params[
'hostname'] )
240 : ( gethostname() ?:
'unknown' );
241 $this->cliMode = isset(
$params[
'cliMode'] )
243 : ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' );
246 if ( isset(
$params[
'chronologyCallback'] ) ) {
247 $this->chronologyCallback =
$params[
'chronologyCallback'];
259 return $this->localDomain->getId();
268 if ( !isset( $this->loadMonitor ) ) {
275 $class = $this->loadMonitorConfig[
'class'];
276 if ( isset( $compat[$class] ) ) {
277 $class = $compat[$class];
280 $this->loadMonitor =
new $class(
281 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
282 $this->loadMonitor->setLogger( $this->replLogger );
297 # Unset excessively lagged servers
298 foreach ( $lags
as $i => $lag ) {
300 # How much lag this server nominally is allowed to have
301 $maxServerLag = isset( $this->
servers[$i][
'max lag'] )
302 ? $this->
servers[$i][
'max lag']
304 # Constrain that futher by $maxLag argument
305 $maxServerLag = min( $maxServerLag,
$maxLag );
308 if ( $lag ===
false && !is_infinite( $maxServerLag ) ) {
309 $this->replLogger->error(
311 ": server {host} is not replicating?", [
'host' =>
$host ] );
313 } elseif ( $lag > $maxServerLag ) {
314 $this->replLogger->info(
316 ": server {host} has {lag} seconds of lag (>= {maxlag})",
317 [
'host' =>
$host,
'lag' => $lag,
'maxlag' => $maxServerLag ]
324 # Find out if all the replica DBs with non-zero load are lagged
330 # No appropriate DB servers except maybe the master and some replica DBs with zero load
331 # Do NOT use the master
332 # Instead, this function will return false, triggering read-only mode,
333 # and a lagged replica DB will be used instead.
341 # Return a random representative of the remainder
349 } elseif ( $group ===
false && $this->readIndex >= 0 ) {
354 if ( $group !==
false ) {
356 if ( isset( $this->groupLoads[$group] ) ) {
357 $loads = $this->groupLoads[$group];
360 $this->connLogger->info( __METHOD__ .
": no loads for group $group" );
374 if ( $i ===
false ) {
384 if ( !$this->
doWait( $i ) ) {
389 if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group ===
false ) {
391 $this->readIndex = $i;
394 $this->laggedReplicaMode =
true;
399 $this->connLogger->debug( __METHOD__ .
": using server $serverName for group '$group'" );
411 throw new InvalidArgumentException(
"Empty server array given to LoadBalancer" );
421 while (
count( $currentLoads ) ) {
426 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
430 $ago = microtime(
true ) - $this->waitForPos->asOfTime();
434 if ( $i ===
false ) {
438 if ( $i ===
false &&
count( $currentLoads ) != 0 ) {
440 $this->replLogger->error(
441 __METHOD__ .
": all replica DBs lagged. Switch to read-only mode" );
447 if ( $i ===
false ) {
451 $this->connLogger->debug( __METHOD__ .
": pickRandom() returned false" );
453 return [
false,
false ];
457 $this->connLogger->debug( __METHOD__ .
": Using reader #$i: $serverName..." );
461 $this->connLogger->warning( __METHOD__ .
": Failed connecting to $i/$domain" );
462 unset( $currentLoads[$i] );
469 if ( $domain !==
false ) {
478 if ( !
count( $currentLoads ) ) {
479 $this->connLogger->error( __METHOD__ .
": all servers down" );
488 $this->waitForPos = $pos;
492 if ( !$this->
doWait( $i ) ) {
493 $this->laggedReplicaMode =
true;
505 $this->waitForPos = $pos;
512 $readLoads = array_filter( $readLoads );
517 $ok = $this->
doWait( $i,
true, $timeout );
522 # Restore the old position, as this is not used for lag-protection but for throttling
523 $this->waitForPos = $oldPos;
534 $this->waitForPos = $pos;
538 for ( $i = 1; $i < $serverCount; $i++ ) {
539 if ( $this->loads[$i] > 0 ) {
540 $start = microtime(
true );
541 $ok = $this->
doWait( $i,
true, $timeout ) && $ok;
542 $timeout -= intval( microtime(
true ) - $start );
543 if ( $timeout <= 0 ) {
549 # Restore the old position, as this is not used for lag-protection but for throttling
550 $this->waitForPos = $oldPos;
564 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
565 $this->waitForPos = $pos;
570 $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
571 foreach ( $this->conns
as $connsByServer ) {
572 if ( !isset( $connsByServer[$i] ) ) {
576 foreach ( $connsByServer[$i]
as $conn ) {
577 if ( !$autocommit || $conn->getLBInfo(
'autoCommitOnly' ) ) {
593 protected function doWait( $index, $open =
false, $timeout =
null ) {
594 $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
598 $key = $this->srvCache->makeGlobalKey( __CLASS__,
'last-known-pos', $server,
'v1' );
600 $knownReachedPos = $this->srvCache->get( $key );
603 $knownReachedPos->
hasReached( $this->waitForPos )
605 $this->replLogger->debug(
607 ': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
608 [
'dbserver' => $server ]
618 $this->replLogger->debug(
619 __METHOD__ .
': no connection open for {dbserver}',
620 [
'dbserver' => $server ]
627 $this->replLogger->warning(
628 __METHOD__ .
': failed to connect to {dbserver}',
629 [
'dbserver' => $server ]
640 $this->replLogger->info(
642 ': waiting for replica DB {dbserver} to catch up...',
643 [
'dbserver' => $server ]
646 $result = $conn->masterPosWait( $this->waitForPos, $timeout );
649 $this->replLogger->warning(
650 __METHOD__ .
': Errored out waiting on {host} pos {pos}',
653 'pos' => $this->waitForPos,
654 'trace' => (
new RuntimeException() )->getTraceAsString()
659 $this->replLogger->warning(
660 __METHOD__ .
': Timed out waiting on {host} pos {pos}',
663 'pos' => $this->waitForPos,
664 'trace' => (
new RuntimeException() )->getTraceAsString()
669 $this->replLogger->debug( __METHOD__ .
": done waiting" );
682 public function getConnection( $i, $groups = [], $domain =
false, $flags = 0 ) {
683 if ( $i ===
null || $i ===
false ) {
684 throw new InvalidArgumentException(
'Attempt to call ' . __METHOD__ .
685 ' with invalid server index' );
692 if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
698 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
703 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
704 $this->connLogger->info( __METHOD__ .
705 ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
709 $groups = ( $groups ===
false || $groups === [] )
719 # Try to find an available server in any the query groups (in order)
720 foreach ( $groups
as $group ) {
722 if ( $groupIndex !==
false ) {
729 # Operation-based index
731 $this->lastError =
'Unknown error';
732 # Try the general server pool if $groups are unavailable.
733 $i = ( $groups === [
false ] )
736 # Couldn't find a working server in getReaderIndex()?
737 if ( $i ===
false ) {
745 # Now we have an explicit index into the servers array
753 # Profile any new connections that happen
754 if ( $this->connsOpened > $oldConnsOpened ) {
755 $host = $conn->getServer();
756 $dbname = $conn->getDBname();
757 $this->trxProfiler->recordConnection(
$host, $dbname, $masterOnly );
761 # Make master-requested DB handles inherit any read-only mode setting
762 $conn->setLBInfo(
'readOnlyReason', $this->
getReadOnlyReason( $domain, $conn ) );
769 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
770 $refCount = $conn->
getLBInfo(
'foreignPoolRefCount' );
771 if ( $serverIndex ===
null || $refCount ===
null ) {
783 } elseif ( $conn instanceof
DBConnRef ) {
786 $this->connLogger->error(
787 __METHOD__ .
": got DBConnRef instance.\n" .
788 (
new RuntimeException() )->getTraceAsString() );
793 if ( $this->disabled ) {
797 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
806 if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
807 throw new InvalidArgumentException( __METHOD__ .
808 ": connection $serverIndex/$domain not found; it may have already been freed." );
809 } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
810 throw new InvalidArgumentException( __METHOD__ .
811 ": connection $serverIndex/$domain mismatched; it may have already been freed." );
814 $conn->
setLBInfo(
'foreignPoolRefCount', --$refCount );
815 if ( $refCount <= 0 ) {
816 $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
817 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
818 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
819 unset( $this->conns[$connInUseKey][$serverIndex] );
821 $this->connLogger->debug( __METHOD__ .
": freed connection $serverIndex/$domain" );
823 $this->connLogger->debug( __METHOD__ .
824 ": reference count for $serverIndex/$domain reduced to $refCount" );
829 $domain = ( $domain !==
false ) ? $domain : $this->localDomain;
835 $domain = ( $domain !==
false ) ? $domain : $this->localDomain;
837 return new DBConnRef( $this, [ $db, $groups, $domain, $flags ] );
841 $domain = ( $domain !==
false ) ? $domain : $this->localDomain;
844 $this, $this->
getConnection( $db, $groups, $domain, $flags ) );
852 if ( !$this->connectionAttempted && $this->chronologyCallback ) {
853 $this->connLogger->debug( __METHOD__ .
': calling initLB() before first connection.' );
855 $this->connectionAttempted =
true;
856 call_user_func( $this->chronologyCallback, $this );
863 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
865 if ( $domain !==
false ) {
871 if ( isset( $this->conns[$connKey][$i][0] ) ) {
872 $conn = $this->conns[$connKey][$i][0];
874 if ( !isset( $this->
servers[$i] ) || !is_array( $this->
servers[$i] ) ) {
875 throw new InvalidArgumentException(
"No server with index '$i'." );
879 $server[
'serverIndex'] = $i;
880 $server[
'autoCommitOnly'] = $autoCommit;
881 if ( $this->localDomain->getDatabase() !== null ) {
883 $server[
'tablePrefix'] = $this->localDomain->getTablePrefix();
887 if ( $conn->isOpen() ) {
888 $this->connLogger->debug(
889 __METHOD__ .
": connected to database $i at '$host'." );
890 $this->conns[$connKey][$i][0] = $conn;
892 $this->connLogger->warning(
893 __METHOD__ .
": failed to connect to database $i at '$host'." );
894 $this->errorConnection = $conn;
905 $this->errorConnection = $conn;
909 if ( $autoCommit && $conn instanceof
IDatabase ) {
939 $dbName = $domainInstance->getDatabase();
940 $prefix = $domainInstance->getTablePrefix();
941 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
951 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
953 $conn = $this->conns[$connInUseKey][$i][$domain];
954 $this->connLogger->debug( __METHOD__ .
": reusing connection $i/$domain" );
955 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
957 $conn = $this->conns[$connFreeKey][$i][$domain];
958 unset( $this->conns[$connFreeKey][$i][$domain] );
959 $this->conns[$connInUseKey][$i][$domain] = $conn;
960 $this->connLogger->debug( __METHOD__ .
": reusing free connection $i/$domain" );
961 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
963 $conn = reset( $this->conns[$connFreeKey][$i] );
964 $oldDomain =
key( $this->conns[$connFreeKey][$i] );
965 if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
966 $this->lastError =
"Error selecting database '$dbName' on server " .
967 $conn->getServer() .
" from client host {$this->host}";
968 $this->errorConnection = $conn;
971 $conn->tablePrefix( $prefix );
972 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
974 $this->conns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
975 $this->connLogger->debug( __METHOD__ .
976 ": reusing free connection from $oldDomain for $domain" );
979 if ( !isset( $this->
servers[$i] ) || !is_array( $this->
servers[$i] ) ) {
980 throw new InvalidArgumentException(
"No server with index '$i'." );
984 $server[
'serverIndex'] = $i;
985 $server[
'foreignPoolRefCount'] = 0;
986 $server[
'foreign'] =
true;
987 $server[
'autoCommitOnly'] = $autoCommit;
989 if ( !$conn->isOpen() ) {
990 $this->connLogger->warning( __METHOD__ .
": connection error for $i/$domain" );
991 $this->errorConnection = $conn;
994 $conn->tablePrefix( $prefix );
996 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
997 $this->connLogger->debug( __METHOD__ .
": opened new connection for $i/$domain" );
1003 $refCount = $conn->getLBInfo(
'foreignPoolRefCount' );
1004 $conn->setLBInfo(
'foreignPoolRefCount', $refCount + 1 );
1013 isset( $this->
servers[$i][
'driver'] ) ? $this->
servers[$i][
'driver'] :
null
1025 if ( !is_int( $index ) ) {
1044 if ( $this->disabled ) {
1052 if ( $server[
'type'] ===
'mysql' ) {
1056 $server[
'dbname'] =
null;
1059 $server[
'dbname'] = $domainOverride->
getDatabase();
1060 $server[
'schema'] = $domainOverride->
getSchema();
1065 $server[
'clusterMasterHost'] = $masterName;
1068 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
1069 $this->perfLogger->warning( __METHOD__ .
": " .
1070 "{$this->connsOpened}+ connections made (master=$masterName)" );
1097 $db->setLBInfo( $server );
1098 $db->setLazyMasterHandle(
1101 $db->setTableAliases( $this->tableAliases );
1102 $db->setIndexAliases( $this->indexAliases );
1105 if ( $this->trxRoundId !==
false ) {
1108 foreach ( $this->trxRecurringCallbacks
as $name => $callback ) {
1109 $db->setTransactionListener(
$name, $callback );
1122 'method' => __METHOD__,
1127 $context[
'db_server'] = $conn->getServer();
1128 $this->connLogger->warning(
1129 __METHOD__ .
": connection error: {last_error} ({db_server})",
1134 $conn->reportConnectionError(
"{$this->lastError} ({$context['db_server']})" );
1137 $this->connLogger->error(
1139 ": LB failure with no last connection. Connection error: {last_error}",
1153 return array_key_exists( $i, $this->
servers );
1157 return array_key_exists( $i, $this->
servers ) && $this->loads[$i] != 0;
1165 if ( isset( $this->
servers[$i][
'hostName'] ) ) {
1167 } elseif ( isset( $this->
servers[$i][
'host'] ) ) {
1173 return (
$name !=
'' ) ?
$name :
'localhost';
1177 return isset( $this->
servers[$i][
'type'] ) ? $this->
servers[$i][
'type'] :
'unknown';
1181 # If this entire request was served from a replica DB without opening a connection to the
1182 # master (however unlikely that may be), then we can fetch the position from the replica DB.
1184 if ( !$masterConn ) {
1186 for ( $i = 1; $i < $serverCount; $i++ ) {
1189 return $conn->getReplicaPos();
1193 return $masterConn->getMasterPos();
1201 $this->disabled =
true;
1207 $this->connLogger->debug(
1208 __METHOD__ .
": closing connection to database '$host'." );
1213 self::KEY_LOCAL => [],
1214 self::KEY_FOREIGN_INUSE => [],
1215 self::KEY_FOREIGN_FREE => [],
1216 self::KEY_LOCAL_NOROUND => [],
1217 self::KEY_FOREIGN_INUSE_NOROUND => [],
1218 self::KEY_FOREIGN_FREE_NOROUND => []
1220 $this->connsOpened = 0;
1224 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
1225 foreach ( $this->conns
as $type => $connsByServer ) {
1226 if ( !isset( $connsByServer[$serverIndex] ) ) {
1230 foreach ( $connsByServer[$serverIndex]
as $i => $trackedConn ) {
1231 if ( $conn === $trackedConn ) {
1233 $this->connLogger->debug(
1234 __METHOD__ .
": closing connection to database $i at '$host'." );
1235 unset( $this->conns[
$type][$serverIndex][$i] );
1248 $restore = ( $this->trxRoundId !==
false );
1249 $this->trxRoundId =
false;
1255 call_user_func( $this->errorLogger,
$e );
1256 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1258 if ( $restore && $conn->
getLBInfo(
'master' ) ) {
1267 "Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1283 $limit = isset(
$options[
'maxWriteDuration'] ) ?
$options[
'maxWriteDuration'] : 0;
1291 "Explicit transaction still active. A caller may have caught an error."
1297 if ( $limit > 0 &&
$time > $limit ) {
1300 "Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
1309 "A connection to the {$conn->getDBname()} database was lost before commit."
1316 if ( $this->trxRoundId !==
false ) {
1319 "$fname: Transaction round '{$this->trxRoundId}' already started."
1322 $this->trxRoundId =
$fname;
1331 call_user_func( $this->errorLogger,
$e );
1332 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1342 "$fname: Flush failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1353 $restore = ( $this->trxRoundId !==
false );
1354 $this->trxRoundId =
false;
1360 } elseif ( $restore ) {
1364 call_user_func( $this->errorLogger,
$e );
1365 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1376 "$fname: Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1390 $this->queryLogger->warning( __METHOD__ .
": found writes pending." );
1402 }
catch ( Exception $ex ) {
1407 }
catch ( Exception $ex ) {
1416 $restore = ( $this->trxRoundId !==
false );
1417 $this->trxRoundId =
false;
1444 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
1455 $conn->
setLBInfo(
'trxRoundId', $this->trxRoundId );
1463 if ( $conn->
getLBInfo(
'autoCommitOnly' ) ) {
1468 $conn->
setLBInfo(
'trxRoundId',
false );
1492 return (
bool)$pending;
1505 $age = ( $age === null ) ? $this->waitTimeout : $age;
1522 if ( !$this->laggedReplicaMode && $this->
getServerCount() > 1 ) {
1529 $this->allReplicasDownMode =
true;
1530 $this->laggedReplicaMode =
true;
1560 if ( $this->readOnlyReason !==
false ) {
1563 if ( $this->allReplicasDownMode ) {
1564 return 'The database has been automatically locked ' .
1565 'until the replica database servers become available';
1567 return 'The database has been automatically locked ' .
1568 'while the replica database servers catch up to the master.';
1571 return 'The database master is running in read-only mode.';
1586 return (
bool)
$cache->getWithSetCallback(
1587 $cache->makeGlobalKey( __CLASS__,
'server-read-only', $masterServer ),
1588 self::TTL_CACHE_READONLY,
1589 function ()
use ( $domain, $conn ) {
1590 $old = $this->trxProfiler->setSilenced(
true );
1593 $readOnly = (int)$dbw->serverIsReadOnly();
1600 $this->trxProfiler->setSilenced( $old );
1603 [
'pcTTL' => $cache::TTL_PROC_LONG,
'busyValue' => 0 ]
1608 if ( $mode ===
null ) {
1619 if ( !$conn->
ping() ) {
1628 foreach ( $this->conns
as $connsByServer ) {
1629 foreach ( $connsByServer
as $serverConns ) {
1630 foreach ( $serverConns
as $conn ) {
1631 $mergedParams = array_merge( [ $conn ],
$params );
1632 call_user_func_array( $callback, $mergedParams );
1640 foreach ( $this->conns
as $connsByServer ) {
1641 if ( isset( $connsByServer[$masterIndex] ) ) {
1643 foreach ( $connsByServer[$masterIndex]
as $conn ) {
1644 $mergedParams = array_merge( [ $conn ],
$params );
1645 call_user_func_array( $callback, $mergedParams );
1652 foreach ( $this->conns
as $connsByServer ) {
1653 foreach ( $connsByServer
as $i => $serverConns ) {
1657 foreach ( $serverConns
as $conn ) {
1658 $mergedParams = array_merge( [ $conn ],
$params );
1659 call_user_func_array( $callback, $mergedParams );
1675 foreach ( $lagTimes
as $i => $lag ) {
1676 if ( $this->loads[$i] > 0 && $lag >
$maxLag ) {
1691 $knownLagTimes = [];
1692 $indexesWithLag = [];
1693 foreach ( $this->
servers as $i => $server ) {
1694 if ( empty( $server[
'is static'] ) ) {
1695 $indexesWithLag[] = $i;
1697 $knownLagTimes[$i] = 0;
1701 return $this->
getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1719 $timeout = max( 1, $timeout ?: $this->waitTimeout );
1728 if ( $masterConn ) {
1729 $pos = $masterConn->getMasterPos();
1732 $pos = $masterConn->getMasterPos();
1740 $msg = __METHOD__ .
': timed out waiting on {host} pos {pos}';
1741 $this->replLogger->warning( $msg, [
1744 'trace' => (
new RuntimeException() )->getTraceAsString()
1748 $this->replLogger->debug( __METHOD__ .
': done waiting' );
1753 $this->replLogger->error(
1754 __METHOD__ .
': could not get master pos for {host}',
1757 'trace' => (
new RuntimeException() )->getTraceAsString()
1767 $this->trxRecurringCallbacks[
$name] = $callback;
1769 unset( $this->trxRecurringCallbacks[
$name] );
1779 $this->tableAliases = $aliases;
1783 $this->indexAliases = $aliases;
1792 if ( $conn->
getLBInfo(
'foreignPoolRefCount' ) > 0 ) {
1798 if ( $domainsInUse ) {
1799 $domains = implode(
', ', $domainsInUse );
1801 "Foreign domain connections are still in use ($domains)." );
1804 $oldDomain = $this->localDomain->getId();
1806 $this->localDomain->getDatabase(),
1807 $this->localDomain->getSchema(),
1822 $this->localDomain = $domain;
1825 if ( $this->localDomain->getTablePrefix() !=
'' ) {
1826 $this->localDomainIdAlias =
1827 $this->localDomain->
getDatabase() .
'-' . $this->localDomain->getTablePrefix();
1829 $this->localDomainIdAlias = $this->localDomain->getDatabase();
1840 if ( PHP_SAPI !=
'cli' ) {
1841 $old = ignore_user_abort(
true );
1842 return new ScopedCallback(
function ()
use ( $old ) {
1843 ignore_user_abort( $old );