22 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
262 $this->
cache = $params[
'cache'];
265 $this->region =
$params[
'region'] ??
'main';
266 $this->cluster =
$params[
'cluster'] ??
'wan-main';
267 $this->mcrouterAware = !empty(
$params[
'mcrouterAware'] );
268 $this->epoch =
$params[
'epoch'] ?? 1.0;
270 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
272 $this->asyncHandler =
$params[
'asyncHandler'] ??
null;
334 final public function get( $key, &$curTTL =
null,
array $checkKeys = [], &$asOf = null ) {
337 $values = $this->
getMulti( [ $key ], $curTTLs, $checkKeys, $asOfs );
338 $curTTL = $curTTLs[$key] ??
null;
339 $asOf = $asOfs[$key] ??
null;
341 return $values[$key] ??
false;
363 $vPrefixLen = strlen( self::VALUE_KEY_PREFIX );
366 $checkKeysForAll = [];
367 $checkKeysByKey = [];
369 foreach ( $checkKeys
as $i => $checkKeyGroup ) {
371 $checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
373 if ( is_int( $i ) ) {
374 $checkKeysForAll = array_merge( $checkKeysForAll, $prefixed );
376 $checkKeysByKey[$i] = isset( $checkKeysByKey[$i] )
377 ? array_merge( $checkKeysByKey[$i], $prefixed )
383 $keysGet = array_merge( $valueKeys, $checkKeysFlat );
384 if ( $this->warmupCache ) {
385 $wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
386 $keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) );
387 $this->warmupKeyMisses +=
count( $keysGet );
392 $wrappedValues += $this->
cache->getMulti( $keysGet );
398 $purgeValuesForAll = $this->
processCheckKeys( $checkKeysForAll, $wrappedValues, $now );
399 $purgeValuesByKey = [];
400 foreach ( $checkKeysByKey
as $cacheKey => $checks ) {
401 $purgeValuesByKey[$cacheKey] =
406 foreach ( $valueKeys
as $vKey ) {
407 if ( !isset( $wrappedValues[$vKey] ) ) {
411 $key = substr( $vKey, $vPrefixLen );
418 $purgeValues = $purgeValuesForAll;
419 if ( isset( $purgeValuesByKey[$key] ) ) {
420 $purgeValues = array_merge( $purgeValues, $purgeValuesByKey[$key] );
422 foreach ( $purgeValues
as $purge ) {
424 if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) {
426 $ago = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
428 $curTTL = min( $curTTL, $ago );
432 $curTTLs[$key] = $curTTL;
433 $asOfs[$key] = (
$value !==
false ) ? $wrappedValues[$vKey][self::FLD_TIME] :
null;
448 foreach ( $timeKeys
as $timeKey ) {
449 $purge = isset( $wrappedValues[$timeKey] )
452 if ( $purge ===
false ) {
455 $this->
cache->add( $timeKey, $newVal, self::CHECK_KEY_TTL );
458 $purgeValues[] = $purge;
528 final public function set( $key,
$value, $ttl = 0,
array $opts = [] ) {
532 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
533 $lag = $opts[
'lag'] ?? 0;
536 if ( !empty( $opts[
'pending'] ) ) {
537 $this->logger->info(
'Rejected set() for {cachekey} due to pending writes.',
538 [
'cachekey' => $key ] );
545 if ( $lag ===
false || ( $lag + $age ) > self::MAX_READ_LAG ) {
547 if ( $lockTSE >= 0 ) {
548 $ttl = max( 1, (
int)$lockTSE );
551 } elseif ( $age > self::MAX_READ_LAG ) {
552 $this->logger->info(
'Rejected set() for {cachekey} due to snapshot lag.',
553 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
557 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
559 $this->logger->warning(
'Lowered set() TTL for {cachekey} due to replication lag.',
560 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
563 $this->logger->info(
'Rejected set() for {cachekey} due to high read lag.',
564 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ] );
571 $wrapped = $this->
wrap( $value, $ttl, $now ) + $wrapExtra;
573 $func =
function (
$cache, $key, $cWrapped )
use ( $wrapped ) {
574 return ( is_string( $cWrapped ) )
579 return $this->
cache->merge( self::VALUE_KEY_PREFIX . $key, $func, $ttl + $staleTTL, 1 );
644 $key = self::VALUE_KEY_PREFIX . $key;
651 $ok = $this->
relayPurge( $key, $ttl, self::HOLDOFF_NONE );
744 $rawKeys[$key] = self::TIME_KEY_PREFIX . $key;
747 $rawValues = $this->
cache->getMulti( $rawKeys );
748 $rawValues += array_fill_keys( $rawKeys,
false );
751 foreach ( $rawKeys
as $key => $rawKey ) {
753 if ( $purge !==
false ) {
766 $times[$key] =
$time;
808 return $this->
relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
840 return $this->
relayDelete( self::TIME_KEY_PREFIX . $key );
1096 if ( $pcTTL >= 0 && $this->callbackDepth == 0 ) {
1099 $value = $procCache->has( $key, $pcTTL ) ? $procCache->get( $key ) :
false;
1105 if (
$value ===
false ) {
1107 if ( isset( $opts[
'version'] ) ) {
1108 $version = $opts[
'version'];
1113 function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1114 use ( $callback, $version ) {
1115 if ( is_array( $oldValue )
1116 && array_key_exists( self::VFLD_DATA, $oldValue )
1117 && array_key_exists( self::VFLD_VERSION, $oldValue )
1118 && $oldValue[self::VFLD_VERSION] === $version
1128 self::VFLD_DATA => $callback( $oldData, $ttl, $setOpts, $oldAsOf ),
1129 self::VFLD_VERSION => $version
1135 if ( $cur[self::VFLD_VERSION] === $version ) {
1142 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), $version ),
1146 [
'version' =>
null,
'minAsOf' => $asOf ] + $opts
1154 if ( $procCache &&
$value !==
false ) {
1155 $procCache->set( $key,
$value );
1176 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1180 $checkKeys = $opts[
'checkKeys'] ?? [];
1181 $busyValue = $opts[
'busyValue'] ??
null;
1185 $versioned = isset( $opts[
'version'] );
1192 $cValue = $this->
get( $key, $curTTL, $checkKeys, $asOf );
1197 if ( $value !==
false
1199 && $this->
isValid( $value, $versioned, $asOf, $minTime )
1201 $preemptiveRefresh = (
1206 if ( !$preemptiveRefresh ) {
1207 $this->stats->increment(
"wanobjectcache.$kClass.hit.good" );
1210 } elseif ( $this->asyncHandler ) {
1213 $func(
function ()
use ( $key, $ttl, $callback, $opts, $asOf ) {
1214 $opts[
'minAsOf'] = INF;
1217 $this->stats->increment(
"wanobjectcache.$kClass.hit.refresh" );
1224 $isTombstone = ( $curTTL !==
null &&
$value ===
false );
1225 if ( $isTombstone && $lockTSE <= 0 ) {
1230 $isHot = ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE );
1232 $checkBusy = ( $busyValue !==
null &&
$value ===
false );
1237 $useMutex = ( $isHot || ( $isTombstone && $lockTSE > 0 ) || $checkBusy );
1239 $lockAcquired =
false;
1242 if ( $this->
cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
1244 $lockAcquired =
true;
1245 } elseif (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
1246 $this->stats->increment(
"wanobjectcache.$kClass.hit.stale" );
1254 if (
$value !==
false ) {
1255 $this->stats->increment(
"wanobjectcache.$kClass.hit.volatile" );
1260 if ( $busyValue !==
null ) {
1261 $miss = is_infinite( $minTime ) ?
'renew' :
'miss';
1262 $this->stats->increment(
"wanobjectcache.$kClass.$miss.busy" );
1264 return is_callable( $busyValue ) ? $busyValue() : $busyValue;
1269 if ( !is_callable( $callback ) ) {
1270 throw new InvalidArgumentException(
"Invalid cache miss callback provided." );
1277 $value = call_user_func_array( $callback, [ $cValue, &$ttl, &$setOpts, $asOf ] );
1281 $valueIsCacheable = (
$value !==
false && $ttl >= 0 );
1285 if ( ( $isTombstone && $lockTSE > 0 ) && $valueIsCacheable ) {
1286 $tempTTL = max( 1, (
int)$lockTSE );
1288 $wrapped = $this->
wrap( $value, $tempTTL, $newAsOf );
1293 if ( $valueIsCacheable ) {
1294 $setOpts[
'lockTSE'] = $lockTSE;
1295 $setOpts[
'staleTTL'] = $staleTTL;
1297 $setOpts += [
'since' => $preCallbackTime ];
1299 $this->
set( $key,
$value, $ttl, $setOpts );
1302 if ( $lockAcquired ) {
1304 $this->
cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, (
int)$preCallbackTime - 60 );
1307 $miss = is_infinite( $minTime ) ?
'renew' :
'miss';
1308 $this->stats->increment(
"wanobjectcache.$kClass.$miss.compute" );
1325 $wrapped = $this->
cache->get( self::INTERIM_KEY_PREFIX . $key );
1327 if ( $value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
1342 $this->
cache->merge(
1343 self::INTERIM_KEY_PREFIX . $key,
1344 function ()
use ( $wrapped ) {
1419 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1421 $valueKeys = array_keys( $keyedIds->getArrayCopy() );
1422 $checkKeys = $opts[
'checkKeys'] ?? [];
1430 $this->warmupKeyMisses = 0;
1434 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
use ( $callback, &$id ) {
1435 return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
1439 foreach ( $keyedIds
as $key => $id ) {
1443 $this->warmupCache = [];
1514 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1516 $idsByValueKey = $keyedIds->getArrayCopy();
1517 $valueKeys = array_keys( $idsByValueKey );
1518 $checkKeys = $opts[
'checkKeys'] ?? [];
1520 unset( $opts[
'lockTSE'] );
1521 unset( $opts[
'busyValue'] );
1526 $this->warmupKeyMisses = 0;
1534 $curByKey = $this->
getMulti( $keysGet, $curTTLs, $checkKeys, $asOfs );
1535 foreach ( $keysGet
as $key ) {
1536 if ( !array_key_exists( $key, $curByKey ) || $curTTLs[$key] < 0 ) {
1537 $idsRegen[] = $idsByValueKey[$key];
1543 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
1544 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
1548 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1549 use ( $callback, &$id, $newValsById, $newTTLsById, $newSetOpts )
1551 if ( array_key_exists( $id, $newValsById ) ) {
1553 $newValue = $newValsById[$id];
1554 $ttl = $newTTLsById[$id];
1555 $setOpts = $newSetOpts;
1559 $ttls = [ $id => $ttl ];
1560 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
1569 foreach ( $idsByValueKey
as $key => $id ) {
1573 $this->warmupCache = [];
1590 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
1592 $wrapped = $this->
cache->get( self::VALUE_KEY_PREFIX . $key );
1593 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
1595 $this->logger->warning(
"Reaping stale value key '$key'." );
1597 $ok = $this->
cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
1599 $this->logger->error(
"Could not complete reap of key '$key'." );
1619 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
1621 if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
1623 $this->logger->warning(
"Reaping stale check key '$key'." );
1624 $ok = $this->
cache->changeTTL( self::TIME_KEY_PREFIX . $key, self::TTL_SECOND );
1626 $this->logger->error(
"Could not complete reap of check key '$key'." );
1644 public function makeKey( $class, $component =
null ) {
1645 return $this->
cache->makeKey( ...func_get_args() );
1656 return $this->
cache->makeGlobalKey( ...func_get_args() );
1667 foreach ( $entities
as $entity ) {
1668 $map[$keyFunc( $entity, $this )] = $entity;
1671 return new ArrayIterator( $map );
1679 if ( $this->lastRelayError ) {
1705 $this->
cache->clearLastError();
1715 $this->processCaches = [];
1748 return $this->
cache->getQoS( $flag );
1814 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
1815 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
1816 $mtime = (int)$mtime;
1819 if ( !is_int( $mtime ) || $mtime <= 0 ) {
1825 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
1847 if ( $this->mcrouterAware ) {
1850 $ok = $this->
cache->set(
1851 "/*/{$this->cluster}/{$key}",
1857 $ok = $this->
cache->set(
1863 $event = $this->
cache->modifySimpleRelayEvent( [
1866 'val' =>
'PURGED:$UNIXTIME$:' . (
int)$holdoff,
1867 'ttl' => max( $ttl, self::TTL_SECOND ),
1871 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1887 if ( $this->mcrouterAware ) {
1890 $ok = $this->
cache->delete(
"/*/{$this->cluster}/{$key}" );
1893 $ok = $this->
cache->delete( $key );
1895 $event = $this->
cache->modifySimpleRelayEvent( [
1900 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1923 if ( $curTTL > 0 ) {
1925 } elseif ( $graceTTL <= 0 ) {
1929 $ageStale = abs( $curTTL );
1930 $curGTTL = ( $graceTTL - $ageStale );
1931 if ( $curGTTL <= 0 ) {
1953 if ( $lowTTL <= 0 ) {
1955 } elseif ( $curTTL >= $lowTTL ) {
1957 } elseif ( $curTTL <= 0 ) {
1961 $chance = ( 1 - $curTTL / $lowTTL );
1963 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1982 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
1986 $age = $now - $asOf;
1987 $timeOld = $age - $ageNew;
1988 if ( $timeOld <= 0 ) {
1995 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
1999 $chance = 1 / ( self::HIT_RATE_HIGH * $refreshWindowSec );
2002 $chance *= ( $timeOld <=
self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2004 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
2017 if ( $versioned && !isset(
$value[self::VFLD_VERSION] ) ) {
2019 } elseif ( $minTime > 0 && $asOf < $minTime ) {
2037 self::FLD_VALUE =>
$value,
2038 self::FLD_TTL => $ttl,
2039 self::FLD_TIME => $now
2050 protected function unwrap( $wrapped, $now ) {
2053 if ( $purge !==
false ) {
2055 $curTTL = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
2056 return [
false, $curTTL ];
2059 if ( !is_array( $wrapped )
2060 || !isset( $wrapped[self::FLD_VERSION] )
2061 || $wrapped[self::FLD_VERSION] !== self::VERSION
2063 return [
false, null ];
2067 if ( ( $flags & self::FLG_STALE ) == self::FLG_STALE ) {
2070 $curTTL = min( -$age, self::TINY_NEGATIVE );
2071 } elseif ( $wrapped[self::FLD_TTL] > 0 ) {
2074 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2080 if ( $wrapped[self::FLD_TIME] < $this->epoch ) {
2082 return [
false, null ];
2096 $res[] = $prefix . $key;
2107 $parts = explode(
':', $key );
2109 return $parts[1] ?? $parts[0];
2118 if ( !is_string(
$value ) ) {
2122 $segments = explode(
':',
$value, 3 );
2123 if ( !isset( $segments[0] ) || !isset( $segments[1] )
2124 ||
"{$segments[0]}:" !== self::PURGE_VAL_PREFIX
2129 if ( !isset( $segments[2] ) ) {
2134 if ( $segments[1] < $this->epoch ) {
2140 self::FLD_TIME => (float)$segments[1],
2141 self::FLD_HOLDOFF => (
int)$segments[2],
2151 return self::PURGE_VAL_PREFIX . (float)$timestamp .
':' . (
int)$holdoff;
2159 if ( !isset( $this->processCaches[$group] ) ) {
2160 list( , $n ) = explode(
':', $group );
2161 $this->processCaches[$group] =
new MapCacheLRU( (
int)$n );
2164 return $this->processCaches[$group];
2175 if ( isset( $opts[
'pcTTL'] ) && $opts[
'pcTTL'] > 0 && $this->callbackDepth == 0 ) {
2179 if ( $procCache->has( $key, $pcTTL ) ) {
2180 $keysFound[] = $key;
2185 return array_diff(
$keys, $keysFound );
2201 $keysWarmUp[] = self::VALUE_KEY_PREFIX . $key;
2204 foreach ( $checkKeys
as $i => $checkKeyOrKeys ) {
2205 if ( is_int( $i ) ) {
2207 $keysWarmUp[] = self::TIME_KEY_PREFIX . $checkKeyOrKeys;
2210 $keysWarmUp = array_merge(
2212 self::prefixCacheKeys( $checkKeyOrKeys, self::TIME_KEY_PREFIX )
2228 return $this->wallClockOverride ?: microtime(
true );
2236 $this->wallClockOverride =&
$time;