59 protected $useLB =
false;
62 protected $serverInfos = [];
64 protected $serverTags = [];
66 protected $lastGarbageCollect = 0;
68 protected $purgePeriod = 10;
70 protected $purgeLimit = 100;
72 protected $numTableShards = 1;
74 protected $writeBatchSize = 100;
76 protected $tableName =
'objectcache';
85 protected $connFailureTimes = [];
87 protected $connFailureErrors = [];
93 private const SAFE_CLOCK_BOUND_SEC = 15;
95 private const SAFE_PURGE_DELAY_SEC = 3600;
97 private const TOMB_SERIAL =
'';
99 private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
101 private const GC_DELAY_SEC = 1;
103 private const BLOB_VALUE = 0;
104 private const BLOB_EXPIRY = 1;
105 private const BLOB_CASTOKEN = 2;
113 private const INF_TIMESTAMP_PLACEHOLDER =
'99991231235959';
145 parent::__construct( $params );
147 if ( isset( $params[
'servers'] ) || isset( $params[
'server'] ) ) {
151 foreach ( ( $params[
'servers'] ?? [ $params[
'server'] ] ) as $tag => $info ) {
152 $this->serverInfos[$index] = $info;
154 $this->serverTags[$index] = is_string( $tag ) ? $tag :
"#$index";
157 } elseif ( isset( $params[
'loadBalancerCallback'] ) ) {
158 $this->loadBalancerCallback = $params[
'loadBalancerCallback'];
159 if ( !isset( $params[
'dbDomain'] ) ) {
160 throw new InvalidArgumentException(
161 __METHOD__ .
": 'dbDomain' is required if 'loadBalancerCallback' is given"
164 $this->dbDomain = $params[
'dbDomain'];
167 throw new InvalidArgumentException(
168 __METHOD__ .
" requires 'server', 'servers', or 'loadBalancerCallback'"
172 $this->purgePeriod = intval( $params[
'purgePeriod'] ?? $this->purgePeriod );
173 $this->purgeLimit = intval( $params[
'purgeLimit'] ?? $this->purgeLimit );
174 $this->tableName = $params[
'tableName'] ?? $this->tableName;
175 $this->numTableShards = intval( $params[
'shards'] ?? $this->numTableShards );
176 $this->writeBatchSize = intval( $params[
'writeBatchSize'] ?? $this->writeBatchSize );
177 $this->replicaOnly = $params[
'replicaOnly'] ??
false;
178 $this->multiPrimaryMode = $params[
'multiPrimaryMode'] ??
false;
180 $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
181 $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
183 $this->hasZlib = extension_loaded(
'zlib' );
186 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
187 $getToken = ( $casToken === self::PASS_BY_REF );
190 $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
192 $result = $this->
unserialize( $data[self::BLOB_VALUE] );
193 if ( $getToken && $result !==
false ) {
194 $casToken = $data[self::BLOB_CASTOKEN];
196 $valueSize = strlen( $data[self::BLOB_VALUE] );
202 $this->
updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
207 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
210 return $this->modifyBlobs(
211 [ $this,
'modifyTableSpecificBlobsForSet' ],
213 [ $key => [ $value, $exptime ] ]
220 return $this->modifyBlobs(
221 [ $this,
'modifyTableSpecificBlobsForDelete' ],
227 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
228 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
229 if ( $mtime ===
null ) {
234 return $this->modifyBlobs(
235 [ $this,
'modifyTableSpecificBlobsForAdd' ],
237 [ $key => [ $value, $exptime ] ]
241 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
242 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
243 if ( $mtime ===
null ) {
248 return $this->modifyBlobs(
249 [ $this,
'modifyTableSpecificBlobsForCas' ],
251 [ $key => [ $value, $exptime, $casToken ] ]
258 return $this->modifyBlobs(
259 [ $this,
'modifyTableSpecificBlobsForChangeTTL' ],
261 [ $key => [ $exptime ] ]
268 if ( $flags & self::WRITE_BACKGROUND ) {
269 $callback = [ $this,
'modifyTableSpecificBlobsForIncrInitAsync' ];
271 $callback = [ $this,
'modifyTableSpecificBlobsForIncrInit' ];
274 $result = $this->modifyBlobs(
277 [ $key => [ $step, $init, $exptime ] ],
279 ) ? $resByKey[$key] :
false;
286 $valueSizeByKey = [];
288 $dataByKey = $this->fetchBlobs(
$keys );
289 foreach (
$keys as $key ) {
290 $data = $dataByKey[$key];
292 $serialValue = $data[self::BLOB_VALUE];
294 if ( $value !==
false ) {
295 $result[$key] = $value;
297 $valueSize = strlen( $serialValue );
301 $valueSizeByKey[$key] = [ 0, $valueSize ];
304 $this->
updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
309 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
312 return $this->modifyBlobs(
313 [ $this,
'modifyTableSpecificBlobsForSet' ],
316 static function ( $value ) use ( $exptime ) {
317 return [ $value, $exptime ];
327 return $this->modifyBlobs(
328 [ $this,
'modifyTableSpecificBlobsForDelete' ],
330 array_fill_keys(
$keys, [] )
337 return $this->modifyBlobs(
338 [ $this,
'modifyTableSpecificBlobsForChangeTTL' ],
340 array_fill_keys(
$keys, [ $exptime ] )
352 private function getConnection( $shardIndex ) {
353 if ( $this->useLB ) {
354 return $this->getConnectionViaLoadBalancer();
359 isset( $this->connFailureErrors[$shardIndex] ) &&
360 ( $this->
getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
362 throw $this->connFailureErrors[$shardIndex];
365 if ( isset( $this->serverInfos[$shardIndex] ) ) {
366 $server = $this->serverInfos[$shardIndex];
367 $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
369 throw new UnexpectedValueException(
"Invalid server index #$shardIndex" );
380 private function getKeyLocation( $key ) {
381 if ( $this->useLB ) {
386 if ( count( $this->serverTags ) == 1 ) {
389 $sortedServers = $this->serverTags;
391 $shardIndex = array_key_first( $sortedServers );
395 if ( $this->numTableShards > 1 ) {
396 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
397 $tableIndex = $hash % $this->numTableShards;
402 return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
410 private function getTableNameByShard( $index ) {
411 if ( $index !==
null && $this->numTableShards > 1 ) {
412 $decimals = strlen( (
string)( $this->numTableShards - 1 ) );
414 return $this->tableName . sprintf(
"%0{$decimals}d", $index );
417 return $this->tableName;
425 private function fetchBlobs( array
$keys,
bool $getCasToken =
false ) {
427 $silenceScope = $this->silenceTransactionProfiler();
430 $dataByKey = array_fill_keys(
$keys,
null );
433 $keysByTableByShard = [];
434 foreach (
$keys as $key ) {
435 [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
436 $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
439 foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
441 $db = $this->getConnection( $shardIndex );
442 foreach ( $serverKeys as $partitionTable => $tableKeys ) {
446 ? $this->addCasTokenFields( $db, [
'keyname',
'value',
'exptime' ] )
447 : [
'keyname',
'value',
'exptime' ] )
448 ->from( $partitionTable )
449 ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
450 ->caller( __METHOD__ )
452 foreach (
$res as $row ) {
453 $row->shardIndex = $shardIndex;
454 $row->tableName = $partitionTable;
455 $dataByKey[$row->keyname] = $row;
459 $this->handleDBError( $e, $shardIndex );
463 foreach (
$keys as $key ) {
464 $row = $dataByKey[$key] ??
null;
469 $this->
debug( __METHOD__ .
": retrieved $key; expiry time is {$row->exptime}" );
471 $db = $this->getConnection( $row->shardIndex );
473 self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
474 self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
475 self::BLOB_CASTOKEN => $getCasToken
476 ? $this->getCasTokenFromRow( $db, $row )
480 $this->handleDBError( $e, $row->shardIndex );
500 private function modifyBlobs(
501 callable $tableWriteCallback,
507 $resByKey = array_fill_keys( array_keys( $argsByKey ),
false );
510 $silenceScope = $this->silenceTransactionProfiler();
512 $argsByKeyByTableByShard = [];
513 foreach ( $argsByKey as $key => $args ) {
514 [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
515 $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
518 $shardIndexesAffected = [];
519 foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
520 foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
522 $db = $this->getConnection( $shardIndex );
523 $shardIndexesAffected[] = $shardIndex;
524 $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
526 $this->handleDBError( $e, $shardIndex );
532 $success = !in_array(
false, $resByKey,
true );
534 foreach ( $shardIndexesAffected as $shardIndex ) {
536 $db = $this->getConnection( $shardIndex );
537 $this->occasionallyGarbageCollect( $db );
539 $this->handleDBError( $e, $shardIndex );
561 private function modifyTableSpecificBlobsForSet(
568 $valueSizesByKey = [];
570 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
573 foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
574 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
576 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
578 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
581 if ( $this->multiPrimaryMode ) {
586 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
591 $db->
replace( $ptable,
'keyname', $rows, __METHOD__ );
594 foreach ( $argsByKey as $key => $unused ) {
595 $resByKey[$key] =
true;
598 $this->
updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
617 private function modifyTableSpecificBlobsForDelete(
624 if ( $this->multiPrimaryMode ) {
626 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
627 $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (
int)$mtime );
629 foreach ( $argsByKey as $key => $arg ) {
630 $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
636 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
641 $db->
delete( $ptable, [
'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
644 foreach ( $argsByKey as $key => $arg ) {
645 $resByKey[$key] =
true;
648 $this->
updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
671 private function modifyTableSpecificBlobsForAdd(
678 $valueSizesByKey = [];
680 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
684 ->select(
'keyname' )
686 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (
int)$mtime ) )
687 ->caller( __METHOD__ )
688 ->fetchFieldValues();
689 $existingByKey = array_fill_keys( $existingKeys,
true );
692 foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
693 if ( isset( $existingByKey[$key] ) ) {
694 $this->logger->debug( __METHOD__ .
": $key already exists" );
699 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
700 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
702 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
709 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
713 foreach ( $argsByKey as $key => $unused ) {
714 $resByKey[$key] = !isset( $existingByKey[$key] );
717 $this->
updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
740 private function modifyTableSpecificBlobsForCas(
747 $valueSizesByKey = [];
749 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
753 ->select( $this->addCasTokenFields( $db, [
'keyname' ] ) )
755 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (
int)$mtime ) )
756 ->caller( __METHOD__ )
759 $curTokensByKey = [];
760 foreach (
$res as $row ) {
761 $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
765 $nonMatchingByKey = [];
766 foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
767 $curToken = $curTokensByKey[$key] ??
null;
768 if ( $curToken ===
null ) {
769 $nonMatchingByKey[$key] =
true;
770 $this->logger->debug( __METHOD__ .
": $key does not exists" );
774 if ( $curToken !== $casToken ) {
775 $nonMatchingByKey[$key] =
true;
776 $this->logger->debug( __METHOD__ .
": $key does not have a matching token" );
781 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
782 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
784 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
791 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
795 foreach ( $argsByKey as $key => $unused ) {
796 $resByKey[$key] = !isset( $nonMatchingByKey[$key] );
799 $this->
updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
821 private function modifyTableSpecificBlobsForChangeTTL(
828 if ( $this->multiPrimaryMode ) {
829 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
832 ->select( [
'keyname',
'value' ] )
834 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (
int)$mtime ) )
835 ->caller( __METHOD__ )
840 foreach (
$res as $curRow ) {
841 $key = $curRow->keyname;
842 $existingKeys[$key] =
true;
843 $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
844 [ $exptime ] = $argsByKey[$key];
845 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
846 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
853 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
857 foreach ( $argsByKey as $key => $unused ) {
858 $resByKey[$key] = isset( $existingKeys[$key] );
861 $keysBatchesByExpiry = [];
862 foreach ( $argsByKey as $key => [ $exptime ] ) {
863 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
864 $keysBatchesByExpiry[$expiry][] = $key;
868 foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
871 [
'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
872 $this->buildExistenceConditions( $db, $keyBatch, (
int)$mtime ),
877 if ( $existingCount === count( $argsByKey ) ) {
878 foreach ( $argsByKey as $key => $args ) {
879 $resByKey[$key] =
true;
884 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
906 private function modifyTableSpecificBlobsForIncrInit(
913 foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
914 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
915 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
921 $atomic = $db->
startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
925 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
927 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (
int)$mtime ),
934 ->where( [
'keyname' => $key ] )
935 ->caller( __METHOD__ )
937 }
catch ( Exception $e ) {
943 if ( !$affectedCount || $row ===
false ) {
944 $this->logger->warning( __METHOD__ .
": failed to set new $key value" );
948 $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
949 if ( !$this->
isInteger( $serialValue ) ) {
950 $this->logger->warning( __METHOD__ .
": got non-integer $key value" );
954 $resByKey[$key] = (int)$serialValue;
957 $this->
updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
971 private function modifyTableSpecificBlobsForIncrInitAsync(
978 foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
979 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
980 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
983 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
985 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (
int)$mtime ),
989 $this->logger->warning( __METHOD__ .
": failed to set new $key value" );
991 $resByKey[$key] =
true;
1001 private function makeNewKeyExpiry( $exptime,
int $nowTsUnix ) {
1006 if ( $expiry !== self::TTL_INDEFINITE ) {
1007 $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1031 private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1032 if ( !$this->
lock( $key, 0 ) ) {
1036 $scope =
new ScopedCallback(
function () use ( $key ) {
1041 return (
float)sprintf(
'%.6F', $this->locks[$key][self::LOCK_TIME] );
1053 private function makeTimestampedModificationToken(
float $mtime,
IDatabase $db ) {
1059 $seconds = (int)$mtime;
1060 [ , $microseconds ] = explode(
'.', sprintf(
'%.6F', $mtime ) );
1064 $token = implode(
'', [
1068 str_pad( base_convert( (
string)$seconds, 10, 2 ), 35,
'0', STR_PAD_LEFT ) .
1070 str_pad( base_convert( $id, 10, 2 ), 32,
'0', STR_PAD_LEFT ),
1076 str_pad( base_convert( $microseconds, 10, 36 ), 4,
'0', STR_PAD_LEFT )
1079 if ( strlen( $token ) !== 17 ) {
1080 throw new RuntimeException(
"Modification timestamp overflow detected" );
1094 private function buildExistenceConditions(
IDatabase $db,
$keys,
int $time ) {
1112 private function buildUpsertRow(
1121 'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
1122 'exptime' => $this->encodeDbExpiry( $db, $expiry )
1124 if ( $this->multiPrimaryMode ) {
1125 $row[
'modtoken'] = $mt;
1138 private function buildMultiUpsertSetForOverwrite(
IDatabase $db,
string $mt ) {
1139 $expressionsByColumn = [
1145 if ( $this->multiPrimaryMode ) {
1152 $expressionsByColumn[
'modtoken'] = $db->
addQuotes( $mt );
1153 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1155 $db->
addQuotes( substr( $mt, 0, 13 ) ) .
' >= ' .
1160 $set[] =
"{$column}=" . trim( $rhs );
1163 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1164 $set[] =
"{$column}={$updateExpression}";
1182 private function buildIncrUpsertSet(
1191 $expressionsByColumn = [
1194 $db->
addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1198 $db->
addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1201 if ( $this->multiPrimaryMode ) {
1202 $expressionsByColumn[
'modtoken'] = [
'modtoken', $db->
addQuotes( $mt ) ];
1206 foreach ( $expressionsByColumn as $column => [ $updateExpression, $initExpression ] ) {
1212 $set[] =
"{$column}=" . trim( $rhs );
1223 private function encodeDbExpiry(
IDatabase $db,
int $expiry ) {
1224 return ( $expiry === self::TTL_INDEFINITE )
1226 ? $db->
timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1236 private function decodeDbExpiry(
IDatabase $db,
string $dbExpiry ) {
1237 return ( $dbExpiry === $db->
timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1238 ? self::TTL_INDEFINITE
1239 : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1247 private function dbEncodeSerialValue(
IDatabase $db, $serialValue ) {
1248 return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue );
1267 private function addCasTokenFields(
IDatabase $db, array $fields ) {
1270 if (
$type ===
'mysql' ) {
1276 } elseif (
$type ===
'postgres' ) {
1283 if ( !in_array(
'value', $fields,
true ) ) {
1284 $fields[] =
'value';
1286 if ( !in_array(
'exptime', $fields,
true ) ) {
1287 $fields[] =
'exptime';
1301 private function getCasTokenFromRow(
IDatabase $db, stdClass $row ) {
1302 if ( isset( $row->castoken ) ) {
1303 $token = $row->castoken;
1305 $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) .
'@' . $row->exptime;
1306 $this->logger->debug( __METHOD__ .
": application computed hash for CAS token" );
1316 private function occasionallyGarbageCollect(
IDatabase $db ) {
1319 $this->purgePeriod &&
1321 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1323 ( $this->
getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
1325 $garbageCollector =
function () use ( $db ) {
1327 $silenceScope = $this->silenceTransactionProfiler();
1328 $this->deleteServerObjectsExpiringBefore(
1333 $this->lastGarbageCollect = time();
1335 if ( $this->asyncHandler ) {
1337 ( $this->asyncHandler )( $garbageCollector );
1339 $garbageCollector();
1350 callable $progress =
null,
1355 $silenceScope = $this->silenceTransactionProfiler();
1357 if ( $tag !==
null ) {
1359 $shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
1361 $shardIndexes = $this->getShardServerIndexes();
1362 shuffle( $shardIndexes );
1366 $numServers = count( $shardIndexes );
1368 $keysDeletedCount = 0;
1369 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1371 $db = $this->getConnection( $shardIndex );
1372 $this->deleteServerObjectsExpiringBefore(
1377 [
'fn' => $progress,
'serversDone' => $numServersDone,
'serversTotal' => $numServers ]
1380 $this->handleDBError( $e, $shardIndex );
1397 private function deleteServerObjectsExpiringBefore(
1401 &$keysDeletedCount = 0,
1402 array $progress =
null
1404 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1405 if ( $this->multiPrimaryMode ) {
1414 $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1415 $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1417 $tableIndexes = range( 0, $this->numTableShards - 1 );
1418 shuffle( $tableIndexes );
1420 $batchSize = min( $this->writeBatchSize, $limit );
1422 foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1429 $totalSeconds =
null;
1433 ->select( [
'keyname',
'exptime' ] )
1434 ->from( $this->getTableNameByShard( $tableIndex ) )
1438 $maxExp ? [
'exptime >= ' . $db->
addQuotes( $maxExp ) ] : []
1442 ->limit( $batchSize )
1443 ->caller( __METHOD__ )
1446 if (
$res->numRows() ) {
1447 $row =
$res->current();
1448 if ( $minExpUnix ===
null ) {
1449 $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1450 $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1454 foreach (
$res as $row ) {
1455 $keys[] = $row->keyname;
1456 $maxExp = $row->exptime;
1460 $this->getTableNameByShard( $tableIndex ),
1470 if ( $progress && is_callable( $progress[
'fn'] ) ) {
1471 if ( $totalSeconds ) {
1472 $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1473 $remainingSeconds = $cutoffUnix - $maxExpUnix;
1474 $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1479 ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1481 $tablesDoneRatio = 1;
1486 $overallRatio = ( $progress[
'serversDone'] / $progress[
'serversTotal'] ) +
1487 ( $tablesDoneRatio / $progress[
'serversTotal'] );
1488 ( $progress[
'fn'] )( $overallRatio * 100 );
1490 }
while (
$res->numRows() && $keysDeletedCount < $limit );
1501 $silenceScope = $this->silenceTransactionProfiler();
1502 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1504 $db = $this->getConnection( $shardIndex );
1505 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1506 $db->
delete( $this->getTableNameByShard( $i ),
'*', __METHOD__ );
1509 $this->handleDBError( $e, $shardIndex );
1516 public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1518 $silenceScope = $this->silenceTransactionProfiler();
1522 [ $shardIndex ] = $this->getKeyLocation( $key );
1524 $db = $this->getConnection( $shardIndex );
1525 $lockTsUnix = $db->
lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1527 $this->handleDBError( $e, $shardIndex );
1528 $this->logger->warning(
1529 __METHOD__ .
' failed due to I/O error for {key}.',
1539 $silenceScope = $this->silenceTransactionProfiler();
1541 [ $shardIndex ] = $this->getKeyLocation( $key );
1544 $db = $this->getConnection( $shardIndex );
1545 $released = $db->
unlock( $key, __METHOD__ );
1547 $this->handleDBError( $e, $shardIndex );
1560 $charsLeft = 205 - strlen(
$keyspace ) - count( $components );
1561 foreach ( $components as &$component ) {
1562 $component = strtr( $component, [
1568 if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1569 $component =
'#' . md5( $component );
1571 $charsLeft -= strlen( $component );
1574 if ( $charsLeft < 0 ) {
1575 return $keyspace .
':BagOStuff-long-key:##' . md5( implode(
':', $components ) );
1577 return $keyspace .
':' . implode(
':', $components );
1581 if ( is_int( $value ) ) {
1586 if ( $this->hasZlib ) {
1588 $serial = gzdeflate( $serial );
1595 if ( $value === self::TOMB_SERIAL ) {
1603 if ( $this->hasZlib ) {
1604 AtEase::suppressWarnings();
1605 $decompressed = gzinflate( $value );
1606 AtEase::restoreWarnings();
1608 if ( $decompressed !==
false ) {
1609 $value = $decompressed;
1617 if ( !$this->loadBalancer ) {
1618 $this->loadBalancer = ( $this->loadBalancerCallback )();
1620 return $this->loadBalancer;
1627 private function getConnectionViaLoadBalancer() {
1628 $lb = $this->getLoadBalancer();
1630 if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1632 $conn = $lb->getMaintenanceConnectionRef(
DB_PRIMARY, [], $this->dbDomain );
1636 $conn = $lb->getMaintenanceConnectionRef(
1640 $lb::CONN_TRX_AUTOCOMMIT
1645 $conn->ensureConnection();
1655 private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1656 if ( !isset( $this->conns[$shardIndex] ) ) {
1658 $dbFactory = MediaWikiServices::getInstance()->getDatabaseFactory();
1659 $conn = $dbFactory->create(
1665 'flags' => ( $server[
'flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1666 'logger' => $this->logger,
1671 if ( $conn->getType() ===
'sqlite' ) {
1672 $this->initSqliteDatabase( $conn );
1674 $this->conns[$shardIndex] = $conn;
1678 return $this->conns[$shardIndex];
1687 private function handleDBError(
DBError $exception, $shardIndex ) {
1689 $this->markServerDown( $exception, $shardIndex );
1691 $this->setAndLogDBError( $exception );
1697 private function setAndLogDBError(
DBError $e ) {
1698 $this->logger->error(
"DBError: {$e->getMessage()}", [
'exception' => $e ] );
1700 $this->setLastError( self::ERR_UNREACHABLE );
1701 $this->logger->warning( __METHOD__ .
": ignoring connection error" );
1703 $this->setLastError( self::ERR_UNEXPECTED );
1704 $this->logger->warning( __METHOD__ .
": ignoring query error" );
1714 private function markServerDown(
DBError $exception, $shardIndex ) {
1715 unset( $this->conns[$shardIndex] );
1717 $now = $this->getCurrentTime();
1718 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1719 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1720 unset( $this->connFailureTimes[$shardIndex] );
1721 unset( $this->connFailureErrors[$shardIndex] );
1723 $this->logger->debug( __METHOD__ .
": Server #$shardIndex already down" );
1727 $this->logger->info( __METHOD__ .
": Server #$shardIndex down until " . ( $now + 60 ) );
1728 $this->connFailureTimes[$shardIndex] = $now;
1729 $this->connFailureErrors[$shardIndex] = $exception;
1737 if ( $db->
tableExists(
'objectcache', __METHOD__ ) ) {
1741 $db->
query(
"PRAGMA journal_mode=WAL", __METHOD__ );
1744 $encTable = $db->
tableName(
'objectcache' );
1747 "CREATE TABLE $encTable (\n" .
1748 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1750 " exptime BLOB NOT NULL\n" .
1754 $db->
query(
"CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1778 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1779 $db = $this->getConnection( $shardIndex );
1780 if ( in_array( $db->
getType(), [
'mysql',
'postgres' ],
true ) ) {
1781 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1782 $encBaseTable = $db->
tableName(
'objectcache' );
1783 $encShardTable = $db->
tableName( $this->getTableNameByShard( $i ) );
1784 $db->
query(
"CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1793 private function getShardServerIndexes() {
1794 if ( $this->useLB ) {
1796 $shardIndexes = [ 0 ];
1799 $shardIndexes = array_keys( $this->serverTags );
1802 return $shardIndexes;
1810 private function getShardServerIndexForTag(
string $tag ) {
1811 if ( !$this->serverTags ) {
1812 throw new InvalidArgumentException(
"Given a tag but no tags are configured" );
1814 foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1815 if ( $tag === $serverTag ) {
1816 return $serverShardIndex;
1819 throw new InvalidArgumentException(
"Unknown server tag: $tag" );
1827 private function silenceTransactionProfiler() {
1828 if ( $this->serverInfos ) {