MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
24 namespace Wikimedia\Rdbms;
25 
26 use BagOStuff;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Psr\Log\NullLogger;
30 use Wikimedia\WaitConditionLoop;
31 
135 class ChronologyProtector implements LoggerAwareInterface {
137  protected $store;
139  protected $logger;
140 
142  protected $key;
144  protected $clientId;
146  protected $clientLogInfo;
148  protected $waitForPosIndex;
149 
151  protected $enabled = true;
153  protected $positionWaitsEnabled = true;
155  protected $startupTimestamp;
156 
165 
168 
170  public const POSITION_COOKIE_TTL = 10;
172  private const POSITION_STORE_TTL = 60;
174  private const POSITION_INDEX_WAIT_TIMEOUT = 5;
175 
177  private const LOCK_TIMEOUT = 3;
179  private const LOCK_TTL = 6;
180 
181  private const FLD_POSITIONS = 'positions';
182  private const FLD_TIMESTAMPS = 'timestamps';
183  private const FLD_WRITE_INDEX = 'writeIndex';
184 
192  public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
193  $this->store = $store;
194 
195  if ( isset( $client['clientId'] ) ) {
196  $this->clientId = $client['clientId'];
197  } else {
198  $this->clientId = ( $secret != '' )
199  ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
200  : md5( $client['ip'] . "\n" . $client['agent'] );
201  }
202  $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
203  $this->waitForPosIndex = $posIndex;
204 
205  $this->clientLogInfo = [
206  'clientIP' => $client['ip'],
207  'clientAgent' => $client['agent'],
208  'clientId' => $client['clientId'] ?? null
209  ];
210 
211  $this->logger = new NullLogger();
212  }
213 
214  public function setLogger( LoggerInterface $logger ) {
215  $this->logger = $logger;
216  }
217 
222  public function getClientId() {
223  return $this->clientId;
224  }
225 
230  public function setEnabled( $enabled ) {
231  $this->enabled = $enabled;
232  }
233 
238  public function setWaitEnabled( $enabled ) {
239  $this->positionWaitsEnabled = $enabled;
240  }
241 
256  if ( !$this->enabled || !$this->positionWaitsEnabled ) {
257  return;
258  }
259 
260  $cluster = $lb->getClusterName();
261  $masterName = $lb->getServerName( $lb->getWriterIndex() );
262 
263  $pos = $this->getStartupSessionPositions()[$masterName] ?? null;
264  if ( $pos instanceof DBMasterPos ) {
265  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
266  $lb->waitFor( $pos );
267  } else {
268  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
269  }
270  }
271 
284  if ( !$this->enabled || !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
285  return;
286  }
287 
288  $cluster = $lb->getClusterName();
289  $masterName = $lb->getServerName( $lb->getWriterIndex() );
290 
291  if ( $lb->hasStreamingReplicaServers() ) {
292  $pos = $lb->getReplicaResumePos();
293  if ( $pos ) {
294  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
295  $this->shutdownPositionsByMaster[$masterName] = $pos;
296  $this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
297  } else {
298  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
299  $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
300  }
301  } else {
302  $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
303  $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
304  }
305  }
306 
315  public function shutdown( &$cpIndex = null ) {
316  if ( !$this->enabled ) {
317  return [];
318  }
319 
320  if ( !$this->shutdownTimestampsByCluster ) {
321  $this->logger->debug( __METHOD__ . ": no master positions/timestamps to save" );
322 
323  return [];
324  }
325 
326  $scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
327  if ( $scopeLock ) {
328  $ok = $this->store->set(
329  $this->key,
330  $this->mergePositions(
331  $this->store->get( $this->key ),
332  $this->shutdownPositionsByMaster,
333  $this->shutdownTimestampsByCluster,
334  $cpIndex
335  ),
336  self::POSITION_STORE_TTL
337  );
338  unset( $scopeLock );
339  } else {
340  $ok = false;
341  }
342 
343  $clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );
344 
345  if ( $ok ) {
346  $bouncedPositions = [];
347  $this->logger->debug(
348  __METHOD__ . ": saved master positions/timestamp for DB cluster(s) $clusterList"
349  );
350 
351  } else {
352  $cpIndex = null; // nothing saved
353  $bouncedPositions = $this->shutdownPositionsByMaster;
354  // Raced out too many times or stash is down
355  $this->logger->warning(
356  __METHOD__ . ": failed to save master positions for DB cluster(s) $clusterList"
357  );
358  }
359 
360  return $bouncedPositions;
361  }
362 
372  public function getTouched( ILoadBalancer $lb ) {
373  if ( !$this->enabled ) {
374  return false;
375  }
376 
377  $cluster = $lb->getClusterName();
378 
379  $timestampsByCluster = $this->getStartupSessionTimestamps();
380  $timestamp = $timestampsByCluster[$cluster] ?? null;
381  if ( $timestamp === null ) {
382  $recentTouchTimestamp = false;
383  } elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
384  // If the position store is not replicated among datacenters and the cookie that
385  // sticks the client to the primary datacenter expires, then the touch timestamp
386  // will be found for requests in one datacenter but not others. For consistency,
387  // return false once the user is no longer routed to the primary datacenter.
388  $recentTouchTimestamp = false;
389  $this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
390  } else {
391  $recentTouchTimestamp = $timestamp;
392  $this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
393  }
394 
395  return $recentTouchTimestamp;
396  }
397 
401  protected function getStartupSessionPositions() {
402  $this->lazyStartup();
403 
405  }
406 
410  protected function getStartupSessionTimestamps() {
411  $this->lazyStartup();
412 
414  }
415 
421  protected function lazyStartup() {
422  if ( $this->startupTimestamp !== null ) {
423  return;
424  }
425 
426  $this->startupTimestamp = $this->getCurrentTime();
427  $this->logger->debug(
428  __METHOD__ .
429  ": client ID is {$this->clientId}; key is {$this->key}"
430  );
431 
432  // If there is an expectation to see master positions from a certain write
433  // index or higher, then block until it appears, or until a timeout is reached.
434  // Since the write index restarts each time the key is created, it is possible that
435  // a lagged store has a matching key write index. However, in that case, it should
436  // already be expired and thus treated as non-existing, maintaining correctness.
437  if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
438  $data = null;
439  $indexReached = null; // highest index reached in the position store
440  $loop = new WaitConditionLoop(
441  function () use ( &$data, &$indexReached ) {
442  $data = $this->store->get( $this->key );
443  if ( !is_array( $data ) ) {
444  return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
445  } elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
446  return WaitConditionLoop::CONDITION_REACHED; // b/c
447  }
448  $indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );
449 
450  return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
451  ? WaitConditionLoop::CONDITION_REACHED
452  : WaitConditionLoop::CONDITION_CONTINUE;
453  },
455  );
456  $result = $loop->invoke();
457  $waitedMs = $loop->getLastWaitTime() * 1e3;
458 
459  if ( $result == $loop::CONDITION_REACHED ) {
460  $this->logger->debug(
461  __METHOD__ . ": expected and found position index {cpPosIndex}.",
462  [
463  'cpPosIndex' => $this->waitForPosIndex,
464  'waitTimeMs' => $waitedMs
465  ] + $this->clientLogInfo
466  );
467  } else {
468  $this->logger->warning(
469  __METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
470  [
471  'cpPosIndex' => $this->waitForPosIndex,
472  'indexReached' => $indexReached,
473  'waitTimeMs' => $waitedMs
474  ] + $this->clientLogInfo
475  );
476  }
477  } else {
478  $data = $this->store->get( $this->key );
479  $indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
480  if ( $indexReached ) {
481  $this->logger->debug(
482  __METHOD__ . ": found position/timestamp data with index {indexReached}.",
483  [ 'indexReached' => $indexReached ] + $this->clientLogInfo
484  );
485  }
486  }
487 
488  $this->startupPositionsByMaster = $data ? $data[self::FLD_POSITIONS] : [];
489  $this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
490  }
491 
501  protected function mergePositions(
502  $storedValue,
503  array $shutdownPositions,
504  array $shutdownTimestamps,
505  &$cpIndex = null
506  ) {
508  $mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
509  // Use the newest positions for each DB master
510  foreach ( $shutdownPositions as $masterName => $pos ) {
511  if (
512  !isset( $mergedPositions[$masterName] ) ||
513  !( $mergedPositions[$masterName] instanceof DBMasterPos ) ||
514  $pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
515  ) {
516  $mergedPositions[$masterName] = $pos;
517  }
518  }
519 
521  $mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
522  // Use the newest touch timestamp for each DB master
523  foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
524  if (
525  !isset( $mergedTimestamps[$cluster] ) ||
526  $timestamp > $mergedTimestamps[$cluster]
527  ) {
528  $mergedTimestamps[$cluster] = $timestamp;
529  }
530  }
531 
532  $cpIndex = $storedValue[self::FLD_WRITE_INDEX] ?? 0;
533 
534  return [
535  self::FLD_POSITIONS => $mergedPositions,
536  self::FLD_TIMESTAMPS => $mergedTimestamps,
537  self::FLD_WRITE_INDEX => ++$cpIndex
538  ];
539  }
540 
546  protected function getCurrentTime() {
547  if ( $this->wallClockOverride ) {
549  }
550 
551  $clockTime = (float)time(); // call this first
552  // microtime() can severely drift from time() and the microtime() value of other threads.
553  // Instead of seeing the current time as being in the past, use the value of time().
554  return max( microtime( true ), $clockTime );
555  }
556 
562  public function setMockTime( &$time ) {
563  $this->wallClockOverride =& $time;
564  }
565 }
Wikimedia\Rdbms\ChronologyProtector\$positionWaitsEnabled
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
Definition: ChronologyProtector.php:153
Wikimedia\Rdbms\ILoadBalancer\getClusterName
getClusterName()
Get the logical name of the database cluster.
Wikimedia\Rdbms\ChronologyProtector\setMockTime
setMockTime(&$time)
Definition: ChronologyProtector.php:562
Wikimedia\Rdbms\ChronologyProtector\POSITION_INDEX_WAIT_TIMEOUT
const POSITION_INDEX_WAIT_TIMEOUT
Max seconds to wait for positions write indexes to appear (e.g.
Definition: ChronologyProtector.php:174
Wikimedia\Rdbms\ChronologyProtector\FLD_POSITIONS
const FLD_POSITIONS
Definition: ChronologyProtector.php:181
Wikimedia\Rdbms\ChronologyProtector\getTouched
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
Definition: ChronologyProtector.php:372
Wikimedia\Rdbms\ChronologyProtector\$store
BagOStuff $store
Definition: ChronologyProtector.php:137
Wikimedia\Rdbms\ChronologyProtector\FLD_WRITE_INDEX
const FLD_WRITE_INDEX
Definition: ChronologyProtector.php:183
Wikimedia\Rdbms\ChronologyProtector\$shutdownPositionsByMaster
array< string, DBMasterPos > $shutdownPositionsByMaster
Map of (DB master name => position)
Definition: ChronologyProtector.php:160
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
Wikimedia\Rdbms\DBMasterPos
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:14
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:86
Wikimedia\Rdbms\ChronologyProtector\getStartupSessionTimestamps
getStartupSessionTimestamps()
Definition: ChronologyProtector.php:410
Wikimedia\Rdbms\ChronologyProtector\mergePositions
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, &$cpIndex=null)
Merge the new DB replication positions with the currently stored ones (highest wins)
Definition: ChronologyProtector.php:501
Wikimedia\Rdbms\ChronologyProtector\$logger
LoggerInterface $logger
Definition: ChronologyProtector.php:139
Wikimedia\Rdbms\ChronologyProtector\POSITION_COOKIE_TTL
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
Definition: ChronologyProtector.php:170
Wikimedia\Rdbms\ChronologyProtector\applySessionReplicationPosition
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
Definition: ChronologyProtector.php:255
Wikimedia\Rdbms\ChronologyProtector\setLogger
setLogger(LoggerInterface $logger)
Definition: ChronologyProtector.php:214
Wikimedia\Rdbms\ChronologyProtector\$wallClockOverride
float null $wallClockOverride
Definition: ChronologyProtector.php:167
Wikimedia\Rdbms\ChronologyProtector\FLD_TIMESTAMPS
const FLD_TIMESTAMPS
Definition: ChronologyProtector.php:182
Wikimedia\Rdbms\ChronologyProtector\$clientLogInfo
string[] $clientLogInfo
Map of client information fields for logging.
Definition: ChronologyProtector.php:146
Wikimedia\Rdbms\ILoadBalancer\getServerName
getServerName( $i)
Get the host name or IP address of the server with the specified index.
Wikimedia\Rdbms\ChronologyProtector\$waitForPosIndex
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
Definition: ChronologyProtector.php:148
Wikimedia\Rdbms\ChronologyProtector\setWaitEnabled
setWaitEnabled( $enabled)
Definition: ChronologyProtector.php:238
Wikimedia\Rdbms\ILoadBalancer\getWriterIndex
getWriterIndex()
Get the specific server index of the master server.
Wikimedia\Rdbms\ILoadBalancer\waitFor
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
Wikimedia\Rdbms\ChronologyProtector\$key
string $key
Storage key name.
Definition: ChronologyProtector.php:142
Wikimedia\Rdbms\ChronologyProtector\POSITION_STORE_TTL
const POSITION_STORE_TTL
Seconds to store replication positions.
Definition: ChronologyProtector.php:172
Wikimedia\Rdbms\ChronologyProtector\LOCK_TTL
const LOCK_TTL
Lock expiry to use for key updates.
Definition: ChronologyProtector.php:179
Wikimedia\Rdbms\ChronologyProtector\$enabled
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
Definition: ChronologyProtector.php:151
Wikimedia\Rdbms\ChronologyProtector\getClientId
getClientId()
Definition: ChronologyProtector.php:222
Wikimedia\Rdbms\ChronologyProtector\getCurrentTime
getCurrentTime()
Definition: ChronologyProtector.php:546
Wikimedia\Rdbms\ILoadBalancer\hasOrMadeRecentMasterChanges
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
Wikimedia\Rdbms\ChronologyProtector\$shutdownTimestampsByCluster
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
Definition: ChronologyProtector.php:164
Wikimedia\Rdbms\ChronologyProtector\stageSessionReplicationPosition
stageSessionReplicationPosition(ILoadBalancer $lb)
Update the "session consistency" DB replication position for an end-of-life ILoadBalancer.
Definition: ChronologyProtector.php:283
Wikimedia\Rdbms\ChronologyProtector\LOCK_TIMEOUT
const LOCK_TIMEOUT
Lock timeout to use for key updates.
Definition: ChronologyProtector.php:177
Wikimedia\Rdbms\ChronologyProtector\lazyStartup
lazyStartup()
Load the stored DB replication positions and touch timestamps for the client.
Definition: ChronologyProtector.php:421
Wikimedia\Rdbms\ChronologyProtector\$startupTimestamp
float null $startupTimestamp
UNIX timestamp when the client data was loaded.
Definition: ChronologyProtector.php:155
Wikimedia\Rdbms\ChronologyProtector\$startupTimestampsByCluster
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
Definition: ChronologyProtector.php:162
Wikimedia\Rdbms\ILoadBalancer\getReplicaResumePos
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
Wikimedia\Rdbms\ChronologyProtector\shutdown
shutdown(&$cpIndex=null)
Save any remarked "session consistency" DB replication positions to persistent storage.
Definition: ChronologyProtector.php:315
Wikimedia\Rdbms\ChronologyProtector
Provide a given client with protection against visible database lag.
Definition: ChronologyProtector.php:135
Wikimedia\Rdbms\ChronologyProtector\getStartupSessionPositions
getStartupSessionPositions()
Definition: ChronologyProtector.php:401
Wikimedia\Rdbms\ChronologyProtector\$clientId
string $clientId
Hash of client parameters.
Definition: ChronologyProtector.php:144
Wikimedia\Rdbms\ChronologyProtector\__construct
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
Definition: ChronologyProtector.php:192
Wikimedia\Rdbms\ILoadBalancer\hasStreamingReplicaServers
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
Wikimedia\Rdbms\ChronologyProtector\setEnabled
setEnabled( $enabled)
Definition: ChronologyProtector.php:230
BagOStuff\makeGlobalKey
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81
Wikimedia\Rdbms\ChronologyProtector\$startupPositionsByMaster
array< string, DBMasterPos > $startupPositionsByMaster
Map of (DB master name => position)
Definition: ChronologyProtector.php:158