8use InvalidArgumentException;
11use Wikimedia\WaitConditionLoop;
34 private $duplicateKeyLookups = [];
36 private $reportDupes =
false;
38 private $dupeTrackScheduled =
false;
41 private const SEGMENT_COMPONENT =
'segment';
79 parent::__construct( $params );
81 if ( !empty( $params[
'reportDupes'] ) && $this->asyncHandler ) {
82 $this->reportDupes =
true;
86 $this->segmentationSize = $params[
'segmentationSize'] ?? 8_388_608;
88 $this->segmentedValueMaxSize = $params[
'segmentedValueMaxSize'] ?? 67_108_864;
105 public function get( $key, $flags = 0 ) {
106 $this->trackDuplicateKeys( $key );
116 private function trackDuplicateKeys( $key ) {
117 if ( !$this->reportDupes ) {
121 if ( !isset( $this->duplicateKeyLookups[$key] ) ) {
124 $this->duplicateKeyLookups[$key] = 0;
126 $this->duplicateKeyLookups[$key]++;
128 if ( $this->dupeTrackScheduled ===
false ) {
129 $this->dupeTrackScheduled =
true;
132 $dups = array_filter( $this->duplicateKeyLookups );
133 foreach ( $dups as $key => $count ) {
134 $this->logger->warning(
135 'Duplicate get(): "{key}" fetched {count} times',
137 [
'key' => $key,
'count' => $count + 1, ]
156 abstract protected function doGet( $key, $flags = 0, &$casToken =
null );
159 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
163 return $ok && $this->
doSet( $key, $entry, $exptime, $flags );
176 abstract protected function doSet( $key, $value, $exptime = 0, $flags = 0 );
179 public function delete( $key, $flags = 0 ) {
180 if ( !$this->
fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
181 return $this->
doDelete( $key, $flags );
184 $mainValue = $this->
doGet( $key, self::READ_LATEST );
185 if ( !$this->
doDelete( $key, $flags ) ) {
194 $orderedKeys = array_map(
195 function ( $segmentHash ) use ( $key ) {
196 return $this->
makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
201 return $this->
deleteMulti( $orderedKeys, $flags & ~self::WRITE_ALLOW_SEGMENTS );
212 abstract protected function doDelete( $key, $flags = 0 );
215 public function add( $key, $value, $exptime = 0, $flags = 0 ) {
216 $entry = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags, $ok );
219 return $ok && $this->doAdd( $key, $entry, $exptime, $flags );
232 abstract protected function doAdd( $key, $value, $exptime = 0, $flags = 0 );
251 public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
252 return $this->mergeViaCas( $key, $callback, $exptime, $attempts, $flags );
265 final protected function mergeViaCas( $key, callable $callback, $exptime, $attempts, $flags ) {
266 $attemptsLeft = $attempts;
268 $token = self::PASS_BY_REF;
270 $watchPoint = $this->watchErrors();
271 $currentValue = $this->resolveSegments(
273 $this->doGet( $key, $flags, $token )
275 if ( $this->getLastError( $watchPoint ) ) {
277 $this->logger->warning(
278 __METHOD__ .
' failed due to read I/O error on get() for {key}.', [
'key' => $key ]
285 $value = $callback( $this, $key, $currentValue, $exptime );
286 $keyWasNonexistent = ( $currentValue === false );
287 $valueMatchesOldValue = ( $value === $currentValue );
289 unset( $currentValue );
291 $watchPoint = $this->watchErrors();
292 if ( $value ===
false || $exptime < 0 ) {
295 } elseif ( $valueMatchesOldValue && $attemptsLeft !== $attempts ) {
298 } elseif ( $keyWasNonexistent ) {
300 $success = $this->add( $key, $value, $exptime, $flags );
303 $success = $this->cas( $token, $key, $value, $exptime, $flags );
305 if ( $this->getLastError( $watchPoint ) ) {
307 $this->logger->warning(
308 __METHOD__ .
' failed due to write I/O error for {key}.',
315 }
while ( !
$success && --$attemptsLeft );
331 protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
332 if ( $casToken ===
null ) {
333 $this->logger->warning(
334 __METHOD__ .
' got empty CAS token for {key}.',
342 $entry = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags, $ok );
345 return $ok && $this->doCas( $casToken, $key, $entry, $exptime, $flags );
359 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
361 if ( !$this->lock( $key, 0 ) ) {
366 $curCasToken = self::PASS_BY_REF;
367 $watchPoint = $this->watchErrors();
368 $exists = ( $this->doGet( $key, self::READ_LATEST, $curCasToken ) !== false );
369 if ( $this->getLastError( $watchPoint ) ) {
372 $this->logger->warning(
373 __METHOD__ .
' failed due to write I/O error for {key}.',
376 } elseif ( $exists && $this->tokensMatch( $casToken, $curCasToken ) ) {
377 $success = $this->doSet( $key, $value, $exptime, $flags );
382 __METHOD__ .
' failed due to race condition for {key}.',
383 [
'key' => $key,
'key_exists' => $exists ]
387 $this->unlock( $key );
399 $type = gettype( $value );
402 if ( $type !== gettype( $otherValue ) ) {
407 if ( $type ===
'array' || $type ===
'object' ) {
408 return ( serialize( $value ) === serialize( $otherValue ) );
412 return ( $value === $otherValue );
433 public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
434 return $this->doChangeTTL( $key, $exptime, $flags );
446 if ( !$this->lock( $key, 0 ) ) {
450 $expiry = $this->getExpirationAsTimestamp( $exptime );
451 $delete = ( $expiry != self::TTL_INDEFINITE && $expiry < $this->getCurrentTime() );
454 $blob = $this->doGet( $key, self::READ_LATEST );
457 $ok = $this->doDelete( $key, $flags );
459 $ok = $this->doSet( $key, $blob, $exptime, $flags );
465 $this->unlock( $key );
471 public function incrWithInit( $key, $exptime, $step = 1, $init =
null, $flags = 0 ) {
473 $init = is_int( $init ) ? $init : $step;
475 return $this->doIncrWithInit( $key, $exptime, $step, $init, $flags );
487 abstract protected function doIncrWithInit( $key, $exptime, $step, $init, $flags );
497 public function lock( $key, $timeout = 6, $exptime = 6, $rclass =
'' ) {
498 $exptime = min( $exptime ?: INF, self::TTL_DAY );
502 if ( isset( $this->locks[$key] ) ) {
504 if ( $rclass !=
'' && $this->locks[$key][self::LOCK_RCLASS] === $rclass ) {
505 ++$this->locks[$key][self::LOCK_DEPTH];
510 $lockTsUnix = $this->doLock( $key, $timeout, $exptime );
511 if ( $lockTsUnix !==
null ) {
512 $this->locks[$key] = [
513 self::LOCK_RCLASS => $rclass,
514 self::LOCK_DEPTH => 1,
515 self::LOCK_TIME => $lockTsUnix,
516 self::LOCK_EXPIRY => $lockTsUnix + $exptime
534 protected function doLock( $key, $timeout, $exptime ) {
538 $loop =
new WaitConditionLoop(
539 function () use ( $key, $exptime, $fname, &$lockTsUnix ) {
540 $watchPoint = $this->watchErrors();
541 if ( $this->add( $this->makeLockKey( $key ), 1, $exptime ) ) {
542 $lockTsUnix = microtime(
true );
544 return WaitConditionLoop::CONDITION_REACHED;
545 } elseif ( $this->getLastError( $watchPoint ) ) {
546 $this->logger->warning(
547 "$fname failed due to I/O error for {key}.",
551 return WaitConditionLoop::CONDITION_ABORTED;
554 return WaitConditionLoop::CONDITION_CONTINUE;
558 $code = $loop->invoke();
560 if ( $code === $loop::CONDITION_TIMED_OUT ) {
561 $this->logger->warning(
562 "$fname failed due to timeout for {key}.",
563 [
'key' => $key,
'timeout' => $timeout ]
580 if ( isset( $this->locks[$key] ) ) {
581 if ( --$this->locks[$key][self::LOCK_DEPTH] > 0 ) {
584 $released = $this->doUnlock( $key );
585 unset( $this->locks[$key] );
587 $this->logger->warning(
588 __METHOD__ .
' failed to release lock for {key}.',
594 $this->logger->warning(
595 __METHOD__ .
' no lock to release for {key}.',
614 $curTTL = $this->locks[$key][self::LOCK_EXPIRY] - $this->getCurrentTime();
617 if ( $this->getQoS( self::ATTR_DURABILITY ) <= self::QOS_DURABILITY_SCRIPT ) {
623 $isSafe = ( $curTTL > $this->maxLockSendDelay );
627 $released = $this->doDelete( $this->makeLockKey( $key ) );
629 $this->logger->warning(
630 "Lock for {key} held too long ({age} sec).",
631 [
'key' => $key,
'curTTL' => $curTTL ]
650 ?callable $progress =
null,
665 public function getMulti( array $keys, $flags = 0 ) {
666 $foundByKey = $this->doGetMulti( $keys, $flags );
669 foreach ( $keys as $key ) {
671 if ( array_key_exists( $key, $foundByKey ) ) {
673 $value = $this->resolveSegments( $key, $foundByKey[$key] );
674 if ( $value !==
false ) {
693 foreach ( $keys as $key ) {
694 $val = $this->doGet( $key, $flags );
695 if ( $val !==
false ) {
715 public function setMulti( array $valueByKey, $exptime = 0, $flags = 0 ) {
716 if ( $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
717 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_ALLOW_SEGMENTS' );
720 return $this->doSetMulti( $valueByKey, $exptime, $flags );
730 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
732 foreach ( $data as $key => $value ) {
733 $res = $this->doSet( $key, $value, $exptime, $flags ) && $res;
741 if ( $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
742 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_ALLOW_SEGMENTS' );
745 return $this->doDeleteMulti( $keys, $flags );
756 foreach ( $keys as $key ) {
757 $res = $this->doDelete( $key, $flags ) && $res;
775 return $this->doChangeTTLMulti( $keys, $exptime, $flags );
787 foreach ( $keys as $key ) {
788 $res = $this->doChangeTTL( $key, $exptime, $flags ) && $res;
804 $orderedKeys = array_map(
805 function ( $segmentHash ) use ( $key ) {
806 return $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
811 $segmentsByKey = $this->doGetMulti( $orderedKeys );
814 foreach ( $orderedKeys as $segmentKey ) {
815 if ( isset( $segmentsByKey[$segmentKey] ) ) {
816 $parts[] = $segmentsByKey[$segmentKey];
823 return $this->unserialize( implode(
'', $parts ) );
842 private function useSegmentationWrapper( $value, $flags ) {
844 $this->segmentationSize === INF ||
845 !$this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS )
850 if ( is_string( $value ) ) {
851 return ( strlen( $value ) >= $this->segmentationSize );
854 if ( is_array( $value ) ) {
856 foreach ( array_slice( $value, 0, 4 ) as $v ) {
857 if ( is_string( $v ) && strlen( $v ) >= $this->segmentationSize ) {
883 if ( $this->useSegmentationWrapper( $value, $flags ) ) {
884 $segmentSize = $this->segmentationSize;
885 $maxTotalSize = $this->segmentedValueMaxSize;
886 $serialized = $this->getSerialized( $value, $key );
887 $size = strlen( $serialized );
888 if ( $size > $maxTotalSize ) {
889 $this->logger->warning(
890 "Value for {key} exceeds $maxTotalSize bytes; cannot segment.",
897 $count = intdiv( $size, $segmentSize ) + ( ( $size % $segmentSize ) ? 1 : 0 );
898 for ( $i = 0; $i < $count; ++$i ) {
899 $segment = substr( $serialized, $i * $segmentSize, $segmentSize );
900 $hash = sha1( $segment );
901 $chunkKey = $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $hash );
902 $chunksByKey[$chunkKey] = $segment;
903 $segmentHashes[] = $hash;
905 $flags &= ~self::WRITE_ALLOW_SEGMENTS;
906 $ok = $this->setMulti( $chunksByKey, $exptime, $flags );
907 $entry = SerializedValueContainer::newSegmented( $segmentHashes );
921 return ( $exptime !== self::TTL_INDEFINITE && $exptime < ( 10 * self::TTL_YEAR ) );
939 if ( $exptime == self::TTL_INDEFINITE ) {
943 return $this->isRelativeExpiration( $exptime )
944 ? intval( $this->getCurrentTime() + $exptime )
964 if ( $exptime == self::TTL_INDEFINITE ) {
968 return $this->isRelativeExpiration( $exptime )
970 : (int)max( $exptime - $this->getCurrentTime(), 1 );
981 if ( is_int( $value ) ) {
983 } elseif ( !is_string( $value ) ) {
987 $integer = (int)$value;
989 return ( $value === (
string)$integer );
994 return $this->attrMap[$flag] ?? self::QOS_UNKNOWN;
1003 return $this->segmentationSize;
1012 return $this->segmentedValueMaxSize;
1025 $this->checkValueSerializability( $value, $key );
1027 return $this->serialize( $value );
1050 private function checkValueSerializability( $value, $key ) {
1051 if ( is_array( $value ) ) {
1052 $this->checkIterableMapSerializability( $value, $key );
1053 } elseif ( is_object( $value ) ) {
1055 if ( $value instanceof stdClass ) {
1056 $this->checkIterableMapSerializability( $value, $key );
1057 } elseif ( !( $value instanceof JsonSerializable ) ) {
1058 $this->logger->warning(
1059 "{class} value for '{cachekey}'; serialization is suspect.",
1060 [
'cachekey' => $key,
'class' => get_class( $value ) ]
1070 private function checkIterableMapSerializability( $value, $key ) {
1071 foreach ( $value as $index => $entry ) {
1072 if ( is_object( $entry ) ) {
1075 !( $entry instanceof \stdClass ) &&
1076 !( $entry instanceof \JsonSerializable )
1078 $this->logger->warning(
1079 "{class} value for '{cachekey}' at '$index'; serialization is suspect.",
1080 [
'cachekey' => $key,
'class' => get_class( $entry ) ]
1096 return is_int( $value ) ? $value : serialize( $value );
1106 return $this->isInteger( $value ) ? (int)$value : unserialize( $value );
1113 $this->logger->debug(
"{class} debug: $text", [
'class' => static::class ] );
1121 private function determinekeyGroupForStats( $key ): string {
1124 $components = explode(
':', $key, 3 );
1126 $keygroup = $components[1] ??
'UNKNOWN';
1128 return strtr( $keygroup,
'.',
'_' );
1139 $deltasByMetric = [];
1141 foreach ( $keyInfo as $indexOrKey => $keyOrSizes ) {
1142 if ( is_array( $keyOrSizes ) ) {
1144 [ $sPayloadSize, $rPayloadSize ] = $keyOrSizes;
1152 $keygroup = $this->determinekeyGroupForStats( $key );
1154 if ( $op === self::METRIC_OP_GET ) {
1156 if ( $rPayloadSize ===
false ) {
1157 $statsName =
"bagostuff_miss_total";
1159 $statsName =
"bagostuff_hit_total";
1163 $statsName =
"bagostuff_call_total";
1165 $deltasByMetric[$statsName][$keygroup] = ( $deltasByMetric[$statsName][$keygroup] ?? 0 ) + 1;
1167 if ( $sPayloadSize > 0 ) {
1168 $statsName =
"bagostuff_bytes_sent_total";
1169 $deltasByMetric[$statsName][$keygroup] =
1170 ( $deltasByMetric[$statsName][$keygroup] ?? 0 ) + $sPayloadSize;
1173 if ( $rPayloadSize > 0 ) {
1174 $statsName =
"bagostuff_bytes_read_total";
1175 $deltasByMetric[$statsName][$keygroup] =
1176 ( $deltasByMetric[$statsName][$keygroup] ?? 0 ) + $rPayloadSize;
1180 foreach ( $deltasByMetric as $statsName => $deltaByKeygroup ) {
1181 $stats = $this->stats->getCounter( $statsName );
1182 foreach ( $deltaByKeygroup as $keygroup => $delta ) {
1183 $stats->setLabel(
'keygroup', $keygroup )
1184 ->setLabel(
'operation', $op )
1185 ->incrementBy( $delta );
1192class_alias( MediumSpecificBagOStuff::class,
'MediumSpecificBagOStuff' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.