MediaWiki REL1_39
ChronologyProtector.php
Go to the documentation of this file.
1<?php
21
22use BagOStuff;
23use Psr\Log\LoggerAwareInterface;
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
26
132class ChronologyProtector implements LoggerAwareInterface {
/** @var BagOStuff Store holding the per-client position data */
protected $store;
/** @var LoggerInterface */
protected $logger;

/** @var string Store key derived from the class name and the client ID */
protected $key;
/** @var string Hash of client parameters */
protected $clientId;
/** @var string[] Map of client information fields for logging */
protected $clientLogInfo;
/** @var int|null Expected minimum index of the last write to the position store */
protected $waitForPosIndex;

/** @var bool Whether reading/writing session consistency replication positions is enabled */
protected $enabled = true;
/** @var bool Whether waiting on DB servers to reach replication positions is enabled */
protected $positionWaitsEnabled = true;
/** @var float|null UNIX timestamp when the client data was loaded; null until lazyStartup() ran */
protected $startupTimestamp;

/** @var array<string,DBPrimaryPos> Map of (primary server name => position) */
protected $startupPositionsByPrimary = [];
/** @var array<string,DBPrimaryPos> Map of (primary server name => position) */
protected $shutdownPositionsByPrimary = [];
/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
protected $startupTimestampsByCluster = [];
/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
protected $shutdownTimestampsByCluster = [];

/** @var float|null Mock time bound by reference through setMockTime(); falsy means "use the real clock" */
private $wallClockOverride;

/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
public const POSITION_COOKIE_TTL = 10;
/** TTL handed to the store when saving positions (presumably seconds) */
private const POSITION_STORE_TTL = 60;

/** Second argument of getScopedLock() when saving positions (presumably the wait timeout in seconds) */
private const LOCK_TIMEOUT = 3;
/** Third argument of getScopedLock() when saving positions (presumably the lock expiry in seconds) */
private const LOCK_TTL = 6;

/** Field names of the array kept in the store */
private const FLD_POSITIONS = 'positions';
private const FLD_TIMESTAMPS = 'timestamps';
private const FLD_WRITE_INDEX = 'writeIndex';
179
/**
 * Bind the protector to one position store and one client.
 *
 * The client ID is taken from $client['clientId'] when present; otherwise it is
 * derived from the client IP and user agent (HMAC-MD5 when a secret is given,
 * plain MD5 when not).
 *
 * @param BagOStuff $store Store used to read and save this client's position data
 * @param array $client Map with 'ip' and 'agent' entries and an optional 'clientId'
 * @param int|null $clientPosIndex Write index the stored data is expected to have reached
 * @param string $secret Optional key for hashing the client parameters
 */
public function __construct(
	BagOStuff $store,
	array $client,
	?int $clientPosIndex,
	string $secret = ''
) {
	$this->store = $store;

	if ( isset( $client['clientId'] ) ) {
		$this->clientId = $client['clientId'];
	} else {
		$this->clientId = ( $secret !== '' )
			? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
			: md5( $client['ip'] . "\n" . $client['agent'] );
	}
	// The 'v3' component versions the stored format under this key
	$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v3' );
	$this->waitForPosIndex = $clientPosIndex;

	$this->clientLogInfo = [
		'clientIP' => $client['ip'],
		'clientAgent' => $client['agent'],
		'clientId' => $client['clientId'] ?? null
	];

	// Stay silent until a real logger is injected through setLogger()
	$this->logger = new NullLogger();
}
213
/**
 * Replace the logger that receives this object's debug and warning messages.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->logger = $logger;
}
217
/**
 * Get the client ID chosen or computed by the constructor.
 *
 * @return string Hash of client parameters
 */
public function getClientId() {
	return $this->clientId;
}
225
/**
 * Turn reading and writing of session consistency positions on or off.
 *
 * @param bool $enabled Stored as-is; the other methods return early while it is falsy
 */
public function setEnabled( $enabled ) {
	$this->enabled = $enabled;
}
233
/**
 * Turn waiting on DB servers to reach replication positions on or off.
 *
 * @param bool $enabled Stored as-is in the positionWaitsEnabled flag
 */
public function setWaitEnabled( $enabled ) {
	$this->positionWaitsEnabled = $enabled;
}
241
/**
 * Apply client "session consistency" replication position to a new ILoadBalancer.
 *
 * Does nothing when the protector or position waits are disabled. Otherwise the
 * position stored for the balancer's primary server, if any, is handed to
 * ILoadBalancer::waitFor().
 *
 * @param ILoadBalancer $lb
 * @return void
 */
public function applySessionReplicationPosition( ILoadBalancer $lb ) {
	if ( !$this->enabled || !$this->positionWaitsEnabled ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( $lb->getWriterIndex() );

	// Startup positions are keyed by primary server name
	$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
	if ( $pos instanceof DBPrimaryPos ) {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position is '$pos'" );
		$lb->waitFor( $pos );
	} else {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no position" );
	}
}
271
/**
 * Update client "session consistency" replication position for an end-of-life ILoadBalancer.
 *
 * Does nothing when the protector is disabled or the balancer reports no recent
 * or pending primary changes. Otherwise a touch timestamp is staged for the
 * cluster, together with the replica resume position when the cluster has
 * streaming replicas and such a position is available. Nothing is written to
 * the store here.
 *
 * @param ILoadBalancer $lb
 * @return void
 */
public function stageSessionReplicationPosition( ILoadBalancer $lb ) {
	if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$masterName = $lb->getServerName( $lb->getWriterIndex() );

	if ( $lb->hasStreamingReplicaServers() ) {
		$pos = $lb->getReplicaResumePos();
		if ( $pos ) {
			$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
			$this->shutdownPositionsByPrimary[$masterName] = $pos;
			$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
		} else {
			// No position to stage; fall back to the current time as the touch timestamp
			$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
			$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
		}
	} else {
		$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
		$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
	}
}
306
/**
 * Persist any staged client "session consistency" replication positions.
 *
 * The staged positions and timestamps are merged with the stored ones while
 * holding a scoped lock on the client key, then written back to the store.
 *
 * @param int|null &$clientPosIndex Set to the new write index on success, or to null
 *  when the lock or the write failed; left untouched when there was nothing to save
 * @return array Staged positions that could not be saved (empty when saved,
 *  disabled, or nothing was staged)
 */
public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
	if ( !$this->enabled ) {
		return [];
	}

	if ( !$this->shutdownTimestampsByCluster ) {
		$this->logger->debug( __METHOD__ . ": no primary positions/timestamps to save" );

		return [];
	}

	$saved = false;
	$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
	if ( $scopeLock ) {
		$stored = $this->unmarshalPositions( $this->store->get( $this->key ) );
		$merged = $this->mergePositions(
			$stored,
			$this->shutdownPositionsByPrimary,
			$this->shutdownTimestampsByCluster,
			$clientPosIndex
		);
		$payload = $this->marshalPositions( $merged );

		$saved = $this->store->set( $this->key, $payload, self::POSITION_STORE_TTL );
		// Dropping the last reference to the scoped lock object
		unset( $scopeLock );
	}

	$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

	if ( !$saved ) {
		$clientPosIndex = null; // nothing saved
		$unsaved = $this->shutdownPositionsByPrimary;
		// Raced out too many times or stash is down
		$this->logger->warning(
			__METHOD__ . ": failed to save primary positions for DB cluster(s) $clusterList"
		);

		return $unsaved;
	}

	$this->logger->debug(
		__METHOD__ . ": saved primary positions/timestamp for DB cluster(s) $clusterList"
	);

	return [];
}
364
/**
 * Get the UNIX timestamp when the client last touched the DB, if they did so recently.
 *
 * @param ILoadBalancer $lb
 * @return float|false Stored touch timestamp for the balancer's cluster, or false when
 *  disabled, when none is stored, or when it is older than POSITION_COOKIE_TTL
 */
public function getTouched( ILoadBalancer $lb ) {
	if ( !$this->enabled ) {
		return false;
	}

	$cluster = $lb->getClusterName();

	// Fetching the timestamps also triggers lazyStartup(), which sets startupTimestamp
	$timestamp = $this->getStartupSessionTimestamps()[$cluster] ?? null;
	if ( $timestamp === null ) {
		return false;
	}

	$age = $this->startupTimestamp - $timestamp;
	if ( $age > self::POSITION_COOKIE_TTL ) {
		// If the position store is not replicated among datacenters and the cookie that
		// sticks the client to the primary datacenter expires, then the touch timestamp
		// will be found for requests in one datacenter but not others. For consistency,
		// return false once the user is no longer routed to the primary datacenter.
		$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );

		return false;
	}

	$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );

	return $timestamp;
}
399
/**
 * Get the replication positions loaded from the store for this client.
 *
 * Triggers the one-time load from the store if it has not happened yet.
 *
 * @return array<string,DBPrimaryPos> Map of (primary server name => position)
 */
