26use Psr\Log\LoggerInterface;
27use Psr\Log\NullLogger;
28use Wikimedia\ScopedCallback;
114 [
'replLogger',
'connLogger',
'queryLogger',
'perfLogger' ];
117 $this->localDomain = isset( $conf[
'localDomain'] )
121 $this->maxLag = $conf[
'maxLag'] ??
null;
122 if ( isset( $conf[
'readOnlyReason'] ) && is_string( $conf[
'readOnlyReason'] ) ) {
123 $this->readOnlyReason = $conf[
'readOnlyReason'];
128 $this->wanCache = $conf[
'wanCache'] ?? WANObjectCache::newEmpty();
130 foreach ( self::$loggerFields as $key ) {
131 $this->$key = $conf[$key] ??
new NullLogger();
133 $this->errorLogger = $conf[
'errorLogger'] ??
function ( Exception $e ) {
134 trigger_error( get_class( $e ) .
': ' . $e->getMessage(), E_USER_WARNING );
136 $this->deprecationLogger = $conf[
'deprecationLogger'] ??
function ( $msg ) {
137 trigger_error( $msg, E_USER_DEPRECATED );
140 $this->profiler = $conf[
'profiler'] ??
null;
143 $this->requestInfo = [
144 'IPAddress' => $_SERVER[
'REMOTE_ADDR' ] ??
'',
145 'UserAgent' => $_SERVER[
'HTTP_USER_AGENT'] ??
'',
147 'ChronologyProtection' =>
null,
148 'ChronologyClientId' =>
null,
149 'ChronologyPositionIndex' =>
null
152 $this->cliMode = $conf[
'cliMode'] ?? ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' );
153 $this->hostname = $conf[
'hostname'] ?? gethostname();
154 $this->agent = $conf[
'agent'] ??
'';
155 $this->defaultGroup = $conf[
'defaultGroup'] ??
null;
156 $this->secret = $conf[
'secret'] ??
'';
// Per-process counters shared by every instance constructed through this code.
// The first instance seeds each counter with mt_rand(); each later instance
// takes the previous value plus one.
//
// The previous form, `$x = ( is_int( $x ) ? $x++ : mt_rand() )`, never advanced
// the counter: the post-increment yields the old value, and the surrounding
// assignment then writes that old value straight back over the incremented one.
// As a result every instance ended up with the same id and the same ticket.
static $nextId, $nextTicket;
$nextId = is_int( $nextId ) ? $nextId + 1 : mt_rand();
$this->id = $nextId;
$nextTicket = is_int( $nextTicket ) ? $nextTicket + 1 : mt_rand();
$this->ticket = $nextTicket;
165 $scope = ScopedCallback::newScopedIgnoreUserAbort();
171 return $this->localDomain->getId();
179 $mode = self::SHUTDOWN_CHRONPROT_SYNC,
180 callable $workCallback =
null,
185 $scope = ScopedCallback::newScopedIgnoreUserAbort();
188 if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
190 } elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
208 $loadBalancer->$methodName( ...
$args );
210 [ $methodName,
$args ]
215 if ( $this->trxRoundId !==
false && $this->trxRoundId !== $fname ) {
216 $this->queryLogger->warning(
217 "$fname: transaction round '{$this->trxRoundId}' still running",
218 [
'trace' => (
new RuntimeException() )->getTraceAsString() ]
224 final public function commitAll( $fname = __METHOD__, array $options = [] ) {
233 $scope = ScopedCallback::newScopedIgnoreUserAbort();
236 if ( $this->trxRoundId !==
false ) {
239 "$fname: transaction round '{$this->trxRoundId}' already started"
242 $this->trxRoundId = $fname;
251 $scope = ScopedCallback::newScopedIgnoreUserAbort();
254 if ( $this->trxRoundId !==
false && $this->trxRoundId !== $fname ) {
257 "$fname: transaction round '{$this->trxRoundId}' still running"
266 }
while ( $count > 0 );
267 $this->trxRoundId =
false;
279 if ( $e instanceof Exception ) {
286 $scope = ScopedCallback::newScopedIgnoreUserAbort();
289 $this->trxRoundId =
false;
321 return ( $this->trxRoundId !==
false );
325 return ( $this->trxRoundStage === self::ROUND_CURSORY );
337 $callersByDB[$masterName] = $callers;
341 if ( count( $callersByDB ) >= 2 ) {
342 $dbs = implode(
', ', array_keys( $callersByDB ) );
343 $msg =
"Multi-DB transaction [{$dbs}]:\n";
344 foreach ( $callersByDB as $db => $callers ) {
345 $msg .=
"$db: " . implode(
'; ', $callers ) .
"\n";
347 $this->queryLogger->info( $msg );
381 'timeout' => $this->cliMode ? 60 : 1,
382 'ifWritesSince' => null
385 if ( $opts[
'domain'] ===
false && isset( $opts[
'wiki'] ) ) {
386 $opts[
'domain'] = $opts[
'wiki'];
392 if ( $opts[
'cluster'] !==
false ) {
394 } elseif ( $opts[
'domain'] !==
false ) {
395 $lbs[] = $this->
getMainLB( $opts[
'domain'] );
408 $masterPositions = array_fill( 0, count( $lbs ),
false );
409 foreach ( $lbs as $i => $lb ) {
412 !$lb->hasMasterConnection() ||
414 !$lb->hasStreamingReplicaServers() ||
417 $opts[
'ifWritesSince'] &&
418 $lb->lastMasterChangeTimestamp() < $opts[
'ifWritesSince']
424 $masterPositions[$i] = $lb->getMasterPos();
429 foreach ( $this->replicationWaitCallbacks as $callback ) {
434 foreach ( $lbs as $i => $lb ) {
435 if ( $masterPositions[$i] ) {
437 if ( !$lb->waitForAll( $masterPositions[$i], $opts[
'timeout'] ) ) {
438 $failed[] = $lb->getServerName( $lb->getWriterIndex() );
448 $this->replicationWaitCallbacks[$name] = $callback;
450 unset( $this->replicationWaitCallbacks[$name] );
456 $this->queryLogger->error(
457 __METHOD__ .
": $fname does not have outer scope",
458 [
'trace' => (
new RuntimeException() )->getTraceAsString() ]
468 if (
$ticket !== $this->ticket ) {
469 $this->perfLogger->error(
470 __METHOD__ .
": $fname does not have outer scope",
471 [
'trace' => (
new RuntimeException() )->getTraceAsString() ]
479 if ( $this->trxRoundId !==
false && $fname !== $this->trxRoundId ) {
480 $this->queryLogger->info(
"$fname: committing on behalf of {$this->trxRoundId}" );
483 $fnameEffective = $fname;
490 if ( $fnameEffective !== $fname ) {
494 return $waitSucceeded;
509 if ( $this->chronProt ) {
516 'ip' => $this->requestInfo[
'IPAddress'],
517 'agent' => $this->requestInfo[
'UserAgent'],
518 'clientId' => $this->requestInfo[
'ChronologyClientId'] ?: null
520 $this->requestInfo[
'ChronologyPositionIndex'],
523 $this->chronProt->
setLogger( $this->replLogger );
525 if ( $this->cliMode ) {
526 $this->chronProt->setEnabled(
false );
527 } elseif ( $this->requestInfo[
'ChronologyProtection'] ===
'false' ) {
530 $this->chronProt->setWaitEnabled(
false );
533 $this->chronProt->setEnabled(
false );
534 $this->replLogger->info(
'Cannot use ChronologyProtector with EmptyBagOStuff' );
537 $this->replLogger->debug(
538 __METHOD__ .
': request info ' .
539 json_encode( $this->requestInfo, JSON_PRETTY_PRINT )
562 $unsavedPositions = $cp->
shutdown( $workCallback, $mode, $cpIndex );
563 if ( $unsavedPositions && $workCallback ) {
573 if ( isset( $unsavedPositions[$masterName] ) ) {
574 $lb->
waitForAll( $unsavedPositions[$masterName] );
586 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
587 $initStage = ILoadBalancer::STAGE_POSTCOMMIT_CALLBACKS;
588 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
589 $initStage = ILoadBalancer::STAGE_POSTROLLBACK_CALLBACKS;
616 'roundStage' => $initStage,
625 if ( $this->trxRoundId !==
false ) {
634 $this->tableAliases = $aliases;
638 $this->indexAliases = $aliases;
643 $this->localDomain->getDatabase(),
644 $this->localDomain->getSchema(),
665 $scope = ScopedCallback::newScopedIgnoreUserAbort();
680 if ( !$usedCluster ) {
684 return strpos( $url,
'?' ) ===
false ?
"$url?cpPosIndex=$index" :
"$url&cpPosIndex=$index";
699 return "$index@$time#$clientId";
709 static $placeholder = [
'index' =>
null,
'clientId' => null ];
711 if ( !preg_match(
'/^(\d+)@(\d+)#([0-9a-f]{32})$/', $value, $m ) ) {
718 } elseif ( isset( $m[2] ) && $m[2] !==
'' && (
int)$m[2] < $minTimestamp ) {
722 $clientId = ( isset( $m[3] ) && $m[3] !==
'' ) ? $m[3] :
null;
724 return [
'index' => $index,
'clientId' => $clientId ];
728 if ( $this->chronProt ) {
729 throw new LogicException(
'ChronologyProtector already initialized' );
747 if ( $this->trxRoundStage !== $stage ) {
750 "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
Class representing a cache/ephemeral data store.
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Class to handle database/schema/prefix specifications for IDatabase.
static newFromId( $domain)