22use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23use Psr\Log\LoggerAwareInterface;
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
183 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
206 public const MIN_TIMESTAMP_NONE = 0.0;
232 private const TINY_NEGATIVE = -0.000001;
234 private const TINY_POSTIVE = 0.000001;
346 $this->cache = $params[
'cache'];
347 $this->broadcastRoute = $params[
'broadcastRoutingPrefix'] ??
null;
348 $this->epoch = $params[
'epoch'] ?? 0;
349 $this->secret = $params[
'secret'] ?? (string)$this->epoch;
350 if ( ( $params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
354 $this->coalesceScheme = self::SCHEME_HASH_TAG;
357 $this->coalesceScheme = self::SCHEME_HASH_STOP;
360 $this->keyHighQps = $params[
'keyHighQps'] ?? 100;
361 $this->keyHighUplinkBps = $params[
'keyHighUplinkBps'] ?? ( 1e9 / 8 / 100 );
363 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
365 $this->asyncHandler = $params[
'asyncHandler'] ??
null;
367 $this->missLog = array_fill( 0, 10, [
'', 0.0 ] );
369 $this->cache->registerWrapperInfoForStats(
372 [ __CLASS__,
'getCollectionFromSisterKey' ]
380 $this->logger = $logger;
447 final public function get( $key, &$curTTL =
null, array $checkKeys = [], &$info = [] ) {
450 $legacyInfo = ( $info !== self::PASS_BY_REF );
454 $curTTL =
$res[self::RES_CUR_TTL];
456 ?
$res[self::RES_AS_OF]
458 self::KEY_VERSION =>
$res[self::RES_VERSION],
459 self::KEY_AS_OF =>
$res[self::RES_AS_OF],
460 self::KEY_TTL =>
$res[self::RES_TTL],
461 self::KEY_CUR_TTL =>
$res[self::RES_CUR_TTL],
462 self::KEY_TOMB_AS_OF =>
$res[self::RES_TOMB_AS_OF],
463 self::KEY_CHECK_AS_OF =>
$res[self::RES_CHECK_AS_OF]
466 if ( $curTTL ===
null || $curTTL <= 0 ) {
468 reset( $this->missLog );
469 unset( $this->missLog[key( $this->missLog )] );
473 return $res[self::RES_VALUE];
503 array $checkKeys = [],
508 $legacyInfo = ( $info !== self::PASS_BY_REF );
515 foreach ( $resByKey as $key =>
$res ) {
516 if (
$res[self::RES_VALUE] !==
false ) {
517 $valuesByKey[$key] =
$res[self::RES_VALUE];
520 if (
$res[self::RES_CUR_TTL] !==
null ) {
521 $curTTLs[$key] =
$res[self::RES_CUR_TTL];
523 $info[$key] = $legacyInfo
524 ?
$res[self::RES_AS_OF]
526 self::KEY_VERSION =>
$res[self::RES_VERSION],
527 self::KEY_AS_OF =>
$res[self::RES_AS_OF],
528 self::KEY_TTL =>
$res[self::RES_TTL],
529 self::KEY_CUR_TTL =>
$res[self::RES_CUR_TTL],
530 self::KEY_TOMB_AS_OF =>
$res[self::RES_TOMB_AS_OF],
531 self::KEY_CHECK_AS_OF =>
$res[self::RES_CHECK_AS_OF]
552 protected function fetchKeys( array
$keys, array $checkKeys, $touchedCb =
null ) {
558 $valueSisterKeys = [];
560 $checkSisterKeysForAll = [];
562 $checkSisterKeysByKey = [];
564 foreach (
$keys as $key ) {
566 $allSisterKeys[] = $sisterKey;
567 $valueSisterKeys[] = $sisterKey;
570 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
572 if ( is_int( $i ) ) {
574 $sisterKey = $this->
makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
575 $allSisterKeys[] = $sisterKey;
576 $checkSisterKeysForAll[] = $sisterKey;
579 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
580 $sisterKey = $this->
makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
581 $allSisterKeys[] = $sisterKey;
582 $checkSisterKeysByKey[$i][] = $sisterKey;
587 if ( $this->warmupCache ) {
589 $wrappedBySisterKey = $this->warmupCache;
590 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
591 if ( $sisterKeysMissing ) {
592 $this->warmupKeyMisses += count( $sisterKeysMissing );
593 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
597 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
605 $checkSisterKeysForAll,
611 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
621 foreach ( $valueSisterKeys as $valueSisterKey ) {
623 $key = current(
$keys );
626 if ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
628 $wrapped = $wrappedBySisterKey[$valueSisterKey];
635 $value =
$res[self::RES_VALUE];
637 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
638 $res[self::RES_CHECK_AS_OF] = max(
639 $ckPurge[self::PURGE_TIME],
640 $res[self::RES_CHECK_AS_OF]
643 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
645 if ( $value !==
false && $holdoffDeadline >=
$res[self::RES_AS_OF] ) {
647 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
649 $res[self::RES_CUR_TTL] = min(
$res[self::RES_CUR_TTL], $ago );
653 if ( $touchedCb !==
null && $value !==
false ) {
654 $touched = $touchedCb( $value );
655 if ( $touched !==
null && $touched >=
$res[self::RES_AS_OF] ) {
656 $res[self::RES_CUR_TTL] = min(
657 $res[self::RES_CUR_TTL],
658 $res[self::RES_AS_OF] - $touched,
666 $res[self::RES_TOUCH_AS_OF] = max(
$res[self::RES_TOUCH_AS_OF], $touched );
668 $resByKey[$key] =
$res;
681 array $checkSisterKeys,
682 array $wrappedBySisterKey,
687 foreach ( $checkSisterKeys as $timeKey ) {
688 $purge = isset( $wrappedBySisterKey[$timeKey] )
692 if ( $purge ===
null ) {
694 $this->cache->add( $timeKey, $wrapped, self::CHECK_KEY_TTL );
783 final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
785 $dataReplicaLag = $opts[
'lag'] ?? 0;
786 $dataSnapshotLag = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
787 $dataCombinedLag = $dataReplicaLag + $dataSnapshotLag;
788 $dataPendingCommit = $opts[
'pending'] ??
null;
789 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
790 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
791 $creating = $opts[
'creating'] ??
false;
792 $version = $opts[
'version'] ??
null;
804 if ( $dataPendingCommit ) {
806 $mitigated =
'pending writes';
808 $mitigationTTL = self::TTL_UNCACHEABLE;
809 } elseif ( $dataSnapshotLag > self::MAX_READ_LAG ) {
811 $pregenSnapshotLag = ( $walltime !== null ) ? ( $dataSnapshotLag - $walltime ) : 0;
812 if ( ( $pregenSnapshotLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
814 $mitigated =
'snapshot lag (late generation)';
816 $mitigationTTL = self::TTL_UNCACHEABLE;
819 $mitigated =
'snapshot lag (high generation time)';
821 $mitigationTTL = self::LOW_TTL;
823 } elseif ( $dataReplicaLag ===
false || $dataReplicaLag > self::MAX_READ_LAG ) {
825 $mitigated =
'replication lag';
827 $mitigationTTL = self::TTL_LAGGED;
828 } elseif ( $dataCombinedLag > self::MAX_READ_LAG ) {
829 $pregenCombinedLag = ( $walltime !== null ) ? ( $dataCombinedLag - $walltime ) : 0;
831 if ( ( $pregenCombinedLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
833 $mitigated =
'read lag (late generation)';
835 $mitigationTTL = self::TTL_UNCACHEABLE;
838 $mitigated =
'read lag (high generation time)';
840 $mitigationTTL = self::LOW_TTL;
846 $mitigationTTL =
null;
849 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
850 $this->logger->warning(
851 "Rejected set() for {cachekey} due to $mitigated.",
854 'lag' => $dataReplicaLag,
855 'age' => $dataSnapshotLag,
856 'walltime' => $walltime
866 if ( $mitigationTTL !==
null ) {
868 if ( $lockTSE >= 0 ) {
870 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
873 $ttl = min( $ttl ?: INF, $mitigationTTL );
876 $this->logger->warning(
877 "Lowered set() TTL for {cachekey} due to $mitigated.",
880 'lag' => $dataReplicaLag,
881 'age' => $dataSnapshotLag,
882 'walltime' => $walltime
888 $wrapped = $this->
wrap( $value, $logicalTTL ?: $ttl, $version, $now, $walltime );
889 $storeTTL = $ttl + $staleTTL;
892 $ok = $this->cache->add(
898 $ok = $this->cache->merge(
900 static function (
$cache, $key, $cWrapped ) use ( $wrapped ) {
902 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
974 final public function delete( $key, $ttl = self::HOLDOFF_TTL ) {
978 $valueSisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE );
999 $this->stats->increment(
"wanobjectcache.$kClass.delete." . ( $ok ?
'ok' :
'error' ) );
1089 $checkSisterKeysByKey = [];
1090 foreach (
$keys as $key ) {
1091 $checkSisterKeysByKey[$key] = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
1094 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1095 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1099 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1100 $purge = $this->
parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1101 if ( $purge ===
null ) {
1103 $this->cache->add( $checkSisterKey, $wrapped, self::CHECK_KEY_TTL );
1106 $times[$key] = $purge[self::PURGE_TIME];
1146 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
1153 $this->stats->increment(
"wanobjectcache.$kClass.ck_touch." . ( $ok ?
'ok' :
'error' ) );
1186 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
1190 $this->stats->increment(
"wanobjectcache.$kClass.ck_reset." . ( $ok ?
'ok' :
'error' ) );
1499 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1501 $version = $opts[
'version'] ??
null;
1502 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1503 $pCache = ( $pcTTL >= 0 )
1510 if ( $pCache && $this->callbackDepth == 0 ) {
1511 $cached = $pCache->get( $key, $pcTTL,
false );
1512 if ( $cached !==
false ) {
1513 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1519 list( $value, $valueVersion, $curAsOf ) =
$res;
1520 if ( $valueVersion !== $version ) {
1524 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1526 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), (
string)$version ),
1529 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1535 if ( $pCache && $value !==
false ) {
1536 $pCache->set( $key, $value );
1559 $checkKeys = $opts[
'checkKeys'] ?? [];
1560 $graceTTL = $opts[
'graceTTL'] ?? self::GRACE_TTL_NONE;
1561 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1562 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
1563 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1564 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1565 $touchedCb = $opts[
'touchedCallback'] ??
null;
1571 $curState = $this->
fetchKeys( [ $key ], $checkKeys, $touchedCb )[$key];
1572 $curValue = $curState[self::RES_VALUE];
1576 $this->stats->timing(
1577 "wanobjectcache.$kClass.hit.good",
1581 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1583 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1584 $this->stats->timing(
1585 "wanobjectcache.$kClass.hit.refresh",
1589 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1591 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
1595 $isKeyTombstoned = ( $curState[self::RES_TOMB_AS_OF] !== null );
1597 if ( $isKeyTombstoned ) {
1598 $volState = $this->
getInterimValue( $key, $minAsOf, $startTime, $touchedCb );
1599 $volValue = $volState[self::RES_VALUE];
1601 $volState = $curState;
1602 $volValue = $curValue;
1610 $lastPurgeTime = max(
1612 $volState[self::RES_TOUCH_AS_OF],
1613 $curState[self::RES_TOMB_AS_OF],
1614 $curState[self::RES_CHECK_AS_OF]
1616 $safeMinAsOf = max( $minAsOf, $lastPurgeTime + self::TINY_POSTIVE );
1618 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1619 $this->stats->timing(
1620 "wanobjectcache.$kClass.hit.volatile",
1624 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1627 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1628 $busyValue = $opts[
'busyValue'] ??
null;
1629 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
1630 $version = $opts[
'version'] ??
null;
1633 $useRegenerationLock =
1644 $curState[self::RES_CUR_TTL] !==
null &&
1645 $curState[self::RES_CUR_TTL] <= 0 &&
1646 abs( $curState[self::RES_CUR_TTL] ) <= $lockTSE
1650 ( $busyValue !==
null && $volValue ===
false );
1656 if ( $useRegenerationLock && !$hasLock ) {
1658 if ( $this->
isValid( $volValue, $volState[self::RES_AS_OF], $minAsOf ) ) {
1659 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1660 $this->stats->timing(
1661 "wanobjectcache.$kClass.hit.stale",
1665 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1666 } elseif ( $busyValue !==
null ) {
1667 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1668 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1669 $this->stats->timing(
1670 "wanobjectcache.$kClass.$miss.busy",
1675 return [ $placeholderValue, $version, $curState[self::RES_AS_OF] ];
1682 ++$this->callbackDepth;
1685 ( $curState[self::RES_VERSION] === $version ) ? $curValue :
false,
1688 ( $curState[self::RES_VERSION] === $version ) ? $curState[self::RES_AS_OF] :
null,
1692 --$this->callbackDepth;
1697 $elapsed = max( $postCallbackTime - $startTime, 0.0 );
1700 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1701 $this->stats->timing(
"wanobjectcache.$kClass.regen_walltime", 1e3 * $walltime );
1706 ( $value !==
false && $ttl >= 0 ) &&
1708 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned ) &&
1713 if ( $isKeyTombstoned ) {
1714 $this->
setInterimValue( $key, $value, $lockTSE, $version, $postCallbackTime, $walltime );
1718 'since' => $setOpts[
'since'] ?? $preCallbackTime,
1719 'version' => $version,
1720 'staleTTL' => $staleTTL,
1721 'lockTSE' => $lockTSE,
1722 'creating' => ( $curValue === false ),
1723 'walltime' => $walltime
1725 $this->
set( $key, $value, $ttl, $finalSetOpts );
1731 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1732 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1733 $this->stats->timing(
1734 "wanobjectcache.$kClass.$miss.compute",
1738 return [ $value, $version, $curState[self::RES_AS_OF] ];
1746 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_MUTEX );
1748 return $this->cache->add( $checkSisterKey, 1, self::LOCK_TTL );
1757 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_MUTEX );
1758 $this->cache->changeTTL( $checkSisterKey, (
int)$this->
getCurrentTime() - 60 );
1774 foreach ( $baseKeys as $baseKey ) {
1791 private function makeSisterKey(
string $baseKey,
string $typeChar,
string $route =
null ) {
1792 if ( $this->coalesceScheme === self::SCHEME_HASH_STOP ) {
1794 $sisterKey =
'WANCache:' . $baseKey .
'|#|' . $typeChar;
1797 $sisterKey =
'WANCache:{' . $baseKey .
'}:' . $typeChar;
1800 if ( $route !==
null ) {
1801 $sisterKey = $this->
prependRoute( $sisterKey, $route );
1814 if ( substr( $sisterKey, -4 ) ===
'|#|v' ) {
1816 $collection = substr( $sisterKey, 9, strcspn( $sisterKey,
':|', 9 ) );
1817 } elseif ( substr( $sisterKey, -3 ) ===
'}:v' ) {
1819 $collection = substr( $sisterKey, 10, strcspn( $sisterKey,
':}', 10 ) );
1821 $collection =
'internal';
1840 if (
$res[self::RES_VALUE] ===
false ||
$res[self::RES_AS_OF] < $minAsOf ) {
1844 $age = $now -
$res[self::RES_AS_OF];
1846 return ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1871 $valueSisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE );
1872 list( $estimatedSize ) = $this->cache->setNewPreparedValues( [
1873 $valueSisterKey => $value
1885 $missesPerSecForHighQPS = ( min( $elapsed, 1 ) * $this->keyHighQps );
1897 if ( ( $missesPerSecForHighQPS * $estimatedSize ) >= $this->keyHighUplinkBps ) {
1898 $cooloffSisterKey = $this->
makeSisterKey( $key, self::TYPE_COOLOFF );
1899 $watchPoint = $this->cache->watchErrors();
1901 !$this->cache->add( $cooloffSisterKey, 1, self::COOLOFF_TTL ) &&
1903 $this->cache->getLastError( $watchPoint ) === self::ERR_NONE
1905 $this->stats->increment(
"wanobjectcache.$kClass.cooloff_bounce" );
1913 $this->stats->timing(
"wanobjectcache.$kClass.regen_set_delay", 1e3 * $elapsed );
1914 $this->stats->updateCount(
"wanobjectcache.$kClass.regen_set_bytes", $estimatedSize );
1930 $interimSisterKey = $this->
makeSisterKey( $key, self::TYPE_INTERIM );
1931 $wrapped = $this->cache->get( $interimSisterKey );
1933 if (
$res[self::RES_VALUE] !==
false &&
$res[self::RES_AS_OF] >= $minAsOf ) {
1934 if ( $touchedCb !==
null ) {
1937 $res[self::RES_TOUCH_AS_OF] = max(
1938 $touchedCb(
$res[self::RES_VALUE] ),
1939 $res[self::RES_TOUCH_AS_OF]
1947 return $this->
unwrap(
false, $now );
1959 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1961 $wrapped = $this->
wrap( $value, $ttl, $version, $now, $walltime );
1962 $this->cache->merge(
1964 static function () use ( $wrapped ) {
1977 return ( $busyValue instanceof Closure ) ? $busyValue() : $busyValue;
2045 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2050 $opts[
'checkKeys'] ?? []
2052 $this->warmupKeyMisses = 0;
2058 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2061 return $callback( $params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2065 foreach ( $keyedIds as $key => $id ) {
2075 $this->warmupCache = [];
2146 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2148 $checkKeys = $opts[
'checkKeys'] ?? [];
2149 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
2150 unset( $opts[
'lockTSE'] );
2151 unset( $opts[
'busyValue'] );
2156 $this->warmupKeyMisses = 0;
2162 $resByKey = $this->
fetchKeys( $keysByIdGet, $checkKeys );
2163 foreach ( $keysByIdGet as $id => $key ) {
2164 $res = $resByKey[$key];
2166 $res[self::RES_VALUE] ===
false ||
2167 $res[self::RES_CUR_TTL] < 0 ||
2168 $res[self::RES_AS_OF] < $minAsOf
2176 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2177 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2183 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2184 use ( $callback, $newValsById, $newTTLsById, $newSetOpts )
2186 $id = $params[
'id'];
2188 if ( array_key_exists( $id, $newValsById ) ) {
2190 $newValue = $newValsById[$id];
2191 $ttl = $newTTLsById[$id];
2192 $setOpts = $newSetOpts;
2196 $ttls = [ $id => $ttl ];
2197 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
2206 foreach ( $keyedIds as $key => $id ) {
2216 $this->warmupCache = [];
2233 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
2234 $valueSisterKey = $this->
makeSisterKey( $key, self::TYPE_VALUE );
2236 $minAsOf = $purgeTimestamp + self::HOLDOFF_TTL;
2237 $wrapped = $this->cache->get( $valueSisterKey );
2238 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
2240 $this->logger->warning(
"Reaping stale value key '$key'." );
2241 $ttlReap = self::HOLDOFF_TTL;
2242 $ok = $this->cache->changeTTL( $valueSisterKey, $ttlReap );
2244 $this->logger->error(
"Could not complete reap of key '$key'." );
2264 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
2265 $checkSisterKey = $this->
makeSisterKey( $key, self::TYPE_TIMESTAMP );
2267 $wrapped = $this->cache->get( $checkSisterKey );
2269 if ( $purge !==
null && $purge[self::PURGE_TIME] < $purgeTimestamp ) {
2271 $this->logger->warning(
"Reaping stale check key '$key'." );
2272 $ok = $this->cache->changeTTL( $checkSisterKey, self::TTL_SECOND );
2274 $this->logger->error(
"Could not complete reap of check key '$key'." );
2296 return $this->cache->makeGlobalKey( ...func_get_args() );
2309 public function makeKey( $collection, ...$components ) {
2310 return $this->cache->makeKey( ...func_get_args() );
2321 return hash_hmac(
'sha256', $component, $this->secret );
2376 foreach ( $ids as $id ) {
2378 if ( strlen( $id ) > 64 ) {
2379 $this->logger->warning( __METHOD__ .
": long ID '$id'; use hash256()" );
2381 $key = $keyCallback( $id, $this );
2383 if ( !isset( $idByKey[$key] ) ) {
2384 $idByKey[$key] = $id;
2385 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2386 throw new UnexpectedValueException(
2387 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2392 return new ArrayIterator( $idByKey );
2431 if ( count( $ids ) !== count(
$res ) ) {
2434 $ids = array_keys( array_fill_keys( $ids,
true ) );
2435 if ( count( $ids ) !== count(
$res ) ) {
2436 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2440 return array_combine( $ids,
$res );
2450 return $this->cache->watchErrors();
2471 $code = $this->cache->getLastError( $watchPoint );
2473 case self::ERR_NONE:
2474 return self::ERR_NONE;
2475 case self::ERR_NO_RESPONSE:
2476 return self::ERR_NO_RESPONSE;
2477 case self::ERR_UNREACHABLE:
2478 return self::ERR_UNREACHABLE;
2480 return self::ERR_UNEXPECTED;
2489 $this->cache->clearLastError();
2498 $this->processCaches = [];
2531 return $this->cache->getQoS( $flag );
2597 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2598 $mtime = (int)$mtime;
2599 if ( $mtime <= 0 ) {
2605 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2613 return $this->warmupKeyMisses;
2631 $purgeByRouteKey = [];
2632 foreach ( $purgeBySisterKey as $sisterKey => $purge ) {
2633 if ( $this->broadcastRoute !==
null ) {
2634 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2636 $routeKey = $sisterKey;
2638 $purgeByRouteKey[$routeKey] = $purge;
2641 if ( count( $purgeByRouteKey ) == 1 ) {
2642 $purge = reset( $purgeByRouteKey );
2643 $ok = $this->cache->set( key( $purgeByRouteKey ), $purge, $ttl );
2645 $ok = $this->cache->setMulti( $purgeByRouteKey, $ttl );
2660 if ( $this->broadcastRoute !==
null ) {
2661 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2663 $routeKey = $sisterKey;
2666 return $this->cache->delete( $routeKey );
2675 if ( $sisterKey[0] ===
'/' ) {
2676 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2679 return $route . $sisterKey;
2694 if ( !$this->asyncHandler ) {
2701 $func = $this->asyncHandler;
2702 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
2703 $opts[
'minAsOf'] = INF;
2706 }
catch ( Exception $e ) {
2708 $this->logger->error(
'Async refresh failed for {key}', [
2729 if ( !$this->
isValid(
$res[self::RES_VALUE],
$res[self::RES_AS_OF], $minAsOf ) ) {
2734 $curTTL =
$res[self::RES_CUR_TTL];
2735 if ( $curTTL > 0 ) {
2741 $curGraceTTL = $graceTTL + $curTTL;
2743 return ( $curGraceTTL > 0 )
2761 $curTTL =
$res[self::RES_CUR_TTL];
2762 $logicalTTL =
$res[self::RES_TTL];
2763 $asOf =
$res[self::RES_AS_OF];
2787 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2791 $age = $now - $asOf;
2792 $timeOld = $age - $ageNew;
2793 if ( $timeOld <= 0 ) {
2797 $popularHitsPerSec = 1;
2801 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2805 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2807 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2809 $decision = ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
2833 if ( $lowTTL <= 0 ) {
2839 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2841 if ( $curTTL >= $effectiveLowTTL || $curTTL <= 0 ) {
2845 $chance = ( 1 - $curTTL / $effectiveLowTTL );
2847 $decision = ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
2860 protected function isValid( $value, $asOf, $minAsOf ) {
2861 return ( $value !==
false && $asOf >= $minAsOf );
2872 private function wrap( $value, $ttl, $version, $now, $walltime ) {
2876 self::FLD_FORMAT_VERSION => self::VERSION,
2877 self::FLD_VALUE => $value,
2878 self::FLD_TTL => $ttl,
2879 self::FLD_TIME => $now
2881 if ( $version !==
null ) {
2882 $wrapped[self::FLD_VALUE_VERSION] = $version;
2884 if ( $walltime >= self::GENERATION_SLOW_SEC ) {
2885 $wrapped[self::FLD_GENERATION_TIME] = $walltime;
2909 self::RES_VALUE =>
false,
2910 self::RES_VERSION =>
null,
2911 self::RES_AS_OF =>
null,
2912 self::RES_TTL =>
null,
2913 self::RES_TOMB_AS_OF =>
null,
2915 self::RES_CHECK_AS_OF =>
null,
2916 self::RES_TOUCH_AS_OF =>
null,
2917 self::RES_CUR_TTL => null
2920 if ( is_array( $wrapped ) ) {
2923 ( $wrapped[self::FLD_FORMAT_VERSION] ??
null ) === self::VERSION &&
2924 $wrapped[self::FLD_TIME] >= $this->epoch
2926 if ( $wrapped[self::FLD_TTL] > 0 ) {
2928 $age = $now - $wrapped[self::FLD_TIME];
2929 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2934 $res[self::RES_VALUE] = $wrapped[self::FLD_VALUE];
2935 $res[self::RES_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ??
null;
2936 $res[self::RES_AS_OF] = $wrapped[self::FLD_TIME];
2937 $res[self::RES_CUR_TTL] = $curTTL;
2938 $res[self::RES_TTL] = $wrapped[self::FLD_TTL];
2943 if ( $purge !==
null ) {
2945 $curTTL = min( $purge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
2946 $res[self::RES_CUR_TTL] = $curTTL;
2947 $res[self::RES_TOMB_AS_OF] = $purge[self::PURGE_TIME];
2959 $parts = explode(
':', $key, 3 );
2962 return strtr( $parts[1] ?? $parts[0],
'.',
'_' );
2974 if ( !is_string( $value ) ) {
2978 $segments = explode(
':', $value, 3 );
2979 $prefix = $segments[0];
2980 if ( $prefix !== self::PURGE_VAL_PREFIX ) {
2985 $timestamp = (float)$segments[1];
2987 $holdoff = isset( $segments[2] ) ? (int)$segments[2] : self::HOLDOFF_TTL;
2989 if ( $timestamp < $this->epoch ) {
2994 return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
3002 return self::PURGE_VAL_PREFIX .
':' . (int)$timestamp;
3012 $normalizedTime = (int)$timestamp;
3014 $purge = [ self::PURGE_TIME => (float)$normalizedTime, self::PURGE_HOLDOFF => $holdoff ];
3016 return self::PURGE_VAL_PREFIX .
":$normalizedTime:$holdoff";
3024 if ( !isset( $this->processCaches[$group] ) ) {
3025 list( , $size ) = explode(
':', $group );
3026 $this->processCaches[$group] =
new MapCacheLRU( (
int)$size );
3027 if ( $this->wallClockOverride !==
null ) {
3028 $this->processCaches[$group]->setMockTime( $this->wallClockOverride );
3032 return $this->processCaches[$group];
3041 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
3044 if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
3045 $pCache = $this->
getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY );
3046 foreach (
$keys as $key => $id ) {
3047 if ( !$pCache->has( $key, $pcTTL ) ) {
3048 $keysMissing[$id] = $key;
3053 return $keysMissing;
3070 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
3072 if ( is_int( $i ) ) {
3074 $sisterKeys[] = $this->
makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
3077 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
3078 $sisterKeys[] = $this->
makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
3083 $wrappedBySisterKey = $this->cache->getMulti( $sisterKeys );
3084 $wrappedBySisterKey += array_fill_keys( $sisterKeys,
false );
3086 return $wrappedBySisterKey;
3095 for ( end( $this->missLog ); $miss = current( $this->missLog ); prev( $this->missLog ) ) {
3096 if ( $miss[0] === $key ) {
3097 return ( $now - $miss[1] );
3109 if ( $this->wallClockOverride ) {
3110 return $this->wallClockOverride;
3113 $clockTime = (float)time();
3119 return max( microtime(
true ), $clockTime );
3127 $this->wallClockOverride =& $time;
3128 $this->cache->setMockTime( $time );
3129 foreach ( $this->processCaches as $pCache ) {
3130 $pCache->setMockTime( $time );
Class representing a cache/ephemeral data store.
A BagOStuff object with no objects in it.
Handles a simple LRU key/value map with a maximum number of entries.
Multi-datacenter aware caching interface.
makeGlobalKey( $collection,... $components)
Make a cache key for the global keyspace and given components.
int $callbackDepth
Callback stack depth for getWithSetCallback()
const PURGE_TIME
Key to the tombstone entry timestamp.
const RES_TOUCH_AS_OF
Highest "touched" timestamp for a key.
const HOLDOFF_TTL
Seconds to tombstone keys on delete() and to treat keys as volatile after purges.
const HOT_TTR
Expected time-till-refresh, in seconds, if the key is accessed once per second.
const KEY_VERSION
Version number attribute for a key; keep value for b/c (< 1.36)
__construct(array $params)
isValid( $value, $asOf, $minAsOf)
Check that a wrapper value exists and has an acceptable age.
const TYPE_TIMESTAMP
Single character component for timestamp check keys.
const RES_AS_OF
Generation completion timestamp attribute for a key.
worthRefreshPopular( $asOf, $ageNew, $timeTillRefresh, $now)
Check if a key is due for randomized regeneration due to its popularity.
fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams)
Do the actual I/O for getWithSetCallback() when needed.
multiRemap(array $ids, array $res)
Get an (ID => value) map from (i) a non-unique list of entity IDs, and (ii) the list of corresponding...
const FLD_FORMAT_VERSION
Key to WAN cache version number; stored in blobs.
determineKeyClassForStats( $key)
const SCHEME_HASH_STOP
Use mcrouter-style Hash Stop key scheme (e.g.
const RES_VALUE
Value for a key.
const RES_VERSION
Version number attribute for a key.
prependRoute(string $sisterKey, string $route)
touchCheckKey( $key, $holdoff=self::HOLDOFF_TTL)
Increase the last-purge timestamp of a "check" key in all datacenters.
const FLD_VALUE
Key to the cached value; stored in blobs.
const PURGE_HOLDOFF
Key to the tombstone entry hold-off TTL.
adaptiveTTL( $mtime, $maxTTL, $minTTL=30, $factor=0.2)
Get a TTL that is higher for objects that have not changed recently.
const GRACE_TTL_NONE
Idiom for set()/getWithSetCallback() meaning "no post-expiration grace period".
int $warmupKeyMisses
Key fetched.
float null $wallClockOverride
relayVolatilePurges(array $purgeBySisterKey, int $ttl)
Set a sister key to a purge value in all datacenters.
mixed[] $warmupCache
Temporary warm-up cache.
const VERSION
Cache format version number.
const LOW_TTL
Consider regeneration if the key will expire within this many seconds.
BagOStuff $cache
The local datacenter cache.
fetchKeys(array $keys, array $checkKeys, $touchedCb=null)
Fetch the value and key metadata of several keys from cache.
parsePurgeValue( $value)
Extract purge metadata from cached value if it is a valid purge value.
const RES_TOMB_AS_OF
Tomstone timestamp attribute for a key.
scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams)
Schedule a deferred cache regeneration if possible.
const RES_TTL
Logical TTL attribute for a key.
const GENERATION_HIGH_SEC
Consider value generation somewhat high if it takes this many seconds or more.
const GENERATION_SLOW_SEC
Consider value generation slow if it takes this many seconds or more.
const COOLOFF_TTL
Seconds to no-op key set() calls to avoid large blob I/O stampedes.
getWithSetCallback( $key, $ttl, $callback, array $opts=[], array $cbParams=[])
Method to fetch/regenerate a cache key.
getCheckKeyTime( $key)
Fetch the value of a timestamp "check" key.
getNonProcessCachedMultiKeys(ArrayIterator $keys, array $opts)
const SCHEME_HASH_TAG
Use twemproxy-style Hash Tag key scheme (e.g.
const RECENT_SET_HIGH_MS
Max millisecond set() backoff during hold-off (far less than INTERIM_KEY_TTL)
const LOCK_TTL
Seconds to keep lock keys around.
getMulti(array $keys, &$curTTLs=[], array $checkKeys=[], &$info=[])
Fetch the value of several keys from cache.
const PC_PRIMARY
Default process cache name and max key count.
getMultiWithUnionSetCallback(ArrayIterator $keyedIds, $ttl, callable $callback, array $opts=[])
Method to fetch/regenerate multiple cache keys at once.
const TYPE_MUTEX
Single character component for mutex lock keys.
relayNonVolatilePurge(string $sisterKey)
Remove a sister key from all datacenters.
getMultiWithSetCallback(ArrayIterator $keyedIds, $ttl, callable $callback, array $opts=[])
Method to fetch multiple cache keys at once with regeneration.
timeSinceLoggedMiss( $key, $now)
isExtremelyNewValue( $res, $minAsOf, $now)
Check if a key value is non-false, new enough, and has an "as of" time almost equal to now.
wrap( $value, $ttl, $version, $now, $walltime)
const PURGE_VAL_PREFIX
Value prefix of purge values.
const INTERIM_KEY_TTL
Seconds to keep interim value keys for tombstoned keys around.
makeMultiKeys(array $ids, $keyCallback)
Get an iterator of (cache key => entity ID) for a list of entity IDs.
makeTombstonePurgeValue(float $timestamp)
array< int, array > $missLog
List of (key, UNIX timestamp) tuples for get() cache misses.
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
int $coalesceScheme
Scheme to use for key coalescing (Hash Tags or Hash Stops)
const FLD_TTL
Key to the original TTL; stored in blobs.
isLotteryRefreshDue( $res, $lowTTL, $ageNew, $hotTTR, $now)
Check if a key is due for randomized regeneration due to near-expiration/popularity.
watchErrors()
Get a "watch point" token that can be used to get the "last error" to occur after now.
bool $useInterimHoldOffCaching
Whether to use "interim" caching while keys are tombstoned.
worthRefreshExpiring( $curTTL, $logicalTTL, $lowTTL)
Check if a key is nearing expiration and thus due for randomized regeneration.
const FLD_FLAGS
Key to the flags bit field (reserved number)
const HOLDOFF_TTL_NONE
Idiom for delete()/touchCheckKey() meaning "no hold-off period".
const MAX_READ_LAG
Max expected seconds of combined lag from replication and "view snapshots".
const RES_CUR_TTL
Remaining TTL attribute for a key.
const FLD_TIME
Key to the cache timestamp; stored in blobs.
const CHECK_KEY_TTL
Seconds to keep dependency purge keys around.
useInterimHoldOffCaching( $enabled)
Enable or disable the use of brief caching for tombstoned keys.
StatsdDataFactoryInterface $stats
makeSisterKeys(array $baseKeys, string $type, string $route=null)
Get sister keys that should be collocated with their corresponding base cache keys.
clearProcessCache()
Clear the in-process caches; useful for testing.
const KEY_AS_OF
Generation completion timestamp attribute for a key; keep value for b/c (< 1.36)
const TYPE_COOLOFF
Single character component for cool-off bounce keys.
const FLD_GENERATION_TIME
Key to how long it took to generate the value; stored in blobs.
getLastError( $watchPoint=0)
Get the "last error" registry.
makeSisterKey(string $baseKey, string $typeChar, string $route=null)
Get a sister key that should be collocated with a base cache key.
makeKey( $collection,... $components)
Make a cache key using the "global" keyspace for the given components.
float $epoch
Unix timestamp of the oldest possible valid values.
fetchWrappedValuesForWarmupCache(array $keys, array $checkKeys)
callable null $asyncHandler
Function that takes a WAN cache callback and runs it later.
string null $broadcastRoute
Routing prefix for operations that should be broadcasted to all data centers.
resolveBusyValue( $busyValue)
reap( $key, $purgeTimestamp, &$isStale=false)
Set a key to soon expire in the local cluster if it pre-dates $purgeTimestamp.
const RECENT_SET_LOW_MS
Min millisecond set() backoff during hold-off (far less than INTERIM_KEY_TTL)
setLogger(LoggerInterface $logger)
static getCollectionFromSisterKey(string $sisterKey)
reapCheckKey( $key, $purgeTimestamp, &$isStale=false)
Set a "check" key to soon expire in the local cluster if it pre-dates $purgeTimestamp.
const TYPE_INTERIM
Single character component for interium value keys.
const PASS_BY_REF
Idiom for get()/getMulti() to return extra information by reference.
checkAndSetCooloff( $key, $kClass, $value, $elapsed, $hasLock)
Check whether set() is rate-limited to avoid concurrent I/O spikes.
float $keyHighUplinkBps
Max tolerable bytes/second to spend on a cache write stampede for a key.
getInterimValue( $key, $minAsOf, $now, $touchedCb)
const KEY_CHECK_AS_OF
Highest "check" key timestamp for a key; keep value for b/c (< 1.36)
processCheckKeys(array $checkSisterKeys, array $wrappedBySisterKey, float $now)
setInterimValue( $key, $value, $ttl, $version, $now, $walltime)
isAcceptablyFreshValue( $res, $graceTTL, $minAsOf)
Check if a key value is non-false, new enough, and either fresh or "gracefully" stale.
clearLastError()
Clear the "last error" registry.
const STALE_TTL_NONE
Idiom for set()/getWithSetCallback() meaning "no post-expiration persistence".
MapCacheLRU[] $processCaches
Map of group PHP instance caches.
const TSE_NONE
Idiom for getWithSetCallback() meaning "no cache stampede mutex".
string $secret
Stable secret used for hasing long strings into key components.
const RES_CHECK_AS_OF
Highest "check" key timestamp for a key.
const TYPE_VALUE
Single character component for value keys.
resetCheckKey( $key)
Clear the last-purge timestamp of a "check" key in all datacenters.
const KEY_TOMB_AS_OF
Tomstone timestamp attribute for a key; keep value for b/c (< 1.36)
int $keyHighQps
Reads/second assumed during a hypothetical cache write stampede for a key.
const MAX_COMMIT_DELAY
Max expected seconds to pass between delete() and DB commit finishing.
const KEY_CUR_TTL
Remaining TTL attribute for a key; keep value for b/c (< 1.36)
const AGE_NEW
Minimum key age, in seconds, for expected time-till-refresh to be considered.
yieldStampedeLock( $key, $hasLock)
const RAMPUP_TTL
Seconds to ramp up the chance of regeneration due to expected time-till-refresh.
const TTL_LAGGED
Max TTL, in seconds, to store keys when a data source has high replication lag.
const FLD_VALUE_VERSION
Key to collection cache version number; stored in blobs.
hash256( $component)
Hash a possibly long string into a suitable component for makeKey()/makeGlobalKey()
getMultiCheckKeyTime(array $keys)
Fetch the values of each timestamp "check" key.
makeCheckPurgeValue(float $timestamp, int $holdoff, array &$purge=null)
const KEY_TTL
Logical TTL attribute for a key.
Generic interface for object stores with key encoding methods.