protected function getStartupSessionPositions() {
	$this->lazyStartup();

	return $this->startupPositionsByPrimary;
}
408
/**
 * Get the touch timestamps loaded from the store for this client.
 *
 * Triggers the one-time load from the store if it has not happened yet.
 *
 * @return array<string,float> Map of (DB cluster name => UNIX timestamp)
 */
protected function getStartupSessionTimestamps() {
	$this->lazyStartup();

	return $this->startupTimestampsByCluster;
}
417
/**
 * Load the stored replication positions and touch timestamps for the client.
 *
 * Runs at most once per object: a non-null startupTimestamp marks the load as done.
 * Logs whether the stored write index reached the index the client expected.
 *
 * @return void
 */
protected function lazyStartup() {
	if ( $this->startupTimestamp !== null ) {
		return;
	}

	$this->startupTimestamp = $this->getCurrentTime();
	$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
	$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

	$data = $this->unmarshalPositions( $this->store->get( $this->key ) );

	// Read every field with "??" so that a missing value, an empty array or a
	// partially filled array all fall back to defaults without warnings.
	$this->startupPositionsByPrimary = $data[self::FLD_POSITIONS] ?? [];
	$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];

	// When a stored array expires and is re-created under the same (deterministic) key,
	// the array value naturally starts again from index zero. As such, it is possible
	// that if certain store writes were lost (e.g. store down), that we unintentionally
	// point to an offset in an older incarnation of the array.
	// We don't try to detect or do something about this because:
	// 1. Waiting for an older offset is harmless and generally no-ops.
	// 2. The older value will have expired by now and thus treated as non-existing,
	//    which means we wouldn't even "see" it here.
	$indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
	if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
		if ( $indexReached >= $this->waitForPosIndex ) {
			$this->logger->debug( 'expected and found position index {cpPosIndex}', [
				'cpPosIndex' => $this->waitForPosIndex,
			] + $this->clientLogInfo );
		} else {
			// The exception is created only to attach a stack trace to the log entry
			$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
				'cpPosIndex' => $this->waitForPosIndex,
				'indexReached' => $indexReached,
				'exception' => new \RuntimeException(),
			] + $this->clientLogInfo );
		}
	} elseif ( $indexReached ) {
		$this->logger->debug( 'found position data with index {indexReached}', [
			'indexReached' => $indexReached
		] + $this->clientLogInfo );
	}
}
466
/**
 * Merge the new replication positions with the currently stored ones (highest wins).
 *
 * @param array|false $storedValue Unmarshalled value from the store, or a falsy value if none
 * @param array $shutdownPositions Map of (primary server name => position) staged for saving
 * @param array $shutdownTimestamps Map of (DB cluster name => UNIX timestamp) staged for saving
 * @param int|null &$clientPosIndex Set to the stored write index plus one (1 if none stored)
 * @return array Map with the merged positions, merged timestamps and the new write index
 */
