22 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
225 $this->
cache = $params[
'cache'];
226 $this->purgeChannel = isset(
$params[
'channels'][
'purge'] )
229 $this->purgeRelayer = isset(
$params[
'relayers'][
'purge'] )
232 $this->region = isset(
$params[
'region'] ) ?
$params[
'region'] :
'main';
233 $this->cluster = isset(
$params[
'cluster'] ) ?
$params[
'cluster'] :
'wan-main';
234 $this->mcrouterAware = !empty(
$params[
'mcrouterAware'] );
238 $this->asyncHandler = isset(
$params[
'asyncHandler'] ) ?
$params[
'asyncHandler'] :
null;
300 final public function get( $key, &$curTTL =
null,
array $checkKeys = [], &$asOf = null ) {
303 $values = $this->
getMulti( [ $key ], $curTTLs, $checkKeys, $asOfs );
304 $curTTL = isset( $curTTLs[$key] ) ? $curTTLs[$key] :
null;
305 $asOf = isset( $asOfs[$key] ) ? $asOfs[$key] :
null;
307 return isset( $values[$key] ) ? $values[$key] :
false;
329 $vPrefixLen = strlen( self::VALUE_KEY_PREFIX );
332 $checkKeysForAll = [];
333 $checkKeysByKey = [];
335 foreach ( $checkKeys
as $i => $checkKeyGroup ) {
337 $checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
339 if ( is_int( $i ) ) {
340 $checkKeysForAll = array_merge( $checkKeysForAll, $prefixed );
342 $checkKeysByKey[$i] = isset( $checkKeysByKey[$i] )
343 ? array_merge( $checkKeysByKey[$i], $prefixed )
349 $keysGet = array_merge( $valueKeys, $checkKeysFlat );
350 if ( $this->warmupCache ) {
351 $wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
352 $keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) );
353 $this->warmupKeyMisses +=
count( $keysGet );
358 $wrappedValues += $this->
cache->getMulti( $keysGet );
364 $purgeValuesForAll = $this->
processCheckKeys( $checkKeysForAll, $wrappedValues, $now );
365 $purgeValuesByKey = [];
366 foreach ( $checkKeysByKey
as $cacheKey => $checks ) {
367 $purgeValuesByKey[$cacheKey] =
372 foreach ( $valueKeys
as $vKey ) {
373 if ( !isset( $wrappedValues[$vKey] ) ) {
377 $key = substr( $vKey, $vPrefixLen );
385 $purgeValues = $purgeValuesForAll;
386 if ( isset( $purgeValuesByKey[$key] ) ) {
387 $purgeValues = array_merge( $purgeValues, $purgeValuesByKey[$key] );
389 foreach ( $purgeValues
as $purge ) {
391 if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) {
393 $ago = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
395 $curTTL = min( $curTTL, $ago );
399 $curTTLs[$key] = $curTTL;
400 $asOfs[$key] = (
$value !==
false ) ? $wrappedValues[$vKey][self::FLD_TIME] :
null;
415 foreach ( $timeKeys
as $timeKey ) {
416 $purge = isset( $wrappedValues[$timeKey] )
419 if ( $purge ===
false ) {
422 $this->
cache->add( $timeKey, $newVal, self::CHECK_KEY_TTL );
425 $purgeValues[] = $purge;
495 final public function set( $key,
$value, $ttl = 0,
array $opts = [] ) {
497 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
499 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
500 $lag = isset( $opts[
'lag'] ) ? $opts[
'lag'] : 0;
503 if ( !empty( $opts[
'pending'] ) ) {
504 $this->logger->info(
'Rejected set() for {cachekey} due to pending writes.',
505 [
'cachekey' => $key ] );
512 if ( $lag ===
false || ( $lag + $age ) > self::MAX_READ_LAG ) {
514 if ( $lockTSE >= 0 ) {
515 $ttl = max( 1, (
int)$lockTSE );
518 } elseif ( $age > self::MAX_READ_LAG ) {
519 $this->logger->info(
'Rejected set() for {cachekey} due to snapshot lag.',
520 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
524 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
526 $this->logger->warning(
'Lowered set() TTL for {cachekey} due to replication lag.',
527 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
530 $this->logger->info(
'Rejected set() for {cachekey} due to high read lag.',
531 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
538 $wrapped = $this->
wrap( $value, $ttl, $now ) + $wrapExtra;
540 $func =
function (
$cache, $key, $cWrapped )
use ( $wrapped ) {
541 return ( is_string( $cWrapped ) )
546 return $this->
cache->merge( self::VALUE_KEY_PREFIX . $key, $func, $ttl + $staleTTL, 1 );
611 $key = self::VALUE_KEY_PREFIX . $key;
618 $ok = $this->
relayPurge( $key, $ttl, self::HOLDOFF_NONE );
711 $rawKeys[$key] = self::TIME_KEY_PREFIX . $key;
714 $rawValues = $this->
cache->getMulti( $rawKeys );
715 $rawValues += array_fill_keys( $rawKeys,
false );
718 foreach ( $rawKeys
as $key => $rawKey ) {
720 if ( $purge !==
false ) {
733 $times[$key] =
$time;
775 return $this->
relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
807 return $this->
relayDelete( self::TIME_KEY_PREFIX . $key );
1060 if ( $pcTTL >= 0 && $this->callbackDepth == 0 ) {
1063 $value = $procCache->get( $key );
1069 if (
$value ===
false ) {
1071 if ( isset( $opts[
'version'] ) ) {
1072 $version = $opts[
'version'];
1077 function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1078 use ( $callback, $version ) {
1079 if ( is_array( $oldValue )
1080 && array_key_exists( self::VFLD_DATA, $oldValue )
1081 && array_key_exists( self::VFLD_VERSION, $oldValue )
1082 && $oldValue[self::VFLD_VERSION] === $version
1092 self::VFLD_DATA => $callback( $oldData, $ttl, $setOpts, $oldAsOf ),
1093 self::VFLD_VERSION => $version
1099 if ( $cur[self::VFLD_VERSION] === $version ) {
1106 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), $version ),
1110 [
'version' =>
null,
'minAsOf' => $asOf ] + $opts
1118 if ( $procCache &&
$value !==
false ) {
1119 $procCache->set( $key,
$value, $pcTTL );
1140 $lowTTL = isset( $opts[
'lowTTL'] ) ? $opts[
'lowTTL'] : min( self::LOW_TTL, $ttl );
1141 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
1144 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1145 $busyValue = isset( $opts[
'busyValue'] ) ? $opts[
'busyValue'] :
null;
1146 $popWindow = isset( $opts[
'hotTTR'] ) ? $opts[
'hotTTR'] :
self::HOT_TTR;
1147 $ageNew = isset( $opts[
'ageNew'] ) ? $opts[
'ageNew'] :
self::AGE_NEW;
1149 $versioned = isset( $opts[
'version'] );
1156 $cValue = $this->
get( $key, $curTTL, $checkKeys, $asOf );
1161 if ( $value !==
false
1163 && $this->
isValid( $value, $versioned, $asOf, $minTime )
1165 $preemptiveRefresh = (
1170 if ( !$preemptiveRefresh ) {
1171 $this->stats->increment(
"wanobjectcache.$kClass.hit.good" );
1174 } elseif ( $this->asyncHandler ) {
1177 $func(
function ()
use ( $key, $ttl, $callback, $opts, $asOf ) {
1178 $opts[
'minAsOf'] = INF;
1181 $this->stats->increment(
"wanobjectcache.$kClass.hit.refresh" );
1188 $isTombstone = ( $curTTL !==
null &&
$value ===
false );
1189 if ( $isTombstone && $lockTSE <= 0 ) {
1194 $isHot = ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE );
1196 $checkBusy = ( $busyValue !==
null &&
$value ===
false );
1201 $useMutex = ( $isHot || ( $isTombstone && $lockTSE > 0 ) || $checkBusy );
1203 $lockAcquired =
false;
1206 if ( $this->
cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
1208 $lockAcquired =
true;
1209 } elseif (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
1210 $this->stats->increment(
"wanobjectcache.$kClass.hit.stale" );
1218 if (
$value !==
false ) {
1219 $this->stats->increment(
"wanobjectcache.$kClass.hit.volatile" );
1224 if ( $busyValue !==
null ) {
1225 $this->stats->increment(
"wanobjectcache.$kClass.miss.busy" );
1227 return is_callable( $busyValue ) ? $busyValue() : $busyValue;
1232 if ( !is_callable( $callback ) ) {
1233 throw new InvalidArgumentException(
"Invalid cache miss callback provided." );
1240 $value = call_user_func_array( $callback, [ $cValue, &$ttl, &$setOpts, $asOf ] );
1244 $valueIsCacheable = (
$value !==
false && $ttl >= 0 );
1248 if ( ( $isTombstone && $lockTSE > 0 ) && $valueIsCacheable ) {
1249 $tempTTL = max( 1, (
int)$lockTSE );
1251 $wrapped = $this->
wrap( $value, $tempTTL, $newAsOf );
1256 if ( $valueIsCacheable ) {
1257 $setOpts[
'lockTSE'] = $lockTSE;
1258 $setOpts[
'staleTTL'] = $staleTTL;
1260 $setOpts += [
'since' => $preCallbackTime ];
1262 $this->
set( $key,
$value, $ttl, $setOpts );
1265 if ( $lockAcquired ) {
1267 $this->
cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, (
int)$preCallbackTime - 60 );
1270 $this->stats->increment(
"wanobjectcache.$kClass.miss.compute" );
1287 $wrapped = $this->
cache->get( self::INTERIM_KEY_PREFIX . $key );
1289 if ( $value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
1304 $this->
cache->merge(
1305 self::INTERIM_KEY_PREFIX . $key,
1306 function ()
use ( $wrapped ) {
1381 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1383 $valueKeys = array_keys( $keyedIds->getArrayCopy() );
1384 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1391 $this->warmupKeyMisses = 0;
1395 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
use ( $callback, &$id ) {
1396 return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
1400 foreach ( $keyedIds
as $key => $id ) {
1404 $this->warmupCache = [];
1475 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1477 $idsByValueKey = $keyedIds->getArrayCopy();
1478 $valueKeys = array_keys( $idsByValueKey );
1479 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1480 unset( $opts[
'lockTSE'] );
1481 unset( $opts[
'busyValue'] );
1486 $this->warmupKeyMisses = 0;
1494 $curByKey = $this->
getMulti( $keysGet, $curTTLs, $checkKeys, $asOfs );
1495 foreach ( $keysGet
as $key ) {
1496 if ( !array_key_exists( $key, $curByKey ) || $curTTLs[$key] < 0 ) {
1497 $idsRegen[] = $idsByValueKey[$key];
1503 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
1504 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
1508 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1509 use ( $callback, &$id, $newValsById, $newTTLsById, $newSetOpts )
1511 if ( array_key_exists( $id, $newValsById ) ) {
1513 $newValue = $newValsById[$id];
1514 $ttl = $newTTLsById[$id];
1515 $setOpts = $newSetOpts;
1519 $ttls = [ $id => $ttl ];
1520 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
1529 foreach ( $idsByValueKey
as $key => $id ) {
1533 $this->warmupCache = [];
1550 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
1552 $wrapped = $this->
cache->get( self::VALUE_KEY_PREFIX . $key );
1553 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
1555 $this->logger->warning(
"Reaping stale value key '$key'." );
1557 $ok = $this->
cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
1559 $this->logger->error(
"Could not complete reap of key '$key'." );
1579 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
1581 if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
1583 $this->logger->warning(
"Reaping stale check key '$key'." );
1584 $ok = $this->
cache->changeTTL( self::TIME_KEY_PREFIX . $key, self::TTL_SECOND );
1586 $this->logger->error(
"Could not complete reap of check key '$key'." );
1604 public function makeKey( $class, $component =
null ) {
1605 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1616 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1627 foreach ( $entities
as $entity ) {
1628 $map[$keyFunc( $entity, $this )] = $entity;
1631 return new ArrayIterator( $map );
1639 if ( $this->lastRelayError ) {
1665 $this->
cache->clearLastError();
1675 $this->processCaches = [];
1708 return $this->
cache->getQoS( $flag );
1774 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
1775 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
1776 $mtime = (int)$mtime;
1779 if ( !is_int( $mtime ) || $mtime <= 0 ) {
1785 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
1807 if ( $this->mcrouterAware ) {
1810 $ok = $this->
cache->set(
1811 "/*/{$this->cluster}/{$key}",
1817 $ok = $this->
cache->set(
1823 $event = $this->
cache->modifySimpleRelayEvent( [
1826 'val' =>
'PURGED:$UNIXTIME$:' . (
int)$holdoff,
1827 'ttl' => max( $ttl, self::TTL_SECOND ),
1831 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1847 if ( $this->mcrouterAware ) {
1850 $ok = $this->
cache->delete(
"/*/{$this->cluster}/{$key}" );
1853 $ok = $this->
cache->delete( $key );
1855 $event = $this->
cache->modifySimpleRelayEvent( [
1860 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1883 if ( $curTTL > 0 ) {
1885 } elseif ( $graceTTL <= 0 ) {
1889 $ageStale = abs( $curTTL );
1890 $curGTTL = ( $graceTTL - $ageStale );
1891 if ( $curGTTL <= 0 ) {
1913 if ( $lowTTL <= 0 ) {
1915 } elseif ( $curTTL >= $lowTTL ) {
1917 } elseif ( $curTTL <= 0 ) {
1921 $chance = ( 1 - $curTTL / $lowTTL );
1923 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1942 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
1946 $age = $now - $asOf;
1947 $timeOld = $age - $ageNew;
1948 if ( $timeOld <= 0 ) {
1955 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
1959 $chance = 1 / ( self::HIT_RATE_HIGH * $refreshWindowSec );
1962 $chance *= ( $timeOld <=
self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
1964 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1977 if ( $versioned && !isset(
$value[self::VFLD_VERSION] ) ) {
1979 } elseif ( $minTime > 0 && $asOf < $minTime ) {
1997 self::FLD_VALUE =>
$value,
1998 self::FLD_TTL => $ttl,
1999 self::FLD_TIME => $now
2010 protected function unwrap( $wrapped, $now ) {
2013 if ( $purge !==
false ) {
2015 $curTTL = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
2016 return [
false, $curTTL ];
2019 if ( !is_array( $wrapped )
2020 || !isset( $wrapped[self::FLD_VERSION] )
2021 || $wrapped[self::FLD_VERSION] !== self::VERSION
2023 return [
false, null ];
2026 $flags = isset( $wrapped[self::FLD_FLAGS] ) ? $wrapped[
self::FLD_FLAGS] : 0;
2027 if ( ( $flags & self::FLG_STALE ) == self::FLG_STALE ) {
2030 $curTTL = min( -$age, self::TINY_NEGATIVE );
2031 } elseif ( $wrapped[self::FLD_TTL] > 0 ) {
2034 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2051 $res[] = $prefix . $key;
2062 $parts = explode(
':', $key );
2064 return isset( $parts[1] ) ? $parts[1] : $parts[0];
2072 return microtime(
true );
2081 if ( !is_string(
$value ) ) {
2084 $segments = explode(
':',
$value, 3 );
2085 if ( !isset( $segments[0] ) || !isset( $segments[1] )
2086 ||
"{$segments[0]}:" !== self::PURGE_VAL_PREFIX
2090 if ( !isset( $segments[2] ) ) {
2095 self::FLD_TIME => (float)$segments[1],
2096 self::FLD_HOLDOFF => (
int)$segments[2],
2106 return self::PURGE_VAL_PREFIX . (float)$timestamp .
':' . (
int)$holdoff;
2114 if ( !isset( $this->processCaches[$group] ) ) {
2115 list( , $n ) = explode(
':', $group );
2116 $this->processCaches[$group] =
new HashBagOStuff( [
'maxKeys' => (
int)$n ] );
2119 return $this->processCaches[$group];
2129 if ( isset( $opts[
'pcTTL'] ) && $opts[
'pcTTL'] > 0 && $this->callbackDepth == 0 ) {
2130 $pcGroup = isset( $opts[
'pcGroup'] ) ? $opts[
'pcGroup'] :
self::PC_PRIMARY;
2133 if ( $procCache->get( $key ) !==
false ) {
2134 $keysFound[] = $key;
2139 return array_diff(
$keys, $keysFound );
2155 $keysWarmUp[] = self::VALUE_KEY_PREFIX . $key;
2158 foreach ( $checkKeys
as $i => $checkKeyOrKeys ) {
2159 if ( is_int( $i ) ) {
2161 $keysWarmUp[] = self::TIME_KEY_PREFIX . $checkKeyOrKeys;
2164 $keysWarmUp = array_merge(
2166 self::prefixCacheKeys( $checkKeyOrKeys, self::TIME_KEY_PREFIX )