22use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23use Psr\Log\LoggerAwareInterface;
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
179 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
202 public const MIN_TIMESTAMP_NONE = 0.0;
228 private const TINY_NEGATIVE = -0.000001;
230 private const TINY_POSTIVE = 0.000001;
341 $this->cache = $params[
'cache'];
342 $this->broadcastRoute = $params[
'broadcastRoutingPrefix'] ??
null;
343 $this->onHostRoute = $params[
'onHostRoutingPrefix'] ??
null;
344 $this->epoch = $params[
'epoch'] ?? 0;
345 $this->secret = $params[
'secret'] ?? (string)$this->epoch;
346 if ( ( $params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
350 $this->coalesceScheme = self::SCHEME_HASH_TAG;
353 $this->coalesceScheme = self::SCHEME_HASH_STOP;
356 $this->keyHighQps = $params[
'keyHighQps'] ?? 100;
357 $this->keyHighUplinkBps = $params[
'keyHighUplinkBps'] ?? ( 1e9 / 8 / 100 );
359 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
361 $this->asyncHandler = $params[
'asyncHandler'] ??
null;
363 $this->cache->registerWrapperInfoForStats(
366 [ __CLASS__,
'getCollectionFromSisterKey' ]
374 $this->logger = $logger;
441 final public function get( $key, &$curTTL =
null, array $checkKeys = [], &$info = [] ) {
444 $legacyInfo = ( $info !== self::PASS_BY_REF );
447 $value =
$res[self::RES_VALUE];
448 $metadata =
$res[self::RES_METADATA];
450 $curTTL = $metadata[self::KEY_CUR_TTL];
451 $info = $legacyInfo ? $metadata[self::KEY_AS_OF] : $metadata;
483 array $checkKeys = [],
488 $legacyInfo = ( $info !== self::PASS_BY_REF );
495 foreach ( $resByKey as $key =>
$res ) {
496 $value =
$res[self::RES_VALUE];
497 $metadata =
$res[self::RES_METADATA];
499 if ( $value !==
false ) {
500 $valuesByKey[$key] = $value;
503 if ( $metadata[self::KEY_CUR_TTL] !==
null ) {
504 $curTTLs[$key] = $metadata[self::KEY_CUR_TTL];
507 $info[$key] = $legacyInfo ? $metadata[self::KEY_AS_OF] : $metadata;
536 $valueSisterKeys = [];
538 $fluxSisterKeys = [];
540 $checkSisterKeysForAll = [];
542 $checkSisterKeysByKey = [];
544 foreach (
$keys as $key ) {
545 $sisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE, $this->onHostRoute );
546 $allSisterKeys[] = $sisterKey;
547 $valueSisterKeys[] = $sisterKey;
548 if ( $this->onHostRoute !==
null ) {
550 $allSisterKeys[] = $sisterKey;
551 $fluxSisterKeys[] = $sisterKey;
555 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
557 if ( is_int( $i ) ) {
559 $sisterKey = $this->
makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
560 $allSisterKeys[] = $sisterKey;
561 $checkSisterKeysForAll[] = $sisterKey;
564 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
565 $sisterKey = $this->
makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
566 $allSisterKeys[] = $sisterKey;
567 $checkSisterKeysByKey[$i][] = $sisterKey;
572 if ( $this->warmupCache ) {
574 $wrappedBySisterKey = $this->warmupCache;
575 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
576 if ( $sisterKeysMissing ) {
577 $this->warmupKeyMisses += count( $sisterKeysMissing );
578 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
582 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
590 $checkSisterKeysForAll,
596 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
609 foreach ( $valueSisterKeys as $valueSisterKey ) {
611 $key = current(
$keys );
614 if ( isset( $fkPurgesByKey[$key] ) ) {
617 $wrapped = $fkPurgesByKey[$key];
618 } elseif ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
620 $wrapped = $wrappedBySisterKey[$valueSisterKey];
626 list( $value, $metadata ) = $this->
unwrap( $wrapped, $now );
628 $metadata[self::KEY_CHECK_AS_OF] =
null;
630 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
631 $metadata[self::KEY_CHECK_AS_OF] = max(
632 $ckPurge[self::PURGE_TIME],
633 $metadata[self::KEY_CHECK_AS_OF]
636 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
638 if ( $value !==
false && $holdoffDeadline >= $metadata[self::KEY_AS_OF] ) {
640 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
642 $metadata[self::KEY_CUR_TTL] = min( $metadata[self::KEY_CUR_TTL], $ago );
646 $resByKey[$key] = [ self::RES_VALUE => $value, self::RES_METADATA => $metadata ];
660 array $fluxSisterKeys,
661 array $wrappedBySisterKey
666 foreach ( $fluxSisterKeys as $fluxKey ) {
668 $key = current(
$keys );
671 $purge = isset( $wrappedBySisterKey[$fluxKey] )
675 if ( $purge !==
null ) {
676 $purges[$key] = $purge;
690 array $checkSisterKeys,
691 array $wrappedBySisterKey,
696 foreach ( $checkSisterKeys as $timeKey ) {
697 $purge = isset( $wrappedBySisterKey[$timeKey] )
701 if ( $purge ===
null ) {
703 $this->cache->add( $timeKey, $wrapped, self::CHECK_KEY_TTL );
790 final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
792 $lag = $opts[
'lag'] ?? 0;
793 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
794 $pending = $opts[
'pending'] ??
false;
795 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
796 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
797 $creating = $opts[
'creating'] ??
false;
798 $version = $opts[
'version'] ??
null;
799 $walltime = $opts[
'walltime'] ??
null;
808 'Rejected set() for {cachekey} due to pending writes.',
809 [
'cachekey' => $key ]
821 if ( $age > self::MAX_READ_LAG ) {
823 if ( $walltime ===
null ) {
826 $mitigated =
'snapshot lag';
827 $mitigationTTL = self::TTL_SECOND;
828 } elseif ( ( $age - $walltime ) > self::MAX_READ_LAG ) {
831 $mitigated =
'snapshot lag (late regeneration)';
832 $mitigationTTL = self::TTL_UNCACHEABLE;
836 $mitigated =
'snapshot lag (high regeneration time)';
837 $mitigationTTL = self::TTL_SECOND;
839 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
842 $mitigated =
'replication lag';
843 $mitigationTTL = self::TTL_LAGGED;
844 } elseif ( ( $lag + $age ) > self::MAX_READ_LAG ) {
847 $mitigated =
'read lag';
848 $mitigationTTL = self::TTL_UNCACHEABLE;
852 $mitigationTTL =
null;
855 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
856 $this->logger->warning(
857 "Rejected set() for {cachekey} due to $mitigated.",
858 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age,
'walltime' => $walltime ]
867 if ( $mitigationTTL !==
null ) {
869 if ( $lockTSE >= 0 ) {
871 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
874 $ttl = min( $ttl ?: INF, max( $mitigationTTL, self::LOW_TTL ) );
877 $this->logger->warning(
878 "Lowered set() TTL for {cachekey} due to $mitigated.",
879 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age,
'walltime' => $walltime ]
884 $wrapped = $this->
wrap( $value, $logicalTTL ?: $ttl, $version, $now, $walltime );
885 $storeTTL = $ttl + $staleTTL;
888 $ok = $this->cache->add(
894 $ok = $this->cache->merge(
896 static function (
$cache, $key, $cWrapped ) use ( $wrapped ) {
898 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
970 final public function delete( $key, $ttl = self::HOLDOFF_TTL ) {
974 $valueSisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE );
1003 if ( $this->onHostRoute !==
null ) {
1004 $fluxSisterKey = $this->
makeSisterKey( $key, self::TYPE_FLUX );
1012 $this->stats->increment(
"wanobjectcache.$kClass.delete." . ( $ok ?
'ok' :
'error' ) );
1102 $checkSisterKeysByKey = [];
1103 foreach (
$keys as $key ) {
1104 $checkSisterKeysByKey[$key] = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
1107 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1108 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1112 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1113 $purge = $this->
parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1114 if ( $purge ===
null ) {
1116 $this->cache->add( $checkSisterKey, $wrapped, self::CHECK_KEY_TTL );
1119 $times[$key] = $purge[self::PURGE_TIME];
1160 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
1167 $this->stats->increment(
"wanobjectcache.$kClass.ck_touch." . ( $ok ?
'ok' :
'error' ) );
1200 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
1204 $this->stats->increment(
"wanobjectcache.$kClass.ck_reset." . ( $ok ?
'ok' :
'error' ) );
1513 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1515 $version = $opts[
'version'] ??
null;
1516 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1517 $pCache = ( $pcTTL >= 0 )
1524 if ( $pCache && $this->callbackDepth == 0 ) {
1525 $cached = $pCache->get( $key, $pcTTL,
false );
1526 if ( $cached !==
false ) {
1527 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1533 list( $value, $valueVersion, $curAsOf ) =
$res;
1534 if ( $valueVersion !== $version ) {
1538 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1540 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), $version ),
1543 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1549 if ( $pCache && $value !==
false ) {
1550 $pCache->set( $key, $value );
1573 $checkKeys = $opts[
'checkKeys'] ?? [];
1574 $graceTTL = $opts[
'graceTTL'] ?? self::GRACE_TTL_NONE;
1575 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1576 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
1577 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1578 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1579 $touchedCb = $opts[
'touchedCallback'] ??
null;
1586 $curValue =
$res[self::RES_VALUE];
1587 $curInfo =
$res[self::RES_METADATA];
1588 $curTTL = $curInfo[self::KEY_CUR_TTL];
1590 list( $curTTL, $LPT ) = $this->
resolveCTL( $curValue, $curTTL, $curInfo, $touchedCb );
1593 $this->
isValid( $curValue, $curInfo[self::KEY_AS_OF], $minAsOf ) &&
1596 $preemptiveRefresh = (
1600 if ( !$preemptiveRefresh ) {
1601 $this->stats->timing(
1602 "wanobjectcache.$kClass.hit.good",
1606 return [ $curValue, $curInfo[self::KEY_VERSION], $curInfo[self::KEY_AS_OF] ];
1608 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1609 $this->stats->timing(
1610 "wanobjectcache.$kClass.hit.refresh",
1614 return [ $curValue, $curInfo[self::KEY_VERSION], $curInfo[self::KEY_AS_OF] ];
1616 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
1621 $isKeyTombstoned = ( $curInfo[self::KEY_TOMB_AS_OF] !== null );
1622 if ( $isKeyTombstoned ) {
1624 list( $possValue, $possInfo ) = $this->
getInterimValue( $key, $minAsOf );
1628 $possValue = $curValue;
1629 $possInfo = $curInfo;
1635 $this->
isValid( $possValue, $possInfo[self::KEY_AS_OF], $minAsOf, $LPT ) &&
1638 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1639 $this->stats->timing(
1640 "wanobjectcache.$kClass.hit.volatile",
1644 return [ $possValue, $possInfo[self::KEY_VERSION], $curInfo[self::KEY_AS_OF] ];
1647 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1648 $busyValue = $opts[
'busyValue'] ??
null;
1649 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
1650 $version = $opts[
'version'] ??
null;
1653 $useRegenerationLock =
1663 ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE ) ||
1666 ( $busyValue !==
null && $possValue ===
false );
1672 if ( $useRegenerationLock && !$hasLock ) {
1673 if ( $this->
isValid( $possValue, $possInfo[self::KEY_AS_OF], $minAsOf ) ) {
1674 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1675 $this->stats->timing(
1676 "wanobjectcache.$kClass.hit.stale",
1680 return [ $possValue, $possInfo[self::KEY_VERSION], $curInfo[self::KEY_AS_OF] ];
1681 } elseif ( $busyValue !==
null ) {
1682 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1683 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1684 $this->stats->timing(
1685 "wanobjectcache.$kClass.$miss.busy",
1690 return [ $placeholderValue, $version, $curInfo[self::KEY_AS_OF] ];
1697 ++$this->callbackDepth;
1700 ( $curInfo[self::KEY_VERSION] === $version ) ? $curValue :
false,
1703 ( $curInfo[self::KEY_VERSION] === $version ) ? $curInfo[self::KEY_AS_OF] :
null,
1707 --$this->callbackDepth;
1712 $elapsed = max( $postCallbackTime - $initialTime, 0.0 );
1715 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1716 $this->stats->timing(
"wanobjectcache.$kClass.regen_walltime", 1e3 * $walltime );
1721 ( $value !==
false && $ttl >= 0 ) &&
1723 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned ) &&
1728 if ( $isKeyTombstoned ) {
1729 $this->
setInterimValue( $key, $value, $lockTSE, $version, $walltime );
1733 'since' => $setOpts[
'since'] ?? $preCallbackTime,
1734 'version' => $version,
1735 'staleTTL' => $staleTTL,
1736 'lockTSE' => $lockTSE,
1737 'creating' => ( $curValue === false ),
1738 'walltime' => $walltime
1740 $this->
set( $key, $value, $ttl, $finalSetOpts );
1746 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1747 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1748 $this->stats->timing(
1749 "wanobjectcache.$kClass.$miss.compute",
1753 return [ $value, $version, $curInfo[self::KEY_AS_OF] ];
1761 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_MUTEX );
1763 return $this->cache->add( $checkSisterKey, 1, self::LOCK_TTL );
1772 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_MUTEX );
1776 $this->cache->changeTTL( $checkSisterKey, $this->
getCurrentTime() - 60 );
1792 foreach ( $baseKeys as $baseKey ) {
1809 private function makeSisterKey(
string $baseKey,
string $typeChar,
string $route =
null ) {
1810 if ( $this->coalesceScheme === self::SCHEME_HASH_STOP ) {
1812 $sisterKey =
'WANCache:' . $baseKey .
'|#|' . $typeChar;
1815 $sisterKey =
'WANCache:{' . $baseKey .
'}:' . $typeChar;
1818 if ( $route !==
null ) {
1819 $sisterKey = $this->
prependRoute( $sisterKey, $route );
1832 if ( substr( $sisterKey, -4 ) ===
'|#|v' ) {
1834 $collection = substr( $sisterKey, 9, strcspn( $sisterKey,
':|', 9 ) );
1835 } elseif ( substr( $sisterKey, -3 ) ===
'}:v' ) {
1837 $collection = substr( $sisterKey, 10, strcspn( $sisterKey,
':}', 10 ) );
1839 $collection =
'internal';
1850 return ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1875 $valueSisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE, $this->onHostRoute );
1876 list( $estimatedSize ) = $this->cache->setNewPreparedValues( [
1877 $valueSisterKey => $value
1889 $missesPerSecForHighQPS = ( min( $elapsed, 1 ) * $this->keyHighQps );
1901 if ( ( $missesPerSecForHighQPS * $estimatedSize ) >= $this->keyHighUplinkBps ) {
1902 $cooloffSisterKey = $this->
makeSisterKey( $key, self::TYPE_COOLOFF );
1903 $this->cache->clearLastError();
1905 !$this->cache->add( $cooloffSisterKey, 1, self::COOLOFF_TTL ) &&
1909 $this->stats->increment(
"wanobjectcache.$kClass.cooloff_bounce" );
1917 $this->stats->timing(
"wanobjectcache.$kClass.regen_set_delay", 1e3 * $elapsed );
1918 $this->stats->updateCount(
"wanobjectcache.$kClass.regen_set_bytes", $estimatedSize );
1931 private function resolveCTL( $value, $curTTL, $curInfo, $touchedCallback ) {
1932 if ( $touchedCallback ===
null || $value ===
false ) {
1935 max( $curInfo[self::KEY_TOMB_AS_OF], $curInfo[self::KEY_CHECK_AS_OF] )
1939 $touched = $touchedCallback( $value );
1940 if ( $touched !==
null && $touched >= $curInfo[self::KEY_AS_OF] ) {
1941 $curTTL = min( $curTTL, self::TINY_NEGATIVE, $curInfo[self::KEY_AS_OF] - $touched );
1947 $curInfo[self::KEY_TOMB_AS_OF],
1948 $curInfo[self::KEY_CHECK_AS_OF],
1962 return ( $touchedCallback ===
null || $value ===
false )
1964 : max( $touchedCallback( $value ), $lastPurge );
1976 $wrapped = $this->cache->get(
1980 list( $value, $keyInfo ) = $this->
unwrap( $wrapped, $now );
1981 if ( $this->
isValid( $value, $keyInfo[self::KEY_AS_OF], $minAsOf ) ) {
1982 return [ $value, $keyInfo ];
1986 return $this->
unwrap(
false, $now );
1997 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1999 $wrapped = $this->
wrap( $value, $ttl, $version, $this->
getCurrentTime(), $walltime );
2000 $this->cache->merge(
2002 static function () use ( $wrapped ) {
2015 return ( $busyValue instanceof Closure ) ? $busyValue() : $busyValue;
2083 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2088 $opts[
'checkKeys'] ?? []
2090 $this->warmupKeyMisses = 0;
2096 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2099 return $callback( $params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2103 foreach ( $keyedIds as $key => $id ) {
2113 $this->warmupCache = [];
2184 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2186 $checkKeys = $opts[
'checkKeys'] ?? [];
2187 unset( $opts[
'lockTSE'] );
2188 unset( $opts[
'busyValue'] );
2193 $this->warmupKeyMisses = 0;
2199 $resByKey = $this->
fetchKeys( $keysByIdGet, $checkKeys );
2200 foreach ( $keysByIdGet as $id => $key ) {
2201 $res = $resByKey[$key];
2202 $value =
$res[self::RES_VALUE];
2203 $metadata =
$res[self::RES_METADATA];
2204 if ( $value ===
false || $metadata[self::KEY_CUR_TTL] < 0 ) {
2211 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2212 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2218 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2219 use ( $callback, $newValsById, $newTTLsById, $newSetOpts )
2221 $id = $params[
'id'];
2223 if ( array_key_exists( $id, $newValsById ) ) {
2225 $newValue = $newValsById[$id];
2226 $ttl = $newTTLsById[$id];
2227 $setOpts = $newSetOpts;
2231 $ttls = [ $id => $ttl ];
2232 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
2241 foreach ( $keyedIds as $key => $id ) {
2251 $this->warmupCache = [];
2268 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
2269 $valueSisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE );
2271 $minAsOf = $purgeTimestamp + self::HOLDOFF_TTL;
2272 $wrapped = $this->cache->get( $valueSisterKey );
2273 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
2275 $this->logger->warning(
"Reaping stale value key '$key'." );
2276 $ttlReap = self::HOLDOFF_TTL;
2277 $ok = $this->cache->changeTTL( $valueSisterKey, $ttlReap );
2279 $this->logger->error(
"Could not complete reap of key '$key'." );
2299 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
2300 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
2302 $wrapped = $this->cache->get( $checkSisterKey );
2304 if ( $purge !==
null && $purge[self::PURGE_TIME] < $purgeTimestamp ) {
2306 $this->logger->warning(
"Reaping stale check key '$key'." );
2307 $ok = $this->cache->changeTTL( $checkSisterKey, self::TTL_SECOND );
2309 $this->logger->error(
"Could not complete reap of check key '$key'." );
2331 return $this->cache->makeGlobalKey( ...func_get_args() );
2344 public function makeKey( $collection, ...$components ) {
2345 return $this->cache->makeKey( ...func_get_args() );
2356 return hash_hmac(
'sha256', $component, $this->secret );
2411 foreach ( $ids as $id ) {
2413 if ( strlen( $id ) > 64 ) {
2414 $this->logger->warning( __METHOD__ .
": long ID '$id'; use hash256()" );
2416 $key = $keyCallback( $id, $this );
2418 if ( !isset( $idByKey[$key] ) ) {
2419 $idByKey[$key] = $id;
2420 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2421 throw new UnexpectedValueException(
2422 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2427 return new ArrayIterator( $idByKey );
2466 if ( count( $ids ) !== count(
$res ) ) {
2469 $ids = array_keys( array_fill_keys( $ids,
true ) );
2470 if ( count( $ids ) !== count(
$res ) ) {
2471 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2475 return array_combine( $ids,
$res );
2483 $code = $this->cache->getLastError();
2486 return self::ERR_NONE;
2488 return self::ERR_NO_RESPONSE;
2490 return self::ERR_UNREACHABLE;
2492 return self::ERR_UNEXPECTED;
2500 $this->cache->clearLastError();
2509 $this->processCaches = [];
2542 return $this->cache->getQoS( $flag );
2608 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2609 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
2610 $mtime = (int)$mtime;
2613 if ( !is_int( $mtime ) || $mtime <= 0 ) {
2619 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2627 return $this->warmupKeyMisses;
2645 $purgeByRouteKey = [];
2646 foreach ( $purgeBySisterKey as $sisterKey => $purge ) {
2647 if ( $this->broadcastRoute !==
null ) {
2648 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2650 $routeKey = $sisterKey;
2652 $purgeByRouteKey[$routeKey] = $purge;
2655 if ( count( $purgeByRouteKey ) == 1 ) {
2656 $purge = reset( $purgeByRouteKey );
2657 $ok = $this->cache->set( key( $purgeByRouteKey ), $purge, $ttl );
2659 $ok = $this->cache->setMulti( $purgeByRouteKey, $ttl );
2674 if ( $this->broadcastRoute !==
null ) {
2675 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2677 $routeKey = $sisterKey;
2680 return $this->cache->delete( $routeKey );
2689 if ( $sisterKey[0] ===
'/' ) {
2690 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2693 return $route . $sisterKey;
2708 if ( !$this->asyncHandler ) {
2715 $func = $this->asyncHandler;
2716 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
2717 $opts[
'minAsOf'] = INF;
2720 }
catch ( Exception $e ) {
2722 $this->logger->error(
'Async refresh failed for {key}', [
2749 if ( $curTTL > 0 ) {
2751 } elseif ( $graceTTL <= 0 ) {
2755 $ageStale = abs( $curTTL );
2756 $curGraceTTL = ( $graceTTL - $ageStale );
2757 if ( $curGraceTTL <= 0 ) {
2783 if ( $lowTTL <= 0 ) {
2789 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2791 if ( $curTTL >= $effectiveLowTTL || $curTTL <= 0 ) {
2795 $chance = ( 1 - $curTTL / $effectiveLowTTL );
2798 $decision = ( mt_rand( 1, 1e9 ) <= 1e9 * $chance );
2800 $this->logger->debug(
2801 "worthRefreshExpiring($curTTL, $logicalTTL, $lowTTL): " .
2802 "p = $chance; refresh = " . ( $decision ?
'Y' :
'N' )
2824 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2828 $age = $now - $asOf;
2829 $timeOld = $age - $ageNew;
2830 if ( $timeOld <= 0 ) {
2834 $popularHitsPerSec = 1;
2838 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2842 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2844 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2847 $decision = ( mt_rand( 1, 1e9 ) <= 1e9 * $chance );
2849 $this->logger->debug(
2850 "worthRefreshPopular($asOf, $ageNew, $timeTillRefresh, $now): " .
2851 "p = $chance; refresh = " . ( $decision ?
'Y' :
'N' )
2866 protected function isValid( $value, $asOf, $minAsOf, $purgeTime =
null ) {
2868 $safeMinAsOf = max( $minAsOf, $purgeTime + self::TINY_POSTIVE );
2870 if ( $value ===
false ) {
2872 } elseif ( $safeMinAsOf > 0 && $asOf < $minAsOf ) {
2887 private function wrap( $value, $ttl, $version, $now, $walltime ) {
2891 self::FLD_FORMAT_VERSION => self::VERSION,
2892 self::FLD_VALUE => $value,
2893 self::FLD_TTL => $ttl,
2894 self::FLD_TIME => $now
2896 if ( $version !==
null ) {
2897 $wrapped[self::FLD_VALUE_VERSION] = $version;
2899 if ( $walltime >= self::GENERATION_SLOW_SEC ) {
2900 $wrapped[self::FLD_GENERATION_TIME] = $walltime;
2922 if ( is_array( $wrapped ) ) {
2925 ( $wrapped[self::FLD_FORMAT_VERSION] ??
null ) === self::VERSION &&
2926 $wrapped[self::FLD_TIME] >= $this->epoch
2928 if ( $wrapped[self::FLD_TTL] > 0 ) {
2930 $age = $now - $wrapped[self::FLD_TIME];
2931 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2936 $value = $wrapped[self::FLD_VALUE];
2937 $info[self::KEY_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ??
null;
2938 $info[self::KEY_AS_OF] = $wrapped[self::FLD_TIME];
2939 $info[self::KEY_CUR_TTL] = $curTTL;
2940 $info[self::KEY_TTL] = $wrapped[self::FLD_TTL];
2945 if ( $purge !==
null ) {
2947 $info[self::KEY_CUR_TTL] =
2948 min( $purge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
2949 $info[self::KEY_TOMB_AS_OF] = $purge[self::PURGE_TIME];
2953 return [ $value, $info ];
2961 self::KEY_VERSION =>
null,
2962 self::KEY_AS_OF =>
null,
2963 self::KEY_TTL =>
null,
2964 self::KEY_CUR_TTL =>
null,
2965 self::KEY_TOMB_AS_OF => null
2974 $parts = explode(
':', $key, 3 );
2977 return strtr( $parts[1] ?? $parts[0],
'.',
'_' );
2989 if ( !is_string( $value ) ) {
2993 $segments = explode(
':', $value, 3 );
2994 if ( isset( $segments[2] ) ) {
2995 $prefix = $segments[0];
2996 $timestamp = (float)$segments[1];
2997 $holdoff = (int)$segments[2];
2998 } elseif ( isset( $segments[1] ) ) {
2999 $prefix = $segments[0];
3000 $timestamp = (float)$segments[1];
3002 $holdoff = self::HOLDOFF_TTL;
3007 if (
"{$prefix}:" !== self::PURGE_VAL_PREFIX || $timestamp < $this->epoch ) {
3012 return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
3020 return self::PURGE_VAL_PREFIX . number_format( $timestamp, 4,
'.',
'' );
3030 $normalizedTime = number_format( $timestamp, 4,
'.',
'' );
3032 $purge = [ self::PURGE_TIME => (float)$normalizedTime, self::PURGE_HOLDOFF => $holdoff ];
3034 return self::PURGE_VAL_PREFIX .
"$normalizedTime:$holdoff";
3042 if ( !isset( $this->processCaches[$group] ) ) {
3043 list( , $size ) = explode(
':', $group );
3044 $this->processCaches[$group] =
new MapCacheLRU( (
int)$size );
3045 if ( $this->wallClockOverride !==
null ) {
3046 $this->processCaches[$group]->setMockTime( $this->wallClockOverride );
3050 return $this->processCaches[$group];
3059 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
3062 if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
3063 $version = $opts[
'version'] ??
null;
3064 $pCache = $this->
getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY );
3065 foreach (
$keys as $key => $id ) {
3066 if ( !$pCache->has( $key, $pcTTL ) ) {
3067 $keysMissing[$id] = $key;
3072 return $keysMissing;
3089 if ( $this->onHostRoute !==
null ) {
3090 foreach (
$keys as $key ) {
3091 $sisterKeys[] = $this->
makeSisterKey( $key, self::TYPE_FLUX );
3095 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
3097 if ( is_int( $i ) ) {
3099 $sisterKeys[] = $this->
makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
3102 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
3103 $sisterKeys[] = $this->
makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
3108 $wrappedBySisterKey = $this->cache->getMulti( $sisterKeys );
3109 $wrappedBySisterKey += array_fill_keys( $sisterKeys,
false );
3111 return $wrappedBySisterKey;
3119 if ( $this->wallClockOverride ) {
3120 return $this->wallClockOverride;
3123 $clockTime = (float)time();
3129 return max( microtime(
true ), $clockTime );
3137 $this->wallClockOverride =& $time;
3138 $this->cache->setMockTime( $time );
3139 foreach ( $this->processCaches as $pCache ) {
3140 $pCache->setMockTime( $time );
Class representing a cache/ephemeral data store.
A BagOStuff object with no objects in it.
Handles a simple LRU key/value map with a maximum number of entries.
Multi-datacenter aware caching interface.
makeGlobalKey( $collection,... $components)
Make a cache key for the global keyspace and given components.
int $callbackDepth
Callback stack depth for getWithSetCallback()
const PURGE_TIME
Key to the tombstone entry timestamp.
const HOLDOFF_TTL
Seconds to tombstone keys on delete() and treat as volatile after invalidation.
const HOT_TTR
Expected time-till-refresh, in seconds, if the key is accessed once per second.
const KEY_VERSION
Version number attribute for a key; keep value for b/c (< 1.36)
const RES_METADATA
The key metadata component of a fetchMulti() result.
__construct(array $params)
resolveCTL( $value, $curTTL, $curInfo, $touchedCallback)
const TYPE_TIMESTAMP
Single character component for timestamp check keys.
worthRefreshPopular( $asOf, $ageNew, $timeTillRefresh, $now)
Check if a key is due for randomized regeneration due to its popularity.
fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams)
Do the actual I/O for getWithSetCallback() when needed.
multiRemap(array $ids, array $res)
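Get an (ID => value) map from (i) a non-unique list of entity IDs, and (ii) the list of corresponding values, which has one entry per ID or, failing that, one entry per unique ID in order of first appearance; throws UnexpectedValueException if the counts still do not match.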
const FLD_FORMAT_VERSION
Key to WAN cache version number.
determineKeyClassForStats( $key)
const SCHEME_HASH_STOP
Use mcrouter-style Hash Stop key scheme (e.g. "WANCache:<base key>|#|<type character>").
const RES_VALUE
The key value component of a fetchMulti() result.
prependRoute(string $sisterKey, string $route)
touchCheckKey( $key, $holdoff=self::HOLDOFF_TTL)
Purge a "check" key from all datacenters, invalidating keys that use it.
const FLD_VALUE
Key to the cached value.
const PURGE_HOLDOFF
Key to the tombstone entry hold-off TTL.
isValid( $value, $asOf, $minAsOf, $purgeTime=null)
Check if $value is not false, versioned (if needed), and not older than $minAsOf (if set)
adaptiveTTL( $mtime, $maxTTL, $minTTL=30, $factor=0.2)
Get a TTL that is higher for objects that have not changed recently.
const GRACE_TTL_NONE
Idiom for set()/getWithSetCallback() meaning "no post-expiration grace period".
isVolatileValueAgeNegligible( $age)
string null $onHostRoute
Routing prefix for value keys that support use of an on-host tier.
int $warmupKeyMisses
Number of sister keys that were missing from the warm-up cache and had to be fetched from the backing cache.
float null $wallClockOverride
relayVolatilePurges(array $purgeBySisterKey, int $ttl)
Set a sister key to a purge value in all datacenters.
mixed[] $warmupCache
Temporary warm-up cache.
const VERSION
Cache format version number.
const LOW_TTL
Consider regeneration if the key will expire within this many seconds.
getLastError()
Get the "last error" registered; clearLastError() should be called manually.
BagOStuff $cache
The local datacenter cache.
parsePurgeValue( $value)
Extract purge metadata from cached value if it is a valid purge value.
scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams)
Schedule a deferred cache regeneration if possible.
const GENERATION_SLOW_SEC
Consider value generation slow if it takes more than this many seconds.
const COOLOFF_TTL
Seconds to no-op key set() calls to avoid large blob I/O stampedes.
getWithSetCallback( $key, $ttl, $callback, array $opts=[], array $cbParams=[])
Method to fetch/regenerate a cache key.
getCheckKeyTime( $key)
Fetch the value of a timestamp "check" key.
getNonProcessCachedMultiKeys(ArrayIterator $keys, array $opts)
const SCHEME_HASH_TAG
Use twemproxy-style Hash Tag key scheme (e.g. "WANCache:{<base key>}:<type character>").
const RECENT_SET_HIGH_MS
Max millisecond set() backoff during hold-off (far less than INTERIM_KEY_TTL)
const LOCK_TTL
Seconds to keep lock keys around.
getMulti(array $keys, &$curTTLs=[], array $checkKeys=[], &$info=[])
Fetch the value of several keys from cache.
const PC_PRIMARY
Default process cache name and max key count.
getMultiWithUnionSetCallback(ArrayIterator $keyedIds, $ttl, callable $callback, array $opts=[])
Method to fetch/regenerate multiple cache keys at once.
const TYPE_MUTEX
Single character component for mutex lock keys.
relayNonVolatilePurge(string $sisterKey)
Remove a sister key from all datacenters.
getMultiWithSetCallback(ArrayIterator $keyedIds, $ttl, callable $callback, array $opts=[])
Method to fetch multiple cache keys at once with regeneration.
wrap( $value, $ttl, $version, $now, $walltime)
const PURGE_VAL_PREFIX
Value prefix of purge values.
const INTERIM_KEY_TTL
Seconds to keep interim value keys for tombstoned keys around.
makeMultiKeys(array $ids, $keyCallback)
Get an iterator of (cache key => entity ID) for a list of entity IDs.
makeTombstonePurgeValue(float $timestamp)
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
int $coalesceScheme
Scheme to use for key coalescing (Hash Tags or Hash Stops)
const FLD_TTL
Key to the original TTL.
bool $useInterimHoldOffCaching
Whether to use "interim" caching while keys are tombstoned.
worthRefreshExpiring( $curTTL, $logicalTTL, $lowTTL)
Check if a key is nearing expiration and thus due for randomized regeneration.
const FLD_FLAGS
Key to the flags bit field (reserved number)
const HOLDOFF_TTL_NONE
Idiom for delete()/touchCheckKey() meaning "no hold-off period".
const TYPE_FLUX
Single character component for flux keys.
const MAX_READ_LAG
Max expected seconds of combined lag from replication and view snapshots.
resolveTouched( $value, $lastPurge, $touchedCallback)
setInterimValue( $key, $value, $ttl, $version, $walltime)
const FLD_TIME
Key to the cache timestamp.
const CHECK_KEY_TTL
Seconds to keep dependency purge keys around.
useInterimHoldOffCaching( $enabled)
Enable or disable the use of brief caching for tombstoned keys.
StatsdDataFactoryInterface $stats
makeSisterKeys(array $baseKeys, string $type, string $route=null)
Get sister keys that should be collocated with their corresponding base cache keys.
clearProcessCache()
Clear the in-process caches; useful for testing.
const KEY_AS_OF
Generation timestamp attribute for a key; keep value for b/c (< 1.36)
const TYPE_COOLOFF
Single character component for cool-off bounce keys.
const FLD_GENERATION_TIME
Key to how long it took to generate the value.
makeSisterKey(string $baseKey, string $typeChar, string $route=null)
Get a sister key that should be collocated with a base cache key.
makeKey( $collection,... $components)
Make a cache key using the "global" keyspace for the given components.
float $epoch
Unix timestamp of the oldest possible valid values.
fetchWrappedValuesForWarmupCache(array $keys, array $checkKeys)
callable|null $asyncHandler
Function that takes a WAN cache callback and runs it later.
string|null $broadcastRoute
Routing prefix for values that should be broadcast to all data centers.
resolveBusyValue( $busyValue)
reap( $key, $purgeTimestamp, &$isStale=false)
Set a key to soon expire in the local cluster if it pre-dates $purgeTimestamp.
const RECENT_SET_LOW_MS
Min millisecond set() backoff during hold-off (far less than INTERIM_KEY_TTL)
setLogger(LoggerInterface $logger)
processFluxKeys(array $keys, array $fluxSisterKeys, array $wrappedBySisterKey)
static getCollectionFromSisterKey(string $sisterKey)
reapCheckKey( $key, $purgeTimestamp, &$isStale=false)
Set a "check" key to soon expire in the local cluster if it pre-dates $purgeTimestamp.
const TYPE_INTERIM
Single character component for interim value keys.
const PASS_BY_REF
Idiom for get()/getMulti() to return extra information by reference.
checkAndSetCooloff( $key, $kClass, $value, $elapsed, $hasLock)
Check whether set() is rate-limited to avoid concurrent I/O spikes.
float $keyHighUplinkBps
Max tolerable bytes/second to spend on a cache write stampede for a key.
const KEY_CHECK_AS_OF
Highest "check" key timestamp for a key; keep value for b/c (< 1.36)
processCheckKeys(array $checkSisterKeys, array $wrappedBySisterKey, float $now)
clearLastError()
Clear the "last error" registry.
const STALE_TTL_NONE
Idiom for set()/getWithSetCallback() meaning "no post-expiration persistence".
MapCacheLRU[] $processCaches
Map of group PHP instance caches.
const TSE_NONE
Idiom for getWithSetCallback() meaning "no cache stampede mutex".
string $secret
Stable secret used for hashing long strings into key components.
fetchKeys(array $keys, array $checkKeys)
Fetch the value and key metadata of several keys from cache.
const TYPE_VALUE
Single character component for value keys.
resetCheckKey( $key)
Delete a "check" key from all datacenters, invalidating keys that use it.
const KEY_TOMB_AS_OF
Tombstone timestamp attribute for a key; keep value for b/c (< 1.36)
int $keyHighQps
Reads/second assumed during a hypothetical cache write stampede for a key.
const MAX_COMMIT_DELAY
Max expected seconds to pass between delete() and DB commit finishing.
const KEY_CUR_TTL
Remaining TTL attribute for a key; keep value for b/c (< 1.36)
const AGE_NEW
Minimum key age, in seconds, for expected time-till-refresh to be considered.
getInterimValue( $key, $minAsOf)
yieldStampedeLock( $key, $hasLock)
const RAMPUP_TTL
Seconds to ramp up the chance of regeneration due to expected time-till-refresh.
const TTL_LAGGED
Max TTL, in seconds, to store keys when a data source is lagged.
const FLD_VALUE_VERSION
Key to collection cache version number.
isAliveOrInGracePeriod( $curTTL, $graceTTL)
Check if a key is fresh or in the grace window and thus due for randomized reuse.
hash256( $component)
Hash a possibly long string into a suitable component for makeKey()/makeGlobalKey()
getMultiCheckKeyTime(array $keys)
Fetch the values of each timestamp "check" key.
makeCheckPurgeValue(float $timestamp, int $holdoff, array &$purge=null)
const KEY_TTL
Logical TTL attribute for a key.
Generic interface for object stores with key encoding methods.