protected function mergePositions(
	$storedValue,
	array $shutdownPositions,
	array $shutdownTimestamps,
	?int &$clientPosIndex = null
) {
	$latestPositions = $storedValue[self::FLD_POSITIONS] ?? [];
	// Keep a stored position only if it is a real position object that is
	// at least as new as the staged one; otherwise the staged one replaces it
	foreach ( $shutdownPositions as $primaryName => $candidate ) {
		$current = $latestPositions[$primaryName] ?? null;
		if (
			$current instanceof DBPrimaryPos &&
			!( $candidate->asOfTime() > $current->asOfTime() )
		) {
			continue;
		}
		$latestPositions[$primaryName] = $candidate;
	}

	$latestTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
	// Keep the newest touch timestamp per DB cluster
	foreach ( $shutdownTimestamps as $clusterName => $touched ) {
		$known = $latestTimestamps[$clusterName] ?? null;
		if ( $known === null || $touched > $known ) {
			$latestTimestamps[$clusterName] = $touched;
		}
	}

	$previousIndex = $storedValue[self::FLD_WRITE_INDEX] ?? 0;
	$clientPosIndex = $previousIndex + 1;

	return [
		self::FLD_POSITIONS => $latestPositions,
		self::FLD_TIMESTAMPS => $latestTimestamps,
		self::FLD_WRITE_INDEX => $clientPosIndex
	];
}
515
/**
 * Get the current UNIX timestamp, honouring a mock time set through setMockTime().
 *
 * @return float The mock time when one is set and truthy, else the larger of
 *  time() and microtime( true )
 */
