// Whether connections come from the load balancer rather than the configured server list
// (getConnection() and getKeyLocation() branch on this flag)
58 protected $useLB =
false;
// Server configuration arrays, keyed by shard index (filled from the 'servers'/'server' params)
61 protected $serverInfos = [];
// Server tags keyed by shard index; untagged servers get "#<index>"
63 protected $serverTags = [];
// Timestamp of the last garbage collection, compared against GC_DELAY_SEC before purging again
65 protected $lastGarbageCollect = 0;
// Random purge period: garbage collection is attempted when mt_rand( 0, purgePeriod - 1 ) is 0;
// a falsy value disables random purging
67 protected $purgePeriod = 10;
// Value of the 'purgeLimit' constructor parameter
69 protected $purgeLimit = 100;
// Number of table shards per server; keys are hashed across them when this is greater than 1
71 protected $numTableShards = 1;
// Upper bound on the batch size used when deleting expired rows
73 protected $writeBatchSize = 100;
// Base table name; a zero-padded shard index is appended when table sharding is enabled
75 protected $tableName =
'objectcache';
// Time of the last recorded connection failure, keyed by shard index
84 protected $connFailureTimes = [];
// Exception recorded for the last connection failure, keyed by shard index
86 protected $connFailureErrors = [];
// Clock tolerance in seconds, applied when clamping new expiry values and tombstone expiry
92 private const SAFE_CLOCK_BOUND_SEC = 15;
// In multi-primary mode, purging is limited to rows that expired at least this many seconds ago
94 private const SAFE_PURGE_DELAY_SEC = 3600;
// Serialized value stored for tombstone rows (written on delete in multi-primary mode)
96 private const TOMB_SERIAL =
'';
// Relative expiry passed to makeNewKeyExpiry() for tombstone rows
98 private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
// Minimum number of seconds between two garbage collection runs
100 private const GC_DELAY_SEC = 1;
// Indexes into the per-key data arrays produced by fetchBlobs()
102 private const BLOB_VALUE = 0;
103 private const BLOB_EXPIRY = 1;
104 private const BLOB_CASTOKEN = 2;
// Timestamp stored in the "exptime" column to represent TTL_INDEFINITE
112 private const INF_TIMESTAMP_PLACEHOLDER =
'99991231235959';
144 parent::__construct( $params );
146 if ( isset( $params[
'servers'] ) || isset( $params[
'server'] ) ) {
150 foreach ( ( $params[
'servers'] ?? [ $params[
'server'] ] ) as $tag => $info ) {
151 $this->serverInfos[$index] = $info;
153 $this->serverTags[$index] = is_string( $tag ) ? $tag :
"#$index";
156 } elseif ( isset( $params[
'loadBalancerCallback'] ) ) {
157 $this->loadBalancerCallback = $params[
'loadBalancerCallback'];
158 if ( !isset( $params[
'dbDomain'] ) ) {
159 throw new InvalidArgumentException(
160 __METHOD__ .
": 'dbDomain' is required if 'loadBalancerCallback' is given"
163 $this->dbDomain = $params[
'dbDomain'];
166 throw new InvalidArgumentException(
167 __METHOD__ .
" requires 'server', 'servers', or 'loadBalancerCallback'"
171 $this->purgePeriod = intval( $params[
'purgePeriod'] ?? $this->purgePeriod );
172 $this->purgeLimit = intval( $params[
'purgeLimit'] ?? $this->purgeLimit );
173 $this->tableName = $params[
'tableName'] ?? $this->tableName;
174 $this->numTableShards = intval( $params[
'shards'] ?? $this->numTableShards );
175 $this->writeBatchSize = intval( $params[
'writeBatchSize'] ?? $this->writeBatchSize );
176 $this->replicaOnly = $params[
'replicaOnly'] ??
false;
177 $this->multiPrimaryMode = $params[
'multiPrimaryMode'] ??
false;
179 $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
180 $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
182 $this->hasZlib = extension_loaded(
'zlib' );
// Fetch a single key via fetchBlobs() and unserialize its stored value.
// $casToken is an in/out parameter: when the caller passes self::PASS_BY_REF,
// it is overwritten with the row's CAS token on a successful decode.
185 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
// A CAS token is only requested from the database when the caller opted in
186 $getToken = ( $casToken === self::PASS_BY_REF );
// fetchBlobs() returns a map keyed by cache key; take the entry for this key
189 $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
191 $result = $this->
unserialize( $data[self::BLOB_VALUE] );
// Do not expose a token for a value that failed to unserialize
192 if ( $getToken && $result !==
false ) {
193 $casToken = $data[self::BLOB_CASTOKEN];
// Byte length of the serialized blob, reported in the GET operation stats
195 $valueSize = strlen( $data[self::BLOB_VALUE] );
201 $this->
updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
206 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
209 return $this->modifyBlobs(
210 [ $this,
'modifyTableSpecificBlobsForSet' ],
212 [ $key => [ $value, $exptime ] ],
220 return $this->modifyBlobs(
221 [ $this,
'modifyTableSpecificBlobsForDelete' ],
228 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
229 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
230 if ( $mtime ===
null ) {
235 return $this->modifyBlobs(
236 [ $this,
'modifyTableSpecificBlobsForAdd' ],
238 [ $key => [ $value, $exptime ] ],
243 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
244 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
245 if ( $mtime ===
null ) {
250 return $this->modifyBlobs(
251 [ $this,
'modifyTableSpecificBlobsForCas' ],
253 [ $key => [ $value, $exptime, $casToken ] ],
261 return $this->modifyBlobs(
262 [ $this,
'modifyTableSpecificBlobsForChangeTTL' ],
264 [ $key => [ $exptime ] ],
272 if ( $flags & self::WRITE_BACKGROUND ) {
273 $callback = [ $this,
'modifyTableSpecificBlobsForIncrInitAsync' ];
275 $callback = [ $this,
'modifyTableSpecificBlobsForIncrInit' ];
278 $result = $this->modifyBlobs(
281 [ $key => [ $step, $init, $exptime ] ],
284 ) ? $resByKey[$key] :
false;
// Increase the integer stored at $key by $value; thin wrapper around doIncr().
289 public function incr( $key, $value = 1, $flags = 0 ) {
290 return $this->doIncr( $key, $value, $flags );
// Decrease the integer stored at $key by $value; delegates to doIncr() with the step negated.
293 public function decr( $key, $value = 1, $flags = 0 ) {
294 return $this->doIncr( $key, -$value, $flags );
// Shared implementation of incr() and decr(): read the current value, add $value
// (negative for decr()), and write the result back through the "set" table callback.
297 private function doIncr( $key, $value = 1, $flags = 0 ) {
// Obtain a modification timestamp for a locked write section on this key;
// null is checked for below (the handling of that case is not visible in this excerpt)
298 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
299 if ( $mtime ===
null ) {
304 $data = $this->fetchBlobs( [ $key ] )[$key];
306 $serialValue = $data[self::BLOB_VALUE];
307 if ( $this->
isInteger( $serialValue ) ) {
// Clamp at zero so the stored counter never becomes negative
308 $newValue = max( (
int)$serialValue + (
int)$value, 0 );
// Write the new value back, reusing the expiry that was just read for the key
309 $result = $this->modifyBlobs(
310 [ $this,
'modifyTableSpecificBlobsForSet' ],
313 [ $key => [ $newValue, $data[self::BLOB_EXPIRY] ] ],
315 ) ? $newValue :
false;
318 $this->logger->warning( __METHOD__ .
": $key is a non-integer" );
322 $this->logger->debug( __METHOD__ .
": $key does not exists" );
332 $valueSizeByKey = [];
334 $dataByKey = $this->fetchBlobs(
$keys );
335 foreach (
$keys as $key ) {
336 $data = $dataByKey[$key];
338 $serialValue = $data[self::BLOB_VALUE];
340 if ( $value !==
false ) {
341 $result[$key] = $value;
343 $valueSize = strlen( $serialValue );
347 $valueSizeByKey[$key] = [ 0, $valueSize ];
350 $this->
updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
355 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
358 return $this->modifyBlobs(
359 [ $this,
'modifyTableSpecificBlobsForSet' ],
362 static function ( $value ) use ( $exptime ) {
363 return [ $value, $exptime ];
374 return $this->modifyBlobs(
375 [ $this,
'modifyTableSpecificBlobsForDelete' ],
377 array_fill_keys(
$keys, [] ),
385 return $this->modifyBlobs(
386 [ $this,
'modifyTableSpecificBlobsForChangeTTL' ],
388 array_fill_keys(
$keys, [ $exptime ] ),
// Get a database connection for the given shard index.
// Throws the remembered connection error while a shard is considered down, and
// UnexpectedValueException when the index has no configured server.
401 private function getConnection( $shardIndex ) {
// In load balancer mode the shard index is not used to pick a connection
402 if ( $this->useLB ) {
403 return $this->getConnectionViaLoadBalancer();
408 isset( $this->connFailureErrors[$shardIndex] ) &&
409 ( $this->
getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
// A failure was recorded less than 60 seconds ago: re-throw it rather than reconnecting
411 throw $this->connFailureErrors[$shardIndex];
414 if ( isset( $this->serverInfos[$shardIndex] ) ) {
415 $server = $this->serverInfos[$shardIndex];
416 $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
418 throw new UnexpectedValueException(
"Invalid server index #$shardIndex" );
// Determine where a key lives.
// Returns a two-element array: [ shard (server) index, partition table name ].
429 private function getKeyLocation( $key ) {
430 if ( $this->useLB ) {
435 if ( count( $this->serverTags ) == 1 ) {
// Several servers: work on a copy of the tag list and take the key of its first element
// NOTE(review): the step that orders $sortedServers for this key is not visible here — confirm
438 $sortedServers = $this->serverTags;
440 reset( $sortedServers );
441 $shardIndex = key( $sortedServers );
445 if ( $this->numTableShards > 1 ) {
// Use the first 8 hex digits of the key's MD5, masked to a non-negative 31-bit integer,
// to choose the table shard
446 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
447 $tableIndex = $hash % $this->numTableShards;
452 return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
// Get the table name for a table shard index.
// With sharding enabled and a non-null index, the index is appended to the base table
// name, zero-padded to the width of the largest shard index; otherwise the base
// table name is returned unchanged.
460 private function getTableNameByShard( $index ) {
461 if ( $index !==
null && $this->numTableShards > 1 ) {
// Number of digits needed for the highest shard index, so all suffixes have equal width
462 $decimals = strlen( (
string)( $this->numTableShards - 1 ) );
464 return $this->tableName . sprintf(
"%0{$decimals}d", $index );
467 return $this->tableName;
475 private function fetchBlobs( array
$keys,
bool $getCasToken =
false ) {
477 $silenceScope = $this->silenceTransactionProfiler();
480 $dataByKey = array_fill_keys(
$keys,
null );
483 $keysByTableByShard = [];
484 foreach (
$keys as $key ) {
485 list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
486 $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
489 foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
491 $db = $this->getConnection( $shardIndex );
492 foreach ( $serverKeys as $partitionTable => $tableKeys ) {
496 ? $this->addCasTokenFields( $db, [
'keyname',
'value',
'exptime' ] )
497 : [
'keyname',
'value',
'exptime' ] )
498 ->from( $partitionTable )
499 ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
500 ->caller( __METHOD__ )
502 foreach (
$res as $row ) {
503 $row->shardIndex = $shardIndex;
504 $row->tableName = $partitionTable;
505 $dataByKey[$row->keyname] = $row;
509 $this->handleDBError( $e, $shardIndex );
513 foreach (
$keys as $key ) {
514 $row = $dataByKey[$key] ??
null;
519 $this->
debug( __METHOD__ .
": retrieved $key; expiry time is {$row->exptime}" );
521 $db = $this->getConnection( $row->shardIndex );
523 self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
524 self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
525 self::BLOB_CASTOKEN => $getCasToken
526 ? $this->getCasTokenFromRow( $db, $row )
530 $this->handleDBError( $e, $row->shardIndex );
551 private function modifyBlobs(
552 callable $tableWriteCallback,
559 $resByKey = array_fill_keys( array_keys( $argsByKey ),
false );
562 $silenceScope = $this->silenceTransactionProfiler();
564 $argsByKeyByTableByShard = [];
565 foreach ( $argsByKey as $key =>
$args ) {
566 list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
567 $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] =
$args;
570 $shardIndexesAffected = [];
571 foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
572 foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
574 $db = $this->getConnection( $shardIndex );
575 $shardIndexesAffected[] = $shardIndex;
576 $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
578 $this->handleDBError( $e, $shardIndex );
584 $success = !in_array(
false, $resByKey,
true );
586 foreach ( $shardIndexesAffected as $shardIndex ) {
588 $db = $this->getConnection( $shardIndex );
589 $this->occasionallyGarbageCollect( $db );
591 $this->handleDBError( $e, $shardIndex );
613 private function modifyTableSpecificBlobsForSet(
620 $valueSizesByKey = [];
622 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
624 if ( $this->multiPrimaryMode ) {
626 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
628 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
631 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
633 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
636 $resByKey[$key] =
true;
638 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
643 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
644 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
646 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
648 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
650 $db->
replace( $ptable,
'keyname', $rows, __METHOD__ );
651 foreach ( $argsByKey as $key => $unused ) {
652 $resByKey[$key] =
true;
656 $this->
updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
675 private function modifyTableSpecificBlobsForDelete(
682 if ( $this->multiPrimaryMode ) {
684 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
685 $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (
int)$mtime );
687 foreach ( $argsByKey as $key => $arg ) {
688 $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
694 $this->buildUpsertSetForOverwrite( $db, self::TOMB_SERIAL, $expiry, $mt ),
699 $db->
delete( $ptable, [
'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
702 foreach ( $argsByKey as $key => $arg ) {
703 $resByKey[$key] =
true;
706 $this->
updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
729 private function modifyTableSpecificBlobsForAdd(
736 $valueSizesByKey = [];
738 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
742 ->select(
'keyname' )
744 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (
int)$mtime ) )
745 ->caller( __METHOD__ )
746 ->fetchFieldValues();
747 $existingByKey = array_fill_keys( $existingKeys,
true );
750 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
751 if ( isset( $existingByKey[$key] ) ) {
752 $this->logger->debug( __METHOD__ .
": $key already exists" );
757 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
760 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
762 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
765 $resByKey[$key] =
true;
767 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
770 $this->
updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
793 private function modifyTableSpecificBlobsForCas(
800 $valueSizesByKey = [];
802 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
806 ->select( $this->addCasTokenFields( $db, [
'keyname' ] ) )
808 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (
int)$mtime ) )
809 ->caller( __METHOD__ )
812 $curTokensByKey = [];
813 foreach (
$res as $row ) {
814 $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
818 foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) {
819 $curToken = $curTokensByKey[$key] ??
null;
820 if ( $curToken ===
null ) {
821 $this->logger->debug( __METHOD__ .
": $key does not exists" );
825 if ( $curToken !== $casToken ) {
826 $this->logger->debug( __METHOD__ .
": $key does not have a matching token" );
831 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
834 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
836 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
839 $resByKey[$key] =
true;
841 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
844 $this->
updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
866 private function modifyTableSpecificBlobsForChangeTTL(
873 if ( $this->multiPrimaryMode ) {
874 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
877 ->select( [
'keyname',
'value' ] )
879 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (
int)$mtime ) )
880 ->caller( __METHOD__ )
883 foreach (
$res as $curRow ) {
884 $key = $curRow->keyname;
885 $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
886 list( $exptime ) = $argsByKey[$key];
887 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
891 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
893 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
896 $resByKey[$key] =
true;
899 $keysBatchesByExpiry = [];
900 foreach ( $argsByKey as $key => list( $exptime ) ) {
901 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
902 $keysBatchesByExpiry[$expiry][] = $key;
906 foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
909 [
'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
910 $this->buildExistenceConditions( $db, $keyBatch, (
int)$mtime ),
915 if ( $existingCount === count( $argsByKey ) ) {
916 foreach ( $argsByKey as $key =>
$args ) {
917 $resByKey[$key] =
true;
922 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
944 private function modifyTableSpecificBlobsForIncrInit(
951 foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) {
952 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
953 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
959 $atomic = $db->
startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
963 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
965 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (
int)$mtime ),
972 ->where( [
'keyname' => $key ] )
973 ->caller( __METHOD__ )
975 }
catch ( Exception $e ) {
981 if ( !$affectedCount || $row ===
false ) {
982 $this->logger->warning( __METHOD__ .
": failed to set new $key value" );
986 $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
987 if ( !$this->
isInteger( $serialValue ) ) {
988 $this->logger->warning( __METHOD__ .
": got non-integer $key value" );
992 $resByKey[$key] = (int)$serialValue;
995 $this->
updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
1009 private function modifyTableSpecificBlobsForIncrInitAsync(
1016 foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) {
1017 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
1018 $expiry = $this->makeNewKeyExpiry( $exptime, (
int)$mtime );
1021 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
1023 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (
int)$mtime ),
1027 $this->logger->warning( __METHOD__ .
": failed to set new $key value" );
1029 $resByKey[$key] =
true;
1039 private function makeNewKeyExpiry( $exptime,
int $nowTsUnix ) {
1044 if ( $expiry !== self::TTL_INDEFINITE ) {
1045 $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1069 private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1070 if ( !$this->
lock( $key, 0 ) ) {
1074 $scope =
new ScopedCallback(
function () use ( $key ) {
1079 return (
float)sprintf(
'%.6F', $this->locks[$key][self::LOCK_TIME] );
// Build the modification token string for a write made at $mtime.
// Throws RuntimeException if the resulting token is not exactly 17 characters long.
1091 private function makeTimestampedModificationToken(
float $mtime,
IDatabase $db ) {
// Whole seconds of the timestamp
1097 $seconds = (int)$mtime;
// Fractional part as the six digits after the decimal point
1098 list( , $microseconds ) = explode(
'.', sprintf(
'%.6F', $mtime ) );
// The token concatenates an encoding of ( 35-bit binary seconds . 32-bit binary $id )
// with the microseconds in base 36, left-padded to 4 characters.
// NOTE(review): where $id comes from and how the binary string is re-encoded is not
// visible in this excerpt — confirm against the full source
1102 $token = implode(
'', [
1106 str_pad( base_convert( (
string)$seconds, 10, 2 ), 35,
'0', STR_PAD_LEFT ) .
1108 str_pad( base_convert( $id, 10, 2 ), 32,
'0', STR_PAD_LEFT ),
1114 str_pad( base_convert( $microseconds, 10, 36 ), 4,
'0', STR_PAD_LEFT )
// A token of any other length means one of the components exceeded its fixed width
1117 if ( strlen( $token ) !== 17 ) {
1118 throw new RuntimeException(
"Modification timestamp overflow detected" );
1132 private function buildExistenceConditions(
IDatabase $db,
$keys,
int $time ) {
1150 private function buildUpsertRow(
1159 'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
1160 'exptime' => $this->encodeDbExpiry( $db, $expiry )
1162 if ( $this->multiPrimaryMode ) {
1163 $row[
'modtoken'] = $mt;
1178 private function buildUpsertSetForOverwrite(
1184 $expressionsByColumn = [
1185 'value' => $db->
addQuotes( $this->dbEncodeSerialValue( $db, $serialValue ) ),
1186 'exptime' => $db->
addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1190 if ( $this->multiPrimaryMode ) {
1197 $expressionsByColumn[
'modtoken'] = $db->
addQuotes( $mt );
1198 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1200 $db->
addQuotes( substr( $mt, 0, 13 ) ) .
' >= ' .
1205 $set[] =
"{$column}=" . trim( $rhs );
1208 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1209 $set[] =
"{$column}={$updateExpression}";
1227 private function buildIncrUpsertSet(
1236 $expressionsByColumn = [
1239 $db->
addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1243 $db->
addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1246 if ( $this->multiPrimaryMode ) {
1247 $expressionsByColumn[
'modtoken'] = [
'modtoken', $db->
addQuotes( $mt ) ];
1251 foreach ( $expressionsByColumn as $column => list( $updateExpression, $initExpression ) ) {
1257 $set[] =
"{$column}=" . trim( $rhs );
// Convert an integer expiry into the value stored in the "exptime" column.
// TTL_INDEFINITE is stored as the database timestamp form of INF_TIMESTAMP_PLACEHOLDER.
// NOTE(review): the branch for finite expiry values is not visible in this excerpt
1268 private function encodeDbExpiry(
IDatabase $db,
int $expiry ) {
1269 return ( $expiry === self::TTL_INDEFINITE )
1271 ? $db->
timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
// Convert an "exptime" column value back into an integer expiry.
// The placeholder timestamp maps to TTL_INDEFINITE; any other value is converted
// to a UNIX timestamp and cast to int.
1281 private function decodeDbExpiry(
IDatabase $db,
string $dbExpiry ) {
// Compare against the same database-formatted placeholder that encodeDbExpiry() produces
1282 return ( $dbExpiry === $db->
timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1283 ? self::TTL_INDEFINITE
1284 : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
// Prepare a serialized value for storage in the "value" column.
// Integers are stored as their decimal string; anything else goes through
// the connection's encodeBlob().
1292 private function dbEncodeSerialValue(
IDatabase $db, $serialValue ) {
1293 return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue );
1312 private function addCasTokenFields(
IDatabase $db, array $fields ) {
1315 if (
$type ===
'mysql' ) {
1321 } elseif (
$type ===
'postgres' ) {
1328 if ( !in_array(
'value', $fields,
true ) ) {
1329 $fields[] =
'value';
1331 if ( !in_array(
'exptime', $fields,
true ) ) {
1332 $fields[] =
'exptime';
// Get the CAS token for a fetched row.
// Uses the row's "castoken" field when it is set; otherwise computes the token
// in PHP from the decoded value and the row's exptime.
1346 private function getCasTokenFromRow(
IDatabase $db, stdClass $row ) {
1347 if ( isset( $row->castoken ) ) {
1348 $token = $row->castoken;
// Fallback token: SHA-1 of the decoded value, an "@" separator, then the raw exptime
1350 $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) .
'@' . $row->exptime;
1351 $this->logger->debug( __METHOD__ .
": application computed hash for CAS token" );
// Occasionally purge expired rows on the given connection.
// Runs only when random purging is enabled, the random draw hits (about one call in
// $purgePeriod), and more than GC_DELAY_SEC seconds have passed since the last run.
1361 private function occasionallyGarbageCollect(
IDatabase $db ) {
1364 $this->purgePeriod &&
1366 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1368 ( $this->
getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
// The purge itself is wrapped in a closure so it can be run now or handed to the async handler
1370 $garbageCollector =
function () use ( $db ) {
1371 $this->deleteServerObjectsExpiringBefore(
// NOTE(review): this records time() while the guard above compares against
// getCurrentTime(); confirm whether the two clocks are meant to differ
1376 $this->lastGarbageCollect = time();
1378 if ( $this->asyncHandler ) {
// Hand the closure to the configured async handler
1380 ( $this->asyncHandler )( $garbageCollector );
// No async handler configured: purge synchronously
1382 $garbageCollector();
1393 callable $progress =
null,
1398 $silenceScope = $this->silenceTransactionProfiler();
1400 if ( $tag !==
null ) {
1402 $shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
1404 $shardIndexes = $this->getShardServerIndexes();
1405 shuffle( $shardIndexes );
1409 $numServers = count( $shardIndexes );
1411 $keysDeletedCount = 0;
1412 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1414 $db = $this->getConnection( $shardIndex );
1415 $this->deleteServerObjectsExpiringBefore(
1420 [
'fn' => $progress,
'serversDone' => $numServersDone,
'serversTotal' => $numServers ]
1423 $this->handleDBError( $e, $shardIndex );
1440 private function deleteServerObjectsExpiringBefore(
1444 &$keysDeletedCount = 0,
1445 array $progress =
null
1447 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1448 if ( $this->multiPrimaryMode ) {
1457 $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1458 $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1460 $tableIndexes = range( 0, $this->numTableShards - 1 );
1461 shuffle( $tableIndexes );
1463 $batchSize = min( $this->writeBatchSize, $limit );
1465 foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1472 $totalSeconds =
null;
1476 ->select( [
'keyname',
'exptime' ] )
1477 ->from( $this->getTableNameByShard( $tableIndex ) )
1481 $maxExp ? [
'exptime >= ' . $db->
addQuotes( $maxExp ) ] : []
1485 ->limit( $batchSize )
1486 ->caller( __METHOD__ )
1489 if (
$res->numRows() ) {
1490 $row =
$res->current();
1491 if ( $minExpUnix ===
null ) {
1492 $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1493 $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1497 foreach (
$res as $row ) {
1498 $keys[] = $row->keyname;
1499 $maxExp = $row->exptime;
1503 $this->getTableNameByShard( $tableIndex ),
1513 if ( $progress && is_callable( $progress[
'fn'] ) ) {
1514 if ( $totalSeconds ) {
1515 $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1516 $remainingSeconds = $cutoffUnix - $maxExpUnix;
1517 $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1522 ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1524 $tablesDoneRatio = 1;
1529 $overallRatio = ( $progress[
'serversDone'] / $progress[
'serversTotal'] ) +
1530 ( $tablesDoneRatio / $progress[
'serversTotal'] );
1531 ( $progress[
'fn'] )( $overallRatio * 100 );
1533 }
while (
$res->numRows() && $keysDeletedCount < $limit );
1544 $silenceScope = $this->silenceTransactionProfiler();
1545 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1548 $db = $this->getConnection( $shardIndex );
1549 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1550 $db->
delete( $this->getTableNameByShard( $i ),
'*', __METHOD__ );
1553 $this->handleDBError( $e, $shardIndex );
1560 public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1562 $silenceScope = $this->silenceTransactionProfiler();
1566 list( $shardIndex ) = $this->getKeyLocation( $key );
1568 $db = $this->getConnection( $shardIndex );
1569 $lockTsUnix = $db->
lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1571 $this->handleDBError( $e, $shardIndex );
1572 $this->logger->warning(
1573 __METHOD__ .
' failed due to I/O error for {key}.',
1583 $silenceScope = $this->silenceTransactionProfiler();
1585 list( $shardIndex ) = $this->getKeyLocation( $key );
1588 $db = $this->getConnection( $shardIndex );
1589 $released = $db->
unlock( $key, __METHOD__ );
1591 $this->handleDBError( $e, $shardIndex );
1604 $charsLeft = 205 - strlen(
$keyspace ) - count( $components );
1605 foreach ( $components as &$component ) {
1606 $component = strtr( $component, [
1612 if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1613 $component =
'#' . md5( $component );
1615 $charsLeft -= strlen( $component );
1618 if ( $charsLeft < 0 ) {
1619 return $keyspace .
':BagOStuff-long-key:##' . md5( implode(
':', $components ) );
1621 return $keyspace .
':' . implode(
':', $components );
1625 if ( is_int( $value ) ) {
1630 if ( $this->hasZlib ) {
1632 $serial = gzdeflate( $serial );
1639 if ( $value === self::TOMB_SERIAL ) {
1647 if ( $this->hasZlib ) {
1648 AtEase::suppressWarnings();
1649 $decompressed = gzinflate( $value );
1650 AtEase::restoreWarnings();
1652 if ( $decompressed !==
false ) {
1653 $value = $decompressed;
1661 if ( !$this->loadBalancer ) {
1662 $this->loadBalancer = ( $this->loadBalancerCallback )();
1664 return $this->loadBalancer;
// Get a connection through the load balancer (used when $useLB is set).
// How the connection is requested depends on whether the writer server reports
// database-level locking.
1671 private function getConnectionViaLoadBalancer() {
1672 $lb = $this->getLoadBalancer();
1674 if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
// Database-level locking: request the primary connection with no extra flags
1676 $conn = $lb->getMaintenanceConnectionRef(
DB_PRIMARY, [], $this->dbDomain );
// Otherwise the connection is requested with the CONN_TRX_AUTOCOMMIT flag
// NOTE(review): the other arguments of this call are not visible in this excerpt
1680 $conn = $lb->getMaintenanceConnectionRef(
1684 $lb::CONN_TRX_AUTOCOMMIT
// Connect now rather than leaving it to first use
1689 $conn->ensureConnection();
// Get the connection for a shard from its server configuration array.
// Connections are cached in $this->conns by shard index and created on first use.
1699 private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1700 if ( !isset( $this->conns[$shardIndex] ) ) {
// Clear the DBO_TRX bit from the configured flags; this object's logger is used for
// both connection and query logging
1708 'flags' => ( $server[
'flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1709 'connLogger' => $this->logger,
1710 'queryLogger' => $this->logger
// SQLite connections get extra initialization via initSqliteDatabase()
1715 if ( $conn->
getType() ===
'sqlite' ) {
1716 $this->initSqliteDatabase( $conn );
1718 $this->conns[$shardIndex] = $conn;
1722 return $this->conns[$shardIndex];
// Handle a DBError raised while talking to the given shard: the failure can be recorded
// against the shard via markServerDown(), and the error is logged via setAndLogDBError().
// NOTE(review): any condition guarding the markServerDown() call is not visible in this excerpt
1731 private function handleDBError(
DBError $exception, $shardIndex ) {
1733 $this->markServerDown( $exception, $shardIndex );
1735 $this->setAndLogDBError( $exception );
// Log a DBError and record it as this object's last error.
// Two outcomes are visible: ERR_UNREACHABLE paired with a "connection error" warning,
// and ERR_UNEXPECTED paired with a "query error" warning.
// NOTE(review): the condition that selects between them is not visible in this excerpt
1741 private function setAndLogDBError(
DBError $e ) {
// Always log the original exception with its message
1742 $this->logger->error(
"DBError: {$e->getMessage()}", [
'exception' => $e ] );
1744 $this->setLastError( self::ERR_UNREACHABLE );
1745 $this->logger->warning( __METHOD__ .
": ignoring connection error" );
1747 $this->setLastError( self::ERR_UNEXPECTED );
1748 $this->logger->warning( __METHOD__ .
": ignoring query error" );
// Record that a shard's server failed, so that getConnection() can re-throw the stored
// error for the next 60 seconds instead of attempting to reconnect.
1758 private function markServerDown(
DBError $exception, $shardIndex ) {
// Drop any cached connection handle for the shard
1759 unset( $this->conns[$shardIndex] );
1761 $now = $this->getCurrentTime();
1762 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
// A failure recorded 60 or more seconds ago is stale: clear it
1763 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1764 unset( $this->connFailureTimes[$shardIndex] );
1765 unset( $this->connFailureErrors[$shardIndex] );
1767 $this->logger->debug( __METHOD__ .
": Server #$shardIndex already down" );
1771 $this->logger->info( __METHOD__ .
": Server #$shardIndex down until " . ( $now + 60 ) );
// Remember when the failure happened and which exception to re-throw
1772 $this->connFailureTimes[$shardIndex] = $now;
1773 $this->connFailureErrors[$shardIndex] = $exception;
1781 if ( $db->
tableExists(
'objectcache', __METHOD__ ) ) {
1785 $db->
query(
"PRAGMA journal_mode=WAL", __METHOD__ );
1788 $encTable = $db->
tableName(
'objectcache' );
1791 "CREATE TABLE $encTable (\n" .
1792 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1794 " exptime BLOB NOT NULL\n" .
1798 $db->
query(
"CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1822 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1823 $db = $this->getConnection( $shardIndex );
1824 if ( in_array( $db->
getType(), [
'mysql',
'postgres' ],
true ) ) {
1825 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1826 $encBaseTable = $db->
tableName(
'objectcache' );
1827 $encShardTable = $db->
tableName( $this->getTableNameByShard( $i ) );
1828 $db->
query(
"CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
// List the shard (server) indexes this object can address.
// In load balancer mode there is a single index 0; otherwise the indexes are the
// keys of the configured server tags.
1837 private function getShardServerIndexes() {
1838 if ( $this->useLB ) {
1840 $shardIndexes = [ 0 ];
1843 $shardIndexes = array_keys( $this->serverTags );
1846 return $shardIndexes;
// Look up the shard index of the server configured with the given tag.
// Throws InvalidArgumentException when no tags are configured or the tag is unknown.
1854 private function getShardServerIndexForTag(
string $tag ) {
1855 if ( !$this->serverTags ) {
1856 throw new InvalidArgumentException(
"Given a tag but no tags are configured" );
// Linear search over the configured tags, using strict string comparison
1858 foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1859 if ( $tag === $serverTag ) {
1860 return $serverShardIndex;
1863 throw new InvalidArgumentException(
"Unknown server tag: $tag" );
1871 private function silenceTransactionProfiler() {
1872 if ( $this->serverInfos ) {