22 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
163 private $callbackDepth = 0;
165 private $warmupCache = [];
167 private $warmupKeyMisses = 0;
170 private $wallClockOverride;
173 private const MAX_COMMIT_DELAY = 3;
175 private const MAX_READ_LAG = 7;
177 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
180 private const LOW_TTL = 60;
185 private const HOT_TTR = 900;
187 private const AGE_NEW = 60;
190 private const TSE_NONE = -1;
200 public const MIN_TIMESTAMP_NONE = 0.0;
203 private const PC_PRIMARY =
'primary:1000';
209 private const SCHEME_HASH_TAG = 1;
211 private const SCHEME_HASH_STOP = 2;
214 private const CHECK_KEY_TTL = self::TTL_YEAR;
216 private const INTERIM_KEY_TTL = 2;
219 private const LOCK_TTL = 10;
221 private const RAMPUP_TTL = 30;
224 private const TINY_NEGATIVE = -0.000001;
226 private const TINY_POSTIVE = 0.000001;
229 private const RECENT_SET_LOW_MS = 50;
231 private const RECENT_SET_HIGH_MS = 100;
234 private const GENERATION_HIGH_SEC = 0.2;
237 private const PURGE_TIME = 0;
239 private const PURGE_HOLDOFF = 1;
242 private const VERSION = 1;
258 private const RES_VALUE = 0;
260 private const RES_VERSION = 1;
262 private const RES_AS_OF = 2;
264 private const RES_TTL = 3;
266 private const RES_TOMB_AS_OF = 4;
268 private const RES_CHECK_AS_OF = 5;
270 private const RES_TOUCH_AS_OF = 6;
272 private const RES_CUR_TTL = 7;
275 private const FLD_FORMAT_VERSION = 0;
277 private const FLD_VALUE = 1;
279 private const FLD_TTL = 2;
281 private const FLD_TIME = 3;
283 private const FLD_FLAGS = 4;
285 private const FLD_VALUE_VERSION = 5;
286 private const FLD_GENERATION_TIME = 6;
289 private const TYPE_VALUE =
'v';
291 private const TYPE_TIMESTAMP =
't';
293 private const TYPE_MUTEX =
'm';
295 private const TYPE_INTERIM =
'i';
298 private const PURGE_VAL_PREFIX =
'PURGED';
327 $this->cache = $params[
'cache'];
328 $this->broadcastRoute = $params[
'broadcastRoutingPrefix'] ??
null;
329 $this->epoch = $params[
'epoch'] ?? 0;
330 $this->secret = $params[
'secret'] ?? (string)$this->epoch;
331 if ( ( $params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
335 $this->coalesceScheme = self::SCHEME_HASH_TAG;
338 $this->coalesceScheme = self::SCHEME_HASH_STOP;
341 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
343 $this->asyncHandler = $params[
'asyncHandler'] ??
null;
345 $this->missLog = array_fill( 0, 10, [
'', 0.0 ] );
419 final public function get( $key, &$curTTL =
null, array $checkKeys = [], &$info = [] ) {
424 $res = $this->
fetchKeys( [ $key ], $checkKeys )[$key];
426 $curTTL = $res[self::RES_CUR_TTL];
428 ? $res[self::RES_AS_OF]
430 self::KEY_VERSION => $res[self::RES_VERSION],
431 self::KEY_AS_OF => $res[self::RES_AS_OF],
432 self::KEY_TTL => $res[self::RES_TTL],
433 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
434 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
435 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
438 if ( $curTTL ===
null || $curTTL <= 0 ) {
440 unset( $this->missLog[array_key_first( $this->missLog )] );
444 return $res[self::RES_VALUE];
474 array $checkKeys = [],
485 $resByKey = $this->
fetchKeys( $keys, $checkKeys );
486 foreach ( $resByKey as $key => $res ) {
487 if ( $res[self::RES_VALUE] !==
false ) {
488 $valuesByKey[$key] = $res[self::RES_VALUE];
491 if ( $res[self::RES_CUR_TTL] !==
null ) {
492 $curTTLs[$key] = $res[self::RES_CUR_TTL];
494 $info[$key] = $legacyInfo
495 ? $res[self::RES_AS_OF]
497 self::KEY_VERSION => $res[self::RES_VERSION],
498 self::KEY_AS_OF => $res[self::RES_AS_OF],
499 self::KEY_TTL => $res[self::RES_TTL],
500 self::KEY_CUR_TTL => $res[self::RES_CUR_TTL],
501 self::KEY_TOMB_AS_OF => $res[self::RES_TOMB_AS_OF],
502 self::KEY_CHECK_AS_OF => $res[self::RES_CHECK_AS_OF]
523 protected function fetchKeys( array $keys, array $checkKeys, $touchedCb =
null ) {
529 $valueSisterKeys = [];
531 $checkSisterKeysForAll = [];
533 $checkSisterKeysByKey = [];
535 foreach ( $keys as $key ) {
536 $sisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
537 $allSisterKeys[] = $sisterKey;
538 $valueSisterKeys[] = $sisterKey;
541 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
543 if ( is_int( $i ) ) {
545 $sisterKey = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
546 $allSisterKeys[] = $sisterKey;
547 $checkSisterKeysForAll[] = $sisterKey;
550 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
551 $sisterKey = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
552 $allSisterKeys[] = $sisterKey;
553 $checkSisterKeysByKey[$i][] = $sisterKey;
558 if ( $this->warmupCache ) {
560 $wrappedBySisterKey = $this->warmupCache;
561 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
562 if ( $sisterKeysMissing ) {
563 $this->warmupKeyMisses += count( $sisterKeysMissing );
564 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
568 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
575 $ckPurgesForAll = $this->processCheckKeys(
576 $checkSisterKeysForAll,
582 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
583 $ckPurgesByKey[$keyWithCheckKeys] = $this->processCheckKeys(
592 foreach ( $valueSisterKeys as $valueSisterKey ) {
594 $key = current( $keys );
597 if ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
599 $wrapped = $wrappedBySisterKey[$valueSisterKey];
605 $res = $this->unwrap( $wrapped, $now );
606 $value = $res[self::RES_VALUE];
608 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
609 $res[self::RES_CHECK_AS_OF] = max(
610 $ckPurge[self::PURGE_TIME],
611 $res[self::RES_CHECK_AS_OF]
614 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
616 if ( $value !==
false && $holdoffDeadline >= $res[self::RES_AS_OF] ) {
618 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
620 $res[self::RES_CUR_TTL] = min( $res[self::RES_CUR_TTL], $ago );
624 if ( $touchedCb !==
null && $value !==
false ) {
625 $touched = $touchedCb( $value );
626 if ( $touched !==
null && $touched >= $res[self::RES_AS_OF] ) {
627 $res[self::RES_CUR_TTL] = min(
628 $res[self::RES_CUR_TTL],
629 $res[self::RES_AS_OF] - $touched,
637 $res[self::RES_TOUCH_AS_OF] = max( $res[self::RES_TOUCH_AS_OF], $touched );
639 $resByKey[$key] = $res;
651 private function processCheckKeys(
652 array $checkSisterKeys,
653 array $wrappedBySisterKey,
658 foreach ( $checkSisterKeys as $timeKey ) {
659 $purge = isset( $wrappedBySisterKey[$timeKey] )
660 ? $this->parsePurgeValue( $wrappedBySisterKey[$timeKey] )
663 if ( $purge ===
null ) {
665 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
670 $this->cache::WRITE_BACKGROUND
763 final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
764 $keygroup = $this->determineKeyGroupForStats( $key );
766 $ok = $this->setMainValue(
770 $opts[
'version'] ??
null,
771 $opts[
'walltime'] ??
null,
773 $opts[
'since'] ??
null,
774 $opts[
'pending'] ??
false,
775 $opts[
'lockTSE'] ?? self::TSE_NONE,
776 $opts[
'staleTTL'] ?? self::STALE_TTL_NONE,
777 $opts[
'segmentable'] ??
false,
778 $opts[
'creating'] ??
false
781 $this->stats->increment(
"wanobjectcache.$keygroup.set." . ( $ok ?
'ok' :
'error' ) );
801 private function setMainValue(
809 bool $dataPendingCommit,
822 $walltime ??= $this->timeSinceLoggedMiss( $key, $now );
823 $dataSnapshotLag = ( $dataReadSince !== null ) ? max( 0, $now - $dataReadSince ) : 0;
824 $dataCombinedLag = $dataReplicaLag + $dataSnapshotLag;
831 if ( $dataPendingCommit ) {
833 $mitigated =
'pending writes';
835 $mitigationTTL = self::TTL_UNCACHEABLE;
836 } elseif ( $dataSnapshotLag > self::MAX_READ_LAG ) {
838 $pregenSnapshotLag = ( $walltime !== null ) ? ( $dataSnapshotLag - $walltime ) : 0;
839 if ( ( $pregenSnapshotLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
841 $mitigated =
'snapshot lag (late generation)';
843 $mitigationTTL = self::TTL_UNCACHEABLE;
846 $mitigated =
'snapshot lag (high generation time)';
850 } elseif ( $dataReplicaLag ===
false || $dataReplicaLag > self::MAX_READ_LAG ) {
852 $mitigated =
'replication lag';
855 } elseif ( $dataCombinedLag > self::MAX_READ_LAG ) {
856 $pregenCombinedLag = ( $walltime !== null ) ? ( $dataCombinedLag - $walltime ) : 0;
858 if ( ( $pregenCombinedLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
860 $mitigated =
'read lag (late generation)';
862 $mitigationTTL = self::TTL_UNCACHEABLE;
865 $mitigated =
'read lag (high generation time)';
873 $mitigationTTL =
null;
876 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
877 $this->logger->warning(
878 "Rejected set() for {cachekey} due to $mitigated.",
881 'lag' => $dataReplicaLag,
882 'age' => $dataSnapshotLag,
883 'walltime' => $walltime
894 if ( $mitigationTTL !==
null ) {
896 if ( $lockTSE >= 0 ) {
898 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
901 $ttl = min( $ttl ?: INF, $mitigationTTL );
904 $this->logger->warning(
905 "Lowered set() TTL for {cachekey} due to $mitigated.",
908 'lag' => $dataReplicaLag,
909 'age' => $dataSnapshotLag,
910 'walltime' => $walltime
916 $wrapped = $this->wrap( $value, $logicalTTL ?: $ttl, $version, $now );
917 $storeTTL = $ttl + $staleTTL;
919 $flags = $this->cache::WRITE_BACKGROUND;
920 if ( $segmentable ) {
921 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
925 $ok = $this->cache->add(
926 $this->makeSisterKey( $key, self::TYPE_VALUE ),
932 $ok = $this->cache->merge(
933 $this->makeSisterKey( $key, self::TYPE_VALUE ),
934 static function (
$cache, $key, $cWrapped ) use ( $wrapped ) {
936 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
939 $this->cache::MAX_CONFLICTS_ONE,
1013 $valueSisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
1029 $purge = $this->makeTombstonePurgeValue( $now );
1033 $keygroup = $this->determineKeyGroupForStats( $key );
1034 $this->stats->increment(
"wanobjectcache.$keygroup.delete." . ( $ok ?
'ok' :
'error' ) );
1124 $checkSisterKeysByKey = [];
1125 foreach ( $keys as $key ) {
1126 $checkSisterKeysByKey[$key] = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1129 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1130 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1134 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1135 $purge = $this->parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1136 if ( $purge ===
null ) {
1137 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL_NONE, $purge );
1141 self::CHECK_KEY_TTL,
1142 $this->cache::WRITE_BACKGROUND
1146 $times[$key] = $purge[self::PURGE_TIME];
1186 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1189 $purge = $this->makeCheckPurgeValue( $now, $holdoff );
1192 $keygroup = $this->determineKeyGroupForStats( $key );
1193 $this->stats->increment(
"wanobjectcache.$keygroup.ck_touch." . ( $ok ?
'ok' :
'error' ) );
1226 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1229 $keygroup = $this->determineKeyGroupForStats( $key );
1230 $this->stats->increment(
"wanobjectcache.$keygroup.ck_reset." . ( $ok ?
'ok' :
'error' ) );
1537 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1539 $version = $opts[
'version'] ??
null;
1540 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1541 $pCache = ( $pcTTL >= 0 )
1542 ? $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY )
1548 if ( $pCache && $this->callbackDepth == 0 ) {
1549 $cached = $pCache->get( $key, $pcTTL,
false );
1550 if ( $cached !==
false ) {
1551 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1556 [ $value, $valueVersion, $curAsOf ] = $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
1557 if ( $valueVersion !== $version ) {
1561 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1562 [ $value ] = $this->fetchOrRegenerate(
1563 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), (
string)$version ),
1566 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1572 if ( $pCache && $value !==
false ) {
1573 $pCache->set( $key, $value );
1595 private function fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams ) {
1596 $checkKeys = $opts[
'checkKeys'] ?? [];
1598 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1599 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
1600 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1601 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1602 $touchedCb = $opts[
'touchedCallback'] ??
null;
1605 $keygroup = $this->determineKeyGroupForStats( $key );
1608 $curState = $this->
fetchKeys( [ $key ], $checkKeys, $touchedCb )[$key];
1609 $curValue = $curState[self::RES_VALUE];
1611 if ( $this->isAcceptablyFreshValue( $curState, $graceTTL, $minAsOf ) ) {
1613 $this->stats->timing(
1614 "wanobjectcache.$keygroup.hit.good",
1618 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1619 } elseif ( $this->scheduleAsyncRefresh( $key, $ttl, $callback, $opts, $cbParams ) ) {
1620 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1621 $this->stats->timing(
1622 "wanobjectcache.$keygroup.hit.refresh",
1626 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1628 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
1632 $isKeyTombstoned = ( $curState[self::RES_TOMB_AS_OF] !== null );
1634 if ( $isKeyTombstoned ) {
1635 $volState = $this->getInterimValue( $key, $minAsOf, $startTime, $touchedCb );
1636 $volValue = $volState[self::RES_VALUE];
1638 $volState = $curState;
1639 $volValue = $curValue;
1647 $lastPurgeTime = max(
1649 $volState[self::RES_TOUCH_AS_OF],
1650 $curState[self::RES_TOMB_AS_OF],
1651 $curState[self::RES_CHECK_AS_OF]
1653 $safeMinAsOf = max( $minAsOf, $lastPurgeTime + self::TINY_POSTIVE );
1654 if ( $this->isExtremelyNewValue( $volState, $safeMinAsOf, $startTime ) ) {
1655 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1656 $this->stats->timing(
1657 "wanobjectcache.$keygroup.hit.volatile",
1661 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1664 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1665 $busyValue = $opts[
'busyValue'] ??
null;
1667 $segmentable = $opts[
'segmentable'] ??
false;
1668 $version = $opts[
'version'] ??
null;
1671 $useRegenerationLock =
1682 $curState[self::RES_CUR_TTL] !==
null &&
1683 $curState[self::RES_CUR_TTL] <= 0 &&
1684 abs( $curState[self::RES_CUR_TTL] ) <= $lockTSE
1688 ( $busyValue !==
null && $volValue ===
false );
1693 $hasLock = $useRegenerationLock && $this->claimStampedeLock( $key );
1694 if ( $useRegenerationLock && !$hasLock ) {
1697 if ( $this->
isValid( $volValue, $volState[self::RES_AS_OF], $minAsOf ) ) {
1698 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1699 $this->stats->timing(
1700 "wanobjectcache.$keygroup.hit.stale",
1704 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1705 } elseif ( $busyValue !==
null ) {
1706 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1707 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1708 $this->stats->timing(
1709 "wanobjectcache.$keygroup.$miss.busy",
1712 $placeholderValue = $this->resolveBusyValue( $busyValue );
1714 return [ $placeholderValue, $version, $curState[self::RES_AS_OF] ];
1721 ++$this->callbackDepth;
1726 ( $curState[self::RES_VERSION] === $version ) ? $curValue :
false,
1729 ( $curState[self::RES_VERSION] === $version ) ? $curState[self::RES_AS_OF] :
null,
1733 --$this->callbackDepth;
1738 $elapsed = max( $postCallbackTime - $startTime, 0.0 );
1741 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1742 $this->stats->timing(
"wanobjectcache.$keygroup.regen_walltime", 1e3 * $walltime );
1747 ( $value !==
false && $ttl >= 0 ) &&
1749 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned )
1751 $this->stats->timing(
"wanobjectcache.$keygroup.regen_set_delay", 1e3 * $elapsed );
1753 if ( $isKeyTombstoned ) {
1754 $this->setInterimValue(
1762 $this->setMainValue(
1769 $setOpts[
'lag'] ?? 0,
1771 $setOpts[
'since'] ?? $preCallbackTime,
1773 $setOpts[
'pending'] ??
false,
1777 ( $curValue ===
false )
1782 $this->yieldStampedeLock( $key, $hasLock );
1784 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1785 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1786 $this->stats->timing(
1787 "wanobjectcache.$keygroup.$miss.compute",
1791 return [ $value, $version, $curState[self::RES_AS_OF] ];
1798 private function claimStampedeLock( $key ) {
1799 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
1801 return $this->cache->add( $checkSisterKey, 1, self::LOCK_TTL );
1808 private function yieldStampedeLock( $key, $hasLock ) {
1810 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
1811 $this->cache->delete( $checkSisterKey, $this->cache::WRITE_BACKGROUND );
1825 private function makeSisterKeys( array $baseKeys,
string $type,
string $route =
null ) {
1827 foreach ( $baseKeys as $baseKey ) {
1828 $sisterKeys[] = $this->makeSisterKey( $baseKey, $type, $route );
1844 private function makeSisterKey(
string $baseKey,
string $typeChar,
string $route =
null ) {
1845 if ( $this->coalesceScheme === self::SCHEME_HASH_STOP ) {
1847 $sisterKey =
'WANCache:' . $baseKey .
'|#|' . $typeChar;
1850 $sisterKey =
'WANCache:{' . $baseKey .
'}:' . $typeChar;
1853 if ( $route !==
null ) {
1854 $sisterKey = $this->
prependRoute( $sisterKey, $route );
1872 private function isExtremelyNewValue( $res, $minAsOf, $now ) {
1873 if ( $res[self::RES_VALUE] ===
false || $res[self::RES_AS_OF] < $minAsOf ) {
1877 $age = $now - $res[self::RES_AS_OF];
1879 return ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1891 private function getInterimValue( $key, $minAsOf, $now, $touchedCb ) {
1893 $interimSisterKey = $this->makeSisterKey( $key, self::TYPE_INTERIM );
1894 $wrapped = $this->cache->get( $interimSisterKey );
1895 $res = $this->unwrap( $wrapped, $now );
1896 if ( $res[self::RES_VALUE] !==
false && $res[self::RES_AS_OF] >= $minAsOf ) {
1897 if ( $touchedCb !==
null ) {
1900 $res[self::RES_TOUCH_AS_OF] = max(
1901 $touchedCb( $res[self::RES_VALUE] ),
1902 $res[self::RES_TOUCH_AS_OF]
1910 return $this->unwrap(
false, $now );
1921 private function setInterimValue(
1929 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1932 $wrapped = $this->wrap( $value, $ttl, $version, $now );
1934 $flags = $this->cache::WRITE_BACKGROUND;
1935 if ( $segmentable ) {
1936 $flags |= $this->cache::WRITE_ALLOW_SEGMENTS;
1939 return $this->cache->set(
1940 $this->makeSisterKey( $key, self::TYPE_INTERIM ),
1951 private function resolveBusyValue( $busyValue ) {
1952 return ( $busyValue instanceof Closure ) ? $busyValue() : $busyValue;
2021 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2024 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache(
2025 $this->getNonProcessCachedMultiKeys( $keyedIds, $opts ),
2026 $opts[
'checkKeys'] ?? []
2028 $this->warmupKeyMisses = 0;
2034 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2037 return $callback( $params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2042 foreach ( $keyedIds as $key => $id ) {
2052 $this->warmupCache = [];
2124 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2126 $checkKeys = $opts[
'checkKeys'] ?? [];
2127 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
2130 unset( $opts[
'lockTSE'] );
2131 unset( $opts[
'busyValue'] );
2134 $keysByIdGet = $this->getNonProcessCachedMultiKeys( $keyedIds, $opts );
2135 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache( $keysByIdGet, $checkKeys );
2136 $this->warmupKeyMisses = 0;
2142 $resByKey = $this->
fetchKeys( $keysByIdGet, $checkKeys );
2143 foreach ( $keysByIdGet as $id => $key ) {
2144 $res = $resByKey[$key];
2146 $res[self::RES_VALUE] ===
false ||
2147 $res[self::RES_CUR_TTL] < 0 ||
2148 $res[self::RES_AS_OF] < $minAsOf
2156 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2157 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2159 $method = __METHOD__;
2164 $proxyCb =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2165 use ( $callback, $newValsById, $newTTLsById, $newSetOpts, $method )
2167 $id = $params[
'id'];
2169 if ( array_key_exists( $id, $newValsById ) ) {
2171 $newValue = $newValsById[$id];
2172 $ttl = $newTTLsById[$id];
2173 $setOpts = $newSetOpts;
2177 $ttls = [ $id => $ttl ];
2178 $result = $callback( [ $id ], $ttls, $setOpts );
2179 if ( !isset( $result[$id] ) ) {
2181 $this->logger->warning(
2182 $method .
' failed due to {id} not set in result {result}', [
2184 'result' => json_encode( $result )
2187 $newValue = $result[$id];
2196 foreach ( $keyedIds as $key => $id ) {
2206 $this->warmupCache = [];
2220 return $this->cache->makeGlobalKey( ...func_get_args() );
2230 public function makeKey( $keygroup, ...$components ) {
2232 return $this->cache->makeKey( ...func_get_args() );
2243 return hash_hmac(
'sha256', $component, $this->secret );
2299 foreach ( $ids as $id ) {
2301 if ( strlen( $id ) > 64 ) {
2302 $this->logger->warning( __METHOD__ .
": long ID '$id'; use hash256()" );
2304 $key = $keyCallback( $id, $this );
2306 if ( !isset( $idByKey[$key] ) ) {
2307 $idByKey[$key] = $id;
2308 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2309 throw new UnexpectedValueException(
2310 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2315 return new ArrayIterator( $idByKey );
2354 if ( count( $ids ) !== count( $res ) ) {
2357 $ids = array_keys( array_fill_keys( $ids,
true ) );
2358 if ( count( $ids ) !== count( $res ) ) {
2359 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2363 return array_combine( $ids, $res );
2373 return $this->cache->watchErrors();
2394 $code = $this->cache->getLastError( $watchPoint );
2412 $this->cache->clearLastError();
2421 $this->processCaches = [];
2454 return $this->cache->getQoS( $flag );
2520 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2522 $mtime = (int)$mtime;
2523 if ( $mtime <= 0 ) {
2530 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2540 return $this->warmupKeyMisses;
2558 if ( $this->broadcastRoute !==
null ) {
2559 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2561 $routeKey = $sisterKey;
2564 return $this->cache->set(
2568 $this->cache::WRITE_BACKGROUND
2581 if ( $this->broadcastRoute !==
null ) {
2582 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2584 $routeKey = $sisterKey;
2587 return $this->cache->delete( $routeKey, $this->cache::WRITE_BACKGROUND );
2596 if ( $sisterKey[0] ===
'/' ) {
2597 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2600 return $route . $sisterKey;
2614 private function scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams ) {
2615 if ( !$this->asyncHandler ) {
2623 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
2624 $opts[
'minAsOf'] = INF;
2626 $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
2627 }
catch ( Exception $e ) {
2629 $this->logger->error(
'Async refresh failed for {key}', [
2649 private function isAcceptablyFreshValue( $res, $graceTTL, $minAsOf ) {
2650 if ( !$this->
isValid( $res[self::RES_VALUE], $res[self::RES_AS_OF], $minAsOf ) ) {
2655 $curTTL = $res[self::RES_CUR_TTL];
2656 if ( $curTTL > 0 ) {
2662 $curGraceTTL = $graceTTL + $curTTL;
2664 return ( $curGraceTTL > 0 )
2682 $curTTL = $res[self::RES_CUR_TTL];
2683 $logicalTTL = $res[self::RES_TTL];
2684 $asOf = $res[self::RES_AS_OF];
2708 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2712 $age = $now - $asOf;
2713 $timeOld = $age - $ageNew;
2714 if ( $timeOld <= 0 ) {
2718 $popularHitsPerSec = 1;
2722 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2726 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2728 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2730 return ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
2752 if ( $lowTTL <= 0 ) {
2757 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2760 $timeOld = $effectiveLowTTL - $curTTL;
2761 if ( $timeOld <= 0 || $timeOld >= $effectiveLowTTL ) {
2766 $ttrRatio = $timeOld / $effectiveLowTTL;
2769 $chance = $ttrRatio ** 4;
2771 return ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
2782 protected function isValid( $value, $asOf, $minAsOf ) {
2783 return ( $value !==
false && $asOf >= $minAsOf );
2793 private function wrap( $value, $ttl, $version, $now ) {
2797 self::FLD_FORMAT_VERSION => self::VERSION,
2798 self::FLD_VALUE => $value,
2799 self::FLD_TTL => $ttl,
2800 self::FLD_TIME => $now
2802 if ( $version !==
null ) {
2803 $wrapped[self::FLD_VALUE_VERSION] = $version;
2823 private function unwrap( $wrapped, $now ) {
2827 self::RES_VALUE =>
false,
2828 self::RES_VERSION =>
null,
2829 self::RES_AS_OF =>
null,
2830 self::RES_TTL =>
null,
2831 self::RES_TOMB_AS_OF =>
null,
2833 self::RES_CHECK_AS_OF =>
null,
2834 self::RES_TOUCH_AS_OF =>
null,
2835 self::RES_CUR_TTL => null
2838 if ( is_array( $wrapped ) ) {
2841 ( $wrapped[self::FLD_FORMAT_VERSION] ??
null ) === self::VERSION &&
2842 $wrapped[self::FLD_TIME] >= $this->epoch
2844 if ( $wrapped[self::FLD_TTL] > 0 ) {
2846 $age = $now - $wrapped[self::FLD_TIME];
2847 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2852 $res[self::RES_VALUE] = $wrapped[self::FLD_VALUE];
2853 $res[self::RES_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ??
null;
2854 $res[self::RES_AS_OF] = $wrapped[self::FLD_TIME];
2855 $res[self::RES_CUR_TTL] = $curTTL;
2856 $res[self::RES_TTL] = $wrapped[self::FLD_TTL];
2860 $purge = $this->parsePurgeValue( $wrapped );
2861 if ( $purge !==
null ) {
2863 $curTTL = min( $purge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
2864 $res[self::RES_CUR_TTL] = $curTTL;
2865 $res[self::RES_TOMB_AS_OF] = $purge[self::PURGE_TIME];
2877 private function determineKeyGroupForStats( $key ) {
2878 $parts = explode(
':', $key, 3 );
2881 return strtr( $parts[1] ?? $parts[0],
'.',
'_' );
2892 private function parsePurgeValue( $value ) {
2893 if ( !is_string( $value ) ) {
2897 $segments = explode(
':', $value, 3 );
2898 $prefix = $segments[0];
2899 if ( $prefix !== self::PURGE_VAL_PREFIX ) {
2904 $timestamp = (float)$segments[1];
2906 $holdoff = isset( $segments[2] ) ? (int)$segments[2] : self::HOLDOFF_TTL;
2908 if ( $timestamp < $this->epoch ) {
2913 return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
2920 private function makeTombstonePurgeValue(
float $timestamp ) {
2921 return self::PURGE_VAL_PREFIX .
':' . (int)$timestamp;
2930 private function makeCheckPurgeValue(
float $timestamp,
int $holdoff, array &$purge =
null ) {
2931 $normalizedTime = (int)$timestamp;
2933 $purge = [ self::PURGE_TIME => (float)$normalizedTime, self::PURGE_HOLDOFF => $holdoff ];
2935 return self::PURGE_VAL_PREFIX .
":$normalizedTime:$holdoff";
2942 private function getProcessCache( $group ) {
2943 if ( !isset( $this->processCaches[$group] ) ) {
2944 [ , $size ] = explode(
':', $group );
2945 $this->processCaches[$group] =
new MapCacheLRU( (
int)$size );
2946 if ( $this->wallClockOverride !==
null ) {
2947 $this->processCaches[$group]->setMockTime( $this->wallClockOverride );
2951 return $this->processCaches[$group];
2959 private function getNonProcessCachedMultiKeys( ArrayIterator $keys, array $opts ) {
2960 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
2963 if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
2964 $pCache = $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY );
2965 foreach ( $keys as $key => $id ) {
2966 if ( !$pCache->has( $key, $pcTTL ) ) {
2967 $keysMissing[$id] = $key;
2972 return $keysMissing;
2981 private function fetchWrappedValuesForWarmupCache( array $keys, array $checkKeys ) {
2987 $sisterKeys = $this->makeSisterKeys( $keys, self::TYPE_VALUE );
2989 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
2991 if ( is_int( $i ) ) {
2993 $sisterKeys[] = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
2996 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
2997 $sisterKeys[] = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
3002 $wrappedBySisterKey = $this->cache->getMulti( $sisterKeys );
3003 $wrappedBySisterKey += array_fill_keys( $sisterKeys,
false );
3005 return $wrappedBySisterKey;
3013 private function timeSinceLoggedMiss( $key, $now ) {
3014 for ( end( $this->missLog ); $miss = current( $this->missLog ); prev( $this->missLog ) ) {
3015 if ( $miss[0] === $key ) {
3016 return ( $now - $miss[1] );
3028 return $this->wallClockOverride ?: microtime(
true );
3036 $this->wallClockOverride =& $time;
3037 $this->cache->setMockTime( $time );
3038 foreach ( $this->processCaches as $pCache ) {
3039 $pCache->setMockTime( $time );
A BagOStuff object with no objects in it.
Store key-value entries in a size-limited in-memory LRU cache.
Multi-datacenter aware caching interface.
makeGlobalKey( $keygroup,... $components)
const HOLDOFF_TTL
Seconds to tombstone keys on delete() and to treat keys as volatile after purges.
const KEY_VERSION
Version number attribute for a key; keep value for b/c (< 1.36)
__construct(array $params)
isValid( $value, $asOf, $minAsOf)
Check that a wrapper value exists and has an acceptable age.
worthRefreshPopular( $asOf, $ageNew, $timeTillRefresh, $now)
Check if a key is due for randomized regeneration due to its popularity.
multiRemap(array $ids, array $res)
Get an (ID => value) map from (i) a non-unique list of entity IDs, and (ii) the list of corresponding...
relayVolatilePurge(string $sisterKey, string $purgeValue, int $ttl)
Set a sister key to a purge value in all datacenters.
prependRoute(string $sisterKey, string $route)
touchCheckKey( $key, $holdoff=self::HOLDOFF_TTL)
Increase the last-purge timestamp of a "check" key in all datacenters.
adaptiveTTL( $mtime, $maxTTL, $minTTL=30, $factor=0.2)
Get a TTL that is higher for objects that have not changed recently.
const GRACE_TTL_NONE
Idiom for set()/getWithSetCallback() meaning "no post-expiration grace period".
BagOStuff $cache
The local datacenter cache.
makeKey( $keygroup,... $components)
fetchKeys(array $keys, array $checkKeys, $touchedCb=null)
Fetch the value and key metadata of several keys from cache.
getWithSetCallback( $key, $ttl, $callback, array $opts=[], array $cbParams=[])
Method to fetch/regenerate a cache key.
getCheckKeyTime( $key)
Fetch the value of a timestamp "check" key.
getMulti(array $keys, &$curTTLs=[], array $checkKeys=[], &$info=[])
Fetch the value of several keys from cache.
getMultiWithUnionSetCallback(ArrayIterator $keyedIds, $ttl, callable $callback, array $opts=[])
Method to fetch/regenerate multiple cache keys at once.
relayNonVolatilePurge(string $sisterKey)
Remove a sister key from all datacenters.
getMultiWithSetCallback(ArrayIterator $keyedIds, $ttl, callable $callback, array $opts=[])
Method to fetch multiple cache keys at once with regeneration.
makeMultiKeys(array $ids, $keyCallback)
Get an iterator of (cache key => entity ID) for a list of entity IDs.
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
int $coalesceScheme
Scheme to use for key coalescing (Hash Tags or Hash Stops)
isLotteryRefreshDue( $res, $lowTTL, $ageNew, $hotTTR, $now)
Check if a key is due for randomized regeneration due to near-expiration/popularity.
watchErrors()
Get a "watch point" token that can be used to get the "last error" to occur after now.
bool $useInterimHoldOffCaching
Whether to use "interim" caching while keys are tombstoned.
worthRefreshExpiring( $curTTL, $logicalTTL, $lowTTL)
Check if a key is nearing expiration and thus due for randomized regeneration.
const HOLDOFF_TTL_NONE
Idiom for delete()/touchCheckKey() meaning "no hold-off period".
useInterimHoldOffCaching( $enabled)
Enable or disable the use of brief caching for tombstoned keys.
StatsdDataFactoryInterface $stats
clearProcessCache()
Clear the in-process caches; useful for testing.
const KEY_AS_OF
Generation completion timestamp attribute for a key; keep value for b/c (< 1.36)
getLastError( $watchPoint=0)
Get the "last error" registry.
float $epoch
Unix timestamp of the oldest possible valid values.
callable null $asyncHandler
Function that takes a WAN cache callback and runs it later.
string null $broadcastRoute
Routing prefix for operations that should be broadcasted to all data centers.
setLogger(LoggerInterface $logger)
const PASS_BY_REF
Idiom for get()/getMulti() to return extra information by reference.
const KEY_CHECK_AS_OF
Highest "check" key timestamp for a key; keep value for b/c (< 1.36)
clearLastError()
Clear the "last error" registry.
const STALE_TTL_NONE
Idiom for set()/getWithSetCallback() meaning "no post-expiration persistence".
MapCacheLRU[] $processCaches
Map of group PHP instance caches.
string $secret
Stable secret used for hashing long strings into key components.
resetCheckKey( $key)
Clear the last-purge timestamp of a "check" key in all datacenters.
const KEY_TOMB_AS_OF
Tomstone timestamp attribute for a key; keep value for b/c (< 1.36)
const KEY_CUR_TTL
Remaining TTL attribute for a key; keep value for b/c (< 1.36)
const TTL_LAGGED
Max TTL, in seconds, to store keys when a data source has high replication lag.
hash256( $component)
Hash a possibly long string into a suitable component for makeKey()/makeGlobalKey()
getMultiCheckKeyTime(array $keys)
Fetch the values of each timestamp "check" key.
const KEY_TTL
Logical TTL attribute for a key.
Key-encoding methods for object caching (BagOStuff and WANObjectCache)