protected function getCurrentTime() {
	$mockTime = $this->wallClockOverride;
	if ( !$mockTime ) {
		$wholeSeconds = (float)time(); // call this first
		// microtime() can severely drift from time() and the microtime() value of other threads.
		// Instead of seeing the current time as being in the past, use the value of time().
		return max( microtime( true ), $wholeSeconds );
	}

	return $mockTime;
}
531
/**
 * Bind the clock override to a caller-owned variable.
 *
 * The binding is by reference, so later changes to the caller's variable are
 * seen by getCurrentTime() without calling this method again.
 *
 * @param float|null &$time Mock UNIX timestamp; a falsy value means "use the real clock"
 */
public function setMockTime( &$time ) {
	$this->wallClockOverride =& $time;
}
540
/**
 * Convert the position objects of a merged value into plain arrays for storage.
 *
 * @param array $positions Value as returned by mergePositions()
 * @return array Same value with each position replaced by its toArray() form
 */
private function marshalPositions( array $positions ) {
	foreach ( $positions[ self::FLD_POSITIONS ] as $primaryName => $position ) {
		$positions[ self::FLD_POSITIONS ][ $primaryName ] = $position->toArray();
	}

	return $positions;
}
548
/**
 * Rebuild position objects from the plain arrays kept in the store.
 *
 * @param array|false $positions Value read from the store; falsy values are returned untouched
 * @return array|false Same value with each stored array turned back into a position object
 */
private function unmarshalPositions( $positions ) {
	if ( $positions ) {
		foreach ( $positions[ self::FLD_POSITIONS ] as $primaryName => $fields ) {
			// NOTE(review): the class name comes straight from the stored data and is
			// not validated here; this assumes the store contents are trusted.
			$positionClass = $fields[ '_type_' ];
			$positions[ self::FLD_POSITIONS ][ $primaryName ] = $positionClass::newFromArray( $fields );
		}
	}

	return $positions;
}
565}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.
Provide a given client with protection against visible database lag.
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, ?int &$clientPosIndex=null)
Merge the new replication positions with the currently stored ones (highest wins)
array< string, DBPrimaryPos > $shutdownPositionsByPrimary
Map of (primary server name => position)
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
__construct(BagOStuff $store, array $client, ?int $clientPosIndex, string $secret='')
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
array< string, DBPrimaryPos > $startupPositionsByPrimary
Map of (primary server name => position)
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
float null $startupTimestamp
UNIX timestamp when the client data was loaded.
applySessionReplicationPosition(ILoadBalancer $lb)
Apply client "session consistency" replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
persistSessionReplicationPositions(&$clientPosIndex=null)
Persist any staged client "session consistency" replication positions.
string[] $clientLogInfo
Map of client information fields for logging.
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
stageSessionReplicationPosition(ILoadBalancer $lb)
Update client "session consistency" replication position for an end-of-life ILoadBalancer.
An object representing a primary or replica DB position in a replicated setup.
Create and track the database connections and transactions for a given database cluster.
getClusterName()
Get the logical name of the database cluster.
hasOrMadeRecentPrimaryChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this PHP thread.
waitFor( $pos)
Set the primary position to reach before the next generic group DB query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
getWriterIndex()
Get the specific server index of the primary server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the primary server.
getServerName( $i)
Get the readable name of the server with the specified index.