Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
88.95% |
161 / 181 |
|
55.56% |
10 / 18 |
CRAP | |
0.00% |
0 / 1 |
ChronologyProtector | |
88.95% |
161 / 181 |
|
55.56% |
10 / 18 |
74.24 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
2 | |||
load | |
92.00% |
23 / 25 |
|
0.00% |
0 / 1 |
8.03 | |||
setRequestInfo | |
66.67% |
2 / 3 |
|
0.00% |
0 / 1 |
2.15 | |||
setLogger | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getClientId | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
setEnabled | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getSessionPrimaryPos | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
3 | |||
stageSessionPrimaryPos | |
80.00% |
12 / 15 |
|
0.00% |
0 / 1 |
5.20 | |||
persistSessionReplicationPositions | |
86.21% |
25 / 29 |
|
0.00% |
0 / 1 |
5.07 | |||
getTouched | |
71.43% |
10 / 14 |
|
0.00% |
0 / 1 |
4.37 | |||
getStartupSessionPositions | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
getStartupSessionTimestamps | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
lazyStartup | |
88.89% |
24 / 27 |
|
0.00% |
0 / 1 |
8.09 | |||
mergePositions | |
100.00% |
17 / 17 |
|
100.00% |
1 / 1 |
8 | |||
getCurrentTime | n/a |
0 / 0 |
n/a |
0 / 0 |
2 | |||||
setMockTime | n/a |
0 / 0 |
n/a |
0 / 0 |
1 | |||||
marshalPositions | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
unmarshalPositions | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
3 | |||
makeCookieValueFromCPIndex | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getCPInfoFromCookieValue | |
91.67% |
11 / 12 |
|
0.00% |
0 / 1 |
9.05 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use LogicException; |
23 | use Psr\Log\LoggerAwareInterface; |
24 | use Psr\Log\LoggerInterface; |
25 | use Psr\Log\NullLogger; |
26 | use Wikimedia\ObjectCache\BagOStuff; |
27 | use Wikimedia\ObjectCache\EmptyBagOStuff; |
28 | |
29 | /** |
30 | * Provide a given client with protection against visible database lag. |
31 | * |
32 | * ### In a nutshell |
33 | * |
34 | * This class tries to hide visible effects of database lag. It does this by temporarily remembering |
35 | * the database positions after a client makes a write, and on their next web request we will prefer |
36 | * non-lagged database replicas. When replica connections are established, we wait up to a few seconds |
37 | * for sufficient replication to have occurred, if they were not yet caught up to that same point. |
38 | * |
39 | * This ensures a consistent ordering of events as seen by a client. Kind of like Hawking's |
40 | * [Chronology Protection Agency](https://en.wikipedia.org/wiki/Chronology_protection_conjecture). |
41 | * |
42 | * ### Purpose |
43 | * |
44 | * For performance and scalability reasons, almost all data is queried from replica databases. |
45 | * Only queries relating to writing data are sent to a primary database. When rendering a web page |
46 | * with content or activity feeds on it, the very latest information may thus not yet be there. |
47 | * That's okay in general, but if, for example, a client recently changed their preferences or |
48 | * submitted new data, we do our best to make sure their next web response does reflect at least |
49 | * their own recent changes. |
50 | * |
51 | * ### How |
52 | * |
53 | * To explain how it works, we will look at an example lifecycle for a client. |
54 | * |
55 | * A client is browsing the site. Their web requests are generally read-only and display data from |
56 | * database replicas, which may be a few seconds out of date if a client elsewhere in the world |
57 | * recently modified that same data. If the application is run from multiple data centers, then |
58 | * these web requests may be served from the nearest secondary DC. |
59 | * |
60 | * A client performs a POST request, perhaps to publish an edit or change their preferences. This |
61 | * request is routed to the primary DC (this is the responsibility of infrastructure outside |
62 | * the web app). There, the data is saved to the primary database, after which the database |
63 | * host will asynchronously replicate this to its replicas in the same and any other DCs. |
64 | * |
65 | * Toward the end of the response to this POST request, the application takes note of the primary |
66 | * database's current "position", and saves this under a "clientId" key in the ChronologyProtector |
67 | * store. The web response will also set two cookies that are similarly short-lived (about ten |
68 | * seconds): `UseDC=master` and `cpPosIndex=<posIndex>@<write time>#<clientId>`. |
69 | * |
70 | * The ten seconds window is meant to account for the time needed for the database writes to have |
71 | * replicated across all active database replicas, including the cross-dc latency for those |
72 | * further away in any secondary DCs. The "clientId" is placed in the cookie to handle the case |
73 | * where the client's IP address frequently changes between web requests. |
74 | * |
75 | * Future web requests from the client should fall in one of two categories: |
76 | * |
77 | * 1. Within the ten second window. Their UseDC cookie will make them return |
78 | * to the primary DC where we access the ChronologyProtector store and use |
79 | * the database "position" to decide which local database replica to use |
80 | * and on-demand wait a split second for replication to catch up if needed. |
81 | * 2. After the ten second window. They will be routed to the nearest and |
82 | * possibly different DC. Any local ChronologyProtector store existing there |
83 | * will not be interacted with. A random database replica may be used as |
84 | * the client's own writes are expected to have been applied here by now. |
85 | * |
86 | * @anchor ChronologyProtector-storage-requirements |
87 | * |
88 | * ### Storage requirements |
89 | * |
90 | * The store used by ChronologyProtector, as configured via {@link $wgMicroStashType}, |
91 | * should meet the following requirements: |
92 | * |
93 | * - Low latencies. Nearly all web requests that involve a database connection will |
94 | * unconditionally query this store first. It is expected to respond within the order |
95 | * of one millisecond. |
96 | * - Best effort persistence, without active eviction pressure. Data stored here cannot be |
97 | * obtained elsewhere or recomputed. As such, under normal operating conditions, this store |
98 | * should not be full, and should not evict values before their intended expiry time elapsed. |
99 | * - No replication, local consistency. Each DC may have a fully independent dc-local store |
100 | * associated with ChronologyProtector (no replication across DCs is needed). Local writes |
101 | * must be immediately reflected in subsequent local reads. No intra-dc read lag is allowed. |
102 | * - No redundancy, fast failure. Loss of data will likely be noticeable and disruptive to |
103 | * clients, but the data is not considered essential. Under maintenance or unprecedented load, |
104 | * it is recommended to lose some data, instead of compromising other requirements such as |
105 | * latency or availability for new writes. The fallback is that users may be temporarily |
106 | * confused as they observe their own actions as not being immediately reflected. |
107 | * For example, they might change their skin or language preference but still get one or two |
108 | * page views afterward with the old settings. Or they might have published an edit and briefly |
109 | * not yet see it appear in their contribution history. |
110 | * |
111 | * ### Operational requirements |
112 | * |
113 | * These are the expectations a site administrator must meet for chronology protection: |
114 | * |
115 | * - If the application is run from multiple data centers, then you must designate one of them |
116 | * as the "primary DC". The primary DC is where the primary database is located, from which |
117 | * replication propagates to replica databases in that same DC and any other DCs. |
118 | * |
119 | * - Web requests that use the POST verb, or carry a `UseDC=master` cookie, must be routed to |
120 | * the primary DC only. |
121 | * |
122 | * An exception is requests carrying the `Promise-Non-Write-API-Action: true` header, |
123 | * which use the POST verb for large read queries, but don't actually require the primary DC. |
124 | * |
125 | * If you have legacy extensions deployed that perform queries on the primary database during |
126 | * GET requests, then you will have to identify a way to route any of its relevant URLs to the |
127 | * primary DC as well, or to accept that their reads do not enjoy chronology protection, and |
128 | * that writes may be slower (due to cross-dc latency). |
129 | * See [T91820](https://phabricator.wikimedia.org/T91820) for %Wikimedia Foundation's routing. |
130 | * |
131 | * @ingroup Database |
132 | * @internal |
133 | */ |
134 | class ChronologyProtector implements LoggerAwareInterface { |
135 | /** @var array Web request information about the client */ |
136 | private $requestInfo; |
137 | /** @var string Secret string for HMAC hashing */ |
138 | private string $secret; |
139 | private bool $cliMode; |
140 | /** @var BagOStuff */ |
141 | private $store; |
142 | /** @var LoggerInterface */ |
143 | protected $logger; |
144 | |
145 | /** @var string Storage key name */ |
146 | protected $key; |
147 | /** @var string Hash of client parameters */ |
148 | protected $clientId; |
149 | /** @var string[] Map of client information fields for logging */ |
150 | protected $clientLogInfo; |
151 | /** @var int|null Expected minimum index of the last write to the position store */ |
152 | protected $waitForPosIndex; |
153 | |
154 | /** @var bool Whether reading/writing session consistency replication positions is enabled */ |
155 | protected $enabled = true; |
156 | /** @var float|null UNIX timestamp when the client data was loaded */ |
157 | protected $startupTimestamp; |
158 | |
159 | /** @var array<string,DBPrimaryPos> Map of (primary server name => position) */ |
160 | protected $startupPositionsByPrimary = []; |
161 | /** @var array<string,DBPrimaryPos> Map of (primary server name => position) */ |
162 | protected $shutdownPositionsByPrimary = []; |
163 | /** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */ |
164 | protected $startupTimestampsByCluster = []; |
165 | /** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */ |
166 | protected $shutdownTimestampsByCluster = []; |
167 | |
168 | /** @var float|null */ |
169 | private $wallClockOverride; |
170 | |
171 | /** |
172 | * Whether a clientId is new during this request. |
173 | * |
174 | * If the clientId wasn't passed by the incoming request, lazyStartup() |
175 | * can skip fetching position data, and thus LoadBalancer can skip |
176 | * its IDatabaseForOwner::primaryPosWait() call. |
177 | * |
178 | * See also: <https://phabricator.wikimedia.org/T314434> |
179 | * |
180 | * @var bool |
181 | */ |
182 | private $hasNewClientId = false; |
183 | |
184 | /** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */ |
185 | public const POSITION_COOKIE_TTL = 10; |
186 | /** Seconds to store replication positions */ |
187 | private const POSITION_STORE_TTL = 60; |
188 | |
189 | /** Lock timeout to use for key updates */ |
190 | private const LOCK_TIMEOUT = 3; |
191 | /** Lock expiry to use for key updates */ |
192 | private const LOCK_TTL = 6; |
193 | |
194 | private const FLD_POSITIONS = 'positions'; |
195 | private const FLD_TIMESTAMPS = 'timestamps'; |
196 | private const FLD_WRITE_INDEX = 'writeIndex'; |
197 | |
/**
 * Set up the protector with its position store, hashing secret, run mode and logger.
 *
 * Only defaults are filled in here. The client ID and storage key are derived later,
 * on first use, by load().
 *
 * @param BagOStuff|null $cpStash Position store; an EmptyBagOStuff is used when null
 * @param string|null $secret Secret string for HMAC hashing [optional]; '' when null
 * @param bool|null $cliMode Whether the context is CLI or not. When null, it is detected
 *   from PHP_SAPI ('cli' or 'phpdbg'). A true value makes load() disable CP.
 * @param LoggerInterface|null $logger Logger; a NullLogger is used when null
 * @since 1.27
 */
public function __construct( $cpStash = null, $secret = null, $cliMode = null, $logger = null ) {
	$remoteAddress = $_SERVER['REMOTE_ADDR'] ?? '';
	$userAgent = $_SERVER['HTTP_USER_AGENT'] ?? '';

	$this->requestInfo = [
		'IPAddress' => $remoteAddress,
		'UserAgent' => $userAgent,
		// The two entries below are headers that the application can inject
		// via LBFactory::setRequestInfo()
		// Prior $cpClientId value from LBFactory::shutdown()
		'ChronologyClientId' => null,
		// Prior $cpIndex value from LBFactory::shutdown()
		'ChronologyPositionIndex' => null
	];
	$this->store = $cpStash ?? new EmptyBagOStuff();
	$this->secret = $secret ?? '';
	$this->logger = $logger ?? new NullLogger();
	$this->cliMode = $cliMode ?? in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );
}
218 | |
/**
 * Lazily work out who the client is and where their position data lives.
 *
 * Does nothing once a client ID is known, or while protection is disabled. Otherwise it:
 * - disables protection in CLI mode, or when the store is an EmptyBagOStuff;
 * - adopts the client ID from the request info, or derives one by hashing the client
 *   IP and user agent (HMAC-MD5 when a secret is set, plain MD5 otherwise);
 * - computes the storage key and notes the position index the request expects;
 * - records the client fields used as logging context.
 *
 * The client ID and key are still computed when this call itself disables protection.
 *
 * @return void
 */
