22use InvalidArgumentException;
25use Wikimedia\WaitConditionLoop;
48 private $duplicateKeyLookups = [];
50 private $reportDupes =
false;
52 private $dupeTrackScheduled =
false;
55 private const SEGMENT_COMPONENT =
'segment';
95 if ( !empty(
$params[
'reportDupes'] ) && $this->asyncHandler ) {
96 $this->reportDupes =
true;
100 $this->segmentationSize =
$params[
'segmentationSize'] ?? 8_388_608;
102 $this->segmentedValueMaxSize =
$params[
'segmentedValueMaxSize'] ?? 67_108_864;
119 public function get( $key, $flags = 0 ) {
120 $this->trackDuplicateKeys( $key );
130 private function trackDuplicateKeys( $key ) {
131 if ( !$this->reportDupes ) {
135 if ( !isset( $this->duplicateKeyLookups[$key] ) ) {
138 $this->duplicateKeyLookups[$key] = 0;
140 $this->duplicateKeyLookups[$key]++;
142 if ( $this->dupeTrackScheduled ===
false ) {
143 $this->dupeTrackScheduled =
true;
146 $dups = array_filter( $this->duplicateKeyLookups );
147 foreach ( $dups as $key => $count ) {
148 $this->logger->warning(
149 'Duplicate get(): "{key}" fetched {count} times',
151 [
'key' => $key,
'count' => $count + 1, ]
/**
 * Backend-specific read of a single key.
 *
 * Within this file it is called by delete(), mergeViaCas(), doCas(),
 * doChangeTTL() and the per-key fallback loop for multi-key reads. Those
 * callers compare the result against false to detect a missing key, so an
 * implementation is presumably expected to return false on a miss
 * (NOTE(review): confirm against the concrete backends).
 *
 * @param string $key Cache key to fetch
 * @param int $flags Bitfield of read flags; callers here pass either the
 *  caller-supplied flags or self::READ_LATEST
 * @param mixed &$casToken Out-parameter. mergeViaCas() and doCas() pass a
 *  variable pre-set to self::PASS_BY_REF and later use the value left in it
 *  as the CAS token for the key.
 * @return mixed The stored value, or false when the key is not found
 *  (assumed from caller checks; TODO confirm)
 */
170 abstract protected function doGet( $key, $flags = 0, &$casToken =
null );
172 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
176 return $ok && $this->
doSet( $key, $entry, $exptime, $flags );
/**
 * Backend-specific write of a single key.
 *
 * Within this file it is called by set() (with the entry produced by
 * makeValueOrSegmentList()), by doCas() once the CAS tokens match, by
 * doChangeTTL() to re-store a fetched blob with a new expiry, and by the
 * default doSetMulti() loop. Every caller uses the result as a boolean.
 *
 * @param string $key Cache key to write
 * @param mixed $value Value (or segment-list entry) to store
 * @param int $exptime Expiry argument, passed through unchanged by the callers
 *  shown here
 * @param int $flags Bitfield of write flags
 * @return bool Success, as interpreted by the callers in this file
 */
189 abstract protected function doSet( $key, $value, $exptime = 0, $flags = 0 );
191 public function delete( $key, $flags = 0 ) {
192 if ( !$this->
fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
193 return $this->
doDelete( $key, $flags );
196 $mainValue = $this->
doGet( $key, self::READ_LATEST );
197 if ( !$this->
doDelete( $key, $flags ) ) {
206 $orderedKeys = array_map(
207 function ( $segmentHash ) use ( $key ) {
208 return $this->
makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
213 return $this->
deleteMulti( $orderedKeys, $flags & ~self::WRITE_ALLOW_SEGMENTS );
/**
 * Backend-specific removal of a single key.
 *
 * Within this file it is called by delete(), by doChangeTTL() when the new
 * expiry is already in the past, by the default doDeleteMulti() loop, and by
 * doUnlock() to remove a lock key. Every caller uses the result as a boolean.
 *
 * @param string $key Cache key to remove
 * @param int $flags Bitfield of write flags
 * @return bool Success, as interpreted by the callers in this file
 */
224 abstract protected function doDelete( $key, $flags = 0 );
226 public function add( $key, $value, $exptime = 0, $flags = 0 ) {
227 $entry = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags, $ok );
230 return $ok && $this->doAdd( $key, $entry, $exptime, $flags );
/**
 * Backend-specific implementation behind add().
 *
 * add() passes the entry produced by makeValueOrSegmentList() and combines
 * the result with that helper's $ok flag. doLock() relies on add() succeeding
 * only for the caller that obtains the lock, so implementations presumably
 * must fail when the key already exists (NOTE(review): confirm against the
 * concrete backends).
 *
 * @param string $key Cache key to create
 * @param mixed $value Value (or segment-list entry) to store
 * @param int $exptime Expiry argument, passed through unchanged by add()
 * @param int $flags Bitfield of write flags
 * @return bool Success, as interpreted by add()
 */
243 abstract protected function doAdd( $key, $value, $exptime = 0, $flags = 0 );
262 public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
263 return $this->mergeViaCas( $key, $callback, $exptime, $attempts, $flags );
276 final protected function mergeViaCas( $key, callable $callback, $exptime, $attempts, $flags ) {
277 $attemptsLeft = $attempts;
279 $token = self::PASS_BY_REF;
281 $watchPoint = $this->watchErrors();
282 $currentValue = $this->resolveSegments(
284 $this->doGet( $key, $flags, $token )
286 if ( $this->getLastError( $watchPoint ) ) {
288 $this->logger->warning(
289 __METHOD__ .
' failed due to read I/O error on get() for {key}.', [
'key' => $key ]
296 $value = $callback( $this, $key, $currentValue, $exptime );
297 $keyWasNonexistent = ( $currentValue === false );
298 $valueMatchesOldValue = ( $value === $currentValue );
300 unset( $currentValue );
302 $watchPoint = $this->watchErrors();
303 if ( $value ===
false || $exptime < 0 ) {
306 } elseif ( $valueMatchesOldValue && $attemptsLeft !== $attempts ) {
309 } elseif ( $keyWasNonexistent ) {
311 $success = $this->add( $key, $value, $exptime, $flags );
314 $success = $this->cas( $token, $key, $value, $exptime, $flags );
316 if ( $this->getLastError( $watchPoint ) ) {
318 $this->logger->warning(
319 __METHOD__ .
' failed due to write I/O error for {key}.',
326 }
while ( !
$success && --$attemptsLeft );
342 protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
343 if ( $casToken ===
null ) {
344 $this->logger->warning(
345 __METHOD__ .
' got empty CAS token for {key}.',
353 $entry = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags, $ok );
356 return $ok && $this->doCas( $casToken, $key, $entry, $exptime, $flags );
370 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
372 if ( !$this->lock( $key, 0 ) ) {
377 $curCasToken = self::PASS_BY_REF;
378 $watchPoint = $this->watchErrors();
379 $exists = ( $this->doGet( $key, self::READ_LATEST, $curCasToken ) !== false );
380 if ( $this->getLastError( $watchPoint ) ) {
383 $this->logger->warning(
384 __METHOD__ .
' failed due to write I/O error for {key}.',
387 } elseif ( $exists && $this->tokensMatch( $casToken, $curCasToken ) ) {
388 $success = $this->doSet( $key, $value, $exptime, $flags );
393 __METHOD__ .
' failed due to race condition for {key}.',
394 [
'key' => $key,
'key_exists' => $exists ]
398 $this->unlock( $key );
410 $type = gettype( $value );
413 if ( $type !== gettype( $otherValue ) ) {
418 if ( $type ===
'array' || $type ===
'object' ) {
419 return ( serialize( $value ) === serialize( $otherValue ) );
423 return ( $value === $otherValue );
444 public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
445 return $this->doChangeTTL( $key, $exptime, $flags );
457 if ( !$this->lock( $key, 0 ) ) {
461 $expiry = $this->getExpirationAsTimestamp( $exptime );
462 $delete = ( $expiry != self::TTL_INDEFINITE && $expiry < $this->getCurrentTime() );
465 $blob = $this->doGet( $key, self::READ_LATEST );
468 $ok = $this->doDelete( $key, $flags );
470 $ok = $this->doSet( $key, $blob, $exptime, $flags );
476 $this->unlock( $key );
481 public function incrWithInit( $key, $exptime, $step = 1, $init =
null, $flags = 0 ) {
483 $init = is_int( $init ) ? $init : $step;
485 return $this->doIncrWithInit( $key, $exptime, $step, $init, $flags );
/**
 * Backend-specific implementation behind incrWithInit().
 *
 * incrWithInit() normalises $init before delegating: a non-integer $init
 * (including the null default) is replaced by $step, so this method always
 * receives an integer $init from that caller.
 *
 * @param string $key Cache key of the counter
 * @param int $exptime Expiry argument, passed through unchanged
 * @param int $step Increment amount
 * @param int $init Initial value, already normalised by incrWithInit()
 * @param int $flags Bitfield of write flags
 * @return mixed Returned unchanged to the caller of incrWithInit(); the exact
 *  type is not visible in this file section (TODO confirm)
 */
497 abstract protected function doIncrWithInit( $key, $exptime, $step, $init, $flags );
507 public function lock( $key, $timeout = 6, $exptime = 6, $rclass =
'' ) {
508 $exptime = min( $exptime ?: INF, self::TTL_DAY );
512 if ( isset( $this->locks[$key] ) ) {
514 if ( $rclass !=
'' && $this->locks[$key][self::LOCK_RCLASS] === $rclass ) {
515 ++$this->locks[$key][self::LOCK_DEPTH];
520 $lockTsUnix = $this->doLock( $key, $timeout, $exptime );
521 if ( $lockTsUnix !==
null ) {
522 $this->locks[$key] = [
523 self::LOCK_RCLASS => $rclass,
524 self::LOCK_DEPTH => 1,
525 self::LOCK_TIME => $lockTsUnix,
526 self::LOCK_EXPIRY => $lockTsUnix + $exptime
544 protected function doLock( $key, $timeout, $exptime ) {
548 $loop =
new WaitConditionLoop(
549 function () use ( $key, $exptime, $fname, &$lockTsUnix ) {
550 $watchPoint = $this->watchErrors();
551 if ( $this->add( $this->makeLockKey( $key ), 1, $exptime ) ) {
552 $lockTsUnix = microtime(
true );
554 return WaitConditionLoop::CONDITION_REACHED;
555 } elseif ( $this->getLastError( $watchPoint ) ) {
556 $this->logger->warning(
557 "$fname failed due to I/O error for {key}.",
561 return WaitConditionLoop::CONDITION_ABORTED;
564 return WaitConditionLoop::CONDITION_CONTINUE;
568 $code = $loop->invoke();
570 if ( $code === $loop::CONDITION_TIMED_OUT ) {
571 $this->logger->warning(
572 "$fname failed due to timeout for {key}.",
573 [
'key' => $key,
'timeout' => $timeout ]
590 if ( isset( $this->locks[$key] ) ) {
591 if ( --$this->locks[$key][self::LOCK_DEPTH] > 0 ) {
594 $released = $this->doUnlock( $key );
595 unset( $this->locks[$key] );
597 $this->logger->warning(
598 __METHOD__ .
' failed to release lock for {key}.',
604 $this->logger->warning(
605 __METHOD__ .
' no lock to release for {key}.',
624 $curTTL = $this->locks[$key][self::LOCK_EXPIRY] - $this->getCurrentTime();
627 if ( $this->getQoS( self::ATTR_DURABILITY ) <= self::QOS_DURABILITY_SCRIPT ) {
633 $isSafe = ( $curTTL > $this->maxLockSendDelay );
637 $released = $this->doDelete( $this->makeLockKey( $key ) );
639 $this->logger->warning(
640 "Lock for {key} held too long ({age} sec).",
641 [
'key' => $key,
'curTTL' => $curTTL ]
659 ?callable $progress =
null,
674 public function getMulti( array $keys, $flags = 0 ) {
675 $foundByKey = $this->doGetMulti( $keys, $flags );
678 foreach ( $keys as $key ) {
680 if ( array_key_exists( $key, $foundByKey ) ) {
682 $value = $this->resolveSegments( $key, $foundByKey[$key] );
683 if ( $value !==
false ) {
702 foreach ( $keys as $key ) {
703 $val = $this->doGet( $key, $flags );
704 if ( $val !==
false ) {
724 public function setMulti( array $valueByKey, $exptime = 0, $flags = 0 ) {
725 if ( $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
726 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_ALLOW_SEGMENTS' );
729 return $this->doSetMulti( $valueByKey, $exptime, $flags );
739 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
741 foreach ( $data as $key => $value ) {
742 $res = $this->doSet( $key, $value, $exptime, $flags ) && $res;
749 if ( $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
750 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_ALLOW_SEGMENTS' );
753 return $this->doDeleteMulti( $keys, $flags );
764 foreach ( $keys as $key ) {
765 $res = $this->doDelete( $key, $flags ) && $res;
783 return $this->doChangeTTLMulti( $keys, $exptime, $flags );
795 foreach ( $keys as $key ) {
796 $res = $this->doChangeTTL( $key, $exptime, $flags ) && $res;
812 $orderedKeys = array_map(
813 function ( $segmentHash ) use ( $key ) {
814 return $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
819 $segmentsByKey = $this->doGetMulti( $orderedKeys );
822 foreach ( $orderedKeys as $segmentKey ) {
823 if ( isset( $segmentsByKey[$segmentKey] ) ) {
824 $parts[] = $segmentsByKey[$segmentKey];
831 return $this->unserialize( implode(
'', $parts ) );
850 private function useSegmentationWrapper( $value, $flags ) {
852 $this->segmentationSize === INF ||
853 !$this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS )
858 if ( is_string( $value ) ) {
859 return ( strlen( $value ) >= $this->segmentationSize );
862 if ( is_array( $value ) ) {
864 foreach ( array_slice( $value, 0, 4 ) as $v ) {
865 if ( is_string( $v ) && strlen( $v ) >= $this->segmentationSize ) {
891 if ( $this->useSegmentationWrapper( $value, $flags ) ) {
892 $segmentSize = $this->segmentationSize;
893 $maxTotalSize = $this->segmentedValueMaxSize;
894 $serialized = $this->getSerialized( $value, $key );
895 $size = strlen( $serialized );
896 if ( $size > $maxTotalSize ) {
897 $this->logger->warning(
898 "Value for {key} exceeds $maxTotalSize bytes; cannot segment.",
905 $count = intdiv( $size, $segmentSize ) + ( ( $size % $segmentSize ) ? 1 : 0 );
906 for ( $i = 0; $i < $count; ++$i ) {
907 $segment = substr( $serialized, $i * $segmentSize, $segmentSize );
908 $hash = sha1( $segment );
909 $chunkKey = $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $hash );
910 $chunksByKey[$chunkKey] = $segment;
911 $segmentHashes[] = $hash;
913 $flags &= ~self::WRITE_ALLOW_SEGMENTS;
914 $ok = $this->setMulti( $chunksByKey, $exptime, $flags );
915 $entry = SerializedValueContainer::newSegmented( $segmentHashes );
929 return ( $exptime !== self::TTL_INDEFINITE && $exptime < ( 10 * self::TTL_YEAR ) );
947 if ( $exptime == self::TTL_INDEFINITE ) {
951 return $this->isRelativeExpiration( $exptime )
952 ? intval( $this->getCurrentTime() + $exptime )
972 if ( $exptime == self::TTL_INDEFINITE ) {
976 return $this->isRelativeExpiration( $exptime )
978 : (int)max( $exptime - $this->getCurrentTime(), 1 );
989 if ( is_int( $value ) ) {
991 } elseif ( !is_string( $value ) ) {
995 $integer = (int)$value;
997 return ( $value === (
string)$integer );
1001 return $this->attrMap[$flag] ?? self::QOS_UNKNOWN;
1010 return $this->segmentationSize;
1019 return $this->segmentedValueMaxSize;
1032 $this->checkValueSerializability( $value, $key );
1034 return $this->serialize( $value );
1057 private function checkValueSerializability( $value, $key ) {
1058 if ( is_array( $value ) ) {
1059 $this->checkIterableMapSerializability( $value, $key );
1060 } elseif ( is_object( $value ) ) {
1062 if ( $value instanceof stdClass ) {
1063 $this->checkIterableMapSerializability( $value, $key );
1064 } elseif ( !( $value instanceof JsonSerializable ) ) {
1065 $this->logger->warning(
1066 "{class} value for '{cachekey}'; serialization is suspect.",
1067 [
'cachekey' => $key,
'class' => get_class( $value ) ]
1077 private function checkIterableMapSerializability( $value, $key ) {
1078 foreach ( $value as $index => $entry ) {
1079 if ( is_object( $entry ) ) {
1082 !( $entry instanceof \stdClass ) &&
1083 !( $entry instanceof \JsonSerializable )
1085 $this->logger->warning(
1086 "{class} value for '{cachekey}' at '$index'; serialization is suspect.",
1087 [
'cachekey' => $key,
'class' => get_class( $entry ) ]
1103 return is_int( $value ) ? $value : serialize( $value );
1113 return $this->isInteger( $value ) ? (int)$value : unserialize( $value );
1120 $this->logger->debug(
"{class} debug: $text", [
'class' => static::class ] );
1128 private function determinekeyGroupForStats( $key ): string {
1131 $components = explode(
':', $key, 3 );
1133 $keygroup = $components[1] ??
'UNKNOWN';
1135 return strtr( $keygroup,
'.',
'_' );
1146 $deltasByMetric = [];
1148 foreach ( $keyInfo as $indexOrKey => $keyOrSizes ) {
1149 if ( is_array( $keyOrSizes ) ) {
1151 [ $sPayloadSize, $rPayloadSize ] = $keyOrSizes;
1159 $keygroup = $this->determinekeyGroupForStats( $key );
1161 if ( $op === self::METRIC_OP_GET ) {
1163 if ( $rPayloadSize ===
false ) {
1164 $statsdName =
"objectcache.{$keygroup}.{$op}_miss_rate";
1165 $statsName =
"bagostuff_miss_total";
1167 $statsdName =
"objectcache.{$keygroup}.{$op}_hit_rate";
1168 $statsName =
"bagostuff_hit_total";
1172 $statsdName =
"objectcache.{$keygroup}.{$op}_call_rate";
1173 $statsName =
"bagostuff_call_total";
1175 $deltasByMetric[$statsdName] = [
1176 'delta' => ( $deltasByMetric[$statsdName][
'delta'] ?? 0 ) + 1,
1177 'metric' => $statsName,
1178 'keygroup' => $keygroup,
1182 if ( $sPayloadSize > 0 ) {
1183 $statsdName =
"objectcache.{$keygroup}.{$op}_bytes_sent";
1184 $statsName =
"bagostuff_bytes_sent_total";
1185 $deltasByMetric[$statsdName] = [
1186 'delta' => ( $deltasByMetric[$statsdName][
'delta'] ?? 0 ) + $sPayloadSize,
1187 'metric' => $statsName,
1188 'keygroup' => $keygroup,
1193 if ( $rPayloadSize > 0 ) {
1194 $statsdName =
"objectcache.{$keygroup}.{$op}_bytes_read";
1195 $statsName =
"bagostuff_bytes_read_total";
1196 $deltasByMetric[$statsdName] = [
1197 'delta' => ( $deltasByMetric[$statsdName][
'delta'] ?? 0 ) + $rPayloadSize,
1198 'metric' => $statsName,
1199 'keygroup' => $keygroup,
1205 foreach ( $deltasByMetric as $statsdName => $delta ) {
1206 $this->stats->getCounter( $delta[
'metric'] )
1207 ->setLabel(
'keygroup', $delta[
'keygroup'] )
1208 ->setLabel(
'operation', $delta[
'operation'] )
1209 ->copyToStatsdAt( $statsdName )
1210 ->incrementBy( $delta[
'delta'] );
// Register the unqualified name 'MediumSpecificBagOStuff' as an alias of this
// class. NOTE(review): presumably kept for callers that still use the
// pre-namespace global class name — confirm before removing.
1216class_alias( MediumSpecificBagOStuff::class,
'MediumSpecificBagOStuff' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
array $params
The job parameters.