MediaWiki REL1_34
ChronologyProtector.php
Go to the documentation of this file.
1<?php
25
26use Psr\Log\LoggerAwareInterface;
27use Psr\Log\LoggerInterface;
28use Psr\Log\NullLogger;
29use Wikimedia\WaitConditionLoop;
30use BagOStuff;
31
39class ChronologyProtector implements LoggerAwareInterface {
41 protected $store;
43 protected $logger;
44
46 protected $key;
48 protected $clientId;
50 protected $clientLogInfo;
54 protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
56 protected $enabled = true;
58 protected $wait = true;
59
61 protected $initialized = false;
63 protected $startupPositions = [];
65 protected $shutdownPositions = [];
67 protected $shutdownTouchDBs = [];
68
70 const POSITION_TTL = 60;
72 const POSITION_COOKIE_TTL = 10;
74 const POS_STORE_WAIT_TIMEOUT = 5;
75
83 public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
84 $this->store = $store;
85 if ( isset( $client['clientId'] ) ) {
86 $this->clientId = $client['clientId'];
87 } else {
88 $this->clientId = ( $secret != '' )
89 ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
90 : md5( $client['ip'] . "\n" . $client['agent'] );
91 }
92 $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
93 $this->waitForPosIndex = $posIndex;
94
95 $this->clientLogInfo = [
96 'clientIP' => $client['ip'],
97 'clientAgent' => $client['agent'],
98 'clientId' => $client['clientId'] ?? null
99 ];
100
101 $this->logger = new NullLogger();
102 }
103
104 public function setLogger( LoggerInterface $logger ) {
105 $this->logger = $logger;
106 }
107
112 public function getClientId() {
113 return $this->clientId;
114 }
115
120 public function setEnabled( $enabled ) {
121 $this->enabled = $enabled;
122 }
123
128 public function setWaitEnabled( $enabled ) {
129 $this->wait = $enabled;
130 }
131
146 if ( !$this->enabled ) {
147 return; // disabled
148 }
149
150 $masterName = $lb->getServerName( $lb->getWriterIndex() );
152
153 $pos = $startupPositions[$masterName] ?? null;
154 if ( $pos instanceof DBMasterPos ) {
155 $this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'\n" );
156 $lb->waitFor( $pos );
157 }
158 }
159
171 if ( !$this->enabled ) {
172 return; // disabled
173 } elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
174 // Only save the position if writes have been done on the connection
175 return;
176 }
177
178 $masterName = $lb->getServerName( $lb->getWriterIndex() );
179 if ( $lb->hasStreamingReplicaServers() ) {
180 $pos = $lb->getReplicaResumePos();
181 if ( $pos ) {
182 $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
183 $this->shutdownPositions[$masterName] = $pos;
184 }
185 } else {
186 $this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
187 }
188 $this->shutdownTouchDBs[$masterName] = 1;
189 }
190
200 public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
201 if ( !$this->enabled ) {
202 return [];
203 }
204
206 // Some callers might want to know if a user recently touched a DB.
207 // These writes do not need to block on all datacenters receiving them.
208 foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
209 $store->set(
210 $this->getTouchedKey( $this->store, $dbName ),
211 microtime( true ),
212 $store::TTL_DAY
213 );
214 }
215
216 if ( $this->shutdownPositions === [] ) {
217 $this->logger->debug( __METHOD__ . ": no master positions to save\n" );
218
219 return []; // nothing to save
220 }
221
222 $this->logger->debug(
223 __METHOD__ . ": saving master pos for " .
224 implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
225 );
226
227 // CP-protected writes should overwhelmingly go to the master datacenter, so merge the
228 // positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
229 // If set() returns success, then any get() should be able to see the new positions.
230 if ( $store->lock( $this->key, 3 ) ) {
231 if ( $workCallback ) {
232 // Let the store run the work before blocking on a replication sync barrier.
233 // If replication caught up while the work finished, the barrier will be fast.
234 $store->addBusyCallback( $workCallback );
235 }
236 $ok = $store->set(
237 $this->key,
238 $this->mergePositions(
239 $store->get( $this->key ),
240 $this->shutdownPositions,
241 $cpIndex
242 ),
243 self::POSITION_TTL,
244 ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
245 );
246 $store->unlock( $this->key );
247 } else {
248 $ok = false;
249 }
250
251 if ( !$ok ) {
252 $cpIndex = null; // nothing saved
253 $bouncedPositions = $this->shutdownPositions;
254 // Raced out too many times or stash is down
255 $this->logger->warning( __METHOD__ . ": failed to save master pos for " .
256 implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
257 );
258 } elseif ( $mode === 'sync' &&
259 $store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
260 ) {
261 // Positions may not be in all datacenters, force LBFactory to play it safe
262 $this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
263 $bouncedPositions = $this->shutdownPositions;
264 } else {
265 $bouncedPositions = [];
266 }
267
268 return $bouncedPositions;
269 }
270
276 public function getTouched( $dbName ) {
277 return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
278 }
279
285 private function getTouchedKey( BagOStuff $store, $dbName ) {
286 return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
287 }
288
292 protected function getStartupMasterPositions() {
293 if ( $this->initialized ) {
295 }
296
297 $this->initialized = true;
298 $this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)\n" );
299
300 if ( $this->wait ) {
301 // If there is an expectation to see master positions from a certain write
302 // index or higher, then block until it appears, or until a timeout is reached.
303 // Since the write index restarts each time the key is created, it is possible that
304 // a lagged store has a matching key write index. However, in that case, it should
305 // already be expired and thus treated as non-existing, maintaining correctness.
306 if ( $this->waitForPosIndex > 0 ) {
307 $data = null;
308 $indexReached = null; // highest index reached in the position store
309 $loop = new WaitConditionLoop(
310 function () use ( &$data, &$indexReached ) {
311 $data = $this->store->get( $this->key );
312 if ( !is_array( $data ) ) {
313 return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
314 } elseif ( !isset( $data['writeIndex'] ) ) {
315 return WaitConditionLoop::CONDITION_REACHED; // b/c
316 }
317 $indexReached = max( $data['writeIndex'], $indexReached );
318
319 return ( $data['writeIndex'] >= $this->waitForPosIndex )
320 ? WaitConditionLoop::CONDITION_REACHED
321 : WaitConditionLoop::CONDITION_CONTINUE;
322 },
324 );
325 $result = $loop->invoke();
326 $waitedMs = $loop->getLastWaitTime() * 1e3;
327
328 if ( $result == $loop::CONDITION_REACHED ) {
329 $this->logger->debug(
330 __METHOD__ . ": expected and found position index.",
331 [
332 'cpPosIndex' => $this->waitForPosIndex,
333 'waitTimeMs' => $waitedMs
334 ] + $this->clientLogInfo
335 );
336 } else {
337 $this->logger->warning(
338 __METHOD__ . ": expected but failed to find position index.",
339 [
340 'cpPosIndex' => $this->waitForPosIndex,
341 'indexReached' => $indexReached,
342 'waitTimeMs' => $waitedMs
343 ] + $this->clientLogInfo
344 );
345 }
346 } else {
347 $data = $this->store->get( $this->key );
348 }
349
350 $this->startupPositions = $data ? $data['positions'] : [];
351 $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
352 } else {
353 $this->startupPositions = [];
354 $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
355 }
356
358 }
359
366 protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
368 $curPositions = $curValue['positions'] ?? [];
369 // Use the newest positions for each DB master
370 foreach ( $shutdownPositions as $db => $pos ) {
371 if (
372 !isset( $curPositions[$db] ) ||
373 !( $curPositions[$db] instanceof DBMasterPos ) ||
374 $pos->asOfTime() > $curPositions[$db]->asOfTime()
375 ) {
376 $curPositions[$db] = $pos;
377 }
378 }
379
380 $cpIndex = $curValue['writeIndex'] ?? 0;
381
382 return [
383 'positions' => $curPositions,
384 'writeIndex' => ++$cpIndex
385 ];
386 }
387}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:63
get( $key, $flags=0)
Get an item with the given key.
unlock( $key)
Release an advisory lock on a key string.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
getQoS( $flag)
makeGlobalKey( $class,... $components)
Make a global cache key.
set( $key, $value, $exptime=0, $flags=0)
Set an item.
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
Helper class for mitigating DB replication lag in order to provide "session consistency".
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
bool $enabled
Whether to no-op all method calls.
bool $wait
Whether to check and wait on positions.
float[] $shutdownTouchDBs
Map of (DB master name => 1)
int null $waitForPosIndex
Expected minimum index of the last write to the position store.
getStartupMasterPositions()
Load in previous master positions for the client.
storeSessionReplicationPosition(ILoadBalancer $lb)
Save the "session consistency" DB replication position for an end-of-life ILoadBalancer.
DBMasterPos[] $shutdownPositions
Map of (DB master name => position)
__construct(BagOStuff $store, array $client, $posIndex, $secret='')
DBMasterPos[] $startupPositions
Map of (DB master name => position)
applySessionReplicationPosition(ILoadBalancer $lb)
Apply the "session consistency" DB replication position to a new ILoadBalancer.
string $clientId
Hash of client parameters.
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
string[] $clientLogInfo
Map of client information fields for logging.
getTouchedKey(BagOStuff $store, $dbName)
bool $initialized
Whether the client data was loaded.
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
An object representing a master or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getWriterIndex()
Get the server index of the master server.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
getServerName( $i)
Get the host name or IP address of the server with the specified index.