MediaWiki master
ChronologyProtector.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use LogicException;
23use Psr\Log\LoggerAwareInterface;
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
28
134class ChronologyProtector implements LoggerAwareInterface {
/** @var array<string,mixed> Request-derived client fields (see setRequestInfo()) */
private $requestInfo;
/** @var string Secret used to HMAC the client ID; the empty string means a plain md5 hash */
private string $secret;
/** @var bool Whether this is a command-line request (load() disables protection in that case) */
private bool $cliMode;
/** @var BagOStuff Store holding the replication position data */
private $store;
/** @var LoggerInterface */
protected $logger;

/** @var string|null Store key for this client's position data (set by load()) */
protected $key;
/** @var string|null Hash of client parameters (set by load()) */
protected $clientId;
/** @var string[] Map of client information fields for logging */
protected $clientLogInfo;
/** @var bool Whether reading/writing session consistency replication positions is enabled */
protected $enabled = true;

/** @var array<string,DBPrimaryPos>|null Map of (primary server name => position) loaded at startup */
protected $startupPositionsByPrimary = null;
/** @var array<string,DBPrimaryPos> Map of (primary server name => position) staged for shutdown */
protected $shutdownPositionsByPrimary = [];
/** @var array<string,float>|null Map of (DB cluster name => UNIX timestamp) loaded at startup */
protected $startupTimestampsByCluster = null;
/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) staged for shutdown */
protected $shutdownTimestampsByCluster = [];
/** @var int|null Expected minimum index of the last write to the position store */
protected $waitForPosIndex = null;
/** @var float|null UNIX timestamp when the client data was loaded */
protected $startupTimestamp = null;

/** @var float|int|null Mock wall-clock value bound by reference via setMockTime() */
private $wallClockOverride;

/** @var bool Whether load() derived the client ID itself instead of receiving one */
private $hasNewClientId = false;

/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
public const POSITION_COOKIE_TTL = 10;
/** Seconds the merged position data lives in the store */
private const POSITION_STORE_TTL = 60;

/** Seconds to wait when acquiring the store lock for the position key */
private const LOCK_TIMEOUT = 3;
/** Seconds before the store lock on the position key expires */
private const LOCK_TTL = 6;

/** Field names of the stored position data array */
private const FLD_POSITIONS = 'positions';
private const FLD_TIMESTAMPS = 'timestamps';
private const FLD_WRITE_INDEX = 'writeIndex';
197
/**
 * @param BagOStuff|null $cpStash Store for position data; an EmptyBagOStuff when null
 * @param string|null $secret Secret used to HMAC client IDs; the empty string when null
 * @param bool|null $cliMode Whether this is a CLI request; derived from PHP_SAPI when null
 * @param LoggerInterface|null $logger Logger; a NullLogger when null
 */
public function __construct( $cpStash = null, $secret = null, $cliMode = null, $logger = null ) {
	$this->logger = $logger ?? new NullLogger();
	$this->store = $cpStash ?? new EmptyBagOStuff();
	$this->secret = $secret ?? '';

	if ( $cliMode === null ) {
		$cliMode = in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );
	}
	$this->cliMode = $cliMode;

	$this->requestInfo = [
		'IPAddress' => $_SERVER['REMOTE_ADDR'] ?? '',
		'UserAgent' => $_SERVER['HTTP_USER_AGENT'] ?? '',
		// Headers application can inject via LBFactory::setRequestInfo()
		'ChronologyClientId' => null, // prior $cpClientId value from LBFactory::shutdown()
		'ChronologyPositionIndex' => null // prior $cpIndex value from LBFactory::shutdown()
	];
}
218
/**
 * Lazily derive the client ID, store key, and logging context from the request info.
 *
 * Does nothing when disabled or when the client ID was already computed. Disables
 * protection in CLI mode or when the store is an EmptyBagOStuff, but still derives
 * the client ID in those cases.
 */
