MediaWiki master
ChronologyProtector.php
Go to the documentation of this file.
1<?php
6namespace Wikimedia\Rdbms;
7
8use LogicException;
9use Psr\Log\LoggerInterface;
10use Psr\Log\NullLogger;
13
/** Request info map with the keys IPAddress, UserAgent, ChronologyClientId and ChronologyPositionIndex */
121 private $requestInfo;
/** Secret used as the HMAC key when a client ID has to be derived; empty string means plain md5 is used */
123 private string $secret;
/** Whether the process runs in CLI mode; load() disables the protector when this is true */
124 private bool $cliMode;
/** Store holding the position data; an EmptyBagOStuff here makes load() disable the protector */
126 private $store;
/** Logger receiving the debug/warning messages of this class */
128 protected $logger;
129
/** Store key of the position data for this client, built from the client ID in load() */
131 protected $key;
/** Hash of client parameters; stays unset until load() has run */
133 protected $clientId;
/** Map of client information fields for logging (clientIP, clientAgent, clientId) */
135 protected $clientLogInfo;
138
/** Whether reading/writing session consistency replication positions is enabled */
140 protected $enabled = true;
141
146
/** True when load() had to derive the client ID itself because the request info supplied none */
158 private $hasNewClientId = false;
159
/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
161 public const POSITION_COOKIE_TTL = 10;
/** TTL passed to the store when saving position data */
163 private const POSITION_STORE_TTL = 60;
164
/** First timing argument passed to getScopedLock(); presumably seconds to wait for the lock - TODO confirm */
166 private const LOCK_TIMEOUT = 3;
/** Second timing argument passed to getScopedLock(); presumably the lock expiry in seconds - TODO confirm */
168 private const LOCK_TTL = 6;
169
/** Field of the stored value holding the map of (primary server name => position) */
170 private const FLD_POSITIONS = 'positions';
/** Field of the stored value holding the write index, incremented on every merge */
171 private const FLD_WRITE_INDEX = 'writeIndex';
172
/**
 * Wire up the position store, the hashing secret, the run mode and the logger.
 *
 * Every argument is optional; null selects the fallback described below.
 *
 * @param mixed $cpStash Position store; an EmptyBagOStuff is used when null
 * @param string|null $secret HMAC key for deriving client IDs; empty string when null
 * @param bool|null $cliMode Whether this is a CLI process; detected from PHP_SAPI when null
 * @param mixed $logger Logger to report to; a NullLogger is used when null
 */
public function __construct( $cpStash = null, $secret = null, $cliMode = null, $logger = null ) {
	$this->store = $cpStash ?? new EmptyBagOStuff();
	$this->secret = $secret ?? '';
	$this->logger = $logger ?? new NullLogger();
	$this->cliMode = $cliMode ?? in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );

	// The two Chronology* entries stay null until the application injects them
	// through setRequestInfo() (prior $cpClientId / $cpIndex from LBFactory::shutdown()).
	$this->requestInfo = [
		'IPAddress' => $_SERVER['REMOTE_ADDR'] ?? '',
		'UserAgent' => $_SERVER['HTTP_USER_AGENT'] ?? '',
		'ChronologyClientId' => null,
		'ChronologyPositionIndex' => null
	];
}
193
/**
 * Lazily resolve the client ID, the store key and the logging context.
 *
 * Does nothing when the protector is disabled or a client ID is already known.
 * CLI mode and an EmptyBagOStuff store both switch the protector off, but the
 * client ID and key are still computed afterwards.
 */
