MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
24 namespace Wikimedia\Rdbms;
25 
26 use BagOStuff;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Psr\Log\NullLogger;
30 use Wikimedia\WaitConditionLoop;
31 
41 class ChronologyProtector implements LoggerAwareInterface {
43  protected $store;
45  protected $logger;
46 
48  protected $key;
50  protected $clientId;
52  protected $clientLogInfo;
54  protected $waitForPosIndex;
56  protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
58  protected $enabled = true;
60  protected $wait = true;
61 
63  protected $initialized = false;
65  protected $startupPositions = [];
67  protected $shutdownPositions = [];
69  protected $shutdownTouchDBs = [];
70 
72  public const POSITION_TTL = 60;
74  public const POSITION_COOKIE_TTL = 10;
76  private const POS_STORE_WAIT_TIMEOUT = 5;
77 
85  public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
86  $this->store = $store;
87  if ( isset( $client['clientId'] ) ) {
88  $this->clientId = $client['clientId'];
89  } else {
90  $this->clientId = ( $secret != '' )
91  ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
92  : md5( $client['ip'] . "\n" . $client['agent'] );
93  }
94  $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
95  $this->waitForPosIndex = $posIndex;
96 
97  $this->clientLogInfo = [
98  'clientIP' => $client['ip'],
99  'clientAgent' => $client['agent'],
100  'clientId' => $client['clientId'] ?? null
101  ];
102 
103  $this->logger = new NullLogger();
104  }
105 
106  public function setLogger( LoggerInterface $logger ) {
107  $this->logger = $logger;
108  }
109 
114  public function getClientId() {
115  return $this->clientId;
116  }
117 
122  public function setEnabled( $enabled ) {
123  $this->enabled = $enabled;
124  }
125 
130  public function setWaitEnabled( $enabled ) {
131  $this->wait = $enabled;
132  }
133 
148  if ( !$this->enabled ) {
149  return; // disabled
150  }
151 
152  $masterName = $lb->getServerName( $lb->getWriterIndex() );
154 
155  $pos = $startupPositions[$masterName] ?? null;
156  if ( $pos instanceof DBMasterPos ) {
157  $this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'" );
158  $lb->waitFor( $pos );
159  }
160  }
161 
173  if ( !$this->enabled ) {
174  return; // disabled
175  } elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
176  // Only save the position if writes have been done on the connection
177  return;
178  }
179 
180  $masterName = $lb->getServerName( $lb->getWriterIndex() );
181  if ( $lb->hasStreamingReplicaServers() ) {
182  $pos = $lb->getReplicaResumePos();
183  if ( $pos ) {
184  $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos" );
185  $this->shutdownPositions[$masterName] = $pos;
186  }
187  } else {
188  $this->logger->debug( __METHOD__ . ": DB '$masterName' touched" );
189  }
190  $this->shutdownTouchDBs[$masterName] = 1;
191  }
192 
202  public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
203  if ( !$this->enabled ) {
204  return [];
205  }
206 
208  // Some callers might want to know if a user recently touched a DB.
209  // These writes do not need to block on all datacenters receiving them.
210  foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
211  $store->set(
212  $this->getTouchedKey( $this->store, $dbName ),
213  microtime( true ),
214  $store::TTL_DAY
215  );
216  }
217 
218  if ( $this->shutdownPositions === [] ) {
219  $this->logger->debug( __METHOD__ . ": no master positions to save" );
220 
221  return []; // nothing to save
222  }
223 
224  $this->logger->debug(
225  __METHOD__ . ": saving master pos for " .
226  implode( ', ', array_keys( $this->shutdownPositions ) )
227  );
228 
229  // CP-protected writes should overwhelmingly go to the master datacenter, so merge the
230  // positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
231  // If set() returns success, then any get() should be able to see the new positions.
232  if ( $store->lock( $this->key, 3 ) ) {
233  if ( $workCallback ) {
234  // Let the store run the work before blocking on a replication sync barrier.
235  // If replication caught up while the work finished, the barrier will be fast.
236  $store->addBusyCallback( $workCallback );
237  }
238  $ok = $store->set(
239  $this->key,
240  $this->mergePositions(
241  $store->get( $this->key ),
242  $this->shutdownPositions,
243  $cpIndex
244  ),
245  self::POSITION_TTL,
246  ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
247  );
248  $store->unlock( $this->key );
249  } else {
250  $ok = false;
251  }
252 
253  if ( !$ok ) {
254  $cpIndex = null; // nothing saved
255  $bouncedPositions = $this->shutdownPositions;
256  // Raced out too many times or stash is down
257  $this->logger->warning( __METHOD__ . ": failed to save master pos for " .
258  implode( ', ', array_keys( $this->shutdownPositions ) )
259  );
260  } elseif ( $mode === 'sync' &&
261  $store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
262  ) {
263  // Positions may not be in all datacenters, force LBFactory to play it safe
264  $this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
265  $bouncedPositions = $this->shutdownPositions;
266  } else {
267  $bouncedPositions = [];
268  }
269 
270  return $bouncedPositions;
271  }
272 
279  public function getTouched( ILoadBalancer $lb ) {
280  $masterName = $lb->getServerName( $lb->getWriterIndex() );
281  return $this->store->get( $this->getTouchedKey( $this->store, $masterName ) );
282  }
283 
289  private function getTouchedKey( BagOStuff $store, $masterName ) {
290  return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $masterName );
291  }
292 
297  protected function getStartupMasterPositions() {
298  if ( $this->initialized ) {
300  }
301 
302  $this->initialized = true;
303  $this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)" );
304 
305  if ( $this->wait ) {
306  // If there is an expectation to see master positions from a certain write
307  // index or higher, then block until it appears, or until a timeout is reached.
308  // Since the write index restarts each time the key is created, it is possible that
309  // a lagged store has a matching key write index. However, in that case, it should
310  // already be expired and thus treated as non-existing, maintaining correctness.
311  if ( $this->waitForPosIndex > 0 ) {
312  $data = null;
313  $indexReached = null; // highest index reached in the position store
314  $loop = new WaitConditionLoop(
315  function () use ( &$data, &$indexReached ) {
316  $data = $this->store->get( $this->key );
317  if ( !is_array( $data ) ) {
318  return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
319  } elseif ( !isset( $data['writeIndex'] ) ) {
320  return WaitConditionLoop::CONDITION_REACHED; // b/c
321  }
322  $indexReached = max( $data['writeIndex'], $indexReached );
323 
324  return ( $data['writeIndex'] >= $this->waitForPosIndex )
325  ? WaitConditionLoop::CONDITION_REACHED
326  : WaitConditionLoop::CONDITION_CONTINUE;
327  },
329  );
330  $result = $loop->invoke();
331  $waitedMs = $loop->getLastWaitTime() * 1e3;
332 
333  if ( $result == $loop::CONDITION_REACHED ) {
334  $this->logger->debug(
335  __METHOD__ . ": expected and found position index.",
336  [
337  'cpPosIndex' => $this->waitForPosIndex,
338  'waitTimeMs' => $waitedMs
339  ] + $this->clientLogInfo
340  );
341  } else {
342  $this->logger->warning(
343  __METHOD__ . ": expected but failed to find position index.",
344  [
345  'cpPosIndex' => $this->waitForPosIndex,
346  'indexReached' => $indexReached,
347  'waitTimeMs' => $waitedMs
348  ] + $this->clientLogInfo
349  );
350  }
351  } else {
352  $data = $this->store->get( $this->key );
353  }
354 
355  $this->startupPositions = $data ? $data['positions'] : [];
356  $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)" );
357  } else {
358  $this->startupPositions = [];
359  $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)" );
360  }
361 
363  }
364 
371  protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
373  $curPositions = $curValue['positions'] ?? [];
374  // Use the newest positions for each DB master
375  foreach ( $shutdownPositions as $db => $pos ) {
376  if (
377  !isset( $curPositions[$db] ) ||
378  !( $curPositions[$db] instanceof DBMasterPos ) ||
379  $pos->asOfTime() > $curPositions[$db]->asOfTime()
380  ) {
381  $curPositions[$db] = $pos;
382  }
383  }
384 
385  $cpIndex = $curValue['writeIndex'] ?? 0;
386 
387  return [
388  'positions' => $curPositions,
389  'writeIndex' => ++$cpIndex
390  ];
391  }
392 }
Wikimedia\Rdbms\ChronologyProtector\$startupPositions
DBMasterPos[] $startupPositions
Map of (DB master name => position)
Definition: ChronologyProtector.php:65
BagOStuff\getQoS
getQoS( $flag)
Definition: BagOStuff.php:482
Wikimedia\Rdbms\ChronologyProtector\getTouched
getTouched(ILoadBalancer $lb)
Definition: ChronologyProtector.php:279
Wikimedia\Rdbms\ChronologyProtector\$store
BagOStuff $store
Definition: ChronologyProtector.php:43
Wikimedia\Rdbms\ChronologyProtector\$wait
bool $wait
Whether to check and wait on positions.
Definition: ChronologyProtector.php:60
Wikimedia\Rdbms\ChronologyProtector\shutdown
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
Definition: ChronologyProtector.php:202
Wikimedia\Rdbms\ChronologyProtector\$waitForPosStoreTimeout
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
Definition: ChronologyProtector.php:56
Wikimedia\Rdbms\ChronologyProtector\$shutdownTouchDBs
float[] $shutdownTouchDBs
Map of (DB master name => 1)
Definition: ChronologyProtector.php:69
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
Wikimedia\Rdbms\ChronologyProtector\$initialized
bool $initialized
Whether the client data was loaded.
Definition: ChronologyProtector.php:63
Wikimedia\Rdbms\DBMasterPos
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:12
Wikimedia\Rdbms\ChronologyProtector\getTouchedKey
getTouchedKey(BagOStuff $store, $masterName)
Definition: ChronologyProtector.php:289
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:70
Wikimedia\Rdbms\ChronologyProtector\$logger
LoggerInterface $logger
Definition: ChronologyProtector.php:45
BagOStuff\makeGlobalKey
makeGlobalKey( $class,... $components)
Make a global cache key.
Wikimedia\Rdbms\ChronologyProtector\applySessionReplicationPosition
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
Definition: ChronologyProtector.php:147
Wikimedia\Rdbms\ChronologyProtector\setLogger
setLogger(LoggerInterface $logger)
Definition: ChronologyProtector.php:106
BagOStuff\unlock
unlock( $key)
Release an advisory lock on a key string.
BagOStuff\get
get( $key, $flags=0)
Get an item with the given key.
Wikimedia\Rdbms\ChronologyProtector\$clientLogInfo
string[] $clientLogInfo
Map of client information fields for logging.
Definition: ChronologyProtector.php:52
Wikimedia\Rdbms\ILoadBalancer\getServerName
getServerName( $i)
Get the host name or IP address of the server with the specified index.
Wikimedia\Rdbms\ChronologyProtector\$waitForPosIndex
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
Definition: ChronologyProtector.php:54
Wikimedia\Rdbms\ChronologyProtector\setWaitEnabled
setWaitEnabled( $enabled)
Definition: ChronologyProtector.php:130
Wikimedia\Rdbms\ILoadBalancer\getWriterIndex
getWriterIndex()
Get the server index of the master server.
BagOStuff\addBusyCallback
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
Wikimedia\Rdbms\ILoadBalancer\waitFor
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
Wikimedia\Rdbms\ChronologyProtector\$key
string $key
Storage key name.
Definition: ChronologyProtector.php:48
Wikimedia\Rdbms\ChronologyProtector\$enabled
bool $enabled
Whether to no-op all method calls.
Definition: ChronologyProtector.php:58
Wikimedia\Rdbms\ChronologyProtector\getClientId
getClientId()
Definition: ChronologyProtector.php:114
Wikimedia\Rdbms\ILoadBalancer\hasOrMadeRecentMasterChanges
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
Wikimedia\Rdbms\ILoadBalancer\getReplicaResumePos
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
Wikimedia\Rdbms\ChronologyProtector\mergePositions
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
Definition: ChronologyProtector.php:371
Wikimedia\Rdbms\ChronologyProtector\getStartupMasterPositions
getStartupMasterPositions()
Load in previous master positions for the client.
Definition: ChronologyProtector.php:297
Wikimedia\Rdbms\ChronologyProtector
Helper class for mitigating DB replication lag in order to provide "session consistency".
Definition: ChronologyProtector.php:41
Wikimedia\Rdbms\ChronologyProtector\$clientId
string $clientId
Hash of client parameters.
Definition: ChronologyProtector.php:50
BagOStuff\lock
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
Wikimedia\Rdbms\ChronologyProtector\__construct
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
Definition: ChronologyProtector.php:85
Wikimedia\Rdbms\ILoadBalancer\hasStreamingReplicaServers
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
Wikimedia\Rdbms\ChronologyProtector\$shutdownPositions
DBMasterPos[] $shutdownPositions
Map of (DB master name => position)
Definition: ChronologyProtector.php:67
BagOStuff\set
set( $key, $value, $exptime=0, $flags=0)
Set an item.
Wikimedia\Rdbms\ChronologyProtector\setEnabled
setEnabled( $enabled)
Definition: ChronologyProtector.php:122
Wikimedia\Rdbms\ChronologyProtector\storeSessionReplicationPosition
storeSessionReplicationPosition(ILoadBalancer $lb)
Save the "session consistency" DB replication position for an end-of-life ILoadBalancer.
Definition: ChronologyProtector.php:172
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81