MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
20 namespace Wikimedia\Rdbms;
21 
22 use BagOStuff;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
26 
/**
 * Provide a given client with protection against visible database lag.
 *
 * Replication positions and "touch" timestamps staged for end-of-life load balancers are
 * merged into a per-client entry in a BagOStuff, and loaded lazily by later requests
 * that map to the same client ID.
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var BagOStuff Store holding the per-client replication position data */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Store key for this client's position data */
	protected $key;
	/** @var string Client ID (explicitly given, or a hash of the client IP and user agent) */
	protected $clientId;
	/** @var string[] Map of client information fields for logging */
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;

	/** @var bool Whether reading/writing session consistency replication positions is enabled */
	protected $enabled = true;
	/** @var bool Whether waiting on DB servers to reach replication positions is enabled */
	protected $positionWaitsEnabled = true;
	/** @var float|null UNIX timestamp when the client data was loaded */
	protected $startupTimestamp;

	/** @var array<string,DBPrimaryPos> Map of (primary server name => position) loaded from the store */
	protected $startupPositionsByPrimary = [];
	/** @var array<string,DBPrimaryPos> Map of (primary server name => position) staged for saving */
	protected $shutdownPositionsByPrimary = [];
	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) loaded from the store */
	protected $startupTimestampsByCluster = [];
	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) staged for saving */
	protected $shutdownTimestampsByCluster = [];

	/** @var float|null Mock UNIX time, bound by reference via setMockTime() */
	private $wallClockOverride;

	/** @var bool Whether the client ID was derived from the client IP and user agent */
	private $hasImplicitClientId = false;

	/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
	public const POSITION_COOKIE_TTL = 10;
	/** TTL (seconds) used when saving the merged positions to the store */
	private const POSITION_STORE_TTL = 60;

	/** Lock wait timeout passed to BagOStuff::getScopedLock() */
	private const LOCK_TIMEOUT = 3;
	/** Lock expiry passed to BagOStuff::getScopedLock() */
	private const LOCK_TTL = 6;

	private const FLD_POSITIONS = 'positions';
	private const FLD_TIMESTAMPS = 'timestamps';
	private const FLD_WRITE_INDEX = 'writeIndex';

	/**
	 * @param BagOStuff $store Store for per-client replication position data
	 * @param array $client Map of client info; 'ip' and 'agent' are required, 'clientId' is optional
	 * @param int|null $clientPosIndex Expected minimum index of the last position store write
	 * @param string $secret If non-empty, HMAC key used to derive an implicit client ID
	 */
	public function __construct(
		BagOStuff $store,
		array $client,
		?int $clientPosIndex,
		string $secret = ''
	) {
		$this->store = $store;

		if ( isset( $client['clientId'] ) ) {
			$this->clientId = $client['clientId'];
		} else {
			$this->hasImplicitClientId = true;
			// No explicit ID: derive a deterministic one from the IP and user agent
			$clientFingerprint = $client['ip'] . "\n" . $client['agent'];
			$this->clientId = ( $secret !== '' )
				? hash_hmac( 'md5', $clientFingerprint, $secret )
				: md5( $clientFingerprint );
		}
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v4' );
		$this->waitForPosIndex = $clientPosIndex;

		$this->clientLogInfo = [
			'clientIP' => $client['ip'],
			'clientAgent' => $client['agent'],
			'clientId' => $client['clientId'] ?? null
		];

		$this->logger = new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @return string Client ID (explicit, or derived from the client IP and user agent)
	 */
	public function getClientId() {
		return $this->clientId;
	}

	/**
	 * @param bool $enabled Whether reading/writing replication positions is enabled
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether waiting on replication positions is enabled
	 */
	public function setWaitEnabled( $enabled ) {
		$this->positionWaitsEnabled = $enabled;
	}

	/**
	 * Yield client "session consistency" replication position for a new ILoadBalancer.
	 *
	 * @param ILoadBalancer $lb
	 * @return DBPrimaryPos|null Stored position for the LB's primary server, if any;
	 *  null when disabled or when no position is stored
	 */
	public function yieldSessionPrimaryPos( ILoadBalancer $lb ) {
		if ( !$this->enabled || !$this->positionWaitsEnabled ) {
			return null;
		}

		$cluster = $lb->getClusterName();
		$primaryName = $lb->getServerName( $lb->getWriterIndex() );

		$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
		if ( $pos instanceof DBPrimaryPos ) {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position is '$pos'" );
		} else {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no position" );
		}

		return $pos;
	}

	/**
	 * Update client "session consistency" replication position for an end-of-life ILoadBalancer.
	 *
	 * Only stages the data; persistSessionReplicationPositions() writes it to the store.
	 * Does nothing if disabled or if the LB has no primary changes.
	 *
	 * @param ILoadBalancer $lb
	 */
	public function stageSessionPrimaryPos( ILoadBalancer $lb ) {
		if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
			return;
		}

		$cluster = $lb->getClusterName();
		$primaryName = $lb->getServerName( $lb->getWriterIndex() );

		if ( $lb->hasStreamingReplicaServers() ) {
			$pos = $lb->getReplicaResumePos();
			if ( $pos ) {
				$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position now '$pos'" );
				$this->shutdownPositionsByPrimary[$primaryName] = $pos;
				$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
			} else {
				$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position unknown" );
				$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
			}
		} else {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no replication" );
			$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
		}
	}

	/**
	 * Persist any staged client "session consistency" replication positions.
	 *
	 * @param int|null &$clientPosIndex Set to the new write index on success, or to null if
	 *  saving failed; left untouched when disabled or when nothing was staged
	 * @return DBPrimaryPos[] Empty on success (or when there was nothing to save); otherwise
	 *  the staged map of (primary server name => position) that could not be saved
	 */
	public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
		if ( !$this->enabled ) {
			return [];
		}

		if ( !$this->shutdownTimestampsByCluster ) {
			$this->logger->debug( __METHOD__ . ": no primary positions/timestamps to save" );

			return [];
		}

		$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
		if ( $scopeLock ) {
			// Merge with the currently stored value while the lock is held
			$positions = $this->mergePositions(
				$this->unmarshalPositions( $this->store->get( $this->key ) ),
				$this->shutdownPositionsByPrimary,
				$this->shutdownTimestampsByCluster,
				$clientPosIndex
			);

			$ok = $this->store->set(
				$this->key,
				$this->marshalPositions( $positions ),
				self::POSITION_STORE_TTL
			);
			unset( $scopeLock );
		} else {
			$ok = false;
		}

		$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

		if ( $ok ) {
			$bouncedPositions = [];
			$this->logger->debug(
				__METHOD__ . ": saved primary positions/timestamp for DB cluster(s) $clusterList"
			);

		} else {
			$clientPosIndex = null; // nothing saved
			$bouncedPositions = $this->shutdownPositionsByPrimary;
			// Raced out too many times or stash is down
			$this->logger->warning(
				__METHOD__ . ": failed to save primary positions for DB cluster(s) $clusterList"
			);
		}

		return $bouncedPositions;
	}

	/**
	 * Get the UNIX timestamp when the client last touched the DB, if they did so recently.
	 *
	 * @param ILoadBalancer $lb
	 * @return float|false UNIX timestamp; false if disabled, if no timestamp is stored for the
	 *  LB's cluster, or if it is more than POSITION_COOKIE_TTL seconds older than startup
	 */
	public function getTouched( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return false;
		}

		$cluster = $lb->getClusterName();

		$timestampsByCluster = $this->getStartupSessionTimestamps();
		$timestamp = $timestampsByCluster[$cluster] ?? null;
		if ( $timestamp === null ) {
			$recentTouchTimestamp = false;
		} elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
			// If the position store is not replicated among datacenters and the cookie that
			// sticks the client to the primary datacenter expires, then the touch timestamp
			// will be found for requests in one datacenter but not others. For consistency,
			// return false once the user is no longer routed to the primary datacenter.
			$recentTouchTimestamp = false;
			$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
		} else {
			$recentTouchTimestamp = $timestamp;
			$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
		}

		return $recentTouchTimestamp;
	}

	/**
	 * @return array<string,DBPrimaryPos> Map of (primary server name => position) from the store
	 */
	protected function getStartupSessionPositions() {
		$this->lazyStartup();

		return $this->startupPositionsByPrimary;
	}

	/**
	 * @return array<string,float> Map of (DB cluster name => UNIX timestamp) from the store
	 */
	protected function getStartupSessionTimestamps() {
		$this->lazyStartup();

		return $this->startupTimestampsByCluster;
	}

	/**
	 * Load the stored replication positions and touch timestamps for the client.
	 *
	 * Runs at most once per instance; later calls return immediately.
	 */
	protected function lazyStartup() {
		if ( $this->startupTimestamp !== null ) {
			return;
		}

		$this->startupTimestamp = $this->getCurrentTime();
		$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
		$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

		$data = $this->unmarshalPositions( $this->store->get( $this->key ) );

		// Tolerate a cache miss (falsy $data) as well as a stored value missing a field
		$this->startupPositionsByPrimary = $data[self::FLD_POSITIONS] ?? [];
		$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];

		// When a stored array expires and is re-created under the same (deterministic) key,
		// the array value naturally starts again from index zero. As such, it is possible
		// that if certain store writes were lost (e.g. store down), that we unintentionally
		// point to an offset in an older incarnation of the array.
		// We don't try to detect or do something about this because:
		// 1. Waiting for an older offset is harmless and generally no-ops.
		// 2. The older value will have expired by now and thus treated as non-existing,
		// which means we wouldn't even "see" it here.
		$indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
		if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
			if ( $indexReached >= $this->waitForPosIndex ) {
				$this->logger->debug( 'expected and found position index {cpPosIndex}', [
					'cpPosIndex' => $this->waitForPosIndex,
				] + $this->clientLogInfo );
			} else {
				$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
					'cpPosIndex' => $this->waitForPosIndex,
					'indexReached' => $indexReached,
					'exception' => new \RuntimeException(),
				] + $this->clientLogInfo );
			}
		} else {
			if ( $indexReached ) {
				$this->logger->debug( 'found position data with index {indexReached}', [
					'indexReached' => $indexReached
				] + $this->clientLogInfo );
			}
		}

		if ( $indexReached && $this->hasImplicitClientId ) {
			// Same boundary as getTouched(): a difference equal to the TTL still counts as recent
			$isWithinPossibleCookieTTL = false;
			foreach ( $this->startupTimestampsByCluster as $timestamp ) {
				if ( ( $this->startupTimestamp - $timestamp ) <= self::POSITION_COOKIE_TTL ) {
					$isWithinPossibleCookieTTL = true;
					break;
				}
			}
			if ( $isWithinPossibleCookieTTL ) {
				$this->logger->warning( 'found position data under a presumed clientId (T314434)', [
					'indexReached' => $indexReached
				] + $this->clientLogInfo );
			}
		}
	}

	/**
	 * Merge the new replication positions with the currently stored ones (highest wins).
	 *
	 * @param array|false|null $storedValue Unmarshalled stored value, or a falsy value on a miss
	 * @param array<string,DBPrimaryPos> $shutdownPositions Map of (primary name => position)
	 * @param array<string,float> $shutdownTimestamps Map of (DB cluster name => UNIX timestamp)
	 * @param int|null &$clientPosIndex Set to the new write index (stored index + 1)
	 * @return array Merged value with the positions, timestamps and write index fields
	 */
	protected function mergePositions(
		$storedValue,
		array $shutdownPositions,
		array $shutdownTimestamps,
		?int &$clientPosIndex = null
	) {
		/** @var array<string,DBPrimaryPos> $mergedPositions */
		$mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
		// Use the newest positions for each DB primary
		foreach ( $shutdownPositions as $primaryName => $pos ) {
			if (
				!isset( $mergedPositions[$primaryName] ) ||
				!( $mergedPositions[$primaryName] instanceof DBPrimaryPos ) ||
				$pos->asOfTime() > $mergedPositions[$primaryName]->asOfTime()
			) {
				$mergedPositions[$primaryName] = $pos;
			}
		}

		/** @var array<string,float> $mergedTimestamps */
		$mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
		// Use the newest touch timestamp for each DB cluster
		foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
			if (
				!isset( $mergedTimestamps[$cluster] ) ||
				$timestamp > $mergedTimestamps[$cluster]
			) {
				$mergedTimestamps[$cluster] = $timestamp;
			}
		}

		$clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;

		return [
			self::FLD_POSITIONS => $mergedPositions,
			self::FLD_TIMESTAMPS => $mergedTimestamps,
			self::FLD_WRITE_INDEX => $clientPosIndex
		];
	}

	/**
	 * @return float UNIX timestamp; the mock time if one was set via setMockTime()
	 */
	protected function getCurrentTime() {
		// Compare against null so that a mock time of 0 is still honoured
		if ( $this->wallClockOverride !== null ) {
			return $this->wallClockOverride;
		}

		$clockTime = (float)time(); // call this first
		// microtime() can severely drift from time() and the microtime() value of other threads.
		// Instead of seeing the current time as being in the past, use the value of time().
		return max( microtime( true ), $clockTime );
	}

	/**
	 * @param float|null &$time Mock UNIX timestamp; bound by reference so later changes apply
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}

	/**
	 * Convert position objects into plain arrays so the value can be saved to the store.
	 *
	 * @param array $positions Value from mergePositions()
	 * @return array
	 */
	private function marshalPositions( array $positions ) {
		foreach ( $positions[ self::FLD_POSITIONS ] as $key => $pos ) {
			$positions[ self::FLD_POSITIONS ][ $key ] = $pos->toArray();
		}

		return $positions;
	}

	/**
	 * Convert stored position arrays back into DBPrimaryPos objects.
	 *
	 * Entries that are not arrays or that name a class which does not implement
	 * DBPrimaryPos in '_type_' are dropped.
	 *
	 * @param array|false|null $positions Raw value from the store
	 * @return array|false|null Falsy input is returned unchanged
	 */
	private function unmarshalPositions( $positions ) {
		if ( !$positions ) {
			return $positions;
		}

		foreach ( $positions[ self::FLD_POSITIONS ] ?? [] as $key => $pos ) {
			$class = is_array( $pos ) ? ( $pos[ '_type_' ] ?? null ) : null;
			if ( is_string( $class ) && is_a( $class, DBPrimaryPos::class, true ) ) {
				$positions[ self::FLD_POSITIONS ][ $key ] = $class::newFromArray( $pos );
			} else {
				// Malformed entry: drop it instead of calling a method on an arbitrary class
				unset( $positions[ self::FLD_POSITIONS ][ $key ] );
			}
		}

		return $positions;
	}
}
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:85
makeGlobalKey( $collection, ...$components )
Make a cache key for the default keyspace and given components.
Provide a given client with protection against visible database lag.
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
array< string, DBPrimaryPos > $shutdownPositionsByPrimary
Map of (primary server name => position)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
__construct(BagOStuff $store, array $client, ?int $clientPosIndex, string $secret='')
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
array< string, DBPrimaryPos > $startupPositionsByPrimary
Map of (primary server name => position)
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
yieldSessionPrimaryPos(ILoadBalancer $lb)
Yield client "session consistency" replication position for a new ILoadBalancer.
float|null $startupTimestamp
UNIX timestamp when the client data was loaded.
stageSessionPrimaryPos(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
An object representing a primary or replica DB position in a replicated setup.
This class is a delegate to ILBFactory for a given database cluster.
getClusterName()
Get the name of the overall cluster of database servers managing the dataset.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this PHP thread.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
getWriterIndex()
Get the specific server index of the "writer server".
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.