23use Psr\Log\LoggerAwareInterface;
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
// Clock override: when this holds a truthy value it is returned in place of
// the real clock reading. It is bound by reference to a caller-supplied variable.
164 private $wallClockOverride;
// TTL handed to the store when the marshalled positions are written
// (presumably seconds -- TODO confirm against the store API).
169 private const POSITION_STORE_TTL = 60;
// Second argument to the store's getScopedLock() when saving positions
// (presumably the max seconds to wait for the lock -- TODO confirm).
172 private const LOCK_TIMEOUT = 3;
// Third argument to the store's getScopedLock() when saving positions
// (presumably the lock's lifetime in seconds -- TODO confirm).
174 private const LOCK_TTL = 6;
// Key of the stored-value array entry holding positions, indexed by primary server name.
176 private const FLD_POSITIONS =
'positions';
// Key of the stored-value array entry holding timestamps, indexed by cluster.
177 private const FLD_TIMESTAMPS =
'timestamps';
// Key of the stored-value array entry holding the write index; merging stores
// the previous index (0 when absent) plus one.
178 private const FLD_WRITE_INDEX =
'writeIndex';
190 ?
int $clientPosIndex,
195 if ( isset( $client[
'clientId'] ) ) {
196 $this->clientId = $client[
'clientId'];
198 $this->clientId = ( $secret !=
'' )
199 ? hash_hmac(
'md5', $client[
'ip'] .
"\n" . $client[
'agent'], $secret )
200 : md5( $client[
'ip'] .
"\n" . $client[
'agent'] );
203 $this->waitForPosIndex = $clientPosIndex;
205 $this->clientLogInfo = [
206 'clientIP' => $client[
'ip'],
207 'clientAgent' => $client[
'agent'],
208 'clientId' => $client[
'clientId'] ?? null
211 $this->logger =
new NullLogger();
239 $this->positionWaitsEnabled =
$enabled;
256 if ( !$this->enabled || !$this->positionWaitsEnabled ) {
265 $this->logger->debug( __METHOD__ .
": $cluster ($primaryName) position is '$pos'" );
268 $this->logger->debug( __METHOD__ .
": $cluster ($primaryName) has no position" );
294 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) position now '$pos'" );
295 $this->shutdownPositionsByPrimary[$masterName] = $pos;
296 $this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
298 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) position unknown" );
299 $this->shutdownTimestampsByCluster[$cluster] = $this->
getCurrentTime();
302 $this->logger->debug( __METHOD__ .
": $cluster ($masterName) has no replication" );
303 $this->shutdownTimestampsByCluster[$cluster] = $this->
getCurrentTime();
316 if ( !$this->enabled ) {
320 if ( !$this->shutdownTimestampsByCluster ) {
321 $this->logger->debug( __METHOD__ .
": no primary positions/timestamps to save" );
326 $scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
329 $this->unmarshalPositions( $this->store->get( $this->key ) ),
330 $this->shutdownPositionsByPrimary,
331 $this->shutdownTimestampsByCluster,
335 $ok = $this->store->set(
337 $this->marshalPositions( $positions ),
338 self::POSITION_STORE_TTL
345 $clusterList = implode(
', ', array_keys( $this->shutdownTimestampsByCluster ) );
348 $bouncedPositions = [];
349 $this->logger->debug(
350 __METHOD__ .
": saved primary positions/timestamp for DB cluster(s) $clusterList"
354 $clientPosIndex =
null;
357 $this->logger->warning(
358 __METHOD__ .
": failed to save primary positions for DB cluster(s) $clusterList"
362 return $bouncedPositions;
375 if ( !$this->enabled ) {
382 $timestamp = $timestampsByCluster[$cluster] ??
null;
383 if ( $timestamp ===
null ) {
384 $recentTouchTimestamp =
false;
385 } elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
390 $recentTouchTimestamp =
false;
391 $this->logger->debug( __METHOD__ .
": old timestamp ($timestamp) for $cluster" );
393 $recentTouchTimestamp = $timestamp;
394 $this->logger->debug( __METHOD__ .
": recent timestamp ($timestamp) for $cluster" );
397 return $recentTouchTimestamp;
424 if ( $this->startupTimestamp !==
null ) {
429 $this->logger->debug(
'ChronologyProtector using store ' . get_class( $this->store ) );
430 $this->logger->debug(
"ChronologyProtector fetching positions for {$this->clientId}" );
432 $data = $this->unmarshalPositions( $this->store->get( $this->key ) );
434 $this->startupPositionsByPrimary = $data ? $data[self::FLD_POSITIONS] : [];
435 $this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
445 $indexReached = is_array( $data ) ? $data[self::FLD_WRITE_INDEX] :
null;
446 if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
447 if ( $indexReached >= $this->waitForPosIndex ) {
448 $this->logger->debug(
'expected and found position index {cpPosIndex}', [
449 'cpPosIndex' => $this->waitForPosIndex,
450 ] + $this->clientLogInfo );
452 $this->logger->warning(
'expected but failed to find position index {cpPosIndex}', [
453 'cpPosIndex' => $this->waitForPosIndex,
454 'indexReached' => $indexReached,
455 'exception' =>
new \RuntimeException(),
456 ] + $this->clientLogInfo );
459 if ( $indexReached ) {
460 $this->logger->debug(
'found position data with index {indexReached}', [
461 'indexReached' => $indexReached
462 ] + $this->clientLogInfo );
478 array $shutdownPositions,
479 array $shutdownTimestamps,
480 ?
int &$clientPosIndex =
null
483 $mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
485 foreach ( $shutdownPositions as $masterName => $pos ) {
487 !isset( $mergedPositions[$masterName] ) ||
488 !( $mergedPositions[$masterName] instanceof
DBPrimaryPos ) ||
489 $pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
491 $mergedPositions[$masterName] = $pos;
496 $mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
498 foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
500 !isset( $mergedTimestamps[$cluster] ) ||
501 $timestamp > $mergedTimestamps[$cluster]
503 $mergedTimestamps[$cluster] = $timestamp;
507 $clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;
510 self::FLD_POSITIONS => $mergedPositions,
511 self::FLD_TIMESTAMPS => $mergedTimestamps,
512 self::FLD_WRITE_INDEX => $clientPosIndex
522 if ( $this->wallClockOverride ) {
523 return $this->wallClockOverride;
526 $clockTime = (float)time();
529 return max( microtime(
true ), $clockTime );
538 $this->wallClockOverride =& $time;
541 private function marshalPositions( array $positions ) {
542 foreach ( $positions[ self::FLD_POSITIONS ] as
$key => $pos ) {
543 $positions[ self::FLD_POSITIONS ][
$key ] = $pos->toArray();
553 private function unmarshalPositions( $positions ) {
558 foreach ( $positions[ self::FLD_POSITIONS ] as
$key => $pos ) {
559 $class = $pos[
'_type_' ];
560 $positions[ self::FLD_POSITIONS ][
$key ] = $class::newFromArray( $pos );
Class representing a cache/ephemeral data store.
makeGlobalKey( $collection, ...$components )
Make a cache key for the default keyspace and given components.