164 private $keyHighUplinkBps;
170 private $callbackDepth = 0;
172 private $warmupCache = [];
174 private $warmupKeyMisses = 0;
177 private $wallClockOverride;
180 private const MAX_COMMIT_DELAY = 3;
182 private const MAX_READ_LAG = 7;
184 public const HOLDOFF_TTL = self::MAX_COMMIT_DELAY + self::MAX_READ_LAG + 1;
187 private const LOW_TTL = 30;
192 private const HOT_TTR = 900;
194 private const AGE_NEW = 60;
197 private const TSE_NONE = -1;
207 public const MIN_TIMESTAMP_NONE = 0.0;
210 private const PC_PRIMARY =
'primary:1000';
216 private const SCHEME_HASH_TAG = 1;
218 private const SCHEME_HASH_STOP = 2;
221 private const CHECK_KEY_TTL = self::TTL_YEAR;
223 private const INTERIM_KEY_TTL = 1;
226 private const LOCK_TTL = 10;
228 private const COOLOFF_TTL = 1;
230 private const RAMPUP_TTL = 30;
233 private const TINY_NEGATIVE = -0.000001;
235 private const TINY_POSTIVE = 0.000001;
238 private const RECENT_SET_LOW_MS = 50;
240 private const RECENT_SET_HIGH_MS = 100;
243 private const GENERATION_HIGH_SEC = 0.2;
245 private const GENERATION_SLOW_SEC = 3.0;
248 private const PURGE_TIME = 0;
250 private const PURGE_HOLDOFF = 1;
253 private const VERSION = 1;
269 private const RES_VALUE = 0;
271 private const RES_VERSION = 1;
273 private const RES_AS_OF = 2;
275 private const RES_TTL = 3;
277 private const RES_TOMB_AS_OF = 4;
279 private const RES_CHECK_AS_OF = 5;
281 private const RES_TOUCH_AS_OF = 6;
283 private const RES_CUR_TTL = 7;
286 private const FLD_FORMAT_VERSION = 0;
288 private const FLD_VALUE = 1;
290 private const FLD_TTL = 2;
292 private const FLD_TIME = 3;
294 private const FLD_FLAGS = 4;
296 private const FLD_VALUE_VERSION = 5;
298 private const FLD_GENERATION_TIME = 6;
301 private const TYPE_VALUE =
'v';
303 private const TYPE_TIMESTAMP =
't';
305 private const TYPE_MUTEX =
'm';
307 private const TYPE_INTERIM =
'i';
309 private const TYPE_COOLOFF =
'c';
312 private const PURGE_VAL_PREFIX =
'PURGED';
348 $this->cache = $params[
'cache'];
349 $this->broadcastRoute = $params[
'broadcastRoutingPrefix'] ??
null;
350 $this->epoch = $params[
'epoch'] ?? 0;
351 $this->secret = $params[
'secret'] ?? (string)$this->epoch;
352 if ( ( $params[
'coalesceScheme'] ??
'' ) ===
'hash_tag' ) {
356 $this->coalesceScheme = self::SCHEME_HASH_TAG;
359 $this->coalesceScheme = self::SCHEME_HASH_STOP;
362 $this->keyHighQps = $params[
'keyHighQps'] ?? 100;
363 $this->keyHighUplinkBps = $params[
'keyHighUplinkBps'] ?? ( 1e9 / 8 / 100 );
365 $this->
setLogger( $params[
'logger'] ??
new NullLogger() );
367 $this->asyncHandler = $params[
'asyncHandler'] ??
null;
369 $this->missLog = array_fill( 0, 10, [
'', 0.0 ] );
371 $this->cache->registerWrapperInfoForStats(
374 [ __CLASS__,
'getCollectionFromSisterKey' ]
382 $this->logger = $logger;
449 final public function get( $key, &$curTTL =
null, array $checkKeys = [], &$info = [] ) {
452 $legacyInfo = ( $info !== self::PASS_BY_REF );
456 $curTTL =
$res[self::RES_CUR_TTL];
458 ?
$res[self::RES_AS_OF]
460 self::KEY_VERSION =>
$res[self::RES_VERSION],
461 self::KEY_AS_OF =>
$res[self::RES_AS_OF],
462 self::KEY_TTL =>
$res[self::RES_TTL],
463 self::KEY_CUR_TTL =>
$res[self::RES_CUR_TTL],
464 self::KEY_TOMB_AS_OF =>
$res[self::RES_TOMB_AS_OF],
465 self::KEY_CHECK_AS_OF =>
$res[self::RES_CHECK_AS_OF]
468 if ( $curTTL ===
null || $curTTL <= 0 ) {
470 reset( $this->missLog );
471 unset( $this->missLog[key( $this->missLog )] );
475 return $res[self::RES_VALUE];
505 array $checkKeys = [],
510 $legacyInfo = ( $info !== self::PASS_BY_REF );
517 foreach ( $resByKey as $key =>
$res ) {
518 if (
$res[self::RES_VALUE] !==
false ) {
519 $valuesByKey[$key] =
$res[self::RES_VALUE];
522 if (
$res[self::RES_CUR_TTL] !==
null ) {
523 $curTTLs[$key] =
$res[self::RES_CUR_TTL];
525 $info[$key] = $legacyInfo
526 ?
$res[self::RES_AS_OF]
528 self::KEY_VERSION =>
$res[self::RES_VERSION],
529 self::KEY_AS_OF =>
$res[self::RES_AS_OF],
530 self::KEY_TTL =>
$res[self::RES_TTL],
531 self::KEY_CUR_TTL =>
$res[self::RES_CUR_TTL],
532 self::KEY_TOMB_AS_OF =>
$res[self::RES_TOMB_AS_OF],
533 self::KEY_CHECK_AS_OF =>
$res[self::RES_CHECK_AS_OF]
554 protected function fetchKeys( array
$keys, array $checkKeys, $touchedCb =
null ) {
560 $valueSisterKeys = [];
562 $checkSisterKeysForAll = [];
564 $checkSisterKeysByKey = [];
566 foreach (
$keys as $key ) {
567 $sisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
568 $allSisterKeys[] = $sisterKey;
569 $valueSisterKeys[] = $sisterKey;
572 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
574 if ( is_int( $i ) ) {
576 $sisterKey = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
577 $allSisterKeys[] = $sisterKey;
578 $checkSisterKeysForAll[] = $sisterKey;
581 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
582 $sisterKey = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
583 $allSisterKeys[] = $sisterKey;
584 $checkSisterKeysByKey[$i][] = $sisterKey;
589 if ( $this->warmupCache ) {
591 $wrappedBySisterKey = $this->warmupCache;
592 $sisterKeysMissing = array_diff( $allSisterKeys, array_keys( $wrappedBySisterKey ) );
593 if ( $sisterKeysMissing ) {
594 $this->warmupKeyMisses += count( $sisterKeysMissing );
595 $wrappedBySisterKey += $this->cache->getMulti( $sisterKeysMissing );
599 $wrappedBySisterKey = $this->cache->getMulti( $allSisterKeys );
606 $ckPurgesForAll = $this->processCheckKeys(
607 $checkSisterKeysForAll,
613 foreach ( $checkSisterKeysByKey as $keyWithCheckKeys => $checkKeysForKey ) {
614 $ckPurgesByKey[$keyWithCheckKeys] = $this->processCheckKeys(
623 foreach ( $valueSisterKeys as $valueSisterKey ) {
625 $key = current(
$keys );
628 if ( array_key_exists( $valueSisterKey, $wrappedBySisterKey ) ) {
630 $wrapped = $wrappedBySisterKey[$valueSisterKey];
636 $res = $this->unwrap( $wrapped, $now );
637 $value =
$res[self::RES_VALUE];
639 foreach ( array_merge( $ckPurgesForAll, $ckPurgesByKey[$key] ?? [] ) as $ckPurge ) {
640 $res[self::RES_CHECK_AS_OF] = max(
641 $ckPurge[self::PURGE_TIME],
642 $res[self::RES_CHECK_AS_OF]
645 $holdoffDeadline = $ckPurge[self::PURGE_TIME] + $ckPurge[self::PURGE_HOLDOFF];
647 if ( $value !==
false && $holdoffDeadline >=
$res[self::RES_AS_OF] ) {
649 $ago = min( $ckPurge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
651 $res[self::RES_CUR_TTL] = min(
$res[self::RES_CUR_TTL], $ago );
655 if ( $touchedCb !==
null && $value !==
false ) {
656 $touched = $touchedCb( $value );
657 if ( $touched !==
null && $touched >=
$res[self::RES_AS_OF] ) {
658 $res[self::RES_CUR_TTL] = min(
659 $res[self::RES_CUR_TTL],
660 $res[self::RES_AS_OF] - $touched,
668 $res[self::RES_TOUCH_AS_OF] = max(
$res[self::RES_TOUCH_AS_OF], $touched );
670 $resByKey[$key] =
$res;
682 private function processCheckKeys(
683 array $checkSisterKeys,
684 array $wrappedBySisterKey,
689 foreach ( $checkSisterKeys as $timeKey ) {
690 $purge = isset( $wrappedBySisterKey[$timeKey] )
691 ? $this->parsePurgeValue( $wrappedBySisterKey[$timeKey] )
694 if ( $purge ===
null ) {
695 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL, $purge );
696 $this->cache->add( $timeKey, $wrapped, self::CHECK_KEY_TTL );
785 final public function set( $key, $value, $ttl = self::TTL_INDEFINITE, array $opts = [] ) {
787 $dataReplicaLag = $opts[
'lag'] ?? 0;
788 $dataSnapshotLag = isset( $opts[
'since'] ) ? max( 0, $now - $opts[
'since'] ) : 0;
789 $dataCombinedLag = $dataReplicaLag + $dataSnapshotLag;
790 $dataPendingCommit = $opts[
'pending'] ??
null;
791 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
792 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
793 $creating = $opts[
'creating'] ??
false;
794 $version = $opts[
'version'] ??
null;
795 $walltime = $opts[
'walltime'] ?? $this->timeSinceLoggedMiss( $key, $now );
807 if ( $dataPendingCommit ) {
809 $mitigated =
'pending writes';
811 $mitigationTTL = self::TTL_UNCACHEABLE;
812 } elseif ( $dataSnapshotLag > self::MAX_READ_LAG ) {
814 $pregenSnapshotLag = ( $walltime !== null ) ? ( $dataSnapshotLag - $walltime ) : 0;
815 if ( ( $pregenSnapshotLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
817 $mitigated =
'snapshot lag (late generation)';
819 $mitigationTTL = self::TTL_UNCACHEABLE;
822 $mitigated =
'snapshot lag (high generation time)';
824 $mitigationTTL = self::LOW_TTL;
826 } elseif ( $dataReplicaLag ===
false || $dataReplicaLag > self::MAX_READ_LAG ) {
828 $mitigated =
'replication lag';
830 $mitigationTTL = self::TTL_LAGGED;
831 } elseif ( $dataCombinedLag > self::MAX_READ_LAG ) {
832 $pregenCombinedLag = ( $walltime !== null ) ? ( $dataCombinedLag - $walltime ) : 0;
834 if ( ( $pregenCombinedLag + self::GENERATION_HIGH_SEC ) > self::MAX_READ_LAG ) {
836 $mitigated =
'read lag (late generation)';
838 $mitigationTTL = self::TTL_UNCACHEABLE;
841 $mitigated =
'read lag (high generation time)';
843 $mitigationTTL = self::LOW_TTL;
849 $mitigationTTL =
null;
852 if ( $mitigationTTL === self::TTL_UNCACHEABLE ) {
853 $this->logger->warning(
854 "Rejected set() for {cachekey} due to $mitigated.",
857 'lag' => $dataReplicaLag,
858 'age' => $dataSnapshotLag,
859 'walltime' => $walltime
870 if ( $mitigationTTL !==
null ) {
872 if ( $lockTSE >= 0 ) {
874 $logicalTTL = min( $ttl ?: INF, $mitigationTTL );
877 $ttl = min( $ttl ?: INF, $mitigationTTL );
880 $this->logger->warning(
881 "Lowered set() TTL for {cachekey} due to $mitigated.",
884 'lag' => $dataReplicaLag,
885 'age' => $dataSnapshotLag,
886 'walltime' => $walltime
892 $wrapped = $this->wrap( $value, $logicalTTL ?: $ttl, $version, $now, $walltime );
893 $storeTTL = $ttl + $staleTTL;
896 $ok = $this->cache->add(
897 $this->makeSisterKey( $key, self::TYPE_VALUE ),
902 $ok = $this->cache->merge(
903 $this->makeSisterKey( $key, self::TYPE_VALUE ),
904 static function (
$cache, $key, $cWrapped ) use ( $wrapped ) {
906 return ( is_string( $cWrapped ) ) ?
false : $wrapped;
978 final public function delete( $key, $ttl = self::HOLDOFF_TTL ) {
982 $valueSisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
998 $purgeBySisterKey = [ $valueSisterKey => $this->makeTombstonePurgeValue( $now ) ];
1002 $kClass = $this->determineKeyClassForStats( $key );
1003 $this->stats->increment(
"wanobjectcache.$kClass.delete." . ( $ok ?
'ok' :
'error' ) );
1093 $checkSisterKeysByKey = [];
1094 foreach (
$keys as $key ) {
1095 $checkSisterKeysByKey[$key] = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1098 $wrappedBySisterKey = $this->cache->getMulti( $checkSisterKeysByKey );
1099 $wrappedBySisterKey += array_fill_keys( $checkSisterKeysByKey,
false );
1103 foreach ( $checkSisterKeysByKey as $key => $checkSisterKey ) {
1104 $purge = $this->parsePurgeValue( $wrappedBySisterKey[$checkSisterKey] );
1105 if ( $purge ===
null ) {
1106 $wrapped = $this->makeCheckPurgeValue( $now, self::HOLDOFF_TTL, $purge );
1107 $this->cache->add( $checkSisterKey, $wrapped, self::CHECK_KEY_TTL );
1110 $times[$key] = $purge[self::PURGE_TIME];
1150 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1153 $purgeBySisterKey = [ $checkSisterKey => $this->makeCheckPurgeValue( $now, $holdoff ) ];
1156 $kClass = $this->determineKeyClassForStats( $key );
1157 $this->stats->increment(
"wanobjectcache.$kClass.ck_touch." . ( $ok ?
'ok' :
'error' ) );
1190 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
1193 $kClass = $this->determineKeyClassForStats( $key );
1194 $this->stats->increment(
"wanobjectcache.$kClass.ck_reset." . ( $ok ?
'ok' :
'error' ) );
1503 $key, $ttl, $callback, array $opts = [], array $cbParams = []
1505 $version = $opts[
'version'] ??
null;
1506 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
1507 $pCache = ( $pcTTL >= 0 )
1508 ? $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY )
1514 if ( $pCache && $this->callbackDepth == 0 ) {
1515 $cached = $pCache->get( $key, $pcTTL,
false );
1516 if ( $cached !==
false ) {
1517 $this->logger->debug(
"getWithSetCallback($key): process cache hit" );
1522 [ $value, $valueVersion, $curAsOf ] = $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
1523 if ( $valueVersion !== $version ) {
1527 $this->logger->debug(
"getWithSetCallback($key): using variant key" );
1528 list( $value ) = $this->fetchOrRegenerate(
1529 $this->
makeGlobalKey(
'WANCache-key-variant', md5( $key ), (
string)$version ),
1532 [
'version' =>
null,
'minAsOf' => $curAsOf ] + $opts,
1538 if ( $pCache && $value !==
false ) {
1539 $pCache->set( $key, $value );
1561 private function fetchOrRegenerate( $key, $ttl, $callback, array $opts, array $cbParams ) {
1562 $checkKeys = $opts[
'checkKeys'] ?? [];
1563 $graceTTL = $opts[
'graceTTL'] ?? self::GRACE_TTL_NONE;
1564 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
1565 $hotTTR = $opts[
'hotTTR'] ?? self::HOT_TTR;
1566 $lowTTL = $opts[
'lowTTL'] ?? min( self::LOW_TTL, $ttl );
1567 $ageNew = $opts[
'ageNew'] ?? self::AGE_NEW;
1568 $touchedCb = $opts[
'touchedCallback'] ??
null;
1569 $startTime = $this->getCurrentTime();
1571 $kClass = $this->determineKeyClassForStats( $key );
1574 $curState = $this->fetchKeys( [ $key ], $checkKeys, $touchedCb )[$key];
1575 $curValue = $curState[self::RES_VALUE];
1577 if ( $this->isAcceptablyFreshValue( $curState, $graceTTL, $minAsOf ) ) {
1578 if ( !$this->isLotteryRefreshDue( $curState, $lowTTL, $ageNew, $hotTTR, $startTime ) ) {
1579 $this->stats->timing(
1580 "wanobjectcache.$kClass.hit.good",
1581 1e3 * ( $this->getCurrentTime() - $startTime )
1584 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1585 } elseif ( $this->scheduleAsyncRefresh( $key, $ttl, $callback, $opts, $cbParams ) ) {
1586 $this->logger->debug(
"fetchOrRegenerate($key): hit with async refresh" );
1587 $this->stats->timing(
1588 "wanobjectcache.$kClass.hit.refresh",
1592 return [ $curValue, $curState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1594 $this->logger->debug(
"fetchOrRegenerate($key): hit with sync refresh" );
1598 $isKeyTombstoned = ( $curState[self::RES_TOMB_AS_OF] !== null );
1600 if ( $isKeyTombstoned ) {
1601 $volState = $this->getInterimValue( $key, $minAsOf, $startTime, $touchedCb );
1602 $volValue = $volState[self::RES_VALUE];
1604 $volState = $curState;
1605 $volValue = $curValue;
1613 $lastPurgeTime = max(
1615 $volState[self::RES_TOUCH_AS_OF],
1616 $curState[self::RES_TOMB_AS_OF],
1617 $curState[self::RES_CHECK_AS_OF]
1619 $safeMinAsOf = max( $minAsOf, $lastPurgeTime + self::TINY_POSTIVE );
1620 if ( $this->isExtremelyNewValue( $volState, $safeMinAsOf, $startTime ) ) {
1621 $this->logger->debug(
"fetchOrRegenerate($key): volatile hit" );
1622 $this->stats->timing(
1623 "wanobjectcache.$kClass.hit.volatile",
1627 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1630 $lockTSE = $opts[
'lockTSE'] ?? self::TSE_NONE;
1631 $busyValue = $opts[
'busyValue'] ??
null;
1632 $staleTTL = $opts[
'staleTTL'] ?? self::STALE_TTL_NONE;
1633 $version = $opts[
'version'] ??
null;
1636 $useRegenerationLock =
1647 $curState[self::RES_CUR_TTL] !==
null &&
1648 $curState[self::RES_CUR_TTL] <= 0 &&
1649 abs( $curState[self::RES_CUR_TTL] ) <= $lockTSE
1653 ( $busyValue !==
null && $volValue ===
false );
1658 $hasLock = $useRegenerationLock && $this->claimStampedeLock( $key );
1659 if ( $useRegenerationLock && !$hasLock ) {
1662 if ( $this->
isValid( $volValue, $volState[self::RES_AS_OF], $minAsOf ) ) {
1663 $this->logger->debug(
"fetchOrRegenerate($key): returning stale value" );
1664 $this->stats->timing(
1665 "wanobjectcache.$kClass.hit.stale",
1669 return [ $volValue, $volState[self::RES_VERSION], $curState[self::RES_AS_OF] ];
1670 } elseif ( $busyValue !==
null ) {
1671 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1672 $this->logger->debug(
"fetchOrRegenerate($key): busy $miss" );
1673 $this->stats->timing(
1674 "wanobjectcache.$kClass.$miss.busy",
1677 $placeholderValue = $this->resolveBusyValue( $busyValue );
1679 return [ $placeholderValue, $version, $curState[self::RES_AS_OF] ];
1686 ++$this->callbackDepth;
1689 ( $curState[self::RES_VERSION] === $version ) ? $curValue : false,
1692 ( $curState[self::RES_VERSION] === $version ) ? $curState[self::RES_AS_OF] : null,
1696 --$this->callbackDepth;
1701 $elapsed = max( $postCallbackTime - $startTime, 0.0 );
1704 $walltime = max( $postCallbackTime - $preCallbackTime, 0.0 );
1705 $this->stats->timing(
"wanobjectcache.$kClass.regen_walltime", 1e3 * $walltime );
1711 ( $value !==
false && $ttl >= 0 ) &&
1713 ( !$useRegenerationLock || $hasLock || $isKeyTombstoned ) &&
1716 $this->checkAndSetCooloff( $key, $kClass, $value, $elapsed, $hasLock )
1719 if ( $isKeyTombstoned ) {
1721 $this->setInterimValue( $key, $value, $lockTSE, $version, $postCallbackTime, $walltime );
1725 'since' => $setOpts[
'since'] ?? $preCallbackTime,
1726 'version' => $version,
1727 'staleTTL' => $staleTTL,
1729 'lockTSE' => $lockTSE,
1731 'creating' => ( $curValue === false ),
1732 'walltime' => $walltime
1735 $this->
set( $key, $value, $ttl, $finalSetOpts );
1739 $this->yieldStampedeLock( $key, $hasLock );
1741 $miss = is_infinite( $minAsOf ) ?
'renew' :
'miss';
1742 $this->logger->debug(
"fetchOrRegenerate($key): $miss, new value computed" );
1743 $this->stats->timing(
1744 "wanobjectcache.$kClass.$miss.compute",
1749 return [ $value, $version, $curState[self::RES_AS_OF] ];
/**
 * Try to claim the regeneration mutex of a key.
 *
 * Issues an add() for the key's mutex sister key with a TTL of LOCK_TTL and
 * reports the outcome of that call as-is.
 *
 * @param string $key Cache key
 * @return bool Result of the add() call (presumably true only if no other
 *  caller currently holds the mutex; confirm against the cache backend)
 */
private function claimStampedeLock( $key ) {
	$mutexSisterKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
	$claimed = $this->cache->add( $mutexSisterKey, 1, self::LOCK_TTL );

	return $claimed;
}
1766 private function yieldStampedeLock( $key, $hasLock ) {
1768 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_MUTEX );
1769 $this->cache->changeTTL( $checkSisterKey, (
int)$this->
getCurrentTime() - 60 );
1783 private function makeSisterKeys( array $baseKeys,
string $type,
string $route =
null ) {
1785 foreach ( $baseKeys as $baseKey ) {
1786 $sisterKeys[] = $this->makeSisterKey( $baseKey,
$type, $route );
1802 private function makeSisterKey(
string $baseKey,
string $typeChar,
string $route =
null ) {
1803 if ( $this->coalesceScheme === self::SCHEME_HASH_STOP ) {
1805 $sisterKey =
'WANCache:' . $baseKey .
'|#|' . $typeChar;
1808 $sisterKey =
'WANCache:{' . $baseKey .
'}:' . $typeChar;
1811 if ( $route !==
null ) {
1812 $sisterKey = $this->
prependRoute( $sisterKey, $route );
1825 if ( substr( $sisterKey, -4 ) ===
'|#|v' ) {
1827 $collection = substr( $sisterKey, 9, strcspn( $sisterKey,
':|', 9 ) );
1828 } elseif ( substr( $sisterKey, -3 ) ===
'}:v' ) {
1830 $collection = substr( $sisterKey, 10, strcspn( $sisterKey,
':}', 10 ) );
1832 $collection =
'internal';
1850 private function isExtremelyNewValue(
$res, $minAsOf, $now ) {
1851 if (
$res[self::RES_VALUE] ===
false ||
$res[self::RES_AS_OF] < $minAsOf ) {
1855 $age = $now -
$res[self::RES_AS_OF];
1857 return ( $age < mt_rand( self::RECENT_SET_LOW_MS, self::RECENT_SET_HIGH_MS ) / 1e3 );
1881 private function checkAndSetCooloff( $key, $kClass, $value, $elapsed, $hasLock ) {
1882 $valueSisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
1883 list( $estimatedSize ) = $this->cache->setNewPreparedValues( [
1884 $valueSisterKey => $value
1896 $missesPerSecForHighQPS = ( min( $elapsed, 1 ) * $this->keyHighQps );
1908 if ( ( $missesPerSecForHighQPS * $estimatedSize ) >= $this->keyHighUplinkBps ) {
1909 $cooloffSisterKey = $this->makeSisterKey( $key, self::TYPE_COOLOFF );
1910 $watchPoint = $this->cache->watchErrors();
1912 !$this->cache->add( $cooloffSisterKey, 1, self::COOLOFF_TTL ) &&
1914 $this->cache->getLastError( $watchPoint ) === self::ERR_NONE
1916 $this->stats->increment(
"wanobjectcache.$kClass.cooloff_bounce" );
1924 $this->stats->timing(
"wanobjectcache.$kClass.regen_set_delay", 1e3 * $elapsed );
1925 $this->stats->updateCount(
"wanobjectcache.$kClass.regen_set_bytes", $estimatedSize );
1939 private function getInterimValue( $key, $minAsOf, $now, $touchedCb ) {
1940 if ( $this->useInterimHoldOffCaching ) {
1941 $interimSisterKey = $this->makeSisterKey( $key, self::TYPE_INTERIM );
1942 $wrapped = $this->cache->get( $interimSisterKey );
1943 $res = $this->unwrap( $wrapped, $now );
1944 if (
$res[self::RES_VALUE] !==
false &&
$res[self::RES_AS_OF] >= $minAsOf ) {
1945 if ( $touchedCb !==
null ) {
1948 $res[self::RES_TOUCH_AS_OF] = max(
1949 $touchedCb(
$res[self::RES_VALUE] ),
1950 $res[self::RES_TOUCH_AS_OF]
1958 return $this->unwrap(
false, $now );
1969 private function setInterimValue( $key, $value, $ttl, $version, $now, $walltime ) {
1970 $ttl = max( self::INTERIM_KEY_TTL, (
int)$ttl );
1972 $wrapped = $this->wrap( $value, $ttl, $version, $now, $walltime );
1974 $this->makeSisterKey( $key, self::TYPE_INTERIM ),
/**
 * Turn a "busy value" option into the actual placeholder value.
 *
 * @param mixed $busyValue Either a literal placeholder or a Closure producing one
 * @return mixed The Closure's return value when given a Closure, otherwise
 *  the argument itself
 */
private function resolveBusyValue( $busyValue ) {
	if ( $busyValue instanceof Closure ) {
		// Lazily computed placeholder: invoke it with no arguments
		return $busyValue();
	}

	return $busyValue;
}
2054 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2057 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache(
2058 $this->getNonProcessCachedMultiKeys( $keyedIds, $opts ),
2059 $opts[
'checkKeys'] ?? []
2061 $this->warmupKeyMisses = 0;
2067 $proxyCb =
static function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2070 return $callback( $params[
'id'], $oldValue, $ttl, $setOpts, $oldAsOf );
2075 foreach ( $keyedIds as $key => $id ) {
2085 $this->warmupCache = [];
2157 ArrayIterator $keyedIds, $ttl, callable $callback, array $opts = []
2159 $checkKeys = $opts[
'checkKeys'] ?? [];
2160 $minAsOf = $opts[
'minAsOf'] ?? self::MIN_TIMESTAMP_NONE;
2163 unset( $opts[
'lockTSE'] );
2164 unset( $opts[
'busyValue'] );
2167 $keysByIdGet = $this->getNonProcessCachedMultiKeys( $keyedIds, $opts );
2168 $this->warmupCache = $this->fetchWrappedValuesForWarmupCache( $keysByIdGet, $checkKeys );
2169 $this->warmupKeyMisses = 0;
2175 $resByKey = $this->
fetchKeys( $keysByIdGet, $checkKeys );
2176 foreach ( $keysByIdGet as $id => $key ) {
2177 $res = $resByKey[$key];
2179 $res[self::RES_VALUE] ===
false ||
2180 $res[self::RES_CUR_TTL] < 0 ||
2181 $res[self::RES_AS_OF] < $minAsOf
2189 $newTTLsById = array_fill_keys( $idsRegen, $ttl );
2190 $newValsById = $idsRegen ? $callback( $idsRegen, $newTTLsById, $newSetOpts ) : [];
2192 $method = __METHOD__;
2197 $proxyCb =
function ( $oldValue, &$ttl, &$setOpts, $oldAsOf, $params )
2198 use ( $callback, $newValsById, $newTTLsById, $newSetOpts, $method )
2200 $id = $params[
'id'];
2202 if ( array_key_exists( $id, $newValsById ) ) {
2204 $newValue = $newValsById[$id];
2205 $ttl = $newTTLsById[$id];
2206 $setOpts = $newSetOpts;
2210 $ttls = [ $id => $ttl ];
2211 $result = $callback( [ $id ], $ttls, $setOpts );
2212 if ( !isset( $result[$id] ) ) {
2214 $this->logger->warning(
2215 $method .
' failed due to {id} not set in result {result}', [
2217 'result' => json_encode( $result )
2220 $newValue = $result[$id];
2229 foreach ( $keyedIds as $key => $id ) {
2239 $this->warmupCache = [];
2257 final public function reap( $key, $purgeTimestamp, &$isStale =
false ) {
2259 $valueSisterKey = $this->makeSisterKey( $key, self::TYPE_VALUE );
2261 $minAsOf = $purgeTimestamp + self::HOLDOFF_TTL;
2262 $wrapped = $this->cache->get( $valueSisterKey );
2263 if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
2265 $this->logger->warning(
"Reaping stale value key '$key'." );
2267 $ttlReap = self::HOLDOFF_TTL;
2268 $ok = $this->cache->changeTTL( $valueSisterKey, $ttlReap );
2270 $this->logger->error(
"Could not complete reap of key '$key'." );
2291 final public function reapCheckKey( $key, $purgeTimestamp, &$isStale =
false ) {
2293 $checkSisterKey = $this->makeSisterKey( $key, self::TYPE_TIMESTAMP );
2295 $wrapped = $this->cache->get( $checkSisterKey );
2296 $purge = $this->parsePurgeValue( $wrapped );
2297 if ( $purge !==
null && $purge[self::PURGE_TIME] < $purgeTimestamp ) {
2299 $this->logger->warning(
"Reaping stale check key '$key'." );
2300 $ok = $this->cache->changeTTL( $checkSisterKey, self::TTL_SECOND );
2302 $this->logger->error(
"Could not complete reap of check key '$key'." );
2325 return $this->cache->makeGlobalKey( ...func_get_args() );
/**
 * Build a cache key by delegating to the underlying cache's makeKey().
 *
 * @param string $collection Key collection name
 * @param string|int ...$components Additional key components
 * @return string Whatever the underlying cache's makeKey() returns
 */
public function makeKey( $collection, ...$components ) {
	// Forward every argument exactly as it was received
	$allArgs = func_get_args();

	return $this->cache->makeKey( ...$allArgs );
}
2351 return hash_hmac(
'sha256', $component, $this->secret );
2405 foreach ( $ids as $id ) {
2407 if ( strlen( $id ) > 64 ) {
2408 $this->logger->warning( __METHOD__ .
": long ID '$id'; use hash256()" );
2410 $key = $keyCallback( $id, $this );
2412 if ( !isset( $idByKey[$key] ) ) {
2413 $idByKey[$key] = $id;
2414 } elseif ( (
string)$id !== (
string)$idByKey[$key] ) {
2415 throw new UnexpectedValueException(
2416 "Cache key collision; IDs ('$id','{$idByKey[$key]}') map to '$key'"
2421 return new ArrayIterator( $idByKey );
2460 if ( count( $ids ) !== count(
$res ) ) {
2463 $ids = array_keys( array_fill_keys( $ids,
true ) );
2464 if ( count( $ids ) !== count(
$res ) ) {
2465 throw new UnexpectedValueException(
"Multi-key result does not match ID list" );
2469 return array_combine( $ids,
$res );
2479 return $this->cache->watchErrors();
2500 $code = $this->cache->getLastError( $watchPoint );
2502 case self::ERR_NONE:
2503 return self::ERR_NONE;
2504 case self::ERR_NO_RESPONSE:
2505 return self::ERR_NO_RESPONSE;
2506 case self::ERR_UNREACHABLE:
2507 return self::ERR_UNREACHABLE;
2509 return self::ERR_UNEXPECTED;
2518 $this->cache->clearLastError();
2527 $this->processCaches = [];
2560 return $this->cache->getQoS( $flag );
2626 public function adaptiveTTL( $mtime, $maxTTL, $minTTL = 30, $factor = 0.2 ) {
2628 $mtime = (int)$mtime;
2629 if ( $mtime <= 0 ) {
2636 return (
int)min( $maxTTL, max( $minTTL, $factor * $age ) );
2646 return $this->warmupKeyMisses;
2664 $purgeByRouteKey = [];
2665 foreach ( $purgeBySisterKey as $sisterKey => $purge ) {
2666 if ( $this->broadcastRoute !==
null ) {
2667 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2669 $routeKey = $sisterKey;
2671 $purgeByRouteKey[$routeKey] = $purge;
2674 if ( count( $purgeByRouteKey ) == 1 ) {
2675 $purge = reset( $purgeByRouteKey );
2676 $ok = $this->cache->set( key( $purgeByRouteKey ), $purge, $ttl );
2678 $ok = $this->cache->setMulti( $purgeByRouteKey, $ttl );
2693 if ( $this->broadcastRoute !==
null ) {
2694 $routeKey = $this->
prependRoute( $sisterKey, $this->broadcastRoute );
2696 $routeKey = $sisterKey;
2699 return $this->cache->delete( $routeKey );
2708 if ( $sisterKey[0] ===
'/' ) {
2709 throw new RuntimeException(
"Sister key '$sisterKey' already contains a route." );
2712 return $route . $sisterKey;
2726 private function scheduleAsyncRefresh( $key, $ttl, $callback, array $opts, array $cbParams ) {
2727 if ( !$this->asyncHandler ) {
2734 $func = $this->asyncHandler;
2735 $func(
function () use ( $key, $ttl, $callback, $opts, $cbParams ) {
2736 $opts[
'minAsOf'] = INF;
2738 $this->fetchOrRegenerate( $key, $ttl, $callback, $opts, $cbParams );
2739 }
catch ( Exception $e ) {
2741 $this->logger->error(
'Async refresh failed for {key}', [
2761 private function isAcceptablyFreshValue(
$res, $graceTTL, $minAsOf ) {
2762 if ( !$this->
isValid(
$res[self::RES_VALUE],
$res[self::RES_AS_OF], $minAsOf ) ) {
2767 $curTTL =
$res[self::RES_CUR_TTL];
2768 if ( $curTTL > 0 ) {
2774 $curGraceTTL = $graceTTL + $curTTL;
2776 return ( $curGraceTTL > 0 )
2794 $curTTL =
$res[self::RES_CUR_TTL];
2795 $logicalTTL =
$res[self::RES_TTL];
2796 $asOf =
$res[self::RES_AS_OF];
2820 if ( $ageNew < 0 || $timeTillRefresh <= 0 ) {
2824 $age = $now - $asOf;
2825 $timeOld = $age - $ageNew;
2826 if ( $timeOld <= 0 ) {
2830 $popularHitsPerSec = 1;
2834 $refreshWindowSec = max( $timeTillRefresh - $ageNew - self::RAMPUP_TTL / 2, 1 );
2838 $chance = 1 / ( $popularHitsPerSec * $refreshWindowSec );
2840 $chance *= ( $timeOld <= self::RAMPUP_TTL ) ? $timeOld / self::RAMPUP_TTL : 1;
2842 return ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
2864 if ( $lowTTL <= 0 ) {
2870 $effectiveLowTTL = min( $lowTTL, $logicalTTL ?: INF );
2872 if ( $curTTL >= $effectiveLowTTL || $curTTL <= 0 ) {
2876 $chance = ( 1 - $curTTL / $effectiveLowTTL );
2878 return ( mt_rand( 1, 1000000000 ) <= 1000000000 * $chance );
/**
 * Check that a value exists and is at least as new as a minimum timestamp.
 *
 * @param mixed $value Cached value, or false when there is none
 * @param float|null $asOf Timestamp the value was stored at
 * @param float $minAsOf Minimum acceptable "as of" timestamp
 * @return bool True if $value is not false and $asOf >= $minAsOf
 */
protected function isValid( $value, $asOf, $minAsOf ) {
	if ( $value === false ) {
		// Nothing was found, so there is nothing to validate
		return false;
	}

	return ( $asOf >= $minAsOf );
}
2901 private function wrap( $value, $ttl, $version, $now, $walltime ) {
2905 self::FLD_FORMAT_VERSION => self::VERSION,
2906 self::FLD_VALUE => $value,
2907 self::FLD_TTL => $ttl,
2908 self::FLD_TIME => $now
2910 if ( $version !==
null ) {
2911 $wrapped[self::FLD_VALUE_VERSION] = $version;
2913 if ( $walltime >= self::GENERATION_SLOW_SEC ) {
2914 $wrapped[self::FLD_GENERATION_TIME] = $walltime;
2934 private function unwrap( $wrapped, $now ) {
2938 self::RES_VALUE =>
false,
2939 self::RES_VERSION =>
null,
2940 self::RES_AS_OF =>
null,
2941 self::RES_TTL =>
null,
2942 self::RES_TOMB_AS_OF =>
null,
2944 self::RES_CHECK_AS_OF =>
null,
2945 self::RES_TOUCH_AS_OF =>
null,
2946 self::RES_CUR_TTL => null
2949 if ( is_array( $wrapped ) ) {
2952 ( $wrapped[self::FLD_FORMAT_VERSION] ??
null ) === self::VERSION &&
2953 $wrapped[self::FLD_TIME] >= $this->epoch
2955 if ( $wrapped[self::FLD_TTL] > 0 ) {
2957 $age = $now - $wrapped[self::FLD_TIME];
2958 $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 );
2963 $res[self::RES_VALUE] = $wrapped[self::FLD_VALUE];
2964 $res[self::RES_VERSION] = $wrapped[self::FLD_VALUE_VERSION] ??
null;
2965 $res[self::RES_AS_OF] = $wrapped[self::FLD_TIME];
2966 $res[self::RES_CUR_TTL] = $curTTL;
2967 $res[self::RES_TTL] = $wrapped[self::FLD_TTL];
2971 $purge = $this->parsePurgeValue( $wrapped );
2972 if ( $purge !==
null ) {
2974 $curTTL = min( $purge[self::PURGE_TIME] - $now, self::TINY_NEGATIVE );
2975 $res[self::RES_CUR_TTL] = $curTTL;
2976 $res[self::RES_TOMB_AS_OF] = $purge[self::PURGE_TIME];
/**
 * Derive the stats "key class" from a cache key.
 *
 * The key is split on ':' into at most three segments. The second segment is
 * used when present, otherwise the first. Dots in the chosen segment are
 * replaced with underscores.
 *
 * @param string $key Cache key
 * @return string Key class name usable inside a stats metric name
 */
private function determineKeyClassForStats( $key ) {
	$segments = explode( ':', $key, 3 );
	// Fall back to the first segment for keys without a ':' separator
	$keyClass = $segments[1] ?? $segments[0];

	return strtr( $keyClass, '.', '_' );
}
3002 private function parsePurgeValue( $value ) {
3003 if ( !is_string( $value ) ) {
3007 $segments = explode(
':', $value, 3 );
3008 $prefix = $segments[0];
3009 if ( $prefix !== self::PURGE_VAL_PREFIX ) {
3014 $timestamp = (float)$segments[1];
3016 $holdoff = isset( $segments[2] ) ? (int)$segments[2] : self::
HOLDOFF_TTL;
3018 if ( $timestamp < $this->epoch ) {
3023 return [ self::PURGE_TIME => $timestamp, self::PURGE_HOLDOFF => $holdoff ];
/**
 * Build the stored string for a tombstone purge marker.
 *
 * @param float $timestamp UNIX timestamp of the purge
 * @return string PURGE_VAL_PREFIX, a colon, then the timestamp truncated to an integer
 */
private function makeTombstonePurgeValue( float $timestamp ) {
	$wholeSeconds = (int)$timestamp;

	return self::PURGE_VAL_PREFIX . ':' . $wholeSeconds;
}
/**
 * Build the stored string for a check key purge marker.
 *
 * @param float $timestamp UNIX timestamp of the purge; truncated to an integer
 * @param int $holdoff Hold-off period, in seconds, recorded in the marker
 * @param array|null &$purge [out] Set to a purge array keyed by PURGE_TIME
 *  (the truncated timestamp as a float) and PURGE_HOLDOFF
 * @return string PURGE_VAL_PREFIX followed by ":<timestamp>:<holdoff>"
 */
private function makeCheckPurgeValue( float $timestamp, int $holdoff, ?array &$purge = null ) {
	$normalizedTime = (int)$timestamp;
	// Expose the purge in array form alongside the serialized string, so that
	// callers need not parse the string they are about to store
	$purge = [ self::PURGE_TIME => (float)$normalizedTime, self::PURGE_HOLDOFF => $holdoff ];

	return self::PURGE_VAL_PREFIX . ":$normalizedTime:$holdoff";
}
/**
 * Get the process cache for a group, creating it on first use.
 *
 * The group name has the form "<name>:<size>". The part after the first
 * colon is cast to int and passed to the MapCacheLRU constructor.
 *
 * @param string $group Process cache group name, e.g. the PC_PRIMARY value
 * @return MapCacheLRU
 */
private function getProcessCache( $group ) {
	if ( isset( $this->processCaches[$group] ) ) {
		return $this->processCaches[$group];
	}

	[ , $size ] = explode( ':', $group );
	$pCache = new MapCacheLRU( (int)$size );
	$this->processCaches[$group] = $pCache;
	if ( $this->wallClockOverride !== null ) {
		// Hand the overridden wall clock to the newly created cache as well
		$pCache->setMockTime( $this->wallClockOverride );
	}

	return $this->processCaches[$group];
}
3069 private function getNonProcessCachedMultiKeys( ArrayIterator
$keys, array $opts ) {
3070 $pcTTL = $opts[
'pcTTL'] ?? self::TTL_UNCACHEABLE;
3073 if ( $pcTTL > 0 && $this->callbackDepth == 0 ) {
3074 $pCache = $this->getProcessCache( $opts[
'pcGroup'] ?? self::PC_PRIMARY );
3075 foreach (
$keys as $key => $id ) {
3076 if ( !$pCache->has( $key, $pcTTL ) ) {
3077 $keysMissing[$id] = $key;
3082 return $keysMissing;
3091 private function fetchWrappedValuesForWarmupCache( array
$keys, array $checkKeys ) {
3097 $sisterKeys = $this->makeSisterKeys(
$keys, self::TYPE_VALUE );
3099 foreach ( $checkKeys as $i => $checkKeyOrKeyGroup ) {
3101 if ( is_int( $i ) ) {
3103 $sisterKeys[] = $this->makeSisterKey( $checkKeyOrKeyGroup, self::TYPE_TIMESTAMP );
3106 foreach ( (array)$checkKeyOrKeyGroup as $checkKey ) {
3107 $sisterKeys[] = $this->makeSisterKey( $checkKey, self::TYPE_TIMESTAMP );
3112 $wrappedBySisterKey = $this->cache->getMulti( $sisterKeys );
3113 $wrappedBySisterKey += array_fill_keys( $sisterKeys,
false );
3115 return $wrappedBySisterKey;
3123 private function timeSinceLoggedMiss( $key, $now ) {
3124 for ( end( $this->missLog ); $miss = current( $this->missLog ); prev( $this->missLog ) ) {
3125 if ( $miss[0] === $key ) {
3126 return ( $now - $miss[1] );
3138 return $this->wallClockOverride ?: microtime(
true );
3146 $this->wallClockOverride =& $time;
3147 $this->cache->setMockTime( $time );
3148 foreach ( $this->processCaches as $pCache ) {
3149 $pCache->setMockTime( $time );