private function load() {
	// Already loaded or not enabled, short-circuit.
	if ( $this->clientId || !$this->enabled ) {
		return;
	}

	$ip = $this->requestInfo['IPAddress'];
	$agent = $this->requestInfo['UserAgent'];
	// Any falsy value (null, '', ...) counts as "no client ID supplied"
	$suppliedClientId = $this->requestInfo['ChronologyClientId'] ?: null;

	if ( $this->cliMode ) {
		$this->setEnabled( false );
	} elseif ( $this->store instanceof EmptyBagOStuff ) {
		// Nowhere to store any DB positions and wait for them to appear
		$this->setEnabled( false );
		$this->logger->debug( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
	}

	if ( $suppliedClientId !== null ) {
		$this->clientId = $suppliedClientId;
	} else {
		$this->hasNewClientId = true;
		$fingerprint = $ip . "\n" . $agent;
		$this->clientId = ( $this->secret === '' )
			? md5( $fingerprint )
			: hash_hmac( 'md5', $fingerprint, $this->secret );
	}

	$this->key = $this->store->makeGlobalKey( __CLASS__, $this->clientId, 'v4' );
	$this->waitForPosIndex = $this->requestInfo['ChronologyPositionIndex'];

	$this->clientLogInfo = [
		'clientIP' => $ip,
		'clientAgent' => $agent,
		'clientId' => $suppliedClientId
	];
}
254 | |
/**
 * Override parts of the request information used to identify the client.
 *
 * Keys present in $info replace the current values; all other keys are kept.
 *
 * @param array $info Map of request info fields, such as 'IPAddress', 'UserAgent',
 *   'ChronologyClientId' and 'ChronologyPositionIndex'
 * @throws LogicException If the client ID was already determined
 */
public function setRequestInfo( array $info ) {
	if ( !$this->clientId ) {
		// Array union: entries of $info win over the existing ones
		$this->requestInfo = $info + $this->requestInfo;

		return;
	}

	throw new LogicException( 'ChronologyProtector already initialized' );
}
262 | |
/**
 * Replace the logger used by this instance.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	// load() runs before the swap, so anything it logs goes to the previous logger
	$this->load();

	$this->logger = $logger;
}
267 | |
/**
 * Get the hash that identifies the current client, initializing it if needed.
 *
 * NOTE(review): load() returns early while protection is disabled, so the ID looks
 * like it stays unset if setEnabled( false ) was called first. Confirm with callers.
 *
 * @return string Client ID hash
 * @since 1.32
 */
public function getClientId() {
	$this->load();

	return $this->clientId;
}
276 | |
/**
 * Switch chronology protection on or off for this instance.
 *
 * @param bool $enabled Whether reading/writing session replication positions is enabled
 * @since 1.27
 */
public function setEnabled( $enabled ) {
	// Stored as given. The position-handling methods check this flag before doing work.
	$this->enabled = $enabled;
}
284 | |
/**
 * Yield client "session consistency" replication position for a new ILoadBalancer
 *
 * If the stash has a previous primary position recorded, this will try to make
 * sure that the next query to a replica server of that primary will see changes up
 * to that position by delaying execution. The delay may timeout and allow stale
 * data if no non-lagged replica servers are available.
 *
 * @internal This method should only be called from LBFactory.
 *
 * @param ILoadBalancer $lb
 * @return DBPrimaryPos|null The position recorded at startup for the primary server
 *   of $lb, or null when protection is disabled or nothing is on record
 */
public function getSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return null;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	// Startup positions are keyed by primary server name
	$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
	if ( $pos instanceof DBPrimaryPos ) {
		// The message used to end with a stray, unbalanced single quote
		$this->logger->debug( "ChronologyProtector will wait for '$pos' on $cluster ($primaryName)" );
	} else {
		$this->logger->debug( "ChronologyProtector skips wait on $cluster ($primaryName)" );
	}

	return $pos;
}
316 | |
/**
 * Stage the client "session consistency" replication position of an end-of-life ILoadBalancer
 *
 * Nothing is written to the store here. The position and touch timestamp are kept in
 * memory until persistSessionReplicationPositions() is called. Nothing is staged when
 * protection is disabled, or when hasOrMadeRecentPrimaryChanges( INF ) on $lb is false.
 *
 * @internal This method should only be called from LBFactory.
 *
 * @param ILoadBalancer $lb
 * @return void
 */
