MediaWiki REL1_35
ChronologyProtector.php
Go to the documentation of this file.
1<?php
25
26use BagOStuff;
27use Psr\Log\LoggerAwareInterface;
28use Psr\Log\LoggerInterface;
29use Psr\Log\NullLogger;
30use Wikimedia\WaitConditionLoop;
31
/**
 * Helper class for mitigating DB replication lag in order to provide "session consistency".
 *
 * For each client (identified by an explicit client ID or by a hash of IP and user agent),
 * the DB master positions reached by the end of a request are saved to a BagOStuff store.
 * A later request from the same client loads those positions and asks its load balancers
 * to wait for them before serving reads.
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var BagOStuff Store holding the per-client positions and "touched" timestamps */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Store key under which this client's positions are saved */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var string[] Map of client information fields for logging */
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;
	/** @var int Max seconds to wait on positions to appear */
	protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
	/** @var bool Whether to no-op all method calls */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
	protected $wait = true;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $startupPositions = [];
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $shutdownPositions = [];
	/** @var int[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

	/** @var int Seconds to keep saved positions in the store */
	public const POSITION_TTL = 60;
	/**
	 * @var int Not referenced inside this class; presumably the lifetime in seconds of the
	 *  client-side position index cookie (TODO confirm against callers)
	 */
	public const POSITION_COOKIE_TTL = 10;
	/** @var int Default max seconds to wait for positions to appear in the store */
	private const POS_STORE_WAIT_TIMEOUT = 5;

	/**
	 * @param BagOStuff $store Store used for positions and "touched" timestamps
	 * @param array $client Map with 'ip' and 'agent' and, optionally, 'clientId'.
	 *  When 'clientId' is absent, the ID is derived by hashing 'ip' and 'agent'.
	 * @param int|null $posIndex Minimum position write index expected to be in the store
	 * @param string $secret Optional secret; when non-empty the derived client ID is an HMAC
	 */
	public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
		$this->store = $store;
		if ( isset( $client['clientId'] ) ) {
			$this->clientId = $client['clientId'];
		} else {
			$this->clientId = ( $secret != '' )
				? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
				: md5( $client['ip'] . "\n" . $client['agent'] );
		}
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
		$this->waitForPosIndex = $posIndex;

		$this->clientLogInfo = [
			'clientIP' => $client['ip'],
			'clientAgent' => $client['agent'],
			'clientId' => $client['clientId'] ?? null
		];

		$this->logger = new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger Logger that replaces the default NullLogger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @return string Client ID hash
	 */
	public function getClientId() {
		return $this->clientId;
	}

	/**
	 * @param bool $enabled Whether the protector does anything; when false, the apply,
	 *  store and shutdown methods return early
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether to check and wait on positions at startup
	 */
	public function setWaitEnabled( $enabled ) {
		$this->wait = $enabled;
	}

	/**
	 * Apply the "session consistency" DB replication position to a new ILoadBalancer.
	 *
	 * If a position for the load balancer's master was saved for this client, the load
	 * balancer is told to wait for it via waitFor().
	 *
	 * @param ILoadBalancer $lb
	 * @return void
	 */
	public function applySessionReplicationPosition( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // disabled
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		$startupPositions = $this->getStartupMasterPositions();

		$pos = $startupPositions[$masterName] ?? null;
		if ( $pos instanceof DBMasterPos ) {
			$this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'" );
			$lb->waitFor( $pos );
		}
	}

	/**
	 * Save the "session consistency" DB replication position for an end-of-life ILoadBalancer.
	 *
	 * This only records the position in memory; nothing is written to the store
	 * until shutdown() is called.
	 *
	 * @param ILoadBalancer $lb
	 * @return void
	 */
	public function storeSessionReplicationPosition( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // disabled
		} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
			// Only save the position if writes have been done on the connection
			return;
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( $lb->hasStreamingReplicaServers() ) {
			$pos = $lb->getReplicaResumePos();
			if ( $pos ) {
				$this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos" );
				$this->shutdownPositions[$masterName] = $pos;
			}
		} else {
			$this->logger->debug( __METHOD__ . ": DB '$masterName' touched" );
		}
		$this->shutdownTouchDBs[$masterName] = 1;
	}

	/**
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 *
	 * Writes the "touched" timestamps and the merged master positions to the store.
	 *
	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
	 * @param string $mode One of (sync, async); whether to pass WRITE_SYNC to the store
	 * @param int|null &$cpIndex DB position key write counter; incremented on update
	 *  and reset to null if the positions could not be saved
	 * @return DBMasterPos[] Empty on success; map of (DB master name => position) for
	 *  positions that may not have been saved to all datacenters
	 */
	public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
		if ( !$this->enabled ) {
			return [];
		}

		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
			$store->set(
				$this->getTouchedKey( $store, $dbName ),
				microtime( true ),
				$store::TTL_DAY
			);
		}

		if ( $this->shutdownPositions === [] ) {
			$this->logger->debug( __METHOD__ . ": no master positions to save" );

			return []; // nothing to save
		}

		$this->logger->debug(
			__METHOD__ . ": saving master pos for " .
			implode( ', ', array_keys( $this->shutdownPositions ) )
		);

		// CP-protected writes should overwhelmingly go to the master datacenter, so merge the
		// positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
		// If set() returns success, then any get() should be able to see the new positions.
		if ( $store->lock( $this->key, 3 ) ) {
			try {
				if ( $workCallback ) {
					// Let the store run the work before blocking on a replication sync barrier.
					// If replication caught up while the work finished, the barrier will be fast.
					$store->addBusyCallback( $workCallback );
				}
				$ok = $store->set(
					$this->key,
					$this->mergePositions(
						$store->get( $this->key ),
						$this->shutdownPositions,
						$cpIndex
					),
					self::POSITION_TTL,
					( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
				);
			} finally {
				// Always release the advisory lock, even if merging or writing throws
				$store->unlock( $this->key );
			}
		} else {
			$ok = false;
		}

		if ( !$ok ) {
			$cpIndex = null; // nothing saved
			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) )
			);
		} elseif ( $mode === 'sync' &&
			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
		) {
			// Positions may not be in all datacenters, force LBFactory to play it safe
			$this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
			$bouncedPositions = $this->shutdownPositions;
		} else {
			$bouncedPositions = [];
		}

		return $bouncedPositions;
	}

	/**
	 * Get the value stored under this client's "touched" key for the master of a load balancer.
	 *
	 * @param ILoadBalancer $lb
	 * @return mixed The microtime( true ) timestamp written by shutdown(), or whatever
	 *  the store's get() returns when the key is absent
	 */
	public function getTouched( ILoadBalancer $lb ) {
		$masterName = $lb->getServerName( $lb->getWriterIndex() );

		return $this->store->get( $this->getTouchedKey( $this->store, $masterName ) );
	}

	/**
	 * Build the store key holding the "touched" timestamp for this client and a DB master.
	 *
	 * @param BagOStuff $store
	 * @param string $masterName
	 * @return string
	 */
	private function getTouchedKey( BagOStuff $store, $masterName ) {
		return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $masterName );
	}

	/**
	 * Load in previous master positions for the client.
	 *
	 * The store is only consulted on the first call; later calls return the cached map.
	 *
	 * @return DBMasterPos[] Map of (DB master name => position)
	 */
	protected function getStartupMasterPositions() {
		if ( $this->initialized ) {
			return $this->startupPositions;
		}

		$this->initialized = true;
		$this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)" );

		if ( $this->wait ) {
			// If there is an expectation to see master positions from a certain write
			// index or higher, then block until it appears, or until a timeout is reached.
			// Since the write index restarts each time the key is created, it is possible that
			// a lagged store has a matching key write index. However, in that case, it should
			// already be expired and thus treated as non-existing, maintaining correctness.
			if ( $this->waitForPosIndex > 0 ) {
				$data = null;
				$indexReached = null; // highest index reached in the position store
				$loop = new WaitConditionLoop(
					function () use ( &$data, &$indexReached ) {
						$data = $this->store->get( $this->key );
						if ( !is_array( $data ) ) {
							return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
						} elseif ( !isset( $data['writeIndex'] ) ) {
							return WaitConditionLoop::CONDITION_REACHED; // b/c
						}
						$indexReached = max( $data['writeIndex'], $indexReached );

						return ( $data['writeIndex'] >= $this->waitForPosIndex )
							? WaitConditionLoop::CONDITION_REACHED
							: WaitConditionLoop::CONDITION_CONTINUE;
					},
					$this->waitForPosStoreTimeout
				);
				$result = $loop->invoke();
				$waitedMs = $loop->getLastWaitTime() * 1e3;

				if ( $result == $loop::CONDITION_REACHED ) {
					$this->logger->debug(
						__METHOD__ . ": expected and found position index.",
						[
							'cpPosIndex' => $this->waitForPosIndex,
							'waitTimeMs' => $waitedMs
						] + $this->clientLogInfo
					);
				} else {
					$this->logger->warning(
						__METHOD__ . ": expected but failed to find position index.",
						[
							'cpPosIndex' => $this->waitForPosIndex,
							'indexReached' => $indexReached,
							'waitTimeMs' => $waitedMs
						] + $this->clientLogInfo
					);
				}
			} else {
				$data = $this->store->get( $this->key );
			}

			$this->startupPositions = $data ? $data['positions'] : [];
			$this->logger->debug( __METHOD__ . ": key is {$this->key} (read)" );
		} else {
			$this->startupPositions = [];
			$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)" );
		}

		return $this->startupPositions;
	}

	/**
	 * Merge the positions gathered during this request into the value read from the store.
	 *
	 * For each DB master, the shutdown position replaces the stored one when there is no
	 * stored DBMasterPos or when the shutdown position has a later asOfTime().
	 *
	 * @param array|bool $curValue Value read from the store; anything without a
	 *  'positions' entry is treated as having no positions
	 * @param DBMasterPos[] $shutdownPositions Map of (DB master name => position)
	 * @param int|null &$cpIndex Set to the write index of the returned value
	 * @return array Map with 'positions' (DB master name => position) and 'writeIndex'
	 */
	protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
		/** @var DBMasterPos[] $curPositions */
		$curPositions = $curValue['positions'] ?? [];
		// Use the newest positions for each DB master
		foreach ( $shutdownPositions as $db => $pos ) {
			if (
				!isset( $curPositions[$db] ) ||
				!( $curPositions[$db] instanceof DBMasterPos ) ||
				$pos->asOfTime() > $curPositions[$db]->asOfTime()
			) {
				$curPositions[$db] = $pos;
			}
		}

		// The write index counts up from 1 each time the key is (re)created
		$cpIndex = $curValue['writeIndex'] ?? 0;

		return [
			'positions' => $curPositions,
			'writeIndex' => ++$cpIndex
		];
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:71
get( $key, $flags=0)
Get an item with the given key.
unlock( $key)
Release an advisory lock on a key string.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
getQoS( $flag)
makeGlobalKey( $class,... $components)
Make a global cache key.
set( $key, $value, $exptime=0, $flags=0)
Set an item.
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
Helper class for mitigating DB replication lag in order to provide "session consistency".
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
bool $enabled
Whether to no-op all method calls.
bool $wait
Whether to check and wait on positions.
float[] $shutdownTouchDBs
Map of (DB master name => 1)
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
getStartupMasterPositions()
Load in previous master positions for the client.
storeSessionReplicationPosition(ILoadBalancer $lb)
Save the "session consistency" DB replication position for an end-of-life ILoadBalancer.
DBMasterPos[] $shutdownPositions
Map of (DB master name => position)
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
DBMasterPos[] $startupPositions
Map of (DB master name => position)
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
string[] $clientLogInfo
Map of client information fields for logging.
getTouchedKey(BagOStuff $store, $masterName)
bool $initialized
Whether the client data was loaded.
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
An object representing a master or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this PHP thread.
getWriterIndex()
Get the server index of the master server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
getServerName( $i)
Get the host name or IP address of the server with the specified index.