private function load() {
	if ( !$this->enabled || $this->clientId ) {
		// Not enabled or already loaded
		return;
	}

	$ip = $this->requestInfo['IPAddress'];
	$agent = $this->requestInfo['UserAgent'];
	$requestedClientId = $this->requestInfo['ChronologyClientId'] ?: null;

	if ( $this->cliMode ) {
		$this->setEnabled( false );
	} elseif ( $this->store instanceof EmptyBagOStuff ) {
		// Nowhere to store any DB positions and wait for them to appear
		$this->setEnabled( false );
		$this->logger->debug( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
	}

	if ( $requestedClientId !== null ) {
		$this->clientId = $requestedClientId;
	} else {
		// No client ID was supplied: build one from the IP and user agent
		$this->hasNewClientId = true;
		$fingerprint = $ip . "\n" . $agent;
		$this->clientId = ( $this->secret !== '' )
			? hash_hmac( 'md5', $fingerprint, $this->secret )
			: md5( $fingerprint );
	}

	$this->key = $this->store->makeGlobalKey( __CLASS__, $this->clientId, 'v4' );
	$this->waitForPosIndex = $this->requestInfo['ChronologyPositionIndex'];

	$this->clientLogInfo = [
		'clientIP' => $ip,
		'clientAgent' => $agent,
		// The ID supplied with the request (null if it was derived above)
		'clientId' => $requestedClientId
	];
}
254
/**
 * Override request-derived client fields used to identify this client.
 *
 * @param array $info Map of fields (e.g. 'IPAddress', 'UserAgent', 'ChronologyClientId',
 *  'ChronologyPositionIndex'); given keys take precedence over current values
 * @throws LogicException If the client ID has already been computed
 */
public function setRequestInfo( array $info ) {
	if ( !$this->clientId ) {
		$this->requestInfo = $info + $this->requestInfo;
		return;
	}

	throw new LogicException( 'ChronologyProtector already initialized' );
}
262
/**
 * Replace the logger.
 *
 * load() runs first, so any messages it logs go to the previous logger, and the request
 * info is frozen from this point on.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->load();

	$this->logger = $logger;
}
267
/**
 * @return string|null Hash of client parameters; null if protection was disabled before
 *  the client ID could be computed
 */
public function getClientId() {
	$this->load();

	return $this->clientId;
}
276
/**
 * Turn reading/writing of session consistency replication positions on or off.
 *
 * @param bool $enabled
 */
public function setEnabled( $enabled ) {
	$this->enabled = $enabled;
}
284
/**
 * Yield client "session consistency" replication position for a new ILoadBalancer.
 *
 * @param ILoadBalancer $lb
 * @return DBPrimaryPos|null Stored position of the cluster's primary, or null if there is
 *  none (or protection is disabled)
 */
public function getSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return null;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
	if ( $pos instanceof DBPrimaryPos ) {
		$this->logger->debug( "ChronologyProtector will wait for '$pos' on $cluster ($primaryName)" );
	} else {
		$this->logger->debug( "ChronologyProtector skips wait on $cluster ($primaryName)" );
	}

	return $pos;
}
316
/**
 * Update client "session consistency" replication position for an end-of-life ILoadBalancer.
 *
 * Only acts when enabled and the load balancer has recent or pending primary changes.
 * Staged data is written out by persistSessionReplicationPositions().
 *
 * @param ILoadBalancer $lb
 */
public function stageSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	if ( !$lb->hasStreamingReplicaServers() ) {
		// Nothing to wait for; just record the touch time
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no replication" );
		$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
		return;
	}

	$pos = $lb->getPrimaryPos();
	if ( !$pos ) {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position unknown" );
		$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
		return;
	}

	$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position now '$pos'" );
	$this->shutdownPositionsByPrimary[$primaryName] = $pos;
	$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
}
352
/**
 * Persist any staged client "session consistency" replication positions.
 *
 * Under a store lock, merges the staged positions/timestamps into the stored value and
 * writes it back.
 *
 * @param int|null &$clientPosIndex Receives the new write index from mergePositions();
 *  set to null if the data could not be saved; untouched if there was nothing to save
 * @return array<string,DBPrimaryPos> Staged positions that could not be saved (empty on success)
 */
