MediaWiki REL1_31
ChronologyProtector.php
Go to the documentation of this file.
1<?php
25
26use Psr\Log\LoggerAwareInterface;
27use Psr\Log\LoggerInterface;
28use Psr\Log\NullLogger;
29use Wikimedia\WaitConditionLoop;
30use BagOStuff;
31
/**
 * Class for ensuring a consistent ordering of events as seen by the user, despite replication.
 *
 * Master positions reached by a client are saved to a BagOStuff store at shutdown and read
 * back on that client's later requests, so that load balancers can be told to wait for them.
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var int TTL given to the store when saving positions (presumably seconds; confirm) */
	const POSITION_TTL = 60;
	/** @var int Default max seconds to wait on positions to appear */
	const POS_STORE_WAIT_TIMEOUT = 5;

	/** @var BagOStuff Store holding the position and "touched" keys */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Global store key under which this client's positions are saved */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;
	/** @var int Max seconds to wait on positions to appear */
	protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
	/** @var bool Whether to no-op all method calls */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
	protected $wait = true;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $startupPositions = [];
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $shutdownPositions = [];
	/** @var int[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

	/**
	 * @param BagOStuff $store Store used to persist positions between requests
	 * @param array $client Map with 'ip' and 'agent' entries; both are hashed into the client ID
	 * @param int|null $posIndex Expected minimum index of the last write to the position store
	 */
	public function __construct( BagOStuff $store, array $client, $posIndex = null ) {
		$this->store = $store;
		$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
		$this->waitForPosIndex = $posIndex;
		$this->logger = new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger Replaces the default NullLogger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @param bool $enabled When false, initLB(), shutdownLB() and shutdown() return early
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled When false, initPositions() does not read the store and the
	 *  startup positions are left empty
	 */
	public function setWaitEnabled( $enabled ) {
		$this->wait = $enabled;
	}

	/**
	 * Initialise a ILoadBalancer to give it appropriate chronology protection.
	 *
	 * If a startup position was loaded for the load balancer's master server, the load
	 * balancer is told to wait for it. Nothing happens when the protector is disabled or
	 * the load balancer reports one server or fewer.
	 *
	 * @param ILoadBalancer $lb
	 */
	public function initLB( ILoadBalancer $lb ) {
		if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
			return; // non-replicated setup or disabled
		}

		$this->initPositions();

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if (
			isset( $this->startupPositions[$masterName] ) &&
			$this->startupPositions[$masterName] instanceof DBMasterPos
		) {
			$pos = $this->startupPositions[$masterName];
			$this->logger->debug( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
			$lb->waitFor( $pos );
		}
	}

	/**
	 * Notify the ChronologyProtector that the ILoadBalancer is about to shut down.
	 *
	 * Records the master position (when there is more than one server and a position is
	 * available) and marks the master as touched; nothing is written to the store until
	 * shutdown() is called.
	 *
	 * @param ILoadBalancer $lb
	 */
	public function shutdownLB( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // not enabled
		} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
			// Only save the position if writes have been done on the connection
			return;
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( $lb->getServerCount() > 1 ) {
			$pos = $lb->getMasterPos();
			if ( $pos ) {
				$this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
				$this->shutdownPositions[$masterName] = $pos;
			}
		} else {
			$this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
		}
		$this->shutdownTouchDBs[$masterName] = 1;
	}

	/**
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 *
	 * Writes a "touched" timestamp for every DB recorded by shutdownLB(), then merges the
	 * recorded master positions into the client's position key under a store lock.
	 *
	 * @param callable|null $workCallback Registered with the store via addBusyCallback()
	 *  before the position write, so it can run while that write blocks
	 * @param string $mode 'sync' requests a WRITE_SYNC write; any other value uses no flags
	 * @param int|null &$cpIndex Set to the new write index of the position key, or to null
	 *  when the lock could not be acquired; left untouched when there is nothing to save
	 * @return DBMasterPos[] Empty when the positions were saved (or there was nothing to
	 *  save); otherwise the positions that might not be visible everywhere
	 */
	public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
		if ( !$this->enabled ) {
			return [];
		}

		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
			$store->set(
				$this->getTouchedKey( $this->store, $dbName ),
				microtime( true ),
				$store::TTL_DAY
			);
		}

		if ( !count( $this->shutdownPositions ) ) {
			return []; // nothing to save
		}

		$this->logger->debug( __METHOD__ . ": saving master pos for " .
			implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
		);

		// CP-protected writes should overwhelmingly go to the master datacenter, so get DC-local
		// lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
		// makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
		if ( $store->lock( $this->key, 3 ) ) {
			if ( $workCallback ) {
				// Let the store run the work before blocking on a replication sync barrier. By the
				// time it's done with the work, the barrier should be fast if replication caught up.
				$store->addBusyCallback( $workCallback );
			}
			$ok = $store->set(
				$this->key,
				$this->mergePositions(
					$store->get( $this->key ),
					$this->shutdownPositions,
					$cpIndex
				),
				self::POSITION_TTL,
				( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
			);
			$store->unlock( $this->key );
		} else {
			$ok = false;
			$cpIndex = null; // nothing saved
		}

		if ( !$ok ) {
			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
			);
		} elseif ( $mode === 'sync' &&
			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
		) {
			// Positions may not be in all datacenters, force LBFactory to play it safe
			$this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
			$bouncedPositions = $this->shutdownPositions;
		} else {
			$bouncedPositions = [];
		}

		return $bouncedPositions;
	}

	/**
	 * Fetch the "touched" value saved for a DB by shutdown().
	 *
	 * @param string $dbName DB master name
	 * @return mixed Whatever the store returns for the key; shutdown() saves a
	 *  microtime( true ) float there
	 */
	public function getTouched( $dbName ) {
		return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
	}

	/**
	 * Build the per-client, per-DB store key holding the "touched" timestamp.
	 *
	 * @param BagOStuff $store
	 * @param string $dbName DB master name
	 * @return string
	 */
	private function getTouchedKey( BagOStuff $store, $dbName ) {
		return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
	}

	/**
	 * Load in previous master positions for the client.
	 *
	 * Runs at most once per instance. When waiting is disabled the store is not read and
	 * the startup positions stay empty.
	 */
	protected function initPositions() {
		if ( $this->initialized ) {
			return;
		}

		$this->initialized = true;
		if ( $this->wait ) {
			// If there is an expectation to see master positions from a certain write
			// index or higher, then block until it appears, or until a timeout is reached.
			// Since the write index restarts each time the key is created, it is possible that
			// a lagged store has a matching key write index. However, in that case, it should
			// already be expired and thus treated as non-existing, maintaining correctness.
			if ( $this->waitForPosIndex > 0 ) {
				$data = null;
				$loop = new WaitConditionLoop(
					function () use ( &$data ) {
						$data = $this->store->get( $this->key );
						if ( !is_array( $data ) ) {
							return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
						} elseif ( !isset( $data['writeIndex'] ) ) {
							return WaitConditionLoop::CONDITION_REACHED; // b/c
						}

						return ( $data['writeIndex'] >= $this->waitForPosIndex )
							? WaitConditionLoop::CONDITION_REACHED
							: WaitConditionLoop::CONDITION_CONTINUE;
					},
					// Bound the wait explicitly with the configured timeout
					$this->waitForPosStoreTimeout
				);
				$result = $loop->invoke();
				$waitedMs = $loop->getLastWaitTime() * 1e3;

				if ( $result == $loop::CONDITION_REACHED ) {
					$msg = "expected and found pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
					$this->logger->debug( $msg );
				} else {
					$msg = "expected but missed pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
					$this->logger->info( $msg );
				}
			} else {
				$data = $this->store->get( $this->key );
			}

			// Only trust the stored value if it has the expected array shape
			$this->startupPositions = ( is_array( $data ) && isset( $data['positions'] ) )
				? $data['positions']
				: [];
			$this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
		} else {
			$this->startupPositions = [];
			$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
		}
	}

	/**
	 * Merge this request's positions into the value currently saved in the store.
	 *
	 * For each DB, the shutdown position replaces the saved one when none is saved, when
	 * the saved entry is not a DBMasterPos, or when the shutdown position has a later
	 * asOfTime().
	 *
	 * @param array|bool $curValue Value read from the store; anything without the expected
	 *  keys is treated as empty
	 * @param DBMasterPos[] $shutdownPositions Map of (DB master name => position)
	 * @param int|null &$cpIndex Set to the write index of the returned value, which is the
	 *  saved write index (or 0) plus one
	 * @return array Map with 'positions' and 'writeIndex' entries
	 */
	protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
		/** @var DBMasterPos[] $curPositions */
		$curPositions = isset( $curValue['positions'] ) ? $curValue['positions'] : [];
		// Use the newest positions for each DB master
		foreach ( $shutdownPositions as $db => $pos ) {
			if (
				!isset( $curPositions[$db] ) ||
				!( $curPositions[$db] instanceof DBMasterPos ) ||
				$pos->asOfTime() > $curPositions[$db]->asOfTime()
			) {
				$curPositions[$db] = $pos;
			}
		}

		$cpIndex = isset( $curValue['writeIndex'] ) ? $curValue['writeIndex'] : 0;

		return [
			'positions' => $curPositions,
			'writeIndex' => ++$cpIndex
		];
	}
}
interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
unlock( $key)
Release an advisory lock on a key string.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
getQoS( $flag)
set( $key, $value, $exptime=0, $flags=0)
Set an item.
get( $key, $flags=0, $oldFlags=null)
Get an item with the given key.
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.
makeGlobalKey( $class, $component=null)
Make a global cache key.
Class for ensuring a consistent ordering of events as seen by the user, despite replication.
mergePositions( $curValue, array $shutdownPositions, &$cpIndex=null)
__construct(BagOStuff $store, array $client, $posIndex=null)
bool $enabled
Whether to no-op all method calls.
bool $wait
Whether to check and wait on positions.
float[] $shutdownTouchDBs
Map of (DB master name => 1)
int|null $waitForPosIndex
Expected minimum index of the last write to the position store.
DBMasterPos[] $shutdownPositions
Map of (DB master name => position)
initLB(ILoadBalancer $lb)
Initialise a ILoadBalancer to give it appropriate chronology protection.
initPositions()
Load in previous master positions for the client.
DBMasterPos[] $startupPositions
Map of (DB master name => position)
shutdownLB(ILoadBalancer $lb)
Notify the ChronologyProtector that the ILoadBalancer is about to shut down.
string $clientId
Hash of client parameters.
shutdown(callable $workCallback=null, $mode='sync', &$cpIndex=null)
Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
getTouchedKey(BagOStuff $store, $dbName)
bool $initialized
Whether the client data was loaded.
int $waitForPosStoreTimeout
Max seconds to wait on positions to appear.
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition design.txt:26
An object representing a master or replica DB position in a replicated setup.
Database cluster connection, tracking, load balancing, and transaction manager interface.
waitFor( $pos)
Set the master wait position.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
getMasterPos()
Get the current master position for chronology control purposes.
getServerCount()
Get the number of defined servers (not the number of open connections)
getServerName( $i)
Get the host name or IP address of the server with the specified index.