MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
20 namespace Wikimedia\Rdbms;
21 
22 use BagOStuff;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 
/**
 * Provide a given client with protection against visible database lag.
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var BagOStuff Store holding this client's replication positions between requests */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Store key for this client's positions (built via BagOStuff::makeGlobalKey()) */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var string[] Map of client information fields for logging */
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;

	/** @var bool Whether reading/writing session consistency replication positions is enabled */
	protected $enabled = true;
	/** @var bool Whether waiting on DB servers to reach replication positions is enabled */
	protected $positionWaitsEnabled = true;
	/** @var float|null UNIX timestamp when the client data was loaded */
	protected $startupTimestamp;

	// These four maps are read/written by the methods below; declaring them avoids
	// dynamic properties and guarantees array values (e.g. the "bounced" positions
	// returned by persistSessionReplicationPositions() when nothing was staged).
	/** @var array<string,DBPrimaryPos> Map of (primary server name => position), loaded from the store */
	protected $startupPositionsByPrimary = [];
	/** @var array<string,DBPrimaryPos> Map of (primary server name => position), staged for saving */
	protected $shutdownPositionsByPrimary = [];
	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp), loaded from the store */
	protected $startupTimestampsByCluster = [];
	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp), staged for saving */
	protected $shutdownTimestampsByCluster = [];

	/** @var float|null Mock time (bound by reference in setMockTime()); used by getCurrentTime() when truthy */
	private $wallClockOverride;

	/** @var bool Whether the client ID was derived from the client IP and user agent instead of being given */
	private $hasImplicitClientId = false;

	/** @var int Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
	public const POSITION_COOKIE_TTL = 10;
	/** @var int TTL passed to BagOStuff::set() when saving the positions */
	private const POSITION_STORE_TTL = 60;

	/** @var int Lock timeout passed to BagOStuff::getScopedLock() */
	private const LOCK_TIMEOUT = 3;
	/** @var int Lock TTL passed to BagOStuff::getScopedLock() */
	private const LOCK_TTL = 6;

	// Field names of the stored value (see mergePositions())
	private const FLD_POSITIONS = 'positions';
	private const FLD_TIMESTAMPS = 'timestamps';
	private const FLD_WRITE_INDEX = 'writeIndex';
199 
/**
 * @param BagOStuff $store Store in which the client's replication positions are kept
 * @param array $client Client info; reads 'ip', 'agent' and optionally 'clientId'.
 *  When 'clientId' is not set, an ID is derived from the IP and user agent.
 * @param int|null $clientPosIndex Expected minimum index of the last write to the position store
 * @param string $secret Optional HMAC secret used when deriving the client ID
 */
public function __construct(
	BagOStuff $store,
	array $client,
	?int $clientPosIndex,
	string $secret = ''
) {
	$this->store = $store;

	if ( isset( $client['clientId'] ) ) {
		$this->clientId = $client['clientId'];
	} else {
		// No explicit ID was given: derive one from the client IP address and user agent
		$this->hasImplicitClientId = true;
		$fingerprint = $client['ip'] . "\n" . $client['agent'];
		$this->clientId = ( $secret !== '' )
			? hash_hmac( 'md5', $fingerprint, $secret )
			: md5( $fingerprint );
	}
	$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v4' );
	$this->waitForPosIndex = $clientPosIndex;

	$this->clientLogInfo = [
		'clientIP' => $client['ip'],
		'clientAgent' => $client['agent'],
		'clientId' => $client['clientId'] ?? null
	];

	// Logging is a no-op until setLogger() is called
	$this->logger = new NullLogger();
}
234 
/**
 * Use the given logger instead of the NullLogger installed by the constructor.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->logger = $logger;
}
238 
/**
 * @return string The client ID: the one supplied to the constructor, or else a hash
 *  of the client IP address and user agent
 */
public function getClientId() {
	$id = $this->clientId;

	return $id;
}
246 
/**
 * Turn reading/writing of session consistency replication positions on or off.
 *
 * @param bool $enabled
 */
public function setEnabled( $enabled ) {
	$this->enabled = $enabled;
}
254 
/**
 * Turn waiting for DB servers to reach stored replication positions on or off.
 *
 * @param bool $enabled
 */
