166 private $callbackDepth = 0;
168 private $warmupCache = [];
170 private $warmupKeyMisses = 0;
173 private $wallClockOverride;
176 private const MAX_COMMIT_DELAY = 3;
178 private const MAX_READ_LAG = 7;
180 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
183 private const LOW_TTL = 60;
188 private const HOT_TTR = 900;
190 private const AGE_NEW = 60;
193 private const TSE_NONE = -1;
203 public const MIN_TIMESTAMP_NONE = 0.0;
206 private const PC_PRIMARY =
'primary:1000';
212 private const SCHEME_HASH_TAG = 1;
214 private const SCHEME_HASH_STOP = 2;
217 private const CHECK_KEY_TTL = self::TTL_YEAR;
219 private const INTERIM_KEY_TTL = 2;
222 private const LOCK_TTL = 10;
224 private const RAMPUP_TTL = 30;
227 private const TINY_NEGATIVE = -0.000001;
229 private const TINY_POSITIVE = 0.000001;
232 private const RECENT_SET_LOW_MS = 50;
234 private const RECENT_SET_HIGH_MS = 100;
237 private const GENERATION_HIGH_SEC = 0.2;
240 private const PURGE_TIME = 0;
242 private const PURGE_HOLDOFF = 1;
245 private const VERSION = 1;
261 private const RES_VALUE = 0;
263 private const RES_VERSION = 1;
265 private const RES_AS_OF = 2;
267 private const RES_TTL = 3;
269 private const RES_TOMB_AS_OF = 4;
271 private const RES_CHECK_AS_OF = 5;
273 private const RES_TOUCH_AS_OF = 6;
275 private const RES_CUR_TTL = 7;
278 private const FLD_FORMAT_VERSION = 0;
280 private const FLD_VALUE = 1;
282 private const FLD_TTL = 2;
284 private const FLD_TIME = 3;
286 private const FLD_FLAGS = 4;
288 private const FLD_VALUE_VERSION = 5;
289 private const FLD_GENERATION_TIME = 6;
292 private const TYPE_VALUE =
'v';
294 private const TYPE_TIMESTAMP =
't';
296 private const TYPE_MUTEX =
'm';
298 private const TYPE_INTERIM =
'i';
301 private const PURGE_VAL_PREFIX =
'PURGED';
330 $this->cache =
$params[
'cache'];
331 $this->broadcastRoute =
$params[
'broadcastRoutingPrefix'] ??
null;
332 $this->epoch =
$params[
'epoch'] ?? 0;
333 $this->secret =
$params[
'secret'] ?? (string)$this->epoch;
334 if ( (
$params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
338 $this->coalesceScheme = self::SCHEME_HASH_TAG;
341 $this->coalesceScheme = self::SCHEME_HASH_STOP;
346 $this->asyncHandler =
$params[
'asyncHandler'] ??
null;
348 $this->missLog = array_fill( 0, 10, [
'', 0.0 ] );
355 $this->logger = $logger;
422 final public function get( $key, &$curTTL =
null, array $checkKeys = [], &$info = [] ) {
425 $legacyInfo = ( $info !== self::PASS_BY_REF );
428 $res = $this->
fetchKeys( [ $key ], $checkKeys, $now )[$key];
430 $curTTL = $res[self::RES_CUR_TTL];
432 ? $res[self::RES_AS_OF]
434 self::KEY_VERSION => $res[self::RES_VERSION],
435 self::KEY_AS_OF => $res[self::RES_AS_OF],
436 self::KEY_TTL => $res[self::RES_TTL],
437 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
438 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
439 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
442 if ( $curTTL ===
null || $curTTL <= 0 ) {
444 unset( $this->missLog[array_key_first( $this->missLog )] );
448 return $res[self::RES_VALUE];
478 array $checkKeys = [],
483 $legacyInfo = ( $info !== self::PASS_BY_REF );
490 $resByKey = $this->
fetchKeys( $keys, $checkKeys, $now );
491 foreach ( $resByKey as $key => $res ) {
492 if ( $res[self::RES_VALUE] !==
false ) {
493 $valuesByKey[$key] = $res[self::RES_VALUE];
496 if ( $res[self::RES_CUR_TTL] !==
null ) {
497 $curTTLs[$key] = $res[self::RES_CUR_TTL];
499 $info[$key] = $legacyInfo
500 ? $res[self::RES_AS_OF]
502 self::KEY_VERSION => $res[self::RES_VERSION],
503 self::KEY_AS_OF => $res[self::RES_AS_OF],
504 self::KEY_TTL => $res[self::RES_TTL],
505 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
506 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
507 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
529 protected function fetchKeys( array $keys, array $checkKeys,
float $now, $touchedCb =
null ) {
535 $valueSisterKeys = [];
537 $checkSisterKeysForAll = [];
539 $checkSisterKeysByKey = [];
541 foreach ( $keys as $key ) {
542 $sisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
543 $allSisterKeys[] = $sisterKey;
544 $valueSisterKeys[] = $sisterKey;
547 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
549 if ( is_int( $i ) ) {
551 $sisterKey = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
552 $allSisterKeys[] = $sisterKey;
553 $checkSisterKeysForAll[] = $sisterKey;
556 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
557 $sisterKey = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
558 $allSisterKeys[] = $sisterKey;
559 $checkSisterKeysByKey[$i][] = $sisterKey;
564 if ( $this->warmupCache ) {
566 $wrappedBySisterKey = $this->warmupCache;
567 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
568 if ( $sisterKeysMissing ) {
569 $this->warmupKeyMisses += count( $sisterKeysMissing );
570 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
574 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
578 $ckPurgesForAll = $this->processCheckKeys(
579 $checkSisterKeysForAll,
585 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
586 $ckPurgesByKey[$keyWithCheckKeys] = $this->processCheckKeys(
595 foreach ( $valueSisterKeys as $valueSisterKey ) {
597 $key = current( $keys );
600 if ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
602 $wrapped = $wrappedBySisterKey[$valueSisterKey];
608 $res = $this->unwrap( $wrapped, $now );
609 $value = $res[self::RES_VALUE];
611 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
612 $res[self::RES_CHECK_AS_OF] = max(
613 $ckPurge[self::PURGE_TIME],
614 $res[self::RES_CHECK_AS_OF]
617 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
619 if ( $value !==
false && $holdoffDeadline >= $res[self::RES_AS_OF] ) {
621 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
623 $res[self::RES_CUR_TTL] = min( $res[self::RES_CUR_TTL], $ago );
627 if ( $touchedCb !==
null && $value !==
false ) {
628 $touched = $touchedCb( $value );
629 if ( $touched !==
null && $touched >= $res[self::RES_AS_OF] ) {
630 $res[self::RES_CUR_TTL] = min(
631 $res[self::RES_CUR_TTL],
632 $res[self::RES_AS_OF] - $touched,
640 $res[self::RES_TOUCH_AS_OF] = max( $res[self::RES_TOUCH_AS_OF], $touched );
642 $resByKey[$key] = $res;
654 private function processCheckKeys(
655 array $checkSisterKeys,
656 array $wrappedBySisterKey,
661 foreach ( $checkSisterKeys as $timeKey ) {
662 $purge = isset( $wrappedBySisterKey[$timeKey] )
663 ? $this->parsePurgeValue( $wrappedBySisterKey[$timeKey] )
666 if ( $purge ===
null ) {
668 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
673 $this->cache::WRITE_BACKGROUND
766 final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
767 $keygroup = $this->determineKeyGroupForStats( $key );
769 $ok = $this->setMainValue(
773 $opts[
'version'] ??
null,
774 $opts[
'walltime'] ??
null,
776 $opts[
'since'] ??
null,
777 $opts[
'pending'] ??
false,
778 $opts[
'lockTSE'] ?? self::TSE_NONE,
779 $opts[
'staleTTL'] ?? self::STALE_TTL_NONE,
780 $opts[
'segmentable'] ??
false,
781 $opts[
'creating'] ??
false
784 $this->stats->increment(
"wanobjectcache.$keygroup.set." . ( $ok ?
'ok' :
'error' ) );
804 private function setMainValue(
812 bool $dataPendingCommit,
825 $walltime ??= $this->timeSinceLoggedMiss( $key, $now );
826 $dataSnapshotLag = ( $dataReadSince !== null ) ? max( 0, $now - $dataReadSince ) : 0;
827 $dataCombinedLag = $dataReplicaLag + $dataSnapshotLag;
834 if ( $dataPendingCommit ) {
836 $mitigated =
'pending writes';
838 $mitigationTTL = self::TTL_UNCACHEABLE;
839 } elseif ( $dataSnapshotLag > self::MAX_READ_LAG ) {
841 $pregenSnapshotLag = ( $walltime !== null ) ? ( $dataSnapshotLag - $walltime ) : 0;
842 if ( ( $pregenSnapshotLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
844 $mitigated =
'snapshot lag (late generation)';
846 $mitigationTTL = self::TTL_UNCACHEABLE;
849 $mitigated =
'snapshot lag (high generation time)';
851 $mitigationTTL = self::TTL_LAGGED;
853 } elseif ( $dataReplicaLag ===
false || $dataReplicaLag > self::MAX_READ_LAG ) {
855 $mitigated =
'replication lag';
857 $mitigationTTL = self::TTL_LAGGED;
858 } elseif ( $dataCombinedLag > self::MAX_READ_LAG ) {
859 $pregenCombinedLag = ( $walltime !== null ) ? ( $dataCombinedLag - $walltime ) : 0;
861 if ( ( $pregenCombinedLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
863 $mitigated =
'read lag (late generation)';
865 $mitigationTTL = self::TTL_UNCACHEABLE;
868 $mitigated =
'read lag (high generation time)';
870 $mitigationTTL = self::TTL_LAGGED;
876 $mitigationTTL =
null;
879 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
880 $this->logger->warning(
881 "Rejected set() for {cachekey} due to $mitigated.",
884 'lag' => $dataReplicaLag,
885 'age' => $dataSnapshotLag,
886 'walltime' => $walltime
897 if ( $mitigationTTL !==
null ) {
899 if ( $lockTSE >= 0 ) {
901 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
904 $ttl = min( $ttl ?: INF, $mitigationTTL );
907 $this->logger->warning(
908 "Lowered set() TTL for {cachekey} due to $mitigated.",
911 'lag' => $dataReplicaLag,
912 'age' => $dataSnapshotLag,
913 'walltime' => $walltime
919 $wrapped = $this->wrap( $value, $logicalTTL ?: $ttl, $version, $now );
920 $storeTTL = $ttl + $staleTTL;
922 $flags = $this->cache::WRITE_BACKGROUND;
923 if ( $segmentable ) {
924 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
928 $ok = $this->cache->add(
929 $this->makeSisterKey( $key, self::TYPE_VALUE ),
935 $ok = $this->cache->merge(
936 $this->makeSisterKey( $key, self::TYPE_VALUE ),
937 static function ( $cache, $key, $cWrapped ) use ( $wrapped ) {
939 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
942 $this->cache::MAX_CONFLICTS_ONE,
1012 final public function delete( $key, $ttl = self::HOLDOFF_TTL ) {
1016 $valueSisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
1032 $purge = $this->makeTombstonePurgeValue( $now );
1036 $keygroup = $this->determineKeyGroupForStats( $key );
1037 $this->stats->increment(
"wanobjectcache.$keygroup.delete." . ( $ok ?
'ok' :
'error' ) );
1127 $checkSisterKeysByKey = [];
1128 foreach ( $keys as $key ) {
1129 $checkSisterKeysByKey[$key] = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1132 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1133 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1137 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1138 $purge = $this->parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1139 if ( $purge ===
null ) {
1140 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
1144 self::CHECK_KEY_TTL,
1145 $this->cache::WRITE_BACKGROUND
1149 $times[$key] = $purge[self::PURGE_TIME];
1189 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1192 $purge = $this->makeCheckPurgeValue( $now, $holdoff );
1195 $keygroup = $this->determineKeyGroupForStats( $key );
1196 $this->stats->increment(
"wanobjectcache.$keygroup.ck_touch." . ( $ok ?
'ok' :
'error' ) );
1229 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1232 $keygroup = $this->determineKeyGroupForStats( $key );
1233 $this->stats->increment(
"wanobjectcache.$keygroup.ck_reset." . ( $ok ?
'ok' :
'error' ) );
1540 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1542 $version = $opts[
'version'] ??
null;
1543 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1544 $pCache = ( $pcTTL >= 0 )
1545 ? $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY )
1551 if ( $pCache && $this->callbackDepth == 0 ) {
1552 $cached = $pCache->get( $key, $pcTTL,
false );
1553 if ( $cached !==
false ) {
1554 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1559 [ $value, $valueVersion, $curAsOf ] = $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
1560 if ( $valueVersion !== $version ) {
1564 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1565 [ $value ] = $this->fetchOrRegenerate(
1566 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), (
string)$version ),
1569 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1575 if ( $pCache && $value !==
false ) {
1576 $pCache->set( $key, $value );
1598 private function fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams ) {
1599 $checkKeys = $opts[
'checkKeys'] ?? [];
1600 $graceTTL = $opts[
'graceTTL'] ?? self::GRACE_TTL_NONE;
1601 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1602 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
1603 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1604 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1605 $touchedCb = $opts[
'touchedCallback'] ??
null;
1606 $startTime = $this->getCurrentTime();
1608 $keygroup = $this->determineKeyGroupForStats( $key );
1611 $curState = $this->fetchKeys( [ $key ], $checkKeys, $startTime, $touchedCb )[$key];
1612 $curValue = $curState[self::RES_VALUE];
1614 if ( $this->isAcceptablyFreshValue( $curState, $graceTTL, $minAsOf ) ) {
1615 if ( !$this->isLotteryRefreshDue( $curState, $lowTTL, $ageNew, $hotTTR, $startTime ) ) {
1616 $this->stats->timing(
1617 "wanobjectcache.$keygroup.hit.good",
1618 1e3 * ( $this->getCurrentTime() - $startTime )
1621 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1622 } elseif ( $this->scheduleAsyncRefresh( $key, $ttl, $callback, $opts, $cbParams ) ) {
1623 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1624 $this->stats->timing(
1625 "wanobjectcache.$keygroup.hit.refresh",
1629 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1631 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
1635 $isKeyTombstoned = ( $curState[self::RES_TOMB_AS_OF] !== null );
1637 if ( $isKeyTombstoned ) {
1638 $volState = $this->getInterimValue( $key, $minAsOf, $startTime, $touchedCb );
1639 $volValue = $volState[self::RES_VALUE];
1641 $volState = $curState;
1642 $volValue = $curValue;
1650 $lastPurgeTime = max(
1652 $volState[self::RES_TOUCH_AS_OF],
1653 $curState[self::RES_TOMB_AS_OF],
1654 $curState[self::RES_CHECK_AS_OF]
1656 $safeMinAsOf = max( $minAsOf, $lastPurgeTime + self::TINY_POSITIVE );
1657 if ( $this->isExtremelyNewValue( $volState, $safeMinAsOf, $startTime ) ) {
1658 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1659 $this->stats->timing(
1660 "wanobjectcache.$keygroup.hit.volatile",
1664 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1667 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1668 $busyValue = $opts[
'busyValue'] ??
null;
1669 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
1670 $segmentable = $opts[
'segmentable'] ??
false;
1671 $version = $opts[
'version'] ??
null;
1674 $useRegenerationLock =
1685 $curState[self::RES_CUR_TTL] !==
null &&
1686 $curState[self::RES_CUR_TTL] <= 0 &&
1687 abs( $curState[self::RES_CUR_TTL] ) <= $lockTSE
1691 ( $busyValue !==
null && $volValue ===
false );
1696 $hasLock = $useRegenerationLock && $this->claimStampedeLock( $key );
1697 if ( $useRegenerationLock && !$hasLock ) {
1700 if ( $this->
isValid( $volValue, $volState[self::RES_AS_OF], $minAsOf ) ) {
1701 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1702 $this->stats->timing(
1703 "wanobjectcache.$keygroup.hit.stale",
1707 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1708 } elseif ( $busyValue !==
null ) {
1709 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1710 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1711 $this->stats->timing(
1712 "wanobjectcache.$keygroup.$miss.busy",
1715 $placeholderValue = $this->resolveBusyValue( $busyValue );
1717 return [ $placeholderValue, $version, $curState[self::RES_AS_OF] ];
1724 ++$this->callbackDepth;
1729 ( $curState[self::RES_VERSION] === $version ) ? $curValue : false,
1732 ( $curState[self::RES_VERSION] === $version ) ? $curState[self::RES_AS_OF] : null,
1736 --$this->callbackDepth;
1741 $elapsed = max( $postCallbackTime - $startTime, 0.0 );
1744 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1745 $this->stats->timing(
"wanobjectcache.$keygroup.regen_walltime", 1e3 * $walltime );
1750 ( $value !==
false && $ttl >= 0 ) &&
1752 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned )
1754 $this->stats->timing(
"wanobjectcache.$keygroup.regen_set_delay", 1e3 * $elapsed );
1756 if ( $isKeyTombstoned ) {
1757 $this->setInterimValue(
1765 $this->setMainValue(
1772 $setOpts[
'lag'] ?? 0,
1774 $setOpts[
'since'] ?? $preCallbackTime,
1776 $setOpts[
'pending'] ??
false,
1780 ( $curValue ===
false )
1785 $this->yieldStampedeLock( $key, $hasLock );
1787 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1788 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1789 $this->stats->timing(
1790 "wanobjectcache.$keygroup.$miss.compute",
1794 return [ $value, $version, $curState[self::RES_AS_OF] ];
1801 private function claimStampedeLock( $key ) {
1802 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
1804 return $this->cache->add( $checkSisterKey, 1, self::LOCK_TTL );
1811 private function yieldStampedeLock( $key, $hasLock ) {
1813 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
1814 $this->cache->delete( $checkSisterKey, $this->cache::WRITE_BACKGROUND );
1828 private function makeSisterKeys( array $baseKeys,
string $type,
string $route =
null ) {
1830 foreach ( $baseKeys as $baseKey ) {
1831 $sisterKeys[] = $this->makeSisterKey( $baseKey, $type, $route );
1847 private function makeSisterKey(
string $baseKey,
string $typeChar,
string $route =
null ) {
1848 if ( $this->coalesceScheme === self::SCHEME_HASH_STOP ) {
1850 $sisterKey =
'WANCache:' . $baseKey .
'|#|' . $typeChar;
1853 $sisterKey =
'WANCache:{' . $baseKey .
'}:' . $typeChar;
1856 if ( $route !==
null ) {
1857 $sisterKey = $this->
prependRoute( $sisterKey, $route );
1875 private function isExtremelyNewValue( $res, $minAsOf, $now ) {
1876 if ( $res[self::RES_VALUE] ===
false || $res[self::RES_AS_OF] < $minAsOf ) {
1880 $age = $now - $res[self::RES_AS_OF];
1882 return ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1894 private function getInterimValue( $key, $minAsOf, $now, $touchedCb ) {
1895 if ( $this->useInterimHoldOffCaching ) {
1896 $interimSisterKey = $this->makeSisterKey( $key, self::TYPE_INTERIM );
1897 $wrapped = $this->cache->get( $interimSisterKey );
1898 $res = $this->unwrap( $wrapped, $now );
1899 if ( $res[self::RES_VALUE] !==
false && $res[self::RES_AS_OF] >= $minAsOf ) {
1900 if ( $touchedCb !==
null ) {
1903 $res[self::RES_TOUCH_AS_OF] = max(
1904 $touchedCb( $res[self::RES_VALUE] ),
1905 $res[self::RES_TOUCH_AS_OF]
1913 return $this->unwrap(
false, $now );
1924 private function setInterimValue(
1932 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1935 $wrapped = $this->wrap( $value, $ttl, $version, $now );
1937 $flags = $this->cache::WRITE_BACKGROUND;
1938 if ( $segmentable ) {
1939 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
1942 return $this->cache->set(
1943 $this->makeSisterKey( $key, self::TYPE_INTERIM ),
1954 private function resolveBusyValue( $busyValue ) {
1955 return ( $busyValue instanceof Closure ) ? $busyValue() : $busyValue;
2024 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2027 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache(
2028 $this->getNonProcessCachedMultiKeys( $keyedIds, $opts ),
2029 $opts[
'checkKeys'] ?? []
2031 $this->warmupKeyMisses = 0;
2037 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf,
$params )
2040 return $callback(
$params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2045 foreach ( $keyedIds as $key => $id ) {
2055 $this->warmupCache = [];
2127 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2129 $checkKeys = $opts[
'checkKeys'] ?? [];
2130 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
2133 unset( $opts[
'lockTSE'] );
2134 unset( $opts[
'busyValue'] );
2137 $keysByIdGet = $this->getNonProcessCachedMultiKeys( $keyedIds, $opts );
2138 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache( $keysByIdGet, $checkKeys );
2139 $this->warmupKeyMisses = 0;
2146 $resByKey = $this->
fetchKeys( $keysByIdGet, $checkKeys, $now );
2147 foreach ( $keysByIdGet as $id => $key ) {
2148 $res = $resByKey[$key];
2150 $res[self::RES_VALUE] ===
false ||
2151 $res[self::RES_CUR_TTL] < 0 ||
2152 $res[self::RES_AS_OF] < $minAsOf
2160 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2161 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2163 $method = __METHOD__;
2168 $proxyCb =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf,
$params )
2169 use ( $callback, $newValsById, $newTTLsById, $newSetOpts, $method )
2173 if ( array_key_exists( $id, $newValsById ) ) {
2175 $newValue = $newValsById[$id];
2176 $ttl = $newTTLsById[$id];
2177 $setOpts = $newSetOpts;
2181 $ttls = [ $id => $ttl ];
2182 $result = $callback( [ $id ], $ttls, $setOpts );
2183 if ( !isset( $result[$id] ) ) {
2185 $this->logger->warning(
2186 $method .
' failed due to {id} not set in result {result}', [
2188 'result' => json_encode( $result )
2191 $newValue = $result[$id];
2200 foreach ( $keyedIds as $key => $id ) {
2210 $this->warmupCache = [];
2224 return $this->cache->makeGlobalKey( ...func_get_args() );
2234 public function makeKey( $keygroup, ...$components ) {
2236 return $this->cache->makeKey( ...func_get_args() );
2247 return hash_hmac(
'sha256', $component, $this->secret );
2303 foreach ( $ids as $id ) {
2305 if ( strlen( $id ) > 64 ) {
2306 $this->logger->warning( __METHOD__ .
": long ID '$id'; use hash256()" );
2308 $key = $keyCallback( $id, $this );
2310 if ( !isset( $idByKey[$key] ) ) {
2311 $idByKey[$key] = $id;
2312 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2313 throw new UnexpectedValueException(
2314 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2319 return new ArrayIterator( $idByKey );
2358 if ( count( $ids ) !== count( $res ) ) {
2361 $ids = array_keys( array_fill_keys( $ids,
true ) );
2362 if ( count( $ids ) !== count( $res ) ) {
2363 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2367 return array_combine( $ids, $res );
2377 return $this->cache->watchErrors();
2398 $code = $this->cache->getLastError( $watchPoint );
2400 case self::ERR_NONE:
2401 return self::ERR_NONE;
2402 case self::ERR_NO_RESPONSE:
2403 return self::ERR_NO_RESPONSE;
2404 case self::ERR_UNREACHABLE:
2405 return self::ERR_UNREACHABLE;
2407 return self::ERR_UNEXPECTED;
2417 $this->cache->clearLastError();
2426 $this->processCaches = [];
2459 return $this->cache->getQoS( $flag );
2525 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2527 $mtime = (int)$mtime;
2528 if ( $mtime <= 0 ) {
2535 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2545 return $this->warmupKeyMisses;
2563 if ( $this->broadcastRoute !==
null ) {
2564 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2566 $routeKey = $sisterKey;
2569 return $this->cache->set(
2573 $this->cache::WRITE_BACKGROUND
2586 if ( $this->broadcastRoute !==
null ) {
2587 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2589 $routeKey = $sisterKey;
2592 return $this->cache->delete( $routeKey, $this->cache::WRITE_BACKGROUND );
2601 if ( $sisterKey[0] ===
'/' ) {
2602 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2605 return $route . $sisterKey;
2619 private function scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams ) {
2620 if ( !$this->asyncHandler ) {
2627 $func = $this->asyncHandler;
2628 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
2629 $opts[
'minAsOf'] = INF;
2631 $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
2632 }
catch ( Exception $e ) {
2634 $this->logger->error(
'Async refresh failed for {key}', [
2654 private function isAcceptablyFreshValue( $res, $graceTTL, $minAsOf ) {
2655 if ( !$this->
isValid( $res[self::RES_VALUE], $res[self::RES_AS_OF], $minAsOf ) ) {
2660 $curTTL = $res[self::RES_CUR_TTL];
2661 if ( $curTTL > 0 ) {
2667 $curGraceTTL = $graceTTL + $curTTL;
2669 return ( $curGraceTTL > 0 )
2687 $curTTL = $res[self::RES_CUR_TTL];
2688 $logicalTTL = $res[self::RES_TTL];
2689 $asOf = $res[self::RES_AS_OF];
2713 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2717 $age = $now - $asOf;
2718 $timeOld = $age - $ageNew;
2719 if ( $timeOld <= 0 ) {
2723 $popularHitsPerSec = 1;
2727 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2731 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2733 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2735 return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
2757 if ( $lowTTL <= 0 ) {
2762 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2765 $timeOld = $effectiveLowTTL - $curTTL;
2766 if ( $timeOld <= 0 || $timeOld >= $effectiveLowTTL ) {
2771 $ttrRatio = $timeOld / $effectiveLowTTL;
2774 $chance = $ttrRatio ** 4;
2776 return ( mt_rand( 1, 1_000_000_000 ) <= 1_000_000_000 * $chance );
2787 protected function isValid( $value, $asOf, $minAsOf ) {
2788 return ( $value !==
false && $asOf >= $minAsOf );
2798 private function wrap( $value, $ttl, $version, $now ) {
2802 self::FLD_FORMAT_VERSION => self::VERSION,
2803 self::FLD_VALUE => $value,
2804 self::FLD_TTL => $ttl,
2805 self::FLD_TIME => $now
2807 if ( $version !==
null ) {
2808 $wrapped[self::FLD_VALUE_VERSION] = $version;
2828 private function unwrap( $wrapped, $now ) {
2832 self::RES_VALUE =>
false,
2833 self::RES_VERSION =>
null,
2834 self::RES_AS_OF =>
null,
2835 self::RES_TTL =>
null,
2836 self::RES_TOMB_AS_OF =>
null,
2838 self::RES_CHECK_AS_OF =>
null,
2839 self::RES_TOUCH_AS_OF =>
null,
2840 self::RES_CUR_TTL => null
2843 if ( is_array( $wrapped ) ) {
2846 ( $wrapped[self::FLD_FORMAT_VERSION] ??
null ) === self::VERSION &&
2847 $wrapped[self::FLD_TIME] >= $this->epoch
2849 if ( $wrapped[self::FLD_TTL] > 0 ) {
2851 $age = $now - $wrapped[self::FLD_TIME];
2852 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2857 $res[self::RES_VALUE] = $wrapped[self::FLD_VALUE];
2858 $res[self::RES_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ??
null;
2859 $res[self::RES_AS_OF] = $wrapped[self::FLD_TIME];
2860 $res[self::RES_CUR_TTL] = $curTTL;
2861 $res[self::RES_TTL] = $wrapped[self::FLD_TTL];
2865 $purge = $this->parsePurgeValue( $wrapped );
2866 if ( $purge !==
null ) {
2868 $curTTL = min( $purge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
2869 $res[self::RES_CUR_TTL] = $curTTL;
2870 $res[self::RES_TOMB_AS_OF] = $purge[self::PURGE_TIME];
2882 private function determineKeyGroupForStats( $key ) {
2883 $parts = explode(
':', $key, 3 );
2886 return strtr( $parts[1] ?? $parts[0],
'.',
'_' );
2897 private function parsePurgeValue( $value ) {
2898 if ( !is_string( $value ) ) {
2902 $segments = explode(
':', $value, 3 );
2903 $prefix = $segments[0];
2904 if ( $prefix !== self::PURGE_VAL_PREFIX ) {
2909 $timestamp = (float)$segments[1];
2911 $holdoff = isset( $segments[2] ) ? (int)$segments[2] : self::
HOLDOFF_TTL;
2913 if ( $timestamp < $this->epoch ) {
2918 return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
2925 private function makeTombstonePurgeValue(
float $timestamp ) {
2926 return self::PURGE_VAL_PREFIX .
':' . (int)$timestamp;
2935 private function makeCheckPurgeValue(
float $timestamp,
int $holdoff, array &$purge =
null ) {
2936 $normalizedTime = (int)$timestamp;
2938 $purge = [ self::PURGE_TIME => (float)$normalizedTime, self::PURGE_HOLDOFF => $holdoff ];
2940 return self::PURGE_VAL_PREFIX .
":$normalizedTime:$holdoff";
2947 private function getProcessCache( $group ) {
2948 if ( !isset( $this->processCaches[$group] ) ) {
2949 [ , $size ] = explode(
':', $group );
2950 $this->processCaches[$group] =
new MapCacheLRU( (
int)$size );
2951 if ( $this->wallClockOverride !==
null ) {
2952 $this->processCaches[$group]->setMockTime( $this->wallClockOverride );
2956 return $this->processCaches[$group];
2964 private function getNonProcessCachedMultiKeys( ArrayIterator $keys, array $opts ) {
2965 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
2968 if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
2969 $pCache = $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY );
2970 foreach ( $keys as $key => $id ) {
2971 if ( !$pCache->has( $key, $pcTTL ) ) {
2972 $keysMissing[$id] = $key;
2977 return $keysMissing;
2986 private function fetchWrappedValuesForWarmupCache( array $keys, array $checkKeys ) {
2992 $sisterKeys = $this->makeSisterKeys( $keys, self::TYPE_VALUE );
2994 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
2996 if ( is_int( $i ) ) {
2998 $sisterKeys[] = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
3001 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
3002 $sisterKeys[] = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
3007 $wrappedBySisterKey = $this->cache->getMulti( $sisterKeys );
3008 $wrappedBySisterKey += array_fill_keys( $sisterKeys,
false );
3010 return $wrappedBySisterKey;
3018 private function timeSinceLoggedMiss( $key, $now ) {
3019 for ( end( $this->missLog ); $miss = current( $this->missLog ); prev( $this->missLog ) ) {
3020 if ( $miss[0] === $key ) {
3021 return ( $now - $miss[1] );
3033 return $this->wallClockOverride ?: microtime(
true );
3041 $this->wallClockOverride =& $time;
3042 $this->cache->setMockTime( $time );
3043 foreach ( $this->processCaches as $pCache ) {
3044 $pCache->setMockTime( $time );