public function stageSessionPrimaryPos( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return;
	}
	if ( !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
		return;
	}

	$cluster = $lb->getClusterName();
	$primaryName = $lb->getServerName( ServerInfo::WRITER_INDEX );

	if ( !$lb->hasStreamingReplicaServers() ) {
		// No position to wait for; only remember when the cluster was touched
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no replication" );
		$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();

		return;
	}

	$pos = $lb->getPrimaryPos();
	if ( !$pos ) {
		$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position unknown" );
		$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();

		return;
	}

	$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position now '$pos'" );
	$this->shutdownPositionsByPrimary[$primaryName] = $pos;
	// Use the position's own timestamp as the touch time, rather than the local clock
	$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
}
352 | |
/**
 * Write any staged client "session consistency" replication positions to the store
 *
 * The staged data is merged with whatever is currently stored under the client's key,
 * while holding a scoped lock on that key, and saved with a TTL of POSITION_STORE_TTL.
 *
 * @internal This method should only be called from LBFactory.
 *
 * @param int|null &$clientPosIndex DB position key write counter; incremented on update,
 *   and reset to null if saving failed
 * @return DBPrimaryPos[] Empty on success (or when there was nothing to do);
 *   map of (db name => unsaved position) on failure
 */
public function persistSessionReplicationPositions( &$clientPosIndex = null ) {
	$this->load();
	if ( !$this->enabled ) {
		return [];
	}

	if ( !$this->shutdownTimestampsByCluster ) {
		$this->logger->debug( __METHOD__ . ": no primary positions data to save" );

		return [];
	}

	// Stays false when the lock cannot be acquired
	$saved = false;
	$lock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
	if ( $lock ) {
		$stored = $this->unmarshalPositions( $this->store->get( $this->key ) );
		$merged = $this->mergePositions(
			$stored,
			$this->shutdownPositionsByPrimary,
			$this->shutdownTimestampsByCluster,
			$clientPosIndex
		);

		$saved = $this->store->set(
			$this->key,
			$this->marshalPositions( $merged ),
			self::POSITION_STORE_TTL
		);
		// Drop the scoped lock as soon as the write is done
		unset( $lock );
	}

	$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

	if ( !$saved ) {
		// Maybe position store is down
		$this->logger->warning( "ChronologyProtector failed to save position data for $clusterList" );
		$clientPosIndex = null;

		return $this->shutdownPositionsByPrimary;
	}

	$this->logger->debug( "ChronologyProtector saved position data for $clusterList" );

	return [];
}
406 | |
/**
 * Get the UNIX timestamp when the client last touched the DB, if they did so recently
 *
 * "Recently" means that the stored timestamp for the cluster of $lb is at most
 * POSITION_COOKIE_TTL seconds older than the moment the client data was loaded.
 *
 * @internal This method should only be called from LBFactory.
 *
 * @param ILoadBalancer $lb
 * @return float|false UNIX timestamp; false if not recent or on record
 * @since 1.35
 */
