MediaWiki REL1_32
ChronologyProtector.php
Go to the documentation of this file.
1<?php
25
26use Psr\Log\LoggerAwareInterface;
27use Psr\Log\LoggerInterface;
28use Psr\Log\NullLogger;
29use Wikimedia\WaitConditionLoop;
30use BagOStuff;
31
/**
 * Class for ensuring a consistent ordering of events as seen by the user, despite replication.
 *
 * At the end of a request, the master positions of the load balancers that were written to
 * are saved in a store under a key derived from the client. On the client's next request,
 * those positions are loaded and handed to each load balancer via ILoadBalancer::waitFor().
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var BagOStuff Store used for the saved positions and the "touched" timestamps */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Key in the store under which the positions of this client are saved */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var string[] Map of client information fields for logging */
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;
	/** @var int Max seconds to wait on positions to appear */
	protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
	/** @var bool Whether to no-op all method calls */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
	protected $wait = true;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $startupPositions = [];
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $shutdownPositions = [];
	/** @var int[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

	/** @var int Expiry passed to the store when saving positions (presumably seconds) */
	const POSITION_TTL = 60;
	/** @var int Not referenced in this class; presumably the TTL of a position cookie set by callers */
	const POSITION_COOKIE_TTL = 10;
	/** @var int Default for $waitForPosStoreTimeout */
	const POS_STORE_WAIT_TIMEOUT = 5;

	/**
	 * @param BagOStuff $store Store for the positions and touch timestamps
	 * @param array $client Map with the keys "ip" and "agent", and optionally "clientId".
	 *   When "clientId" is absent, the ID is the MD5 hash of the IP and agent.
	 * @param int|null $posIndex Write index the position store is expected to have reached;
	 *   values that are not greater than zero disable the waiting in initPositions()
	 */
	public function __construct( BagOStuff $store, array $client, $posIndex = null ) {
		$this->store = $store;
		$this->clientId = $client['clientId'] ??
			md5( $client['ip'] . "\n" . $client['agent'] );
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
		$this->waitForPosIndex = $posIndex;

		$this->clientLogInfo = [
			'clientIP' => $client['ip'],
			'clientAgent' => $client['agent'],
			'clientId' => $client['clientId'] ?? null
		];

		$this->logger = new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger Logger that replaces the default NullLogger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @return string Client ID given to, or computed by, the constructor
	 */
	public function getClientId() {
		return $this->clientId;
	}

	/**
	 * @param bool $enabled Whether initLB(), shutdownLB() and shutdown() do anything at all
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether initPositions() reads (and waits on) the saved positions
	 */
	public function setWaitEnabled( $enabled ) {
		$this->wait = $enabled;
	}

	/**
	 * Initialise a ILoadBalancer to give it appropriate chronology protection.
	 *
	 * If a position was saved for the master of this load balancer, it is passed to
	 * ILoadBalancer::waitFor(). Nothing happens when the protector is disabled or the
	 * load balancer has at most one server.
	 *
	 * @param ILoadBalancer $lb
	 */
	public function initLB( ILoadBalancer $lb ) {
		if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
			return; // non-replicated setup or disabled
		}

		$this->initPositions();

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if (
			isset( $this->startupPositions[$masterName] ) &&
			$this->startupPositions[$masterName] instanceof DBMasterPos
		) {
			$pos = $this->startupPositions[$masterName];
			$this->logger->debug( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
			$lb->waitFor( $pos );
		}
	}

	/**
	 * Notify the ChronologyProtector that the ILoadBalancer is about to shut down.
	 *
	 * Records the master as touched and, for load balancers with more than one server,
	 * remembers the current master position. Nothing is persisted until shutdown().
	 *
	 * @param ILoadBalancer $lb
	 */
	public function shutdownLB( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // not enabled
		} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
			// Only save the position if writes have been done on the connection
			return;
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( $lb->getServerCount() > 1 ) {
			$pos = $lb->getMasterPos();
			if ( $pos ) {
				$this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
				$this->shutdownPositions[$masterName] = $pos;
			}
		} else {
			$this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
		}
		$this->shutdownTouchDBs[$masterName] = 1;
	}

	/**
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 *
	 * Saves a touch timestamp for each DB recorded by shutdownLB(), then merges the recorded
	 * master positions into the value already in the store while holding a lock on the key.
	 *
	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
	 * @param string $mode 'sync' to save the positions with the WRITE_SYNC flag; any other
	 *   value saves them without flags
	 * @param int|null &$cpIndex Set to the write index of the saved value, or to null if the
	 *   save failed; left untouched when this returns before attempting to save
	 * @return DBMasterPos[] Empty if disabled, if there was nothing to save, or if the
	 *   positions were saved; otherwise (save failed, or 'sync' was requested but the store
	 *   may not support synchronous writes) the map of (DB master name => position)
	 */
	public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
		if ( !$this->enabled ) {
			return [];
		}

		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
			$store->set(
				$this->getTouchedKey( $store, $dbName ),
				microtime( true ),
				$store::TTL_DAY
			);
		}

		if ( !count( $this->shutdownPositions ) ) {
			return []; // nothing to save
		}

		$this->logger->debug( __METHOD__ . ": saving master pos for " .
			implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
		);

		// CP-protected writes should overwhelmingly go to the master datacenter, so use a
		// DC-local lock to merge the values. Use a DC-local get() and a synchronous all-DC
		// set(). This makes it possible for the BagOStuff class to write in parallel to all
		// DCs with one RTT. The use of WRITE_SYNC avoids needing READ_LATEST for the get().
		$ok = false;
		if ( $store->lock( $this->key, 3 ) ) {
			try {
				if ( $workCallback ) {
					// Let the store run the work before blocking on a replication sync barrier.
					// If replication caught up while the work finished, the barrier will be fast.
					$store->addBusyCallback( $workCallback );
				}
				$ok = $store->set(
					$this->key,
					$this->mergePositions(
						$store->get( $this->key ),
						$this->shutdownPositions,
						$cpIndex
					),
					self::POSITION_TTL,
					( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
				);
			} finally {
				// Release the lock even if merging or saving threw
				$store->unlock( $this->key );
			}
		}

		if ( !$ok ) {
			$cpIndex = null; // nothing saved
			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
			);
		} elseif ( $mode === 'sync' &&
			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
		) {
			// Positions may not be in all datacenters, force LBFactory to play it safe
			$this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
			$bouncedPositions = $this->shutdownPositions;
		} else {
			$bouncedPositions = [];
		}

		return $bouncedPositions;
	}

	/**
	 * @param string $dbName DB master name
	 * @return mixed Value saved by shutdown() for this client and DB (a microtime( true )
	 *   timestamp), or whatever the store returns when no such value exists
	 */
	public function getTouched( $dbName ) {
		return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
	}

	/**
	 * @param BagOStuff $store
	 * @param string $dbName DB master name
	 * @return string Global key of the touch timestamp for this client and DB
	 */
	private function getTouchedKey( BagOStuff $store, $dbName ) {
		return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
	}

	/**
	 * Load in previous master positions for the client.
	 *
	 * Only the first call does any work. When waiting is disabled, the startup positions
	 * are left empty and the store is not read.
	 */
	protected function initPositions() {
		if ( $this->initialized ) {
			return;
		}

		$this->initialized = true;
		if ( !$this->wait ) {
			$this->startupPositions = [];
			$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );

			return;
		}

		// If there is an expectation to see master positions from a certain write
		// index or higher, then block until it appears, or until a timeout is reached.
		// Since the write index restarts each time the key is created, it is possible that
		// a lagged store has a matching key write index. However, in that case, it should
		// already be expired and thus treated as non-existing, maintaining correctness.
		if ( $this->waitForPosIndex > 0 ) {
			$data = null;
			$indexReached = null; // highest index reached in the position store
			$loop = new WaitConditionLoop(
				function () use ( &$data, &$indexReached ) {
					$data = $this->store->get( $this->key );
					if ( !is_array( $data ) ) {
						return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
					} elseif ( !isset( $data['writeIndex'] ) ) {
						return WaitConditionLoop::CONDITION_REACHED; // b/c
					}
					$indexReached = max( $data['writeIndex'], $indexReached );

					return ( $data['writeIndex'] >= $this->waitForPosIndex )
						? WaitConditionLoop::CONDITION_REACHED
						: WaitConditionLoop::CONDITION_CONTINUE;
				},
				$this->waitForPosStoreTimeout
			);
			$result = $loop->invoke();
			$waitedMs = $loop->getLastWaitTime() * 1e3;

			if ( $result === WaitConditionLoop::CONDITION_REACHED ) {
				$this->logger->debug(
					__METHOD__ . ": expected and found position index.",
					[
						'cpPosIndex' => $this->waitForPosIndex,
						'waitTimeMs' => $waitedMs
					] + $this->clientLogInfo
				);
			} else {
				$this->logger->warning(
					__METHOD__ . ": expected but failed to find position index.",
					[
						'cpPosIndex' => $this->waitForPosIndex,
						'indexReached' => $indexReached,
						'waitTimeMs' => $waitedMs
					] + $this->clientLogInfo
				);
			}
		} else {
			$data = $this->store->get( $this->key );
		}

		// The store may hold nothing, or a value without a "positions" entry
		$this->startupPositions = is_array( $data ) ? ( $data['positions'] ?? [] ) : [];
		$this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
	}

	/**
	 * Merge the positions of this request into the value currently in the store.
	 *
	 * @param array|bool $curValue Current value from the store; anything without a
	 *   "positions" or "writeIndex" entry is treated as having no positions and index 0
	 * @param DBMasterPos[] $shutdownPositions Map of (DB master name => position)
	 * @param int|null &$cpIndex Set to the write index of the returned value
	 * @return array Map with "positions" (DB master name => position) and "writeIndex"
	 */
	protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
		/** @var DBMasterPos[] $curPositions */
		$curPositions = $curValue['positions'] ?? [];
		// Use the newest positions for each DB master
		foreach ( $shutdownPositions as $db => $pos ) {
			if (
				!isset( $curPositions[$db] ) ||
				!( $curPositions[$db] instanceof DBMasterPos ) ||
				$pos->asOfTime() > $curPositions[$db]->asOfTime()
			) {
				$curPositions[$db] = $pos;
			}
		}

		$cpIndex = $curValue['writeIndex'] ?? 0;

		return [
			'positions' => $curPositions,
			'writeIndex' => ++$cpIndex
		];
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:58
unlock( $key)
Release an advisory lock on a key string.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
getQoS( $flag)
set( $key, $value, $exptime=0, $flags=0)
Set an item.
get( $key, $flags=0, $oldFlags=null)
Get an item with the given key.
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
makeGlobalKey( $class, $component=null)
Make a global cache key.
Class for ensuring a consistent ordering of events as seen by the user, despite replication.
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
__construct(BagOStuff $store, array $client, $posIndex=null)
bool $enabled
Whether to no-op all method calls.
bool $wait
Whether to check and wait on positions.
int[] $shutdownTouchDBs
Map of (DB master name => 1)
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
DBMasterPos[] $shutdownPositions
Map of (DB master name => position)
initLB(ILoadBalancer $lb)
Initialise a ILoadBalancer to give it appropriate chronology protection.
initPositions()
Load in previous master positions for the client.
DBMasterPos[] $startupPositions
Map of (DB master name => position)
shutdownLB(ILoadBalancer $lb)
Notify the ChronologyProtector that the ILoadBalancer is about to shut down.
string $clientId
Hash of client parameters.
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
string[] $clientLogInfo
Map of client information fields for logging.
getTouchedKey(BagOStuff $store, $dbName)
bool $initialized
Whether the client data was loaded.
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
either a unescaped string or a HtmlArmor object after in associative array form externallinks including delete and has completed for all link tables whether this was an auto creation use $formDescriptor instead default is conds Array Extra conditions for the No matching items in log is displayed if loglist is empty msgKey Array If you want a nice box with a set this to the key of the message First element is the message key
Definition hooks.txt:2214
An object representing a master or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
waitFor( $pos)
Set the master wait position.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this PHP thread.
getMasterPos()
Get the current master position for chronology control purposes.
getServerCount()
Get the number of defined servers (not the number of open connections)
getServerName( $i)
Get the host name or IP address of the server with the specified index.
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))