private function load() {
	if ( !$this->enabled || $this->clientId ) {
		return;
	}

	$ip = $this->requestInfo['IPAddress'];
	$agent = $this->requestInfo['UserAgent'];
	// Falsy values (e.g. an empty string) count as "no client ID supplied"
	$suppliedClientId = $this->requestInfo['ChronologyClientId'] ?: null;

	if ( $this->cliMode ) {
		$this->setEnabled( false );
	} elseif ( $this->store instanceof EmptyBagOStuff ) {
		// Nowhere to keep DB positions, so there is nothing to wait for either
		$this->setEnabled( false );
		$this->logger->debug( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
	}

	if ( $suppliedClientId !== null ) {
		$this->clientId = $suppliedClientId;
	} else {
		// Derive an ID from IP and user agent, keyed with the secret when one is set
		$this->hasNewClientId = true;
		$fingerprint = $ip . "\n" . $agent;
		$this->clientId = ( $this->secret === '' )
			? md5( $fingerprint )
			: hash_hmac( 'md5', $fingerprint, $this->secret );
	}

	$this->key = $this->store->makeGlobalKey( __CLASS__, $this->clientId, 'v4' );
	$this->waitForPosIndex = $this->requestInfo['ChronologyPositionIndex'];

	$this->clientLogInfo = [
		'clientIP' => $ip,
		'clientAgent' => $agent,
		'clientId' => $suppliedClientId
	];
}
229
/**
 * Override request info fields before the protector initializes itself.
 *
 * Keys present in $info replace the current values; all other keys are kept.
 *
 * @param array $info Map of request info fields to override
 * @throws LogicException If the client ID has already been resolved
 */
public function setRequestInfo( array $info ) {
	if ( !$this->clientId ) {
		$this->requestInfo = $info + $this->requestInfo;

		return;
	}

	throw new LogicException( 'ChronologyProtector already initialized' );
}
237
/**
 * Get the hash of client parameters, initializing the protector first if needed.
 *
 * @return string|null Client ID; unset if the protector was disabled before it ever loaded
 */
public function getClientId() {
	// Resolve the ID on first use; load() is a no-op once it is known
	$this->load();

	return $this->clientId;
}
246
/**
 * Switch reading/writing of session consistency replication positions on or off.
 *
 * @param bool $enabled New state
 */
public function setEnabled( $enabled ) {
	$this->enabled = $enabled;
}
254
/**
 * Yield client "session consistency" replication position for a new ILoadBalancer.
 *
 * The position is looked up by the name of the load balancer's writer server
 * among the positions loaded for this client at startup.
 *
 * @param ILoadBalancer $lb Load balancer to look up the position for
 * @return DBPrimaryPos|null Position to wait for; null when disabled or when none is stored
 */
public function getSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return null;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
	if ( $pos instanceof DBPrimaryPos ) {
		// The original message ended with a stray, unbalanced single quote
		$this->logger->debug( "ChronologyProtector will wait for '$pos' on $cluster ($primaryName)" );
	} else {
		$this->logger->debug( "ChronologyProtector skips wait on $cluster ($primaryName)" );
	}

	return $pos;
}
286
/**
 * Update client "session consistency" replication position for an end-of-life ILoadBalancer.
 *
 * Stages the writer's current replication position (or null) under the writer's
 * server name, but only when the load balancer reports primary changes.
 *
 * @param ILoadBalancer $lb Load balancer that is about to be discarded
 */
public function stageSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return;
	}
	if ( !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	// Position data is only useful when there are streaming replicas: future requests
	// load it via ChronologyProtector::getSessionPrimaryPos (from
	// LoadBalancer::getReaderIndex) and wait for it before running DB_REPLICA queries.
	// With a single DB host, or a database type without replication (e.g. SQLite),
	// all queries go to the primary DB and the data would never be waited for.
	//
	// A null value is still staged in that case, so that ChronologyProtector::getTouched
	// reliably detects recent writes for non-database purposes, such as
	// ParserOutputAccess/PoolWorkArticleViewCurrent. This also makes getTouched()
	// easier to set up and test, as it always works once CP is enabled
	// (e.g. wgMicroStashType or wgMainCacheType set to a non-DB cache).
	$stagedPos = null;
	if ( !$lb->hasStreamingReplicaServers() ) {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no replication" );
	} else {
		$pos = $lb->getPrimaryPos();
		if ( $pos ) {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position now '$pos'" );
			$stagedPos = $pos;
		} else {
			$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position unknown" );
		}
	}

	$this->shutdownPositionsByPrimary[$primaryName] = $stagedPos;
}
333
/**
 * Persist any staged client "session consistency" replication positions.
 *
 * The staged positions are merged into the stored value while holding a scoped
 * lock on the client's key, then written back with POSITION_STORE_TTL.
 *
 * @param int|null &$clientPosIndex Receives the new write index; reset to null if saving failed
 * @return array Staged positions that could not be saved; empty on success,
 *  when disabled, or when nothing was staged
 */
public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
	$this->load();
	if ( !$this->enabled ) {
		return [];
	}

	$staged = $this->shutdownPositionsByPrimary;
	if ( !$staged ) {
		$this->logger->debug( __METHOD__ . ": no primary positions data to save" );

		return [];
	}

	// Stays false when the lock cannot be obtained
	$saved = false;
	$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
	if ( $scopeLock ) {
		$stored = $this->unmarshalPositions( $this->store->get( $this->key ) );
		$merged = $this->mergePositions( $stored, $staged, $clientPosIndex );
		$saved = $this->store->set(
			$this->key,
			$this->marshalPositions( $merged ),
			self::POSITION_STORE_TTL
		);
		// Drop the lock handle as soon as the write is done
		unset( $scopeLock );
	}

	$primaryList = implode( ', ', array_keys( $staged ) );

	if ( !$saved ) {
		// Maybe position store is down
		$this->logger->warning( "ChronologyProtector failed to save position data for $primaryList" );
		$clientPosIndex = null;

		return $staged;
	}

	$this->logger->debug( "ChronologyProtector saved position data for $primaryList" );

	return [];
}
386
/**
 * Whether the request came from a client that recently made database changes.
 *
 * This is the case when any position data was found for the client at startup.
 *
 * @return bool False when disabled or when no position data exists
 */
