MediaWiki master
ChronologyProtector.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use LogicException;
23use Psr\Log\LoggerInterface;
24use Psr\Log\NullLogger;
27
/** Map of request details; initialised by the constructor with the keys 'IPAddress', 'UserAgent', 'ChronologyClientId' and 'ChronologyPositionIndex' */
135 private $requestInfo;
/** Secret used as the HMAC key when deriving a client ID; '' means a plain MD5 is used instead */
137 private string $secret;
/** Whether this is treated as a CLI process; load() disables the protector when true */
138 private bool $cliMode;
/** Store holding the position data; an EmptyBagOStuff when none was injected */
140 private $store;
/** Logger; a NullLogger when none was injected */
142 protected $logger;
143

/** Key under which this client's position data lives in the store (built in load()) */
145 protected $key;
/** Hash of client parameters */
147 protected $clientId;
/** Map of client information fields for logging */
149 protected $clientLogInfo;
152

/** Whether reading/writing session consistency replication positions is enabled */
154 protected $enabled = true;
155

160
/** Set by load() when the request info carried no client ID and one had to be derived; lazyStartup() then skips the store lookup */
172 private $hasNewClientId = false;
173

/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
175 public const POSITION_COOKIE_TTL = 10;
/** TTL passed to the store when saving position data (presumably seconds; confirm against BagOStuff::set) */
177 private const POSITION_STORE_TTL = 60;
178

/** Timeout passed to getScopedLock() when locking the position key (presumably seconds; confirm) */
180 private const LOCK_TIMEOUT = 3;
/** TTL passed to getScopedLock() for the position key lock (presumably seconds; confirm) */
182 private const LOCK_TTL = 6;
183

/** Field of the stored value holding the map of (primary server name => position) */
184 private const FLD_POSITIONS = 'positions';
/** Field of the stored value holding the index of the last write to the position store */
185 private const FLD_WRITE_INDEX = 'writeIndex';
186
/**
 * Wire up the position store, client ID secret, CLI flag and logger.
 *
 * Every argument is optional; a null argument selects an inert default.
 *
 * @param BagOStuff|null $cpStash Store for position data; EmptyBagOStuff when null
 * @param string|null $secret Secret for hashing the client ID; '' when null
 * @param bool|null $cliMode Whether this is a CLI process; derived from PHP_SAPI when null
 * @param LoggerInterface|null $logger Logger; NullLogger when null
 */
public function __construct( $cpStash = null, $secret = null, $cliMode = null, $logger = null ) {
	// Injected dependencies, each with a do-nothing fallback
	$this->logger = $logger ?? new NullLogger();
	$this->store = $cpStash ?? new EmptyBagOStuff();
	$this->secret = $secret ?? '';
	$this->cliMode = $cliMode ?? in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );

	// Defaults taken from the web request; setRequestInfo() can override any of these
	$this->requestInfo = [
		'IPAddress' => $_SERVER['REMOTE_ADDR'] ?? '',
		'UserAgent' => $_SERVER['HTTP_USER_AGENT'] ?? '',
		// Headers application can inject via LBFactory::setRequestInfo()
		'ChronologyClientId' => null, // prior $cpClientId value from LBFactory::shutdown()
		'ChronologyPositionIndex' => null // prior $cpIndex value from LBFactory::shutdown()
	];
}
207
/**
 * Lazily work out the client ID, the store key and the logging context.
 *
 * This is a no-op once a client ID is known, or while the protector is disabled.
 * It disables the protector in CLI mode and when the store is an EmptyBagOStuff,
 * but still goes on to determine the client ID in those cases.
 */
