25use Wikimedia\AtEase\AtEase;
33use Wikimedia\ScopedCallback;
34use Wikimedia\Timestamp\ConvertibleTimestamp;
35use Wikimedia\WaitConditionLoop;
123 parent::__construct( $params );
128 if ( isset( $params[
'servers'] ) ) {
129 $this->serverInfos = [];
130 $this->serverTags = [];
131 $this->numServerShards = count( $params[
'servers'] );
133 foreach ( $params[
'servers'] as $tag => $info ) {
134 $this->serverInfos[$index] = $info;
135 if ( is_string( $tag ) ) {
136 $this->serverTags[$index] = $tag;
138 $this->serverTags[$index] = $info[
'host'] ??
"#$index";
142 } elseif ( isset( $params[
'server'] ) ) {
143 $this->serverInfos = [ $params[
'server'] ];
144 $this->numServerShards = count( $this->serverInfos );
147 $this->serverInfos = [];
148 $this->numServerShards = 1;
151 if ( isset( $params[
'purgePeriod'] ) ) {
152 $this->purgePeriod = intval( $params[
'purgePeriod'] );
154 if ( isset( $params[
'purgeLimit'] ) ) {
155 $this->purgeLimit = intval( $params[
'purgeLimit'] );
157 if ( isset( $params[
'tableName'] ) ) {
158 $this->tableName = $params[
'tableName'];
160 if ( isset( $params[
'shards'] ) ) {
161 $this->numTableShards = intval( $params[
'shards'] );
164 $this->replicaOnly = $params[
'replicaOnly'] ?? ( $params[
'slaveOnly'] ?? false );
175 if ( $shardIndex >= $this->numServerShards ) {
176 throw new MWException( __METHOD__ .
": Invalid server index \"$shardIndex\"" );
179 # Don't keep timing out trying to connect for each call if the DB is down
181 isset( $this->connFailureErrors[$shardIndex] ) &&
182 ( $this->
getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
184 throw $this->connFailureErrors[$shardIndex];
187 if ( $this->serverInfos ) {
188 if ( !isset( $this->conns[$shardIndex] ) ) {
190 $info = $this->serverInfos[$shardIndex];
191 $type = $info[
'type'] ??
'mysql';
192 $host = $info[
'host'] ??
'[unknown]';
193 $this->logger->debug( __CLASS__ .
": connecting to $host" );
194 $conn = Database::factory(
$type, $info );
196 $this->conns[$shardIndex] = $conn;
198 if ( $conn->getType() ===
'sqlite' ) {
202 $conn = $this->conns[$shardIndex];
205 $lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
210 $attribs = $lb->getServerAttributes( $lb->getWriterIndex() );
211 $flags = $attribs[Database::ATTR_DB_LEVEL_LOCKING] ? 0 : $lb::CONN_TRX_AUTOCOMMIT;
212 $conn = $lb->getMaintenanceConnectionRef( $index, [],
false, $flags );
215 $this->logger->debug( sprintf(
"Connection %s will be used for SqlBagOStuff", $conn ) );
226 if ( $this->numTableShards > 1 ) {
227 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
232 if ( $this->numServerShards > 1 ) {
234 ArrayUtils::consistentHashSort( $sortedServers, $key );
235 reset( $sortedServers );
236 $shardIndex = key( $sortedServers );
249 if ( $this->numTableShards > 1 ) {
250 $decimals = strlen( $this->numTableShards - 1 );
251 return $this->tableName .
252 sprintf(
"%0{$decimals}d", $index );
258 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
262 if ( array_key_exists( $key, $blobs ) ) {
263 $blob = $blobs[$key];
266 $casToken = ( $value !== false ) ?
$blob :
null;
278 foreach ( $blobs as $key =>
$blob ) {
289 foreach (
$keys as $key ) {
291 $keysByTable[$shardIndex][
$tableName][] = $key;
295 foreach ( $keysByTable as $shardIndex => $serverKeys ) {
298 foreach ( $serverKeys as
$tableName => $tableKeys ) {
300 [
'keyname',
'value',
'exptime' ],
301 [
'keyname' => $tableKeys ],
306 $db->trxLevel() ? [
'LOCK IN SHARE MODE' ] : []
308 if (
$res ===
false ) {
311 foreach (
$res as $row ) {
312 $row->shardIndex = $shardIndex;
314 $dataRows[$row->keyname] = $row;
322 foreach (
$keys as $key ) {
323 if ( isset( $dataRows[$key] ) ) {
324 $row = $dataRows[$key];
325 $this->
debug(
"get: retrieved data; expiry time is " . $row->exptime );
329 if ( $this->
isExpired( $db, $row->exptime ) ) {
330 $this->
debug(
"get: key has expired" );
332 $values[$key] = $db->decodeBlob( $row->value );
338 $this->
debug(
'get: no matching rows' );
345 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
346 return $this->
modifyMulti( $data, $exptime, $flags, self::$OP_SET );
356 private function modifyMulti( array $data, $exptime, $flags, $op ) {
358 foreach ( $data as $key => $value ) {
360 $keysByTable[$shardIndex][
$tableName][] = $key;
368 foreach ( $keysByTable as $shardIndex => $serverKeys ) {
373 $dbExpiry = $exptime ? $db->timestamp( $exptime ) : $this->
getMaxDateTime( $db );
380 foreach ( $serverKeys as
$tableName => $tableKeys ) {
419 if ( $op === self::$OP_ADD ) {
421 foreach ( $tableKeys as $key ) {
424 'value' => $db->encodeBlob( $this->
serialize( $data[$key] ) ),
425 'exptime' => $dbExpiry
431 'keyname' => $tableKeys,
432 'exptime <= ' . $db->addQuotes( $db->timestamp() )
436 $db->insert( $table, $rows, __METHOD__, [
'IGNORE' ] );
438 $success = ( $db->affectedRows() == count( $rows ) );
439 } elseif ( $op === self::$OP_SET ) {
441 foreach ( $tableKeys as $key ) {
444 'value' => $db->encodeBlob( $this->
serialize( $data[$key] ) ),
445 'exptime' => $dbExpiry
448 $db->replace( $table, [
'keyname' ], $rows, __METHOD__ );
449 } elseif ( $op === self::$OP_DELETE ) {
450 $db->delete( $table, [
'keyname' => $tableKeys ], __METHOD__ );
451 } elseif ( $op === self::$OP_TOUCH ) {
454 [
'exptime' => $dbExpiry ],
456 'keyname' => $tableKeys,
457 'exptime > ' . $db->addQuotes( $db->timestamp() )
462 $success = ( $db->affectedRows() == count( $tableKeys ) );
464 throw new InvalidArgumentException(
"Invalid operation '$op'" );
470 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
471 return $this->
modifyMulti( [ $key => $value ], $exptime, $flags, self::$OP_SET );
474 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
475 return $this->
modifyMulti( [ $key => $value ], $exptime, $flags, self::$OP_ADD );
478 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
493 'value' => $db->encodeBlob( $this->serialize( $value ) ),
494 'exptime' => $exptime
495 ? $db->timestamp( $exptime )
500 'value' => $db->encodeBlob( $casToken ),
501 'exptime > ' . $db->addQuotes( $db->timestamp() )
511 $success = (bool)$db->affectedRows();
521 array_fill_keys(
$keys,
null ),
529 return $this->
modifyMulti( [ $key =>
null ], 0, $flags, self::$OP_DELETE );
532 public function incr( $key, $step = 1, $flags = 0 ) {
541 $encTimestamp = $db->addQuotes( $db->timestamp() );
544 [
'value = value + ' . (
int)$step ],
545 [
'keyname' => $key,
"exptime > $encTimestamp" ],
548 if ( $db->affectedRows() > 0 ) {
549 $newValue = $db->selectField(
552 [
'keyname' => $key,
"exptime > $encTimestamp" ],
556 $newCount = (int)$newValue;
566 public function decr( $key, $value = 1, $flags = 0 ) {
567 return $this->
incr( $key, -$value, $flags );
572 array_fill_keys(
$keys,
null ),
580 return $this->
modifyMulti( [ $key =>
null ], $exptime, $flags, self::$OP_TOUCH );
591 ConvertibleTimestamp::convert( TS_UNIX, $exptime ) < $this->
getCurrentTime()
601 return $db->timestamp( 1 << 62 );
603 return $db->timestamp( 0x7fffffff );
614 $this->purgePeriod &&
616 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
618 ( $this->
getCurrentTime() - $this->lastGarbageCollect ) > self::$GC_DELAY_SEC
620 $garbageCollector =
function () use ( $db ) {
626 $this->lastGarbageCollect = time();
628 if ( $this->asyncHandler ) {
643 callable $progress =
null,
649 $shardIndexes = range( 0, $this->numServerShards - 1 );
650 shuffle( $shardIndexes );
654 $keysDeletedCount = 0;
655 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
690 $serversDoneCount = 0,
691 &$keysDeletedCount = 0
693 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
694 $shardIndexes = range( 0, $this->numTableShards - 1 );
695 shuffle( $shardIndexes );
697 foreach ( $shardIndexes as $numShardsDone => $shardIndex ) {
703 [
'keyname',
'exptime' ],
706 $continue ? [
'exptime >= ' . $db->
addQuotes( $continue ) ] : []
709 [
'LIMIT' => min( $limit, 100 ),
'ORDER BY' =>
'exptime' ]
712 if (
$res->numRows() ) {
713 $row =
$res->current();
714 if ( $lag ===
null ) {
715 $rowExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
716 $lag = max( $cutoffUnix - $rowExpUnix, 1 );
720 foreach (
$res as $row ) {
721 $keys[] = $row->keyname;
722 $continue = $row->exptime;
736 if ( is_callable( $progressCallback ) ) {
738 $continueUnix = ConvertibleTimestamp::convert( TS_UNIX, $continue );
739 $remainingLag = $cutoffUnix - $continueUnix;
740 $processedLag = max( $lag - $remainingLag, 0 );
741 $doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->numTableShards;
747 + ( $serversDoneCount / $this->numServerShards );
748 call_user_func( $progressCallback, $overallRatio * 100 );
750 }
while (
$res->numRows() && $keysDeletedCount < $limit );
777 public function lock( $key, $timeout = 6, $expiry = 6, $rclass =
'' ) {
779 if ( isset( $this->locks[$key] ) ) {
780 if ( $rclass !=
'' && $this->locks[$key][
'class'] === $rclass ) {
781 ++$this->locks[$key][
'depth'];
793 $ok = $db->lock( $key, __METHOD__, $timeout );
795 $this->locks[$key] = [
'class' => $rclass,
'depth' => 1 ];
798 $this->logger->warning(
799 __METHOD__ .
" failed due to timeout for {key}.",
800 [
'key' => $key,
'timeout' => $timeout ]
813 if ( !isset( $this->locks[$key] ) ) {
817 if ( --$this->locks[$key][
'depth'] <= 0 ) {
818 unset( $this->locks[$key] );
825 $ok = $db->unlock( $key, __METHOD__ );
827 $this->logger->warning(
828 __METHOD__ .
' failed to release lock for {key}.',
852 if ( is_int( $data ) ) {
857 if ( function_exists(
'gzdeflate' ) ) {
858 $serial = gzdeflate( $serial );
874 if ( function_exists(
'gzinflate' ) ) {
875 AtEase::suppressWarnings();
876 $decomp = gzinflate( $serial );
877 AtEase::restoreWarnings();
879 if ( $decomp !==
false ) {
921 $this->logger->error(
"DBError: {$exception->getMessage()}" );
924 $this->logger->debug( __METHOD__ .
": ignoring connection error" );
927 $this->logger->debug( __METHOD__ .
": ignoring query error" );
938 unset( $this->conns[$shardIndex] );
941 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
942 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
943 unset( $this->connFailureTimes[$shardIndex] );
944 unset( $this->connFailureErrors[$shardIndex] );
946 $this->logger->debug( __METHOD__ .
": Server #$shardIndex already down" );
950 $this->logger->info( __METHOD__ .
": Server #$shardIndex down until " . ( $now + 60 ) );
951 $this->connFailureTimes[$shardIndex] = $now;
952 $this->connFailureErrors[$shardIndex] = $exception;
964 $db->
query(
"PRAGMA journal_mode=WAL" );
967 $encTable = $db->
tableName(
'objectcache' );
970 "CREATE TABLE $encTable (\n" .
971 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
977 $db->
query(
"CREATE INDEX $encExptimeIndex ON $encTable (exptime)" );
991 if ( in_array( $db->getType(), [
'mysql',
'postgres' ],
true ) ) {
993 $encBaseTable = $db->tableName(
'objectcache' );
995 $db->query(
"CREATE TABLE $encShardTable LIKE $encBaseTable" );
1014 $lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
1015 if ( $lb->getServerCount() <= 1 ) {
1021 $masterPos = $lb->getMasterPos();
1022 if ( !$masterPos ) {
1026 $loop =
new WaitConditionLoop(
1027 function () use ( $lb, $masterPos ) {
1028 return $lb->waitForAll( $masterPos, 1 );
1034 return ( $loop->invoke() === $loop::CONDITION_REACHED );
1054 $trxProfiler = Profiler::instance()->getTransactionProfiler();
1055 $oldSilenced = $trxProfiler->setSilenced(
true );
1056 return new ScopedCallback(
function () use ( $trxProfiler, $oldSilenced ) {
1057 $trxProfiler->setSilenced( $oldSilenced );
fieldHasFlags( $field, $flags)
callable|null $asyncHandler
Storage medium specific cache for storing items.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
callable[] $busyCallbacks
isInteger( $value)
Check if a value is an integer.
setLastError( $err)
Set the "last error" registry.
Class to store objects in the database.
doDelete( $key, $flags=0)
Delete an item.
handleWriteError(DBError $exception, $db, $shardIndex)
Handle a DBError which occurred during a write operation.
serialize( $data)
Serialize an object and, if possible, compress the representation.
lock( $key, $timeout=6, $expiry=6, $rclass='')
Acquire an advisory lock on a key string.
string[] $serverTags
(server index => tag/host name)
LoadBalancer|null $separateMainLB
doSetMulti(array $data, $exptime=0, $flags=0)
createTables()
Create the shard tables on all databases.
getTableByKey( $key)
Get the server index and table name for a given key.
fetchBlobMulti(array $keys, $flags=0)
updateTableKeys( $op, $db, $table, $tableKeys, $data, $dbExpiry)
isExpired(IDatabase $db, $exptime)
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF)
Delete all objects expiring before a certain date.
doChangeTTL( $key, $exptime, $flags)
modifyMulti(array $data, $exptime, $flags, $op)
handleReadError(DBError $exception, $shardIndex)
Handle a DBError which occurred during a read operation.
array[] $serverInfos
(server index => server config)
deleteAll()
Delete content of shard tables in every server.
doDeleteMulti(array $keys, $flags=0)
__construct( $params)
Constructor.
int $lastGarbageCollect
UNIX timestamp.
markServerDown(DBError $exception, $shardIndex)
Mark a server down due to a DBConnectionError exception.
doGet( $key, $flags=0, &$casToken=null)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
deleteServerObjectsExpiringBefore(IDatabase $db, $timestamp, $progressCallback, $limit, $serversDoneCount=0, &$keysDeletedCount=0)
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
silenceTransactionProfiler()
Silence the transaction profiler until the return value falls out of scope.
array $connFailureTimes
UNIX timestamps.
unlock( $key)
Release an advisory lock on a key string.
getConnection( $shardIndex)
Get a connection to the specified database.
array $connFailureErrors
Exceptions.
changeTTLMulti(array $keys, $exptime, $flags=0)
Change the expiration of multiple keys that exist.
setAndLogDBError(DBError $exception)
unserialize( $serial)
Unserialize and, if necessary, decompress an object.
getTableNameByShard( $index)
Get the table name for a given shard index.
initSqliteDatabase(IMaintainableDatabase $db)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
occasionallyGarbageCollect(IDatabase $db)
incr( $key, $step=1, $flags=0)
Increase stored value of $key by $step while preserving its TTL.
const QOS_SYNCWRITES_NONE
Advanced database interface for IDatabase handles that include maintenance methods.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.