MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
24 namespace Wikimedia\Rdbms;
25 
26 use BagOStuff;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Psr\Log\NullLogger;
30 use Wikimedia\WaitConditionLoop;
31 
/**
 * Provide a given client with protection against visible database lag.
 *
 * Replication positions (and "touch" timestamps) of the DB primaries that a client wrote
 * to are staged when each ILoadBalancer reaches the end of its life, persisted in a
 * BagOStuff entry keyed by a hash of the client parameters, and loaded on a later request
 * so that the client's reads can wait for replicas to catch up ("session consistency").
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var BagOStuff Store holding the client's positions/timestamps entry */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Key of the client's positions/timestamps entry in the store */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var string[] Map of client information fields for logging */
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;

	/** @var bool Whether reading/writing session consistency replication positions is enabled */
	protected $enabled = true;
	/** @var bool Whether waiting on DB servers to reach replication positions is enabled */
	protected $positionWaitsEnabled = true;
	/** @var float|null UNIX timestamp when the client data was loaded */
	protected $startupTimestamp;

	/** @var array<string,DBPrimaryPos> Map of (DB primary name => position) */
	protected $startupPositionsByMaster = [];
	/** @var array<string,DBPrimaryPos> Map of (DB primary name => position) */
	protected $shutdownPositionsByMaster = [];
	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
	protected $startupTimestampsByCluster = [];
	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
	protected $shutdownTimestampsByCluster = [];

	/** @var float|null Mock UNIX timestamp set via setMockTime(), if any */
	private $wallClockOverride;

	/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
	public const POSITION_COOKIE_TTL = 10;
	/** Seconds to store replication positions */
	private const POSITION_STORE_TTL = 60;
	/** Max seconds to wait for position write indexes to appear in the position store */
	private const POSITION_INDEX_WAIT_TIMEOUT = 5;

	/** Lock timeout to use for key updates */
	private const LOCK_TIMEOUT = 3;
	/** Lock expiry to use for key updates */
	private const LOCK_TTL = 6;

	private const FLD_POSITIONS = 'positions';
	private const FLD_TIMESTAMPS = 'timestamps';
	private const FLD_WRITE_INDEX = 'writeIndex';

	/**
	 * @param BagOStuff $store Store used to persist positions and timestamps
	 * @param array $client Map with 'ip' and 'agent' entries and an optional 'clientId'
	 *   entry; when 'clientId' is absent, the ID is derived from 'ip' and 'agent'
	 * @param int|null $clientPosIndex Minimum write index of the stored positions that the
	 *   client expects to see (null or 0 means "do not wait")
	 * @param string $secret Secret for HMAC hashing of the derived client ID [optional]
	 */
	public function __construct(
		BagOStuff $store,
		array $client,
		?int $clientPosIndex,
		string $secret = ''
	) {
		$this->store = $store;

		if ( isset( $client['clientId'] ) ) {
			$this->clientId = $client['clientId'];
		} else {
			// Hash the IP and user agent; use an HMAC keyed with the secret when one is given
			$this->clientId = ( $secret !== '' )
				? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
				: md5( $client['ip'] . "\n" . $client['agent'] );
		}
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
		$this->waitForPosIndex = $clientPosIndex;

		$this->clientLogInfo = [
			'clientIP' => $client['ip'],
			'clientAgent' => $client['agent'],
			'clientId' => $client['clientId'] ?? null
		];

		$this->logger = new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @return string Client ID hash
	 */
	public function getClientId() {
		return $this->clientId;
	}

	/**
	 * @param bool $enabled Whether reading/writing session replication positions is enabled
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether waiting on DB servers to reach positions is enabled
	 */
	public function setWaitEnabled( $enabled ) {
		$this->positionWaitsEnabled = $enabled;
	}

	/**
	 * Apply client "session consistency" replication position to a new ILoadBalancer
	 *
	 * If the stored data has a position for the primary of the load balancer's cluster,
	 * the load balancer is told to reach it via ILoadBalancer::waitFor().
	 *
	 * @param ILoadBalancer $lb
	 * @return void
	 */
	public function applySessionReplicationPosition( ILoadBalancer $lb ) {
		if ( !$this->enabled || !$this->positionWaitsEnabled ) {
			return;
		}

		$cluster = $lb->getClusterName();
		$masterName = $lb->getServerName( $lb->getWriterIndex() );

		$pos = $this->getStartupSessionPositions()[$masterName] ?? null;
		if ( $pos instanceof DBPrimaryPos ) {
			$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
			$lb->waitFor( $pos );
		} else {
			$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
		}
	}

	/**
	 * Update client "session consistency" replication position for an end-of-life ILoadBalancer
	 *
	 * Only load balancers with recent or pending primary writes are recorded. The replica
	 * resume position (when known) and a touch timestamp are staged in memory until
	 * persistSessionReplicationPositions() is called.
	 *
	 * @param ILoadBalancer $lb
	 * @return void
	 */
	public function stageSessionReplicationPosition( ILoadBalancer $lb ) {
		if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
			return;
		}

		$cluster = $lb->getClusterName();
		$masterName = $lb->getServerName( $lb->getWriterIndex() );

		if ( $lb->hasStreamingReplicaServers() ) {
			$pos = $lb->getReplicaResumePos();
			if ( $pos ) {
				$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
				$this->shutdownPositionsByMaster[$masterName] = $pos;
				$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
			} else {
				$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
				$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
			}
		} else {
			$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
			$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
		}
	}

	/**
	 * Persist any staged client "session consistency" replication positions
	 *
	 * The staged data is merged into the stored entry under a scoped lock (newest wins).
	 * When disabled or when nothing was staged, nothing is written and $clientPosIndex is
	 * left untouched.
	 *
	 * @param int|null &$clientPosIndex Set to the new write index of the stored entry on
	 *   success, or to null if saving failed
	 * @return DBPrimaryPos[] Empty on success; on failure, the map of
	 *   (DB primary name => position) that could not be saved
	 */
	public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
		if ( !$this->enabled ) {
			return [];
		}

		if ( !$this->shutdownTimestampsByCluster ) {
			$this->logger->debug( __METHOD__ . ": no primary positions/timestamps to save" );

			return [];
		}

		$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
		if ( $scopeLock ) {
			$ok = $this->store->set(
				$this->key,
				$this->mergePositions(
					$this->store->get( $this->key ),
					$this->shutdownPositionsByMaster,
					$this->shutdownTimestampsByCluster,
					$clientPosIndex
				),
				self::POSITION_STORE_TTL
			);
			// Release the lock as soon as the write is done
			unset( $scopeLock );
		} else {
			$ok = false;
		}

		$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

		if ( $ok ) {
			$bouncedPositions = [];
			$this->logger->debug(
				__METHOD__ . ": saved primary positions/timestamp for DB cluster(s) $clusterList"
			);

		} else {
			$clientPosIndex = null; // nothing saved
			$bouncedPositions = $this->shutdownPositionsByMaster;
			// Raced out too many times or stash is down
			$this->logger->warning(
				__METHOD__ . ": failed to save primary positions for DB cluster(s) $clusterList"
			);
		}

		return $bouncedPositions;
	}

	/**
	 * Get the UNIX timestamp when the client last touched the DB, if they did so recently
	 *
	 * @param ILoadBalancer $lb
	 * @return float|false UNIX timestamp; false if not recent, unknown, or disabled
	 */
	public function getTouched( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return false;
		}

		$cluster = $lb->getClusterName();

		$timestampsByCluster = $this->getStartupSessionTimestamps();
		$timestamp = $timestampsByCluster[$cluster] ?? null;
		if ( $timestamp === null ) {
			$recentTouchTimestamp = false;
		} elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
			// If the position store is not replicated among datacenters and the cookie that
			// sticks the client to the primary datacenter expires, then the touch timestamp
			// will be found for requests in one datacenter but not others. For consistency,
			// return false once the user is no longer routed to the primary datacenter.
			$recentTouchTimestamp = false;
			$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
		} else {
			$recentTouchTimestamp = $timestamp;
			$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
		}

		return $recentTouchTimestamp;
	}

	/**
	 * @return array<string,DBPrimaryPos> Map of (DB primary name => position) loaded at startup
	 */
	protected function getStartupSessionPositions() {
		$this->lazyStartup();

		return $this->startupPositionsByMaster;
	}

	/**
	 * @return array<string,float> Map of (DB cluster name => UNIX timestamp) loaded at startup
	 */
	protected function getStartupSessionTimestamps() {
		$this->lazyStartup();

		return $this->startupTimestampsByCluster;
	}

	/**
	 * Load the stored replication positions and touch timestamps for the client
	 *
	 * Runs at most once per instance. If a minimum write index is expected and waits are
	 * enabled, the store is polled (up to POSITION_INDEX_WAIT_TIMEOUT seconds) until an
	 * entry with that index or higher appears; otherwise the entry is read once.
	 *
	 * @return void
	 */
	protected function lazyStartup() {
		if ( $this->startupTimestamp !== null ) {
			return;
		}

		$this->startupTimestamp = $this->getCurrentTime();
		$this->logger->debug(
			'ChronologyProtector using store {class}',
			[ 'class' => get_class( $this->store ) ]
		);
		$this->logger->debug(
			__METHOD__ .
			": client ID is {$this->clientId}; key is {$this->key}"
		);

		// If there is an expectation to see primary positions from a certain write
		// index or higher, then block until it appears, or until a timeout is reached.
		// Since the write index restarts each time the key is created, it is possible that
		// a lagged store has a matching key write index. However, in that case, it should
		// already be expired and thus treated as non-existing, maintaining correctness.
		if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
			$data = null;
			$indexReached = null; // highest index reached in the position store
			$loop = new WaitConditionLoop(
				function () use ( &$data, &$indexReached ) {
					$data = $this->store->get( $this->key );
					if ( !is_array( $data ) ) {
						return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
					} elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
						return WaitConditionLoop::CONDITION_REACHED; // b/c
					}
					$indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );

					return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
						? WaitConditionLoop::CONDITION_REACHED
						: WaitConditionLoop::CONDITION_CONTINUE;
				},
				self::POSITION_INDEX_WAIT_TIMEOUT
			);
			$result = $loop->invoke();
			$waitedMs = $loop->getLastWaitTime() * 1e3;

			if ( $result === $loop::CONDITION_REACHED ) {
				$this->logger->debug(
					__METHOD__ . ": expected and found position index {cpPosIndex}.",
					[
						'cpPosIndex' => $this->waitForPosIndex,
						'waitTimeMs' => $waitedMs
					] + $this->clientLogInfo
				);
			} else {
				$this->logger->warning(
					__METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
					[
						'cpPosIndex' => $this->waitForPosIndex,
						'indexReached' => $indexReached,
						'waitTimeMs' => $waitedMs
					] + $this->clientLogInfo
				);
			}
		} else {
			$data = $this->store->get( $this->key );
			$indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
			if ( $indexReached ) {
				$this->logger->debug(
					__METHOD__ . ": found position/timestamp data with index {indexReached}.",
					[ 'indexReached' => $indexReached ] + $this->clientLogInfo
				);
			}
		}

		if ( is_array( $data ) ) {
			// Entries in an older format (see the b/c check above) may lack some fields;
			// treat any missing field as empty instead of indexing it blindly
			$this->startupPositionsByMaster = $data[self::FLD_POSITIONS] ?? [];
			$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
		} else {
			// Nothing stored (or an unusable value); start from empty maps
			$this->startupPositionsByMaster = [];
			$this->startupTimestampsByCluster = [];
		}
	}

	/**
	 * Merge the new replication positions with the currently stored ones (highest wins)
	 *
	 * @param array<string,mixed>|false $storedValue Current stored entry, if any
	 * @param array<string,DBPrimaryPos> $shutdownPositions Map of (DB primary name => position)
	 * @param array<string,float> $shutdownTimestamps Map of (DB cluster name => UNIX timestamp)
	 * @param int|null &$clientPosIndex Set to the new write index (stored index + 1)
	 * @return array Combined entry with positions, timestamps and write index fields
	 */
	protected function mergePositions(
		$storedValue,
		array $shutdownPositions,
		array $shutdownTimestamps,
		?int &$clientPosIndex = null
	) {
		/** @var array<string,DBPrimaryPos> $mergedPositions */
		$mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
		// Use the newest positions for each DB primary
		foreach ( $shutdownPositions as $masterName => $pos ) {
			if (
				!isset( $mergedPositions[$masterName] ) ||
				!( $mergedPositions[$masterName] instanceof DBPrimaryPos ) ||
				$pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
			) {
				$mergedPositions[$masterName] = $pos;
			}
		}

		/** @var array<string,float> $mergedTimestamps */
		$mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
		// Use the newest touch timestamp for each DB primary
		foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
			if (
				!isset( $mergedTimestamps[$cluster] ) ||
				$timestamp > $mergedTimestamps[$cluster]
			) {
				$mergedTimestamps[$cluster] = $timestamp;
			}
		}

		$clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;

		return [
			self::FLD_POSITIONS => $mergedPositions,
			self::FLD_TIMESTAMPS => $mergedTimestamps,
			self::FLD_WRITE_INDEX => $clientPosIndex
		];
	}

	/**
	 * Get the current UNIX timestamp, or the mock time if one was set via setMockTime()
	 *
	 * @return float UNIX timestamp
	 */
	protected function getCurrentTime() {
		// Compare against null so that a mock time of 0 is still honored
		if ( $this->wallClockOverride !== null ) {
			return $this->wallClockOverride;
		}

		$clockTime = (float)time(); // call this first
		// microtime() can severely drift from time() and the microtime() value of other threads.
		// Instead of seeing the current time as being in the past, use the value of time().
		return max( microtime( true ), $clockTime );
	}

	/**
	 * @internal For testing only
	 * @param float|null &$time Mock UNIX timestamp; the reference is kept, so later changes
	 *   to the caller's variable are seen by getCurrentTime()
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:87
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.
Provide a given client with protection against visible database lag.
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
__construct(BagOStuff $store, array $client, ?int $clientPosIndex, string $secret='')
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
array< string, DBPrimaryPos > $startupPositionsByMaster
Map of (DB primary name => position)
float null $startupTimestamp
UNIX timestamp when the client data was loaded.
array< string, DBPrimaryPos > $shutdownPositionsByMaster
Map of (DB primary name => position)
applySessionReplicationPosition(ILoadBalancer $lb)
Apply client "session consistency" replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
const LOCK_TTL
Lock expiry to use for key updates.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
const POSITION_INDEX_WAIT_TIMEOUT
Max seconds to wait for position write indexes to appear in the position store.
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
stageSessionReplicationPosition(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
const LOCK_TIMEOUT
Lock timeout to use for key updates.
const POSITION_STORE_TTL
Seconds to store replication positions.
An object representing a primary or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
getClusterName()
Get the logical name of the database cluster.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this PHP thread.
waitFor( $pos)
Set the primary position to reach before the next generic group DB handle query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
getWriterIndex()
Get the specific server index of the primary server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.