private function load() {
	// Already loaded or not enabled, short-circuit.
	if ( $this->clientId || !$this->enabled ) {
		return;
	}

	$ip = $this->requestInfo['IPAddress'];
	$agent = $this->requestInfo['UserAgent'];
	// An empty (falsy) client ID is treated the same as no client ID at all
	$givenClientId = $this->requestInfo['ChronologyClientId'] ?: null;

	if ( $this->cliMode ) {
		$this->setEnabled( false );
	} elseif ( $this->store instanceof EmptyBagOStuff ) {
		// No where to store any DB positions and wait for them to appear
		$this->setEnabled( false );
		$this->logger->debug( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
	}

	if ( $givenClientId === null ) {
		// Nothing was supplied, so derive an ID from the client's IP address and user agent
		$this->hasNewClientId = true;
		$fingerprint = $ip . "\n" . $agent;
		$this->clientId = ( $this->secret != '' )
			? hash_hmac( 'md5', $fingerprint, $this->secret )
			: md5( $fingerprint );
	} else {
		$this->clientId = $givenClientId;
	}

	$this->key = $this->store->makeGlobalKey( __CLASS__, $this->clientId, 'v4' );
	$this->waitForPosIndex = $this->requestInfo['ChronologyPositionIndex'];

	// Note that 'clientId' here is the supplied ID (if any), not a derived one
	$this->clientLogInfo = [
		'clientIP' => $ip,
		'clientAgent' => $agent,
		'clientId' => $givenClientId
	];
}
243
/**
 * Override selected request details before the client ID has been determined.
 *
 * Keys present in $info replace the current values; all other keys keep their value.
 *
 * @param array $info Map of request info fields (e.g. 'IPAddress', 'UserAgent',
 *  'ChronologyClientId', 'ChronologyPositionIndex')
 * @throws LogicException If the client ID was already determined
 */
public function setRequestInfo( array $info ) {
	$alreadyInitialized = (bool)$this->clientId;
	if ( $alreadyInitialized ) {
		throw new LogicException( 'ChronologyProtector already initialized' );
	}

	// Array union: entries of $info win over the existing ones
	$merged = $info + $this->requestInfo;
	$this->requestInfo = $merged;
}
251
/**
 * Get the client ID, determining it first if that has not happened yet.
 *
 * @return string|null Client ID hash; may still be unset if the protector was
 *  disabled before the ID could be determined
 */
public function getClientId() {
	// Triggers the lazy initialisation; a no-op when already done
	$this->load();

	return $this->clientId;
}
260
/**
 * Turn reading/writing of session consistency replication positions on or off.
 *
 * @param bool $enabled New state
 */
public function setEnabled( $enabled ) {
	$this->enabled = $enabled;
}
268
/**
 * Yield client "session consistency" replication position for a new ILoadBalancer.
 *
 * Looks up the position recorded for the load balancer's primary server among the
 * positions loaded for this client at startup.
 *
 * @param ILoadBalancer $lb Load balancer whose primary server is looked up
 * @return DBPrimaryPos|null The position to wait for; null when the protector is
 *  disabled or no position is known for that primary
 */
public function getSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return null;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
	if ( $pos instanceof DBPrimaryPos ) {
		// The message used to end with a stray, unbalanced single quote
		$this->logger->debug( "ChronologyProtector will wait for '$pos' on $cluster ($primaryName)" );
	} else {
		$this->logger->debug( "ChronologyProtector skips wait on $cluster ($primaryName)" );
	}

	return $pos;
}
300
/**
 * Update client "session consistency" replication position for an end-of-life ILoadBalancer.
 *
 * Stages (in memory only) the current position of the load balancer's primary server,
 * provided the protector is enabled and the load balancer reports primary changes.
 * Nothing is written to the store here; see persistSessionReplicationPositions().
 *
 * @param ILoadBalancer $lb Load balancer that is about to be discarded
 */