public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
	$this->load();
	if ( !$this->enabled ) {
		return [];
	}
	if ( !$this->shutdownTimestampsByCluster ) {
		$this->logger->debug( __METHOD__ . ": no primary positions data to save" );

		return [];
	}

	$ok = false;
	$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
	if ( $scopeLock ) {
		$storedValue = $this->unmarshalPositions( $this->store->get( $this->key ) );
		$merged = $this->mergePositions(
			$storedValue,
			$this->shutdownPositionsByPrimary,
			$this->shutdownTimestampsByCluster,
			$clientPosIndex
		);
		$ok = $this->store->set(
			$this->key,
			$this->marshalPositions( $merged ),
			self::POSITION_STORE_TTL
		);
		// Drop the scoped lock object as soon as the write is done
		unset( $scopeLock );
	}

	$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

	if ( !$ok ) {
		// Maybe position store is down
		$this->logger->warning( "ChronologyProtector failed to save position data for $clusterList" );
		$clientPosIndex = null;

		return $this->shutdownPositionsByPrimary;
	}

	$this->logger->debug( "ChronologyProtector saved position data for $clusterList" );

	return [];
}
406
/**
 * Get the UNIX timestamp when the client last touched the DB, if they did so recently.
 *
 * @param ILoadBalancer $lb
 * @return float|false Touch timestamp for the cluster, or false if there is none, it is
 *  older than POSITION_COOKIE_TTL seconds, or protection is disabled
 */
public function getTouched( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return false;
	}

	$cluster = $lb->getClusterName();
	// Fetching the timestamps also initializes startupTimestamp
	$timestamp = $this->getStartupSessionTimestamps()[$cluster] ?? null;
	if ( $timestamp === null ) {
		return false;
	}

	if ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
		// If the position store is not replicated among datacenters and the cookie that
		// sticks the client to the primary datacenter expires, then the touch timestamp
		// will be found for requests in one datacenter but not others. For consistency,
		// return false once the user is no longer routed to the primary datacenter.
		$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );

		return false;
	}

	$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );

	return $timestamp;
}
442
/**
 * Get the replication positions loaded for this client at startup.
 *
 * @return array<string,DBPrimaryPos> Map of (primary server name => position)
 */
protected function getStartupSessionPositions() {
	$this->lazyStartup();

	// lazyStartup() always assigns this map on its first run
	return $this->startupPositionsByPrimary;
}
451
/**
 * Get the DB touch timestamps loaded for this client at startup.
 *
 * @return array<string,float> Map of (DB cluster name => UNIX timestamp)
 */
protected function getStartupSessionTimestamps() {
	$this->lazyStartup();

	// lazyStartup() always assigns this map on its first run
	return $this->startupTimestampsByCluster;
}
460
/**
 * Load the stored replication positions and touch timestamps for the client.
 *
 * Runs once; later calls return immediately because startupTimestamp is set. When load()
 * derived a fresh client ID, the store is not queried and both maps are left empty.
 */
