27use Psr\Log\LoggerAwareInterface;
28use Psr\Log\LoggerInterface;
29use Psr\Log\NullLogger;
30use Wikimedia\WaitConditionLoop;
72 public const POSITION_TTL = 60;
74 public const POSITION_COOKIE_TTL = 10;
76 private const POS_STORE_WAIT_TIMEOUT = 5;
87 if ( isset( $client[
'clientId'] ) ) {
88 $this->clientId = $client[
'clientId'];
90 $this->clientId = ( $secret !=
'' )
91 ? hash_hmac(
'md5', $client[
'ip'] .
"\n" . $client[
'agent'], $secret )
92 : md5( $client[
'ip'] .
"\n" . $client[
'agent'] );
95 $this->waitForPosIndex = $posIndex;
97 $this->clientLogInfo = [
98 'clientIP' => $client[
'ip'],
99 'clientAgent' => $client[
'agent'],
100 'clientId' => $client[
'clientId'] ?? null
103 $this->logger =
new NullLogger();
148 if ( !$this->enabled ) {
157 $this->logger->debug( __METHOD__ .
": pos for DB '$masterName' set to '$pos'" );
173 if ( !$this->enabled ) {
184 $this->logger->debug( __METHOD__ .
": LB for '$masterName' has pos $pos" );
185 $this->shutdownPositions[$masterName] = $pos;
188 $this->logger->debug( __METHOD__ .
": DB '$masterName' touched" );
190 $this->shutdownTouchDBs[$masterName] = 1;
202 public function shutdown( callable $workCallback =
null, $mode =
'sync', &$cpIndex =
null ) {
203 if ( !$this->enabled ) {
210 foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
218 if ( $this->shutdownPositions === [] ) {
219 $this->logger->debug( __METHOD__ .
": no master positions to save" );
224 $this->logger->debug(
225 __METHOD__ .
": saving master pos for " .
226 implode(
', ', array_keys( $this->shutdownPositions ) )
233 if ( $workCallback ) {
242 $this->shutdownPositions,
246 ( $mode ===
'sync' ) ? $store::WRITE_SYNC : 0
257 $this->logger->warning( __METHOD__ .
": failed to save master pos for " .
258 implode(
', ', array_keys( $this->shutdownPositions ) )
260 } elseif ( $mode ===
'sync' &&
261 $store->
getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
264 $this->logger->info( __METHOD__ .
": store may not support synchronous writes." );
267 $bouncedPositions = [];
270 return $bouncedPositions;
281 return $this->store->get( $this->
getTouchedKey( $this->store, $masterName ) );
298 if ( $this->initialized ) {
302 $this->initialized =
true;
303 $this->logger->debug( __METHOD__ .
": client ID is {$this->clientId} (read)" );
311 if ( $this->waitForPosIndex > 0 ) {
313 $indexReached =
null;
314 $loop =
new WaitConditionLoop(
315 function () use ( &$data, &$indexReached ) {
316 $data = $this->store->get( $this->key );
317 if ( !is_array( $data ) ) {
318 return WaitConditionLoop::CONDITION_CONTINUE;
319 } elseif ( !isset( $data[
'writeIndex'] ) ) {
320 return WaitConditionLoop::CONDITION_REACHED;
322 $indexReached = max( $data[
'writeIndex'], $indexReached );
324 return ( $data[
'writeIndex'] >= $this->waitForPosIndex )
325 ? WaitConditionLoop::CONDITION_REACHED
326 : WaitConditionLoop::CONDITION_CONTINUE;
330 $result = $loop->invoke();
331 $waitedMs = $loop->getLastWaitTime() * 1e3;
333 if ( $result == $loop::CONDITION_REACHED ) {
334 $this->logger->debug(
335 __METHOD__ .
": expected and found position index.",
337 'cpPosIndex' => $this->waitForPosIndex,
338 'waitTimeMs' => $waitedMs
339 ] + $this->clientLogInfo
342 $this->logger->warning(
343 __METHOD__ .
": expected but failed to find position index.",
345 'cpPosIndex' => $this->waitForPosIndex,
346 'indexReached' => $indexReached,
347 'waitTimeMs' => $waitedMs
348 ] + $this->clientLogInfo
352 $data = $this->store->get( $this->key );
355 $this->startupPositions = $data ? $data[
'positions'] : [];
356 $this->logger->debug( __METHOD__ .
": key is {$this->key} (read)" );
358 $this->startupPositions = [];
359 $this->logger->debug( __METHOD__ .
": key is {$this->key} (unread)" );
373 $curPositions = $curValue[
'positions'] ?? [];
377 !isset( $curPositions[$db] ) ||
379 $pos->asOfTime() > $curPositions[$db]->asOfTime()
381 $curPositions[$db] = $pos;
385 $cpIndex = $curValue[
'writeIndex'] ?? 0;
388 'positions' => $curPositions,
389 'writeIndex' => ++$cpIndex
Class representing a cache/ephemeral data store.
get( $key, $flags=0)
Get an item with the given key.
unlock( $key)
Release an advisory lock on a key string.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
makeGlobalKey( $class,... $components)
Make a global cache key.
set( $key, $value, $exptime=0, $flags=0)
Set an item.
addBusyCallback(callable $workCallback)
Let a callback be run to avoid wasting time on special blocking calls.