public function stageSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return;
	}
	if ( !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$masterName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	// If LoadBalancer::hasStreamingReplicaServers is false (single DB host),
	// or if the database type has no replication (i.e. SQLite), then we do not need to
	// save any position data, as it would never be loaded or waited for. When we save this
	// data, it is for DB_REPLICA queries in future requests, which load it via
	// ChronologyProtector::getSessionPrimaryPos (from LoadBalancer::getReaderIndex)
	// and wait for that position. In a single-server setup, all queries go the primary DB.
	//
	// In that case we still store a null value, so that ChronologyProtector::getTouched
	// reliably detects recent writes for non-database purposes,
	// such as ParserOutputAccess/PoolWorkArticleViewCurrent. This also makes getTouched()
	// easier to set up and test, as it will always work once CP is enabled
	// (e.g. wgMicroStashType or wgMainCacheType set to a non-DB cache).
	$stagedPos = null;
	if ( !$lb->hasStreamingReplicaServers() ) {
		$message = __METHOD__ . ": $cluster ($masterName) has no replication";
	} else {
		$pos = $lb->getPrimaryPos();
		if ( $pos ) {
			$message = __METHOD__ . ": $cluster ($masterName) position now '$pos'";
			$stagedPos = $pos;
		} else {
			$message = __METHOD__ . ": $cluster ($masterName) position unknown";
		}
	}

	$this->logger->debug( $message );
	$this->shutdownPositionsByPrimary[$masterName] = $stagedPos;
}
347
/**
 * Persist any staged client "session consistency" replication positions.
 *
 * Under a lock on the client's key, the staged positions are merged with the stored
 * ones and written back to the store.
 *
 * @param int|null &$clientPosIndex Receives the new write index from the merge; reset to
 *  null when saving failed. Left untouched when there was nothing to save.
 * @return array Map of (primary server name => position) that could not be saved;
 *  empty on success, when disabled, or when nothing was staged
 */
public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
	$this->load();
	if ( !$this->enabled ) {
		return [];
	}

	$staged = $this->shutdownPositionsByPrimary;
	if ( !$staged ) {
		$this->logger->debug( __METHOD__ . ": no primary positions data to save" );

		return [];
	}

	// Stays false when the lock cannot be obtained
	$saved = false;
	$lock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
	if ( $lock ) {
		$current = $this->unmarshalPositions( $this->store->get( $this->key ) );
		$merged = $this->mergePositions( $current, $staged, $clientPosIndex );
		$saved = $this->store->set(
			$this->key,
			$this->marshalPositions( $merged ),
			self::POSITION_STORE_TTL
		);
		// Dropping the scoped lock object releases the lock
		unset( $lock );
	}

	$primaryList = implode( ', ', array_keys( $staged ) );

	if ( $saved ) {
		$this->logger->debug( "ChronologyProtector saved position data for $primaryList" );

		return [];
	}

	// Maybe position store is down
	$this->logger->warning( "ChronologyProtector failed to save position data for $primaryList" );
	$clientPosIndex = null;

	return $staged;
}
400
/**
 * Whether the request came from a client that recently made database changes.
 *
 * This is true when any position data was found for the client at startup.
 *
 * @return bool False when the protector is disabled or no position data was found
 */
public function getTouched() {
	$this->load();

	if ( !$this->enabled ) {
		return false;
	}

	$touched = (bool)$this->getStartupSessionPositions();
	$this->logger->debug(
		__METHOD__ . ( $touched ? ": found recent writes" : ": found no recent writes" )
	);

	return $touched;
}
430
/**
 * Get the replication positions that were stored for this client, loading them on
 * first use.
 *
 * @return array<string,DBPrimaryPos|null> Map of (primary server name => position)
 */
protected function getStartupSessionPositions() {
	$this->lazyStartup();

	// Callers index into and truth-test this value, so it must actually be returned
	return $this->startupPositionsByPrimary;
}
439
/**
 * Load the stored replication positions for the client, once.
 *
 * Afterwards $startupPositionsByPrimary is always an array. The store is not consulted
 * when the client ID was freshly derived for this request. The write index found in the
 * store is compared against the index the client expects, for logging purposes only.
 */
