22 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
255 $this->
cache = $params[
'cache'];
256 $this->region =
$params[
'region'] ??
'main';
257 $this->cluster =
$params[
'cluster'] ??
'wan-main';
258 $this->mcrouterAware = !empty(
$params[
'mcrouterAware'] );
259 $this->epoch =
$params[
'epoch'] ?? 1.0;
261 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
263 $this->asyncHandler =
$params[
'asyncHandler'] ??
null;
331 final public function get(
332 $key, &$curTTL =
null,
array $checkKeys = [], &$info =
null
336 $values = $this->
getMulti( [ $key ], $curTTLs, $checkKeys, $infoByKey );
337 $curTTL = $curTTLs[$key] ??
null;
338 if ( $info === self::PASS_BY_REF ) {
340 'asOf' => $infoByKey[$key][
'asOf'] ??
null,
341 'tombAsOf' => $infoByKey[$key][
'tombAsOf'] ??
null,
342 'lastCKPurge' => $infoByKey[$key][
'lastCKPurge'] ??
null
345 $info = $infoByKey[$key][
'asOf'] ??
null;
348 return $values[$key] ??
false;
373 array $checkKeys = [],
380 $vPrefixLen = strlen( self::VALUE_KEY_PREFIX );
383 $checkKeysForAll = [];
384 $checkKeysByKey = [];
386 foreach ( $checkKeys
as $i => $checkKeyGroup ) {
388 $checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
390 if ( is_int( $i ) ) {
391 $checkKeysForAll = array_merge( $checkKeysForAll, $prefixed );
393 $checkKeysByKey[$i] = $prefixed;
398 $keysGet = array_merge( $valueKeys, $checkKeysFlat );
399 if ( $this->warmupCache ) {
400 $wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
401 $keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) );
402 $this->warmupKeyMisses +=
count( $keysGet );
407 $wrappedValues += $this->
cache->getMulti( $keysGet );
413 $purgeValuesForAll = $this->
processCheckKeys( $checkKeysForAll, $wrappedValues, $now );
414 $purgeValuesByKey = [];
415 foreach ( $checkKeysByKey
as $cacheKey => $checks ) {
416 $purgeValuesByKey[$cacheKey] =
421 foreach ( $valueKeys
as $vKey ) {
422 $key = substr( $vKey, $vPrefixLen );
423 list(
$value, $curTTL, $asOf, $tombAsOf ) = isset( $wrappedValues[$vKey] )
424 ? $this->
unwrap( $wrappedValues[$vKey], $now )
425 : [
false,
null,
null,
null ];
428 $purgeValues = $purgeValuesForAll;
429 if ( isset( $purgeValuesByKey[$key] ) ) {
430 $purgeValues = array_merge( $purgeValues, $purgeValuesByKey[$key] );
434 foreach ( $purgeValues
as $purge ) {
435 $lastCKPurge = max( $purge[self::FLD_TIME], $lastCKPurge );
437 if (
$value !==
false && $safeTimestamp >= $asOf ) {
439 $ago = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
441 $curTTL = min( $curTTL, $ago );
448 if ( $curTTL !==
null ) {
449 $curTTLs[$key] = $curTTL;
453 ? [
'asOf' => $asOf,
'tombAsOf' => $tombAsOf,
'lastCKPurge' => $lastCKPurge ]
471 foreach ( $timeKeys
as $timeKey ) {
472 $purge = isset( $wrappedValues[$timeKey] )
475 if ( $purge ===
false ) {
478 $this->
cache->add( $timeKey, $newVal, self::CHECK_KEY_TTL );
481 $purgeValues[] = $purge;
558 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
559 $creating = $opts[
'creating'] ??
false;
560 $lag = $opts[
'lag'] ?? 0;
563 if ( !empty( $opts[
'pending'] ) ) {
565 'Rejected set() for {cachekey} due to pending writes.',
566 [
'cachekey' => $key ]
574 if ( $lag ===
false || ( $lag + $age ) > self::MAX_READ_LAG ) {
576 if ( $age > self::MAX_READ_LAG ) {
577 if ( $lockTSE >= 0 ) {
581 'Lowered set() TTL for {cachekey} due to snapshot lag.',
582 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ]
586 'Rejected set() for {cachekey} due to snapshot lag.',
587 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ]
593 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
594 if ( $lockTSE >= 0 ) {
595 $logicalTTL = min( $ttl ?: INF, self::TTL_LAGGED );
597 $ttl = min( $ttl ?: INF, self::TTL_LAGGED );
599 $this->logger->warning(
600 'Lowered set() TTL for {cachekey} due to replication lag.',
601 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ]
604 } elseif ( $lockTSE >= 0 ) {
608 'Lowered set() TTL for {cachekey} due to high read lag.',
609 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ]
613 'Rejected set() for {cachekey} due to high read lag.',
614 [
'cachekey' => $key,
'lag' => $lag,
'age' => $age ]
622 $wrapped = $this->
wrap( $value, $logicalTTL ?: $ttl, $now );
623 $storeTTL = $ttl + $staleTTL;
626 $ok = $this->
cache->add( self::VALUE_KEY_PREFIX . $key, $wrapped, $storeTTL );
628 $ok = $this->
cache->merge(
629 self::VALUE_KEY_PREFIX . $key,
630 function (
$cache, $key, $cWrapped )
use ( $wrapped ) {
632 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
706 $ok = $this->
relayDelete( self::VALUE_KEY_PREFIX . $key );
709 $ok = $this->
relayPurge( self::VALUE_KEY_PREFIX . $key, $ttl, self::HOLDOFF_NONE );
713 $this->stats->increment(
"wanobjectcache.$kClass.delete." . ( $ok ?
'ok' :
'error' ) );
805 $rawKeys[$key] = self::TIME_KEY_PREFIX . $key;
808 $rawValues = $this->
cache->getMulti( $rawKeys );
809 $rawValues += array_fill_keys( $rawKeys,
false );
812 foreach ( $rawKeys
as $key => $rawKey ) {
814 if ( $purge !==
false ) {
827 $times[$key] =
$time;
869 $ok = $this->
relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
872 $this->stats->increment(
"wanobjectcache.$kClass.ck_touch." . ( $ok ?
'ok' :
'error' ) );
906 $ok = $this->
relayDelete( self::TIME_KEY_PREFIX . $key );
909 $this->stats->increment(
"wanobjectcache.$kClass.ck_reset." . ( $ok ?
'ok' :
'error' ) );
1221 if ( $pcTTL >= 0 && $this->callbackDepth == 0 ) {
1224 $value = $procCache->has( $key, $pcTTL ) ? $procCache->get( $key ) :
false;
1230 if (
$value ===
false ) {
1232 if ( isset( $opts[
'version'] ) ) {
1233 $version = $opts[
'version'];
1238 function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1239 use ( $callback, $version ) {
1240 if ( is_array( $oldValue )
1241 && array_key_exists( self::VFLD_DATA, $oldValue )
1242 && array_key_exists( self::VFLD_VERSION, $oldValue )
1243 && $oldValue[self::VFLD_VERSION] === $version
1253 self::VFLD_DATA => $callback( $oldData, $ttl, $setOpts, $oldAsOf ),
1254 self::VFLD_VERSION => $version
1260 if ( $cur[self::VFLD_VERSION] === $version ) {
1267 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), $version ),
1271 [
'version' =>
null,
'minAsOf' => $asOf ] + $opts
1279 if ( $procCache &&
$value !==
false ) {
1280 $procCache->set( $key,
$value );
1301 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1305 $checkKeys = $opts[
'checkKeys'] ?? [];
1306 $busyValue = $opts[
'busyValue'] ??
null;
1310 $needsVersion = isset( $opts[
'version'] );
1311 $touchedCb = $opts[
'touchedCallback'] ??
null;
1319 $curValue = $this->
get( $key, $curTTL, $checkKeys, $curInfo );
1321 list( $curTTL, $LPT ) = $this->
resolveCTL( $curValue, $curTTL, $curInfo, $touchedCb );
1324 $asOf = $curInfo[
'asOf'];
1328 $this->
isValid( $value, $needsVersion, $asOf, $minTime ) &&
1331 $preemptiveRefresh = (
1336 if ( !$preemptiveRefresh ) {
1337 $this->stats->increment(
"wanobjectcache.$kClass.hit.good" );
1341 $this->stats->increment(
"wanobjectcache.$kClass.hit.refresh" );
1347 $isKeyTombstoned = ( $curInfo[
'tombAsOf'] !==
null );
1348 if ( $isKeyTombstoned ) {
1358 $this->
isValid( $value, $needsVersion, $asOf, $minTime, $LPT ) &&
1361 $this->stats->increment(
"wanobjectcache.$kClass.hit.volatile" );
1377 ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE ) ||
1380 ( $busyValue !==
null &&
$value ===
false );
1385 if ( $this->
cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
1388 } elseif ( $this->
isValid( $value, $needsVersion, $asOf, $minTime ) ) {
1390 $this->stats->increment(
"wanobjectcache.$kClass.hit.stale" );
1395 if ( $busyValue !==
null ) {
1397 $miss = is_infinite( $minTime ) ?
'renew' :
'miss';
1398 $this->stats->increment(
"wanobjectcache.$kClass.$miss.busy" );
1400 return is_callable( $busyValue ) ? $busyValue() : $busyValue;
1405 if ( !is_callable( $callback ) ) {
1406 throw new InvalidArgumentException(
"Invalid cache miss callback provided." );
1414 $value = call_user_func_array( $callback, [ $curValue, &$ttl, &$setOpts, $asOf ] );
1418 $valueIsCacheable = (
$value !==
false && $ttl >= 0 );
1420 if ( $valueIsCacheable ) {
1422 $this->stats->timing(
"wanobjectcache.$kClass.regen_set_delay", 1000 * $ago );
1424 if ( $isKeyTombstoned ) {
1427 $tempTTL = max( self::INTERIM_KEY_TTL, (
int)$lockTSE );
1430 } elseif ( !$useMutex || $hasLock ) {
1432 $setOpts[
'creating'] = ( $curValue ===
false );
1434 $setOpts[
'lockTSE'] = $lockTSE;
1435 $setOpts[
'staleTTL'] = $staleTTL;
1437 $setOpts += [
'since' => $preCallbackTime ];
1439 $this->
set( $key,
$value, $ttl, $setOpts );
1445 $this->
cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, (
int)$initialTime - 60 );
1448 $miss = is_infinite( $minTime ) ?
'renew' :
'miss';
1449 $this->stats->increment(
"wanobjectcache.$kClass.$miss.compute" );
1459 return ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1477 if ( $lockTSE < 0 || $hasLock ) {
1479 } elseif ( $elapsed <= self::SET_DELAY_HIGH_MS * 1e3 ) {
1483 $this->
cache->clearLastError();
1485 !$this->
cache->add( self::COOLOFF_KEY_PREFIX . $key, 1, self::COOLOFF_TTL ) &&
1489 $this->stats->increment(
"wanobjectcache.$kClass.cooloff_bounce" );
1506 if ( $touchedCallback ===
null ||
$value ===
false ) {
1507 return [ $curTTL, max( $curInfo[
'tombAsOf'], $curInfo[
'lastCKPurge'] ) ];
1510 if ( !is_callable( $touchedCallback ) ) {
1511 throw new InvalidArgumentException(
"Invalid expiration callback provided." );
1514 $touched = $touchedCallback(
$value );
1515 if ( $touched !==
null && $touched >= $curInfo[
'asOf'] ) {
1516 $curTTL = min( $curTTL, self::TINY_NEGATIVE, $curInfo[
'asOf'] - $touched );
1519 return [ $curTTL, max( $curInfo[
'tombAsOf'], $curInfo[
'lastCKPurge'], $touched ) ];
1530 if ( $touchedCallback ===
null ||
$value ===
false ) {
1534 if ( !is_callable( $touchedCallback ) ) {
1535 throw new InvalidArgumentException(
"Invalid expiration callback provided." );
1538 return max( $touchedCallback(
$value ), $lastPurge );
1549 return [
false,
null ];
1552 $wrapped = $this->
cache->get( self::INTERIM_KEY_PREFIX . $key );
1555 if ( $this->
isValid( $value, $versioned, $valueAsOf, $minTime ) ) {
1556 return [
$value, $valueAsOf ];
1559 return [
false,
null ];
1569 $wrapped = $this->
wrap( $value, $tempTTL, $newAsOf );
1571 $this->
cache->merge(
1572 self::INTERIM_KEY_PREFIX . $key,
1573 function ()
use ( $wrapped ) {
1648 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1650 $valueKeys = array_keys( $keyedIds->getArrayCopy() );
1651 $checkKeys = $opts[
'checkKeys'] ?? [];
1659 $this->warmupKeyMisses = 0;
1663 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
use ( $callback, &$id ) {
1664 return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
1668 foreach ( $keyedIds
as $key => $id ) {
1672 $this->warmupCache = [];
1743 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1745 $idsByValueKey = $keyedIds->getArrayCopy();
1746 $valueKeys = array_keys( $idsByValueKey );
1747 $checkKeys = $opts[
'checkKeys'] ?? [];
1749 unset( $opts[
'lockTSE'] );
1750 unset( $opts[
'busyValue'] );
1755 $this->warmupKeyMisses = 0;
1763 $curByKey = $this->
getMulti( $keysGet, $curTTLs, $checkKeys, $asOfs );
1764 foreach ( $keysGet
as $key ) {
1765 if ( !array_key_exists( $key, $curByKey ) || $curTTLs[$key] < 0 ) {
1766 $idsRegen[] = $idsByValueKey[$key];
1772 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
1773 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
1777 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1778 use ( $callback, &$id, $newValsById, $newTTLsById, $newSetOpts )
1780 if ( array_key_exists( $id, $newValsById ) ) {
1782 $newValue = $newValsById[$id];
1783 $ttl = $newTTLsById[$id];
1784 $setOpts = $newSetOpts;
1788 $ttls = [ $id => $ttl ];
1789 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
1798 foreach ( $idsByValueKey
as $key => $id ) {
1802 $this->warmupCache = [];
1819 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
1821 $wrapped = $this->
cache->get( self::VALUE_KEY_PREFIX . $key );
1822 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
1824 $this->logger->warning(
"Reaping stale value key '$key'." );
1826 $ok = $this->
cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
1828 $this->logger->error(
"Could not complete reap of key '$key'." );
1848 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
1850 if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
1852 $this->logger->warning(
"Reaping stale check key '$key'." );
1853 $ok = $this->
cache->changeTTL( self::TIME_KEY_PREFIX . $key, self::TTL_SECOND );
1855 $this->logger->error(
"Could not complete reap of check key '$key'." );
1873 public function makeKey( $class, $component =
null ) {
1874 return $this->
cache->makeKey( ...func_get_args() );
1885 return $this->
cache->makeGlobalKey( ...func_get_args() );
1896 foreach ( $entities
as $entity ) {
1897 $map[$keyFunc( $entity, $this )] = $entity;
1900 return new ArrayIterator( $map );
1925 $this->
cache->clearLastError();
1934 $this->processCaches = [];
1967 return $this->
cache->getQoS( $flag );
2033 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2034 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
2035 $mtime = (int)$mtime;
2038 if ( !is_int( $mtime ) || $mtime <= 0 ) {
2044 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2066 if ( $this->mcrouterAware ) {
2069 $ok = $this->
cache->set(
2070 "/*/{$this->cluster}/{$key}",
2076 $ok = $this->
cache->set(
2093 if ( $this->mcrouterAware ) {
2096 $ok = $this->
cache->delete(
"/*/{$this->cluster}/{$key}" );
2099 $ok = $this->
cache->delete( $key );
2113 if ( !$this->asyncHandler ) {
2118 $func(
function ()
use ( $key, $ttl, $callback, $opts ) {
2120 $opts[
'minAsOf'] = INF;
2141 if ( $curTTL > 0 ) {
2143 } elseif ( $graceTTL <= 0 ) {
2147 $ageStale = abs( $curTTL );
2148 $curGTTL = ( $graceTTL - $ageStale );
2149 if ( $curGTTL <= 0 ) {
2171 if ( $lowTTL <= 0 ) {
2173 } elseif ( $curTTL >= $lowTTL ) {
2175 } elseif ( $curTTL <= 0 ) {
2179 $chance = ( 1 - $curTTL / $lowTTL );
2181 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
2200 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2204 $age = $now - $asOf;
2205 $timeOld = $age - $ageNew;
2206 if ( $timeOld <= 0 ) {
2213 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2217 $chance = 1 / ( self::HIT_RATE_HIGH * $refreshWindowSec );
2220 $chance *= ( $timeOld <=
self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2222 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
2235 protected function isValid(
$value, $versioned, $asOf, $minTime, $purgeTime =
null ) {
2237 $safeMinTime = max( $minTime, $purgeTime + self::TINY_POSTIVE );
2239 if (
$value ===
false ) {
2241 } elseif ( $versioned && !isset(
$value[self::VFLD_VERSION] ) ) {
2243 } elseif ( $safeMinTime > 0 && $asOf < $minTime ) {
2261 self::FLD_VALUE =>
$value,
2262 self::FLD_TTL => $ttl,
2263 self::FLD_TIME => $now
2276 protected function unwrap( $wrapped, $now ) {
2279 if ( $purge !==
false ) {
2281 $curTTL = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
2285 if ( !is_array( $wrapped )
2286 || !isset( $wrapped[self::FLD_VERSION] )
2287 || $wrapped[self::FLD_VERSION] !== self::VERSION
2289 return [
false,
null,
null,
null ];
2292 if ( $wrapped[self::FLD_TTL] > 0 ) {
2295 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2301 if ( $wrapped[self::FLD_TIME] < $this->epoch ) {
2303 return [
false,
null,
null,
null ];
2317 $res[] = $prefix . $key;
2328 $parts = explode(
':', $key );
2330 return $parts[1] ?? $parts[0];
2339 if ( !is_string(
$value ) ) {
2343 $segments = explode(
':',
$value, 3 );
2344 if ( !isset( $segments[0] ) || !isset( $segments[1] )
2345 ||
"{$segments[0]}:" !== self::PURGE_VAL_PREFIX
2350 if ( !isset( $segments[2] ) ) {
2355 if ( $segments[1] < $this->epoch ) {
2361 self::FLD_TIME => (float)$segments[1],
2362 self::FLD_HOLDOFF => (
int)$segments[2],
2372 return self::PURGE_VAL_PREFIX . (float)$timestamp .
':' . (
int)$holdoff;
2380 if ( !isset( $this->processCaches[$group] ) ) {
2381 list( , $n ) = explode(
':', $group );
2382 $this->processCaches[$group] =
new MapCacheLRU( (
int)$n );
2385 return $this->processCaches[$group];
2396 if ( isset( $opts[
'pcTTL'] ) && $opts[
'pcTTL'] > 0 && $this->callbackDepth == 0 ) {
2400 if ( $procCache->has( $key, $pcTTL ) ) {
2401 $keysFound[] = $key;
2406 return array_diff(
$keys, $keysFound );
2422 $keysWarmUp[] = self::VALUE_KEY_PREFIX . $key;
2425 foreach ( $checkKeys
as $i => $checkKeyOrKeys ) {
2426 if ( is_int( $i ) ) {
2428 $keysWarmUp[] = self::TIME_KEY_PREFIX . $checkKeyOrKeys;
2431 $keysWarmUp = array_merge(
2433 self::prefixCacheKeys( $checkKeyOrKeys, self::TIME_KEY_PREFIX )
2449 if ( $this->wallClockOverride ) {
2453 $clockTime = (float)time();
2459 return max( microtime(
true ), $clockTime );
2467 $this->wallClockOverride =&
$time;