25 use Psr\Log\LoggerInterface;
26 use Psr\Log\NullLogger;
27 use Wikimedia\ScopedCallback;
32 use InvalidArgumentException;
/** @var int Once the number of connections opened by this balancer reaches this count, perfLogger emits a warning */
122 const CONN_HELD_WARN_THRESHOLD = 10;
/** @var int Default per-server 'max lag' (seconds) when a server entry has no 'max lag' key; further capped by the caller's max lag */
125 const MAX_LAG_DEFAULT = 10;
/** @var int TTL for the cached "server-read-only" status of the master (presumably seconds, per the cache API — confirm) */
127 const TTL_CACHE_READONLY = 5;
130 if ( !isset(
$params[
'servers'] ) ) {
131 throw new InvalidArgumentException( __CLASS__ .
': missing servers parameter' );
133 $this->mServers =
$params[
'servers'];
135 $this->localDomain = isset(
$params[
'localDomain'] )
140 if ( $this->localDomain->getTablePrefix() !=
'' ) {
141 $this->localDomainIdAlias =
142 $this->localDomain->getDatabase() .
'-' . $this->localDomain->getTablePrefix();
144 $this->localDomainIdAlias = $this->localDomain->getDatabase();
147 $this->mWaitTimeout = isset(
$params[
'waitTimeout'] ) ?
$params[
'waitTimeout'] : 10;
149 $this->mReadIndex = -1;
156 $this->mWaitForPos =
false;
157 $this->mAllowLagged =
false;
159 if ( isset(
$params[
'readOnlyReason'] ) && is_string(
$params[
'readOnlyReason'] ) ) {
160 $this->readOnlyReason =
$params[
'readOnlyReason'];
163 if ( isset(
$params[
'loadMonitor'] ) ) {
164 $this->loadMonitorConfig =
$params[
'loadMonitor'];
166 $this->loadMonitorConfig = [
'class' =>
'LoadMonitorNull' ];
169 foreach (
$params[
'servers']
as $i => $server ) {
170 $this->mLoads[$i] = $server[
'load'];
171 if ( isset( $server[
'groupLoads'] ) ) {
172 foreach ( $server[
'groupLoads']
as $group => $ratio ) {
173 if ( !isset( $this->mGroupLoads[$group] ) ) {
174 $this->mGroupLoads[$group] = [];
176 $this->mGroupLoads[$group][$i] = $ratio;
181 if ( isset(
$params[
'srvCache'] ) ) {
182 $this->srvCache =
$params[
'srvCache'];
186 if ( isset(
$params[
'memCache'] ) ) {
187 $this->memCache =
$params[
'memCache'];
191 if ( isset(
$params[
'wanCache'] ) ) {
192 $this->wanCache =
$params[
'wanCache'];
196 $this->profiler = isset(
$params[
'profiler'] ) ?
$params[
'profiler'] :
null;
197 if ( isset(
$params[
'trxProfiler'] ) ) {
198 $this->trxProfiler =
$params[
'trxProfiler'];
203 $this->errorLogger = isset(
$params[
'errorLogger'] )
205 :
function ( Exception
$e ) {
206 trigger_error( get_class(
$e ) .
': ' .
$e->getMessage(), E_USER_WARNING );
209 foreach ( [
'replLogger',
'connLogger',
'queryLogger',
'perfLogger' ]
as $key ) {
210 $this->$key = isset(
$params[$key] ) ?
$params[$key] :
new NullLogger();
213 $this->host = isset(
$params[
'hostname'] )
215 : ( gethostname() ?:
'unknown' );
216 $this->cliMode = isset(
$params[
'cliMode'] ) ?
$params[
'cliMode'] : PHP_SAPI ===
'cli';
219 if ( isset(
$params[
'chronologyProtector'] ) ) {
220 $this->chronProt =
$params[
'chronologyProtector'];
230 if ( !isset( $this->loadMonitor ) ) {
237 $class = $this->loadMonitorConfig[
'class'];
238 if ( isset( $compat[$class] ) ) {
239 $class = $compat[$class];
242 $this->loadMonitor =
new $class(
243 $this, $this->srvCache, $this->memCache, $this->loadMonitorConfig );
244 $this->loadMonitor->setLogger( $this->replLogger );
259 # Unset excessively lagged servers
260 foreach ( $lags
as $i => $lag ) {
262 # How much lag this server nominally is allowed to have
263 $maxServerLag = isset( $this->mServers[$i][
'max lag'] )
264 ? $this->mServers[$i][
'max lag']
265 : self::MAX_LAG_DEFAULT;
266 # Constrain that futher by $maxLag argument
267 $maxServerLag = min( $maxServerLag, $maxLag );
270 if ( $lag ===
false && !is_infinite( $maxServerLag ) ) {
271 $this->replLogger->error(
272 "Server {host} is not replicating?", [
'host' =>
$host ] );
274 } elseif ( $lag > $maxServerLag ) {
275 $this->replLogger->warning(
276 "Server {host} has {lag} seconds of lag (>= {maxlag})",
277 [
'host' =>
$host,
'lag' => $lag,
'maxlag' => $maxServerLag ]
284 # Find out if all the replica DBs with non-zero load are lagged
286 foreach ( $loads
as $load ) {
290 # No appropriate DB servers except maybe the master and some replica DBs with zero load
291 # Do NOT use the master
292 # Instead, this function will return false, triggering read-only mode,
293 # and a lagged replica DB will be used instead.
297 if (
count( $loads ) == 0 ) {
301 # Return a random representative of the remainder
306 if (
count( $this->mServers ) == 1 ) {
309 } elseif ( $group ===
false && $this->mReadIndex >= 0 ) {
314 if ( $group !==
false ) {
316 if ( isset( $this->mGroupLoads[$group] ) ) {
317 $loads = $this->mGroupLoads[$group];
320 $this->connLogger->info( __METHOD__ .
": no loads for group $group" );
334 if ( $i ===
false ) {
344 if ( !$this->
doWait( $i ) ) {
349 if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group ===
false ) {
351 $this->mReadIndex = $i;
354 $this->laggedReplicaMode =
true;
359 $this->connLogger->debug( __METHOD__ .
": using server $serverName for group '$group'" );
370 if ( !
count( $loads ) ) {
371 throw new InvalidArgumentException(
"Empty server array given to LoadBalancer" );
380 $currentLoads = $loads;
381 while (
count( $currentLoads ) ) {
386 if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
390 $ago = microtime(
true ) - $this->mWaitForPos->asOfTime();
394 if ( $i ===
false ) {
398 if ( $i ===
false &&
count( $currentLoads ) != 0 ) {
400 $this->replLogger->error(
"All replica DBs lagged. Switch to read-only mode" );
406 if ( $i ===
false ) {
410 $this->connLogger->debug( __METHOD__ .
": pickRandom() returned false" );
412 return [
false,
false ];
416 $this->connLogger->debug( __METHOD__ .
": Using reader #$i: $serverName..." );
420 $this->connLogger->warning( __METHOD__ .
": Failed connecting to $i/$domain" );
421 unset( $currentLoads[$i] );
428 if ( $domain !==
false ) {
437 if ( !
count( $currentLoads ) ) {
438 $this->connLogger->error(
"All servers down" );
447 $this->mWaitForPos = $pos;
451 if ( !$this->
doWait( $i ) ) {
452 $this->laggedReplicaMode =
true;
464 $this->mWaitForPos = $pos;
471 $readLoads = array_filter( $readLoads );
476 $ok = $this->
doWait( $i,
true, $timeout );
481 # Restore the old position, as this is not used for lag-protection but for throttling
482 $this->mWaitForPos = $oldPos;
491 $this->mWaitForPos = $pos;
492 $serverCount =
count( $this->mServers );
495 for ( $i = 1; $i < $serverCount; $i++ ) {
496 if ( $this->mLoads[$i] > 0 ) {
497 $ok = $this->
doWait( $i,
true, $timeout ) && $ok;
501 # Restore the old position, as this is not used for lag-protection but for throttling
502 $this->mWaitForPos = $oldPos;
516 if ( !$this->mWaitForPos || $pos->hasReached( $this->mWaitForPos ) ) {
517 $this->mWaitForPos = $pos;
526 foreach ( $this->mConns
as $connsByServer ) {
527 if ( !empty( $connsByServer[$i] ) ) {
529 $serverConns = $connsByServer[$i];
531 return reset( $serverConns );
545 protected function doWait( $index, $open =
false, $timeout =
null ) {
550 $key = $this->srvCache->makeGlobalKey( __CLASS__,
'last-known-pos', $server,
'v1' );
552 $knownReachedPos = $this->srvCache->get( $key );
555 $knownReachedPos->
hasReached( $this->mWaitForPos )
557 $this->replLogger->debug( __METHOD__ .
558 ": replica DB $server known to be caught up (pos >= $knownReachedPos)." );
566 $this->replLogger->debug( __METHOD__ .
": no connection open for $server" );
572 $this->replLogger->warning( __METHOD__ .
": failed to connect to $server" );
582 $this->replLogger->info( __METHOD__ .
": Waiting for replica DB $server to catch up..." );
584 $result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
588 $this->replLogger->warning(
589 __METHOD__ .
": Timed out waiting on {host} pos {$this->mWaitForPos}",
590 [
'host' => $server ]
594 $this->replLogger->info( __METHOD__ .
": Done" );
617 if ( $i ===
null || $i ===
false ) {
618 throw new InvalidArgumentException(
'Attempt to call ' . __METHOD__ .
619 ' with invalid server index' );
626 $groups = ( $groups ===
false || $groups === [] )
636 # Try to find an available server in any the query groups (in order)
637 foreach ( $groups
as $group ) {
639 if ( $groupIndex !==
false ) {
646 # Operation-based index
648 $this->mLastError =
'Unknown error';
649 # Try the general server pool if $groups are unavailable.
650 $i = ( $groups === [
false ] )
653 # Couldn't find a working server in getReaderIndex()?
654 if ( $i ===
false ) {
662 # Now we have an explicit index into the servers array
670 # Profile any new connections that happen
671 if ( $this->connsOpened > $oldConnsOpened ) {
672 $host = $conn->getServer();
673 $dbname = $conn->getDBname();
674 $this->trxProfiler->recordConnection(
$host, $dbname, $masterOnly );
678 # Make master-requested DB handles inherit any read-only mode setting
679 $conn->setLBInfo(
'readOnlyReason', $this->
getReadOnlyReason( $domain, $conn ) );
686 $serverIndex = $conn->getLBInfo(
'serverIndex' );
687 $refCount = $conn->getLBInfo(
'foreignPoolRefCount' );
688 if ( $serverIndex ===
null || $refCount ===
null ) {
700 } elseif ( $conn instanceof
DBConnRef ) {
703 $this->connLogger->error( __METHOD__ .
": got DBConnRef instance.\n" .
704 (
new RuntimeException() )->getTraceAsString() );
709 if ( $this->disabled ) {
713 $domain = $conn->getDomainID();
714 if ( !isset( $this->mConns[
'foreignUsed'][$serverIndex][$domain] ) ) {
715 throw new InvalidArgumentException( __METHOD__ .
716 ": connection $serverIndex/$domain not found; it may have already been freed." );
717 } elseif ( $this->mConns[
'foreignUsed'][$serverIndex][$domain] !== $conn ) {
718 throw new InvalidArgumentException( __METHOD__ .
719 ": connection $serverIndex/$domain mismatched; it may have already been freed." );
721 $conn->setLBInfo(
'foreignPoolRefCount', --$refCount );
722 if ( $refCount <= 0 ) {
723 $this->mConns[
'foreignFree'][$serverIndex][$domain] = $conn;
724 unset( $this->mConns[
'foreignUsed'][$serverIndex][$domain] );
725 if ( !$this->mConns[
'foreignUsed'][$serverIndex] ) {
726 unset( $this->mConns[
'foreignUsed' ][$serverIndex] );
728 $this->connLogger->debug( __METHOD__ .
": freed connection $serverIndex/$domain" );
730 $this->connLogger->debug( __METHOD__ .
731 ": reference count for $serverIndex/$domain reduced to $refCount" );
736 $domain = ( $domain !==
false ) ? $domain : $this->localDomain;
742 $domain = ( $domain !==
false ) ? $domain : $this->localDomain;
744 return new DBConnRef( $this, [ $db, $groups, $domain ] );
748 $domain = ( $domain !==
false ) ? $domain : $this->localDomain;
766 if ( !$this->chronProtInitialized && $this->chronProt ) {
767 $this->connLogger->debug( __METHOD__ .
': calling initLB() before first connection.' );
769 $this->chronProtInitialized =
true;
770 $this->chronProt->initLB( $this );
773 if ( $domain !==
false ) {
775 } elseif ( isset( $this->mConns[
'local'][$i][0] ) ) {
776 $conn = $this->mConns[
'local'][$i][0];
778 if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
779 throw new InvalidArgumentException(
"No server with index '$i'." );
782 $server = $this->mServers[$i];
783 $server[
'serverIndex'] = $i;
786 if ( $conn->isOpen() ) {
787 $this->connLogger->debug(
"Connected to database $i at '$serverName'." );
788 $this->mConns[
'local'][$i][0] = $conn;
790 $this->connLogger->warning(
"Failed to connect to database $i at '$serverName'." );
791 $this->errorConnection = $conn;
801 $this->errorConnection = $conn;
830 $dbName = $domainInstance->getDatabase();
831 $prefix = $domainInstance->getTablePrefix();
833 if ( isset( $this->mConns[
'foreignUsed'][$i][$domain] ) ) {
835 $conn = $this->mConns[
'foreignUsed'][$i][$domain];
836 $this->connLogger->debug( __METHOD__ .
": reusing connection $i/$domain" );
837 } elseif ( isset( $this->mConns[
'foreignFree'][$i][$domain] ) ) {
839 $conn = $this->mConns[
'foreignFree'][$i][$domain];
840 unset( $this->mConns[
'foreignFree'][$i][$domain] );
841 $this->mConns[
'foreignUsed'][$i][$domain] = $conn;
842 $this->connLogger->debug( __METHOD__ .
": reusing free connection $i/$domain" );
843 } elseif ( !empty( $this->mConns[
'foreignFree'][$i] ) ) {
845 $conn = reset( $this->mConns[
'foreignFree'][$i] );
846 $oldDomain =
key( $this->mConns[
'foreignFree'][$i] );
849 if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
850 $this->mLastError =
"Error selecting database '$dbName' on server " .
851 $conn->getServer() .
" from client host {$this->host}";
852 $this->errorConnection = $conn;
855 $conn->tablePrefix( $prefix );
856 unset( $this->mConns[
'foreignFree'][$i][$oldDomain] );
857 $this->mConns[
'foreignUsed'][$i][$domain] = $conn;
858 $this->connLogger->debug( __METHOD__ .
859 ": reusing free connection from $oldDomain for $domain" );
862 if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
863 throw new InvalidArgumentException(
"No server with index '$i'." );
866 $server = $this->mServers[$i];
867 $server[
'serverIndex'] = $i;
868 $server[
'foreignPoolRefCount'] = 0;
869 $server[
'foreign'] =
true;
871 if ( !$conn->isOpen() ) {
872 $this->connLogger->warning( __METHOD__ .
": connection error for $i/$domain" );
873 $this->errorConnection = $conn;
876 $conn->tablePrefix( $prefix );
877 $this->mConns[
'foreignUsed'][$i][$domain] = $conn;
878 $this->connLogger->debug( __METHOD__ .
": opened new connection for $i/$domain" );
884 $refCount = $conn->getLBInfo(
'foreignPoolRefCount' );
885 $conn->setLBInfo(
'foreignPoolRefCount', $refCount + 1 );
899 if ( !is_integer( $index ) ) {
918 if ( $this->disabled ) {
922 if ( $dbNameOverride !==
false ) {
923 $server[
'dbname'] = $dbNameOverride;
928 $server[
'clusterMasterHost'] = $masterName;
931 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
932 $this->perfLogger->warning( __METHOD__ .
": " .
933 "{$this->connsOpened}+ connections made (master=$masterName)" );
959 $db->setLBInfo( $server );
960 $db->setLazyMasterHandle(
963 $db->setTableAliases( $this->tableAliases );
966 if ( $this->trxRoundId !==
false ) {
969 foreach ( $this->trxRecurringCallbacks
as $name => $callback ) {
970 $db->setTransactionListener(
$name, $callback );
983 'method' => __METHOD__,
988 $context[
'db_server'] = $conn->getServer();
989 $this->connLogger->warning(
990 "Connection error: {last_error} ({db_server})",
995 $conn->reportConnectionError(
"{$this->mLastError} ({$context['db_server']})" );
998 $this->connLogger->error(
999 "LB failure with no last connection. Connection error: {last_error}",
1013 return array_key_exists( $i, $this->mServers );
1017 return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
1021 return count( $this->mServers );
1025 if ( isset( $this->mServers[$i][
'hostName'] ) ) {
1026 $name = $this->mServers[$i][
'hostName'];
1027 } elseif ( isset( $this->mServers[$i][
'host'] ) ) {
1028 $name = $this->mServers[$i][
'host'];
1033 return (
$name !=
'' ) ?
$name :
'localhost';
1037 if ( isset( $this->mServers[$i] ) ) {
1038 return $this->mServers[$i];
1045 $this->mServers[$i] = $serverInfo;
1049 # If this entire request was served from a replica DB without opening a connection to the
1050 # master (however unlikely that may be), then we can fetch the position from the replica DB.
1052 if ( !$masterConn ) {
1053 $serverCount =
count( $this->mServers );
1054 for ( $i = 1; $i < $serverCount; $i++ ) {
1057 return $conn->getReplicaPos();
1061 return $masterConn->getMasterPos();
1069 $this->disabled =
true;
1075 $this->connLogger->debug(
"Closing connection to database '$host'." );
1081 'foreignFree' => [],
1082 'foreignUsed' => [],
1084 $this->connsOpened = 0;
1088 $serverIndex = $conn->
getLBInfo(
'serverIndex' );
1089 foreach ( $this->mConns
as $type => $connsByServer ) {
1090 if ( !isset( $connsByServer[$serverIndex] ) ) {
1094 foreach ( $connsByServer[$serverIndex]
as $i => $trackedConn ) {
1095 if ( $conn === $trackedConn ) {
1097 $this->connLogger->debug(
"Closing connection to database $i at '$host'." );
1098 unset( $this->mConns[
$type][$serverIndex][$i] );
1111 $restore = ( $this->trxRoundId !==
false );
1112 $this->trxRoundId =
false;
1118 call_user_func( $this->errorLogger,
$e );
1119 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1121 if ( $restore && $conn->
getLBInfo(
'master' ) ) {
1130 "Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1154 "Explicit transaction still active. A caller may have caught an error."
1163 "Transaction spent $time second(s) in writes, exceeding the $limit limit.",
1172 "A connection to the {$conn->getDBname()} database was lost before commit."
1179 if ( $this->trxRoundId !==
false ) {
1182 "$fname: Transaction round '{$this->trxRoundId}' already started."
1185 $this->trxRoundId =
$fname;
1194 call_user_func( $this->errorLogger,
$e );
1195 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1205 "$fname: Flush failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1216 $restore = ( $this->trxRoundId !==
false );
1217 $this->trxRoundId =
false;
1223 } elseif ( $restore ) {
1227 call_user_func( $this->errorLogger,
$e );
1228 $failures[] =
"{$conn->getServer()}: {$e->getMessage()}";
1239 "$fname: Commit failed on server(s) " . implode(
"\n", array_unique( $failures ) )
1252 $this->queryLogger->error( __METHOD__ .
": found writes/callbacks pending." );
1264 }
catch ( Exception $ex ) {
1269 }
catch ( Exception $ex ) {
1278 $restore = ( $this->trxRoundId !==
false );
1279 $this->trxRoundId =
false;
1337 return (
bool)$pending;
1350 $age = ( $age === null ) ? $this->mWaitTimeout : $age;
1367 if ( !$this->laggedReplicaMode && $this->
getServerCount() > 1 ) {
1374 $this->allReplicasDownMode =
true;
1375 $this->laggedReplicaMode =
true;
1405 if ( $this->readOnlyReason !==
false ) {
1408 if ( $this->allReplicasDownMode ) {
1409 return 'The database has been automatically locked ' .
1410 'until the replica database servers become available';
1412 return 'The database has been automatically locked ' .
1413 'while the replica database servers catch up to the master.';
1416 return 'The database master is running in read-only mode.';
1431 return (
bool)
$cache->getWithSetCallback(
1432 $cache->makeGlobalKey( __CLASS__,
'server-read-only', $masterServer ),
1433 self::TTL_CACHE_READONLY,
1434 function ()
use ( $domain, $conn ) {
1435 $old = $this->trxProfiler->setSilenced(
true );
1438 $readOnly = (int)$dbw->serverIsReadOnly();
1445 $this->trxProfiler->setSilenced( $old );
1448 [
'pcTTL' => $cache::TTL_PROC_LONG,
'busyValue' => 0 ]
1453 if ( $mode ===
null ) {
1456 $this->mAllowLagged = $mode;
1464 if ( !$conn->
ping() ) {
1473 foreach ( $this->mConns
as $connsByServer ) {
1474 foreach ( $connsByServer
as $serverConns ) {
1475 foreach ( $serverConns
as $conn ) {
1476 $mergedParams = array_merge( [ $conn ],
$params );
1477 call_user_func_array( $callback, $mergedParams );
1485 foreach ( $this->mConns
as $connsByServer ) {
1486 if ( isset( $connsByServer[$masterIndex] ) ) {
1488 foreach ( $connsByServer[$masterIndex]
as $conn ) {
1489 $mergedParams = array_merge( [ $conn ],
$params );
1490 call_user_func_array( $callback, $mergedParams );
1497 foreach ( $this->mConns
as $connsByServer ) {
1498 foreach ( $connsByServer
as $i => $serverConns ) {
1502 foreach ( $serverConns
as $conn ) {
1503 $mergedParams = array_merge( [ $conn ],
$params );
1504 call_user_func_array( $callback, $mergedParams );
1516 return [
$host, $maxLag, $maxIndex ];
1520 foreach ( $lagTimes
as $i => $lag ) {
1521 if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
1523 $host = $this->mServers[$i][
'host'];
1528 return [
$host, $maxLag, $maxIndex ];
1536 $knownLagTimes = [];
1537 $indexesWithLag = [];
1538 foreach ( $this->mServers
as $i => $server ) {
1539 if ( empty( $server[
'is static'] ) ) {
1540 $indexesWithLag[] = $i;
1542 $knownLagTimes[$i] = 0;
1546 return $this->
getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1571 if ( $masterConn ) {
1572 $pos = $masterConn->getMasterPos();
1575 $pos = $masterConn->getMasterPos();
1583 $msg = __METHOD__ .
": Timed out waiting on {$conn->getServer()} pos {$pos}";
1584 $this->replLogger->warning(
"$msg" );
1587 $this->replLogger->info( __METHOD__ .
": Done" );
1592 $this->replLogger->error(
"Could not get master pos for {$conn->getServer()}." );
1600 $this->trxRecurringCallbacks[
$name] = $callback;
1602 unset( $this->trxRecurringCallbacks[
$name] );
1612 $this->tableAliases = $aliases;
1616 if ( $this->mConns[
'foreignUsed'] ) {
1619 foreach ( $this->mConns[
'foreignUsed']
as $i => $connsByDomain ) {
1620 $domains = array_merge( $domains, array_keys( $connsByDomain ) );
1622 $domains = implode(
', ', $domains );
1624 "Foreign domain connections are still in use ($domains)." );
1628 $this->localDomain->getDatabase(),
1645 if ( PHP_SAPI !=
'cli' ) {
1646 $old = ignore_user_abort(
true );
1647 return new ScopedCallback(
function ()
use ( $old ) {
1648 ignore_user_abort( $old );