protected function lazyStartup() {
	if ( $this->startupPositionsByPrimary !== null ) {
		// Already loaded
		return;
	}

	// There wasn't a client id in the cookie so we built one
	// There is no point in looking it up.
	if ( $this->hasNewClientId ) {
		$this->startupPositionsByPrimary = [];
		return;
	}

	$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
	$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

	$stored = $this->unmarshalPositions( $this->store->get( $this->key ) );

	if ( $stored ) {
		$this->startupPositionsByPrimary = $stored[self::FLD_POSITIONS];
	} else {
		$this->startupPositionsByPrimary = [];
	}

	// When a stored array expires and is re-created under the same (deterministic) key,
	// the array value naturally starts again from index zero. As such, it is possible
	// that if certain store writes were lost (e.g. store down), that we unintentionally
	// point to an offset in an older incarnation of the array.
	// We don't try to detect or do something about this because:
	// 1. Waiting for an older offset is harmless and generally no-ops.
	// 2. The older value will have expired by now and thus treated as non-existing,
	//    which means we wouldn't even "see" it here.
	$indexReached = is_array( $stored ) ? $stored[self::FLD_WRITE_INDEX] : null;
	$indexWanted = $this->waitForPosIndex;
	$clientExpectsIndex = $indexWanted > 0;

	if ( $clientExpectsIndex && $indexReached >= $indexWanted ) {
		$this->logger->debug( 'expected and found position index {cpPosIndex}', [
			'cpPosIndex' => $indexWanted,
		] + $this->clientLogInfo );
	} elseif ( $clientExpectsIndex ) {
		$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
			'cpPosIndex' => $indexWanted,
			'indexReached' => $indexReached,
			'exception' => new \RuntimeException(),
		] + $this->clientLogInfo );
	} elseif ( $indexReached ) {
		$this->logger->debug( 'found position data with index {indexReached}', [
			'indexReached' => $indexReached
		] + $this->clientLogInfo );
	}
}
493
/**
 * Merge the new replication positions with the currently stored ones (highest wins).
 *
 * For each primary server, the staged position replaces the stored one when nothing
 * usable is stored, or when the staged position is newer. A staged null (position
 * unknown / no replication) never displaces a stored position.
 *
 * @param array|false $storedValue Current value from the store, if any
 * @param array<string,DBPrimaryPos|null> $shutdownPositions Staged map of
 *  (primary server name => position)
 * @param int|null &$clientPosIndex Receives the new write index (stored index + 1)
 * @return array Map with the merged positions and the new write index
 */
protected function mergePositions(
	$storedValue,
	array $shutdownPositions,
	?int &$clientPosIndex = null
) {
	/** @var array<string,DBPrimaryPos|null> $mergedPositions */
	$mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
	// Use the newest positions for each DB primary
	foreach ( $shutdownPositions as $masterName => $pos ) {
		$storedPos = $mergedPositions[$masterName] ?? null;
		if ( !( $storedPos instanceof DBPrimaryPos ) ) {
			// Nothing usable is stored for this primary; take the staged value as-is
			$mergedPositions[$masterName] = $pos;
		} elseif ( $pos instanceof DBPrimaryPos && $pos->asOfTime() > $storedPos->asOfTime() ) {
			// The instanceof guard matters: staged values can be null, and calling
			// asOfTime() on null would be a fatal error
			$mergedPositions[$masterName] = $pos;
		}
	}

	$clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;

	return [
		self::FLD_POSITIONS => $mergedPositions,
		self::FLD_WRITE_INDEX => $clientPosIndex
	];
}
527
/**
 * Convert the position objects of a merged value into plain arrays for storage.
 *
 * Falsy entries (e.g. null) are left as they are.
 *
 * @param array $positions Value as returned by mergePositions()
 * @return array The same structure with each position replaced by its toArray() form
 */