public function getTouched() {
	$this->load();

	if ( !$this->enabled ) {
		return false;
	}

	$touched = (bool)$this->getStartupSessionPositions();
	$this->logger->debug(
		$touched
			? __METHOD__ . ": found recent writes"
			: __METHOD__ . ": found no recent writes"
	);

	return $touched;
}
416
/**
 * Get the replication positions that were stored for this client at startup.
 *
 * Callers index the result by primary server name and test it for emptiness,
 * so the loaded map has to be returned here.
 *
 * @return array Map of (primary server name => position); empty when nothing was found
 */
protected function getStartupSessionPositions() {
	$this->lazyStartup();

	return $this->startupPositionsByPrimary;
}
425
/**
 * Load the stored replication positions for the client, once.
 *
 * Populates the startup position map and logs whether the stored write index
 * has reached the index the client expects to see.
 */
protected function lazyStartup() {
	if ( $this->startupPositionsByPrimary !== null ) {
		// Already loaded
		return;
	}

	if ( $this->hasNewClientId ) {
		// The request carried no client ID, so one was just derived;
		// there is no point in looking it up.
		$this->startupPositionsByPrimary = [];

		return;
	}

	$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
	$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

	$data = $this->unmarshalPositions( $this->store->get( $this->key ) );

	$this->startupPositionsByPrimary = $data ? $data[self::FLD_POSITIONS] : [];

	// When a stored array expires and is re-created under the same (deterministic) key,
	// its write index starts again from zero. If certain store writes were lost
	// (e.g. store down), the expected index may therefore point into an older
	// incarnation of the array. Nothing is done about this because:
	// 1. Waiting for an older offset is harmless and generally no-ops.
	// 2. The older value will have expired by now and is thus treated as non-existing,
	//    which means it would not even be "seen" here.
	$indexReached = is_array( $data ) ? $data[self::FLD_WRITE_INDEX] : null;
	$expectedIndex = $this->waitForPosIndex;

	if ( $expectedIndex > 0 ) {
		if ( $indexReached >= $expectedIndex ) {
			$this->logger->debug( 'expected and found position index {cpPosIndex}', [
				'cpPosIndex' => $expectedIndex,
			] + $this->clientLogInfo );
		} else {
			$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
				'cpPosIndex' => $expectedIndex,
				'indexReached' => $indexReached,
				'exception' => new \RuntimeException(),
			] + $this->clientLogInfo );
		}
	} elseif ( $indexReached ) {
		$this->logger->debug( 'found position data with index {indexReached}', [
			'indexReached' => $indexReached
		] + $this->clientLogInfo );
	}
}
479
/**
 * Merge the new replication positions with the currently stored ones (highest wins).
 *
 * A staged position replaces the stored entry when the stored entry is missing
 * or is not a DBPrimaryPos, or when the staged position is newer. A staged null
 * (position unknown, or no replication) never replaces a stored DBPrimaryPos;
 * previously that combination called asOfTime() on null and raised an Error.
 *
 * @param array|false|null $storedValue Current unmarshalled value from the store, if any
 * @param array $shutdownPositions Map of (primary server name => DBPrimaryPos|null)
 * @param int|null &$clientPosIndex Receives the incremented write index
 * @return array Map with the merged positions and the new write index
 */
protected function mergePositions(
	$storedValue,
	array $shutdownPositions,
	?int &$clientPosIndex = null
) {
	$mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
	// Use the newest positions for each DB primary
	foreach ( $shutdownPositions as $primaryName => $pos ) {
		$current = $mergedPositions[$primaryName] ?? null;
		if (
			!( $current instanceof DBPrimaryPos ) ||
			( $pos instanceof DBPrimaryPos && $pos->asOfTime() > $current->asOfTime() )
		) {
			$mergedPositions[$primaryName] = $pos;
		}
	}

	// Every merge bumps the write index, starting from 1 for a fresh value
	$clientPosIndex = ( $storedValue[self::FLD_WRITE_INDEX] ?? 0 ) + 1;

	return [
		self::FLD_POSITIONS => $mergedPositions,
		self::FLD_WRITE_INDEX => $clientPosIndex
	];
}
513
/**
 * Convert the position objects of a merged value into plain arrays for storage.
 *
 * Falsy entries (e.g. null) are left untouched.
 *
 * @param array $positions Value holding a FLD_POSITIONS map
 * @return array The same value with each truthy position replaced by its toArray() form
 */