protected function lazyStartup() {
	if ( $this->startupTimestamp !== null ) {
		return;
	}

	$this->startupTimestamp = $this->getCurrentTime();

	// There wasn't a client id in the cookie so we built one
	// There is no point in looking it up.
	if ( $this->hasNewClientId ) {
		$this->startupPositionsByPrimary = [];
		$this->startupTimestampsByCluster = [];
		return;
	}

	$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
	$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

	$data = $this->unmarshalPositions( $this->store->get( $this->key ) );

	// Use null-coalescing for every field so that a missing value (false/null) or a stored
	// array lacking a field is treated as empty instead of emitting "undefined key" warnings
	$this->startupPositionsByPrimary = $data[self::FLD_POSITIONS] ?? [];
	$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];

	// When a stored array expires and is re-created under the same (deterministic) key,
	// the array value naturally starts again from index zero. As such, it is possible
	// that if certain store writes were lost (e.g. store down), that we unintentionally
	// point to an offset in an older incarnation of the array.
	// We don't try to detect or do something about this because:
	// 1. Waiting for an older offset is harmless and generally no-ops.
	// 2. The older value will have expired by now and thus treated as non-existing,
	//    which means we wouldn't even "see" it here.
	$indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
	if ( $this->waitForPosIndex > 0 ) {
		if ( $indexReached >= $this->waitForPosIndex ) {
			$this->logger->debug( 'expected and found position index {cpPosIndex}', [
				'cpPosIndex' => $this->waitForPosIndex,
			] + $this->clientLogInfo );
		} else {
			// NOTE(review): the throwaway exception presumably exists to give the log entry
			// a stack trace
			$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
				'cpPosIndex' => $this->waitForPosIndex,
				'indexReached' => $indexReached,
				'exception' => new \RuntimeException(),
			] + $this->clientLogInfo );
		}
	} elseif ( $indexReached ) {
		$this->logger->debug( 'found position data with index {indexReached}', [
			'indexReached' => $indexReached
		] + $this->clientLogInfo );
	}
}
518
/**
 * Merge the new replication positions with the currently stored ones (highest wins)
 *
 * @param array|false|null $storedValue Unmarshalled value currently in the store, if any
 * @param array<string,DBPrimaryPos> $shutdownPositions Map of (primary server name => position)
 * @param array<string,float> $shutdownTimestamps Map of (DB cluster name => UNIX timestamp)
 * @param int|null &$clientPosIndex Set to the new write index (stored index + 1, or 1)
 * @return array New value to store, with positions, timestamps and write index fields
 */
protected function mergePositions(
	$storedValue,
	array $shutdownPositions,
	array $shutdownTimestamps,
	?int &$clientPosIndex = null
) {
	$positions = $storedValue[self::FLD_POSITIONS] ?? [];
	foreach ( $shutdownPositions as $primaryName => $newPos ) {
		$oldPos = $positions[$primaryName] ?? null;
		// Take the new position if nothing usable is stored or it is more recent
		if ( !( $oldPos instanceof DBPrimaryPos ) || $newPos->asOfTime() > $oldPos->asOfTime() ) {
			$positions[$primaryName] = $newPos;
		}
	}

	$timestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
	foreach ( $shutdownTimestamps as $cluster => $newTimestamp ) {
		// Keep the newest touch timestamp per cluster
		if ( !isset( $timestamps[$cluster] ) || $newTimestamp > $timestamps[$cluster] ) {
			$timestamps[$cluster] = $newTimestamp;
		}
	}

	$clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;

	return [
		self::FLD_POSITIONS => $positions,
		self::FLD_TIMESTAMPS => $timestamps,
		self::FLD_WRITE_INDEX => $clientPosIndex,
	];
}
567
/**
 * Get the current UNIX timestamp, or the mock value bound via setMockTime() if there is one.
 *
 * @return float|int UNIX timestamp (a numeric mock value is returned as-is)
 */
protected function getCurrentTime() {
	// Test numericness rather than truthiness so that a mocked time of 0 is honoured,
	// while null/false (no mock) still fall through to the real clock
	if ( is_numeric( $this->wallClockOverride ) ) {
		return $this->wallClockOverride;
	}

	$clockTime = (float)time(); // call this first
	// microtime() can severely drift from time() and the microtime() value of other threads.
	// Instead of seeing the current time as being in the past, use the value of time().
	return max( microtime( true ), $clockTime );
}
583
/**
 * Use a mock wall-clock value instead of the real time.
 *
 * The value is bound by reference, so later changes to $time are seen by getCurrentTime().
 * A null value means no mock is in effect.
 *
 * @param float|int|null &$time Mock UNIX timestamp
 */
