27use Psr\Log\LoggerAwareInterface;
28use Psr\Log\LoggerInterface;
29use Psr\Log\NullLogger;
30use Wikimedia\WaitConditionLoop;
196 ?
int $clientPosIndex,
201 if ( isset( $client[
'clientId'] ) ) {
202 $this->clientId = $client[
'clientId'];
204 $this->clientId = ( $secret !=
'' )
205 ? hash_hmac(
'md5', $client[
'ip'] .
"\n" . $client[
'agent'], $secret )
206 : md5( $client[
'ip'] .
"\n" . $client[
'agent'] );
209 $this->waitForPosIndex = $clientPosIndex;
211 $this->clientLogInfo = [
212 'clientIP' => $client[
'ip'],
213 'clientAgent' => $client[
'agent'],
214 'clientId' => $client[
'clientId'] ?? null
217 $this->logger =
new NullLogger();
245 $this->positionWaitsEnabled =
$enabled;
262 if ( !$this->enabled || !$this->positionWaitsEnabled ) {
271 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) position is '$pos'" );
274 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) has no position" );
300 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) position now '$pos'" );
301 $this->shutdownPositionsByMaster[$masterName] = $pos;
302 $this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
304 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) position unknown" );
305 $this->shutdownTimestampsByCluster[$cluster] = $this->
getCurrentTime();
308 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) has no replication" );
309 $this->shutdownTimestampsByCluster[$cluster] = $this->
getCurrentTime();
322 if ( !$this->enabled ) {
326 if ( !$this->shutdownTimestampsByCluster ) {
327 $this->logger->debug( __METHOD__ .
": no primary positions/timestamps to save" );
332 $scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
334 $ok = $this->store->set(
337 $this->store->get( $this->key ),
338 $this->shutdownPositionsByMaster,
339 $this->shutdownTimestampsByCluster,
342 self::POSITION_STORE_TTL
349 $clusterList = implode(
', ', array_keys( $this->shutdownTimestampsByCluster ) );
352 $bouncedPositions = [];
353 $this->logger->debug(
354 __METHOD__ .
": saved primary positions/timestamp for DB cluster(s) $clusterList"
358 $clientPosIndex =
null;
361 $this->logger->warning(
362 __METHOD__ .
": failed to save primary positions for DB cluster(s) $clusterList"
366 return $bouncedPositions;
379 if ( !$this->enabled ) {
386 $timestamp = $timestampsByCluster[$cluster] ??
null;
387 if ( $timestamp ===
null ) {
388 $recentTouchTimestamp =
false;
389 } elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
394 $recentTouchTimestamp =
false;
395 $this->logger->debug( __METHOD__ .
": old timestamp ($timestamp) for $cluster" );
397 $recentTouchTimestamp = $timestamp;
398 $this->logger->debug( __METHOD__ .
": recent timestamp ($timestamp) for $cluster" );
401 return $recentTouchTimestamp;
428 if ( $this->startupTimestamp !==
null ) {
433 $this->logger->debug(
435 ": client ID is {$this->clientId}; key is {$this->key}"
443 if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
445 $indexReached =
null;
446 $loop =
new WaitConditionLoop(
447 function () use ( &$data, &$indexReached ) {
448 $data = $this->store->get( $this->key );
449 if ( !is_array( $data ) ) {
450 return WaitConditionLoop::CONDITION_CONTINUE;
451 } elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
452 return WaitConditionLoop::CONDITION_REACHED;
454 $indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );
456 return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
457 ? WaitConditionLoop::CONDITION_REACHED
458 : WaitConditionLoop::CONDITION_CONTINUE;
462 $result = $loop->invoke();
463 $waitedMs = $loop->getLastWaitTime() * 1e3;
465 if ( $result == $loop::CONDITION_REACHED ) {
466 $this->logger->debug(
467 __METHOD__ .
": expected and found position index {cpPosIndex}.",
469 'cpPosIndex' => $this->waitForPosIndex,
470 'waitTimeMs' => $waitedMs
471 ] + $this->clientLogInfo
474 $this->logger->warning(
475 __METHOD__ .
": expected but failed to find position index {cpPosIndex}.",
477 'cpPosIndex' => $this->waitForPosIndex,
478 'indexReached' => $indexReached,
479 'waitTimeMs' => $waitedMs
480 ] + $this->clientLogInfo
484 $data = $this->store->get( $this->key );
486 if ( $indexReached ) {
487 $this->logger->debug(
488 __METHOD__ .
": found position/timestamp data with index {indexReached}.",
489 [
'indexReached' => $indexReached ] + $this->clientLogInfo
509 array $shutdownPositions,
510 array $shutdownTimestamps,
511 ?
int &$clientPosIndex =
null
516 foreach ( $shutdownPositions as $masterName => $pos ) {
518 !isset( $mergedPositions[$masterName] ) ||
519 !( $mergedPositions[$masterName] instanceof
DBPrimaryPos ) ||
520 $pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
522 $mergedPositions[$masterName] = $pos;
529 foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
531 !isset( $mergedTimestamps[$cluster] ) ||
532 $timestamp > $mergedTimestamps[$cluster]
534 $mergedTimestamps[$cluster] = $timestamp;
541 self::FLD_POSITIONS => $mergedPositions,
542 self::FLD_TIMESTAMPS => $mergedTimestamps,
543 self::FLD_WRITE_INDEX => $clientPosIndex
553 if ( $this->wallClockOverride ) {
557 $clockTime = (float)time();
560 return max( microtime(
true ), $clockTime );
569 $this->wallClockOverride =& $time;
Class representing a cache/ephemeral data store.
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.