private function marshalPositions( array $positions ): array {
	foreach ( $positions[ self::FLD_POSITIONS ] as $primaryName => $position ) {
		if ( !$position ) {
			continue;
		}

		$positions[ self::FLD_POSITIONS ][ $primaryName ] = $position->toArray();
	}

	return $positions;
}
523
/**
 * Rebuild position objects from the plain arrays read from the store.
 *
 * A falsy input (e.g. a store miss) is returned unchanged. For every truthy
 * entry, the class named in its '_type_' field is asked to recreate the object.
 *
 * @param array|false|null $positions Raw value from the store
 * @return array|false|null The value with its truthy position entries turned into objects
 */
private function unmarshalPositions( $positions ) {
	if ( !$positions ) {
		return $positions;
	}

	foreach ( $positions[ self::FLD_POSITIONS ] as $primaryName => $rawPosition ) {
		if ( !$rawPosition ) {
			continue;
		}

		$positionClass = $rawPosition[ '_type_' ];
		$positions[ self::FLD_POSITIONS ][ $primaryName ] = $positionClass::newFromArray( $rawPosition );
	}

	return $positions;
}
542
/**
 * Build a string conveying the client and write index of the chronology protector data.
 *
 * @param int $writeIndex Write index of the stored position data
 * @param int $time Write timestamp
 * @param string $clientId Client ID hash
 * @return string Value in the form "<write index>@<write timestamp>#<client ID hash>"
 */
public static function makeCookieValueFromCPIndex(
	int $writeIndex,
	int $time,
	string $clientId
) {
	return $writeIndex . '@' . $time . '#' . $clientId;
}
560
/**
 * Parse a string conveying the client and write index of the chronology protector data.
 *
 * The expected format is "<write index>@<write timestamp>#<client ID hash>", where
 * the hash is 32 lowercase hex characters.
 *
 * @param string|null $value Cookie value, or null if not set
 * @param int $minTimestamp Lowest write timestamp that is still accepted
 * @return array Map with 'index' and 'clientId'; both are null when the value is
 *  missing, malformed, has a non-positive index, or is older than $minTimestamp
 */
public static function getCPInfoFromCookieValue( ?string $value, int $minTimestamp ) {
	$nothing = [ 'index' => null, 'clientId' => null ];

	if ( $value === null ) {
		// Not set
		return $nothing;
	}

	$matched = preg_match( '/^(\d+)@(\d+)#([0-9a-f]{32})$/', $value, $parts );
	if ( !$matched ) {
		// Invalid
		return $nothing;
	}

	[ , $rawIndex, $rawTimestamp, $rawClientId ] = $parts;

	$index = (int)$rawIndex;
	if ( $index <= 0 ) {
		// Invalid
		return $nothing;
	}

	if ( $rawTimestamp !== '' && (int)$rawTimestamp < $minTimestamp ) {
		// Expired
		return $nothing;
	}

	return [
		'index' => $index,
		'clientId' => ( $rawClientId !== '' ) ? $rawClientId : null
	];
}
592}
Abstract class for any ephemeral data store.
Definition BagOStuff.php:73
No-op implementation that stores nothing.
Provide a given client with protection against visible database lag.
array< string, DBPrimaryPos|null > $shutdownPositionsByPrimary
Map of (primary server name => position)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
getTouched()
Whether the request came from a client that recently made database changes (last 10 seconds).
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
mergePositions( $storedValue, array $shutdownPositions, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
static makeCookieValueFromCPIndex(int $writeIndex, int $time, string $clientId)
Build a string conveying the client and write index of the chronology protector data.
stageSessionPrimaryPos(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
static getCPInfoFromCookieValue(?string $value, int $minTimestamp)
Parse a string conveying the client and write index of the chronology protector data.
array< string, DBPrimaryPos|null > $startupPositionsByPrimary
Map of (primary server name => position)
string $clientId
Hash of client parameters.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
getSessionPrimaryPos(ILoadBalancer $lb)
Yield client "session consistency" replication position for a new ILoadBalancer.
__construct( $cpStash=null, $secret=null, $cliMode=null, $logger=null)
An object representing a primary or replica DB position in a replicated setup.
This class is a delegate to ILBFactory for a given database cluster.
getClusterName()
Get the name of the overall cluster of database servers managing the dataset.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getPrimaryPos()
Get the current primary replication position.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.