MediaWiki REL1_37
ChronologyProtector.php
Go to the documentation of this file.
1<?php
25
26use BagOStuff;
27use Psr\Log\LoggerAwareInterface;
28use Psr\Log\LoggerInterface;
29use Psr\Log\NullLogger;
30use Wikimedia\WaitConditionLoop;
31
136class ChronologyProtector implements LoggerAwareInterface {
138 protected $store;
140 protected $logger;
141
143 protected $key;
145 protected $clientId;
147 protected $clientLogInfo;
150
152 protected $enabled = true;
154 protected $positionWaitsEnabled = true;
157
166
169
171 public const POSITION_COOKIE_TTL = 10;
173 private const POSITION_STORE_TTL = 60;
176
178 private const LOCK_TIMEOUT = 3;
180 private const LOCK_TTL = 6;
181
182 private const FLD_POSITIONS = 'positions';
183 private const FLD_TIMESTAMPS = 'timestamps';
184 private const FLD_WRITE_INDEX = 'writeIndex';
185
193 public function __construct(
195 array $client,
196 ?int $clientPosIndex,
197 string $secret = ''
198 ) {
199 $this->store = $store;
200
201 if ( isset( $client['clientId'] ) ) {
202 $this->clientId = $client['clientId'];
203 } else {
204 $this->clientId = ( $secret != '' )
205 ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
206 : md5( $client['ip'] . "\n" . $client['agent'] );
207 }
208 $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
209 $this->waitForPosIndex = $clientPosIndex;
210
211 $this->clientLogInfo = [
212 'clientIP' => $client['ip'],
213 'clientAgent' => $client['agent'],
214 'clientId' => $client['clientId'] ?? null
215 ];
216
217 $this->logger = new NullLogger();
218 }
219
220 public function setLogger( LoggerInterface $logger ) {
221 $this->logger = $logger;
222 }
223
228 public function getClientId() {
229 return $this->clientId;
230 }
231
236 public function setEnabled( $enabled ) {
237 $this->enabled = $enabled;
238 }
239
244 public function setWaitEnabled( $enabled ) {
245 $this->positionWaitsEnabled = $enabled;
246 }
247
262 if ( !$this->enabled || !$this->positionWaitsEnabled ) {
263 return;
264 }
265
266 $cluster = $lb->getClusterName();
267 $masterName = $lb->getServerName( $lb->getWriterIndex() );
268
269 $pos = $this->getStartupSessionPositions()[$masterName] ?? null;
270 if ( $pos instanceof DBPrimaryPos ) {
271 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
272 $lb->waitFor( $pos );
273 } else {
274 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
275 }
276 }
277
290 if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
291 return;
292 }
293
294 $cluster = $lb->getClusterName();
295 $masterName = $lb->getServerName( $lb->getWriterIndex() );
296
297 if ( $lb->hasStreamingReplicaServers() ) {
298 $pos = $lb->getReplicaResumePos();
299 if ( $pos ) {
300 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
301 $this->shutdownPositionsByMaster[$masterName] = $pos;
302 $this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
303 } else {
304 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
305 $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
306 }
307 } else {
308 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
309 $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
310 }
311 }
312
321 public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
322 if ( !$this->enabled ) {
323 return [];
324 }
325
326 if ( !$this->shutdownTimestampsByCluster ) {
327 $this->logger->debug( __METHOD__ . ": no primary positions/timestamps to save" );
328
329 return [];
330 }
331
332 $scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
333 if ( $scopeLock ) {
334 $ok = $this->store->set(
335 $this->key,
336 $this->mergePositions(
337 $this->store->get( $this->key ),
338 $this->shutdownPositionsByMaster,
339 $this->shutdownTimestampsByCluster,
340 $clientPosIndex
341 ),
342 self::POSITION_STORE_TTL
343 );
344 unset( $scopeLock );
345 } else {
346 $ok = false;
347 }
348
349 $clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );
350
351 if ( $ok ) {
352 $bouncedPositions = [];
353 $this->logger->debug(
354 __METHOD__ . ": saved primary positions/timestamp for DB cluster(s) $clusterList"
355 );
356
357 } else {
358 $clientPosIndex = null; // nothing saved
359 $bouncedPositions = $this->shutdownPositionsByMaster;
360 // Raced out too many times or stash is down
361 $this->logger->warning(
362 __METHOD__ . ": failed to save primary positions for DB cluster(s) $clusterList"
363 );
364 }
365
366 return $bouncedPositions;
367 }
368
378 public function getTouched( ILoadBalancer $lb ) {
379 if ( !$this->enabled ) {
380 return false;
381 }
382
383 $cluster = $lb->getClusterName();
384
385 $timestampsByCluster = $this->getStartupSessionTimestamps();
386 $timestamp = $timestampsByCluster[$cluster] ?? null;
387 if ( $timestamp === null ) {
388 $recentTouchTimestamp = false;
389 } elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
390 // If the position store is not replicated among datacenters and the cookie that
391 // sticks the client to the primary datacenter expires, then the touch timestamp
392 // will be found for requests in one datacenter but not others. For consistency,
393 // return false once the user is no longer routed to the primary datacenter.
394 $recentTouchTimestamp = false;
395 $this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
396 } else {
397 $recentTouchTimestamp = $timestamp;
398 $this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
399 }
400
401 return $recentTouchTimestamp;
402 }
403
407 protected function getStartupSessionPositions() {
408 $this->lazyStartup();
409
411 }
412
416 protected function getStartupSessionTimestamps() {
417 $this->lazyStartup();
418
420 }
421
427 protected function lazyStartup() {
428 if ( $this->startupTimestamp !== null ) {
429 return;
430 }
431
432 $this->startupTimestamp = $this->getCurrentTime();
433 $this->logger->debug(
434 __METHOD__ .
435 ": client ID is {$this->clientId}; key is {$this->key}"
436 );
437
438 // If there is an expectation to see primary positions from a certain write
439 // index or higher, then block until it appears, or until a timeout is reached.
440 // Since the write index restarts each time the key is created, it is possible that
441 // a lagged store has a matching key write index. However, in that case, it should
442 // already be expired and thus treated as non-existing, maintaining correctness.
443 if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
444 $data = null;
445 $indexReached = null; // highest index reached in the position store
446 $loop = new WaitConditionLoop(
447 function () use ( &$data, &$indexReached ) {
448 $data = $this->store->get( $this->key );
449 if ( !is_array( $data ) ) {
450 return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
451 } elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
452 return WaitConditionLoop::CONDITION_REACHED; // b/c
453 }
454 $indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );
455
456 return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
457 ? WaitConditionLoop::CONDITION_REACHED
458 : WaitConditionLoop::CONDITION_CONTINUE;
459 },
461 );
462 $result = $loop->invoke();
463 $waitedMs = $loop->getLastWaitTime() * 1e3;
464
465 if ( $result == $loop::CONDITION_REACHED ) {
466 $this->logger->debug(
467 __METHOD__ . ": expected and found position index {cpPosIndex}.",
468 [
469 'cpPosIndex' => $this->waitForPosIndex,
470 'waitTimeMs' => $waitedMs
471 ] + $this->clientLogInfo
472 );
473 } else {
474 $this->logger->warning(
475 __METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
476 [
477 'cpPosIndex' => $this->waitForPosIndex,
478 'indexReached' => $indexReached,
479 'waitTimeMs' => $waitedMs
480 ] + $this->clientLogInfo
481 );
482 }
483 } else {
484 $data = $this->store->get( $this->key );
485 $indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
486 if ( $indexReached ) {
487 $this->logger->debug(
488 __METHOD__ . ": found position/timestamp data with index {indexReached}.",
489 [ 'indexReached' => $indexReached ] + $this->clientLogInfo
490 );
491 }
492 }
493
494 $this->startupPositionsByMaster = $data ? $data[self::FLD_POSITIONS] : [];
495 $this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
496 }
497
507 protected function mergePositions(
508 $storedValue,
509 array $shutdownPositions,
510 array $shutdownTimestamps,
511 ?int &$clientPosIndex = null
512 ) {
514 $mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
515 // Use the newest positions for each DB primary
516 foreach ( $shutdownPositions as $masterName => $pos ) {
517 if (
518 !isset( $mergedPositions[$masterName] ) ||
519 !( $mergedPositions[$masterName] instanceof DBPrimaryPos ) ||
520 $pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
521 ) {
522 $mergedPositions[$masterName] = $pos;
523 }
524 }
525
527 $mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
528 // Use the newest touch timestamp for each DB primary
529 foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
530 if (
531 !isset( $mergedTimestamps[$cluster] ) ||
532 $timestamp > $mergedTimestamps[$cluster]
533 ) {
534 $mergedTimestamps[$cluster] = $timestamp;
535 }
536 }
537
538 $clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;
539
540 return [
541 self::FLD_POSITIONS => $mergedPositions,
542 self::FLD_TIMESTAMPS => $mergedTimestamps,
543 self::FLD_WRITE_INDEX => $clientPosIndex
544 ];
545 }
546
552 protected function getCurrentTime() {
553 if ( $this->wallClockOverride ) {
555 }
556
557 $clockTime = (float)time(); // call this first
558 // microtime() can severely drift from time() and the microtime() value of other threads.
559 // Instead of seeing the current time as being in the past, use the value of time().
560 return max( microtime( true ), $clockTime );
561 }
562
568 public function setMockTime( &$time ) {
569 $this->wallClockOverride =& $time;
570 }
571}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:86
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.
Provide a given client with protection against visible database lag.
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
__construct(BagOStuff $store, array $client, ?int $clientPosIndex, string $secret='')
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
array< string, DBPrimaryPos > $startupPositionsByMaster
Map of (DB primary name => position)
float null $startupTimestamp
UNIX timestamp when the client data was loaded.
array< string, DBPrimaryPos > $shutdownPositionsByMaster
Map of (DB primary name => position)
applySessionReplicationPosition(ILoadBalancer $lb)
Apply client "session consistency" replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
const LOCK_TTL
Lock expiry to use for key updates.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
const POSITION_INDEX_WAIT_TIMEOUT
Max seconds to wait for positions write indexes to appear (e.g.
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
stageSessionReplicationPosition(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
const LOCK_TIMEOUT
Lock timeout to use for key updates.
const POSITION_STORE_TTL
Seconds to store replication positions.
An object representing a primary or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
getClusterName()
Get the logical name of the database cluster.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
waitFor( $pos)
Set the primary position to reach before the next generic group DB handle query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
getWriterIndex()
Get the specific server index of the primary server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.