// Nesting depth of regeneration callbacks: incremented before and decremented after the
// callback runs in fetchOrRegenerate(); the process cache is only consulted while this is 0.
201 private $callbackDepth = 0;
// Map of (sister key => wrapped value) prefetched by the multi-key methods; when non-empty,
// fetchKeys() reads from it before asking the backend.
203 private $warmupCache = [];
// Number of sister keys that fetchKeys() needed but did not find in $warmupCache
205 private $warmupKeyMisses = 0;
// Mock timestamp; when set (truthy), getCurrentTime() returns it instead of microtime( true )
208 private $wallClockOverride;
// NOTE(review): presumably seconds, like the other lag thresholds; confirm
211 private const MAX_COMMIT_DELAY = 3;
// Lag threshold that setMainValue() compares replica/snapshot lag against
213 private const MAX_READ_LAG = 7;
// Default $ttl of delete() and default hold-off for purge values that do not carry their own
215 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
// Default for the "lowTTL" option of fetchOrRegenerate() (capped there by $ttl)
218 private const LOW_TTL = 60;
// Default for the "hotTTR" option of fetchOrRegenerate()
223 private const HOT_TTR = 900;
// Default for the "ageNew" option of fetchOrRegenerate()
225 private const AGE_NEW = 60;
// Default for the "lockTSE" option; negative, so the "$lockTSE >= 0" checks do not apply
228 private const TSE_NONE = -1;
// Default for the "minAsOf" option (no minimum timestamp requirement)
238 public const MIN_TIMESTAMP_NONE = 0.0;
// Default "pcGroup"; getProcessCache() parses the part after ":" as the LRU size
241 private const PC_PRIMARY =
'primary:1000';
// Sister key formats selected by the "coalesceScheme" constructor parameter, see makeSisterKey()
247 private const SCHEME_HASH_TAG = 1;
249 private const SCHEME_HASH_STOP = 2;
// TTL passed to the backend when storing "check" key purge values
252 private const CHECK_KEY_TTL = self::TTL_YEAR;
// Lower bound applied to the TTL of interim values in setInterimValue()
254 private const INTERIM_KEY_TTL = 2;
// TTL of the regeneration mutex key added in fetchOrRegenerate()
257 private const LOCK_TTL = 10;
// Ramp-up period used by the popularity-based refresh chance calculation
259 private const RAMPUP_TTL = 30;
// Upper bound for the "current TTL" of purged/invalidated values (always negative)
262 private const TINY_NEGATIVE = -0.000001;
// Added to the last purge time to derive the minimum acceptable "as of" time for interim values
264 private const TINY_POSITIVE = 0.000001;
// Bounds (milliseconds) of the random age threshold under which a value counts as extremely new
267 private const RECENT_SET_LOW_MS = 50;
269 private const RECENT_SET_HIGH_MS = 100;
// Slack added to the pre-generation lag before comparing it with MAX_READ_LAG in setMainValue()
272 private const GENERATION_HIGH_SEC = 0.2;
// Indexes into the purge arrays returned by parsePurgeValue()/makeCheckPurgeValue()
275 private const PURGE_TIME = 0;
277 private const PURGE_HOLDOFF = 1;
// Format version written by wrap() and required by unwrap()
280 private const VERSION = 1;
// Indexes into the result array built by unwrap() and extended by fetchKeys()
296 private const RES_VALUE = 0;
298 private const RES_VERSION = 1;
300 private const RES_AS_OF = 2;
302 private const RES_TTL = 3;
304 private const RES_TOMB_AS_OF = 4;
306 private const RES_CHECK_AS_OF = 5;
308 private const RES_TOUCH_AS_OF = 6;
310 private const RES_CUR_TTL = 7;
// Indexes into the wrapped array stored in the backend, see wrap()
313 private const FLD_FORMAT_VERSION = 0;
315 private const FLD_VALUE = 1;
317 private const FLD_TTL = 2;
319 private const FLD_TIME = 3;
// NOTE(review): FLD_FLAGS and FLD_GENERATION_TIME are not referenced in the visible code
321 private const FLD_FLAGS = 4;
323 private const FLD_VALUE_VERSION = 5;
324 private const FLD_GENERATION_TIME = 6;
// Type characters passed to makeSisterKey() to derive the value, "check" timestamp,
// mutex, and interim sister keys of a base key
327 private const TYPE_VALUE =
'v';
329 private const TYPE_TIMESTAMP =
't';
331 private const TYPE_MUTEX =
'm';
333 private const TYPE_INTERIM =
'i';
// Prefix of the string purge values recognised by parsePurgeValue()
336 private const PURGE_VAL_PREFIX =
'PURGED';
366 $this->cache = $params[
'cache'];
367 $this->broadcastRoute = $params[
'broadcastRoutingPrefix'] ??
null;
368 $this->epoch = $params[
'epoch'] ?? 0;
369 if ( ( $params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
373 $this->coalesceScheme = self::SCHEME_HASH_TAG;
376 $this->coalesceScheme = self::SCHEME_HASH_STOP;
379 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
380 $this->tracer = $params[
'tracer'] ??
new NoopTracer();
381 $this->stats = $params[
'stats'] ?? StatsFactory::newNull();
383 $this->asyncHandler = $params[
'asyncHandler'] ??
null;
384 $this->missLog = array_fill( 0, 10, [
'', 0.0 ] );
/**
 * Fetch the value stored under a key, along with freshness metadata.
 *
 * @param string $key Base cache key
 * @param float|null &$curTTL Set to the "current TTL" computed by fetchKeys(); null if
 *  there is no entry, zero or negative if the entry is expired or invalidated
 * @param array $checkKeys "Check" keys (or per-key groups of them) forwarded to fetchKeys()
 * @param mixed &$info Set to the "as of" timestamp, or, when the caller passed
 *  self::PASS_BY_REF, to a map of self::KEY_* metadata fields
 * @return mixed The cached value; false if there is none
 */
final public function get( $key, &$curTTL = null, array $checkKeys = [], &$info = [] ) {
	// Capture the requested $info format before $info gets overwritten below
	$wantsMetadataMap = ( $info === self::PASS_BY_REF );

	/** @noinspection PhpUnusedLocalVariableInspection */
	$span = $this->startOperationSpan( __FUNCTION__, $key, $checkKeys );

	$fetchTime = $this->getCurrentTime();
	$state = $this->fetchKeys( [ $key ], $checkKeys, $fetchTime )[$key];

	$curTTL = $state[self::RES_CUR_TTL];

	if ( $wantsMetadataMap ) {
		$info = [
			self::KEY_VERSION => $state[self::RES_VERSION],
			self::KEY_AS_OF => $state[self::RES_AS_OF],
			self::KEY_TTL => $state[self::RES_TTL],
			self::KEY_CUR_TTL => $state[self::RES_CUR_TTL],
			self::KEY_TOMB_AS_OF => $state[self::RES_TOMB_AS_OF],
			self::KEY_CHECK_AS_OF => $state[self::RES_CHECK_AS_OF]
		];
	} else {
		$info = $state[self::RES_AS_OF];
	}

	$isMissOrStale = ( $curTTL === null || $curTTL <= 0 );
	if ( $isMissOrStale ) {
		// Rotate the miss log: drop the oldest entry and record this miss, so that a later
		// set() without "walltime" can estimate it via timeSinceLoggedMiss()
		$oldestSlot = array_key_first( $this->missLog );
		unset( $this->missLog[$oldestSlot] );
		$this->missLog[] = [ $key, $this->getCurrentTime() ];
	}

	return $state[self::RES_VALUE];
}
514 array $checkKeys = [],
519 $legacyInfo = ( $info !== self::PASS_BY_REF );
522 $span = $this->startOperationSpan( __FUNCTION__, $keys, $checkKeys );
528 $now = $this->getCurrentTime();
529 $resByKey = $this->fetchKeys( $keys, $checkKeys, $now );
530 foreach ( $resByKey as $key => $res ) {
531 if ( $res[self::RES_VALUE] !==
false ) {
532 $valuesByKey[$key] = $res[self::RES_VALUE];
535 if ( $res[self::RES_CUR_TTL] !==
null ) {
536 $curTTLs[$key] = $res[self::RES_CUR_TTL];
538 $info[$key] = $legacyInfo
539 ? $res[self::RES_AS_OF]
541 self::KEY_VERSION => $res[self::RES_VERSION],
542 self::KEY_AS_OF => $res[self::RES_AS_OF],
543 self::KEY_TTL => $res[self::RES_TTL],
544 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
545 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
546 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
// Fetch and unwrap the values of several base keys in one backend round trip, then apply
// "check" key purges and the optional "touched" callback to each result.
//
// $keys: list of base keys
// $checkKeys: integer-indexed entries apply to all base keys; entries indexed by a base key
//  apply to that key only (a single key or a list of keys)
// $now: timestamp used to compute the remaining TTLs
// $touchedCb: optional callback taking a cached value and returning a "last touched"
//  timestamp, or null
// Results are collected per base key as RES_*-indexed arrays (see unwrap()).
568 protected function fetchKeys( array $keys, array $checkKeys,
float $now, $touchedCb =
null ) {
// Value sister keys, in the same order as $keys
574 $valueSisterKeys = [];
// "Check" sister keys that apply to every base key
576 $checkSisterKeysForAll = [];
// Map of (base key => "check" sister keys that apply to that key only)
578 $checkSisterKeysByKey = [];
580 foreach ( $keys as $key ) {
581 $sisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
582 $allSisterKeys[] = $sisterKey;
583 $valueSisterKeys[] = $sisterKey;
586 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
// Integer index: a single "check" key shared by all base keys
588 if ( is_int( $i ) ) {
590 $sisterKey = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
591 $allSisterKeys[] = $sisterKey;
592 $checkSisterKeysForAll[] = $sisterKey;
// Otherwise $i is a base key and the entry holds its own "check" key(s)
595 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
596 $sisterKey = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
597 $allSisterKeys[] = $sisterKey;
598 $checkSisterKeysByKey[$i][] = $sisterKey;
// Prefer the prefetched warmup cache; only keys absent from it are fetched from the backend
603 if ( $this->warmupCache ) {
605 $wrappedBySisterKey = $this->warmupCache;
606 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
607 if ( $sisterKeysMissing ) {
608 $this->warmupKeyMisses += count( $sisterKeysMissing );
609 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
// No warmup cache: fetch all sister keys from the backend at once
613 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
// Purge info of the "check" keys shared by all base keys
617 $ckPurgesForAll = $this->processCheckKeys(
618 $checkSisterKeysForAll,
// Purge info of the "check" keys specific to each base key
624 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
625 $ckPurgesByKey[$keyWithCheckKeys] = $this->processCheckKeys(
// Walk the (value sister key, base key) pairs in step
634 array_map(
null, $valueSisterKeys, $keys )
635 as [ $valueSisterKey, $key ]
637 if ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
639 $wrapped = $wrappedBySisterKey[$valueSisterKey];
645 $res = $this->unwrap( $wrapped, $now );
646 $value = $res[self::RES_VALUE];
648 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
// Track the newest "check" key purge time seen for this base key
649 $res[self::RES_CHECK_AS_OF] = max(
650 $ckPurge[self::PURGE_TIME],
651 $res[self::RES_CHECK_AS_OF]
// End of the hold-off period of this purge
654 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
// A value generated before the hold-off deadline is treated as purged
656 if ( $value !==
false && $holdoffDeadline >= $res[self::RES_AS_OF] ) {
// Negative "time since purge"; capped so that it is never zero or positive
658 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
660 $res[self::RES_CUR_TTL] = min( $res[self::RES_CUR_TTL], $ago );
// The "touched" callback is only consulted when a value is present
664 if ( $touchedCb !==
null && $value !==
false ) {
665 $touched = $touchedCb( $value );
// Touched at or after the value was stored: lower the current TTL accordingly
666 if ( $touched !==
null && $touched >= $res[self::RES_AS_OF] ) {
667 $res[self::RES_CUR_TTL] = min(
668 $res[self::RES_CUR_TTL],
669 $res[self::RES_AS_OF] - $touched,
677 $res[self::RES_TOUCH_AS_OF] = max( $res[self::RES_TOUCH_AS_OF], $touched );
679 $resByKey[$key] = $res;
691 private function processCheckKeys(
692 array $checkSisterKeys,
693 array $wrappedBySisterKey,
698 foreach ( $checkSisterKeys as $timeKey ) {
699 $purge = isset( $wrappedBySisterKey[$timeKey] )
700 ? $this->parsePurgeValue( $wrappedBySisterKey[$timeKey] )
703 if ( $purge ===
null ) {
705 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
710 $this->cache::WRITE_BACKGROUND
/**
 * Store a value under a key and record the outcome in the stats counter.
 *
 * @param string $key Base cache key
 * @param mixed $value Value to store
 * @param int|float $ttl TTL passed through to setMainValue()
 * @param array $opts Options forwarded to setMainValue(): "version", "walltime", "lag",
 *  "since", "pending", "lockTSE", "staleTTL", "segmentable", "creating"
 * @return bool Whether setMainValue() reported success
 */
final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$span = $this->startOperationSpan( __FUNCTION__, $key );

	$keygroup = $this->determineKeyGroupForStats( $key );

	// Resolve every option to its default before delegating
	$version = $opts['version'] ?? null;
	$walltime = $opts['walltime'] ?? null;
	$replicaLag = $opts['lag'] ?? 0;
	$readSince = $opts['since'] ?? null;
	$pendingCommit = $opts['pending'] ?? false;
	$lockTSE = $opts['lockTSE'] ?? self::TSE_NONE;
	$staleTTL = $opts['staleTTL'] ?? self::STALE_TTL_NONE;
	$segmentable = $opts['segmentable'] ?? false;
	$creating = $opts['creating'] ?? false;

	$stored = $this->setMainValue(
		$key,
		$value,
		$ttl,
		$version,
		$walltime,
		$replicaLag,
		$readSince,
		$pendingCommit,
		$lockTSE,
		$staleTTL,
		$segmentable,
		$creating
	);

	$outcome = $stored ? 'ok' : 'error';
	$this->stats->getCounter( 'wanobjectcache_set_total' )
		->setLabel( 'keygroup', $keygroup )
		->setLabel( 'result', $outcome )
		->copyToStatsdAt( "wanobjectcache.$keygroup.set." . $outcome )
		->increment();

	return $stored;
}
848 private function setMainValue(
856 bool $dataPendingCommit,
867 $now = $this->getCurrentTime();
869 $walltime ??= $this->timeSinceLoggedMiss( $key, $now );
870 $dataSnapshotLag = ( $dataReadSince !== null ) ? max( 0, $now - $dataReadSince ) : 0;
871 $dataCombinedLag = $dataReplicaLag + $dataSnapshotLag;
878 if ( $dataPendingCommit ) {
880 $mitigated =
'pending writes';
882 $mitigationTTL = self::TTL_UNCACHEABLE;
883 } elseif ( $dataSnapshotLag > self::MAX_READ_LAG ) {
885 $pregenSnapshotLag = ( $walltime !== null ) ? ( $dataSnapshotLag - $walltime ) : 0;
886 if ( ( $pregenSnapshotLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
888 $mitigated =
'snapshot lag (late generation)';
890 $mitigationTTL = self::TTL_UNCACHEABLE;
893 $mitigated =
'snapshot lag (high generation time)';
895 $mitigationTTL = self::TTL_LAGGED;
897 } elseif ( $dataReplicaLag ===
false || $dataReplicaLag > self::MAX_READ_LAG ) {
899 $mitigated =
'replication lag';
901 $mitigationTTL = self::TTL_LAGGED;
902 } elseif ( $dataCombinedLag > self::MAX_READ_LAG ) {
903 $pregenCombinedLag = ( $walltime !== null ) ? ( $dataCombinedLag - $walltime ) : 0;
905 if ( ( $pregenCombinedLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
907 $mitigated =
'read lag (late generation)';
909 $mitigationTTL = self::TTL_UNCACHEABLE;
912 $mitigated =
'read lag (high generation time)';
914 $mitigationTTL = self::TTL_LAGGED;
920 $mitigationTTL =
null;
923 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
924 $this->logger->warning(
925 "Rejected set() for {cachekey} due to $mitigated.",
928 'lag' => $dataReplicaLag,
929 'age' => $dataSnapshotLag,
930 'walltime' => $walltime
941 if ( $mitigationTTL !==
null ) {
943 if ( $lockTSE >= 0 ) {
945 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
948 $ttl = min( $ttl ?: INF, $mitigationTTL );
951 $this->logger->warning(
952 "Lowered set() TTL for {cachekey} due to $mitigated.",
955 'lag' => $dataReplicaLag,
956 'age' => $dataSnapshotLag,
957 'walltime' => $walltime
963 $wrapped = $this->wrap( $value, $logicalTTL ?: $ttl, $version, $now );
964 $storeTTL = $ttl + $staleTTL;
966 $flags = $this->cache::WRITE_BACKGROUND;
967 if ( $segmentable ) {
968 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
972 $ok = $this->cache->add(
973 $this->makeSisterKey( $key, self::TYPE_VALUE ),
979 $ok = $this->cache->merge(
980 $this->makeSisterKey( $key, self::TYPE_VALUE ),
981 static function ( $cache, $key, $cWrapped ) use ( $wrapped ) {
983 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
986 $this->cache::MAX_CONFLICTS_ONE,
/**
 * Purge a key, either by removing it outright or by replacing it with a purge value.
 *
 * @param string $key Base cache key
 * @param int $ttl Expiry of the purge value; when zero or negative the key is simply
 *  deleted from the backend instead
 * @return bool Whether the backend operation succeeded
 */
final public function delete( $key, $ttl = self::HOLDOFF_TTL ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$span = $this->startOperationSpan( __FUNCTION__, $key );

	// Purge values live under the value sister key itself
	$routedValueKey = $this->getRouteKey( $this->makeSisterKey( $key, self::TYPE_VALUE ) );

	if ( $ttl > 0 ) {
		// Replace the value with a purge value carrying the current (whole second) time
		$purgeTime = (int)$this->getCurrentTime();
		$purge = self::PURGE_VAL_PREFIX . ':' . $purgeTime;
		$ok = $this->cache->set(
			$routedValueKey,
			$purge,
			$ttl,
			$this->cache::WRITE_BACKGROUND
		);
	} else {
		// No hold-off requested; remove the key outright
		$ok = $this->cache->delete( $routedValueKey, $this->cache::WRITE_BACKGROUND );
	}

	$keygroup = $this->determineKeyGroupForStats( $key );
	$outcome = $ok ? 'ok' : 'error';

	$this->stats->getCounter( 'wanobjectcache_delete_total' )
		->setLabel( 'keygroup', $keygroup )
		->setLabel( 'result', $outcome )
		->copyToStatsdAt( "wanobjectcache.$keygroup.delete." . $outcome )
		->increment();

	return $ok;
}
1120 $span = $this->startOperationSpan( __FUNCTION__, $key );
1122 return $this->getMultiCheckKeyTime( [ $key ] )[$key];
1188 $span = $this->startOperationSpan( __FUNCTION__, $keys );
1190 $checkSisterKeysByKey = [];
1191 foreach ( $keys as $key ) {
1192 $checkSisterKeysByKey[$key] = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1195 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1196 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1198 $now = $this->getCurrentTime();
1200 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1201 $purge = $this->parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1202 if ( $purge ===
null ) {
1203 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
1207 self::CHECK_KEY_TTL,
1208 $this->cache::WRITE_BACKGROUND
1212 $times[$key] = $purge[self::PURGE_TIME];
1253 $span = $this->startOperationSpan( __FUNCTION__, $key );
1255 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1257 $now = $this->getCurrentTime();
1258 $purge = $this->makeCheckPurgeValue( $now, $holdoff );
1259 $ok = $this->cache->set(
1260 $this->getRouteKey( $checkSisterKey ),
1262 self::CHECK_KEY_TTL,
1263 $this->cache::WRITE_BACKGROUND
1266 $keygroup = $this->determineKeyGroupForStats( $key );
1268 $this->stats->getCounter(
'wanobjectcache_check_total' )
1269 ->setLabel(
'keygroup', $keygroup )
1270 ->setLabel(
'result', ( $ok ?
'ok' :
'error' ) )
1271 ->copyToStatsdAt(
"wanobjectcache.$keygroup.ck_touch." . ( $ok ?
'ok' :
'error' ) )
1306 $span = $this->startOperationSpan( __FUNCTION__, $key );
1308 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1309 $ok = $this->cache->delete( $this->getRouteKey( $checkSisterKey ), $this->cache::WRITE_BACKGROUND );
1311 $keygroup = $this->determineKeyGroupForStats( $key );
1313 $this->stats->getCounter(
'wanobjectcache_reset_total' )
1314 ->setLabel(
'keygroup', $keygroup )
1315 ->setLabel(
'result', ( $ok ?
'ok' :
'error' ) )
1316 ->copyToStatsdAt(
"wanobjectcache.$keygroup.ck_reset." . ( $ok ?
'ok' :
'error' ) )
1624 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1627 $span = $this->startOperationSpan( __FUNCTION__, $key );
1629 $version = $opts[
'version'] ??
null;
1630 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1631 $pCache = ( $pcTTL >= 0 )
1632 ? $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY )
1638 if ( $pCache && $this->callbackDepth == 0 ) {
1639 $cached = $pCache->get( $key, $pcTTL,
false );
1640 if ( $cached !==
false ) {
1641 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1646 [ $value, $valueVersion, $curAsOf ] = $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
1647 if ( $valueVersion !== $version ) {
1651 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1652 [ $value ] = $this->fetchOrRegenerate(
1653 $this->makeGlobalKey(
'WANCache-key-variant', md5( $key ), (
string)$version ),
1656 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1662 if ( $pCache && $value !==
false ) {
1663 $pCache->set( $key, $value );
1685 private function fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams ) {
1686 $checkKeys = $opts[
'checkKeys'] ?? [];
1687 $graceTTL = $opts[
'graceTTL'] ?? self::GRACE_TTL_NONE;
1688 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1689 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
1690 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1691 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1692 $touchedCb = $opts[
'touchedCallback'] ??
null;
1693 $startTime = $this->getCurrentTime();
1695 $keygroup = $this->determineKeyGroupForStats( $key );
1698 $curState = $this->fetchKeys( [ $key ], $checkKeys, $startTime, $touchedCb )[$key];
1699 $curValue = $curState[self::RES_VALUE];
1702 if ( $this->isAcceptablyFreshValue( $curState, $graceTTL, $minAsOf ) ) {
1703 if ( !$this->isLotteryRefreshDue( $curState, $lowTTL, $ageNew, $hotTTR, $startTime ) ) {
1704 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1705 ->setLabel(
'keygroup', $keygroup )
1706 ->setLabel(
'result',
'hit' )
1707 ->setLabel(
'reason',
'good' )
1708 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.good" )
1709 ->observe( 1e3 * ( $this->getCurrentTime() - $startTime ) );
1711 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1712 } elseif ( $this->scheduleAsyncRefresh( $key, $ttl, $callback, $opts, $cbParams ) ) {
1713 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1715 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1716 ->setLabel(
'keygroup', $keygroup )
1717 ->setLabel(
'result',
'hit' )
1718 ->setLabel(
'reason',
'refresh' )
1719 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.refresh" )
1720 ->observe( 1e3 * ( $this->getCurrentTime() - $startTime ) );
1722 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1724 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
1728 $isKeyTombstoned = ( $curState[self::RES_TOMB_AS_OF] !== null );
1730 if ( $isKeyTombstoned ) {
1731 $volState = $this->getInterimValue( $key, $minAsOf, $startTime, $touchedCb );
1732 $volValue = $volState[self::RES_VALUE];
1734 $volState = $curState;
1735 $volValue = $curValue;
1743 $lastPurgeTime = max(
1745 $volState[self::RES_TOUCH_AS_OF],
1746 $curState[self::RES_TOMB_AS_OF],
1747 $curState[self::RES_CHECK_AS_OF]
1749 $safeMinAsOf = max( $minAsOf, $lastPurgeTime + self::TINY_POSITIVE );
1751 if ( $volState[self::RES_VALUE] ===
false || $volState[self::RES_AS_OF] < $safeMinAsOf ) {
1752 $isExtremelyNewValue =
false;
1754 $age = $startTime - $volState[self::RES_AS_OF];
1755 $isExtremelyNewValue = ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1757 if ( $isExtremelyNewValue ) {
1758 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1760 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1761 ->setLabel(
'keygroup', $keygroup )
1762 ->setLabel(
'result',
'hit' )
1763 ->setLabel(
'reason',
'volatile' )
1764 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.volatile" )
1765 ->observe( 1e3 * ( $this->getCurrentTime() - $startTime ) );
1767 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1770 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1771 $busyValue = $opts[
'busyValue'] ??
null;
1772 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
1773 $segmentable = $opts[
'segmentable'] ??
false;
1774 $version = $opts[
'version'] ??
null;
1777 $useRegenerationLock =
1788 $curState[self::RES_CUR_TTL] !==
null &&
1789 $curState[self::RES_CUR_TTL] <= 0 &&
1790 abs( $curState[self::RES_CUR_TTL] ) <= $lockTSE
1794 ( $busyValue !==
null && $volValue ===
false );
1799 $mutexKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
1801 $hasLock = $useRegenerationLock && $this->cache->add( $mutexKey, 1, self::LOCK_TTL );
1802 if ( $useRegenerationLock && !$hasLock ) {
1805 if ( $this->isValid( $volValue, $volState[self::RES_AS_OF], $minAsOf ) ) {
1806 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1808 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1809 ->setLabel(
'keygroup', $keygroup )
1810 ->setLabel(
'result',
'hit' )
1811 ->setLabel(
'reason',
'stale' )
1812 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.stale" )
1813 ->observe( 1e3 * ( $this->getCurrentTime() - $startTime ) );
1815 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1816 } elseif ( $busyValue !==
null ) {
1817 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1818 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1820 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1821 ->setLabel(
'keygroup', $keygroup )
1822 ->setLabel(
'result', $miss )
1823 ->setLabel(
'reason',
'busy' )
1824 ->copyToStatsdAt(
"wanobjectcache.$keygroup.$miss.busy" )
1825 ->observe( 1e3 * ( $this->getCurrentTime() - $startTime ) );
1827 $placeholderValue = ( $busyValue instanceof Closure ) ? $busyValue() : $busyValue;
1829 return [ $placeholderValue, $version, $curState[self::RES_AS_OF] ];
1835 $preCallbackTime = $this->getCurrentTime();
1836 ++$this->callbackDepth;
1842 ( $curState[self::RES_VERSION] === $version ) ? $curValue : false,
1845 ( $curState[self::RES_VERSION] === $version ) ? $curState[self::RES_AS_OF] : null,
1849 --$this->callbackDepth;
1851 $postCallbackTime = $this->getCurrentTime();
1854 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1856 $this->stats->getTiming(
'wanobjectcache_regen_seconds' )
1857 ->setLabel(
'keygroup', $keygroup )
1858 ->copyToStatsdAt(
"wanobjectcache.$keygroup.regen_walltime" )
1859 ->observe( 1e3 * $walltime );
1864 ( $value !==
false && $ttl >= 0 ) &&
1866 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned )
1869 if ( $isKeyTombstoned ) {
1870 $this->setInterimValue(
1878 $this->setMainValue(
1885 $setOpts[
'lag'] ?? 0,
1887 $setOpts[
'since'] ?? $preCallbackTime,
1889 $setOpts[
'pending'] ??
false,
1893 ( $curValue ===
false )
1899 $this->cache->delete( $mutexKey, $this->cache::WRITE_BACKGROUND );
1902 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1903 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1905 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1906 ->setLabel(
'keygroup', $keygroup )
1907 ->setLabel(
'result', $miss )
1908 ->setLabel(
'reason',
'compute' )
1909 ->copyToStatsdAt(
"wanobjectcache.$keygroup.$miss.compute" )
1910 ->observe( 1e3 * ( $this->getCurrentTime() - $startTime ) );
1912 return [ $value, $version, $curState[self::RES_AS_OF] ];
/**
 * Build the backend ("sister") key of a given type for a base key.
 *
 * The layout depends on the configured coalescing scheme:
 *  - SCHEME_HASH_STOP: "WANCache:<base key>|#|<type character>"
 *  - otherwise: "WANCache:{<base key>}:<type character>"
 *
 * @param string $baseKey Base cache key
 * @param string $typeChar One of the self::TYPE_* characters
 * @return string Sister key
 */
private function makeSisterKey( string $baseKey, string $typeChar ) {
	return ( $this->coalesceScheme === self::SCHEME_HASH_STOP )
		? 'WANCache:' . $baseKey . '|#|' . $typeChar
		: 'WANCache:{' . $baseKey . '}:' . $typeChar;
}
/**
 * Fetch the interim value of a key, if interim caching is enabled and the value is usable.
 *
 * @param string $key Base cache key
 * @param float $minAsOf Minimum acceptable "as of" timestamp
 * @param float $now Timestamp used to compute the remaining TTL
 * @param callable|null $touchedCb Optional callback mapping a value to a "touched" timestamp
 * @return array RES_*-indexed result; the result of unwrapping `false` when there is no
 *  usable interim value
 */
private function getInterimValue( $key, $minAsOf, $now, $touchedCb ) {
	if ( !$this->useInterimHoldOffCaching ) {
		return $this->unwrap( false, $now );
	}

	$interimKey = $this->makeSisterKey( $key, self::TYPE_INTERIM );
	$state = $this->unwrap( $this->cache->get( $interimKey ), $now );

	$isUsable = (
		$state[self::RES_VALUE] !== false &&
		$state[self::RES_AS_OF] >= $minAsOf
	);
	if ( !$isUsable ) {
		return $this->unwrap( false, $now );
	}

	if ( $touchedCb !== null ) {
		// The "touched" timestamp depends on the value, so recompute it for the interim value
		$touchedAt = $touchedCb( $state[self::RES_VALUE] );
		$state[self::RES_TOUCH_AS_OF] = max( $touchedAt, $state[self::RES_TOUCH_AS_OF] );
	}

	return $state;
}
1974 private function setInterimValue(
1981 $now = $this->getCurrentTime();
1982 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1985 $wrapped = $this->wrap( $value, $ttl, $version, $now );
1987 $flags = $this->cache::WRITE_BACKGROUND;
1988 if ( $segmentable ) {
1989 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
1992 return $this->cache->set(
1993 $this->makeSisterKey( $key, self::TYPE_INTERIM ),
2066 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2068 $span = $this->startOperationSpan( __FUNCTION__,
'' );
2069 if ( $span->getContext()->isSampled() ) {
2070 $span->setAttributes( [
2071 'org.wikimedia.wancache.multi_count' => $keyedIds->count(),
2072 'org.wikimedia.wancache.ttl' => $ttl,
2076 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache(
2077 $this->getNonProcessCachedMultiKeys( $keyedIds, $opts ),
2078 $opts[
'checkKeys'] ?? []
2080 $this->warmupKeyMisses = 0;
2086 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2089 return $callback( $params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2094 foreach ( $keyedIds as $key => $id ) {
2095 $values[$key] = $this->getWithSetCallback(
2104 $this->warmupCache = [];
2176 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2178 $span = $this->startOperationSpan( __FUNCTION__,
'' );
2179 if ( $span->getContext()->isSampled() ) {
2180 $span->setAttributes( [
2181 'org.wikimedia.wancache.multi_count' => $keyedIds->count(),
2182 'org.wikimedia.wancache.ttl' => $ttl,
2185 $checkKeys = $opts[
'checkKeys'] ?? [];
2186 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
2189 unset( $opts[
'lockTSE'] );
2190 unset( $opts[
'busyValue'] );
2193 $keysByIdGet = $this->getNonProcessCachedMultiKeys( $keyedIds, $opts );
2194 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache( $keysByIdGet, $checkKeys );
2195 $this->warmupKeyMisses = 0;
2201 $now = $this->getCurrentTime();
2202 $resByKey = $this->fetchKeys( $keysByIdGet, $checkKeys, $now );
2203 foreach ( $keysByIdGet as $id => $key ) {
2204 $res = $resByKey[$key];
2206 $res[self::RES_VALUE] ===
false ||
2207 $res[self::RES_CUR_TTL] < 0 ||
2208 $res[self::RES_AS_OF] < $minAsOf
2216 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2217 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2219 $method = __METHOD__;
2224 $proxyCb =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2225 use ( $callback, $newValsById, $newTTLsById, $newSetOpts, $method )
2227 $id = $params[
'id'];
2229 if ( array_key_exists( $id, $newValsById ) ) {
2231 $newValue = $newValsById[$id];
2232 $ttl = $newTTLsById[$id];
2233 $setOpts = $newSetOpts;
2237 $ttls = [ $id => $ttl ];
2238 $result = $callback( [ $id ], $ttls, $setOpts );
2239 if ( !isset( $result[$id] ) ) {
2241 $this->logger->warning(
2242 $method .
' failed due to {id} not set in result {result}', [
2244 'result' => json_encode( $result )
2247 $newValue = $result[$id];
2256 foreach ( $keyedIds as $key => $id ) {
2257 $values[$key] = $this->getWithSetCallback(
2266 $this->warmupCache = [];
2279 return $this->cache->makeGlobalKey( $keygroup, ...$components );
/**
 * Make a cache key by delegating to the underlying cache backend.
 *
 * @param string $keygroup Key group
 * @param string|int ...$components Additional key components
 * @return string Whatever key the backend's makeKey() builds
 */
public function makeKey( $keygroup, ...$components ) {
	$backend = $this->cache;

	return $backend->makeKey( $keygroup, ...$components );
}
2336 foreach ( $ids as $id ) {
2337 $key = $keyCallback( $id, $this );
2339 if ( !isset( $idByKey[$key] ) ) {
2340 $idByKey[$key] = $id;
2341 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2342 throw new UnexpectedValueException(
2343 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2348 return new ArrayIterator( $idByKey );
2387 if ( count( $ids ) !== count( $res ) ) {
2390 $ids = array_keys( array_fill_keys( $ids,
true ) );
2391 if ( count( $ids ) !== count( $res ) ) {
2392 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2396 return array_combine( $ids, $res );
2406 return $this->cache->watchErrors();
2427 $code = $this->cache->getLastError( $watchPoint );
2446 $this->processCaches = [];
2470 $this->useInterimHoldOffCaching = $enabled;
2479 return $this->cache->getQoS( $flag );
/**
 * Derive a TTL that grows with the age of an entity.
 *
 * The TTL is $factor times the entity's age, clamped to the range [$minTTL, $maxTTL].
 *
 * @param int|float|string|null $mtime Last-modified timestamp; cast to an integer
 * @param int $maxTTL Upper bound of the result
 * @param int $minTTL Lower bound; also returned as-is when $mtime is not positive
 * @param float $factor Fraction of the age to use as the TTL
 * @return int Adaptive TTL
 */
public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
	// Normalises fractional timestamps and numeric strings
	$lastModified = (int)$mtime;
	if ( $lastModified <= 0 ) {
		// No usable last-modified time
		return $minTTL;
	}

	$ageInSeconds = (int)$this->getCurrentTime() - $lastModified;
	$scaledTTL = $factor * $ageInSeconds;
	$atLeastMin = max( $minTTL, $scaledTTL );

	return (int)min( $maxTTL, $atLeastMin );
}
2565 return $this->warmupKeyMisses;
2573 if ( $this->broadcastRoute !==
null ) {
2574 if ( $sisterKey[0] ===
'/' ) {
2575 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2577 return $this->broadcastRoute . $sisterKey;
// Hand a deferred regeneration of a key to the configured async handler, if there is one.
// Parameters mirror those of fetchOrRegenerate(), which the deferred closure calls.
2593 private function scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams ) {
// Without an async handler nothing can be scheduled
2594 if ( !$this->asyncHandler ) {
2601 $func = $this->asyncHandler;
2602 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
// No stored timestamp can satisfy isValid() against an infinite "minAsOf", so the cached
// value is treated as unusable and regenerated
2603 $opts[
'minAsOf'] = INF;
2605 $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
2606 }
catch ( Exception $e ) {
// Log the failure with context for debugging
2608 $this->logger->error(
'Async refresh failed for {key}', [
/**
 * Decide whether a fetched value may be used without regeneration.
 *
 * @param array $res RES_*-indexed result from fetchKeys()
 * @param int $graceTTL Extra period during which an expired value may still be used
 * @param float $minAsOf Minimum acceptable "as of" timestamp
 * @return bool
 */
private function isAcceptablyFreshValue( $res, $graceTTL, $minAsOf ) {
	$usable = $this->isValid( $res[self::RES_VALUE], $res[self::RES_AS_OF], $minAsOf );
	if ( !$usable ) {
		// No value, or the value is older than required
		return false;
	}

	$remainingTTL = $res[self::RES_CUR_TTL];
	if ( $remainingTTL > 0 ) {
		// Not expired yet
		return true;
	}

	// Time left in the grace period for this stale value
	$remainingGrace = $graceTTL + $remainingTTL;
	if ( !( $remainingGrace > 0 ) ) {
		// Too stale even for the grace period
		return false;
	}

	// Within the grace period: use the value unless the refresh lottery says otherwise
	return !$this->worthRefreshExpiring( $remainingGrace, $graceTTL, $graceTTL );
}
2661 $curTTL = $res[self::RES_CUR_TTL];
2662 $logicalTTL = $res[self::RES_TTL];
2663 $asOf = $res[self::RES_AS_OF];
2666 $this->worthRefreshExpiring( $curTTL, $logicalTTL, $lowTTL ) ||
2667 $this->worthRefreshPopular( $asOf, $ageNew, $hotTTR, $now )
2687 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2691 $age = $now - $asOf;
2692 $timeOld = $age - $ageNew;
2693 if ( $timeOld <= 0 ) {
2697 $popularHitsPerSec = 1;
2701 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2705 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2707 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2709 return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
2731 if ( $lowTTL <= 0 ) {
2736 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2739 $timeOld = $effectiveLowTTL - $curTTL;
2740 if ( $timeOld <= 0 || $timeOld >= $effectiveLowTTL ) {
2745 $ttrRatio = $timeOld / $effectiveLowTTL;
2748 $chance = $ttrRatio ** 4;
2750 return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
/**
 * Check that a value exists and is not older than a minimum timestamp.
 *
 * @param mixed $value Cached value; false means "no value"
 * @param float $asOf Timestamp of the value
 * @param float $minAsOf Minimum acceptable timestamp
 * @return bool
 */
protected function isValid( $value, $asOf, $minAsOf ) {
	if ( $value === false ) {
		return false;
	}

	return ( $asOf >= $minAsOf );
}
/**
 * Wrap a value with the metadata stored alongside it in the backend.
 *
 * @param mixed $value Value to store
 * @param int|float $ttl Logical TTL of the value
 * @param int|null $version Value version; only stored when not null
 * @param float $now Timestamp recorded as the "as of" time
 * @return array FLD_*-indexed array, with keys in ascending order
 */
private function wrap( $value, $ttl, $version, $now ) {
	$fields = [
		self::FLD_FORMAT_VERSION => self::VERSION,
		self::FLD_VALUE => $value,
		self::FLD_TTL => $ttl,
		self::FLD_TIME => $now
	];

	// The version field has the highest index used here, so appending keeps the keys ascending
	return ( $version === null )
		? $fields
		: $fields + [ self::FLD_VALUE_VERSION => $version ];
}
/**
 * Turn a raw backend entry into a RES_*-indexed result array.
 *
 * Arrays are treated as wrapped values (see wrap()) and anything else as a potential
 * purge value (see parsePurgeValue()). Unrecognised or outdated entries yield the defaults:
 * a false value and null metadata.
 *
 * @param mixed $wrapped Raw entry from the backend; false if absent
 * @param float $now Timestamp used to compute the remaining TTL
 * @return array RES_*-indexed result
 */
private function unwrap( $wrapped, $now ) {
	$res = [
		// Attributes derived from the stored entry itself
		self::RES_VALUE => false,
		self::RES_VERSION => null,
		self::RES_AS_OF => null,
		self::RES_TTL => null,
		self::RES_TOMB_AS_OF => null,
		// Attributes that callers refine using "check" keys and "touched" callbacks
		self::RES_CHECK_AS_OF => null,
		self::RES_TOUCH_AS_OF => null,
		self::RES_CUR_TTL => null
	];

	if ( !is_array( $wrapped ) ) {
		// Possibly a purge value
		$purge = $this->parsePurgeValue( $wrapped );
		if ( $purge !== null ) {
			$purgedAt = $purge[self::PURGE_TIME];
			// Purged keys always get a negative "current TTL"
			$res[self::RES_CUR_TTL] = min( $purgedAt - $now, self::TINY_NEGATIVE );
			$res[self::RES_TOMB_AS_OF] = $purgedAt;
		}

		return $res;
	}

	// Wrapped value: it must use the current format and be no older than the epoch
	$hasCurrentFormat = ( ( $wrapped[self::FLD_FORMAT_VERSION] ?? null ) === self::VERSION );
	if ( !$hasCurrentFormat || !( $wrapped[self::FLD_TIME] >= $this->epoch ) ) {
		return $res;
	}

	$logicalTTL = $wrapped[self::FLD_TTL];
	$storedAt = $wrapped[self::FLD_TIME];
	if ( $logicalTTL > 0 ) {
		// Approximate time left, never negative
		$timeLeft = max( $logicalTTL - ( $now - $storedAt ), 0.0 );
	} else {
		// No TTL, so the time left is unbounded
		$timeLeft = INF;
	}

	$res[self::RES_VALUE] = $wrapped[self::FLD_VALUE];
	$res[self::RES_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ?? null;
	$res[self::RES_AS_OF] = $storedAt;
	$res[self::RES_CUR_TTL] = $timeLeft;
	$res[self::RES_TTL] = $logicalTTL;

	return $res;
}
/**
 * Extract the key group of a cache key for use in metric names.
 *
 * The group is the second colon-separated segment of the key, or the whole first segment
 * when the key contains no colon. Dots are replaced with underscores.
 *
 * @param string $key Cache key
 * @return string Key group with "." replaced by "_"
 */
private function determineKeyGroupForStats( $key ) {
	$segments = explode( ':', $key, 3 );
	// Fall back to the first segment for keys without a second one
	$group = $segments[1] ?? $segments[0];

	return str_replace( '.', '_', $group );
}
/**
 * Parse a purge value string of the form "<prefix>:<timestamp>[:<holdoff>]".
 *
 * @param mixed $value Raw backend entry
 * @return array|null Map of (self::PURGE_TIME => float timestamp, self::PURGE_HOLDOFF =>
 *  int holdoff); null if $value is not a purge value or its timestamp predates the epoch
 */
private function parsePurgeValue( $value ) {
	if ( !is_string( $value ) ) {
		return null;
	}

	$segments = explode( ':', $value, 3 );
	$prefix = $segments[0];
	if ( $prefix !== self::PURGE_VAL_PREFIX ) {
		// Not a purge value
		return null;
	}

	// A value consisting of the bare prefix has no timestamp segment. Default it explicitly so
	// that no "undefined array key" warning is raised; the resulting timestamp (0.0) is the
	// same as what the unchecked cast produced.
	$timestamp = (float)( $segments[1] ?? 0.0 );
	// Purge values without a hold-off segment use the default hold-off
	$holdoff = isset( $segments[2] ) ? (int)$segments[2] : self::HOLDOFF_TTL;

	if ( $timestamp < $this->epoch ) {
		// Purge value is too old
		return null;
	}

	return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
}
/**
 * Build the string purge value for a "check" key.
 *
 * @param float $timestamp Purge time; truncated to whole seconds
 * @param int $holdoff Hold-off period stored with the purge
 * @param array|null &$purge Set to the purge array equivalent to the returned string
 *  (self::PURGE_TIME => float, self::PURGE_HOLDOFF => int)
 * @return string "<prefix>:<whole-second timestamp>:<holdoff>"
 */
private function makeCheckPurgeValue( float $timestamp, int $holdoff, ?array &$purge = null ) {
	$wholeSeconds = (int)$timestamp;

	$purge = [
		self::PURGE_TIME => (float)$wholeSeconds,
		self::PURGE_HOLDOFF => $holdoff
	];

	return implode( ':', [ self::PURGE_VAL_PREFIX, $wholeSeconds, $holdoff ] );
}
/**
 * Get the process cache for a group, creating it on first use.
 *
 * @param string $group Group specifier of the form "<name>:<size>"
 * @return MapCacheLRU
 */
private function getProcessCache( $group ) {
	if ( isset( $this->processCaches[$group] ) ) {
		return $this->processCaches[$group];
	}

	// The part after the colon is the maximum number of entries
	[ , $size ] = explode( ':', $group );
	$lru = new MapCacheLRU( (int)$size );
	$this->processCaches[$group] = $lru;
	if ( $this->wallClockOverride !== null ) {
		// Keep a newly created cache on the same mock clock as this instance
		$lru->setMockTime( $this->wallClockOverride );
	}

	return $lru;
}
/**
 * Get the keys that cannot be served from the process cache and must be fetched.
 *
 * When process caching applies (positive "pcTTL" and no callback is running), only keys
 * absent from the process cache are returned. Otherwise every key is returned, so that
 * callers still prefetch and batch-process all of them.
 *
 * @param ArrayIterator $keys Map of (cache key => entity ID)
 * @param array $opts Options; "pcTTL" and "pcGroup" are read
 * @return string[] Map of (entity ID => cache key)
 */
private function getNonProcessCachedMultiKeys( ArrayIterator $keys, array $opts ) {
	$pcTTL = $opts['pcTTL'] ?? self::TTL_UNCACHEABLE;

	$keysMissing = [];
	if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
		$pCache = $this->getProcessCache( $opts['pcGroup'] ?? self::PC_PRIMARY );
		foreach ( $keys as $key => $id ) {
			if ( !$pCache->has( $key, $pcTTL ) ) {
				$keysMissing[$id] = $key;
			}
		}
	} else {
		// The process cache is not in play, so none of the keys can be skipped
		foreach ( $keys as $key => $id ) {
			$keysMissing[$id] = $key;
		}
	}

	return $keysMissing;
}
/**
 * Fetch the raw entries of all value and "check" sister keys in a single backend call.
 *
 * @param string[] $keys Base cache keys
 * @param array $checkKeys "Check" keys; integer-indexed entries are single keys, other
 *  entries are a key or list of keys tied to one base key
 * @return array Map of (sister key => raw entry or false if absent); empty if $keys is empty
 */
private function fetchWrappedValuesForWarmupCache( array $keys, array $checkKeys ) {
	if ( !$keys ) {
		return [];
	}

	// Value sister keys first...
	$toFetch = [];
	foreach ( $keys as $baseKey ) {
		$toFetch[] = $this->makeSisterKey( $baseKey, self::TYPE_VALUE );
	}

	// ...then every "check" sister key, whether shared or key-specific
	foreach ( $checkKeys as $index => $checkKeyOrKeyGroup ) {
		$group = is_int( $index ) ? [ $checkKeyOrKeyGroup ] : (array)$checkKeyOrKeyGroup;
		foreach ( $group as $checkKey ) {
			$toFetch[] = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
		}
	}

	$found = $this->cache->getMulti( $toFetch );

	// Record absent keys as false so that later lookups count them as present
	return $found + array_fill_keys( $toFetch, false );
}
/**
 * Get the time elapsed since the most recent logged cache miss of a key.
 *
 * @param string $key Base cache key
 * @param float $now Current timestamp
 * @return float|null Elapsed time; null if the miss log has no entry for the key
 */
private function timeSinceLoggedMiss( $key, $now ) {
	// Walk the miss log from newest to oldest using the array's internal pointer
	$entry = end( $this->missLog );
	while ( $entry ) {
		if ( $entry[0] === $key ) {
			return ( $now - $entry[1] );
		}
		$entry = prev( $this->missLog );
	}

	return null;
}
3003 return $this->wallClockOverride ?: microtime(
true );
3011 $this->wallClockOverride =& $time;
3012 $this->cache->setMockTime( $time );
3013 foreach ( $this->processCaches as $pCache ) {
3014 $pCache->setMockTime( $time );
// Create a tracing span named after a cache operation and, when the span is sampled,
// attach the affected keys and "check" keys as attributes.
//
// $opName: operation name, appended to "WANObjectCache::"
// $keys: a single key or a list of keys
// $checkKeys: "check" keys; entries may be single keys or lists of keys
3029 private function startOperationSpan( $opName, $keys, $checkKeys = [] ) {
3030 $span = $this->tracer->createSpan(
"WANObjectCache::$opName" )
// Skip building the attribute strings when the span is not sampled
3034 if ( !$span->getContext()->isSampled() ) {
// Lists of keys are flattened into one space-separated string
3038 $keys = is_array( $keys ) ? implode(
' ', $keys ) : $keys;
3040 if ( count( $checkKeys ) > 0 ) {
// Flatten each group of "check" keys first, then the list of groups
3041 $checkKeys = array_map(
3042 static fn ( $checkKeyOrKeyGroup ) =>
3043 is_array( $checkKeyOrKeyGroup )
3044 ? implode(
' ', $checkKeyOrKeyGroup )
3045 : $checkKeyOrKeyGroup,
3048 $checkKeys = implode(
' ', $checkKeys );
3049 $span->setAttributes( [
'org.wikimedia.wancache.check_keys' => $checkKeys ] );
3052 $span->setAttributes( [
'org.wikimedia.wancache.keys' => $keys ] );