24use Wikimedia\AtEase\AtEase;
25use Wikimedia\ObjectFactory;
33use Wikimedia\ScopedCallback;
34use Wikimedia\Timestamp\ConvertibleTimestamp;
35use Wikimedia\WaitConditionLoop;
87 private const SHARD_LOCAL =
'local';
89 private const SHARD_GLOBAL =
'global';
128 parent::__construct( $params );
130 $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
132 if ( isset( $params[
'servers'] ) || isset( $params[
'server'] ) ) {
134 foreach ( ( $params[
'servers'] ?? [ $params[
'server'] ] ) as $tag => $info ) {
135 $this->serverInfos[$index] = $info;
136 $this->serverTags[$index] = is_string( $tag ) ? $tag :
"#$index";
140 $this->numServerShards = count( $this->serverInfos );
141 $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
143 if ( isset( $params[
'localKeyLB'] ) ) {
144 $this->localKeyLb = ( $params[
'localKeyLB'] instanceof
ILoadBalancer )
145 ? $params[
'localKeyLB']
146 : ObjectFactory::getObjectFromSpec( $params[
'localKeyLB'] );
148 if ( isset( $params[
'globalKeyLB'] ) ) {
149 $this->globalKeyLb = ( $params[
'globalKeyLB'] instanceof
ILoadBalancer )
150 ? $params[
'globalKeyLB']
151 : ObjectFactory::getObjectFromSpec( $params[
'globalKeyLB'] );
154 if ( !$this->localKeyLb ) {
155 throw new InvalidArgumentException(
156 "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'"
161 $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE;
163 if ( isset( $params[
'purgePeriod'] ) ) {
164 $this->purgePeriod = intval( $params[
'purgePeriod'] );
166 if ( isset( $params[
'purgeLimit'] ) ) {
167 $this->purgeLimit = intval( $params[
'purgeLimit'] );
169 if ( isset( $params[
'tableName'] ) ) {
170 $this->tableName = $params[
'tableName'];
172 if ( isset( $params[
'shards'] ) ) {
173 $this->numTableShards = intval( $params[
'shards'] );
175 $this->replicaOnly = $params[
'replicaOnly'] ??
false;
189 isset( $this->connFailureErrors[$shardIndex] ) &&
190 ( $this->
getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
192 throw $this->connFailureErrors[$shardIndex];
195 if ( $shardIndex === self::SHARD_LOCAL ) {
197 } elseif ( $shardIndex === self::SHARD_GLOBAL ) {
199 } elseif ( is_int( $shardIndex ) ) {
200 if ( isset( $this->serverInfos[$shardIndex] ) ) {
201 $server = $this->serverInfos[$shardIndex];
204 throw new UnexpectedValueException(
"Invalid server index #$shardIndex" );
207 throw new UnexpectedValueException(
"Invalid server index '$shardIndex'" );
219 if ( $this->serverTags ) {
221 if ( count( $this->serverTags ) == 1 ) {
225 ArrayUtils::consistentHashSort( $sortedServers, $key );
226 reset( $sortedServers );
227 $shardIndex = key( $sortedServers );
236 if ( $this->numTableShards > 1 ) {
237 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
252 if ( $index !==
null && $this->numTableShards > 1 ) {
253 $decimals = strlen( $this->numTableShards - 1 );
255 return $this->tableName . sprintf(
"%0{$decimals}d", $index );
261 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
265 if ( array_key_exists( $key, $blobs ) ) {
266 $blob = $blobs[$key];
269 $casToken = ( $value !== false ) ?
$blob :
null;
281 foreach ( $blobs as $key =>
$blob ) {
291 $keysByTableByShardIndex = [];
292 foreach (
$keys as $key ) {
294 $keysByTableByShardIndex[$shardIndex][
$tableName][] = $key;
298 foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) {
301 foreach ( $serverKeys as
$tableName => $tableKeys ) {
304 [
'keyname',
'value',
'exptime' ],
305 [
'keyname' => $tableKeys ],
310 $db->trxLevel() ? [
'LOCK IN SHARE MODE' ] : []
312 if (
$res ===
false ) {
315 foreach (
$res as $row ) {
316 $row->shardIndex = $shardIndex;
318 $dataRows[$row->keyname] = $row;
326 foreach (
$keys as $key ) {
327 if ( isset( $dataRows[$key] ) ) {
328 $row = $dataRows[$key];
329 $this->
debug(
"get: retrieved data; expiry time is " . $row->exptime );
333 if ( $this->
isExpired( $db, $row->exptime ) ) {
334 $this->
debug(
"get: key has expired" );
336 $values[$key] = $db->decodeBlob( $row->value );
342 $this->
debug(
'get: no matching rows' );
349 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
350 return $this->
modifyMulti( $data, $exptime, $flags, self::$OP_SET );
360 private function modifyMulti( array $data, $exptime, $flags, $op ) {
361 $keysByTableByShardIndex = [];
362 foreach ( $data as $key => $value ) {
364 $keysByTableByShardIndex[$shardIndex][
$tableName][] = $key;
372 foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) {
377 $dbExpiry = $exptime ? $db->timestamp( $exptime ) : $this->
getMaxDateTime( $db );
384 foreach ( $serverKeys as
$tableName => $tableKeys ) {
403 foreach ( $keysByTableByShardIndex as $shardIndex => $unused ) {
422 private function updateTable( $op, $db, $table, $tableKeys, $data, $dbExpiry ) {
425 if ( $op === self::$OP_ADD ) {
427 foreach ( $tableKeys as $key ) {
430 'value' => $db->encodeBlob( $this->
serialize( $data[$key] ) ),
431 'exptime' => $dbExpiry
437 'keyname' => $tableKeys,
438 'exptime <= ' . $db->addQuotes( $db->timestamp() )
442 $db->insert( $table, $rows, __METHOD__, [
'IGNORE' ] );
444 $success = ( $db->affectedRows() == count( $rows ) );
445 } elseif ( $op === self::$OP_SET ) {
447 foreach ( $tableKeys as $key ) {
450 'value' => $db->encodeBlob( $this->
serialize( $data[$key] ) ),
451 'exptime' => $dbExpiry
454 $db->replace( $table,
'keyname', $rows, __METHOD__ );
455 } elseif ( $op === self::$OP_DELETE ) {
456 $db->delete( $table, [
'keyname' => $tableKeys ], __METHOD__ );
457 } elseif ( $op === self::$OP_TOUCH ) {
460 [
'exptime' => $dbExpiry ],
462 'keyname' => $tableKeys,
463 'exptime > ' . $db->addQuotes( $db->timestamp() )
468 $success = ( $db->affectedRows() == count( $tableKeys ) );
470 throw new InvalidArgumentException(
"Invalid operation '$op'" );
476 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
477 return $this->
modifyMulti( [ $key => $value ], $exptime, $flags, self::$OP_SET );
480 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
481 return $this->
modifyMulti( [ $key => $value ], $exptime, $flags, self::$OP_ADD );
484 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
499 'value' => $db->encodeBlob( $this->serialize( $value ) ),
500 'exptime' => $exptime
501 ? $db->timestamp( $exptime )
506 'value' => $db->encodeBlob( $casToken ),
507 'exptime > ' . $db->addQuotes( $db->timestamp() )
517 $success = (bool)$db->affectedRows();
527 array_fill_keys(
$keys,
null ),
535 return $this->
modifyMulti( [ $key =>
null ], 0, $flags, self::$OP_DELETE );
538 public function incr( $key, $step = 1, $flags = 0 ) {
547 $encTimestamp = $db->addQuotes( $db->timestamp() );
550 [
'value = value + ' . (
int)$step ],
551 [
'keyname' => $key,
"exptime > $encTimestamp" ],
554 if ( $db->affectedRows() > 0 ) {
555 $newValue = $db->selectField(
558 [
'keyname' => $key,
"exptime > $encTimestamp" ],
562 $newCount = (int)$newValue;
572 public function decr( $key, $value = 1, $flags = 0 ) {
573 return $this->
incr( $key, -$value, $flags );
578 array_fill_keys(
$keys,
null ),
586 return $this->
modifyMulti( [ $key =>
null ], $exptime, $flags, self::$OP_TOUCH );
597 ConvertibleTimestamp::convert( TS_UNIX, $exptime ) < $this->
getCurrentTime()
607 return $db->timestamp( 1 << 62 );
609 return $db->timestamp( 0x7fffffff );
620 $this->purgePeriod &&
622 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
624 ( $this->
getCurrentTime() - $this->lastGarbageCollect ) > self::$GC_DELAY_SEC
626 $garbageCollector =
function () use ( $db ) {
632 $this->lastGarbageCollect = time();
634 if ( $this->asyncHandler ) {
649 callable $progress =
null,
656 shuffle( $shardIndexes );
660 $keysDeletedCount = 0;
661 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
696 $serversDoneCount = 0,
697 &$keysDeletedCount = 0
699 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
700 $tableIndexes = range( 0, $this->numTableShards - 1 );
701 shuffle( $tableIndexes );
703 foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
709 [
'keyname',
'exptime' ],
712 $continue ? [
'exptime >= ' . $db->
addQuotes( $continue ) ] : []
715 [
'LIMIT' => min( $limit, 100 ),
'ORDER BY' =>
'exptime' ]
718 if (
$res->numRows() ) {
719 $row =
$res->current();
720 if ( $lag ===
null ) {
721 $rowExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
722 $lag = max( $cutoffUnix - $rowExpUnix, 1 );
726 foreach (
$res as $row ) {
727 $keys[] = $row->keyname;
728 $continue = $row->exptime;
742 if ( is_callable( $progressCallback ) ) {
744 $continueUnix = ConvertibleTimestamp::convert( TS_UNIX, $continue );
745 $remainingLag = $cutoffUnix - $continueUnix;
746 $processedLag = max( $lag - $remainingLag, 0 );
748 ( $numShardsDone + $processedLag / $lag ) / $this->numTableShards;
754 + ( $serversDoneCount / $this->numServerShards );
755 call_user_func( $progressCallback, $overallRatio * 100 );
757 }
while (
$res->numRows() && $keysDeletedCount < $limit );
784 public function lock( $key, $timeout = 6, $expiry = 6, $rclass =
'' ) {
786 if ( isset( $this->locks[$key] ) ) {
787 if ( $rclass !=
'' && $this->locks[$key][
'class'] === $rclass ) {
788 ++$this->locks[$key][
'depth'];
800 $ok = $db->lock( $key, __METHOD__, $timeout );
802 $this->locks[$key] = [
'class' => $rclass,
'depth' => 1 ];
805 $this->logger->warning(
806 __METHOD__ .
" failed due to timeout for {key}.",
807 [
'key' => $key,
'timeout' => $timeout ]
820 if ( !isset( $this->locks[$key] ) ) {
824 if ( --$this->locks[$key][
'depth'] <= 0 ) {
825 unset( $this->locks[$key] );
832 $ok = $db->unlock( $key, __METHOD__ );
834 $this->logger->warning(
835 __METHOD__ .
' failed to release lock for {key}.',
865 foreach (
$args as &$arg ) {
866 $arg = strtr( $arg, [
872 if ( $charsLeft > 33 && strlen( $arg ) > $charsLeft ) {
873 $arg =
'#' . md5( $arg );
875 $charsLeft -= strlen( $arg );
878 if ( $charsLeft < 0 ) {
879 return $keyspace .
':BagOStuff-long-key:##' . md5( implode(
':',
$args ) );
893 if ( is_int( $data ) ) {
898 if ( function_exists(
'gzdeflate' ) ) {
899 $serial = gzdeflate( $serial );
915 if ( function_exists(
'gzinflate' ) ) {
916 AtEase::suppressWarnings();
917 $decomp = gzinflate( $serial );
918 AtEase::restoreWarnings();
920 if ( $decomp !==
false ) {
935 if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
941 $conn = $lb->getMaintenanceConnectionRef(
944 $lb::CONN_TRX_AUTOCOMMIT
958 if ( !isset( $this->conns[$shardIndex] ) ) {
960 $conn = Database::factory( $server[
'type'], array_merge(
963 'flags' => ( $server[
'flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
964 'connLogger' => $this->logger,
965 'queryLogger' => $this->logger
969 if ( $conn->getType() ===
'sqlite' && !$conn->tableExists(
'objectcache', __METHOD__ ) ) {
972 $this->conns[$shardIndex] = $conn;
975 return $this->conns[$shardIndex];
1012 $this->logger->error(
"DBError: {$exception->getMessage()}" );
1015 $this->logger->debug( __METHOD__ .
": ignoring connection error" );
1018 $this->logger->debug( __METHOD__ .
": ignoring query error" );
1029 unset( $this->conns[$shardIndex] );
1032 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1033 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1034 unset( $this->connFailureTimes[$shardIndex] );
1035 unset( $this->connFailureErrors[$shardIndex] );
1037 $this->logger->debug( __METHOD__ .
": Server #$shardIndex already down" );
1041 $this->logger->info( __METHOD__ .
": Server #$shardIndex down until " . ( $now + 60 ) );
1042 $this->connFailureTimes[$shardIndex] = $now;
1043 $this->connFailureErrors[$shardIndex] = $exception;
1051 if ( $db->
tableExists(
'objectcache', __METHOD__ ) ) {
1055 $db->
query(
"PRAGMA journal_mode=WAL", __METHOD__ );
1058 $encTable = $db->
tableName(
'objectcache' );
1061 "CREATE TABLE $encTable (\n" .
1062 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1068 $db->
query(
"CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1082 if ( in_array( $db->getType(), [
'mysql',
'postgres' ],
true ) ) {
1084 $encBaseTable = $db->tableName(
'objectcache' );
1086 $db->query(
"CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1096 if ( $this->serverTags ) {
1098 $shardIndexes = range( 0, $this->numServerShards - 1 );
1102 if ( $this->localKeyLb ) {
1103 $shardIndexes[] = self::SHARD_LOCAL;
1105 if ( $this->globalKeyLb ) {
1106 $shardIndexes[] = self::SHARD_GLOBAL;
1110 return $shardIndexes;
1120 if ( is_int( $shardIndex ) ) {
1124 $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb :
$this->globalKeyLb;
1125 if ( !$lb->hasStreamingReplicaServers() ) {
1132 if ( !$masterPos ) {
1136 $loop =
new WaitConditionLoop(
1137 function () use ( $lb, $masterPos ) {
1138 return $lb->waitForAll( $masterPos, 1 );
1144 return ( $loop->invoke() === $loop::CONDITION_REACHED );
1158 if ( $this->serverInfos ) {
1162 $trxProfiler = Profiler::instance()->getTransactionProfiler();
1163 $oldSilenced = $trxProfiler->setSilenced(
true );
1164 return new ScopedCallback(
function () use ( $trxProfiler, $oldSilenced ) {
1165 $trxProfiler->setSilenced( $oldSilenced );
fieldHasFlags( $field, $flags)
callable null $asyncHandler
Storage medium specific cache for storing items (e.g.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
callable[] $busyCallbacks
isInteger( $value)
Check if a value is an integer.
setLastError( $err)
Set the "last error" registry.
Class to store objects in the database.
doDelete( $key, $flags=0)
Delete an item.
handleWriteError(DBError $exception, $db, $shardIndex)
Handle a DBQueryError which occurred during a write operation.
serialize( $data)
Serialize an object and, if possible, compress the representation.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
makeKeyInternal( $keyspace, $args)
Construct a cache key.
createTables()
Create the shard tables on all databases (e.g.
isExpired(IDatabase $db, $exptime)
getConnectionViaLoadBalancer( $shardIndex)
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF)
Delete all objects expiring before a certain date.
doChangeTTL( $key, $exptime, $flags)
modifyMulti(array $data, $exptime, $flags, $op)
handleReadError(DBError $exception, $shardIndex)
Handle a DBError which occurred during a read operation.
waitForReplication( $shardIndex)
Wait for replica DBs to catch up to the master DB.
array[] $serverInfos
(server index => server config)
deleteAll()
Delete content of shard tables in every server.
doDeleteMulti(array $keys, $flags=0)
getConnectionFromServerInfo( $shardIndex, array $server)
__construct( $params)
Constructor.
int $lastGarbageCollect
UNIX timestamp.
markServerDown(DBError $exception, $shardIndex)
Mark a server down due to a DBConnectionError exception.
doGet( $key, $flags=0, &$casToken=null)
updateTable( $op, $db, $table, $tableKeys, $data, $dbExpiry)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
deleteServerObjectsExpiringBefore(IDatabase $db, $timestamp, $progressCallback, $limit, $serversDoneCount=0, &$keysDeletedCount=0)
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
int $numServerShards
Number of database servers shards (e.g.
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
ILoadBalancer null $localKeyLb
silenceTransactionProfiler()
Silence the transaction profiler until the return value falls out of scope.
unlock( $key)
Release an advisory lock on a key string.
getConnection( $shardIndex)
Get a connection to the specified database.
fetchBlobMulti(array $keys)
ILoadBalancer null $globalKeyLb
Exception[] $connFailureErrors
Map of (shard index => Exception)
changeTTLMulti(array $keys, $exptime, $flags=0)
Change the expiration of multiple keys that exist.
setAndLogDBError(DBError $exception)
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
unserialize( $serial)
Unserialize and, if necessary, decompress an object.
getKeyLocation( $key)
Get the server index and table name for a given key.
getTableNameByShard( $index)
Get the table name for a given shard index.
initSqliteDatabase(IMaintainableDatabase $db)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
occasionallyGarbageCollect(IDatabase $db)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
incr( $key, $step=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
Advanced database interface for IDatabase handles that include maintenance methods.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.