public function getTouched( ILoadBalancer $lb ) {
	$this->load();
	if ( !$this->enabled ) {
		return false;
	}

	$cluster = $lb->getClusterName();

	// This also triggers lazyStartup(), which sets $this->startupTimestamp
	$timestamp = $this->getStartupSessionTimestamps()[$cluster] ?? null;
	if ( $timestamp === null ) {
		return false;
	}

	if ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
		// If the position store is not replicated among datacenters and the cookie that
		// sticks the client to the primary datacenter expires, then the touch timestamp
		// will be found for requests in one datacenter but not others. For consistency,
		// return false once the user is no longer routed to the primary datacenter.
		$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );

		return false;
	}

	$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );

	return $timestamp;
}
442 | |
/**
 * Get the replication positions that were on record when the client data was loaded.
 *
 * Triggers lazyStartup() on first use.
 *
 * @return array<string,DBPrimaryPos> Map of (primary server name => position)
 */
protected function getStartupSessionPositions() {
	$this->lazyStartup();

	$positionsByPrimary = $this->startupPositionsByPrimary;

	return $positionsByPrimary;
}
451 | |
/**
 * Get the touch timestamps that were on record when the client data was loaded.
 *
 * Triggers lazyStartup() on first use.
 *
 * @return array<string,float> Map of (DB cluster name => UNIX timestamp)
 */
