MediaWiki  1.34.0
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
24 namespace Wikimedia\Rdbms;
25 
26 use Psr\Log\LoggerAwareInterface;
27 use Psr\Log\LoggerInterface;
28 use Psr\Log\NullLogger;
29 use Wikimedia\WaitConditionLoop;
30 use BagOStuff;
31 
39 class ChronologyProtector implements LoggerAwareInterface {
41  protected $store;
43  protected $logger;
44 
46  protected $key;
48  protected $clientId;
50  protected $clientLogInfo;
52  protected $waitForPosIndex;
54  protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
56  protected $enabled = true;
58  protected $wait = true;
59 
61  protected $initialized = false;
63  protected $startupPositions = [];
65  protected $shutdownPositions = [];
67  protected $shutdownTouchDBs = [];
68 
70  const POSITION_TTL = 60;
72  const POSITION_COOKIE_TTL = 10;
74  const POS_STORE_WAIT_TIMEOUT = 5;
75 
83  public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
84  $this->store = $store;
85  if ( isset( $client['clientId'] ) ) {
86  $this->clientId = $client['clientId'];
87  } else {
88  $this->clientId = ( $secret != '' )
89  ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
90  : md5( $client['ip'] . "\n" . $client['agent'] );
91  }
92  $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
93  $this->waitForPosIndex = $posIndex;
94 
95  $this->clientLogInfo = [
96  'clientIP' => $client['ip'],
97  'clientAgent' => $client['agent'],
98  'clientId' => $client['clientId'] ?? null
99  ];
100 
101  $this->logger = new NullLogger();
102  }
103 
104  public function setLogger( LoggerInterface $logger ) {
105  $this->logger = $logger;
106  }
107 
112  public function getClientId() {
113  return $this->clientId;
114  }
115 
120  public function setEnabled( $enabled ) {
121  $this->enabled = $enabled;
122  }
123 
128  public function setWaitEnabled( $enabled ) {
129  $this->wait = $enabled;
130  }
131 
146  if ( !$this->enabled ) {
147  return; // disabled
148  }
149 
150  $masterName = $lb->getServerName( $lb->getWriterIndex() );
152 
153  $pos = $startupPositions[$masterName] ?? null;
154  if ( $pos instanceof DBMasterPos ) {
155  $this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'\n" );
156  $lb->waitFor( $pos );
157  }
158  }
159 
171  if ( !$this->enabled ) {
172  return; // disabled
173  } elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
174  // Only save the position if writes have been done on the connection
175  return;
176  }
177 
178  $masterName = $lb->getServerName( $lb->getWriterIndex() );
179  if ( $lb->hasStreamingReplicaServers() ) {
180  $pos = $lb->getReplicaResumePos();
181  if ( $pos ) {
182  $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
183  $this->shutdownPositions[$masterName] = $pos;
184  }
185  } else {
186  $this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
187  }
188  $this->shutdownTouchDBs[$masterName] = 1;
189  }
190 
200  public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
201  if ( !$this->enabled ) {
202  return [];
203  }
204 
206  // Some callers might want to know if a user recently touched a DB.
207  // These writes do not need to block on all datacenters receiving them.
208  foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
209  $store->set(
210  $this->getTouchedKey( $this->store, $dbName ),
211  microtime( true ),
212  $store::TTL_DAY
213  );
214  }
215 
216  if ( $this->shutdownPositions === [] ) {
217  $this->logger->debug( __METHOD__ . ": no master positions to save\n" );
218 
219  return []; // nothing to save
220  }
221 
222  $this->logger->debug(
223  __METHOD__ . ": saving master pos for " .
224  implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
225  );
226 
227  // CP-protected writes should overwhelmingly go to the master datacenter, so merge the
228  // positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
229  // If set() returns success, then any get() should be able to see the new positions.
230  if ( $store->lock( $this->key, 3 ) ) {
231  if ( $workCallback ) {
232  // Let the store run the work before blocking on a replication sync barrier.
233  // If replication caught up while the work finished, the barrier will be fast.
234  $store->addBusyCallback( $workCallback );
235  }
236  $ok = $store->set(
237  $this->key,
238  $this->mergePositions(
239  $store->get( $this->key ),
241  $cpIndex
242  ),
243  self::POSITION_TTL,
244  ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
245  );
246  $store->unlock( $this->key );
247  } else {
248  $ok = false;
249  }
250 
251  if ( !$ok ) {
252  $cpIndex = null; // nothing saved
253  $bouncedPositions = $this->shutdownPositions;
254  // Raced out too many times or stash is down
255  $this->logger->warning( __METHOD__ . ": failed to save master pos for " .
256  implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
257  );
258  } elseif ( $mode === 'sync' &&
259  $store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
260  ) {
261  // Positions may not be in all datacenters, force LBFactory to play it safe
262  $this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
263  $bouncedPositions = $this->shutdownPositions;
264  } else {
265  $bouncedPositions = [];
266  }
267 
268  return $bouncedPositions;
269  }
270 
276  public function getTouched( $dbName ) {
277  return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
278  }
279 
285  private function getTouchedKey( BagOStuff $store, $dbName ) {
286  return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
287  }
288 
292  protected function getStartupMasterPositions() {
293  if ( $this->initialized ) {
295  }
296 
297  $this->initialized = true;
298  $this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)\n" );
299 
300  if ( $this->wait ) {
301  // If there is an expectation to see master positions from a certain write
302  // index or higher, then block until it appears, or until a timeout is reached.
303  // Since the write index restarts each time the key is created, it is possible that
304  // a lagged store has a matching key write index. However, in that case, it should
305  // already be expired and thus treated as non-existing, maintaining correctness.
306  if ( $this->waitForPosIndex > 0 ) {
307  $data = null;
308  $indexReached = null; // highest index reached in the position store
309  $loop = new WaitConditionLoop(
310  function () use ( &$data, &$indexReached ) {
311  $data = $this->store->get( $this->key );
312  if ( !is_array( $data ) ) {
313  return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
314  } elseif ( !isset( $data['writeIndex'] ) ) {
315  return WaitConditionLoop::CONDITION_REACHED; // b/c
316  }
317  $indexReached = max( $data['writeIndex'], $indexReached );
318 
319  return ( $data['writeIndex'] >= $this->waitForPosIndex )
320  ? WaitConditionLoop::CONDITION_REACHED
321  : WaitConditionLoop::CONDITION_CONTINUE;
322  },
324  );
325  $result = $loop->invoke();
326  $waitedMs = $loop->getLastWaitTime() * 1e3;
327 
328  if ( $result == $loop::CONDITION_REACHED ) {
329  $this->logger->debug(
330  __METHOD__ . ": expected and found position index.",
331  [
332  'cpPosIndex' => $this->waitForPosIndex,
333  'waitTimeMs' => $waitedMs
334  ] + $this->clientLogInfo
335  );
336  } else {
337  $this->logger->warning(
338  __METHOD__ . ": expected but failed to find position index.",
339  [
340  'cpPosIndex' => $this->waitForPosIndex,
341  'indexReached' => $indexReached,
342  'waitTimeMs' => $waitedMs
343  ] + $this->clientLogInfo
344  );
345  }
346  } else {
347  $data = $this->store->get( $this->key );
348  }
349 
350  $this->startupPositions = $data ? $data['positions'] : [];
351  $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
352  } else {
353  $this->startupPositions = [];
354  $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
355  }
356 
358  }
359 
366  protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
368  $curPositions = $curValue['positions'] ?? [];
369  // Use the newest positions for each DB master
370  foreach ( $shutdownPositions as $db => $pos ) {
371  if (
372  !isset( $curPositions[$db] ) ||
373  !( $curPositions[$db] instanceof DBMasterPos ) ||
374  $pos->asOfTime() > $curPositions[$db]->asOfTime()
375  ) {
376  $curPositions[$db] = $pos;
377  }
378  }
379 
380  $cpIndex = $curValue['writeIndex'] ?? 0;
381 
382  return [
383  'positions' => $curPositions,
384  'writeIndex' => ++$cpIndex
385  ];
386  }
387 }
Wikimedia\Rdbms\ChronologyProtector\$startupPositions
DBMasterPos[] $startupPositions
Map of (DB master name => position)
Definition: ChronologyProtector.php:63
BagOStuff\getQoS
getQoS( $flag)
Definition: BagOStuff.php:467
Wikimedia\Rdbms\ChronologyProtector\$store
BagOStuff $store
Definition: ChronologyProtector.php:41
Wikimedia\Rdbms\ChronologyProtector\$wait
bool $wait
Whether to check and wait on positions.
Definition: ChronologyProtector.php:58
Wikimedia\Rdbms\ChronologyProtector\shutdown
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
Definition: ChronologyProtector.php:200
Wikimedia\Rdbms\ChronologyProtector\$waitForPosStoreTimeout
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
Definition: ChronologyProtector.php:54
Wikimedia\Rdbms\ChronologyProtector\$shutdownTouchDBs
float[] $shutdownTouchDBs
Map of (DB master name => 1)
Definition: ChronologyProtector.php:67
Wikimedia\Rdbms
Definition: ChronologyProtector.php:24
Wikimedia\Rdbms\ChronologyProtector\$initialized
bool $initialized
Whether the client data was loaded.
Definition: ChronologyProtector.php:61
Wikimedia\Rdbms\DBMasterPos
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:12
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:63
Wikimedia\Rdbms\ChronologyProtector\$logger
LoggerInterface $logger
Definition: ChronologyProtector.php:43
BagOStuff\makeGlobalKey
makeGlobalKey( $class,... $components)
Make a global cache key.
Wikimedia\Rdbms\ChronologyProtector\getTouched
getTouched( $dbName)
Definition: ChronologyProtector.php:276
Wikimedia\Rdbms\ChronologyProtector\applySessionReplicationPosition
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
Definition: ChronologyProtector.php:145
Wikimedia\Rdbms\ChronologyProtector\setLogger
setLogger(LoggerInterface $logger)
Definition: ChronologyProtector.php:104
BagOStuff\unlock
unlock( $key)
Release an advisory lock on a key string.
BagOStuff\get
get( $key, $flags=0)
Get an item with the given key.
Wikimedia\Rdbms\ChronologyProtector\$clientLogInfo
string[] $clientLogInfo
Map of client information fields for logging.
Definition: ChronologyProtector.php:50
Wikimedia\Rdbms\ILoadBalancer\getServerName
getServerName( $i)
Get the host name or IP address of the server with the specified index.
Wikimedia\Rdbms\ChronologyProtector\$waitForPosIndex
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
Definition: ChronologyProtector.php:52
Wikimedia\Rdbms\ChronologyProtector\setWaitEnabled
setWaitEnabled( $enabled)
Definition: ChronologyProtector.php:128
Wikimedia\Rdbms\ILoadBalancer\getWriterIndex
getWriterIndex()
Get the server index of the master server.
BagOStuff\addBusyCallback
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
Wikimedia\Rdbms\ILoadBalancer\waitFor
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
Wikimedia\Rdbms\ChronologyProtector\$key
string $key
Storage key name.
Definition: ChronologyProtector.php:46
Wikimedia\Rdbms\ChronologyProtector\$enabled
bool $enabled
Whether to no-op all method calls.
Definition: ChronologyProtector.php:56
Wikimedia\Rdbms\ChronologyProtector\getClientId
getClientId()
Definition: ChronologyProtector.php:112
Wikimedia\Rdbms\ILoadBalancer\hasOrMadeRecentMasterChanges
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
Wikimedia\Rdbms\ILoadBalancer\getReplicaResumePos
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
Wikimedia\Rdbms\ChronologyProtector\mergePositions
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
Definition: ChronologyProtector.php:366
Wikimedia\Rdbms\ChronologyProtector\getStartupMasterPositions
getStartupMasterPositions()
Load in previous master positions for the client.
Definition: ChronologyProtector.php:292
Wikimedia\Rdbms\ChronologyProtector
Helper class for mitigating DB replication lag in order to provide "session consistency".
Definition: ChronologyProtector.php:39
Wikimedia\Rdbms\ChronologyProtector\$clientId
string $clientId
Hash of client parameters.
Definition: ChronologyProtector.php:48
BagOStuff\lock
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
Wikimedia\Rdbms\ChronologyProtector\__construct
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
Definition: ChronologyProtector.php:83
Wikimedia\Rdbms\ILoadBalancer\hasStreamingReplicaServers
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
Wikimedia\Rdbms\ChronologyProtector\$shutdownPositions
DBMasterPos[] $shutdownPositions
Map of (DB master name => position)
Definition: ChronologyProtector.php:65
BagOStuff\set
set( $key, $value, $exptime=0, $flags=0)
Set an item.
Wikimedia\Rdbms\ChronologyProtector\getTouchedKey
getTouchedKey(BagOStuff $store, $dbName)
Definition: ChronologyProtector.php:285
Wikimedia\Rdbms\ChronologyProtector\setEnabled
setEnabled( $enabled)
Definition: ChronologyProtector.php:120
Wikimedia\Rdbms\ChronologyProtector\storeSessionReplicationPosition
storeSessionReplicationPosition(ILoadBalancer $lb)
Save the "session consistency" DB replication position for an end-of-life ILoadBalancer.
Definition: ChronologyProtector.php:170
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81