23 use Psr\Log\LoggerAwareInterface;
24 use Psr\Log\LoggerInterface;
25 use Psr\Log\NullLogger;
181 $this->
cache = $params[
'cache'];
182 $this->purgeChannel = isset(
$params[
'channels'][
'purge'] )
185 $this->purgeRelayer = isset(
$params[
'relayers'][
'purge'] )
/**
 * Fetch a single key by delegating to getMulti() with a one-element key list.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers jump from 248 to 251), so any initialisation of $curTTLs / $asOfs
 * before the getMulti() call is not visible here.
 *
 * @param mixed $key Cache key; used as the array index into getMulti()'s results
 * @param mixed &$curTTL [out] Set to getMulti()'s TTL entry for $key, or null when
 *   that entry is unset or null
 * @param array $checkKeys Passed through unchanged to getMulti()
 * @param mixed &$asOf [out] Set to getMulti()'s as-of entry for $key, or null when
 *   that entry is unset or null
 * @return mixed getMulti()'s value entry for $key, or false when that entry is
 *   unset or null
 */
248 final public function get( $key, &$curTTL =
null,
array $checkKeys = [], &$asOf = null ) {
// Single-key lookup expressed as a batch of one
251 $values = $this->
getMulti( [ $key ], $curTTLs, $checkKeys, $asOfs );
// Copy the per-key results into the by-reference out-parameters
252 $curTTL = isset( $curTTLs[$key] ) ? $curTTLs[$key] :
null;
253 $asOf = isset( $asOfs[$key] ) ? $asOfs[$key] :
null;
// isset() is used, so a missing entry and a null entry both yield false
255 return isset( $values[$key] ) ? $values[$key] :
false;
277 $vPrefixLen = strlen( self::VALUE_KEY_PREFIX );
280 $checkKeysForAll = [];
281 $checkKeysByKey = [];
283 foreach ( $checkKeys
as $i => $checkKeyGroup ) {
285 $checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
287 if ( is_int( $i ) ) {
288 $checkKeysForAll = array_merge( $checkKeysForAll, $prefixed );
290 $checkKeysByKey[$i] = isset( $checkKeysByKey[$i] )
291 ? array_merge( $checkKeysByKey[$i], $prefixed )
297 $keysGet = array_merge( $valueKeys, $checkKeysFlat );
298 if ( $this->warmupCache ) {
299 $wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
300 $keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) );
304 $wrappedValues += $this->
cache->getMulti( $keysGet );
306 $now = microtime(
true );
309 $purgeValuesForAll = $this->
processCheckKeys( $checkKeysForAll, $wrappedValues, $now );
310 $purgeValuesByKey = [];
311 foreach ( $checkKeysByKey
as $cacheKey => $checks ) {
312 $purgeValuesByKey[$cacheKey] =
317 foreach ( $valueKeys
as $vKey ) {
318 if ( !isset( $wrappedValues[$vKey] ) ) {
322 $key = substr( $vKey, $vPrefixLen );
330 $purgeValues = $purgeValuesForAll;
331 if ( isset( $purgeValuesByKey[$key] ) ) {
332 $purgeValues = array_merge( $purgeValues, $purgeValuesByKey[$key] );
334 foreach ( $purgeValues
as $purge ) {
336 if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) {
338 $ago = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
340 $curTTL = min( $curTTL, $ago );
344 $curTTLs[$key] = $curTTL;
345 $asOfs[$key] = (
$value !==
false ) ? $wrappedValues[$vKey][self::FLD_TIME] :
null;
360 foreach ( $timeKeys
as $timeKey ) {
361 $purge = isset( $wrappedValues[$timeKey] )
364 if ( $purge ===
false ) {
367 $this->
cache->add( $timeKey, $newVal, self::CHECK_KEY_TTL );
370 $purgeValues[] = $purge;
/**
 * Store a value under the value-key prefix via the backing cache's merge().
 *
 * NOTE(review): this listing omits several original lines (see the gaps in the
 * embedded line numbers), including the bodies of some branches, the
 * definition of $wrapExtra and the remainder of the merge callback. What each
 * "Rejected" branch returns is therefore not visible here.
 *
 * @param mixed $key Cache key; self::VALUE_KEY_PREFIX is prepended before storage
 * @param mixed $value Value handed to wrap()
 * @param int $ttl TTL handed to wrap(); may be lowered (see lockTSE handling below)
 * @param array $opts Options read by the visible code:
 *   - lockTSE: defaults to self::TSE_NONE
 *   - since: if set, the value's age is computed as max( 0, now - since )
 *   - lag: defaults to 0; false is treated like excessive lag
 *   - staleTTL: defaults to 0; added to $ttl for the merge() expiry
 *   - pending: if non-empty, a "pending writes" rejection is logged
 * @return mixed Result of $this->cache->merge() on the path that reaches it
 */
433 final public function set( $key,
$value, $ttl = 0,
array $opts = [] ) {
// Reference time for the age computation and for wrap()
434 $now = microtime(
true );
// Resolve options, falling back to defaults when absent
435 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
436 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
437 $lag = isset( $opts[
'lag'] ) ? $opts[
'lag'] : 0;
438 $staleTTL = isset( $opts[
'staleTTL'] ) ? $opts[
'staleTTL'] : 0;
// Caller flagged pending writes: log the rejection
441 if ( !empty( $opts[
'pending'] ) ) {
442 $this->logger->info(
"Rejected set() for $key due to pending writes." );
// Unknown lag (false) or lag plus age beyond self::MAX_READ_LAG
449 if ( $lag ===
false || ( $lag + $age ) > self::MAX_READ_LAG ) {
// Non-negative lockTSE: replace the TTL with lockTSE, but at least 1
451 if ( $lockTSE >= 0 ) {
452 $ttl = max( 1, (
int)$lockTSE );
// Age alone exceeds the limit
455 } elseif ( $age > self::MAX_READ_LAG ) {
456 $this->logger->info(
"Rejected set() for $key due to snapshot lag." );
// Lag alone is unknown or exceeds the limit
460 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
462 $this->logger->warning(
"Lowered set() TTL for $key due to replication lag." );
465 $this->logger->info(
"Rejected set() for $key due to high read lag." );
// Array union: keys produced by wrap() win over same-named keys in $wrapExtra
472 $wrapped = $this->
wrap( $value, $ttl, $now ) + $wrapExtra;
// Merge callback; it captures the wrapped value and inspects the current
// cache entry ($cWrapped) with is_string() — rest of the body is not shown
474 $func =
function (
$cache, $key, $cWrapped )
use ( $wrapped ) {
475 return ( is_string( $cWrapped ) )
// Expiry passed to merge() is $ttl + $staleTTL; the final argument is the literal 1
480 return $this->
cache->merge( self::VALUE_KEY_PREFIX . $key, $func, $ttl + $staleTTL, 1 );
541 $key = self::VALUE_KEY_PREFIX . $key;
548 $ok = $this->
relayPurge( $key, $ttl, self::HOLDOFF_NONE );
574 $key = self::TIME_KEY_PREFIX . $key;
577 if ( $purge !==
false ) {
581 $now = (
string)microtime(
true );
582 $this->
cache->add( $key,
627 return $this->
relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
662 return $this->
relayDelete( self::TIME_KEY_PREFIX . $key );
860 if ( $pcTTL >= 0 && $this->callbackDepth == 0 ) {
863 $value = $procCache->get( $key );
871 if ( isset( $opts[
'version'] ) ) {
872 $version = $opts[
'version'];
877 function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
878 use ( $callback, $version ) {
879 if ( is_array( $oldValue )
880 && array_key_exists( self::VFLD_DATA, $oldValue )
889 self::VFLD_DATA => $callback( $oldData, $ttl, $setOpts, $oldAsOf ),
890 self::VFLD_VERSION => $version
896 if ( $cur[self::VFLD_VERSION] === $version ) {
903 'cache-variant:' . md5( $key ) .
":$version",
907 [
'version' =>
null,
'minAsOf' => $asOf ] + $opts
915 if ( $procCache &&
$value !==
false ) {
916 $procCache->set( $key,
$value, $pcTTL );
937 $lowTTL = isset( $opts[
'lowTTL'] ) ? $opts[
'lowTTL'] : min( self::LOW_TTL, $ttl );
938 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
939 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
940 $busyValue = isset( $opts[
'busyValue'] ) ? $opts[
'busyValue'] :
null;
941 $popWindow = isset( $opts[
'hotTTR'] ) ? $opts[
'hotTTR'] :
self::HOT_TTR;
942 $ageNew = isset( $opts[
'ageNew'] ) ? $opts[
'ageNew'] :
self::AGE_NEW;
944 $versioned = isset( $opts[
'version'] );
948 $cValue = $this->
get( $key, $curTTL, $checkKeys, $asOf );
951 $preCallbackTime = microtime(
true );
955 && $this->
isValid( $value, $versioned, $asOf, $minTime )
963 $isTombstone = ( $curTTL !==
null &&
$value ===
false );
965 $isHot = ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE );
967 $checkBusy = ( $busyValue !==
null &&
$value ===
false );
972 $useMutex = ( $isHot || ( $isTombstone && $lockTSE > 0 ) || $checkBusy );
974 $lockAcquired =
false;
977 if ( $this->
cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
979 $lockAcquired =
true;
980 } elseif (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
987 $wrapped = $this->
cache->get( self::INTERIM_KEY_PREFIX . $key );
989 if (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
995 if ( $busyValue !==
null ) {
996 return is_callable( $busyValue ) ? $busyValue() : $busyValue;
1001 if ( !is_callable( $callback ) ) {
1002 throw new InvalidArgumentException(
"Invalid cache miss callback provided." );
1009 $value = call_user_func_array( $callback, [ $cValue, &$ttl, &$setOpts, $asOf ] );
1015 if ( ( $isTombstone && $lockTSE > 0 ) &&
$value !==
false && $ttl >= 0 ) {
1016 $tempTTL = max( 1, (
int)$lockTSE );
1017 $newAsOf = microtime(
true );
1018 $wrapped = $this->
wrap( $value, $tempTTL, $newAsOf );
1020 $this->
cache->merge(
1021 self::INTERIM_KEY_PREFIX . $key,
1022 function ()
use ( $wrapped ) {
1030 if (
$value !==
false && $ttl >= 0 ) {
1031 $setOpts[
'lockTSE'] = $lockTSE;
1033 $setOpts += [
'since' => $preCallbackTime ];
1035 $this->
set( $key,
$value, $ttl, $setOpts );
1038 if ( $lockAcquired ) {
1040 $this->
cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, 1 );
1104 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1106 $keysWarmUp = iterator_to_array( $keyedIds,
true );
1107 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1108 foreach ( $checkKeys
as $i => $checkKeyOrKeys ) {
1109 if ( is_int( $i ) ) {
1110 $keysWarmUp[] = $checkKeyOrKeys;
1112 $keysWarmUp = array_merge( $keysWarmUp, $checkKeyOrKeys );
1116 $this->warmupCache = $this->
cache->getMulti( $keysWarmUp );
1117 $this->warmupCache += array_fill_keys( $keysWarmUp,
false );
1121 $func =
function ( $oldValue, &$ttl,
array $setOpts, $oldAsOf )
use ( $callback, &$id ) {
1122 return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
1126 foreach ( $keyedIds
as $key => $id ) {
1130 $this->warmupCache = [];
/**
 * Shorten the TTL of a stored value key whose timestamp is older than a cutoff.
 *
 * NOTE(review): $minAsOf and $ttlReap are defined on original lines that this
 * listing omits, as are the assignments to $isStale and the return statement.
 * Presumably $minAsOf is derived from $purgeTimestamp — confirm against the
 * full source.
 *
 * @param mixed $key Cache key; self::VALUE_KEY_PREFIX is prepended for the lookup
 * @param mixed $purgeTimestamp Not referenced on the lines visible here
 * @param bool &$isStale [out] Defaults to false; not assigned on the lines visible here
 */
1147 public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
// Read the raw wrapped entry straight from the backing cache
1149 $wrapped = $this->
cache->get( self::VALUE_KEY_PREFIX . $key );
// Only array entries whose FLD_TIME is older than $minAsOf are reaped
1150 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
1152 $this->logger->warning(
"Reaping stale value key '$key'." );
// Adjust the entry's TTL to $ttlReap in place
1154 $ok = $this->
cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
// Presumably guarded by a check on $ok (the condition line is not shown)
1156 $this->logger->error(
"Could not complete reap of key '$key'." );
1178 if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
1180 $this->logger->warning(
"Reaping stale check key '$key'." );
1181 $ok = $this->
cache->changeTTL( self::TIME_KEY_PREFIX . $key, 1 );
1183 $this->logger->error(
"Could not complete reap of check key '$key'." );
1201 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1211 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1222 foreach ( $entities
as $entity ) {
1223 $map[$keyFunc( $entity, $this )] = $entity;
1226 return new ArrayIterator( $map );
1234 if ( $this->lastRelayError ) {
1260 $this->
cache->clearLastError();
1270 $this->processCaches = [];
1279 return $this->
cache->getQoS( $flag );
/**
 * Derive a TTL from how long ago something was last modified.
 *
 * The result is $factor times the age in seconds (time() - $mtime), clamped
 * to the range [$minTTL, $maxTTL] and cast to int.
 *
 * NOTE(review): the body of the "invalid $mtime" branch is on original lines
 * that this listing omits, so what is returned in that case is not visible.
 *
 * @param int|float|string $mtime Modification timestamp; floats and all-digit
 *   strings are converted to int
 * @param int|float $maxTTL Upper bound for the result
 * @param int|float $minTTL Lower bound for the result (default 30)
 * @param float $factor Multiplier applied to the age (default .2)
 * @return int
 */
1305 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = .2 ) {
// Normalise float and digit-string timestamps to int.
// NOTE(review): ctype_digit() receives $mtime as-is; its handling of
// non-string arguments varies by PHP version — confirm for int input.
1306 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
1307 $mtime = (int)$mtime;
// Anything that is still not a positive int is handled separately
1310 if ( !is_int( $mtime ) || $mtime <= 0 ) {
// Age in whole seconds relative to the current wall-clock time
1314 $age = time() - $mtime;
// max() applies the lower bound first, then min() applies the upper bound
1316 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
1332 $ok = $this->
cache->set( $key,
1337 $event = $this->
cache->modifySimpleRelayEvent( [
1340 'val' =>
'PURGED:$UNIXTIME$:' . (
int)$holdoff,
1341 'ttl' => max( $ttl, 1 ),
1345 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1363 $ok = $this->
cache->delete( $key );
1365 $event = $this->
cache->modifySimpleRelayEvent( [
1370 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1392 if ( $curTTL >= $lowTTL ) {
1394 } elseif ( $curTTL <= 0 ) {
1398 $chance = ( 1 - $curTTL / $lowTTL );
1400 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1419 $age = $now - $asOf;
1420 $timeOld = $age - $ageNew;
1421 if ( $timeOld <= 0 ) {
1428 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
1432 $chance = 1 / ( self::HIT_RATE_HIGH * $refreshWindowSec );
1435 $chance *= ( $timeOld <=
self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
1437 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1450 if ( $versioned && !isset(
$value[self::VFLD_VERSION] ) ) {
1452 } elseif ( $minTime > 0 && $asOf < $minTime ) {
1470 self::FLD_VALUE =>
$value,
1471 self::FLD_TTL => $ttl,
1472 self::FLD_TIME => $now
/**
 * Interpret a raw cache entry and produce a [ value, current TTL ] pair.
 *
 * NOTE(review): $purge, $flags and $age are computed on original lines that
 * this listing omits, as is the final return for a usable entry; only the
 * early returns and the TTL calculations are visible here.
 *
 * @param mixed $wrapped Raw entry as read from the cache
 * @param float $now Reference time used for the TTL arithmetic
 * @return array Two-element list; the visible early returns are
 *   [ false, <negative TTL> ] for a purge entry and [ false, null ] for an
 *   entry that is not an array or whose FLD_VERSION differs from self::VERSION
 */
1483 protected function unwrap( $wrapped, $now ) {
// Purge entry: no value, and a TTL that is never above self::TINY_NEGATIVE
1486 if ( $purge !==
false ) {
1488 $curTTL = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
1489 return [
false, $curTTL ];
// Reject anything that is not an array carrying the expected version marker
1492 if ( !is_array( $wrapped )
1493 || !isset( $wrapped[self::FLD_VERSION] )
1494 || $wrapped[self::FLD_VERSION] !== self::VERSION
1496 return [
false, null ];
// FLG_STALE set: TTL is the negated age, capped at self::TINY_NEGATIVE
1500 if ( (
$flags & self::FLG_STALE ) == self::FLG_STALE ) {
1503 $curTTL = min( -$age, self::TINY_NEGATIVE );
// Positive stored TTL: remaining lifetime, floored at 0.0
1504 } elseif ( $wrapped[self::FLD_TTL] > 0 ) {
1507 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
1524 $res[] = $prefix . $key;
1536 if ( !is_string(
$value ) ) {
1539 $segments = explode(
':',
$value, 3 );
1540 if ( !isset( $segments[0] ) || !isset( $segments[1] )
1541 ||
"{$segments[0]}:" !== self::PURGE_VAL_PREFIX
1545 if ( !isset( $segments[2] ) ) {
1550 self::FLD_TIME => (float)$segments[1],
1551 self::FLD_HOLDOFF => (
int)$segments[2],
1561 return self::PURGE_VAL_PREFIX . (float)$timestamp .
':' . (
int)$holdoff;
1569 if ( !isset( $this->processCaches[$group] ) ) {
1570 list( , $n ) = explode(
':', $group );
1571 $this->processCaches[$group] =
new HashBagOStuff( [
'maxKeys' => (
int)$n ] );
1574 return $this->processCaches[$group];