protected function getStartupSessionTimestamps() {
	$this->lazyStartup();

	$timestampsByCluster = $this->startupTimestampsByCluster;

	return $timestampsByCluster;
}
460 | |
/**
 * Load the stored replication positions and touch timestamps for the client
 *
 * Runs at most once per instance; the startup timestamp doubles as the "done" marker.
 * The store is not queried when the client ID was generated during this request.
 *
 * @return void
 */
protected function lazyStartup() {
	if ( $this->startupTimestamp !== null ) {
		return;
	}

	$this->startupTimestamp = $this->getCurrentTime();

	if ( $this->hasNewClientId ) {
		// The request carried no client ID, so one was built during load().
		// There is no point in looking it up.
		$this->startupPositionsByPrimary = [];
		$this->startupTimestampsByCluster = [];

		return;
	}

	$this->logger->debug( 'ChronologyProtector using store ' . get_class( $this->store ) );
	$this->logger->debug( "ChronologyProtector fetching positions for {$this->clientId}" );

	$data = $this->unmarshalPositions( $this->store->get( $this->key ) );

	if ( $data ) {
		$this->startupPositionsByPrimary = $data[self::FLD_POSITIONS];
	} else {
		$this->startupPositionsByPrimary = [];
	}
	$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];

	// When a stored array expires and is re-created under the same (deterministic) key,
	// the array value naturally starts again from index zero. As such, it is possible
	// that if certain store writes were lost (e.g. store down), that we unintentionally
	// point to an offset in an older incarnation of the array.
	// We don't try to detect or do something about this because:
	// 1. Waiting for an older offset is harmless and generally no-ops.
	// 2. The older value will have expired by now and thus treated as non-existing,
	//    which means we wouldn't even "see" it here.
	$indexReached = is_array( $data ) ? $data[self::FLD_WRITE_INDEX] : null;

	if ( $this->waitForPosIndex > 0 ) {
		// The request named a write index that it expects to find in the store
		if ( $indexReached >= $this->waitForPosIndex ) {
			$this->logger->debug( 'expected and found position index {cpPosIndex}', [
				'cpPosIndex' => $this->waitForPosIndex,
			] + $this->clientLogInfo );
		} else {
			$this->logger->warning( 'expected but failed to find position index {cpPosIndex}', [
				'cpPosIndex' => $this->waitForPosIndex,
				'indexReached' => $indexReached,
				'exception' => new \RuntimeException(),
			] + $this->clientLogInfo );
		}
	} elseif ( $indexReached ) {
		$this->logger->debug( 'found position data with index {indexReached}', [
			'indexReached' => $indexReached
		] + $this->clientLogInfo );
	}
}
518 | |
/**
 * Merge the new replication positions with the currently stored ones (highest wins)
 *
 * A new position replaces the stored one when nothing usable is stored for that primary
 * server, or when the new position has a later asOfTime(). Touch timestamps are merged
 * the same way, per cluster. The write index is the stored index (or 0) plus one.
 *
 * @param array<string,mixed>|false $storedValue Current replication position data
 * @param array<string,DBPrimaryPos> $shutdownPositions New replication positions
 * @param array<string,float> $shutdownTimestamps New DB post-commit shutdown timestamps
 * @param int|null &$clientPosIndex New position write index
 * @return array<string,mixed> Combined replication position data
 */