private function marshalPositions( array $positions ): array {
	foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $position ) {
		if ( !$position ) {
			continue;
		}
		$positions[self::FLD_POSITIONS][$primaryName] = $position->toArray();
	}

	return $positions;
}
537
/**
 * Turn the plain-array positions of a stored value back into position objects.
 *
 * A falsy input (e.g. a store miss) is returned unchanged, as are falsy entries.
 *
 * @param array|false $positions Value read from the store
 * @return array|false The same structure with each position rebuilt via the
 *  newFromArray() method of the class named in its '_type_' field
 */
private function unmarshalPositions( $positions ) {
	if ( $positions ) {
		foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $fields ) {
			if ( !$fields ) {
				continue;
			}
			// The class to rebuild is named by the stored data itself
			$posClass = $fields['_type_'];
			$positions[self::FLD_POSITIONS][$primaryName] = $posClass::newFromArray( $fields );
		}
	}

	return $positions;
}
556
/**
 * Build a string conveying the client and write index of the chronology protector data.
 *
 * @param int $writeIndex Write index of the position data
 * @param int $time Write timestamp
 * @param string $clientId Client ID hash
 * @return string Value of the form "<write index>@<write timestamp>#<client ID hash>"
 */
public static function makeCookieValueFromCPIndex(
	int $writeIndex,
	int $time,
	string $clientId
) {
	// Format is "<write index>@<write timestamp>#<client ID hash>"
	return $writeIndex . '@' . $time . '#' . $clientId;
}
574
/**
 * Parse a string conveying the client and write index of the chronology protector data.
 *
 * @param string|null $value Value of the form "<write index>@<write timestamp>#<client ID hash>"
 * @param int $minTimestamp Lowest write timestamp that is still accepted
 * @return array Map with 'index' (int|null) and 'clientId' (string|null); both are null
 *  when the value is missing, malformed, has a non-positive index, or is expired
 */
public static function getCPInfoFromCookieValue( ?string $value, int $minTimestamp ) {
	static $placeholder = [ 'index' => null, 'clientId' => null ];

	if ( $value === null ) {
		return $placeholder; // not set
	}

	// Format is "<write index>@<write timestamp>#<client ID hash>"
	// The "D" modifier makes "$" match only at the very end, so that a value with
	// a trailing newline is rejected rather than silently accepted.
	if ( !preg_match( '/^(\d+)@(\d+)#([0-9a-f]{32})$/D', $value, $m ) ) {
		return $placeholder; // invalid
	}

	// After a successful match, all three groups are guaranteed to be non-empty
	$index = (int)$m[1];
	if ( $index <= 0 ) {
		return $placeholder; // invalid
	}
	if ( (int)$m[2] < $minTimestamp ) {
		return $placeholder; // expired
	}

	return [ 'index' => $index, 'clientId' => $m[3] ];
}
606}
Abstract class for any ephemeral data store.
Definition BagOStuff.php:87
No-op implementation that stores nothing.
Provide a given client with protection against visible database lag.
array< string, DBPrimaryPos|null > $shutdownPositionsByPrimary
Map of (primary server name => position)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
getTouched()
Whether the request came from a client that recently made database changes (last 10 seconds).
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
mergePositions( $storedValue, array $shutdownPositions, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
static makeCookieValueFromCPIndex(int $writeIndex, int $time, string $clientId)
Build a string conveying the client and write index of the chronology protector data.
stageSessionPrimaryPos(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
static getCPInfoFromCookieValue(?string $value, int $minTimestamp)
Parse a string conveying the client and write index of the chronology protector data.
array< string, DBPrimaryPos|null > $startupPositionsByPrimary
Map of (primary server name => position)
string $clientId
Hash of client parameters.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
getSessionPrimaryPos(ILoadBalancer $lb)
Yield client "session consistency" replication position for a new ILoadBalancer.
__construct( $cpStash=null, $secret=null, $cliMode=null, $logger=null)
An object representing a primary or replica DB position in a replicated setup.
This class is a delegate to ILBFactory for a given database cluster.
getClusterName()
Get the name of the overall cluster of database servers managing the dataset.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getPrimaryPos()
Get the current primary replication position.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.