public function setWaitEnabled( $enabled ) {
	$this->positionWaitsEnabled = $enabled;
}
262 
/**
 * Apply client "session consistency" replication position to a new ILoadBalancer.
 *
 * If a position was loaded for the cluster's primary server, the load balancer is given
 * it via ILoadBalancer::waitFor() as the position to reach before its next generic
 * group query. Otherwise nothing is done beyond logging.
 *
 * @param ILoadBalancer $lb
 * @return void
 */
public function applySessionReplicationPosition( ILoadBalancer $lb ) {
	if ( !$this->enabled || !$this->positionWaitsEnabled ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( $lb->getWriterIndex() );

	$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
	if ( $pos instanceof DBPrimaryPos ) {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position is '$pos'" );
		$lb->waitFor( $pos );
	} else {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no position" );
	}
}
292 
/**
 * Update client "session consistency" replication position for an end-of-life ILoadBalancer.
 *
 * Only load balancers that have (or made) primary changes are considered. The
 * primary's replication position is staged when there are streaming replicas and the
 * position is known. A touch timestamp for the cluster is always staged. Staged data is
 * written by persistSessionReplicationPositions().
 *
 * @param ILoadBalancer $lb
 * @return void
 */
public function stageSessionReplicationPosition( ILoadBalancer $lb ) {
	if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( $lb->getWriterIndex() );

	if ( $lb->hasStreamingReplicaServers() ) {
		$pos = $lb->getReplicaResumePos();
		if ( $pos ) {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position now '$pos'" );
			$this->shutdownPositionsByPrimary[$primaryName] = $pos;
			// Use the position's own timestamp as the touch time
			$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
		} else {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position unknown" );
			$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
		}
	} else {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no replication" );
		$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
	}
}
327 
/**
 * Persist any staged client "session consistency" replication positions.
 *
 * Under a store lock, the staged positions/timestamps are merged with those already in
 * the store (see mergePositions()) and the result is written back.
 *
 * @param int|null &$clientPosIndex On success, set to the new write index. On failure
 *  (lock not acquired or write failed), set to null. Left unchanged when disabled or
 *  when nothing is staged.
 * @return DBPrimaryPos[] The staged positions that could not be saved. Empty on success,
 *  when disabled, or when nothing is staged.
 */
public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
	if ( !$this->enabled ) {
		return [];
	}

	if ( !$this->shutdownTimestampsByCluster ) {
		$this->logger->debug( __METHOD__ . ": no primary positions/timestamps to save" );

		return [];
	}

	$saved = false;
	$lock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
	if ( $lock ) {
		$existing = $this->unmarshalPositions( $this->store->get( $this->key ) );
		$merged = $this->mergePositions(
			$existing,
			$this->shutdownPositionsByPrimary,
			$this->shutdownTimestampsByCluster,
			$clientPosIndex
		);
		$saved = $this->store->set(
			$this->key,
			$this->marshalPositions( $merged ),
			self::POSITION_STORE_TTL
		);
		// Drop the scoped lock object now (presumably releasing the lock) rather than at return
		unset( $lock );
	}

	$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

	if ( !$saved ) {
		// Raced out too many times or stash is down
		$clientPosIndex = null; // nothing saved
		$this->logger->warning(
			__METHOD__ . ": failed to save primary positions for DB cluster(s) $clusterList"
		);

		return $this->shutdownPositionsByPrimary;
	}

	$this->logger->debug(
		__METHOD__ . ": saved primary positions/timestamp for DB cluster(s) $clusterList"
	);

	return [];
}
385 
/**
 * Get the UNIX timestamp when the client last touched the DB, if they did so recently.
 *
 * @param ILoadBalancer $lb
 * @return float|false The stored touch timestamp for the load balancer's cluster. Returns
 *  false when disabled, when none is stored, or when it is more than POSITION_COOKIE_TTL
 *  seconds older than the time the client data was loaded.
 */