public function setMockTime( &$time ) {
	$this->load();

	$this->wallClockOverride = &$time;
}
593
/**
 * Convert the DBPrimaryPos objects of a position data array into plain arrays for storage.
 *
 * @param array $positions Value as built by mergePositions()
 * @return array Same structure with each position replaced by its toArray() form
 */
private function marshalPositions( array $positions ) {
	$positions[self::FLD_POSITIONS] = array_map(
		static fn ( $pos ) => $pos->toArray(),
		$positions[self::FLD_POSITIONS]
	);

	return $positions;
}
601
/**
 * Rebuild DBPrimaryPos objects from position data read from the store.
 *
 * @param array|false|null $positions Value read from the store
 * @return array|false|null Same structure with positions rebuilt; falsy input is returned as-is
 */
private function unmarshalPositions( $positions ) {
	if ( $positions ) {
		foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $serialized ) {
			// NOTE(review): the class to instantiate comes from the stored '_type_' field,
			// so this trusts the contents of the position store
			$class = $serialized['_type_'];
			$positions[self::FLD_POSITIONS][$primaryName] = $class::newFromArray( $serialized );
		}
	}

	return $positions;
}
618
/**
 * Build a string conveying the client and write index of the chronology protector data.
 *
 * @param int $writeIndex Write index of the position data
 * @param int $time UNIX timestamp of the write
 * @param string $clientId Client ID hash
 * @return string Value of the form "<write index>@<write timestamp>#<client ID hash>"
 */
public static function makeCookieValueFromCPIndex(
	int $writeIndex,
	int $time,
	string $clientId
) {
	return sprintf( '%d@%d#%s', $writeIndex, $time, $clientId );
}
636
/**
 * Parse a string conveying the client and write index of the chronology protector data.
 *
 * @param string|null $value Value as built by makeCookieValueFromCPIndex(), or null
 * @param int $minTimestamp Values whose write timestamp is below this are treated as expired
 * @return array Map with 'index' (int|null) and 'clientId' (string|null); both null when the
 *  value is missing, malformed, has a non-positive index, or is expired
 */
public static function getCPInfoFromCookieValue( ?string $value, int $minTimestamp ) {
	$unknown = [ 'index' => null, 'clientId' => null ];

	// Format is "<write index>@<write timestamp>#<client ID hash>"
	if ( $value === null || !preg_match( '/^(\d+)@(\d+)#([0-9a-f]{32})$/', $value, $m ) ) {
		return $unknown; // not set or invalid
	}

	$index = (int)$m[1];
	if ( $index <= 0 || (int)$m[2] < $minTimestamp ) {
		return $unknown; // invalid or expired
	}

	// The pattern guarantees a non-empty 32-character hash in group 3
	return [ 'index' => $index, 'clientId' => $m[3] ];
}
668}
Abstract class for any ephemeral data store.
Definition BagOStuff.php:89
No-op implementation that stores nothing.
Provide a given client with protection against visible database lag.
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
array< string, DBPrimaryPos > $shutdownPositionsByPrimary
Map of (primary server name => position)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
array< string, DBPrimaryPos > $startupPositionsByPrimary
Map of (primary server name => position)
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
static makeCookieValueFromCPIndex(int $writeIndex, int $time, string $clientId)
Build a string conveying the client and write index of the chronology protector data.
float|null $startupTimestamp
UNIX timestamp when the client data was loaded.
stageSessionPrimaryPos(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
static getCPInfoFromCookieValue(?string $value, int $minTimestamp)
Parse a string conveying the client and write index of the chronology protector data.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
getSessionPrimaryPos(ILoadBalancer $lb)
Yield client "session consistency" replication position for a new ILoadBalancer.
__construct( $cpStash=null, $secret=null, $cliMode=null, $logger=null)
An object representing a primary or replica DB position in a replicated setup.
This class is a delegate to ILBFactory for a given database cluster.
getClusterName()
Get the name of the overall cluster of database servers managing the dataset.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getPrimaryPos()
Get the current primary replication position.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.