protected function mergePositions(
	$storedValue,
	array $shutdownPositions,
	array $shutdownTimestamps,
	?int &$clientPosIndex = null
) {
	/** @var array<string,DBPrimaryPos> $positions */
	$positions = $storedValue[self::FLD_POSITIONS] ?? [];
	foreach ( $shutdownPositions as $primaryName => $newPos ) {
		$current = $positions[$primaryName] ?? null;
		if (
			$current instanceof DBPrimaryPos &&
			!( $newPos->asOfTime() > $current->asOfTime() )
		) {
			// The stored position is at least as new; keep it
			continue;
		}
		$positions[$primaryName] = $newPos;
	}

	/** @var array<string,float> $timestamps */
	$timestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
	foreach ( $shutdownTimestamps as $cluster => $newTimestamp ) {
		$known = $timestamps[$cluster] ?? null;
		if ( $known === null || $newTimestamp > $known ) {
			$timestamps[$cluster] = $newTimestamp;
		}
	}

	$previousIndex = $storedValue[self::FLD_WRITE_INDEX] ?? 0;
	$clientPosIndex = $previousIndex + 1;

	return [
		self::FLD_POSITIONS => $positions,
		self::FLD_TIMESTAMPS => $timestamps,
		self::FLD_WRITE_INDEX => $clientPosIndex
	];
}
567 | |
/**
 * Get the current time, or the mock time when one was set via setMockTime().
 *
 * @internal For testing only
 * @return float UNIX timestamp
 * @codeCoverageIgnore
 */
