22 use Psr\Log\LoggerAwareInterface;
23 use Psr\Log\LoggerInterface;
24 use Psr\Log\NullLogger;
182 $this->
cache = $params[
'cache'];
183 $this->purgeChannel = isset(
$params[
'channels'][
'purge'] )
186 $this->purgeRelayer = isset(
$params[
'relayers'][
'purge'] )
248 final public function get( $key, &$curTTL =
null,
array $checkKeys = [], &$asOf = null ) {
251 $values = $this->
getMulti( [ $key ], $curTTLs, $checkKeys, $asOfs );
252 $curTTL = isset( $curTTLs[$key] ) ? $curTTLs[$key] :
null;
253 $asOf = isset( $asOfs[$key] ) ? $asOfs[$key] :
null;
255 return isset( $values[$key] ) ? $values[$key] :
false;
277 $vPrefixLen = strlen( self::VALUE_KEY_PREFIX );
280 $checkKeysForAll = [];
281 $checkKeysByKey = [];
283 foreach ( $checkKeys
as $i => $checkKeyGroup ) {
285 $checkKeysFlat = array_merge( $checkKeysFlat, $prefixed );
287 if ( is_int( $i ) ) {
288 $checkKeysForAll = array_merge( $checkKeysForAll, $prefixed );
290 $checkKeysByKey[$i] = isset( $checkKeysByKey[$i] )
291 ? array_merge( $checkKeysByKey[$i], $prefixed )
297 $keysGet = array_merge( $valueKeys, $checkKeysFlat );
298 if ( $this->warmupCache ) {
299 $wrappedValues = array_intersect_key( $this->warmupCache, array_flip( $keysGet ) );
300 $keysGet = array_diff( $keysGet, array_keys( $wrappedValues ) );
301 $this->warmupKeyMisses +=
count( $keysGet );
306 $wrappedValues += $this->
cache->getMulti( $keysGet );
309 $now = microtime(
true );
312 $purgeValuesForAll = $this->
processCheckKeys( $checkKeysForAll, $wrappedValues, $now );
313 $purgeValuesByKey = [];
314 foreach ( $checkKeysByKey
as $cacheKey => $checks ) {
315 $purgeValuesByKey[$cacheKey] =
320 foreach ( $valueKeys
as $vKey ) {
321 if ( !isset( $wrappedValues[$vKey] ) ) {
325 $key = substr( $vKey, $vPrefixLen );
333 $purgeValues = $purgeValuesForAll;
334 if ( isset( $purgeValuesByKey[$key] ) ) {
335 $purgeValues = array_merge( $purgeValues, $purgeValuesByKey[$key] );
337 foreach ( $purgeValues
as $purge ) {
339 if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) {
341 $ago = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
343 $curTTL = min( $curTTL, $ago );
347 $curTTLs[$key] = $curTTL;
348 $asOfs[$key] = (
$value !==
false ) ? $wrappedValues[$vKey][self::FLD_TIME] :
null;
363 foreach ( $timeKeys
as $timeKey ) {
364 $purge = isset( $wrappedValues[$timeKey] )
367 if ( $purge ===
false ) {
370 $this->
cache->add( $timeKey, $newVal, self::CHECK_KEY_TTL );
373 $purgeValues[] = $purge;
436 final public function set( $key,
$value, $ttl = 0,
array $opts = [] ) {
437 $now = microtime(
true );
438 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
439 $age = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
440 $lag = isset( $opts[
'lag'] ) ? $opts[
'lag'] : 0;
441 $staleTTL = isset( $opts[
'staleTTL'] ) ? $opts[
'staleTTL'] : 0;
444 if ( !empty( $opts[
'pending'] ) ) {
445 $this->logger->info(
"Rejected set() for $key due to pending writes." );
452 if ( $lag ===
false || ( $lag + $age ) > self::MAX_READ_LAG ) {
454 if ( $lockTSE >= 0 ) {
455 $ttl = max( 1, (
int)$lockTSE );
458 } elseif ( $age > self::MAX_READ_LAG ) {
459 $this->logger->info(
"Rejected set() for $key due to snapshot lag." );
463 } elseif ( $lag ===
false || $lag > self::MAX_READ_LAG ) {
465 $this->logger->warning(
"Lowered set() TTL for $key due to replication lag." );
468 $this->logger->info(
"Rejected set() for $key due to high read lag." );
475 $wrapped = $this->
wrap( $value, $ttl, $now ) + $wrapExtra;
477 $func =
function (
$cache, $key, $cWrapped )
use ( $wrapped ) {
478 return ( is_string( $cWrapped ) )
483 return $this->
cache->merge( self::VALUE_KEY_PREFIX . $key, $func, $ttl + $staleTTL, 1 );
544 $key = self::VALUE_KEY_PREFIX . $key;
551 $ok = $this->
relayPurge( $key, $ttl, self::HOLDOFF_NONE );
577 $key = self::TIME_KEY_PREFIX . $key;
580 if ( $purge !==
false ) {
584 $now = (
string)microtime(
true );
585 $this->
cache->add( $key,
630 return $this->
relayPurge( self::TIME_KEY_PREFIX . $key, self::CHECK_KEY_TTL, $holdoff );
665 return $this->
relayDelete( self::TIME_KEY_PREFIX . $key );
863 if ( $pcTTL >= 0 && $this->callbackDepth == 0 ) {
866 $value = $procCache->get( $key );
874 if ( isset( $opts[
'version'] ) ) {
875 $version = $opts[
'version'];
880 function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
881 use ( $callback, $version ) {
882 if ( is_array( $oldValue )
883 && array_key_exists( self::VFLD_DATA, $oldValue )
892 self::VFLD_DATA => $callback( $oldData, $ttl, $setOpts, $oldAsOf ),
893 self::VFLD_VERSION => $version
899 if ( $cur[self::VFLD_VERSION] === $version ) {
906 'cache-variant:' . md5( $key ) .
":$version",
910 [
'version' =>
null,
'minAsOf' => $asOf ] + $opts
918 if ( $procCache &&
$value !==
false ) {
919 $procCache->set( $key,
$value, $pcTTL );
940 $lowTTL = isset( $opts[
'lowTTL'] ) ? $opts[
'lowTTL'] : min( self::LOW_TTL, $ttl );
941 $lockTSE = isset( $opts[
'lockTSE'] ) ? $opts[
'lockTSE'] :
self::TSE_NONE;
942 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
943 $busyValue = isset( $opts[
'busyValue'] ) ? $opts[
'busyValue'] :
null;
944 $popWindow = isset( $opts[
'hotTTR'] ) ? $opts[
'hotTTR'] :
self::HOT_TTR;
945 $ageNew = isset( $opts[
'ageNew'] ) ? $opts[
'ageNew'] :
self::AGE_NEW;
947 $versioned = isset( $opts[
'version'] );
951 $cValue = $this->
get( $key, $curTTL, $checkKeys, $asOf );
954 $preCallbackTime = microtime(
true );
958 && $this->
isValid( $value, $versioned, $asOf, $minTime )
966 $isTombstone = ( $curTTL !==
null &&
$value ===
false );
968 $isHot = ( $curTTL !==
null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE );
970 $checkBusy = ( $busyValue !==
null &&
$value ===
false );
975 $useMutex = ( $isHot || ( $isTombstone && $lockTSE > 0 ) || $checkBusy );
977 $lockAcquired =
false;
980 if ( $this->
cache->add( self::MUTEX_KEY_PREFIX . $key, 1, self::LOCK_TTL ) ) {
982 $lockAcquired =
true;
983 } elseif (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
990 $wrapped = $this->
cache->get( self::INTERIM_KEY_PREFIX . $key );
992 if (
$value !==
false && $this->
isValid( $value, $versioned, $asOf, $minTime ) ) {
998 if ( $busyValue !==
null ) {
999 return is_callable( $busyValue ) ? $busyValue() : $busyValue;
1004 if ( !is_callable( $callback ) ) {
1005 throw new InvalidArgumentException(
"Invalid cache miss callback provided." );
1012 $value = call_user_func_array( $callback, [ $cValue, &$ttl, &$setOpts, $asOf ] );
1018 if ( ( $isTombstone && $lockTSE > 0 ) &&
$value !==
false && $ttl >= 0 ) {
1019 $tempTTL = max( 1, (
int)$lockTSE );
1020 $newAsOf = microtime(
true );
1021 $wrapped = $this->
wrap( $value, $tempTTL, $newAsOf );
1023 $this->
cache->merge(
1024 self::INTERIM_KEY_PREFIX . $key,
1025 function ()
use ( $wrapped ) {
1033 if (
$value !==
false && $ttl >= 0 ) {
1034 $setOpts[
'lockTSE'] = $lockTSE;
1036 $setOpts += [
'since' => $preCallbackTime ];
1038 $this->
set( $key,
$value, $ttl, $setOpts );
1041 if ( $lockAcquired ) {
1043 $this->
cache->changeTTL( self::MUTEX_KEY_PREFIX . $key, 1 );
1108 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1110 $valueKeys = array_keys( $keyedIds->getArrayCopy() );
1111 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1118 $this->warmupKeyMisses = 0;
1122 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
use ( $callback, &$id ) {
1123 return $callback( $id, $oldValue, $ttl, $setOpts, $oldAsOf );
1127 foreach ( $keyedIds
as $key => $id ) {
1131 $this->warmupCache = [];
1194 ArrayIterator $keyedIds, $ttl, callable $callback,
array $opts = []
1196 $idsByValueKey = $keyedIds->getArrayCopy();
1197 $valueKeys = array_keys( $idsByValueKey );
1198 $checkKeys = isset( $opts[
'checkKeys'] ) ? $opts[
'checkKeys'] : [];
1199 unset( $opts[
'lockTSE'] );
1200 unset( $opts[
'busyValue'] );
1205 $this->warmupKeyMisses = 0;
1213 $curByKey = $this->
getMulti( $keysGet, $curTTLs, $checkKeys, $asOfs );
1214 foreach ( $keysGet
as $key ) {
1215 if ( !array_key_exists( $key, $curByKey ) || $curTTLs[$key] < 0 ) {
1216 $idsRegen[] = $idsByValueKey[$key];
1222 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
1223 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
1227 $func =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf )
1228 use ( $callback, &$id, $newValsById, $newTTLsById, $newSetOpts )
1230 if ( array_key_exists( $id, $newValsById ) ) {
1232 $newValue = $newValsById[$id];
1233 $ttl = $newTTLsById[$id];
1234 $setOpts = $newSetOpts;
1238 $ttls = [ $id => $ttl ];
1239 $newValue = $callback( [ $id ], $ttls, $setOpts )[$id];
1248 foreach ( $idsByValueKey
as $key => $id ) {
1252 $this->warmupCache = [];
1269 public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
1271 $wrapped = $this->
cache->get( self::VALUE_KEY_PREFIX . $key );
1272 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
1274 $this->logger->warning(
"Reaping stale value key '$key'." );
1276 $ok = $this->
cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
1278 $this->logger->error(
"Could not complete reap of key '$key'." );
1300 if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
1302 $this->logger->warning(
"Reaping stale check key '$key'." );
1303 $ok = $this->
cache->changeTTL( self::TIME_KEY_PREFIX . $key, 1 );
1305 $this->logger->error(
"Could not complete reap of check key '$key'." );
1323 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1333 return call_user_func_array( [ $this->
cache, __FUNCTION__ ], func_get_args() );
1344 foreach ( $entities
as $entity ) {
1345 $map[$keyFunc( $entity, $this )] = $entity;
1348 return new ArrayIterator( $map );
1356 if ( $this->lastRelayError ) {
1382 $this->
cache->clearLastError();
1392 $this->processCaches = [];
1401 return $this->
cache->getQoS( $flag );
1427 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
1428 if ( is_float( $mtime ) || ctype_digit( $mtime ) ) {
1429 $mtime = (int)$mtime;
1432 if ( !is_int( $mtime ) || $mtime <= 0 ) {
1436 $age = time() - $mtime;
1438 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
1462 $ok = $this->
cache->set( $key,
1467 $event = $this->
cache->modifySimpleRelayEvent( [
1470 'val' =>
'PURGED:$UNIXTIME$:' . (
int)$holdoff,
1471 'ttl' => max( $ttl, 1 ),
1475 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1493 $ok = $this->
cache->delete( $key );
1495 $event = $this->
cache->modifySimpleRelayEvent( [
1500 $ok = $this->purgeRelayer->notify( $this->purgeChannel, $event );
1522 if ( $curTTL >= $lowTTL ) {
1524 } elseif ( $curTTL <= 0 ) {
1528 $chance = ( 1 - $curTTL / $lowTTL );
1530 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1549 $age = $now - $asOf;
1550 $timeOld = $age - $ageNew;
1551 if ( $timeOld <= 0 ) {
1558 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
1562 $chance = 1 / ( self::HIT_RATE_HIGH * $refreshWindowSec );
1565 $chance *= ( $timeOld <=
self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
1567 return mt_rand( 1, 1e9 ) <= 1e9 * $chance;
1580 if ( $versioned && !isset(
$value[self::VFLD_VERSION] ) ) {
1582 } elseif ( $minTime > 0 && $asOf < $minTime ) {
1600 self::FLD_VALUE =>
$value,
1601 self::FLD_TTL => $ttl,
1602 self::FLD_TIME => $now
1613 protected function unwrap( $wrapped, $now ) {
1616 if ( $purge !==
false ) {
1618 $curTTL = min( $purge[self::FLD_TIME] - $now, self::TINY_NEGATIVE );
1619 return [
false, $curTTL ];
1622 if ( !is_array( $wrapped )
1623 || !isset( $wrapped[self::FLD_VERSION] )
1624 || $wrapped[self::FLD_VERSION] !== self::VERSION
1626 return [
false, null ];
1630 if ( (
$flags & self::FLG_STALE ) == self::FLG_STALE ) {
1633 $curTTL = min( -$age, self::TINY_NEGATIVE );
1634 } elseif ( $wrapped[self::FLD_TTL] > 0 ) {
1637 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
1654 $res[] = $prefix . $key;
1666 if ( !is_string(
$value ) ) {
1669 $segments = explode(
':',
$value, 3 );
1670 if ( !isset( $segments[0] ) || !isset( $segments[1] )
1671 ||
"{$segments[0]}:" !== self::PURGE_VAL_PREFIX
1675 if ( !isset( $segments[2] ) ) {
1680 self::FLD_TIME => (float)$segments[1],
1681 self::FLD_HOLDOFF => (
int)$segments[2],
1691 return self::PURGE_VAL_PREFIX . (float)$timestamp .
':' . (
int)$holdoff;
1699 if ( !isset( $this->processCaches[$group] ) ) {
1700 list( , $n ) = explode(
':', $group );
1701 $this->processCaches[$group] =
new HashBagOStuff( [
'maxKeys' => (
int)$n ] );
1704 return $this->processCaches[$group];
1714 if ( isset( $opts[
'pcTTL'] ) && $opts[
'pcTTL'] > 0 && $this->callbackDepth == 0 ) {
1715 $pcGroup = isset( $opts[
'pcGroup'] ) ? $opts[
'pcGroup'] :
self::PC_PRIMARY;
1718 if ( $procCache->get( $key ) !==
false ) {
1719 $keysFound[] = $key;
1724 return array_diff(
$keys, $keysFound );
1740 $keysWarmUp[] = self::VALUE_KEY_PREFIX . $key;
1743 foreach ( $checkKeys
as $i => $checkKeyOrKeys ) {
1744 if ( is_int( $i ) ) {
1746 $keysWarmUp[] = self::TIME_KEY_PREFIX . $checkKeyOrKeys;
1749 $keysWarmUp = array_merge(
1751 self::prefixCacheKeys( $checkKeyOrKeys, self::TIME_KEY_PREFIX )