26use InvalidArgumentException;
30use Wikimedia\WaitConditionLoop;
// Map of cache key => lookup tally; trackDuplicateKeys() initialises an unseen key to 0 and increments it.
52 private $duplicateKeyLookups = [];
// Whether duplicate-lookup tracking is active; trackDuplicateKeys() checks this first.
54 private $reportDupes =
false;
// Latch flipped to true when the reporting callback is handed to $this->asyncHandler, so it is registered once.
56 private $dupeTrackScheduled =
false;
// First component passed to makeGlobalKey() when building the keys of value segments.
59 private const SEGMENT_COMPONENT =
'segment';
99 if ( !empty(
$params[
'reportDupes'] ) && $this->asyncHandler ) {
100 $this->reportDupes =
true;
104 $this->segmentationSize =
$params[
'segmentationSize'] ?? 8_388_608;
106 $this->segmentedValueMaxSize =
$params[
'segmentedValueMaxSize'] ?? 67_108_864;
// Public read entry point. The visible part reports every requested key to the duplicate-lookup tracker
// before anything else happens.
// NOTE(review): the rest of the body is not part of this excerpt.
123 public function get( $key, $flags = 0 ) {
124 $this->trackDuplicateKeys( $key );
// Tallies lookups per key and, on the first tracked call, hands $this->asyncHandler a callback that
// logs one warning per key whose tally is non-zero.
134 private function trackDuplicateKeys( $key ) {
// Tracking is opt-in through $this->reportDupes.
135 if ( !$this->reportDupes ) {
139 if ( !isset( $this->duplicateKeyLookups[$key] ) ) {
142 $this->duplicateKeyLookups[$key] = 0;
144 $this->duplicateKeyLookups[$key] += 1;
// Register the reporting callback only once per instance.
146 if ( $this->dupeTrackScheduled ===
false ) {
147 $this->dupeTrackScheduled =
true;
149 call_user_func( $this->asyncHandler,
function () {
// array_filter() without a callback drops entries whose tally is still 0.
150 $dups = array_filter( $this->duplicateKeyLookups );
151 foreach ( $dups as $key => $count ) {
152 $this->logger->warning(
153 'Duplicate get(): "{key}" fetched {count} times',
155 [
'key' => $key,
// The reported figure is the stored tally plus one.
'count' => $count + 1, ]
// Backend-specific read primitive. Callers in this class treat a `false` result as "key not found"
// and pass $casToken by reference so the value it receives can later be handed to cas().
174 abstract protected function doGet( $key, $flags = 0, &$casToken =
null );
// Public write entry point. The backend write (doSet()) is attempted only when $ok is truthy.
// NOTE(review): $ok and $entry are assigned on lines not shown here — presumably via
// makeValueOrSegmentList(), as add() and cas() do; confirm.
186 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
190 return $ok && $this->
doSet( $key, $entry, $exptime, $flags );
// Backend-specific write primitive; set(), doCas() and doSetMulti() use its result as a success flag.
203 abstract protected function doSet( $key, $value, $exptime = 0, $flags = 0 );
// Deletes a key. With WRITE_PRUNE_SEGMENTS set, the main value is read before the main key is deleted,
// and keys built from segment hashes are then removed through deleteMulti().
217 public function delete( $key, $flags = 0 ) {
// Without the prune flag this is a plain backend delete.
218 if ( !$this->
fieldHasFlags( $flags, self::WRITE_PRUNE_SEGMENTS ) ) {
219 return $this->
doDelete( $key, $flags );
// Read the main entry before it is deleted.
// NOTE(review): the lines that derive the segment hashes from $mainValue are not shown.
222 $mainValue = $this->
doGet( $key, self::READ_LATEST );
223 if ( !$this->
doDelete( $key, $flags ) ) {
// Turn each segment hash into the global key under which that segment is stored.
232 $orderedKeys = array_map(
233 function ( $segmentHash ) use ( $key ) {
234 return $this->
makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
// The prune flag is cleared before the segment keys are passed to deleteMulti().
239 return $this->
deleteMulti( $orderedKeys, $flags & ~self::WRITE_PRUNE_SEGMENTS );
// Backend-specific delete primitive; callers in this class use its result as a success flag.
250 abstract protected function doDelete( $key, $flags = 0 );
// Public "add" entry point.
252 public function add( $key, $value, $exptime = 0, $flags = 0 ) {
// makeValueOrSegmentList() returns the entry to store and reports success through $ok (last argument).
253 $entry = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags, $ok );
// doAdd() is skipped entirely when $ok is falsy.
256 return $ok && $this->doAdd( $key, $entry, $exptime, $flags );
// Backend-specific "add" primitive; add() returns its result when entry preparation succeeded.
269 abstract protected function doAdd( $key, $value, $exptime = 0, $flags = 0 );
// Public merge entry point; as shown it forwards all arguments unchanged to mergeViaCas().
288 public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
289 return $this->mergeViaCas( $key, $callback, $exptime, $attempts, $flags );
// Read-modify-write loop: read the current value, let $callback compute the new one, then write it with
// add() or cas(). The loop repeats while the write failed and attempts remain.
302 final protected function mergeViaCas( $key, callable $callback, $exptime, $attempts, $flags ) {
303 $attemptsLeft = $attempts;
// $token is filled by doGet() (by-reference argument) and later passed to cas().
305 $token = self::PASS_BY_REF;
// Snapshot the error state so that only errors raised by the following read are detected.
307 $watchPoint = $this->watchErrors();
308 $currentValue = $this->resolveSegments(
310 $this->doGet( $key, $flags, $token )
312 if ( $this->getLastError( $watchPoint ) ) {
314 $this->logger->warning(
315 __METHOD__ .
' failed due to read I/O error on get() for {key}.', [
'key' => $key ]
// The callback receives this cache instance, the key, the current value and the TTL.
322 $value = $callback( $this, $key, $currentValue, $exptime );
323 $keyWasNonexistent = ( $currentValue === false );
324 $valueMatchesOldValue = ( $value === $currentValue );
// Only the two booleans above are needed from here on.
326 unset( $currentValue );
328 $watchPoint = $this->watchErrors();
// Branch order: callback returned false or TTL is negative; value unchanged on a retry
// ($attemptsLeft differs from $attempts only after the first pass); key absent => add(); otherwise cas().
// NOTE(review): the bodies of the first two branches are not shown in this excerpt.
329 if ( $value ===
false || $exptime < 0 ) {
332 } elseif ( $valueMatchesOldValue && $attemptsLeft !== $attempts ) {
335 } elseif ( $keyWasNonexistent ) {
337 $success = $this->add( $key, $value, $exptime, $flags );
340 $success = $this->cas( $token, $key, $value, $exptime, $flags );
342 if ( $this->getLastError( $watchPoint ) ) {
344 $this->logger->warning(
345 __METHOD__ .
' failed due to write I/O error for {key}.',
352 }
while ( !
$success && --$attemptsLeft );
// Check-and-set: prepares the entry and delegates to doCas() with the caller's token.
368 protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
// A null token is logged as a warning.
// NOTE(review): what is returned in that case is not shown in this excerpt.
369 if ( $casToken ===
null ) {
370 $this->logger->warning(
371 __METHOD__ .
' got empty CAS token for {key}.',
// makeValueOrSegmentList() returns the entry to store and reports success through $ok.
379 $entry = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags, $ok );
382 return $ok && $this->doCas( $casToken, $key, $entry, $exptime, $flags );
// Default check-and-set built from lock(), doGet(), tokensMatch(), doSet() and unlock().
396 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
// Lock is requested with a timeout of 0.
398 if ( !$this->lock( $key, 0 ) ) {
403 $curCasToken = self::PASS_BY_REF;
404 $watchPoint = $this->watchErrors();
// Re-read the key to learn whether it exists and to obtain its current token.
405 $exists = ( $this->doGet( $key, self::READ_LATEST, $curCasToken ) !== false );
// NOTE(review): the error tested here can only come from the doGet() read above, yet the message
// below says "write I/O error" — confirm whether "read" was intended.
406 if ( $this->getLastError( $watchPoint ) ) {
409 $this->logger->warning(
410 __METHOD__ .
' failed due to write I/O error for {key}.',
// Write only when the key still exists and its current token matches the caller's token.
413 } elseif ( $exists && $this->tokensMatch( $casToken, $curCasToken ) ) {
414 $success = $this->doSet( $key, $value, $exptime, $flags );
419 __METHOD__ .
' failed due to race condition for {key}.',
420 [
'key' => $key,
'key_exists' => $exists ]
424 $this->unlock( $key );
436 $type = gettype( $value );
439 if ( $type !== gettype( $otherValue ) ) {
444 if ( $type ===
'array' || $type ===
'object' ) {
445 return ( serialize( $value ) === serialize( $otherValue ) );
449 return ( $value === $otherValue );
// Public TTL-change entry point; as shown it forwards all arguments unchanged to doChangeTTL().
470 public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
471 return $this->doChangeTTL( $key, $exptime, $flags );
483 if ( !$this->lock( $key, 0 ) ) {
487 $expiry = $this->getExpirationAsTimestamp( $exptime );
488 $delete = ( $expiry != self::TTL_INDEFINITE && $expiry < $this->getCurrentTime() );
491 $blob = $this->doGet( $key, self::READ_LATEST );
494 $ok = $this->doDelete( $key, $flags );
496 $ok = $this->doSet( $key, $blob, $exptime, $flags );
502 $this->unlock( $key );
// Public increment-or-initialise entry point; normalises $init, then delegates to doIncrWithInit().
507 public function incrWithInit( $key, $exptime, $step = 1, $init =
null, $flags = 0 ) {
// Any non-integer $init (including the null default) falls back to $step.
509 $init = is_int( $init ) ? $init : $step;
511 return $this->doIncrWithInit( $key, $exptime, $step, $init, $flags );
// Backend-specific increment-or-initialise primitive; incrWithInit() always passes an integer $init.
523 abstract protected function doIncrWithInit( $key, $exptime, $step, $init, $flags );
// Acquires a lock for $key and records it in $this->locks. A lock this instance already holds can be
// re-entered when a non-empty $rclass equals the stored one.
533 public function lock( $key, $timeout = 6, $exptime = 6, $rclass =
'' ) {
// A falsy $exptime (e.g. 0) is treated as INF, so min() caps the lock TTL at TTL_DAY.
534 $exptime = min( $exptime ?: INF, self::TTL_DAY );
538 if ( isset( $this->locks[$key] ) ) {
// Re-entrant case: bump the nesting depth.
540 if ( $rclass !=
'' && $this->locks[$key][self::LOCK_RCLASS] === $rclass ) {
541 ++$this->locks[$key][self::LOCK_DEPTH];
// doLock() returning anything other than null is treated as success.
546 $lockTsUnix = $this->doLock( $key, $timeout, $exptime );
547 if ( $lockTsUnix !==
null ) {
// Bookkeeping entry: reentry class, nesting depth, acquisition time and computed expiry time.
548 $this->locks[$key] = [
549 self::LOCK_RCLASS => $rclass,
550 self::LOCK_DEPTH => 1,
551 self::LOCK_TIME => $lockTsUnix,
552 self::LOCK_EXPIRY => $lockTsUnix + $exptime
// Acquires the lock by repeatedly trying to add() the lock key inside a WaitConditionLoop.
// NOTE(review): the return statement is not shown; lock() compares the result against null, so this
// presumably returns $lockTsUnix — confirm.
570 protected function doLock( $key, $timeout, $exptime ) {
574 $loop =
new WaitConditionLoop(
575 function () use ( $key, $exptime, $fname, &$lockTsUnix ) {
576 $watchPoint = $this->watchErrors();
// A successful add() of the lock key (value 1, TTL $exptime) means the lock was obtained;
// the acquisition time is written to the by-reference $lockTsUnix.
577 if ( $this->add( $this->makeLockKey( $key ), 1, $exptime ) ) {
578 $lockTsUnix = microtime(
true );
580 return WaitConditionLoop::CONDITION_REACHED;
// An I/O error during add() aborts the loop rather than continuing to retry.
581 } elseif ( $this->getLastError( $watchPoint ) ) {
582 $this->logger->warning(
583 "$fname failed due to I/O error for {key}.",
587 return WaitConditionLoop::CONDITION_ABORTED;
// add() failed without an I/O error: keep polling.
590 return WaitConditionLoop::CONDITION_CONTINUE;
594 $code = $loop->invoke();
// A timed-out loop is reported with the key and the timeout that was used.
596 if ( $code === $loop::CONDITION_TIMED_OUT ) {
597 $this->logger->warning(
598 "$fname failed due to timeout for {key}.",
599 [
'key' => $key,
'timeout' => $timeout ]
616 if ( isset( $this->locks[$key] ) ) {
617 if ( --$this->locks[$key][self::LOCK_DEPTH] > 0 ) {
620 $released = $this->doUnlock( $key );
621 unset( $this->locks[$key] );
623 $this->logger->warning(
624 __METHOD__ .
' failed to release lock for {key}.',
630 $this->logger->warning(
631 __METHOD__ .
' no lock to release for {key}.',
// NOTE(review): excerpt from a method whose header is not shown (looks like the lock-release helper — confirm).
// Time left until the recorded lock expiry.
650 $curTTL = $this->locks[$key][self::LOCK_EXPIRY] - $this->getCurrentTime();
653 if ( $this->getQoS( self::ATTR_DURABILITY ) <= self::QOS_DURABILITY_SCRIPT ) {
// Considered safe only when more time remains than $this->maxLockSendDelay.
659 $isSafe = ( $curTTL > $this->maxLockSendDelay );
// Releasing the lock means deleting the lock key.
663 $released = $this->doDelete( $this->makeLockKey( $key ) );
// NOTE(review): the message uses an {age} placeholder, but the context array below supplies only
// 'key' and 'curTTL', so {age} has no value to interpolate. Either add an 'age' entry or change
// the placeholder; the unit ("sec") should be re-checked at the same time.
665 $this->logger->warning(
666 "Lock for {key} held too long ({age} sec).",
667 [
'key' => $key,
'curTTL' => $curTTL ]
685 callable $progress =
null,
// Batch read: fetches all keys through doGetMulti(), then post-processes each found entry.
700 public function getMulti( array $keys, $flags = 0 ) {
701 $foundByKey = $this->doGetMulti( $keys, $flags );
// Iterate in the caller's key order rather than in backend result order.
704 foreach ( $keys as $key ) {
// array_key_exists() is used, so a key whose stored value is null still counts as found.
706 if ( array_key_exists( $key, $foundByKey ) ) {
// Entries are passed through resolveSegments(); a `false` result is not treated as a value.
708 $value = $this->resolveSegments( $key, $foundByKey[$key] );
709 if ( $value !==
false ) {
728 foreach ( $keys as $key ) {
729 $val = $this->doGet( $key, $flags );
730 if ( $val !==
false ) {
// Batch write. Throws InvalidArgumentException when $flags contains WRITE_ALLOW_SEGMENTS;
// otherwise delegates to doSetMulti().
750 public function setMulti( array $valueByKey, $exptime = 0, $flags = 0 ) {
751 if ( $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
752 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_ALLOW_SEGMENTS' );
755 return $this->doSetMulti( $valueByKey, $exptime, $flags );
// Default batch write: one doSet() per key/value pair.
765 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
767 foreach ( $data as $key => $value ) {
// doSet() is the left operand, so it runs for every key even after an earlier failure;
// $res accumulates the AND of all results.
768 $res = $this->doSet( $key, $value, $exptime, $flags ) && $res;
786 if ( $this->fieldHasFlags( $flags, self::WRITE_PRUNE_SEGMENTS ) ) {
787 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_PRUNE_SEGMENTS' );
790 return $this->doDeleteMulti( $keys, $flags );
801 foreach ( $keys as $key ) {
802 $res = $this->doDelete( $key, $flags ) && $res;
820 return $this->doChangeTTLMulti( $keys, $exptime, $flags );
832 foreach ( $keys as $key ) {
833 $res = $this->doChangeTTL( $key, $exptime, $flags ) && $res;
849 $orderedKeys = array_map(
850 function ( $segmentHash ) use ( $key ) {
851 return $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
856 $segmentsByKey = $this->doGetMulti( $orderedKeys );
859 foreach ( $orderedKeys as $segmentKey ) {
860 if ( isset( $segmentsByKey[$segmentKey] ) ) {
861 $parts[] = $segmentsByKey[$segmentKey];
868 return $this->unserialize( implode(
'', $parts ) );
// Decides whether a value should be stored in segments, based on the configured segmentation size,
// the WRITE_ALLOW_SEGMENTS flag and the size of the value's string content.
887 private function useSegmentationWrapper( $value, $flags ) {
889 $this->segmentationSize === INF ||
890 !$this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS )
// A string qualifies when its byte length reaches the segmentation size.
895 if ( is_string( $value ) ) {
896 return ( strlen( $value ) >= $this->segmentationSize );
899 if ( is_array( $value ) ) {
// Only the first four elements of an array are inspected for an oversized string.
901 foreach ( array_slice( $value, 0, 4 ) as $v ) {
902 if ( is_string( $v ) && strlen( $v ) >= $this->segmentationSize ) {
928 if ( $this->useSegmentationWrapper( $value, $flags ) ) {
929 $segmentSize = $this->segmentationSize;
930 $maxTotalSize = $this->segmentedValueMaxSize;
931 $serialized = $this->getSerialized( $value, $key );
932 $size = strlen( $serialized );
933 if ( $size > $maxTotalSize ) {
934 $this->logger->warning(
935 "Value for {key} exceeds $maxTotalSize bytes; cannot segment.",
942 $count = intdiv( $size, $segmentSize ) + ( ( $size % $segmentSize ) ? 1 : 0 );
943 for ( $i = 0; $i < $count; ++$i ) {
944 $segment = substr( $serialized, $i * $segmentSize, $segmentSize );
945 $hash = sha1( $segment );
946 $chunkKey = $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $hash );
947 $chunksByKey[$chunkKey] = $segment;
948 $segmentHashes[] = $hash;
950 $flags &= ~self::WRITE_ALLOW_SEGMENTS;
951 $ok = $this->setMulti( $chunksByKey, $exptime, $flags );
952 $entry = SerializedValueContainer::newSegmented( $segmentHashes );
966 return ( $exptime !== self::TTL_INDEFINITE && $exptime < ( 10 * self::TTL_YEAR ) );
984 if ( $exptime == self::TTL_INDEFINITE ) {
988 return $this->isRelativeExpiration( $exptime )
989 ? intval( $this->getCurrentTime() + $exptime )
1009 if ( $exptime == self::TTL_INDEFINITE ) {
1013 return $this->isRelativeExpiration( $exptime )
1015 : (int)max( $exptime - $this->getCurrentTime(), 1 );
1026 if ( is_int( $value ) ) {
1028 } elseif ( !is_string( $value ) ) {
1032 $integer = (int)$value;
1034 return ( $value === (
string)$integer );
1038 return $this->attrMap[$flag] ?? self::QOS_UNKNOWN;
1047 return $this->segmentationSize;
1056 return $this->segmentedValueMaxSize;
1069 $this->checkValueSerializability( $value, $key );
1071 return $this->serialize( $value );
// Logs a warning when $value is an object that is neither stdClass nor JsonSerializable; arrays and
// stdClass objects have their entries checked by checkIterableMapSerializability().
// NOTE(review): stdClass/JsonSerializable are unqualified here but written with a leading backslash in
// checkIterableMapSerializability(); confirm matching `use` imports exist, as they are not visible here.
1094 private function checkValueSerializability( $value, $key ) {
1095 if ( is_array( $value ) ) {
1096 $this->checkIterableMapSerializability( $value, $key );
1097 } elseif ( is_object( $value ) ) {
1099 if ( $value instanceof stdClass ) {
1100 $this->checkIterableMapSerializability( $value, $key );
1101 } elseif ( !( $value instanceof JsonSerializable ) ) {
1102 $this->logger->warning(
1103 "{class} value for '{cachekey}'; serialization is suspect.",
1104 [
'cachekey' => $key,
'class' => get_class( $value ) ]
// Checks the entries of an array or iterable object and logs a warning for each object entry that is
// neither stdClass nor JsonSerializable. Only the entries yielded by this one foreach are examined.
1114 private function checkIterableMapSerializability( $value, $key ) {
1115 foreach ( $value as $index => $entry ) {
1116 if ( is_object( $entry ) ) {
1119 !( $entry instanceof \stdClass ) &&
1120 !( $entry instanceof \JsonSerializable )
// $index is interpolated straight into the message, while class and cache key go through placeholders.
1122 $this->logger->warning(
1123 "{class} value for '{cachekey}' at '$index'; serialization is suspect.",
1124 [
'cachekey' => $key,
'class' => get_class( $entry ) ]
1140 return is_int( $value ) ? $value : serialize( $value );
1150 return $this->isInteger( $value ) ? (int)$value : unserialize( $value );
1157 $this->logger->debug(
"{class} debug: $text", [
'class' => static::class ] );
// Derives the stats key group from a cache key: the second ':'-separated component, with dots replaced
// by underscores.
1165 private function determinekeyGroupForStats( $key ): string {
// Split into at most three parts; only index 1 is used.
1168 $components = explode(
':', $key, 3 );
// Keys containing no ':' have no second component and fall back to 'UNKNOWN'.
1170 $keygroup = $components[1] ??
'UNKNOWN';
1172 return strtr( $keygroup,
'.',
'_' );
1183 $deltasByMetric = [];
1185 foreach ( $keyInfo as $indexOrKey => $keyOrSizes ) {
1186 if ( is_array( $keyOrSizes ) ) {
1188 [ $sPayloadSize, $rPayloadSize ] = $keyOrSizes;
1196 $keygroup = $this->determinekeyGroupForStats( $key );
1198 if ( $op === self::METRIC_OP_GET ) {
1200 if ( $rPayloadSize ===
false ) {
1201 $statsdName =
"objectcache.{$keygroup}.{$op}_miss_rate";
1202 $statsName =
"bagostuff_miss_total";
1204 $statsdName =
"objectcache.{$keygroup}.{$op}_hit_rate";
1205 $statsName =
"bagostuff_hit_total";
1209 $statsdName =
"objectcache.{$keygroup}.{$op}_call_rate";
1210 $statsName =
"bagostuff_call_total";
1212 $deltasByMetric[$statsdName] = [
1213 'delta' => ( $deltasByMetric[$statsdName][
'delta'] ?? 0 ) + 1,
1214 'metric' => $statsName,
1215 'keygroup' => $keygroup,
1219 if ( $sPayloadSize > 0 ) {
1220 $statsdName =
"objectcache.{$keygroup}.{$op}_bytes_sent";
1221 $statsName =
"bagostuff_bytes_sent_total";
1222 $deltasByMetric[$statsdName] = [
1223 'delta' => ( $deltasByMetric[$statsdName][
'delta'] ?? 0 ) + $sPayloadSize,
1224 'metric' => $statsName,
1225 'keygroup' => $keygroup,
1230 if ( $rPayloadSize > 0 ) {
1231 $statsdName =
"objectcache.{$keygroup}.{$op}_bytes_read";
1232 $statsName =
"bagostuff_bytes_read_total";
1233 $deltasByMetric[$statsdName] = [
1234 'delta' => ( $deltasByMetric[$statsdName][
'delta'] ?? 0 ) + $rPayloadSize,
1235 'metric' => $statsName,
1236 'keygroup' => $keygroup,
1242 foreach ( $deltasByMetric as $statsdName => $delta ) {
1243 $this->stats->getCounter( $delta[
'metric'] )
1244 ->setLabel(
'keygroup', $delta[
'keygroup'] )
1245 ->setLabel(
'operation', $delta[
'operation'] )
1246 ->copyToStatsdAt( $statsdName )
1247 ->incrementBy( $delta[
'delta'] );
// Registers the unqualified name 'MediumSpecificBagOStuff' as an alias of this class.
// NOTE(review): the namespace declaration is not visible in this excerpt; this looks like a
// backward-compatibility alias for the pre-namespace class name — confirm.
1253class_alias( MediumSpecificBagOStuff::class,
'MediumSpecificBagOStuff' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
array $params
The job parameters.
Helper class for segmenting large cache values without relying on serializing classes.
static isSegmented( $value)