// Nesting counter that fetchOrRegenerate() increments and later decrements (presumably around
// the regeneration callback -- the call itself is not visible here); the process cache is only
// consulted while this is 0
203 private $callbackDepth = 0;
// Map of (sister key => wrapped value) that fetchKeys() consults before asking the cache
// backend; the multi-key fetch paths fill it and reset it to [] afterwards
205 private $warmupCache = [];
// Running count of sister keys that fetchKeys() found missing from $warmupCache
207 private $warmupKeyMisses = 0;
// Mock clock value; when truthy it is used instead of microtime( true ), and it is handed to
// the process caches via setMockTime()
210 private $wallClockOverride;
// Component of HOLDOFF_TTL below; not referenced elsewhere in the visible code
213 private const MAX_COMMIT_DELAY = 3;
// Lag ceiling that setMainValue() compares snapshot, replica and combined lag against;
// also a component of HOLDOFF_TTL
215 private const MAX_READ_LAG = 7;
// Hold-off that parsePurgeValue() assumes when a purge string has no hold-off segment
217 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
// Default for the 'lowTTL' option in fetchOrRegenerate() (capped there at the requested TTL)
220 private const LOW_TTL = 60;
// Default for the 'hotTTR' option in fetchOrRegenerate()
225 private const HOT_TTR = 900;
// Default for the 'ageNew' option in fetchOrRegenerate()
227 private const AGE_NEW = 60;
// Default for the 'lockTSE' option; setMainValue() only takes its lockTSE branch for values >= 0
230 private const TSE_NONE = -1;
// Default for the 'minAsOf' option (isValid() accepts any timestamp >= this value)
240 public const MIN_TIMESTAMP_NONE = 0.0;
// Default process cache group; getProcessCache() reads the part after ':' as the LRU size
243 private const PC_PRIMARY =
'primary:1000';
// Coalescing scheme chosen when the 'coalesceScheme' parameter equals 'hash_tag'
249 private const SCHEME_HASH_TAG = 1;
// Coalescing scheme under which makeSisterKey() emits "WANCache:<base key>|#|<type>"
251 private const SCHEME_HASH_STOP = 2;
// TTL argument seen in the visible check key write calls
254 private const CHECK_KEY_TTL = self::TTL_YEAR;
// Lower bound that setInterimValue() applies to the TTL of interim values
256 private const INTERIM_KEY_TTL = 2;
// TTL of the mutex sister key added by fetchOrRegenerate()
259 private const LOCK_TTL = 10;
// Ramp-up period used in the visible refresh-probability arithmetic
261 private const RAMPUP_TTL = 30;
// Upper clamp for "purged this long ago" TTLs so that they are always strictly negative
264 private const TINY_NEGATIVE = -0.000001;
// Added to the last purge time in fetchOrRegenerate() to form the safe minimum as-of time
266 private const TINY_POSITIVE = 0.000001;
// Bounds, in milliseconds, of the random age threshold below which fetchOrRegenerate()
// treats a value as extremely new
269 private const RECENT_SET_LOW_MS = 50;
271 private const RECENT_SET_HIGH_MS = 100;
// Slack added to the pre-generation lag in setMainValue() before comparing with MAX_READ_LAG
274 private const GENERATION_HIGH_SEC = 0.2;
// Indexes into the purge arrays built by parsePurgeValue() and makeCheckPurgeValue()
277 private const PURGE_TIME = 0;
279 private const PURGE_HOLDOFF = 1;
// Format version that wrap() stamps on entries and unwrap() requires
282 private const VERSION = 1;
// Indexes into the result arrays produced by unwrap()
298 private const RES_VALUE = 0;
300 private const RES_VERSION = 1;
302 private const RES_AS_OF = 2;
304 private const RES_TTL = 3;
306 private const RES_TOMB_AS_OF = 4;
308 private const RES_CHECK_AS_OF = 5;
310 private const RES_TOUCH_AS_OF = 6;
312 private const RES_CUR_TTL = 7;
// Indexes into the wrapped arrays produced by wrap(); FLD_FLAGS and FLD_GENERATION_TIME are
// not referenced in the visible code
315 private const FLD_FORMAT_VERSION = 0;
317 private const FLD_VALUE = 1;
319 private const FLD_TTL = 2;
321 private const FLD_TIME = 3;
323 private const FLD_FLAGS = 4;
325 private const FLD_VALUE_VERSION = 5;
326 private const FLD_GENERATION_TIME = 6;
// Type characters passed to makeSisterKey() for value, check timestamp, mutex and interim keys
329 private const TYPE_VALUE =
'v';
331 private const TYPE_TIMESTAMP =
't';
333 private const TYPE_MUTEX =
'm';
335 private const TYPE_INTERIM =
'i';
// Leading segment of purge strings, as checked by parsePurgeValue()
338 private const PURGE_VAL_PREFIX =
'PURGED';
369 $this->cache = $params[
'cache'];
370 $this->broadcastRoute = $params[
'broadcastRoutingPrefix'] ??
null;
371 $this->epoch = $params[
'epoch'] ?? 0;
372 $this->secret = $params[
'secret'] ?? (string)$this->epoch;
373 if ( ( $params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
377 $this->coalesceScheme = self::SCHEME_HASH_TAG;
380 $this->coalesceScheme = self::SCHEME_HASH_STOP;
383 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
384 $this->tracer = $params[
'tracer'] ??
new NoopTracer();
385 $this->stats = $params[
'stats'] ?? StatsFactory::newNull();
387 $this->asyncHandler = $params[
'asyncHandler'] ??
null;
388 $this->missLog = array_fill( 0, 10, [
'', 0.0 ] );
/**
 * Fetch the value stored under a single key.
 *
 * The lookup is delegated to fetchKeys() with a one-element key list; the by-reference
 * arguments are then filled from the RES_* fields of that result.
 *
 * @param string $key Base cache key
 * @param float|null &$curTTL Receives the RES_CUR_TTL field of the result (may be null)
 * @param array $checkKeys Check keys forwarded to fetchKeys()
 * @param mixed &$info Receives either the RES_AS_OF timestamp or a map keyed by the
 *  KEY_* constants, depending on a condition that is not visible here
 * @return mixed The RES_VALUE field of the result; unwrap() initializes it to false
 */
459 final public function get( $key, &$curTTL =
null, array $checkKeys = [], &$info = [] ) {
465 $span = $this->startOperationSpan( __FUNCTION__, $key, $checkKeys );
468 $res = $this->
fetchKeys( [ $key ], $checkKeys, $now )[$key];
470 $curTTL = $res[self::RES_CUR_TTL];
// Legacy form: the bare as-of timestamp
472 ? $res[self::RES_AS_OF]
// Detailed form: map of metadata fields
474 self::KEY_VERSION => $res[self::RES_VERSION],
475 self::KEY_AS_OF => $res[self::RES_AS_OF],
476 self::KEY_TTL => $res[self::RES_TTL],
477 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
478 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
479 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
// A null or non-positive TTL is handled as a miss
482 if ( $curTTL ===
null || $curTTL <= 0 ) {
// Evict the oldest miss log entry (presumably a new entry is appended right after)
484 unset( $this->missLog[array_key_first( $this->missLog )] );
488 return $res[self::RES_VALUE];
518 array $checkKeys = [],
526 $span = $this->startOperationSpan( __FUNCTION__, $keys, $checkKeys );
533 $resByKey = $this->
fetchKeys( $keys, $checkKeys, $now );
534 foreach ( $resByKey as $key => $res ) {
535 if ( $res[self::RES_VALUE] !==
false ) {
536 $valuesByKey[$key] = $res[self::RES_VALUE];
539 if ( $res[self::RES_CUR_TTL] !==
null ) {
540 $curTTLs[$key] = $res[self::RES_CUR_TTL];
542 $info[$key] = $legacyInfo
543 ? $res[self::RES_AS_OF]
545 self::KEY_VERSION => $res[self::RES_VERSION],
546 self::KEY_AS_OF => $res[self::RES_AS_OF],
547 self::KEY_TTL => $res[self::RES_TTL],
548 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
549 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
550 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
/**
 * Fetch and unwrap the entries for a list of keys, applying check key purges and the
 * optional "touched" callback to each result.
 *
 * @param string[] $keys Base cache keys
 * @param array $checkKeys Integer-indexed entries are check keys that apply to all keys;
 *  other entries map an index to one check key or a list of check keys
 * @param float $now Timestamp used for all TTL arithmetic
 * @param callable|null $touchedCb Called with a found value; a non-null return that is
 *  at or after the value's as-of time makes the value count as expired
 * @return array Presumably the map of (key => RES_* result array) built in $resByKey;
 *  the return statement is not visible here
 */
572 protected function fetchKeys( array $keys, array $checkKeys,
float $now, $touchedCb =
null ) {
578 $valueSisterKeys = [];
580 $checkSisterKeysForAll = [];
582 $checkSisterKeysByKey = [];
// One value sister key per requested key, kept in the same order as $keys
584 foreach ( $keys as $key ) {
585 $sisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
586 $allSisterKeys[] = $sisterKey;
587 $valueSisterKeys[] = $sisterKey;
590 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
// Integer index: a single check key that applies to every requested key
592 if ( is_int( $i ) ) {
594 $sisterKey = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
595 $allSisterKeys[] = $sisterKey;
596 $checkSisterKeysForAll[] = $sisterKey;
// Other index: one or more check keys grouped under index $i
599 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
600 $sisterKey = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
601 $allSisterKeys[] = $sisterKey;
602 $checkSisterKeysByKey[$i][] = $sisterKey;
// Prefer the warmup cache when it is populated; only fetch what it lacks
607 if ( $this->warmupCache ) {
609 $wrappedBySisterKey = $this->warmupCache;
610 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
611 if ( $sisterKeysMissing ) {
612 $this->warmupKeyMisses += count( $sisterKeysMissing );
// Array union keeps the entries that are already present
613 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
// No warmup cache: fetch every sister key in one backend call
617 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
// Purge info from the check keys that apply to all requested keys
621 $ckPurgesForAll = $this->processCheckKeys(
622 $checkSisterKeysForAll,
// Purge info from the check keys that are grouped under a specific index
628 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
629 $ckPurgesByKey[$keyWithCheckKeys] = $this->processCheckKeys(
// array_map() with a null callback pairs each value sister key with its base key
638 array_map(
null, $valueSisterKeys, $keys )
639 as [ $valueSisterKey, $key ]
641 if ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
643 $wrapped = $wrappedBySisterKey[$valueSisterKey];
649 $res = $this->unwrap( $wrapped, $now );
650 $value = $res[self::RES_VALUE];
652 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
// Track the most recent check key purge time
653 $res[self::RES_CHECK_AS_OF] = max(
654 $ckPurge[self::PURGE_TIME],
655 $res[self::RES_CHECK_AS_OF]
// End of the hold-off period of this purge
658 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
// A value generated before the hold-off ended counts as purged
660 if ( $value !==
false && $holdoffDeadline >= $res[self::RES_AS_OF] ) {
// Negative "time since purge", clamped so that it is never zero or positive
662 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
664 $res[self::RES_CUR_TTL] = min( $res[self::RES_CUR_TTL], $ago );
668 if ( $touchedCb !==
null && $value !==
false ) {
669 $touched = $touchedCb( $value );
// A touch at or after the as-of time lowers the current TTL
670 if ( $touched !==
null && $touched >= $res[self::RES_AS_OF] ) {
671 $res[self::RES_CUR_TTL] = min(
672 $res[self::RES_CUR_TTL],
673 $res[self::RES_AS_OF] - $touched,
681 $res[self::RES_TOUCH_AS_OF] = max( $res[self::RES_TOUCH_AS_OF], $touched );
683 $resByKey[$key] = $res;
/**
 * Resolve the purge info for a list of check sister keys.
 *
 * Each key's fetched entry is parsed with parsePurgeValue(). When there is no parseable
 * purge value, a new one stamped with $now and HOLDOFF_TTL_NONE is generated; judging by
 * the WRITE_BACKGROUND flag it is then written to the cache (that call is only partly
 * visible here).
 *
 * @param string[] $checkSisterKeys Check sister keys to resolve
 * @param array $wrappedBySisterKey Fetched entries keyed by sister key
 * @return array Presumably the list of purge arrays that fetchKeys() iterates over; the
 *  return statement is not visible here
 */
695 private function processCheckKeys(
696 array $checkSisterKeys,
697 array $wrappedBySisterKey,
702 foreach ( $checkSisterKeys as $timeKey ) {
703 $purge = isset( $wrappedBySisterKey[$timeKey] )
704 ? $this->parsePurgeValue( $wrappedBySisterKey[$timeKey] )
707 if ( $purge ===
null ) {
// makeCheckPurgeValue() also fills $purge by reference with the parsed form
709 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
714 $this->cache::WRITE_BACKGROUND
/**
 * Store a value under a key by delegating to setMainValue(), then count the outcome.
 *
 * Options read in the visible lines: 'version', 'walltime', 'since', 'pending',
 * 'lockTSE', 'staleTTL', 'segmentable' and 'creating'.
 *
 * @param string $key Base cache key
 * @param mixed $value Value to store
 * @param int|float $ttl Time-to-live; defaults to TTL_INDEFINITE
 * @param array $opts Options map, see above
 * @return bool Presumably the success flag $ok; the return statement is not visible here
 */
807 final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
809 $span = $this->startOperationSpan( __FUNCTION__, $key );
811 $keygroup = $this->determineKeyGroupForStats( $key );
813 $ok = $this->setMainValue(
817 $opts[
'version'] ??
null,
818 $opts[
'walltime'] ??
null,
820 $opts[
'since'] ??
null,
821 $opts[
'pending'] ??
false,
822 $opts[
'lockTSE'] ?? self::TSE_NONE,
823 $opts[
'staleTTL'] ?? self::STALE_TTL_NONE,
824 $opts[
'segmentable'] ??
false,
825 $opts[
'creating'] ??
false
// Counter labelled by key group and by the ok/error outcome
828 $this->stats->getCounter(
'wanobjectcache_set_total' )
829 ->setLabel(
'keygroup', $keygroup )
830 ->setLabel(
'result', ( $ok ?
'ok' :
'error' ) )
831 ->copyToStatsdAt(
"wanobjectcache.$keygroup.set." . ( $ok ?
'ok' :
'error' ) )
/**
 * Write a value to the value sister key of a base key, first rejecting the write or
 * lowering its TTL when the source data looks stale.
 *
 * Staleness is judged from pending commits, snapshot lag, replica lag and their sum,
 * each compared against MAX_READ_LAG. A mitigation TTL of TTL_UNCACHEABLE rejects the
 * write (logged as "Rejected set()"); any other non-null mitigation TTL lowers the TTL
 * (logged as "Lowered set() TTL").
 *
 * Most of the parameter list is not visible here; only $dataPendingCommit is.
 */
852 private function setMainValue(
860 bool $dataPendingCommit,
// Without an explicit walltime, fall back to the time since this key's logged miss
873 $walltime ??= $this->timeSinceLoggedMiss( $key, $now );
// Age of the data snapshot, never negative; 0 when no read time was given
874 $dataSnapshotLag = ( $dataReadSince !== null ) ? max( 0, $now - $dataReadSince ) : 0;
875 $dataCombinedLag = $dataReplicaLag + $dataSnapshotLag;
// Case 1: the data may include uncommitted writes
882 if ( $dataPendingCommit ) {
884 $mitigated =
'pending writes';
886 $mitigationTTL = self::TTL_UNCACHEABLE;
// Case 2: the snapshot alone is older than the lag ceiling
887 } elseif ( $dataSnapshotLag > self::MAX_READ_LAG ) {
// Snapshot lag minus the generation time, i.e. the lag before generation started
889 $pregenSnapshotLag = ( $walltime !== null ) ? ( $dataSnapshotLag - $walltime ) : 0;
890 if ( ( $pregenSnapshotLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
892 $mitigated =
'snapshot lag (late generation)';
894 $mitigationTTL = self::TTL_UNCACHEABLE;
897 $mitigated =
'snapshot lag (high generation time)';
// Case 3: replica lag is unknown (false) or above the lag ceiling
901 } elseif ( $dataReplicaLag ===
false || $dataReplicaLag > self::MAX_READ_LAG ) {
903 $mitigated =
'replication lag';
// Case 4: replica lag and snapshot lag together exceed the lag ceiling
906 } elseif ( $dataCombinedLag > self::MAX_READ_LAG ) {
907 $pregenCombinedLag = ( $walltime !== null ) ? ( $dataCombinedLag - $walltime ) : 0;
909 if ( ( $pregenCombinedLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
911 $mitigated =
'read lag (late generation)';
913 $mitigationTTL = self::TTL_UNCACHEABLE;
916 $mitigated =
'read lag (high generation time)';
// No mitigation needed
924 $mitigationTTL =
null;
927 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
928 $this->logger->warning(
929 "Rejected set() for {cachekey} due to $mitigated.",
932 'lag' => $dataReplicaLag,
933 'age' => $dataSnapshotLag,
934 'walltime' => $walltime
945 if ( $mitigationTTL !==
null ) {
// With lockTSE enabled only the logical TTL is lowered; otherwise $ttl itself is lowered.
// A falsy $ttl is treated as INF so that the mitigation TTL always wins in min().
947 if ( $lockTSE >= 0 ) {
949 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
952 $ttl = min( $ttl ?: INF, $mitigationTTL );
955 $this->logger->warning(
956 "Lowered set() TTL for {cachekey} due to $mitigated.",
959 'lag' => $dataReplicaLag,
960 'age' => $dataSnapshotLag,
961 'walltime' => $walltime
// The wrapper records the logical TTL when it is set (truthy), else the plain TTL
967 $wrapped = $this->wrap( $value, $logicalTTL ?: $ttl, $version, $now );
968 $storeTTL = $ttl + $staleTTL;
970 $flags = $this->cache::WRITE_BACKGROUND;
971 if ( $segmentable ) {
972 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
// Two write paths are visible, add() and merge(); the condition that picks one is not shown
976 $ok = $this->cache->add(
977 $this->makeSisterKey( $key, self::TYPE_VALUE ),
983 $ok = $this->cache->merge(
984 $this->makeSisterKey( $key, self::TYPE_VALUE ),
985 static function (
$cache, $key, $cWrapped ) use ( $wrapped ) {
// Returns false when the current entry is a string, otherwise the new wrapper
987 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
990 $this->cache::MAX_CONFLICTS_ONE,
1062 $span = $this->startOperationSpan( __FUNCTION__, $key );
1067 $valueSisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
1073 $ok = $this->cache->delete( $this->
getRouteKey( $valueSisterKey ), $this->cache::WRITE_BACKGROUND );
1083 $purge = self::PURGE_VAL_PREFIX .
':' . (int)$now;
1084 $ok = $this->cache->set(
1088 $this->cache::WRITE_BACKGROUND
1092 $keygroup = $this->determineKeyGroupForStats( $key );
1094 $this->stats->getCounter(
'wanobjectcache_delete_total' )
1095 ->setLabel(
'keygroup', $keygroup )
1096 ->setLabel(
'result', ( $ok ?
'ok' :
'error' ) )
1097 ->copyToStatsdAt(
"wanobjectcache.$keygroup.delete." . ( $ok ?
'ok' :
'error' ) )
1124 $span = $this->startOperationSpan( __FUNCTION__, $key );
1192 $span = $this->startOperationSpan( __FUNCTION__, $keys );
1194 $checkSisterKeysByKey = [];
1195 foreach ( $keys as $key ) {
1196 $checkSisterKeysByKey[$key] = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1199 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1200 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1204 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1205 $purge = $this->parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1206 if ( $purge ===
null ) {
1207 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
1211 self::CHECK_KEY_TTL,
1212 $this->cache::WRITE_BACKGROUND
1216 $times[$key] = $purge[self::PURGE_TIME];
1257 $span = $this->startOperationSpan( __FUNCTION__, $key );
1259 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1262 $purge = $this->makeCheckPurgeValue( $now, $holdoff );
1263 $ok = $this->cache->set(
1266 self::CHECK_KEY_TTL,
1267 $this->cache::WRITE_BACKGROUND
1270 $keygroup = $this->determineKeyGroupForStats( $key );
1272 $this->stats->getCounter(
'wanobjectcache_check_total' )
1273 ->setLabel(
'keygroup', $keygroup )
1274 ->setLabel(
'result', ( $ok ?
'ok' :
'error' ) )
1275 ->copyToStatsdAt(
"wanobjectcache.$keygroup.ck_touch." . ( $ok ?
'ok' :
'error' ) )
1310 $span = $this->startOperationSpan( __FUNCTION__, $key );
1312 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1313 $ok = $this->cache->delete( $this->
getRouteKey( $checkSisterKey ), $this->cache::WRITE_BACKGROUND );
1315 $keygroup = $this->determineKeyGroupForStats( $key );
1317 $this->stats->getCounter(
'wanobjectcache_reset_total' )
1318 ->setLabel(
'keygroup', $keygroup )
1319 ->setLabel(
'result', ( $ok ?
'ok' :
'error' ) )
1320 ->copyToStatsdAt(
"wanobjectcache.$keygroup.ck_reset." . ( $ok ?
'ok' :
'error' ) )
1628 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1631 $span = $this->startOperationSpan( __FUNCTION__, $key );
1633 $version = $opts[
'version'] ??
null;
1634 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1635 $pCache = ( $pcTTL >= 0 )
1636 ? $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY )
1642 if ( $pCache && $this->callbackDepth == 0 ) {
1643 $cached = $pCache->get( $key, $pcTTL,
false );
1644 if ( $cached !==
false ) {
1645 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1650 [ $value, $valueVersion, $curAsOf ] = $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
1651 if ( $valueVersion !== $version ) {
1655 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1656 [ $value ] = $this->fetchOrRegenerate(
1657 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), (
string)$version ),
1660 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1666 if ( $pCache && $value !==
false ) {
1667 $pCache->set( $key, $value );
/**
 * Return the cached value for a key, regenerating it when no acceptable value exists.
 *
 * Visible outcomes, each recorded in the 'wanobjectcache_getwithset_seconds' timing:
 * a good hit, a hit with async refresh, a volatile hit, a stale hit while another
 * caller holds the mutex, a busy placeholder, or a newly computed value.
 *
 * @param string $key Base cache key
 * @param int|float $ttl Requested time-to-live for regenerated values
 * @param callable $callback Value generator (its invocation is not visible here)
 * @param array $opts Options; the visible lines read 'checkKeys', 'minAsOf', 'hotTTR',
 *  'lowTTL', 'ageNew', 'touchedCallback', 'lockTSE', 'busyValue', 'segmentable'
 *  and 'version'
 * @param array $cbParams Extra parameters forwarded to scheduleAsyncRefresh()
 * @return array Tuple of [ value, version, as-of time of the current main entry ]
 */
1689 private function fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams ) {
1690 $checkKeys = $opts[
'checkKeys'] ?? [];
1692 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1693 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
// The default low TTL never exceeds the requested TTL
1694 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1695 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1696 $touchedCb = $opts[
'touchedCallback'] ??
null;
1699 $keygroup = $this->determineKeyGroupForStats( $key );
// Current state of the main value entry
1702 $curState = $this->
fetchKeys( [ $key ], $checkKeys, $startTime, $touchedCb )[$key];
1703 $curValue = $curState[self::RES_VALUE];
// Acceptably fresh value: a further check that is not visible here decides between a
// plain hit, a hit with a scheduled async refresh, and a synchronous refresh
1706 if ( $this->isAcceptablyFreshValue( $curState, $graceTTL, $minAsOf ) ) {
1708 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1709 ->setLabel(
'keygroup', $keygroup )
1710 ->setLabel(
'result',
'hit' )
1711 ->setLabel(
'reason',
'good' )
1712 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.good" )
1715 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1716 } elseif ( $this->scheduleAsyncRefresh( $key, $ttl, $callback, $opts, $cbParams ) ) {
1717 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1719 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1720 ->setLabel(
'keygroup', $keygroup )
1721 ->setLabel(
'result',
'hit' )
1722 ->setLabel(
'reason',
'refresh' )
1723 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.refresh" )
1726 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1728 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
// A non-null tombstone time means that the main key was purged
1732 $isKeyTombstoned = ( $curState[self::RES_TOMB_AS_OF] !== null );
// For a tombstoned key, consult the interim value; otherwise reuse the main entry
1734 if ( $isKeyTombstoned ) {
1735 $volState = $this->getInterimValue( $key, $minAsOf, $startTime, $touchedCb );
1736 $volValue = $volState[self::RES_VALUE];
1738 $volState = $curState;
1739 $volValue = $curValue;
// Latest of the visible invalidation timestamps (touch, tombstone, check key)
1747 $lastPurgeTime = max(
1749 $volState[self::RES_TOUCH_AS_OF],
1750 $curState[self::RES_TOMB_AS_OF],
1751 $curState[self::RES_CHECK_AS_OF]
// A usable value has to be newer than the last purge by at least a tiny margin
1753 $safeMinAsOf = max( $minAsOf, $lastPurgeTime + self::TINY_POSITIVE );
1755 if ( $volState[self::RES_VALUE] ===
false || $volState[self::RES_AS_OF] < $safeMinAsOf ) {
1756 $isExtremelyNewValue =
false;
1758 $age = $startTime - $volState[self::RES_AS_OF];
// Random threshold between RECENT_SET_LOW_MS and RECENT_SET_HIGH_MS, converted to seconds
1759 $isExtremelyNewValue = ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
// A value that was set only moments ago is reused rather than regenerated
1761 if ( $isExtremelyNewValue ) {
1762 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1764 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1765 ->setLabel(
'keygroup', $keygroup )
1766 ->setLabel(
'result',
'hit' )
1767 ->setLabel(
'reason',
'volatile' )
1768 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.volatile" )
1771 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1774 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1775 $busyValue = $opts[
'busyValue'] ??
null;
1777 $segmentable = $opts[
'segmentable'] ??
false;
1778 $version = $opts[
'version'] ??
null;
// Decide whether regeneration should be guarded by the mutex; only part of the
// condition is visible here
1781 $useRegenerationLock =
// The value expired no longer ago than the lockTSE window
1792 $curState[self::RES_CUR_TTL] !==
null &&
1793 $curState[self::RES_CUR_TTL] <= 0 &&
1794 abs( $curState[self::RES_CUR_TTL] ) <= $lockTSE
// A busy value was supplied and there is no value to fall back on
1798 ( $busyValue !==
null && $volValue ===
false );
1803 $mutexKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
// The lock counts as acquired only when add() on the mutex key succeeds
1805 $hasLock = $useRegenerationLock && $this->cache->add( $mutexKey, 1, self::LOCK_TTL );
1806 if ( $useRegenerationLock && !$hasLock ) {
// Lock not acquired: serve the stale value when it is still valid
1809 if ( $this->
isValid( $volValue, $volState[self::RES_AS_OF], $minAsOf ) ) {
1810 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1812 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1813 ->setLabel(
'keygroup', $keygroup )
1814 ->setLabel(
'result',
'hit' )
1815 ->setLabel(
'reason',
'stale' )
1816 ->copyToStatsdAt(
"wanobjectcache.$keygroup.hit.stale" )
1819 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
// No valid stale value: return the caller's busy placeholder if one was given
1820 } elseif ( $busyValue !==
null ) {
// An infinite minAsOf is labelled 'renew', anything else 'miss'
1821 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1822 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1824 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1825 ->setLabel(
'keygroup', $keygroup )
1826 ->setLabel(
'result', $miss )
1827 ->setLabel(
'reason',
'busy' )
1828 ->copyToStatsdAt(
"wanobjectcache.$keygroup.$miss.busy" )
// The busy value may be a closure that produces the placeholder lazily
1831 $placeholderValue = ( $busyValue instanceof Closure ) ? $busyValue() : $busyValue;
1833 return [ $placeholderValue, $version, $curState[self::RES_AS_OF] ];
// Counter that the process cache checks compare against 0
1840 ++$this->callbackDepth;
// The old value and its as-of time are only passed on when the stored version matches
1846 ( $curState[self::RES_VERSION] === $version ) ? $curValue : false,
1849 ( $curState[self::RES_VERSION] === $version ) ? $curState[self::RES_AS_OF] : null,
1853 --$this->callbackDepth;
// Elapsed generation time, never negative
1858 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1860 $this->stats->getTiming(
'wanobjectcache_regen_seconds' )
1861 ->setLabel(
'keygroup', $keygroup )
1862 ->copyToStatsdAt(
"wanobjectcache.$keygroup.regen_walltime" )
1863 ->observe( 1e3 * $walltime );
// Visible parts of the condition guarding the store: the value must not be false, the TTL
// must be non-negative, and the lock is either unused, held, or the key is tombstoned
1868 ( $value !==
false && $ttl >= 0 ) &&
1870 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned )
// Tombstoned keys get an interim value; otherwise the main value is written
1873 if ( $isKeyTombstoned ) {
1874 $this->setInterimValue(
1882 $this->setMainValue(
1889 $setOpts[
'lag'] ?? 0,
1891 $setOpts[
'since'] ?? $preCallbackTime,
1893 $setOpts[
'pending'] ??
false,
// True when there was no current value
1897 ( $curValue ===
false )
// Remove the mutex key (the guarding condition is not visible here)
1903 $this->cache->delete( $mutexKey, $this->cache::WRITE_BACKGROUND );
1906 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1907 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1909 $this->stats->getTiming(
'wanobjectcache_getwithset_seconds' )
1910 ->setLabel(
'keygroup', $keygroup )
1911 ->setLabel(
'result', $miss )
1912 ->setLabel(
'reason',
'compute' )
1913 ->copyToStatsdAt(
"wanobjectcache.$keygroup.$miss.compute" )
1916 return [ $value, $version, $curState[self::RES_AS_OF] ];
/**
 * Build the backend key ("sister key") for a base key and a type character.
 *
 * @param string $baseKey Base cache key
 * @param string $typeChar One of the TYPE_* characters
 * @return string Presumably $sisterKey; the return statement is not visible here
 */
1928 private function makeSisterKey(
string $baseKey,
string $typeChar ) {
1929 if ( $this->coalesceScheme === self::SCHEME_HASH_STOP ) {
// Key style: "WANCache:<base key>|#|<type character>"
1931 $sisterKey =
'WANCache:' . $baseKey .
'|#|' . $typeChar;
// Key style: "WANCache:{<base key>}:<type character>"
1934 $sisterKey =
'WANCache:{' . $baseKey .
'}:' . $typeChar;
/**
 * Fetch and unwrap the interim entry for a key.
 *
 * @param string $key Base cache key
 * @param float $minAsOf Minimum acceptable as-of timestamp
 * @param float $now Timestamp used for TTL arithmetic
 * @param callable|null $touchedCb Optional callback that yields a "touched" timestamp
 *  for a value
 * @return array RES_* result array; the final visible return yields the unwrap() result
 *  for false, i.e. an empty result
 */
1948 private function getInterimValue( $key, $minAsOf, $now, $touchedCb ) {
1950 $interimSisterKey = $this->makeSisterKey( $key, self::TYPE_INTERIM );
1951 $wrapped = $this->cache->get( $interimSisterKey );
1952 $res = $this->unwrap( $wrapped, $now );
// Usable only when a value exists and is at least as new as $minAsOf
1953 if ( $res[self::RES_VALUE] !==
false && $res[self::RES_AS_OF] >= $minAsOf ) {
1954 if ( $touchedCb !==
null ) {
// Keep the later of the callback's timestamp and the recorded touch time
1957 $res[self::RES_TOUCH_AS_OF] = max(
1958 $touchedCb( $res[self::RES_VALUE] ),
1959 $res[self::RES_TOUCH_AS_OF]
// Fallback: the result of unwrapping "no entry"
1967 return $this->unwrap(
false, $now );
/**
 * Store a value under the interim sister key of a base key.
 *
 * The parameter list is not visible here; the body uses $key, $value, $ttl, $version,
 * $now and $segmentable.
 *
 * @return mixed Result of the cache backend's set() call
 */
1978 private function setInterimValue(
// Interim values are kept for at least INTERIM_KEY_TTL; the TTL is cast to int
1986 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1989 $wrapped = $this->wrap( $value, $ttl, $version, $now );
1991 $flags = $this->cache::WRITE_BACKGROUND;
1992 if ( $segmentable ) {
1993 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
1996 return $this->cache->set(
1997 $this->makeSisterKey( $key, self::TYPE_INTERIM ),
2070 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2072 $span = $this->startOperationSpan( __FUNCTION__,
'' );
2073 if ( $span->getContext()->isSampled() ) {
2074 $span->setAttributes( [
2075 'org.wikimedia.wancache.multi_count' => $keyedIds->count(),
2076 'org.wikimedia.wancache.ttl' => $ttl,
2080 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache(
2081 $this->getNonProcessCachedMultiKeys( $keyedIds, $opts ),
2082 $opts[
'checkKeys'] ?? []
2084 $this->warmupKeyMisses = 0;
2090 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2093 return $callback( $params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2098 foreach ( $keyedIds as $key => $id ) {
2108 $this->warmupCache = [];
2180 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2182 $span = $this->startOperationSpan( __FUNCTION__,
'' );
2183 if ( $span->getContext()->isSampled() ) {
2184 $span->setAttributes( [
2185 'org.wikimedia.wancache.multi_count' => $keyedIds->count(),
2186 'org.wikimedia.wancache.ttl' => $ttl,
2189 $checkKeys = $opts[
'checkKeys'] ?? [];
2190 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
2193 unset( $opts[
'lockTSE'] );
2194 unset( $opts[
'busyValue'] );
2197 $keysByIdGet = $this->getNonProcessCachedMultiKeys( $keyedIds, $opts );
2198 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache( $keysByIdGet, $checkKeys );
2199 $this->warmupKeyMisses = 0;
2206 $resByKey = $this->
fetchKeys( $keysByIdGet, $checkKeys, $now );
2207 foreach ( $keysByIdGet as $id => $key ) {
2208 $res = $resByKey[$key];
2210 $res[self::RES_VALUE] ===
false ||
2211 $res[self::RES_CUR_TTL] < 0 ||
2212 $res[self::RES_AS_OF] < $minAsOf
2220 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2221 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2223 $method = __METHOD__;
2228 $proxyCb =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2229 use ( $callback, $newValsById, $newTTLsById, $newSetOpts, $method )
2231 $id = $params[
'id'];
2233 if ( array_key_exists( $id, $newValsById ) ) {
2235 $newValue = $newValsById[$id];
2236 $ttl = $newTTLsById[$id];
2237 $setOpts = $newSetOpts;
2241 $ttls = [ $id => $ttl ];
2242 $result = $callback( [ $id ], $ttls, $setOpts );
2243 if ( !isset( $result[$id] ) ) {
2245 $this->logger->warning(
2246 $method .
' failed due to {id} not set in result {result}', [
2248 'result' => json_encode( $result )
2251 $newValue = $result[$id];
2260 foreach ( $keyedIds as $key => $id ) {
2270 $this->warmupCache = [];
2283 return $this->cache->makeGlobalKey( $keygroup, ...$components );
/**
 * Build a cache key by delegating to the underlying cache's makeKey().
 *
 * @param mixed $keygroup First argument forwarded to the backend
 * @param mixed ...$components Further arguments forwarded unchanged
 * @return mixed Whatever the backend's makeKey() returns
 */
2293 public function makeKey( $keygroup, ...$components ) {
2294 return $this->cache->makeKey( $keygroup, ...$components );
2305 return hash_hmac(
'sha256', $component, $this->secret );
2361 foreach ( $ids as $id ) {
2363 if ( strlen( $id ) > 64 ) {
2364 $this->logger->warning( __METHOD__ .
": long ID '$id'; use hash256()" );
2366 $key = $keyCallback( $id, $this );
2368 if ( !isset( $idByKey[$key] ) ) {
2369 $idByKey[$key] = $id;
2370 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2371 throw new UnexpectedValueException(
2372 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2377 return new ArrayIterator( $idByKey );
2416 if ( count( $ids ) !== count( $res ) ) {
2419 $ids = array_keys( array_fill_keys( $ids,
true ) );
2420 if ( count( $ids ) !== count( $res ) ) {
2421 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2425 return array_combine( $ids, $res );
2435 return $this->cache->watchErrors();
2456 $code = $this->cache->getLastError( $watchPoint );
2475 $this->processCaches = [];
2508 return $this->cache->getQoS( $flag );
/**
 * Derive a TTL from the age of some data: $factor times the age, bounded below by
 * $minTTL and above by $maxTTL.
 *
 * @param int|float|string|null $mtime Modification time; cast to int, and values <= 0
 *  take a separate branch whose result is not visible here
 * @param int $maxTTL Upper bound for the result
 * @param int $minTTL Lower bound for the scaled age (default 30)
 * @param float $factor Multiplier applied to the age (default 0.2)
 * @return int Adaptive TTL
 */
2574 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2576 $mtime = (int)$mtime;
2577 if ( $mtime <= 0 ) {
// $age is computed on a line not visible here -- presumably the time elapsed since $mtime
2584 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2594 return $this->warmupKeyMisses;
2602 if ( $this->broadcastRoute !==
null ) {
2603 if ( $sisterKey[0] ===
'/' ) {
2604 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2606 return $this->broadcastRoute . $sisterKey;
/**
 * Queue a deferred regeneration of a key, provided an async handler is configured.
 *
 * @param string $key Base cache key
 * @param int|float $ttl Time-to-live forwarded to fetchOrRegenerate()
 * @param callable $callback Value generator forwarded to fetchOrRegenerate()
 * @param array $opts Options; 'minAsOf' is overridden inside the deferred closure
 * @param array $cbParams Extra callback parameters
 * @return bool Used as a condition by fetchOrRegenerate(); the return statements are
 *  not visible here
 */
2622 private function scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams ) {
// No async handler configured
2623 if ( !$this->asyncHandler ) {
2631 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
// An infinite minimum as-of time means that no cached value can satisfy isValid()
2632 $opts[
'minAsOf'] = INF;
2634 $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
2635 }
catch ( Exception $e ) {
// A failure inside the deferred refresh is logged
2637 $this->logger->error(
'Async refresh failed for {key}', [
/**
 * Decide whether a fetched result is fresh enough to use without regeneration.
 *
 * @param array $res RES_* result array
 * @param int|float $graceTTL Grace period added to the current TTL
 * @param float $minAsOf Minimum acceptable as-of timestamp
 * @return bool Most return statements are not visible here
 */
2657 private function isAcceptablyFreshValue( $res, $graceTTL, $minAsOf ) {
// Reject results that have no value or are older than $minAsOf
2658 if ( !$this->
isValid( $res[self::RES_VALUE], $res[self::RES_AS_OF], $minAsOf ) ) {
2663 $curTTL = $res[self::RES_CUR_TTL];
// Branch for values that have not expired yet
2664 if ( $curTTL > 0 ) {
// Grace period that remains once the (non-positive) current TTL is added
2670 $curGraceTTL = $graceTTL + $curTTL;
2672 return ( $curGraceTTL > 0 )
2690 $curTTL = $res[self::RES_CUR_TTL];
2691 $logicalTTL = $res[self::RES_TTL];
2692 $asOf = $res[self::RES_AS_OF];
2716 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2720 $age = $now - $asOf;
2721 $timeOld = $age - $ageNew;
2722 if ( $timeOld <= 0 ) {
2726 $popularHitsPerSec = 1;
2730 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2734 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2736 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2738 return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
2760 if ( $lowTTL <= 0 ) {
2765 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2768 $timeOld = $effectiveLowTTL - $curTTL;
2769 if ( $timeOld <= 0 || $timeOld >= $effectiveLowTTL ) {
2774 $ttrRatio = $timeOld / $effectiveLowTTL;
2777 $chance = $ttrRatio ** 4;
2779 return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
/**
 * Check that a value exists and is at least as new as a minimum timestamp.
 *
 * @param mixed $value Cached value; false means "no value"
 * @param float|null $asOf As-of timestamp of the value
 * @param float $minAsOf Minimum acceptable as-of timestamp
 * @return bool True when $value is not false and $asOf >= $minAsOf
 */
2790 protected function isValid( $value, $asOf, $minAsOf ) {
2791 return ( $value !==
false && $asOf >= $minAsOf );
/**
 * Build the array that is stored in the cache for a value.
 *
 * @param mixed $value Value to store
 * @param int|float $ttl Logical TTL recorded in the wrapper
 * @param mixed $version Value version; only recorded when it is not null
 * @param float $now Timestamp recorded as the value's time
 * @return array Presumably the FLD_* keyed $wrapped array; the return statement is not
 *  visible here
 */
2801 private function wrap( $value, $ttl, $version, $now ) {
2805 self::FLD_FORMAT_VERSION => self::VERSION,
2806 self::FLD_VALUE => $value,
2807 self::FLD_TTL => $ttl,
2808 self::FLD_TIME => $now
// The version field is omitted entirely for unversioned values
2810 if ( $version !==
null ) {
2811 $wrapped[self::FLD_VALUE_VERSION] = $version;
/**
 * Turn a raw cache entry into a RES_* result array.
 *
 * Array entries with the current format version and a time at or after the epoch yield
 * value fields. Other entries are run through parsePurgeValue(), which yields tombstone
 * fields on success. Anything else leaves the defaults below untouched.
 *
 * @param mixed $wrapped Raw entry as fetched from the cache
 * @param float $now Timestamp used for TTL arithmetic
 * @return array Presumably $res; the return statement is not visible here
 */
2831 private function unwrap( $wrapped, $now ) {
// Defaults for "no usable entry": value false, every other field null
2835 self::RES_VALUE =>
false,
2836 self::RES_VERSION =>
null,
2837 self::RES_AS_OF =>
null,
2838 self::RES_TTL =>
null,
2839 self::RES_TOMB_AS_OF =>
null,
2841 self::RES_CHECK_AS_OF =>
null,
2842 self::RES_TOUCH_AS_OF =>
null,
2843 self::RES_CUR_TTL => null
2846 if ( is_array( $wrapped ) ) {
// Accept only the current wrapper format and entries not older than the epoch
2849 ( $wrapped[self::FLD_FORMAT_VERSION] ??
null ) === self::VERSION &&
2850 $wrapped[self::FLD_TIME] >= $this->epoch
2852 if ( $wrapped[self::FLD_TTL] > 0 ) {
// Remaining lifetime, floored at zero
2854 $age = $now - $wrapped[self::FLD_TIME];
2855 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2860 $res[self::RES_VALUE] = $wrapped[self::FLD_VALUE];
2861 $res[self::RES_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ??
null;
2862 $res[self::RES_AS_OF] = $wrapped[self::FLD_TIME];
2863 $res[self::RES_CUR_TTL] = $curTTL;
2864 $res[self::RES_TTL] = $wrapped[self::FLD_TTL];
// Otherwise check whether the entry is a purge value (tombstone)
2868 $purge = $this->parsePurgeValue( $wrapped );
2869 if ( $purge !==
null ) {
// Negative "time since purge", clamped so that it is never zero or positive
2871 $curTTL = min( $purge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
2872 $res[self::RES_CUR_TTL] = $curTTL;
2873 $res[self::RES_TOMB_AS_OF] = $purge[self::PURGE_TIME];
/**
 * Derive the key group label used in statistics from a cache key.
 *
 * @param string $key Cache key made of ':'-separated segments
 * @return string Second segment of the key, or the first segment when there is no
 *  second one, with every '.' replaced by '_'
 */
2885 private function determineKeyGroupForStats( $key ) {
// At most three parts; only the first two are ever used
2886 $parts = explode(
':', $key, 3 );
2889 return strtr( $parts[1] ?? $parts[0],
'.',
'_' );
/**
 * Parse a purge string of the form "<PURGE_VAL_PREFIX>:<timestamp>[:<holdoff>]".
 *
 * @param mixed $value Raw cache entry
 * @return array|null Purge array indexed by PURGE_TIME and PURGE_HOLDOFF. Callers compare
 *  the result against null, so the rejecting branches presumably return null (those
 *  return statements are not visible here)
 */
2900 private function parsePurgeValue( $value ) {
// Only strings can be purge values
2901 if ( !is_string( $value ) ) {
2905 $segments = explode(
':', $value, 3 );
2906 $prefix = $segments[0];
// The first segment has to be the purge prefix
2907 if ( $prefix !== self::PURGE_VAL_PREFIX ) {
// NOTE(review): $segments[1] is read without a check that a second segment exists
2912 $timestamp = (float)$segments[1];
// A missing hold-off segment falls back to HOLDOFF_TTL
2914 $holdoff = isset( $segments[2] ) ? (int)$segments[2] : self::
HOLDOFF_TTL;
// Purge values from before the epoch take a separate branch
2916 if ( $timestamp < $this->epoch ) {
2921 return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
/**
 * Build the purge string for a check key and report its parsed form by reference.
 *
 * @param float $timestamp Purge time; truncated to a whole number via an int cast
 * @param int $holdoff Hold-off period to embed
 * @param array|null &$purge Receives [ PURGE_TIME => float, PURGE_HOLDOFF => int ]
 * @return string "<PURGE_VAL_PREFIX>:<integer timestamp>:<holdoff>"
 */
2930 private function makeCheckPurgeValue(
float $timestamp,
int $holdoff, ?array &$purge =
null ) {
2931 $normalizedTime = (int)$timestamp;
// The by-reference form carries the same truncated time as the string, as a float
2933 $purge = [ self::PURGE_TIME => (float)$normalizedTime, self::PURGE_HOLDOFF => $holdoff ];
2935 return self::PURGE_VAL_PREFIX .
":$normalizedTime:$holdoff";
/**
 * Return the in-process LRU cache for a group, creating it on first use.
 *
 * @param string $group Group descriptor of the form "<name>:<size>"
 * @return MapCacheLRU Cache instance kept in $this->processCaches under $group
 */
2942 private function getProcessCache( $group ) {
2943 if ( !isset( $this->processCaches[$group] ) ) {
// Only the segment after the first ':' is used; it becomes the LRU size
2944 [ , $size ] = explode(
':', $group );
2945 $this->processCaches[$group] =
new MapCacheLRU( (
int)$size );
// A new cache also receives the mock clock, if one is set
2946 if ( $this->wallClockOverride !==
null ) {
2947 $this->processCaches[$group]->setMockTime( $this->wallClockOverride );
2951 return $this->processCaches[$group];
/**
 * Work out which keys of a multi-key request are not served by the process cache.
 *
 * @param ArrayIterator $keys Iterator yielding (cache key => ID)
 * @param array $opts Options; the visible lines read 'pcTTL' and 'pcGroup'
 * @return array Map of (ID => cache key); how it is filled when the process cache is
 *  not consulted is not visible here
 */
2959 private function getNonProcessCachedMultiKeys( ArrayIterator $keys, array $opts ) {
2960 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
// The process cache is consulted only for a positive TTL and while callbackDepth is 0
2963 if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
2964 $pCache = $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY );
2965 foreach ( $keys as $key => $id ) {
// Keys absent from the process cache still have to be fetched
2966 if ( !$pCache->has( $key, $pcTTL ) ) {
2967 $keysMissing[$id] = $key;
2972 return $keysMissing;
/**
 * Fetch the raw entries for a set of keys and check keys in one backend call, in the
 * shape that $this->warmupCache holds.
 *
 * @param string[] $keys Base cache keys
 * @param array $checkKeys Check keys; integer-indexed entries are single keys, other
 *  entries hold one key or a list of keys
 * @return array Map of (sister key => raw entry), with false for keys not returned by
 *  the backend
 */
2981 private function fetchWrappedValuesForWarmupCache( array $keys, array $checkKeys ) {
2988 foreach ( $keys as $baseKey ) {
2989 $sisterKeys[] = $this->makeSisterKey( $baseKey, self::TYPE_VALUE );
2992 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
// Integer index: a single check key
2994 if ( is_int( $i ) ) {
2996 $sisterKeys[] = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
// Other index: one check key or a list of check keys
2999 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
3000 $sisterKeys[] = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
3005 $wrappedBySisterKey = $this->cache->getMulti( $sisterKeys );
// Array union keeps fetched entries and fills the remaining keys with false
3006 $wrappedBySisterKey += array_fill_keys( $sisterKeys,
false );
3008 return $wrappedBySisterKey;
/**
 * Look up how long ago a miss was logged for a key.
 *
 * @param string $key Cache key to look for in the miss log
 * @param float $now Current timestamp
 * @return float|null Time elapsed since the matching entry's timestamp; the result when
 *  no entry matches is not visible here
 */
3016 private function timeSinceLoggedMiss( $key, $now ) {
// Walk the miss log backwards from its last entry; entries are [ key, timestamp ] pairs
3018 for ( end( $this->missLog ); $miss = current( $this->missLog ); prev( $this->missLog ) ) {
3019 if ( $miss[0] === $key ) {
3020 return ( $now - $miss[1] );
3032 return $this->wallClockOverride ?: microtime(
true );
3040 $this->wallClockOverride =& $time;
3041 $this->cache->setMockTime( $time );
3042 foreach ( $this->processCaches as $pCache ) {
3043 $pCache->setMockTime( $time );
/**
 * Create a tracing span for a cache operation and, when the span is sampled, attach the
 * keys and check keys involved as attributes.
 *
 * @param string $opName Operation name; the span is named "WANObjectCache::<opName>"
 * @param string|string[] $keys Key or list of keys; a list is joined with spaces
 * @param array $checkKeys Check keys; nested lists are joined with spaces
 * @return mixed Presumably the span; the return statement is not visible here
 */
3058 private function startOperationSpan( $opName, $keys, $checkKeys = [] ) {
3059 $span = $this->tracer->createSpan(
"WANObjectCache::$opName" )
// Branch for spans that are not sampled
3063 if ( !$span->getContext()->isSampled() ) {
3067 $keys = is_array( $keys ) ? implode(
' ', $keys ) : $keys;
3069 if ( count( $checkKeys ) > 0 ) {
// Flatten each check key group into one space-separated string
3070 $checkKeys = array_map(
3071 static fn ( $checkKeyOrKeyGroup ) =>
3072 is_array( $checkKeyOrKeyGroup )
3073 ? implode(
' ', $checkKeyOrKeyGroup )
3074 : $checkKeyOrKeyGroup,
3077 $checkKeys = implode(
' ', $checkKeys );
3078 $span->setAttributes( [
'org.wikimedia.wancache.check_keys' => $checkKeys ] );
3081 $span->setAttributes( [
'org.wikimedia.wancache.keys' => $keys ] );