protected function getCurrentTime() {
	$override = $this->wallClockOverride;
	if ( !$override ) {
		// Read time() before microtime(), so that it can act as a lower bound
		$wholeSeconds = (float)time();

		// microtime() can severely drift from time() and the microtime() value of other
		// threads. Rather than see the current time as being in the past, fall back to
		// the value of time().
		return max( microtime( true ), $wholeSeconds );
	}

	return $override;
}
583 | |
/**
 * Make getCurrentTime() follow a caller-owned variable instead of the real clock.
 *
 * The variable is bound by reference, so later changes to it are picked up.
 *
 * @internal For testing only
 * @param float|null &$time Mock UNIX timestamp
 * @codeCoverageIgnore
 */
public function setMockTime( &$time ) {
	$this->load();

	// Reference binding: the property aliases the caller's variable
	$this->wallClockOverride =& $time;
}
593 | |
/**
 * Convert the position objects in a position data array to plain arrays for storage.
 *
 * @param array $positions Position data, with position objects under FLD_POSITIONS
 * @return array The same data, with each position replaced by its toArray() form
 */
private function marshalPositions( array $positions ) {
	foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $position ) {
		$positions[self::FLD_POSITIONS][$primaryName] = $position->toArray();
	}

	return $positions;
}
601 | |
/**
 * Turn stored position data back into position objects.
 *
 * Each stored position names its own class under the '_type_' key; that class's
 * newFromArray() rebuilds the object. Falsy input (such as a store miss) is
 * returned unchanged.
 *
 * @param array|false $positions Position data as read from the store
 * @return array|false
 */
private function unmarshalPositions( $positions ) {
	if ( $positions ) {
		foreach ( $positions[self::FLD_POSITIONS] as $primaryName => $serialized ) {
			$positionClass = $serialized['_type_'];
			$positions[self::FLD_POSITIONS][$primaryName] = $positionClass::newFromArray( $serialized );
		}
	}

	return $positions;
}
618 | |
/**
 * Build a string conveying the client and write index of the chronology protector data
 *
 * This is the counterpart of getCPInfoFromCookieValue().
 *
 * @param int $writeIndex
 * @param int $time UNIX timestamp; can be used to detect stale cookies (T190082)
 * @param string $clientId Client ID hash from ILBFactory::shutdown()
 * @return string Value to use for "cpPosIndex" cookie
 * @since 1.32 in LBFactory, moved to CP in 1.41
 */
public static function makeCookieValueFromCPIndex(
	int $writeIndex,
	int $time,
	string $clientId
) {
	// Format is "<write index>@<write timestamp>#<client ID hash>"
	return $writeIndex . '@' . $time . '#' . $clientId;
}
636 | |
/**
 * Parse a string conveying the client and write index of the chronology protector data
 *
 * The value is accepted only if it is exactly "<write index>@<write timestamp>#<client ID>",
 * where the index is a positive integer, the timestamp is not below $minTimestamp, and the
 * client ID is 32 lowercase hexadecimal characters. Anything else, including a value with
 * a trailing newline, yields the all-null result.
 *
 * @param string|null $value Value of "cpPosIndex" cookie
 * @param int $minTimestamp Lowest UNIX timestamp that a non-expired value can have
 * @return array (index: int or null, clientId: string or null)
 * @since 1.32 in LBFactory, moved to CP in 1.41
 */
public static function getCPInfoFromCookieValue( ?string $value, int $minTimestamp ) {
	$placeholder = [ 'index' => null, 'clientId' => null ];

	if ( $value === null ) {
		return $placeholder; // not set
	}

	// Format is "<write index>@<write timestamp>#<client ID hash>".
	// Anchor the end with \z rather than $, since $ would also match just before
	// a final newline and let "...\n" pass validation.
	if ( !preg_match( '/^(\d+)@(\d+)#([0-9a-f]{32})\z/', $value, $m ) ) {
		return $placeholder; // invalid
	}

	// All three groups are mandatory in the pattern, so they are set and non-empty here
	$index = (int)$m[1];
	if ( $index <= 0 ) {
		return $placeholder; // invalid
	}
	if ( (int)$m[2] < $minTimestamp ) {
		return $placeholder; // expired
	}

	return [ 'index' => $index, 'clientId' => $m[3] ];
}
668 | } |