26 use Psr\Log\LoggerAwareInterface;
27 use Psr\Log\LoggerInterface;
28 use Psr\Log\NullLogger;
29 use Wikimedia\WaitConditionLoop;
70 const POSITION_TTL = 60;
72 const POSITION_COOKIE_TTL = 10;
74 const POS_STORE_WAIT_TIMEOUT = 5;
85 if ( isset( $client[
'clientId'] ) ) {
86 $this->clientId = $client[
'clientId'];
88 $this->clientId = ( $secret !=
'' )
89 ? hash_hmac(
'md5', $client[
'ip'] .
"\n" . $client[
'agent'], $secret )
90 : md5( $client[
'ip'] .
"\n" . $client[
'agent'] );
93 $this->waitForPosIndex = $posIndex;
95 $this->clientLogInfo = [
96 'clientIP' => $client[
'ip'],
97 'clientAgent' => $client[
'agent'],
98 'clientId' => $client[
'clientId'] ?? null
101 $this->logger =
new NullLogger();
146 if ( !$this->enabled ) {
155 $this->logger->debug( __METHOD__ .
": pos for DB '$masterName' set to '$pos'\n" );
171 if ( !$this->enabled ) {
182 $this->logger->debug( __METHOD__ .
": LB for '$masterName' has pos $pos\n" );
183 $this->shutdownPositions[$masterName] = $pos;
186 $this->logger->debug( __METHOD__ .
": DB '$masterName' touched\n" );
188 $this->shutdownTouchDBs[$masterName] = 1;
200 public function shutdown( callable $workCallback =
null, $mode =
'sync', &$cpIndex =
null ) {
201 if ( !$this->enabled ) {
208 foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
216 if ( $this->shutdownPositions === [] ) {
217 $this->logger->debug( __METHOD__ .
": no master positions to save\n" );
222 $this->logger->debug(
223 __METHOD__ .
": saving master pos for " .
224 implode(
', ', array_keys( $this->shutdownPositions ) ) .
"\n"
231 if ( $workCallback ) {
244 ( $mode ===
'sync' ) ? $store::WRITE_SYNC : 0
255 $this->logger->warning( __METHOD__ .
": failed to save master pos for " .
256 implode(
', ', array_keys( $this->shutdownPositions ) ) .
"\n"
258 } elseif ( $mode ===
'sync' &&
259 $store->
getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
262 $this->logger->info( __METHOD__ .
": store may not support synchronous writes." );
265 $bouncedPositions = [];
268 return $bouncedPositions;
277 return $this->store->get( $this->
getTouchedKey( $this->store, $dbName ) );
293 if ( $this->initialized ) {
297 $this->initialized =
true;
298 $this->logger->debug( __METHOD__ .
": client ID is {$this->clientId} (read)\n" );
306 if ( $this->waitForPosIndex > 0 ) {
308 $indexReached =
null;
309 $loop =
new WaitConditionLoop(
310 function () use ( &$data, &$indexReached ) {
311 $data = $this->store->get( $this->key );
312 if ( !is_array( $data ) ) {
313 return WaitConditionLoop::CONDITION_CONTINUE;
314 } elseif ( !isset( $data[
'writeIndex'] ) ) {
315 return WaitConditionLoop::CONDITION_REACHED;
317 $indexReached = max( $data[
'writeIndex'], $indexReached );
319 return ( $data[
'writeIndex'] >= $this->waitForPosIndex )
320 ? WaitConditionLoop::CONDITION_REACHED
321 : WaitConditionLoop::CONDITION_CONTINUE;
325 $result = $loop->invoke();
326 $waitedMs = $loop->getLastWaitTime() * 1e3;
328 if ( $result == $loop::CONDITION_REACHED ) {
329 $this->logger->debug(
330 __METHOD__ .
": expected and found position index.",
332 'cpPosIndex' => $this->waitForPosIndex,
333 'waitTimeMs' => $waitedMs
334 ] + $this->clientLogInfo
337 $this->logger->warning(
338 __METHOD__ .
": expected but failed to find position index.",
340 'cpPosIndex' => $this->waitForPosIndex,
341 'indexReached' => $indexReached,
342 'waitTimeMs' => $waitedMs
343 ] + $this->clientLogInfo
347 $data = $this->store->get( $this->key );
350 $this->startupPositions = $data ? $data[
'positions'] : [];
351 $this->logger->debug( __METHOD__ .
": key is {$this->key} (read)\n" );
353 $this->startupPositions = [];
354 $this->logger->debug( __METHOD__ .
": key is {$this->key} (unread)\n" );
368 $curPositions = $curValue[
'positions'] ?? [];
372 !isset( $curPositions[$db] ) ||
374 $pos->asOfTime() > $curPositions[$db]->asOfTime()
376 $curPositions[$db] = $pos;
380 $cpIndex = $curValue[
'writeIndex'] ?? 0;
383 'positions' => $curPositions,
384 'writeIndex' => ++$cpIndex