public function getTouched( ILoadBalancer $lb ) {
	if ( !$this->enabled ) {
		return false;
	}

	$cluster = $lb->getClusterName();
	$timestamp = $this->getStartupSessionTimestamps()[$cluster] ?? null;
	if ( $timestamp === null ) {
		return false;
	}

	$age = $this->startupTimestamp - $timestamp;
	if ( $age > self::POSITION_COOKIE_TTL ) {
		// If the position store is not replicated among datacenters and the cookie that
		// sticks the client to the primary datacenter expires, then the touch timestamp
		// will be found for requests in one datacenter but not others. For consistency,
		// return false once the user is no longer routed to the primary datacenter.
		$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );

		return false;
	}

	$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );

	return $timestamp;
}
420 
/**
 * Get the replication positions loaded from the store for this client.
 *
 * @return array<string,DBPrimaryPos> Map of (primary server name => position)
 */
protected function getStartupSessionPositions() {
	$this->lazyStartup();

	return $this->startupPositionsByPrimary;
}
429 
/**
 * Get the touch timestamps loaded from the store for this client.
 *
 * @return array<string,float> Map of (DB cluster name => UNIX timestamp)
 */
protected function getStartupSessionTimestamps() {
	$this->lazyStartup();

	return $this->startupTimestampsByCluster;
}
438 
/**
 * Load the stored replication positions and touch timestamps for the client.
 *
 * Runs at most once per instance; $startupTimestamp doubles as the "already loaded" flag.
 * It also logs whether the position write index the client expects was found. It warns
 * when recent position data turns up under an ID derived from IP/user agent (T314434).
 */
protected function lazyStartup() {
	if ( $this->startupTimestamp !== null ) {
		// Already loaded
		return;
	}

	$this->startupTimestamp = $this->getCurrentTime();
	$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
	$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

	$stored = $this->unmarshalPositions( $this->store->get( $this->key ) );
	$this->startupPositionsByPrimary = $stored ? $stored[self::FLD_POSITIONS] : [];
	$this->startupTimestampsByCluster = $stored[self::FLD_TIMESTAMPS] ?? [];

	// When a stored array expires and is re-created under the same (deterministic) key,
	// the array value naturally starts again from index zero. As such, it is possible
	// that if certain store writes were lost (e.g. store down), that we unintentionally
	// point to an offset in an older incarnation of the array.
	// We don't try to detect or do something about this because:
	// 1. Waiting for an older offset is harmless and generally no-ops.
	// 2. The older value will have expired by now and thus treated as non-existing,
	//    which means we wouldn't even "see" it here.
	$indexReached = is_array( $stored ) ? $stored[self::FLD_WRITE_INDEX] : null;

	$expectingIndex = $this->positionWaitsEnabled && $this->waitForPosIndex > 0;
	if ( !$expectingIndex ) {
		if ( $indexReached ) {
			$this->logger->debug( 'found position data with index {indexReached}', [
				'indexReached' => $indexReached
			] + $this->clientLogInfo );
		}
	} elseif ( $indexReached >= $this->waitForPosIndex ) {
		$this->logger->debug( 'expected and found position index {cpPosIndex}', [
			'cpPosIndex' => $this->waitForPosIndex,
		] + $this->clientLogInfo );
	} else {
		$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
			'cpPosIndex' => $this->waitForPosIndex,
			'indexReached' => $indexReached,
			// NOTE(review): presumably attached so the log entry carries a stack trace
			'exception' => new \RuntimeException(),
		] + $this->clientLogInfo );
	}

	if ( !$indexReached || !$this->hasImplicitClientId ) {
		return;
	}

	// T314434: position data found under an IP/user-agent derived ID is suspicious only
	// if some cluster was touched less than POSITION_COOKIE_TTL seconds ago.
	$recentWriteFound = false;
	foreach ( $this->startupTimestampsByCluster as $writeTimestamp ) {
		if ( ( $this->startupTimestamp - $writeTimestamp ) < self::POSITION_COOKIE_TTL ) {
			$recentWriteFound = true;
			break;
		}
	}
	if ( $recentWriteFound ) {
		$this->logger->warning( 'found position data under a presumed clientId (T314434)', [
			'indexReached' => $indexReached
		] + $this->clientLogInfo );
	}
}
502 
/**
 * Merge the new replication positions with the currently stored ones (highest wins).
 *
 * @param array|mixed $storedValue Unmarshalled stored value; may be falsy when nothing is stored
 * @param array<string,DBPrimaryPos> $shutdownPositions Map of (primary server name => position)
 * @param array<string,float> $shutdownTimestamps Map of (DB cluster name => UNIX timestamp)
 * @param int|null &$clientPosIndex Set to the new write index (stored index + 1, or 1)
 * @return array Value with the positions, timestamps and write index fields
 */
