MediaWiki REL1_36
ChronologyProtector.php
Go to the documentation of this file.
1<?php
25
26use BagOStuff;
27use Psr\Log\LoggerAwareInterface;
28use Psr\Log\LoggerInterface;
29use Psr\Log\NullLogger;
30use Wikimedia\WaitConditionLoop;
31
135class ChronologyProtector implements LoggerAwareInterface {
137 protected $store;
139 protected $logger;
140
142 protected $key;
144 protected $clientId;
146 protected $clientLogInfo;
149
151 protected $enabled = true;
153 protected $positionWaitsEnabled = true;
156
165
168
170 public const POSITION_COOKIE_TTL = 10;
172 private const POSITION_STORE_TTL = 60;
175
177 private const LOCK_TIMEOUT = 3;
179 private const LOCK_TTL = 6;
180
181 private const FLD_POSITIONS = 'positions';
182 private const FLD_TIMESTAMPS = 'timestamps';
183 private const FLD_WRITE_INDEX = 'writeIndex';
184
192 public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
193 $this->store = $store;
194
195 if ( isset( $client['clientId'] ) ) {
196 $this->clientId = $client['clientId'];
197 } else {
198 $this->clientId = ( $secret != '' )
199 ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
200 : md5( $client['ip'] . "\n" . $client['agent'] );
201 }
202 $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
203 $this->waitForPosIndex = $posIndex;
204
205 $this->clientLogInfo = [
206 'clientIP' => $client['ip'],
207 'clientAgent' => $client['agent'],
208 'clientId' => $client['clientId'] ?? null
209 ];
210
211 $this->logger = new NullLogger();
212 }
213
214 public function setLogger( LoggerInterface $logger ) {
215 $this->logger = $logger;
216 }
217
222 public function getClientId() {
223 return $this->clientId;
224 }
225
230 public function setEnabled( $enabled ) {
231 $this->enabled = $enabled;
232 }
233
238 public function setWaitEnabled( $enabled ) {
239 $this->positionWaitsEnabled = $enabled;
240 }
241
256 if ( !$this->enabled || !$this->positionWaitsEnabled ) {
257 return;
258 }
259
260 $cluster = $lb->getClusterName();
261 $masterName = $lb->getServerName( $lb->getWriterIndex() );
262
263 $pos = $this->getStartupSessionPositions()[$masterName] ?? null;
264 if ( $pos instanceof DBMasterPos ) {
265 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
266 $lb->waitFor( $pos );
267 } else {
268 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
269 }
270 }
271
284 if ( !$this->enabled || !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
285 return;
286 }
287
288 $cluster = $lb->getClusterName();
289 $masterName = $lb->getServerName( $lb->getWriterIndex() );
290
291 if ( $lb->hasStreamingReplicaServers() ) {
292 $pos = $lb->getReplicaResumePos();
293 if ( $pos ) {
294 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
295 $this->shutdownPositionsByMaster[$masterName] = $pos;
296 $this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
297 } else {
298 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
299 $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
300 }
301 } else {
302 $this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
303 $this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
304 }
305 }
306
315 public function shutdown( &$cpIndex = null ) {
316 if ( !$this->enabled ) {
317 return [];
318 }
319
320 if ( !$this->shutdownTimestampsByCluster ) {
321 $this->logger->debug( __METHOD__ . ": no master positions/timestamps to save" );
322
323 return [];
324 }
325
326 $scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
327 if ( $scopeLock ) {
328 $ok = $this->store->set(
329 $this->key,
330 $this->mergePositions(
331 $this->store->get( $this->key ),
332 $this->shutdownPositionsByMaster,
333 $this->shutdownTimestampsByCluster,
334 $cpIndex
335 ),
336 self::POSITION_STORE_TTL
337 );
338 unset( $scopeLock );
339 } else {
340 $ok = false;
341 }
342
343 $clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );
344
345 if ( $ok ) {
346 $bouncedPositions = [];
347 $this->logger->debug(
348 __METHOD__ . ": saved master positions/timestamp for DB cluster(s) $clusterList"
349 );
350
351 } else {
352 $cpIndex = null; // nothing saved
353 $bouncedPositions = $this->shutdownPositionsByMaster;
354 // Raced out too many times or stash is down
355 $this->logger->warning(
356 __METHOD__ . ": failed to save master positions for DB cluster(s) $clusterList"
357 );
358 }
359
360 return $bouncedPositions;
361 }
362
372 public function getTouched( ILoadBalancer $lb ) {
373 if ( !$this->enabled ) {
374 return false;
375 }
376
377 $cluster = $lb->getClusterName();
378
379 $timestampsByCluster = $this->getStartupSessionTimestamps();
380 $timestamp = $timestampsByCluster[$cluster] ?? null;
381 if ( $timestamp === null ) {
382 $recentTouchTimestamp = false;
383 } elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
384 // If the position store is not replicated among datacenters and the cookie that
385 // sticks the client to the primary datacenter expires, then the touch timestamp
386 // will be found for requests in one datacenter but not others. For consistency,
387 // return false once the user is no longer routed to the primary datacenter.
388 $recentTouchTimestamp = false;
389 $this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
390 } else {
391 $recentTouchTimestamp = $timestamp;
392 $this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
393 }
394
395 return $recentTouchTimestamp;
396 }
397
401 protected function getStartupSessionPositions() {
402 $this->lazyStartup();
403
405 }
406
410 protected function getStartupSessionTimestamps() {
411 $this->lazyStartup();
412
414 }
415
421 protected function lazyStartup() {
422 if ( $this->startupTimestamp !== null ) {
423 return;
424 }
425
426 $this->startupTimestamp = $this->getCurrentTime();
427 $this->logger->debug(
428 __METHOD__ .
429 ": client ID is {$this->clientId}; key is {$this->key}"
430 );
431
432 // If there is an expectation to see master positions from a certain write
433 // index or higher, then block until it appears, or until a timeout is reached.
434 // Since the write index restarts each time the key is created, it is possible that
435 // a lagged store has a matching key write index. However, in that case, it should
436 // already be expired and thus treated as non-existing, maintaining correctness.
437 if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
438 $data = null;
439 $indexReached = null; // highest index reached in the position store
440 $loop = new WaitConditionLoop(
441 function () use ( &$data, &$indexReached ) {
442 $data = $this->store->get( $this->key );
443 if ( !is_array( $data ) ) {
444 return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
445 } elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
446 return WaitConditionLoop::CONDITION_REACHED; // b/c
447 }
448 $indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );
449
450 return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
451 ? WaitConditionLoop::CONDITION_REACHED
452 : WaitConditionLoop::CONDITION_CONTINUE;
453 },
455 );
456 $result = $loop->invoke();
457 $waitedMs = $loop->getLastWaitTime() * 1e3;
458
459 if ( $result == $loop::CONDITION_REACHED ) {
460 $this->logger->debug(
461 __METHOD__ . ": expected and found position index {cpPosIndex}.",
462 [
463 'cpPosIndex' => $this->waitForPosIndex,
464 'waitTimeMs' => $waitedMs
465 ] + $this->clientLogInfo
466 );
467 } else {
468 $this->logger->warning(
469 __METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
470 [
471 'cpPosIndex' => $this->waitForPosIndex,
472 'indexReached' => $indexReached,
473 'waitTimeMs' => $waitedMs
474 ] + $this->clientLogInfo
475 );
476 }
477 } else {
478 $data = $this->store->get( $this->key );
479 $indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
480 if ( $indexReached ) {
481 $this->logger->debug(
482 __METHOD__ . ": found position/timestamp data with index {indexReached}.",
483 [ 'indexReached' => $indexReached ] + $this->clientLogInfo
484 );
485 }
486 }
487
488 $this->startupPositionsByMaster = $data ? $data[self::FLD_POSITIONS] : [];
489 $this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
490 }
491
501 protected function mergePositions(
502 $storedValue,
503 array $shutdownPositions,
504 array $shutdownTimestamps,
505 &$cpIndex = null
506 ) {
508 $mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
509 // Use the newest positions for each DB master
510 foreach ( $shutdownPositions as $masterName => $pos ) {
511 if (
512 !isset( $mergedPositions[$masterName] ) ||
513 !( $mergedPositions[$masterName] instanceof DBMasterPos ) ||
514 $pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
515 ) {
516 $mergedPositions[$masterName] = $pos;
517 }
518 }
519
521 $mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
522 // Use the newest touch timestamp for each DB master
523 foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
524 if (
525 !isset( $mergedTimestamps[$cluster] ) ||
526 $timestamp > $mergedTimestamps[$cluster]
527 ) {
528 $mergedTimestamps[$cluster] = $timestamp;
529 }
530 }
531
532 $cpIndex = $storedValue[self::FLD_WRITE_INDEX] ?? 0;
533
534 return [
535 self::FLD_POSITIONS => $mergedPositions,
536 self::FLD_TIMESTAMPS => $mergedTimestamps,
537 self::FLD_WRITE_INDEX => ++$cpIndex
538 ];
539 }
540
546 protected function getCurrentTime() {
547 if ( $this->wallClockOverride ) {
549 }
550
551 $clockTime = (float)time(); // call this first
552 // microtime() can severely drift from time() and the microtime() value of other threads.
553 // Instead of seeing the current time as being in the past, use the value of time().
554 return max( microtime( true ), $clockTime );
555 }
556
562 public function setMockTime( &$time ) {
563 $this->wallClockOverride =& $time;
564 }
565}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:86
makeGlobalKey( $collection,... $components)
Make a cache key for the default keyspace and given components.
Provide a given client with protection against visible database lag.
bool $enabled
Whether reading/writing session consistency replication positions is enabled.
lazyStartup()
Load the stored DB replication positions and touch timestamps for the client.
array< string, float > $startupTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
const POSITION_COOKIE_TTL
Seconds to store position write index cookies (safely less than POSITION_STORE_TTL)
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
array< string, DBMasterPos > $startupPositionsByMaster
Map of (DB master name => position)
mergePositions( $storedValue, array $shutdownPositions, array $shutdownTimestamps, &$cpIndex=null)
Merge the new DB replication positions with the currently stored ones (highest wins)
array< string, DBMasterPos > $shutdownPositionsByMaster
Map of (DB master name => position)
float null $startupTimestamp
UNIX timestamp when the client data was loaded.
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
array< string, float > $shutdownTimestampsByCluster
Map of (DB cluster name => UNIX timestamp)
getTouched(ILoadBalancer $lb)
Get the UNIX timestamp when the client last touched the DB, if they did so recently.
const LOCK_TTL
Lock expiry to use for key updates.
string[] $clientLogInfo
Map of client information fields for logging.
const POSITION_INDEX_WAIT_TIMEOUT
Max seconds to wait for positions write indexes to appear (e.g.
bool $positionWaitsEnabled
Whether waiting on DB servers to reach replication positions is enabled.
stageSessionReplicationPosition(ILoadBalancer $lb)
Update the "session consistency" DB replication position for an end-of-life ILoadBalancer.
const LOCK_TIMEOUT
Lock timeout to use for key updates.
shutdown(&$cpIndex=null)
Save any remarked "session consistency" DB replication positions to persistent storage.
const POSITION_STORE_TTL
Seconds to store replication positions.
An object representing a master or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
getClusterName()
Get the logical name of the database cluster.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getWriterIndex()
Get the specific server index of the master server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
getServerName( $i)
Get the host name or IP address of the server with the specified index.