24use Wikimedia\AtEase\AtEase;
25use Wikimedia\ObjectFactory;
34use Wikimedia\ScopedCallback;
35use Wikimedia\Timestamp\ConvertibleTimestamp;
36use Wikimedia\WaitConditionLoop;
160 parent::__construct( $params );
163 if ( isset( $params[
'servers'] ) || isset( $params[
'server'] ) ) {
167 foreach ( ( $params[
'servers'] ?? [ $params[
'server'] ] ) as $tag => $info ) {
168 $this->serverInfos[$index] = $info;
170 $this->serverTags[$index] = is_string( $tag ) ? $tag :
"#$index";
171 $dbType = $info[
'type'];
177 if ( isset( $params[
'globalKeyLB'] ) ) {
178 $this->globalKeyLb = ( $params[
'globalKeyLB'] instanceof
ILoadBalancer )
179 ? $params[
'globalKeyLB']
180 : ObjectFactory::getObjectFromSpec( $params[
'globalKeyLB'] );
182 if ( isset( $params[
'localKeyLB'] ) ) {
183 $this->localKeyLb = ( $params[
'localKeyLB'] instanceof
ILoadBalancer )
184 ? $params[
'localKeyLB']
185 : ObjectFactory::getObjectFromSpec( $params[
'localKeyLB'] );
190 if ( !$this->localKeyLb ) {
191 throw new InvalidArgumentException(
192 "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'"
197 $this->purgePeriod = intval( $params[
'purgePeriod'] ?? $this->purgePeriod );
198 $this->purgeLimit = intval( $params[
'purgeLimit'] ?? $this->purgeLimit );
200 $this->numTableShards = intval( $params[
'shards'] ?? $this->numTableShards );
201 $this->writeBatchSize = intval( $params[
'writeBatchSize'] ?? $this->writeBatchSize );
202 $this->replicaOnly = $params[
'replicaOnly'] ??
false;
204 if ( $params[
'multiPrimaryMode'] ??
false ) {
205 if ( $dbType !==
'mysql' ) {
206 throw new InvalidArgumentException(
"Multi-primary mode only supports MySQL" );
209 $this->multiPrimaryModeType = $dbType;
216 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
217 $getToken = ( $casToken === self::PASS_BY_REF );
220 $data = $this->
fetchBlobs( [ $key ], $getToken )[$key];
222 $result = $this->
unserialize( $data[self::BLOB_VALUE] );
223 if ( $getToken && $result !==
false ) {
226 $valueSize = strlen( $data[self::BLOB_VALUE] );
232 $this->
updateOpStats( self::METRIC_OP_GET, [ $key => [
null, $valueSize ] ] );
237 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
241 [ $this,
'modifyTableSpecificBlobsForSet' ],
243 [ $key => [ $value, $exptime ] ],
252 [ $this,
'modifyTableSpecificBlobsForDelete' ],
259 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
261 if ( $mtime ===
null ) {
267 [ $this,
'modifyTableSpecificBlobsForAdd' ],
269 [ $key => [ $value, $exptime ] ],
274 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
276 if ( $mtime ===
null ) {
282 [ $this,
'modifyTableSpecificBlobsForCas' ],
284 [ $key => [ $value, $exptime, $casToken ] ],
293 [ $this,
'modifyTableSpecificBlobsForChangeTTL' ],
295 [ $key => [ $exptime ] ],
300 public function incrWithInit( $key, $exptime, $value = 1, $init =
null, $flags = 0 ) {
301 $value = (int)$value;
302 $init = is_int( $init ) ? $init : $value;
307 [ $this,
'modifyTableSpecificBlobsForIncrInit' ],
309 [ $key => [ $value, $init, $exptime ] ],
312 ) ? $resByKey[$key] :
false;
317 public function incr( $key, $value = 1, $flags = 0 ) {
318 return $this->
doIncr( $key, $value, $flags );
321 public function decr( $key, $value = 1, $flags = 0 ) {
322 return $this->
doIncr( $key, -$value, $flags );
325 private function doIncr( $key, $value = 1, $flags = 0 ) {
327 if ( $mtime ===
null ) {
335 if ( $this->
isInteger( $serialValue ) ) {
336 $newValue = max( (
int)$serialValue + (
int)$value, 0 );
338 [ $this,
'modifyTableSpecificBlobsForSet' ],
341 [ $key => [ $newValue, $data[self::BLOB_EXPIRY] ] ],
343 ) ? $newValue :
false;
346 $this->logger->warning( __METHOD__ .
": $key is a non-integer" );
350 $this->logger->debug( __METHOD__ .
": $key does not exists" );
358 $valueSizeByKey = [];
361 foreach (
$keys as $key ) {
362 $data = $dataByKey[$key];
366 if ( $value !==
false ) {
367 $result[$key] = $value;
369 $valueSize = strlen( $serialValue );
373 $valueSizeByKey[$key] = [
null, $valueSize ];
376 $this->
updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
381 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
385 [ $this,
'modifyTableSpecificBlobsForSet' ],
388 static function ( $value ) use ( $exptime ) {
389 return [ $value, $exptime ];
401 [ $this,
'modifyTableSpecificBlobsForDelete' ],
403 array_fill_keys(
$keys, [] ),
412 [ $this,
'modifyTableSpecificBlobsForChangeTTL' ],
414 array_fill_keys(
$keys, [ $exptime ] ),
430 isset( $this->connFailureErrors[$shardIndex] ) &&
431 ( $this->
getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
433 throw $this->connFailureErrors[$shardIndex];
436 if ( $shardIndex === self::SHARD_LOCAL ) {
438 } elseif ( $shardIndex === self::SHARD_GLOBAL ) {
440 } elseif ( is_int( $shardIndex ) ) {
441 if ( isset( $this->serverInfos[$shardIndex] ) ) {
442 $server = $this->serverInfos[$shardIndex];
445 throw new UnexpectedValueException(
"Invalid server index #$shardIndex" );
448 throw new UnexpectedValueException(
"Invalid server index '$shardIndex'" );
460 if ( $this->serverTags ) {
462 if ( count( $this->serverTags ) == 1 ) {
466 ArrayUtils::consistentHashSort( $sortedServers, $key );
467 reset( $sortedServers );
468 $shardIndex = key( $sortedServers );
477 if ( $this->numTableShards > 1 ) {
478 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
493 if ( $index !==
null && $this->numTableShards > 1 ) {
494 $decimals = strlen( $this->numTableShards - 1 );
496 return $this->tableName . sprintf(
"%0{$decimals}d", $index );
512 $dataByKey = array_fill_keys(
$keys,
null );
515 $keysByTableByShard = [];
516 foreach (
$keys as $key ) {
517 list( $shardIndex, $partitionTable ) = $this->
getKeyLocation( $key );
518 $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
521 foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
524 foreach ( $serverKeys as $partitionTable => $tableKeys ) {
529 : [
'keyname',
'value',
'exptime' ],
533 foreach (
$res as $row ) {
534 $row->shardIndex = $shardIndex;
535 $row->tableName = $partitionTable;
536 $dataByKey[$row->keyname] = $row;
544 foreach (
$keys as $key ) {
545 $row = $dataByKey[$key] ??
null;
550 $this->
debug( __METHOD__ .
": retrieved $key; expiry time is {$row->exptime}" );
556 self::BLOB_CASTOKEN => $getCasToken
583 callable $tableWriteCallback,
590 $resByKey = array_fill_keys( array_keys( $argsByKey ),
false );
595 $argsByKeyByTableByShard = [];
596 foreach ( $argsByKey as $key =>
$args ) {
597 list( $shardIndex, $partitionTable ) = $this->
getKeyLocation( $key );
598 $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] =
$args;
601 $shardIndexesAffected = [];
602 foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
603 foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
606 $shardIndexesAffected[] = $shardIndex;
607 $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
615 $success = !in_array(
false, $resByKey,
true );
618 foreach ( $shardIndexesAffected as $shardIndex ) {
625 foreach ( $shardIndexesAffected as $shardIndex ) {
659 $valueSizesByKey = [];
663 if ( $this->multiPrimaryModeType !==
null ) {
665 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
675 $resByKey[$key] =
true;
677 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
682 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
685 $rows[] = $this->
buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
687 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
689 $db->
replace( $ptable,
'keyname', $rows, __METHOD__ );
690 foreach ( $argsByKey as $key => $unused ) {
691 $resByKey[$key] =
true;
695 $this->
updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
726 foreach ( $argsByKey as $key => $arg ) {
727 $rows[] = $this->
buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
738 $db->
delete( $ptable, [
'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
741 foreach ( $argsByKey as $key => $arg ) {
742 $resByKey[$key] =
true;
745 $this->
updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
775 $valueSizesByKey = [];
786 $existingByKey = array_fill_keys( $existingKeys,
true );
789 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
790 if ( isset( $existingByKey[$key] ) ) {
791 $this->logger->debug( __METHOD__ .
": $key already exists" );
804 $resByKey[$key] =
true;
806 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
809 $this->
updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
839 $valueSizesByKey = [];
851 $curTokensByKey = [];
852 foreach (
$res as $row ) {
857 foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) {
858 $curToken = $curTokensByKey[$key] ??
null;
859 if ( $curToken ===
null ) {
860 $this->logger->debug( __METHOD__ .
": $key does not exists" );
864 if ( $curToken !== $casToken ) {
865 $this->logger->debug( __METHOD__ .
": $key does not have a matching token" );
878 $resByKey[$key] =
true;
880 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
883 $this->
updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
917 [
'keyname',
'value' ],
922 foreach (
$res as $curRow ) {
923 $key = $curRow->keyname;
925 list( $exptime ) = $argsByKey[$key];
935 $resByKey[$key] =
true;
938 $keysBatchesByExpiry = [];
939 foreach ( $argsByKey as $key => list( $exptime ) ) {
941 $keysBatchesByExpiry[$expiry][] = $key;
945 foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
954 if ( $existingCount === count( $argsByKey ) ) {
955 foreach ( $argsByKey as $key =>
$args ) {
956 $resByKey[$key] =
true;
961 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
990 foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) {
1007 $row = $db->
selectRow( $ptable,
'value', [
'keyname' => $key ], __METHOD__ );
1010 if ( !$affectedCount || $row ===
false ) {
1011 $this->logger->warning( __METHOD__ .
": failed to set new $key value" );
1016 if ( !$this->
isInteger( $serialValue ) ) {
1017 $this->logger->warning( __METHOD__ .
": got non-integer $key value" );
1021 $resByKey[$key] = (int)$serialValue;
1024 $this->
updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
1037 if ( $expiry !== self::TTL_INDEFINITE ) {
1038 $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1063 if ( !$this->
lock( $key, 0 ) ) {
1067 $scope =
new ScopedCallback(
function () use ( $key ) {
1071 return sprintf(
'%.6f', $this->locks[$key][self::LOCK_TIME] );
1089 $seconds = (int)$mtime;
1090 list( , $microseconds ) = explode(
'.', sprintf(
'%.6f', $mtime ) );
1094 $token = implode(
'', [
1098 str_pad( base_convert( $seconds, 10, 2 ), 35,
'0', STR_PAD_LEFT ) .
1100 str_pad( base_convert( $id, 10, 2 ), 32,
'0', STR_PAD_LEFT ),
1106 str_pad( base_convert( $microseconds, 10, 36 ), 4,
'0', STR_PAD_LEFT )
1109 if ( strlen( $token ) !== 17 ) {
1110 throw new RuntimeException(
"Modification timestamp overflow detected" );
1155 $row[
'modtoken'] = $mt;
1176 $expressionsByColumn = [
1189 $expressionsByColumn[
'modtoken'] = $db->
addQuotes( $mt );
1190 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1192 $db->
addQuotes( substr( $mt, 0, 13 ) ) .
' >= SUBSTR(modtoken,0,13)',
1196 $set[] =
"{$column}=" . trim( $rhs );
1199 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1200 $set[] =
"{$column}={$updateExpression}";
1227 $expressionsByColumn = [
1238 $expressionsByColumn[
'modtoken'] = [
'modtoken', $db->
addQuotes( $mt ) ];
1242 foreach ( $expressionsByColumn as $column => list( $updateExpression, $initExpression ) ) {
1248 $set[] =
"{$column}=" . trim( $rhs );
1260 return ( $expiry === self::TTL_INDEFINITE )
1262 ? $db->
timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1273 return ( $dbExpiry === $db->
timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1274 ? self::TTL_INDEFINITE
1275 : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1284 return is_int( $serialValue ) ? $serialValue : $db->
encodeBlob( $serialValue );
1306 if (
$type ===
'mysql' ) {
1312 } elseif (
$type ===
'postgres' ) {
1319 if ( !in_array(
'value', $fields,
true ) ) {
1320 $fields[] =
'value';
1322 if ( !in_array(
'exptime', $fields,
true ) ) {
1323 $fields[] =
'exptime';
1338 if ( isset( $row->castoken ) ) {
1339 $token = $row->castoken;
1342 $this->logger->debug( __METHOD__ .
": application computed hash for CAS token" );
1355 $this->purgePeriod &&
1357 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1359 ( $this->
getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
1361 $garbageCollector =
function () use ( $db ) {
1367 $this->lastGarbageCollect = time();
1369 if ( $this->asyncHandler ) {
1373 $garbageCollector();
1384 callable $progress =
null,
1391 if ( $tag !==
null ) {
1396 shuffle( $shardIndexes );
1400 $numServers = count( $shardIndexes );
1402 $keysDeletedCount = 0;
1403 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1411 [
'fn' => $progress,
'serversDone' => $numServersDone,
'serversTotal' => $numServers ]
1434 &$keysDeletedCount = 0,
1435 array $progress =
null
1437 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1448 $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1450 $tableIndexes = range( 0, $this->numTableShards - 1 );
1451 shuffle( $tableIndexes );
1453 $batchSize = min( $this->writeBatchSize, $limit );
1455 foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1462 $totalSeconds =
null;
1467 [
'keyname',
'exptime' ],
1470 $maxExp ? [
'exptime >= ' . $db->
addQuotes( $maxExp ) ] : []
1473 [
'LIMIT' => $batchSize,
'ORDER BY' =>
'exptime ASC' ]
1476 if (
$res->numRows() ) {
1477 $row =
$res->current();
1478 if ( $minExpUnix ===
null ) {
1479 $minExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1480 $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1484 foreach (
$res as $row ) {
1485 $keys[] = $row->keyname;
1486 $maxExp = $row->exptime;
1500 if ( $progress && is_callable( $progress[
'fn'] ) ) {
1501 if ( $totalSeconds ) {
1502 $maxExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1503 $remainingSeconds = $cutoffUnix - $maxExpUnix;
1504 $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1511 $tablesDoneRatio = 1;
1516 $overallRatio = ( $progress[
'serversDone'] / $progress[
'serversTotal'] ) +
1517 ( $tablesDoneRatio / $progress[
'serversTotal'] );
1518 ( $progress[
'fn'] )( $overallRatio * 100 );
1520 }
while (
$res->numRows() && $keysDeletedCount < $limit );
1547 public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1556 $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1559 $this->logger->warning(
1560 __METHOD__ .
' failed due to I/O error for {key}.',
1576 $released = $db->unlock( $key, __METHOD__ );
1591 $charsLeft = 205 - strlen(
$keyspace ) - count( $components );
1592 foreach ( $components as &$component ) {
1593 $component = strtr( $component, [
1599 if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1600 $component =
'#' . md5( $component );
1602 $charsLeft -= strlen( $component );
1605 if ( $charsLeft < 0 ) {
1606 return $keyspace .
':BagOStuff-long-key:##' . md5( implode(
':', $components ) );
1608 return $keyspace .
':' . implode(
':', $components );
1612 if ( is_int( $value ) ) {
1617 if ( function_exists(
'gzdeflate' ) ) {
1619 $serial = gzdeflate( $serial );
1626 if ( $value === self::TOMB_SERIAL ) {
1634 if ( function_exists(
'gzinflate' ) ) {
1635 AtEase::suppressWarnings();
1636 $decompressed = gzinflate( $value );
1637 AtEase::restoreWarnings();
1639 if ( $decompressed !==
false ) {
1640 $value = $decompressed;
1654 if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1660 $conn = $lb->getMaintenanceConnectionRef(
1663 $lb::CONN_TRX_AUTOCOMMIT
1677 if ( !isset( $this->conns[$shardIndex] ) ) {
1679 $conn = Database::factory(
1685 'flags' => ( $server[
'flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1686 'connLogger' => $this->logger,
1687 'queryLogger' => $this->logger
1692 if ( $conn->getType() ===
'sqlite' ) {
1693 if ( !$conn->tableExists(
'objectcache', __METHOD__ ) ) {
1697 $this->conns[$shardIndex] = $conn;
1700 return $this->conns[$shardIndex];
1721 $this->logger->error(
"DBError: {$exception->getMessage()}" );
1724 $this->logger->warning( __METHOD__ .
": ignoring connection error" );
1727 $this->logger->warning( __METHOD__ .
": ignoring query error" );
1738 unset( $this->conns[$shardIndex] );
1741 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1742 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1743 unset( $this->connFailureTimes[$shardIndex] );
1744 unset( $this->connFailureErrors[$shardIndex] );
1746 $this->logger->debug( __METHOD__ .
": Server #$shardIndex already down" );
1750 $this->logger->info( __METHOD__ .
": Server #$shardIndex down until " . ( $now + 60 ) );
1751 $this->connFailureTimes[$shardIndex] = $now;
1752 $this->connFailureErrors[$shardIndex] = $exception;
1760 if ( $db->
tableExists(
'objectcache', __METHOD__ ) ) {
1764 $db->
query(
"PRAGMA journal_mode=WAL", __METHOD__ );
1767 $encTable = $db->
tableName(
'objectcache' );
1770 "CREATE TABLE $encTable (\n" .
1771 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1773 " exptime BLOB NOT NULL\n" .
1777 $db->
query(
"CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1791 if ( in_array( $db->getType(), [
'mysql',
'postgres' ],
true ) ) {
1793 $encBaseTable = $db->tableName(
'objectcache' );
1795 $db->query(
"CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1805 if ( $this->serverTags ) {
1807 $shardIndexes = array_keys( $this->serverTags );
1811 if ( $this->localKeyLb ) {
1814 if ( $this->globalKeyLb ) {
1819 return $shardIndexes;
1828 if ( !$this->serverTags ) {
1829 throw new InvalidArgumentException(
"Given a tag but no tags are configured" );
1831 foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1832 if ( $tag === $serverTag ) {
1833 return $serverShardIndex;
1836 throw new InvalidArgumentException(
"Unknown server tag: $tag" );
1843 return ( $this->multiPrimaryModeType !==
null );
1853 if ( is_int( $shardIndex ) ) {
1858 if ( !$lb->hasStreamingReplicaServers() ) {
1865 if ( !$masterPos ) {
1869 $loop =
new WaitConditionLoop(
1870 static function () use ( $lb, $masterPos ) {
1871 return $lb->waitForAll( $masterPos, 1 );
1877 return ( $loop->invoke() === $loop::CONDITION_REACHED );
1891 if ( $this->serverInfos ) {
1894 return Profiler::instance()->getTransactionProfiler()->silenceForScope();
string $keyspace
Default keyspace; used by makeKey()
fieldHasFlags( $field, $flags)
callable|null $asyncHandler
Storage medium specific cache for storing items (e.g.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form of a value, using any applicable prepared value.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
callable[] $busyCallbacks
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
setLastError( $err)
Set the "last error" registry.
RDBMS-based caching module.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
decodeDbExpiry(IDatabase $db, string $dbExpiry)
modifyTableSpecificBlobsForChangeTTL(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Update the TTL for keys belonging to a partition table on the given server.
doDelete( $key, $flags=0)
Delete an item.
buildUpsertSetForOverwrite(IDatabase $db, $serialValue, int $expiry, string $mt)
SET array for handling key overwrites when a live or stale key exists.
const INF_TIMESTAMP_PLACEHOLDER
Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMS types.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
makeNewKeyExpiry( $exptime, int $nowTsUnix)
buildUpsertRow(IDatabase $db, $key, $serialValue, int $expiry, string $mt)
INSERT array for handling key writes/overwrites when no live nor stale key exists.
createTables()
Create the shard tables on all databases (e.g.
encodeDbExpiry(IDatabase $db, int $expiry)
addCasTokenFields(IDatabase $db, array $fields)
Either append a 'castoken' field or append the fields needed to compute the CAS token.
deleteServerObjectsExpiringBefore(IDatabase $db, $timestamp, $limit, &$keysDeletedCount=0, array $progress=null)
modifyTableSpecificBlobsForIncrInit(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Either increment a counter key, if it exists, or initialize it, otherwise.
modifyBlobs(callable $tableWriteCallback, float $mtime, array $argsByKey, int $flags, &$resByKey=[])
int $purgePeriod
Average number of writes required to trigger garbage collection.
getConnectionViaLoadBalancer( $shardIndex)
doChangeTTL( $key, $exptime, $flags)
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
waitForReplication( $shardIndex)
Wait for replica DBs to catch up to the primary DB.
isMultiPrimaryModeEnabled()
array[] $serverInfos
(server index => server config)
fetchBlobs(array $keys, bool $getCasToken=false)
modifyTableSpecificBlobsForCas(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Set key/value pairs belonging to a partition table on the given server, but only for keys whose current CAS token matches the supplied one.
deleteAll()
Delete content of shard tables in every server.
doDeleteMulti(array $keys, $flags=0)
getConnectionFromServerInfo( $shardIndex, array $server)
string|null $multiPrimaryModeType
Multi-primary mode DB type ("mysql",...); null if not enabled.
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from configuration.
int $lastGarbageCollect
UNIX timestamp.
markServerDown(DBError $exception, $shardIndex)
Mark a server down due to a DBConnectionError exception.
doIncr( $key, $value=1, $flags=0)
modifyTableSpecificBlobsForAdd(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the given server.
const SAFE_PURGE_DELAY_SEC
A number of seconds well above any expected clock skew and replication lag.
doGet( $key, $flags=0, &$casToken=null)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
const TOMB_SERIAL
Distinct string for tombstones stored in the "serialized" value column.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
handleDBError(DBError $exception, $shardIndex)
Handle a DBError which occurred during a read operation.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
modifyTableSpecificBlobsForDelete(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Purge/tombstone key/value pairs belonging to a partition table on the given server.
dbEncodeSerialValue(IDatabase $db, $serialValue)
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
ILoadBalancer|null $localKeyLb
getCasTokenFromRow(IDatabase $db, stdClass $row)
Get a CAS token from a SELECT result row.
silenceTransactionProfiler()
Silence the transaction profiler until the return value falls out of scope.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
getConnection( $shardIndex)
Get a connection to the specified database.
doLock( $key, $timeout=6, $exptime=6)
ILoadBalancer|null $globalKeyLb
Exception[] $connFailureErrors
Map of (shard index => Exception)
setAndLogDBError(DBError $exception)
const TOMB_EXPTIME
Relative seconds-to-live to use for tombstones.
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
newLockingWriteSectionModificationTimestamp( $key, &$scope)
Get a scoped lock and modification timestamp for a critical section of reads/writes.
getShardServerIndexForTag(string $tag)
modifyTableSpecificBlobsForSet(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Set key/value pairs belonging to a partition table on the given server.
getKeyLocation( $key)
Get the server index and table name for a given key.
getTableNameByShard( $index)
Get the table name for a given shard index.
const SAFE_CLOCK_BOUND_SEC
A number of seconds well above any expected clock skew.
incrWithInit( $key, $exptime, $value=1, $init=null, $flags=0)
Increase the value of the given key (no TTL change) if it exists or create it otherwise.
const GC_DELAY_SEC
How many seconds must pass before triggering a garbage collection.
initSqliteDatabase(IMaintainableDatabase $db)
dbDecodeSerialValue(IDatabase $db, $blob)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
occasionallyGarbageCollect(IDatabase $db)
buildExistenceConditions(IDatabase $db, $keys, string $time)
WHERE conditions that check for existence and liveness of keys.
buildIncrUpsertSet(IDatabase $db, int $step, int $init, int $expiry, string $mt, int $mtUnixTs)
SET array for handling key overwrites when a live or stale key exists.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
makeTimestampedModificationToken( $mtime, IDatabase $db)
Make a modtoken column value with the original time and source database server of a write.
Advanced database interface for IDatabase handles that include maintenance methods.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.