24 use Wikimedia\WaitConditionLoop;
/** Component string passed to makeGlobalKey() when building the keys of value segments */
59 const SEGMENT_COMPONENT =
'segment';
81 parent::__construct( $params );
83 if ( isset( $params[
'keyspace'] ) ) {
84 $this->keyspace = $params[
'keyspace'];
87 if ( !empty( $params[
'reportDupes'] ) && is_callable( $this->asyncHandler ) ) {
88 $this->reportDupes =
true;
91 $this->syncTimeout = $params[
'syncTimeout'] ?? 3;
92 $this->segmentationSize = $params[
'segmentationSize'] ?? 8388608;
93 $this->segmentedValueMaxSize = $params[
'segmentedValueMaxSize'] ?? 67108864;
109 public function get( $key, $flags = 0 ) {
120 if ( !$this->reportDupes ) {
124 if ( !isset( $this->duplicateKeyLookups[$key] ) ) {
127 $this->duplicateKeyLookups[$key] = 0;
129 $this->duplicateKeyLookups[$key] += 1;
131 if ( $this->dupeTrackScheduled ===
false ) {
132 $this->dupeTrackScheduled =
true;
134 call_user_func( $this->asyncHandler,
function () {
135 $dups = array_filter( $this->duplicateKeyLookups );
136 foreach ( $dups as $key => $count ) {
137 $this->logger->warning(
138 'Duplicate get(): "{key}" fetched {count} times',
140 [
'key' => $key,
'count' => $count + 1, ]
/**
 * Subclass hook that performs the actual read of a single key
 *
 * Callers in this file compare the result strictly against false to detect "no value",
 * and doCas() reads $casToken back to compare it with a token from an earlier read.
 *
 * @param string $key
 * @param int $flags Bitfield; callers in this file pass caller-supplied flags or self::READ_LATEST
 * @param mixed &$casToken Filled in by the implementation; doCas() throws if it is an object
 * @return mixed Stored value; presumably false when the key is missing (TODO confirm in subclasses)
 */
154 abstract protected function doGet( $key, $flags = 0, &$casToken =
null );
/**
 * Store a value under a key
 *
 * NOTE(review): the line assigning $entry and $usable is not part of this excerpt;
 * add() and cas() obtain the same pair from makeValueOrSegmentList(), so set()
 * presumably does too - confirm against the full file.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags Bitfield
 * @return mixed Result of doSet(), or false when $usable is falsy
 */
165 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
// The main key is only written when the prepared entry is usable
168 return $usable ? $this->
doSet( $key, $entry, $exptime, $flags ) :
false;
/**
 * Subclass hook that performs the actual write of a single key
 *
 * set() and doCas() delegate here, and doSetMulti() calls it once per key and combines
 * the results with &&, so the return value must be truthy on success.
 *
 * @param string $key
 * @param mixed $value Value or entry to store, as prepared by the caller
 * @param int $exptime
 * @param int $flags Bitfield
 * @return bool Success (inferred from how callers combine the result)
 */
180 abstract protected function doSet( $key, $value, $exptime = 0, $flags = 0 );
/**
 * Delete a key, optionally together with the segment keys derived from its stored value
 *
 * Without self::WRITE_PRUNE_SEGMENTS in $flags this is a plain doDelete(). With it, the
 * main value is read first, the main key is deleted, and keys built from segment hashes
 * via makeGlobalKey( self::SEGMENT_COMPONENT, ... ) are removed through deleteMulti().
 *
 * NOTE(review): the lines that turn $mainValue into the hash list given to array_map(),
 * and the handling of a failed doDelete(), are not part of this excerpt.
 *
 * @param string $key
 * @param int $flags Bitfield tested with fieldHasFlags()
 * @return mixed Result of doDelete() or deleteMulti() on the visible return paths
 */
193 public function delete( $key, $flags = 0 ) {
// Fast path: no segment pruning requested
194 if ( !$this->
fieldHasFlags( $flags, self::WRITE_PRUNE_SEGMENTS ) ) {
195 return $this->
doDelete( $key, $flags );
// Read the current value (READ_LATEST) before the main key is removed
198 $mainValue = $this->
doGet( $key, self::READ_LATEST );
199 if ( !$this->
doDelete( $key, $flags ) ) {
// Map each segment hash to the key its segment is stored under
207 $orderedKeys = array_map(
208 function ( $segmentHash ) use ( $key ) {
209 return $this->
makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
// WRITE_PRUNE_SEGMENTS is cleared from the flags handed to deleteMulti()
214 return $this->
deleteMulti( $orderedKeys, $flags & ~self::WRITE_PRUNE_SEGMENTS );
/**
 * Subclass hook that performs the actual removal of a single key
 *
 * Used by delete() for the main key and elsewhere in this file for "{$key}:lock" keys;
 * callers negate or &&-combine the result, so it must be truthy on success.
 *
 * @param string $key
 * @param int $flags Bitfield
 * @return bool Success (inferred from how callers test the result)
 */
224 abstract protected function doDelete( $key, $flags = 0 );
/**
 * Insert a value through the doAdd() hook
 *
 * The value is first passed through makeValueOrSegmentList(), which yields the entry
 * to store and a flag saying whether that entry is usable.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags Bitfield
 * @return mixed Result of doAdd(), or false when the prepared entry is not usable
 */
226 public function add( $key, $value, $exptime = 0, $flags = 0 ) {
227 list( $entry, $usable ) = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags );
// The main key is only touched when the prepared entry is usable
229 return $usable ? $this->doAdd( $key, $entry, $exptime, $flags ) :
false;
/**
 * Subclass hook that performs the actual insertion of a single key
 *
 * NOTE(review): lock() treats a falsy add() without an error as "keep waiting", so
 * implementations presumably refuse to overwrite an existing key - confirm in subclasses.
 *
 * @param string $key
 * @param mixed $value Value or entry to store, as prepared by add()
 * @param int $exptime
 * @param int $flags Bitfield
 * @return bool Success (inferred from how add() and lock() use the result)
 */
241 abstract protected function doAdd( $key, $value, $exptime = 0, $flags = 0 );
/**
 * Merge changes into a key's value using a callback
 *
 * This implementation simply forwards every argument to mergeViaCas().
 *
 * @param string $key
 * @param callable $callback Invoked by mergeViaCas() as ( $this, $key, $currentValue, $exptime )
 * @param int $exptime
 * @param int $attempts Number of tries available to the merge loop
 * @param int $flags Bitfield
 * @return mixed Whatever mergeViaCas() returns
 */
259 public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
260 return $this->mergeViaCas( $key, $callback, $exptime, $attempts, $flags );
/**
 * Read-modify-write loop built on doGet(), add() and cas()
 *
 * Each pass reads the current value (resolving segments), hands it to $callback, and then
 * writes the callback's result with add() when the key did not exist or cas() otherwise.
 * The loop repeats while the write did not succeed and attempts remain.
 *
 * NOTE(review): the bodies of the first two branches of the if/elseif chain and the
 * final return are not part of this excerpt.
 *
 * @param string $key
 * @param callable $callback Invoked as ( $this, $key, $currentValue, $exptime )
 * @param int $exptime
 * @param int $attempts Maximum number of passes
 * @param int $flags Bitfield forwarded to doGet(), add() and cas()
 */
272 final protected function mergeViaCas( $key, callable $callback, $exptime, $attempts, $flags ) {
273 $attemptsLeft = $attempts;
// Reset the error state so that getLastError() below reflects this read only
277 $this->clearLastError();
278 $currentValue = $this->resolveSegments(
280 $this->doGet( $key, $flags, $token )
282 if ( $this->getLastError() ) {
284 $this->logger->warning(
285 __METHOD__ .
' failed due to read I/O error on get() for {key}.',
// Let the caller derive the new value from the current one
293 $value = call_user_func( $callback, $this, $key, $currentValue, $exptime );
// Capture both comparisons now; $currentValue is released right after
294 $keyWasNonexistant = ( $currentValue === false );
295 $valueMatchesOldValue = ( $value === $currentValue );
296 unset( $currentValue );
// Reset the error state again so that getLastError() below reflects the write only
298 $this->clearLastError();
299 if ( $value ===
false || $exptime < 0 ) {
301 } elseif ( $valueMatchesOldValue && $attemptsLeft !== $attempts ) {
303 } elseif ( $keyWasNonexistant ) {
// Key was absent at read time: insert it
305 $success = $this->add( $key, $value, $exptime, $flags );
// Key existed: write only if the token from the read above still matches
308 $success = $this->cas( $token, $key, $value, $exptime, $flags );
310 if ( $this->getLastError() ) {
312 $this->logger->warning(
313 __METHOD__ .
' failed due to write I/O error for {key}.',
// Retry until a write succeeds or the attempts are used up
320 }
while ( !
$success && --$attemptsLeft );
/**
 * Check-and-set wrapper around doCas()
 *
 * A null token triggers a warning. Otherwise the value is passed through
 * makeValueOrSegmentList() and the resulting entry is handed to doCas() when usable.
 *
 * NOTE(review): what happens after the warning for a null token is not part of this
 * excerpt - presumably an early failure return; confirm.
 *
 * @param mixed $casToken Token from an earlier read; compared strictly against null here
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags Bitfield
 * @return mixed Result of doCas(), or false when the prepared entry is not usable
 */
335 protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
336 if ( $casToken ===
null ) {
337 $this->logger->warning(
338 __METHOD__ .
' got empty CAS token for {key}.',
345 list( $entry, $usable ) = $this->makeValueOrSegmentList( $key, $value, $exptime, $flags );
// The main key is only touched when the prepared entry is usable
347 return $usable ? $this->doCas( $casToken, $key, $entry, $exptime, $flags ) :
false;
/**
 * Default check-and-set built from lock(), doGet() and doSet()
 *
 * Takes the key's lock with a timeout of 0, re-reads the key with READ_LATEST to get its
 * current token, writes via doSet() only when that token is identical to $casToken, and
 * then calls unlock().
 *
 * NOTE(review): the exception below is thrown after the lock is taken and no try/finally
 * is visible in this excerpt - confirm the lock cannot be left held.
 * NOTE(review): the first warning says "write I/O error" although it follows the doGet()
 * read - confirm whether that wording is intended.
 *
 * @param mixed $casToken Token from the caller's earlier read
 * @param string $key
 * @param mixed $value Entry to store
 * @param int $exptime
 * @param int $flags Bitfield forwarded to doSet()
 * @throws UnexpectedValueException If the token produced by doGet() is an object
 */
360 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
// Second argument is lock()'s $timeout
362 if ( !$this->lock( $key, 0 ) ) {
367 $this->clearLastError();
// Only the token is needed here; the returned value is discarded
368 $this->doGet( $key, self::READ_LATEST, $curCasToken );
369 if ( is_object( $curCasToken ) ) {
371 throw new UnexpectedValueException(
"CAS token cannot be an object" );
373 if ( $this->getLastError() ) {
376 $this->logger->warning(
377 __METHOD__ .
' failed due to write I/O error for {key}.',
380 } elseif ( $casToken === $curCasToken ) {
381 $success = $this->doSet( $key, $value, $exptime, $flags );
385 __METHOD__ .
' failed due to race condition for {key}.',
390 $this->unlock( $key );
/**
 * Change the expiration of a key
 *
 * This implementation forwards every argument to doChangeTTL().
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags Bitfield
 * @return mixed Whatever doChangeTTL() returns
 */
412 public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
413 return $this->doChangeTTL( $key, $exptime, $flags );
423 if ( !$this->lock( $key, 0 ) ) {
427 $expiry = $this->getExpirationAsTimestamp( $exptime );
428 $delete = ( $expiry != self::TTL_INDEFINITE && $expiry < $this->getCurrentTime() );
431 $blob = $this->doGet( $key, self::READ_LATEST );
434 $ok = $this->doDelete( $key, $flags );
436 $ok = $this->doSet( $key,
$blob, $exptime, $flags );
442 $this->unlock( $key );
/**
 * Acquire a lock on a key by adding a "{$key}:lock" entry
 *
 * If this instance already tracks a lock for $key, and $rclass is non-empty and equal to
 * the recorded class, the recorded depth is incremented. Otherwise a WaitConditionLoop
 * repeatedly tries add() on the lock key until it succeeds, hits an error, or times out.
 *
 * NOTE(review): the assignment of $fname, the loop's timing arguments, the early returns
 * and the final return are not part of this excerpt.
 *
 * @param string $key
 * @param int $timeout Reported in the timeout warning; presumably the wait limit (TODO confirm units)
 * @param int $expiry Expiry given to the lock entry; a falsy value means "cap at TTL_DAY"
 * @param string $rclass Re-entrancy class; '' disables the re-entrant path
 */
458 public function lock( $key, $timeout = 6, $expiry = 6, $rclass =
'' ) {
// Already tracked by this instance: only a matching non-empty class may re-enter
460 if ( isset( $this->locks[$key] ) ) {
461 if ( $rclass !=
'' && $this->locks[$key][
'class'] === $rclass ) {
462 ++$this->locks[$key][
'depth'];
// A falsy expiry becomes INF and is then capped, so the entry expires within TTL_DAY
470 $expiry = min( $expiry ?: INF, self::TTL_DAY );
471 $loop =
new WaitConditionLoop(
472 function () use ( $key, $expiry, $fname ) {
473 $this->clearLastError();
// A successful add() of the lock key means the lock was obtained
474 if ( $this->add(
"{$key}:lock", 1, $expiry ) ) {
475 return WaitConditionLoop::CONDITION_REACHED;
476 } elseif ( $this->getLastError() ) {
477 $this->logger->warning(
478 $fname .
' failed due to I/O error for {key}.',
// Stop waiting on errors
482 return WaitConditionLoop::CONDITION_ABORTED;
// add() failed without an error: keep trying
485 return WaitConditionLoop::CONDITION_CONTINUE;
490 $code = $loop->invoke();
491 $locked = ( $code === $loop::CONDITION_REACHED );
// Start tracking the new lock at depth 1
493 $this->locks[$key] = [
'class' => $rclass,
'depth' => 1 ];
494 } elseif ( $code === $loop::CONDITION_TIMED_OUT ) {
495 $this->logger->warning(
496 "$fname failed due to timeout for {key}.",
497 [
'key' => $key,
'timeout' => $timeout ]
511 if ( !isset( $this->locks[$key] ) ) {
515 if ( --$this->locks[$key][
'depth'] <= 0 ) {
516 unset( $this->locks[$key] );
518 $ok = $this->doDelete(
"{$key}:lock" );
520 $this->logger->warning(
521 __METHOD__ .
' failed to release lock for {key}.',
544 callable $progress =
null,
557 $foundByKey = $this->doGetMulti(
$keys, $flags );
560 foreach (
$keys as $key ) {
562 if ( array_key_exists( $key, $foundByKey ) ) {
564 $value = $this->resolveSegments( $key, $foundByKey[$key] );
565 if ( $value !==
false ) {
582 foreach (
$keys as $key ) {
583 $val = $this->doGet( $key, $flags );
584 if ( $val !==
false ) {
/**
 * Store several key/value pairs through doSetMulti()
 *
 * @param array $data Map of key to value
 * @param int $exptime
 * @param int $flags Bitfield; must not contain self::WRITE_ALLOW_SEGMENTS
 * @return mixed Whatever doSetMulti() returns
 * @throws InvalidArgumentException If $flags contains self::WRITE_ALLOW_SEGMENTS
 */
603 public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
// Segmented writes are rejected on the multi-key path
604 if ( $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) ) {
605 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_ALLOW_SEGMENTS' );
608 return $this->doSetMulti( $data, $exptime, $flags );
/**
 * Default multi-key write: one doSet() call per entry of $data
 *
 * NOTE(review): the initialisation of $res and the return statement are not part of
 * this excerpt.
 *
 * @param array $data Map of key to value
 * @param int $exptime
 * @param int $flags Bitfield forwarded to doSet()
 */
617 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
619 foreach ( $data as $key => $value ) {
// doSet() is the left operand, so it still runs for every key after an earlier failure
620 $res = $this->doSet( $key, $value, $exptime, $flags ) &&
$res;
637 if ( $this->fieldHasFlags( $flags, self::WRITE_PRUNE_SEGMENTS ) ) {
638 throw new InvalidArgumentException( __METHOD__ .
' got WRITE_PRUNE_SEGMENTS' );
641 return $this->doDeleteMulti(
$keys, $flags );
651 foreach (
$keys as $key ) {
652 $res = $this->doDelete( $key, $flags ) &&
$res;
670 foreach (
$keys as $key ) {
671 $res = $this->doChangeTTL( $key, $exptime, $flags ) &&
$res;
/**
 * Increment a key, creating it with an initial value when the increment finds nothing
 *
 * Tries incr() first. If that yields false without an error, tries add() with the initial
 * value; if add() also yields false without an error, tries incr() once more.
 *
 * NOTE(review): the return statement is not part of this excerpt; presumably $newValue
 * is returned - confirm.
 *
 * @param string $key
 * @param int $exptime Expiry used when the key is created via add()
 * @param int $value Increment step; also the initial value when $init is not an int
 * @param int|null $init Initial value for a new key
 * @param int $flags Bitfield forwarded to incr() and add()
 */
677 public function incrWithInit( $key, $exptime, $value = 1, $init =
null, $flags = 0 ) {
// Any non-int $init (including the null default) falls back to the step value
678 $init = is_int( $init ) ? $init : $value;
679 $this->clearLastError();
680 $newValue = $this->incr( $key, $value, $flags );
// false without an error: fall through to creating the key
681 if ( $newValue ===
false && !$this->getLastError() ) {
683 $newValue = $this->add( $key, (
int)$init, $exptime, $flags ) ? $init :
false;
// add() also failed without an error: try the increment again
684 if ( $newValue ===
false && !$this->getLastError() ) {
686 $newValue = $this->incr( $key, $value, $flags );
706 $orderedKeys = array_map(
707 function ( $segmentHash ) use ( $key ) {
708 return $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $segmentHash );
713 $segmentsByKey = $this->doGetMulti( $orderedKeys );
716 foreach ( $orderedKeys as $segmentKey ) {
717 if ( isset( $segmentsByKey[$segmentKey] ) ) {
718 $parts[] = $segmentsByKey[$segmentKey];
724 return $this->
unserialize( implode(
'', $parts ) );
736 return $this->lastError;
744 $this->lastError = self::ERR_NONE;
753 $this->lastError = $err;
757 $this->busyCallbacks[] = $workCallback;
775 $this->fieldHasFlags( $flags, self::WRITE_ALLOW_SEGMENTS ) &&
777 is_finite( $this->segmentationSize )
779 $segmentSize = $this->segmentationSize;
780 $maxTotalSize = $this->segmentedValueMaxSize;
784 if ( $size > $maxTotalSize ) {
785 $this->logger->warning(
786 "Value for {key} exceeds $maxTotalSize bytes; cannot segment.",
789 } elseif ( $size <= $segmentSize ) {
796 $count = intdiv( $size, $segmentSize ) + ( ( $size % $segmentSize ) ? 1 : 0 );
797 for ( $i = 0; $i < $count; ++$i ) {
798 $segment = substr(
$serialized, $i * $segmentSize, $segmentSize );
799 $hash = sha1( $segment );
800 $chunkKey = $this->makeGlobalKey( self::SEGMENT_COMPONENT, $key, $hash );
801 $chunksByKey[$chunkKey] = $segment;
802 $segmentHashes[] = $hash;
804 $flags &= ~self::WRITE_ALLOW_SEGMENTS;
805 $usable = $this->setMulti( $chunksByKey, $exptime, $flags );
810 return [ $entry, $usable ];
819 return ( $exptime !== self::TTL_INDEFINITE && $exptime < ( 10 * self::TTL_YEAR ) );
836 if ( $exptime == self::TTL_INDEFINITE ) {
840 return $this->isRelativeExpiration( $exptime )
841 ? intval( $this->getCurrentTime() + $exptime )
860 if ( $exptime == self::TTL_INDEFINITE ) {
864 return $this->isRelativeExpiration( $exptime )
866 : (int)max( $exptime - $this->getCurrentTime(), 1 );
876 if ( is_int( $value ) ) {
878 } elseif ( !is_string( $value ) ) {
882 $integer = (int)$value;
884 return ( $value === (
string)$integer );
889 foreach (
$args as $arg ) {
890 $key .=
':' . str_replace(
':',
'%3A', $arg );
892 return strtr( $key,
' ',
'_' );
904 return $this->makeKeyInternal(
'global', func_get_args() );
/**
 * Build a key scoped to this instance's keyspace
 *
 * All arguments ($class followed by every component) are collected with func_get_args()
 * and handed to makeKeyInternal() together with $this->keyspace.
 *
 * @param string $class First key component
 * @param mixed ...$components Further key components
 * @return mixed Whatever makeKeyInternal() returns
 */
915 public function makeKey( $class, ...$components ) {
916 return $this->makeKeyInternal( $this->keyspace, func_get_args() );
925 return $this->attrMap[$flag] ?? self::QOS_UNKNOWN;
929 return $this->segmentationSize;
933 return $this->segmentedValueMaxSize;
942 return is_int( $value ) ? $value :
serialize( $value );
951 return $this->isInteger( $value ) ? (int)$value :
unserialize( $value );
958 if ( $this->debugMode ) {
959 $this->logger->debug(
"{class} debug: $text", [
'class' => static::class ] );