MediaWiki  master
ChronologyProtector.php
Go to the documentation of this file.
1 <?php
24 namespace Wikimedia\Rdbms;
25 
30 use BagOStuff;
31 
39 class ChronologyProtector implements LoggerAwareInterface {
41  protected $store;
43  protected $logger;
44 
46  protected $key;
48  protected $clientId;
50  protected $clientLogInfo;
52  protected $waitForPosIndex;
54  protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
56  protected $enabled = true;
58  protected $wait = true;
59 
61  protected $initialized = false;
63  protected $startupPositions = [];
65  protected $shutdownPositions = [];
67  protected $shutdownTouchDBs = [];
68 
70  const POSITION_TTL = 60;
72  const POSITION_COOKIE_TTL = 10;
74  const POS_STORE_WAIT_TIMEOUT = 5;
75 
83  public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
84  $this->store = $store;
85  if ( isset( $client['clientId'] ) ) {
86  $this->clientId = $client['clientId'];
87  } else {
88  $this->clientId = ( $secret != '' )
89  ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
90  : md5( $client['ip'] . "\n" . $client['agent'] );
91  }
92  $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
93  $this->waitForPosIndex = $posIndex;
94 
95  $this->clientLogInfo = [
96  'clientIP' => $client['ip'],
97  'clientAgent' => $client['agent'],
98  'clientId' => $client['clientId'] ?? null
99  ];
100 
101  $this->logger = new NullLogger();
102  }
103 
104  public function setLogger( LoggerInterface $logger ) {
105  $this->logger = $logger;
106  }
107 
112  public function getClientId() {
113  return $this->clientId;
114  }
115 
120  public function setEnabled( $enabled ) {
121  $this->enabled = $enabled;
122  }
123 
128  public function setWaitEnabled( $enabled ) {
129  $this->wait = $enabled;
130  }
131 
146  if ( !$this->enabled ) {
147  return; // disabled
148  }
149 
150  $masterName = $lb->getServerName( $lb->getWriterIndex() );
152 
153  $pos = $startupPositions[$masterName] ?? null;
154  if ( $pos instanceof DBMasterPos ) {
155  $this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'\n" );
156  $lb->waitFor( $pos );
157  }
158  }
159 
171  if ( !$this->enabled ) {
172  return; // disabled
173  } elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
174  // Only save the position if writes have been done on the connection
175  return;
176  }
177 
178  $masterName = $lb->getServerName( $lb->getWriterIndex() );
179  if ( $lb->hasStreamingReplicaServers() ) {
180  $pos = $lb->getReplicaResumePos();
181  if ( $pos ) {
182  $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
183  $this->shutdownPositions[$masterName] = $pos;
184  }
185  } else {
186  $this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
187  }
188  $this->shutdownTouchDBs[$masterName] = 1;
189  }
190 
200  public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
201  if ( !$this->enabled ) {
202  return [];
203  }
204 
206  // Some callers might want to know if a user recently touched a DB.
207  // These writes do not need to block on all datacenters receiving them.
208  foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
209  $store->set(
210  $this->getTouchedKey( $this->store, $dbName ),
211  microtime( true ),
212  $store::TTL_DAY
213  );
214  }
215 
216  if ( $this->shutdownPositions === [] ) {
217  $this->logger->debug( __METHOD__ . ": no master positions to save\n" );
218 
219  return []; // nothing to save
220  }
221 
222  $this->logger->debug(
223  __METHOD__ . ": saving master pos for " .
224  implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
225  );
226 
227  // CP-protected writes should overwhelmingly go to the master datacenter, so merge the
228  // positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
229  // If set() returns success, then any get() should be able to see the new positions.
230  if ( $store->lock( $this->key, 3 ) ) {
231  if ( $workCallback ) {
232  // Let the store run the work before blocking on a replication sync barrier.
233  // If replication caught up while the work finished, the barrier will be fast.
234  $store->addBusyCallback( $workCallback );
235  }
236  $ok = $store->set(
237  $this->key,
238  $this->mergePositions(
239  $store->get( $this->key ),
241  $cpIndex
242  ),
243  self::POSITION_TTL,
244  ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
245  );
246  $store->unlock( $this->key );
247  } else {
248  $ok = false;
249  }
250 
251  if ( !$ok ) {
252  $cpIndex = null; // nothing saved
253  $bouncedPositions = $this->shutdownPositions;
254  // Raced out too many times or stash is down
255  $this->logger->warning( __METHOD__ . ": failed to save master pos for " .
256  implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
257  );
258  } elseif ( $mode === 'sync' &&
259  $store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
260  ) {
261  // Positions may not be in all datacenters, force LBFactory to play it safe
262  $this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
263  $bouncedPositions = $this->shutdownPositions;
264  } else {
265  $bouncedPositions = [];
266  }
267 
268  return $bouncedPositions;
269  }
270 
276  public function getTouched( $dbName ) {
277  return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
278  }
279 
285  private function getTouchedKey( BagOStuff $store, $dbName ) {
286  return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
287  }
288 
293  protected function getStartupMasterPositions() {
294  if ( $this->initialized ) {
296  }
297 
298  $this->initialized = true;
299  $this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)\n" );
300 
301  if ( $this->wait ) {
302  // If there is an expectation to see master positions from a certain write
303  // index or higher, then block until it appears, or until a timeout is reached.
304  // Since the write index restarts each time the key is created, it is possible that
305  // a lagged store has a matching key write index. However, in that case, it should
306  // already be expired and thus treated as non-existing, maintaining correctness.
307  if ( $this->waitForPosIndex > 0 ) {
308  $data = null;
309  $indexReached = null; // highest index reached in the position store
310  $loop = new WaitConditionLoop(
311  function () use ( &$data, &$indexReached ) {
312  $data = $this->store->get( $this->key );
313  if ( !is_array( $data ) ) {
314  return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
315  } elseif ( !isset( $data['writeIndex'] ) ) {
316  return WaitConditionLoop::CONDITION_REACHED; // b/c
317  }
318  $indexReached = max( $data['writeIndex'], $indexReached );
319 
320  return ( $data['writeIndex'] >= $this->waitForPosIndex )
321  ? WaitConditionLoop::CONDITION_REACHED
322  : WaitConditionLoop::CONDITION_CONTINUE;
323  },
325  );
326  $result = $loop->invoke();
327  $waitedMs = $loop->getLastWaitTime() * 1e3;
328 
329  if ( $result == $loop::CONDITION_REACHED ) {
330  $this->logger->debug(
331  __METHOD__ . ": expected and found position index.",
332  [
333  'cpPosIndex' => $this->waitForPosIndex,
334  'waitTimeMs' => $waitedMs
335  ] + $this->clientLogInfo
336  );
337  } else {
338  $this->logger->warning(
339  __METHOD__ . ": expected but failed to find position index.",
340  [
341  'cpPosIndex' => $this->waitForPosIndex,
342  'indexReached' => $indexReached,
343  'waitTimeMs' => $waitedMs
344  ] + $this->clientLogInfo
345  );
346  }
347  } else {
348  $data = $this->store->get( $this->key );
349  }
350 
351  $this->startupPositions = $data ? $data['positions'] : [];
352  $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
353  } else {
354  $this->startupPositions = [];
355  $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
356  }
357 
359  }
360 
367  protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
369  $curPositions = $curValue['positions'] ?? [];
370  // Use the newest positions for each DB master
371  foreach ( $shutdownPositions as $db => $pos ) {
372  if (
373  !isset( $curPositions[$db] ) ||
374  !( $curPositions[$db] instanceof DBMasterPos ) ||
375  $pos->asOfTime() > $curPositions[$db]->asOfTime()
376  ) {
377  $curPositions[$db] = $pos;
378  }
379  }
380 
381  $cpIndex = $curValue['writeIndex'] ?? 0;
382 
383  return [
384  'positions' => $curPositions,
385  'writeIndex' => ++$cpIndex
386  ];
387  }
388 }
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now...
unlock( $key)
Release an advisory lock on a key string.
string [] $clientLogInfo
Map of client information fields for logging.
DBMasterPos [] $startupPositions
Map of (DB master name => position)
bool $wait
Whether to check and wait on positions.
float [] $shutdownTouchDBs
Map of (DB master name => 1)
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
Helper class for mitigating DB replication lag in order to provide "session consistency".
bool $enabled
Whether to no-op all method calls.
set( $key, $value, $exptime=0, $flags=0)
Set an item.
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:12
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
getTouchedKey(BagOStuff $store, $dbName)
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
storeSessionReplicationPosition(ILoadBalancer $lb)
Save the "session consistency" DB replication position for an end-of-life ILoadBalancer.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
bool $initialized
Whether the client data was loaded.
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
makeGlobalKey( $class,... $components)
Make a global cache key.
getQoS( $flag)
Definition: BagOStuff.php:467
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getWriterIndex()
Get the server index of the master server.
string $clientId
Hash of client parameters.
Database cluster connection, tracking, load balancing, and transaction manager interface.
get( $key, $flags=0)
Get an item with the given key.
DBMasterPos [] $shutdownPositions
Map of (DB master name => position)
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
getStartupMasterPositions()
Load in previous master positions for the client.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.