22 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
228 $this->
cache = $params[
'cache'];
229 $this->purgeChannel = isset(
$params[
'channels'][
'purge'] )
232 $this->purgeRelayer = isset(
$params[
'relayers'][
'purge'] )
235 $this->region = isset(
$params[
'region'] ) ?
$params[
'region'] :
'main';
236 $this->cluster = isset(
$params[
'cluster'] ) ?
$params[
'cluster'] :
'wan-main';
237 $this->mcrouterAware = !empty(
$params[
'mcrouterAware'] );
241 $this->asyncHandler = isset(
$params[
'asyncHandler'] ) ?
$params[
'asyncHandler'] :
null;
303 final public function get( $key, &$curTTL =
null,
array $checkKeys = [], &$asOf = null ) {
306 $values = $this->
getMulti( [ $key ], $curTTLs, $checkKeys, $asOfs );
307 $curTTL = isset( $curTTLs[$key] ) ? $curTTLs[$key] :
null;
308 $asOf = isset( $asOfs[$key] ) ? $asOfs[$key] :
null;
310 return isset( $values[$key] ) ? $values[$key] :
false;
332 $vPrefixLen = strlen( self::VALUE_KEY_PREFIX );
335 $checkKeysForAll = [];
336 $checkKeysByKey = [];
338 foreach ( $checkKeys
as $i => $checkKeyGroup ) {
340 $checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
342 if ( is_int( $i ) ) {
343 $checkKeysForAll = array_merge( $checkKeysForAll, $prefixed );
345 $checkKeysByKey[$i] = isset( $checkKeysByKey[$i] )
346 ? array_merge( $checkKeysByKey[$i], $prefixed )
352 $keysGet = array_merge( $valueKeys, $checkKeysFlat );
353 if ( $this->warmupCache ) {
354 $wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
355 $keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) );
356 $this->warmupKeyMisses += count( $keysGet );
361 $wrappedValues += $this->
cache->getMulti( $keysGet );
367 $purgeValuesForAll = $this->
processCheckKeys( $checkKeysForAll, $wrappedValues, $now );
368 $purgeValuesByKey = [];
369 foreach ( $checkKeysByKey
as $cacheKey => $checks ) {
370 $purgeValuesByKey[$cacheKey] =
375 foreach ( $valueKeys
as $vKey ) {
376 if ( !isset( $wrappedValues[$vKey] ) ) {
380 $key = substr( $vKey, $vPrefixLen );
388 $purgeValues = $purgeValuesForAll;
389 if ( isset( $purgeValuesByKey[$key] ) ) {
390 $purgeValues = array_merge( $purgeValues, $purgeValuesByKey[$key] );
392 foreach ( $purgeValues
as $purge ) {
394 if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) {
396 $ago = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
398 $curTTL = min( $curTTL, $ago );
402 $curTTLs[$key] = $curTTL;
403 $asOfs[$key] = (
$value !==
false ) ? $wrappedValues[$vKey][self::FLD_TIME] :
null;
418 foreach ( $timeKeys
as $timeKey ) {
419 $purge = isset( $wrappedValues[$timeKey] )
422 if ( $purge ===
false ) {
425 $this->
cache->add( $timeKey, $newVal, self::CHECK_KEY_TTL );
428 $purgeValues[] = $purge;
498 final public function set( $key,
$value, $ttl = 0,
array $opts = [] ) {
500 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
502 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
503 $lag = isset( $opts[
'lag'] ) ? $opts[
'lag'] : 0;
506 if ( !empty( $opts[
'pending'] ) ) {
507 $this->logger->info(
'Rejected set() for {cachekey} due to pending writes.',
508 [
'cachekey' => $key ] );
515 if ( $lag ===
false || ( $lag + $age ) > self::MAX_READ_LAG ) {
517 if ( $lockTSE >= 0 ) {
518 $ttl = max( 1, (
int)$lockTSE );
521 } elseif ( $age > self::MAX_READ_LAG ) {
522 $this->logger->info(
'Rejected set() for {cachekey} due to snapshot lag.',
523 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
527 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
529 $this->logger->warning(
'Lowered set() TTL for {cachekey} due to replication lag.',
530 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
533 $this->logger->info(
'Rejected set() for {cachekey} due to high read lag.',
534 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
541 $wrapped = $this->
wrap( $value, $ttl, $now ) + $wrapExtra;
543 $func =
function (
$cache, $key, $cWrapped )
use ( $wrapped ) {
544 return ( is_string( $cWrapped ) )
549 return $this->
cache->merge( self::VALUE_KEY_PREFIX . $key, $func, $ttl + $staleTTL, 1 );
614 $key = self::VALUE_KEY_PREFIX . $key;
621 $ok = $this->
relayPurge( $key, $ttl, self::HOLDOFF_NONE );
714 $rawKeys[$key] = self::TIME_KEY_PREFIX . $key;
717 $rawValues = $this->
cache->getMulti( $rawKeys );
718 $rawValues += array_fill_keys( $rawKeys,
false );
721 foreach ( $rawKeys
as $key => $rawKey ) {
723 if ( $purge !==
false ) {
736 $times[$key] =
$time;
778 return $this->
relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
810 return $this->
relayDelete( self::TIME_KEY_PREFIX . $key );
1063 if ( $pcTTL >= 0 && $this->callbackDepth == 0 ) {
1066 $value = $procCache->get( $key );
1072 if (
$value ===
false ) {
1074 if ( isset( $opts[
'version'] ) ) {
1075 $version = $opts[
'version'];
1080 function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1081 use ( $callback, $version ) {
1082 if ( is_array( $oldValue )
1083 && array_key_exists( self::VFLD_DATA, $oldValue )
1084 && array_key_exists( self::VFLD_VERSION, $oldValue )
1085 && $oldValue[self::VFLD_VERSION] === $version
1095 self::VFLD_DATA => $callback( $oldData, $ttl, $setOpts, $oldAsOf ),
1096 self::VFLD_VERSION => $version
1102 if ( $cur[self::VFLD_VERSION] === $version ) {
1109 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), $version ),
1113 [
'version' =>
null,
'minAsOf' => $asOf ] + $opts
1121 if ( $procCache &&
$value !==
false ) {
1122 $procCache->set( $key,
$value, $pcTTL );
1143 $lowTTL = isset( $opts[
'lowTTL'] ) ? $opts[
'lowTTL'] : min( self::LOW_TTL, $ttl );
1144 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
1147 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1148 $busyValue = isset( $opts[
'busyValue'] ) ? $opts[
'busyValue'] :
null;
1149 $popWindow = isset( $opts[
'hotTTR'] ) ? $opts[
'hotTTR'] :
self::HOT_TTR;
1150 $ageNew = isset( $opts[
'ageNew'] ) ? $opts[
'ageNew'] :
self::AGE_NEW;
1152 $versioned = isset( $opts[
'version'] );
1159 $cValue = $this->
get( $key, $curTTL, $checkKeys, $asOf );
1164 if ( $value !==
false
1166 && $this->
isValid( $value, $versioned, $asOf, $minTime )
1168 $preemptiveRefresh = (
1173 if ( !$preemptiveRefresh ) {
1174 $this->stats->increment(
"wanobjectcache.$kClass.hit.good" );
1177 } elseif ( $this->asyncHandler ) {
1180 $func(
function ()
use ( $key, $ttl, $callback, $opts, $asOf ) {
1181 $opts[
'minAsOf'] = INF;
1184 $this->stats->increment(
"wanobjectcache.$kClass.hit.refresh" );
1191 $isTombstone = ( $curTTL !==
null &&
$value ===
false );
1192 if ( $isTombstone && $lockTSE <= 0 ) {
1197 $isHot = ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE );
1199 $checkBusy = ( $busyValue !==
null &&
$value ===
false );
1204 $useMutex = ( $isHot || ( $isTombstone && $lockTSE > 0 ) || $checkBusy );
1206 $lockAcquired =
false;
1209 if ( $this->
cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
1211 $lockAcquired =
true;
1212 } elseif (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
1213 $this->stats->increment(
"wanobjectcache.$kClass.hit.stale" );
1221 if (
$value !==
false ) {
1222 $this->stats->increment(
"wanobjectcache.$kClass.hit.volatile" );
1227 if ( $busyValue !==
null ) {
1228 $this->stats->increment(
"wanobjectcache.$kClass.miss.busy" );
1230 return is_callable( $busyValue ) ? $busyValue() : $busyValue;
1235 if ( !is_callable( $callback ) ) {
1236 throw new InvalidArgumentException(
"Invalid cache miss callback provided." );
1243 $value = call_user_func_array( $callback, [ $cValue, &$ttl, &$setOpts, $asOf ] );
1247 $valueIsCacheable = (
$value !==
false && $ttl >= 0 );
1251 if ( ( $isTombstone && $lockTSE > 0 ) && $valueIsCacheable ) {
1252 $tempTTL = max( 1, (
int)$lockTSE );
1254 $wrapped = $this->
wrap( $value, $tempTTL, $newAsOf );
1259 if ( $valueIsCacheable ) {
1260 $setOpts[
'lockTSE'] = $lockTSE;
1261 $setOpts[
'staleTTL'] = $staleTTL;
1263 $setOpts += [
'since' => $preCallbackTime ];
1265 $this->
set( $key,
$value, $ttl, $setOpts );
1268 if ( $lockAcquired ) {
1270 $this->
cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, (
int)$preCallbackTime - 60 );
1273 $this->stats->increment(
"wanobjectcache.$kClass.miss.compute" );
1290 $wrapped = $this->
cache->get( self::INTERIM_KEY_PREFIX . $key );
1292 if ( $value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
1307 $this->
cache->merge(
1308 self::INTERIM_KEY_PREFIX . $key,
1309 function ()
use ( $wrapped ) {
1384 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1386 $valueKeys = array_keys( $keyedIds->getArrayCopy() );
1387 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1394 $this->warmupKeyMisses = 0;
1398 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
use ( $callback, &$id ) {
1399 return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
1403 foreach ( $keyedIds
as $key => $id ) {
1407 $this->warmupCache = [];
1478 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1480 $idsByValueKey = $keyedIds->getArrayCopy();
1481 $valueKeys = array_keys( $idsByValueKey );
1482 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1483 unset( $opts[
'lockTSE'] );
1484 unset( $opts[
'busyValue'] );
1489 $this->warmupKeyMisses = 0;
1497 $curByKey = $this->
getMulti( $keysGet, $curTTLs, $checkKeys, $asOfs );
1498 foreach ( $keysGet
as $key ) {
1499 if ( !array_key_exists( $key, $curByKey ) || $curTTLs[$key] < 0 ) {
1500 $idsRegen[] = $idsByValueKey[$key];
1506 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
1507 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
1511 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1512 use ( $callback, &$id, $newValsById, $newTTLsById, $newSetOpts )
1514 if ( array_key_exists( $id, $newValsById ) ) {
1516 $newValue = $newValsById[$id];
1517 $ttl = $newTTLsById[$id];
1518 $setOpts = $newSetOpts;
1522 $ttls = [ $id => $ttl ];
1523 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
1532 foreach ( $idsByValueKey
as $key => $id ) {
1536 $this->warmupCache = [];
1553 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
1555 $wrapped = $this->
cache->get( self::VALUE_KEY_PREFIX . $key );
1556 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
1558 $this->logger->warning(
"Reaping stale value key '$key'." );
1560 $ok = $this->
cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
1562 $this->logger->error(
"Could not complete reap of key '$key'." );
1582 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
1584 if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
1586 $this->logger->warning(
"Reaping stale check key '$key'." );
1587 $ok = $this->
cache->changeTTL( self::TIME_KEY_PREFIX . $key, self::TTL_SECOND );
1589 $this->logger->error(
"Could not complete reap of check key '$key'." );
1607 public function makeKey( $class, $component =
null ) {
1608 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1619 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1630 foreach ( $entities
as $entity ) {
1631 $map[$keyFunc( $entity, $this )] = $entity;
1634 return new ArrayIterator( $map );
1642 if ( $this->lastRelayError ) {
1668 $this->
cache->clearLastError();
1678 $this->processCaches = [];
1711 return $this->
cache->getQoS( $flag );
1777 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
1778 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
1779 $mtime = (int)$mtime;
1782 if ( !is_int( $mtime ) || $mtime <= 0 ) {
1788 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
1810 if ( $this->mcrouterAware ) {
1813 $ok = $this->
cache->set(
1814 "/*/{$this->cluster}/{$key}",
1820 $ok = $this->
cache->set(
1826 $event = $this->
cache->modifySimpleRelayEvent( [
1829 'val' =>
'PURGED:$UNIXTIME$:' . (
int)$holdoff,
1830 'ttl' => max( $ttl, self::TTL_SECOND ),
1834 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1850 if ( $this->mcrouterAware ) {
1853 $ok = $this->
cache->delete(
"/*/{$this->cluster}/{$key}" );
1856 $ok = $this->
cache->delete( $key );
1858 $event = $this->
cache->modifySimpleRelayEvent( [
1863 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1886 if ( $curTTL > 0 ) {
1888 } elseif ( $graceTTL <= 0 ) {
1892 $ageStale = abs( $curTTL );
1893 $curGTTL = ( $graceTTL - $ageStale );
1894 if ( $curGTTL <= 0 ) {
1916 if ( $lowTTL <= 0 ) {
1918 } elseif ( $curTTL >= $lowTTL ) {
1920 } elseif ( $curTTL <= 0 ) {
1924 $chance = ( 1 - $curTTL / $lowTTL );
1926 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1945 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
1949 $age = $now - $asOf;
1950 $timeOld = $age - $ageNew;
1951 if ( $timeOld <= 0 ) {
1958 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
1962 $chance = 1 / ( self::HIT_RATE_HIGH * $refreshWindowSec );
1965 $chance *= ( $timeOld <=
self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
1967 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1980 if ( $versioned && !isset(
$value[self::VFLD_VERSION] ) ) {
1982 } elseif ( $minTime > 0 && $asOf < $minTime ) {
2000 self::FLD_VALUE =>
$value,
2001 self::FLD_TTL => $ttl,
2002 self::FLD_TIME => $now
2013 protected function unwrap( $wrapped, $now ) {
2016 if ( $purge !==
false ) {
2018 $curTTL = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
2019 return [
false, $curTTL ];
2022 if ( !is_array( $wrapped )
2023 || !isset( $wrapped[self::FLD_VERSION] )
2024 || $wrapped[self::FLD_VERSION] !== self::VERSION
2026 return [
false, null ];
2029 $flags = isset( $wrapped[self::FLD_FLAGS] ) ? $wrapped[
self::FLD_FLAGS] : 0;
2030 if ( ( $flags & self::FLG_STALE ) == self::FLG_STALE ) {
2033 $curTTL = min( -$age, self::TINY_NEGATIVE );
2034 } elseif ( $wrapped[self::FLD_TTL] > 0 ) {
2037 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2054 $res[] = $prefix . $key;
2065 $parts = explode(
':', $key );
2067 return isset( $parts[1] ) ? $parts[1] : $parts[0];
2076 if ( !is_string(
$value ) ) {
2079 $segments = explode(
':',
$value, 3 );
2080 if ( !isset( $segments[0] ) || !isset( $segments[1] )
2081 ||
"{$segments[0]}:" !== self::PURGE_VAL_PREFIX
2085 if ( !isset( $segments[2] ) ) {
2090 self::FLD_TIME => (float)$segments[1],
2091 self::FLD_HOLDOFF => (
int)$segments[2],
2101 return self::PURGE_VAL_PREFIX . (float)$timestamp .
':' . (
int)$holdoff;
2109 if ( !isset( $this->processCaches[$group] ) ) {
2110 list( , $n ) = explode(
':', $group );
2111 $this->processCaches[$group] =
new HashBagOStuff( [
'maxKeys' => (
int)$n ] );
2114 return $this->processCaches[$group];
2124 if ( isset( $opts[
'pcTTL'] ) && $opts[
'pcTTL'] > 0 && $this->callbackDepth == 0 ) {
2125 $pcGroup = isset( $opts[
'pcGroup'] ) ? $opts[
'pcGroup'] :
self::PC_PRIMARY;
2128 if ( $procCache->get( $key ) !==
false ) {
2129 $keysFound[] = $key;
2134 return array_diff(
$keys, $keysFound );
2150 $keysWarmUp[] = self::VALUE_KEY_PREFIX . $key;
2153 foreach ( $checkKeys
as $i => $checkKeyOrKeys ) {
2154 if ( is_int( $i ) ) {
2156 $keysWarmUp[] = self::TIME_KEY_PREFIX . $checkKeyOrKeys;
2159 $keysWarmUp = array_merge(
2161 self::prefixCacheKeys( $checkKeyOrKeys, self::TIME_KEY_PREFIX )
2177 return $this->wallClockOverride ?: microtime(
true );
2185 $this->wallClockOverride =&
$time;