protected function mergePositions(
	$storedValue,
	array $shutdownPositions,
	array $shutdownTimestamps,
	?int &$clientPosIndex = null
) {
	// Per primary server, keep whichever position is newest
	/** @var array<string,DBPrimaryPos> $positions */
	$positions = $storedValue[self::FLD_POSITIONS] ?? [];
	foreach ( $shutdownPositions as $primaryName => $newPos ) {
		$current = $positions[$primaryName] ?? null;
		if ( !( $current instanceof DBPrimaryPos ) || $newPos->asOfTime() > $current->asOfTime() ) {
			$positions[$primaryName] = $newPos;
		}
	}

	// Per cluster, keep whichever touch timestamp is newest
	/** @var array<string,float> $timestamps */
	$timestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
	foreach ( $shutdownTimestamps as $cluster => $ts ) {
		if ( !isset( $timestamps[$cluster] ) || $ts > $timestamps[$cluster] ) {
			$timestamps[$cluster] = $ts;
		}
	}

	$clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;

	return [
		self::FLD_POSITIONS => $positions,
		self::FLD_TIMESTAMPS => $timestamps,
		self::FLD_WRITE_INDEX => $clientPosIndex,
	];
}
551 
/**
 * Get the current UNIX time, or the mock time from setMockTime() if that value is truthy.
 *
 * @return float|int
 */
protected function getCurrentTime() {
	$mockTime = $this->wallClockOverride;
	if ( $mockTime ) {
		return $mockTime;
	}

	// Sample time() before microtime(). microtime() can severely drift from time() and
	// from the microtime() value of other threads; rather than seeing the current time
	// as being in the past, never report less than time() does.
	$wholeSeconds = (float)time();
	$precise = microtime( true );

	return max( $precise, $wholeSeconds );
}
567 
/**
 * Bind the clock override to $time by reference, so later changes to the caller's
 * variable are seen by getCurrentTime() (which uses it only while it is truthy).
 *
 * @param float|int|null &$time
 */
public function setMockTime( &$time ) {
	$this->wallClockOverride = &$time;
}
576 
/**
 * Convert each DBPrimaryPos under the positions field to its array form (via toArray())
 * so that the value can be written to the store.
 *
 * @param array $positions Value as returned by mergePositions()
 * @return array
 */
private function marshalPositions( array $positions ) {
	foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $posObject ) {
		$positions[self::FLD_POSITIONS][$primaryName] = $posObject->toArray();
	}

	return $positions;
}
584 
/**
 * Inverse of marshalPositions(): rebuild each position object by calling
 * newFromArray() on the class named in its '_type_' field.
 *
 * @param mixed $positions Raw value read from the store; falsy values pass through unchanged
 * @return mixed
 */
private function unmarshalPositions( $positions ) {
	if ( $positions ) {
		foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $serialized ) {
			// NOTE(review): the class name comes from stored data, so the store must be trusted
			$posClass = $serialized['_type_'];
			$positions[self::FLD_POSITIONS][$primaryName] = $posClass::newFromArray( $serialized );
		}
	}

	return $positions;
}
601 }
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:85
makeGlobalKey( $collection, ...$components )
Make a cache key for the default keyspace and given components.
Provide a given client with protection against visible database lag.
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
array< string, DBPrimaryPos > $shutdownPositionsByPrimary
Map of (primary server name => position)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
__construct(BagOStuff $store, array $client, ?int $clientPosIndex, string $secret='')
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
array< string, DBPrimaryPos > $startupPositionsByPrimary
Map of (primary server name => position)
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
float|null $startupTimestamp
UNIX timestamp when the client data was loaded.
applySessionReplicationPosition(ILoadBalancer $lb)
Apply client "session consistency" replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
stageSessionReplicationPosition(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
An object representing a primary or replica DB position in a replicated setup.
Create and track the database connections and transactions for a given database cluster.
getClusterName()
Get the logical name of the database cluster.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
waitFor( $pos)
Set the primary position to reach before the next generic group DB query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
getWriterIndex()
Get the specific server index of the primary server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.