MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
24 namespace Wikimedia\Rdbms;
25 
26 use BagOStuff;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Psr\Log\NullLogger;
30 use Wikimedia\WaitConditionLoop;
31 
136 class ChronologyProtector implements LoggerAwareInterface {
138  protected $store;
140  protected $logger;
141 
143  protected $key;
145  protected $clientId;
147  protected $clientLogInfo;
149  protected $waitForPosIndex;
150 
152  protected $enabled = true;
154  protected $positionWaitsEnabled = true;
156  protected $startupTimestamp;
157 
166 
169 
171  public const POSITION_COOKIE_TTL = 10;
173  private const POSITION_STORE_TTL = 60;
175  private const POSITION_INDEX_WAIT_TIMEOUT = 5;
176 
178  private const LOCK_TIMEOUT = 3;
180  private const LOCK_TTL = 6;
181 
182  private const FLD_POSITIONS = 'positions';
183  private const FLD_TIMESTAMPS = 'timestamps';
184  private const FLD_WRITE_INDEX = 'writeIndex';
185 
193  public function __construct(
195  array $client,
196  ?int $clientPosIndex,
197  string $secret = ''
198  ) {
199  $this->store = $store;
200 
201  if ( isset( $client['clientId'] ) ) {
202  $this->clientId = $client['clientId'];
203  } else {
204  $this->clientId = ( $secret != '' )
205  ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
206  : md5( $client['ip'] . "\n" . $client['agent'] );
207  }
208  $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
209  $this->waitForPosIndex = $clientPosIndex;
210 
211  $this->clientLogInfo = [
212  'clientIP' => $client['ip'],
213  'clientAgent' => $client['agent'],
214  'clientId' => $client['clientId'] ?? null
215  ];
216 
217  $this->logger = new NullLogger();
218  }
219 
220  public function setLogger( LoggerInterface $logger ) {
221  $this->logger = $logger;
222  }
223 
228  public function getClientId() {
229  return $this->clientId;
230  }
231 
236  public function setEnabled( $enabled ) {
237  $this->enabled = $enabled;
238  }
239 
244  public function setWaitEnabled( $enabled ) {
245  $this->positionWaitsEnabled = $enabled;
246  }
247 
262  if ( !$this->enabled || !$this->positionWaitsEnabled ) {
263  return;
264  }
265 
266  $cluster = $lb->getClusterName();
267  $masterName = $lb->getServerName( $lb->getWriterIndex() );
268 
269  $pos = $this->getStartupSessionPositions()[$masterName] ?? null;
270  if ( $pos instanceof DBMasterPos ) {
271  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
272  $lb->waitFor( $pos );
273  } else {
274  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
275  }
276  }
277 
290  if ( !$this->enabled || !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
291  return;
292  }
293 
294  $cluster = $lb->getClusterName();
295  $masterName = $lb->getServerName( $lb->getWriterIndex() );
296 
297  if ( $lb->hasStreamingReplicaServers() ) {
298  $pos = $lb->getReplicaResumePos();
299  if ( $pos ) {
300  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
301  $this->shutdownPositionsByMaster[$masterName] = $pos;
302  $this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
303  } else {
304  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
305  $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
306  }
307  } else {
308  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
309  $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
310  }
311  }
312 
321  public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
322  if ( !$this->enabled ) {
323  return [];
324  }
325 
326  if ( !$this->shutdownTimestampsByCluster ) {
327  $this->logger->debug( __METHOD__ . ": no master positions/timestamps to save" );
328 
329  return [];
330  }
331 
332  $scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
333  if ( $scopeLock ) {
334  $ok = $this->store->set(
335  $this->key,
336  $this->mergePositions(
337  $this->store->get( $this->key ),
338  $this->shutdownPositionsByMaster,
339  $this->shutdownTimestampsByCluster,
340  $clientPosIndex
341  ),
342  self::POSITION_STORE_TTL
343  );
344  unset( $scopeLock );
345  } else {
346  $ok = false;
347  }
348 
349  $clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );
350 
351  if ( $ok ) {
352  $bouncedPositions = [];
353  $this->logger->debug(
354  __METHOD__ . ": saved master positions/timestamp for DB cluster(s) $clusterList"
355  );
356 
357  } else {
358  $clientPosIndex = null; // nothing saved
359  $bouncedPositions = $this->shutdownPositionsByMaster;
360  // Raced out too many times or stash is down
361  $this->logger->warning(
362  __METHOD__ . ": failed to save master positions for DB cluster(s) $clusterList"
363  );
364  }
365 
366  return $bouncedPositions;
367  }
368 
378  public function getTouched( ILoadBalancer $lb ) {
379  if ( !$this->enabled ) {
380  return false;
381  }
382 
383  $cluster = $lb->getClusterName();
384 
385  $timestampsByCluster = $this->getStartupSessionTimestamps();
386  $timestamp = $timestampsByCluster[$cluster] ?? null;
387  if ( $timestamp === null ) {
388  $recentTouchTimestamp = false;
389  } elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
390  // If the position store is not replicated among datacenters and the cookie that
391  // sticks the client to the primary datacenter expires, then the touch timestamp
392  // will be found for requests in one datacenter but not others. For consistency,
393  // return false once the user is no longer routed to the primary datacenter.
394  $recentTouchTimestamp = false;
395  $this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
396  } else {
397  $recentTouchTimestamp = $timestamp;
398  $this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
399  }
400 
401  return $recentTouchTimestamp;
402  }
403 
407  protected function getStartupSessionPositions() {
408  $this->lazyStartup();
409 
411  }
412 
416  protected function getStartupSessionTimestamps() {
417  $this->lazyStartup();
418 
420  }
421 
427  protected function lazyStartup() {
428  if ( $this->startupTimestamp !== null ) {
429  return;
430  }
431 
432  $this->startupTimestamp = $this->getCurrentTime();
433  $this->logger->debug(
434  __METHOD__ .
435  ": client ID is {$this->clientId}; key is {$this->key}"
436  );
437 
438  // If there is an expectation to see master positions from a certain write
439  // index or higher, then block until it appears, or until a timeout is reached.
440  // Since the write index restarts each time the key is created, it is possible that
441  // a lagged store has a matching key write index. However, in that case, it should
442  // already be expired and thus treated as non-existing, maintaining correctness.
443  if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
444  $data = null;
445  $indexReached = null; // highest index reached in the position store
446  $loop = new WaitConditionLoop(
447  function () use ( &$data, &$indexReached ) {
448  $data = $this->store->get( $this->key );
449  if ( !is_array( $data ) ) {
450  return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
451  } elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
452  return WaitConditionLoop::CONDITION_REACHED; // b/c
453  }
454  $indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );
455 
456  return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
457  ? WaitConditionLoop::CONDITION_REACHED
458  : WaitConditionLoop::CONDITION_CONTINUE;
459  },
461  );
462  $result = $loop->invoke();
463  $waitedMs = $loop->getLastWaitTime() * 1e3;
464 
465  if ( $result == $loop::CONDITION_REACHED ) {
466  $this->logger->debug(
467  __METHOD__ . ": expected and found position index {cpPosIndex}.",
468  [
469  'cpPosIndex' => $this->waitForPosIndex,
470  'waitTimeMs' => $waitedMs
471  ] + $this->clientLogInfo
472  );
473  } else {
474  $this->logger->warning(
475  __METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
476  [
477  'cpPosIndex' => $this->waitForPosIndex,
478  'indexReached' => $indexReached,
479  'waitTimeMs' => $waitedMs
480  ] + $this->clientLogInfo
481  );
482  }
483  } else {
484  $data = $this->store->get( $this->key );
485  $indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
486  if ( $indexReached ) {
487  $this->logger->debug(
488  __METHOD__ . ": found position/timestamp data with index {indexReached}.",
489  [ 'indexReached' => $indexReached ] + $this->clientLogInfo
490  );
491  }
492  }
493 
494  $this->startupPositionsByMaster = $data ? $data[self::FLD_POSITIONS] : [];
495  $this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
496  }
497 
507  protected function mergePositions(
508  $storedValue,
509  array $shutdownPositions,
510  array $shutdownTimestamps,
511  ?int &$clientPosIndex = null
512  ) {
514  $mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
515  // Use the newest positions for each DB master
516  foreach ( $shutdownPositions as $masterName => $pos ) {
517  if (
518  !isset( $mergedPositions[$masterName] ) ||
519  !( $mergedPositions[$masterName] instanceof DBMasterPos ) ||
520  $pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
521  ) {
522  $mergedPositions[$masterName] = $pos;
523  }
524  }
525 
527  $mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
528  // Use the newest touch timestamp for each DB master
529  foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
530  if (
531  !isset( $mergedTimestamps[$cluster] ) ||
532  $timestamp > $mergedTimestamps[$cluster]
533  ) {
534  $mergedTimestamps[$cluster] = $timestamp;
535  }
536  }
537 
538  $clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;
539 
540  return [
541  self::FLD_POSITIONS => $mergedPositions,
542  self::FLD_TIMESTAMPS => $mergedTimestamps,
543  self::FLD_WRITE_INDEX => $clientPosIndex
544  ];
545  }
546 
552  protected function getCurrentTime() {
553  if ( $this->wallClockOverride ) {
555  }
556 
557  $clockTime = (float)time(); // call this first
558  // microtime() can severely drift from time() and the microtime() value of other threads.
559  // Instead of seeing the current time as being in the past, use the value of time().
560  return max( microtime( true ), $clockTime );
561  }
562 
568  public function setMockTime( &$time ) {
569  $this->wallClockOverride =& $time;
570  }
571 }
Wikimedia\Rdbms\ChronologyProtector\$positionWaitsEnabled
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
Definition: ChronologyProtector.php:154
Wikimedia\Rdbms\ILoadBalancer\getClusterName
getClusterName()
Get the logical name of the database cluster.
Wikimedia\Rdbms\ChronologyProtector\setMockTime
setMockTime(&$time)
Definition: ChronologyProtector.php:568
Wikimedia\Rdbms\ChronologyProtector\POSITION_INDEX_WAIT_TIMEOUT
const POSITION_INDEX_WAIT_TIMEOUT
Max seconds to wait for positions write indexes to appear (e.g.
Definition: ChronologyProtector.php:175
Wikimedia\Rdbms\ChronologyProtector\FLD_POSITIONS
const FLD_POSITIONS
Definition: ChronologyProtector.php:182
Wikimedia\Rdbms\ChronologyProtector\getTouched
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
Definition: ChronologyProtector.php:378
Wikimedia\Rdbms\ChronologyProtector\$store
BagOStuff $store
Definition: ChronologyProtector.php:138
Wikimedia\Rdbms\ChronologyProtector\FLD_WRITE_INDEX
const FLD_WRITE_INDEX
Definition: ChronologyProtector.php:184
Wikimedia\Rdbms\ChronologyProtector\$shutdownPositionsByMaster
array< string, DBMasterPos > $shutdownPositionsByMaster
Map of (DB master name => position)
Definition: ChronologyProtector.php:161
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
Wikimedia\Rdbms\DBMasterPos
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:14
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:86
Wikimedia\Rdbms\ChronologyProtector\getStartupSessionTimestamps
getStartupSessionTimestamps()
Definition: ChronologyProtector.php:416
Wikimedia\Rdbms\ChronologyProtector\$logger
LoggerInterface $logger
Definition: ChronologyProtector.php:140
Wikimedia\Rdbms\ChronologyProtector\POSITION_COOKIE_TTL
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
Definition: ChronologyProtector.php:171
Wikimedia\Rdbms\ChronologyProtector\mergePositions
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
Definition: ChronologyProtector.php:507
Wikimedia\Rdbms\ChronologyProtector\applySessionReplicationPosition
applySessionReplicationPosition(ILoadBalancer $lb)
Apply client "session consistency" replication position to a new ILoadBalancer.
Definition: ChronologyProtector.php:261
Wikimedia\Rdbms\ChronologyProtector\setLogger
setLogger(LoggerInterface $logger)
Definition: ChronologyProtector.php:220
Wikimedia\Rdbms\ChronologyProtector\persistSessionReplicationPositions
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
Definition: ChronologyProtector.php:321
Wikimedia\Rdbms\ChronologyProtector\$wallClockOverride
float null $wallClockOverride
Definition: ChronologyProtector.php:168
Wikimedia\Rdbms\ChronologyProtector\FLD_TIMESTAMPS
const FLD_TIMESTAMPS
Definition: ChronologyProtector.php:183
Wikimedia\Rdbms\ChronologyProtector\$clientLogInfo
string[] $clientLogInfo
Map of client information fields for logging.
Definition: ChronologyProtector.php:147
Wikimedia\Rdbms\ILoadBalancer\getServerName
getServerName( $i)
Get the readable name of the server with the specified index.
Wikimedia\Rdbms\ChronologyProtector\$waitForPosIndex
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
Definition: ChronologyProtector.php:149
Wikimedia\Rdbms\ChronologyProtector\setWaitEnabled
setWaitEnabled( $enabled)
Definition: ChronologyProtector.php:244
Wikimedia\Rdbms\ILoadBalancer\getWriterIndex
getWriterIndex()
Get the specific server index of the primary server.
Wikimedia\Rdbms\ILoadBalancer\waitFor
waitFor( $pos)
Set the primary position to reach before the next generic group DB handle query.
Wikimedia\Rdbms\ChronologyProtector\$key
string $key
Storage key name.
Definition: ChronologyProtector.php:143
Wikimedia\Rdbms\ChronologyProtector\POSITION_STORE_TTL
const POSITION_STORE_TTL
Seconds to store replication positions.
Definition: ChronologyProtector.php:173
Wikimedia\Rdbms\ChronologyProtector\LOCK_TTL
const LOCK_TTL
Lock expiry to use for key updates.
Definition: ChronologyProtector.php:180
Wikimedia\Rdbms\ChronologyProtector\__construct
__construct(BagOStuff $store, array $client, ?int $clientPosIndex, string $secret='')
Definition: ChronologyProtector.php:193
Wikimedia\Rdbms\ChronologyProtector\$enabled
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
Definition: ChronologyProtector.php:152
Wikimedia\Rdbms\ChronologyProtector\getClientId
getClientId()
Definition: ChronologyProtector.php:228
Wikimedia\Rdbms\ChronologyProtector\getCurrentTime
getCurrentTime()
Definition: ChronologyProtector.php:552
Wikimedia\Rdbms\ILoadBalancer\hasOrMadeRecentMasterChanges
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
Wikimedia\Rdbms\ChronologyProtector\$shutdownTimestampsByCluster
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
Definition: ChronologyProtector.php:165
Wikimedia\Rdbms\ChronologyProtector\stageSessionReplicationPosition
stageSessionReplicationPosition(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
Definition: ChronologyProtector.php:289
Wikimedia\Rdbms\ChronologyProtector\LOCK_TIMEOUT
const LOCK_TIMEOUT
Lock timeout to use for key updates.
Definition: ChronologyProtector.php:178
Wikimedia\Rdbms\ChronologyProtector\lazyStartup
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
Definition: ChronologyProtector.php:427
Wikimedia\Rdbms\ChronologyProtector\$startupTimestamp
float null $startupTimestamp
UNIX timestamp when the client data was loaded.
Definition: ChronologyProtector.php:156
Wikimedia\Rdbms\ChronologyProtector\$startupTimestampsByCluster
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
Definition: ChronologyProtector.php:163
Wikimedia\Rdbms\ILoadBalancer\getReplicaResumePos
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
Wikimedia\Rdbms\ChronologyProtector
Provide a given client with protection against visible database lag.
Definition: ChronologyProtector.php:136
Wikimedia\Rdbms\ChronologyProtector\getStartupSessionPositions
getStartupSessionPositions()
Definition: ChronologyProtector.php:407
Wikimedia\Rdbms\ChronologyProtector\$clientId
string $clientId
Hash of client parameters.
Definition: ChronologyProtector.php:145
Wikimedia\Rdbms\ILoadBalancer\hasStreamingReplicaServers
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
Wikimedia\Rdbms\ChronologyProtector\setEnabled
setEnabled( $enabled)
Definition: ChronologyProtector.php:236
BagOStuff\makeGlobalKey
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81
Wikimedia\Rdbms\ChronologyProtector\$startupPositionsByMaster
array< string, DBMasterPos > $startupPositionsByMaster
Map of (DB master name => position)
Definition: ChronologyProtector.php:159