MediaWiki  master
SqlBagOStuff.php
Go to the documentation of this file.
1 <?php
24 use Wikimedia\AtEase\AtEase;
25 use Wikimedia\ObjectFactory;
34 use Wikimedia\ScopedCallback;
35 use Wikimedia\Timestamp\ConvertibleTimestamp;
36 use Wikimedia\WaitConditionLoop;
37 
54  protected $localKeyLb;
56  protected $globalKeyLb;
57 
59  protected $serverInfos = [];
61  protected $serverTags = [];
63  protected $lastGarbageCollect = 0;
65  protected $purgePeriod = 10;
67  protected $purgeLimit = 100;
69  protected $numTableShards = 1;
71  protected $writeBatchSize = 100;
73  protected $tableName = 'objectcache';
75  protected $replicaOnly;
78 
80  protected $conns;
82  protected $connFailureTimes = [];
84  protected $connFailureErrors = [];
85 
86  private const SHARD_LOCAL = 'local';
87  private const SHARD_GLOBAL = 'global';
88 
90  private const SAFE_CLOCK_BOUND_SEC = 15;
92  private const SAFE_PURGE_DELAY_SEC = 3600;
94  private const TOMB_SERIAL = '';
98  private const GC_DELAY_SEC = 1;
99 
100  private const BLOB_VALUE = 0;
101  private const BLOB_EXPIRY = 1;
102  private const BLOB_CASTOKEN = 2;
103 
110  private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
111 
159  public function __construct( $params ) {
160  parent::__construct( $params );
161 
162  $dbType = null;
163  if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
164  // Configuration uses a direct list of servers.
165  // Object data is horizontally partitioned via key hash.
166  $index = 0;
167  foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
168  $this->serverInfos[$index] = $info;
169  // Allow integer-indexes arrays for b/c
170  $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
171  $dbType = $info['type'];
172  ++$index;
173  }
174  } else {
175  // Configuration uses the servers defined in LoadBalancer instances.
176  // Object data is vertically partitioned via global vs local keys.
177  if ( isset( $params['globalKeyLB'] ) ) {
178  $this->globalKeyLb = ( $params['globalKeyLB'] instanceof ILoadBalancer )
179  ? $params['globalKeyLB']
180  : ObjectFactory::getObjectFromSpec( $params['globalKeyLB'] );
181  }
182  if ( isset( $params['localKeyLB'] ) ) {
183  $this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer )
184  ? $params['localKeyLB']
185  : ObjectFactory::getObjectFromSpec( $params['localKeyLB'] );
186  } else {
187  $this->localKeyLb = $this->globalKeyLb;
188  }
189  // When using LoadBalancer instances, one *must* be defined for local keys
190  if ( !$this->localKeyLb ) {
191  throw new InvalidArgumentException(
192  "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'"
193  );
194  }
195  }
196 
197  $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
198  $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
199  $this->tableName = $params['tableName'] ?? $this->tableName;
200  $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
201  $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
202  $this->replicaOnly = $params['replicaOnly'] ?? false;
203 
204  if ( $params['multiPrimaryMode'] ?? false ) {
205  if ( $dbType !== 'mysql' ) {
206  throw new InvalidArgumentException( "Multi-primary mode only supports MySQL" );
207  }
208 
209  $this->multiPrimaryModeType = $dbType;
210  }
211 
214  }
215 
216  protected function doGet( $key, $flags = 0, &$casToken = null ) {
217  $getToken = ( $casToken === self::PASS_BY_REF );
218  $casToken = null;
219 
220  $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
221  if ( $data ) {
222  $result = $this->unserialize( $data[self::BLOB_VALUE] );
223  if ( $getToken && $result !== false ) {
224  $casToken = $data[self::BLOB_CASTOKEN];
225  }
226  $valueSize = strlen( $data[self::BLOB_VALUE] );
227  } else {
228  $result = false;
229  $valueSize = false;
230  }
231 
232  $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ null, $valueSize ] ] );
233 
234  return $result;
235  }
236 
237  protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
238  $mtime = $this->getCurrentTime();
239 
240  return $this->modifyBlobs(
241  [ $this, 'modifyTableSpecificBlobsForSet' ],
242  $mtime,
243  [ $key => [ $value, $exptime ] ],
244  $flags
245  );
246  }
247 
248  protected function doDelete( $key, $flags = 0 ) {
249  $mtime = $this->getCurrentTime();
250 
251  return $this->modifyBlobs(
252  [ $this, 'modifyTableSpecificBlobsForDelete' ],
253  $mtime,
254  [ $key => [] ],
255  $flags
256  );
257  }
258 
259  protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
260  $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
261  if ( $mtime === null ) {
262  // Timeout or I/O error during lock acquisition
263  return false;
264  }
265 
266  return $this->modifyBlobs(
267  [ $this, 'modifyTableSpecificBlobsForAdd' ],
268  $mtime,
269  [ $key => [ $value, $exptime ] ],
270  $flags
271  );
272  }
273 
274  protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
275  $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
276  if ( $mtime === null ) {
277  // Timeout or I/O error during lock acquisition
278  return false;
279  }
280 
281  return $this->modifyBlobs(
282  [ $this, 'modifyTableSpecificBlobsForCas' ],
283  $mtime,
284  [ $key => [ $value, $exptime, $casToken ] ],
285  $flags
286  );
287  }
288 
289  protected function doChangeTTL( $key, $exptime, $flags ) {
290  $mtime = $this->getCurrentTime();
291 
292  return $this->modifyBlobs(
293  [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
294  $mtime,
295  [ $key => [ $exptime ] ],
296  $flags
297  );
298  }
299 
300  public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) {
301  $value = (int)$value;
302  $init = is_int( $init ) ? $init : $value;
303 
304  $mtime = $this->getCurrentTime();
305 
306  $result = $this->modifyBlobs(
307  [ $this, 'modifyTableSpecificBlobsForIncrInit' ],
308  $mtime,
309  [ $key => [ $value, $init, $exptime ] ],
310  $flags,
311  $resByKey
312  ) ? $resByKey[$key] : false;
313 
314  return $result;
315  }
316 
317  public function incr( $key, $value = 1, $flags = 0 ) {
318  return $this->doIncr( $key, $value, $flags );
319  }
320 
321  public function decr( $key, $value = 1, $flags = 0 ) {
322  return $this->doIncr( $key, -$value, $flags );
323  }
324 
325  private function doIncr( $key, $value = 1, $flags = 0 ) {
326  $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
327  if ( $mtime === null ) {
328  // Timeout or I/O error during lock acquisition
329  return false;
330  }
331 
332  $data = $this->fetchBlobs( [ $key ] )[$key];
333  if ( $data ) {
334  $serialValue = $data[self::BLOB_VALUE];
335  if ( $this->isInteger( $serialValue ) ) {
336  $newValue = max( (int)$serialValue + (int)$value, 0 );
337  $result = $this->modifyBlobs(
338  [ $this, 'modifyTableSpecificBlobsForSet' ],
339  $mtime,
340  // Preserve the old expiry timestamp
341  [ $key => [ $newValue, $data[self::BLOB_EXPIRY] ] ],
342  $flags
343  ) ? $newValue : false;
344  } else {
345  $result = false;
346  $this->logger->warning( __METHOD__ . ": $key is a non-integer" );
347  }
348  } else {
349  $result = false;
350  $this->logger->debug( __METHOD__ . ": $key does not exists" );
351  }
352 
353  return $result;
354  }
355 
356  protected function doGetMulti( array $keys, $flags = 0 ) {
357  $result = [];
358  $valueSizeByKey = [];
359 
360  $dataByKey = $this->fetchBlobs( $keys );
361  foreach ( $keys as $key ) {
362  $data = $dataByKey[$key];
363  if ( $data ) {
364  $serialValue = $data[self::BLOB_VALUE];
365  $value = $this->unserialize( $serialValue );
366  if ( $value !== false ) {
367  $result[$key] = $value;
368  }
369  $valueSize = strlen( $serialValue );
370  } else {
371  $valueSize = false;
372  }
373  $valueSizeByKey[$key] = [ null, $valueSize ];
374  }
375 
376  $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
377 
378  return $result;
379  }
380 
381  protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
382  $mtime = $this->getCurrentTime();
383 
384  return $this->modifyBlobs(
385  [ $this, 'modifyTableSpecificBlobsForSet' ],
386  $mtime,
387  array_map(
388  static function ( $value ) use ( $exptime ) {
389  return [ $value, $exptime ];
390  },
391  $data
392  ),
393  $flags
394  );
395  }
396 
397  protected function doDeleteMulti( array $keys, $flags = 0 ) {
398  $mtime = $this->getCurrentTime();
399 
400  return $this->modifyBlobs(
401  [ $this, 'modifyTableSpecificBlobsForDelete' ],
402  $mtime,
403  array_fill_keys( $keys, [] ),
404  $flags
405  );
406  }
407 
408  public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
409  $mtime = $this->getCurrentTime();
410 
411  return $this->modifyBlobs(
412  [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
413  $mtime,
414  array_fill_keys( $keys, [ $exptime ] ),
415  $flags
416  );
417  }
418 
427  private function getConnection( $shardIndex ) {
428  // Don't keep timing out trying to connect if the server is down
429  if (
430  isset( $this->connFailureErrors[$shardIndex] ) &&
431  ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
432  ) {
433  throw $this->connFailureErrors[$shardIndex];
434  }
435 
436  if ( $shardIndex === self::SHARD_LOCAL ) {
437  $conn = $this->getConnectionViaLoadBalancer( $shardIndex );
438  } elseif ( $shardIndex === self::SHARD_GLOBAL ) {
439  $conn = $this->getConnectionViaLoadBalancer( $shardIndex );
440  } elseif ( is_int( $shardIndex ) ) {
441  if ( isset( $this->serverInfos[$shardIndex] ) ) {
442  $server = $this->serverInfos[$shardIndex];
443  $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
444  } else {
445  throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
446  }
447  } else {
448  throw new UnexpectedValueException( "Invalid server index '$shardIndex'" );
449  }
450 
451  return $conn;
452  }
453 
459  private function getKeyLocation( $key ) {
460  if ( $this->serverTags ) {
461  // Striped array of database servers
462  if ( count( $this->serverTags ) == 1 ) {
463  $shardIndex = 0; // short-circuit
464  } else {
465  $sortedServers = $this->serverTags;
466  ArrayUtils::consistentHashSort( $sortedServers, $key );
467  reset( $sortedServers );
468  $shardIndex = key( $sortedServers );
469  }
470  } else {
471  // LoadBalancer based configuration
472  $shardIndex = ( strpos( $key, 'global:' ) === 0 && $this->globalKeyLb )
473  ? self::SHARD_GLOBAL
474  : self::SHARD_LOCAL;
475  }
476 
477  if ( $this->numTableShards > 1 ) {
478  $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
479  $tableIndex = $hash % $this->numTableShards;
480  } else {
481  $tableIndex = null;
482  }
483 
484  return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
485  }
486 
492  private function getTableNameByShard( $index ) {
493  if ( $index !== null && $this->numTableShards > 1 ) {
494  $decimals = strlen( $this->numTableShards - 1 );
495 
496  return $this->tableName . sprintf( "%0{$decimals}d", $index );
497  }
498 
499  return $this->tableName;
500  }
501 
507  private function fetchBlobs( array $keys, bool $getCasToken = false ) {
509  $silenceScope = $this->silenceTransactionProfiler();
510 
511  // Initialize order-preserved per-key results; set values for live keys below
512  $dataByKey = array_fill_keys( $keys, null );
513 
514  $readTime = (int)$this->getCurrentTime();
515  $keysByTableByShard = [];
516  foreach ( $keys as $key ) {
517  list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
518  $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
519  }
520 
521  foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
522  try {
523  $db = $this->getConnection( $shardIndex );
524  foreach ( $serverKeys as $partitionTable => $tableKeys ) {
525  $res = $db->select(
526  $partitionTable,
527  $getCasToken
528  ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
529  : [ 'keyname', 'value', 'exptime' ],
530  $this->buildExistenceConditions( $db, $tableKeys, $readTime ),
531  __METHOD__
532  );
533  foreach ( $res as $row ) {
534  $row->shardIndex = $shardIndex;
535  $row->tableName = $partitionTable;
536  $dataByKey[$row->keyname] = $row;
537  }
538  }
539  } catch ( DBError $e ) {
540  $this->handleDBError( $e, $shardIndex );
541  }
542  }
543 
544  foreach ( $keys as $key ) {
545  $row = $dataByKey[$key] ?? null;
546  if ( !$row ) {
547  continue;
548  }
549 
550  $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
551  try {
552  $db = $this->getConnection( $row->shardIndex );
553  $dataByKey[$key] = [
554  self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
555  self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
556  self::BLOB_CASTOKEN => $getCasToken
557  ? $this->getCasTokenFromRow( $db, $row )
558  : null
559  ];
560  } catch ( DBQueryError $e ) {
561  $this->handleDBError( $e, $row->shardIndex );
562  }
563  }
564 
565  return $dataByKey;
566  }
567 
582  private function modifyBlobs(
583  callable $tableWriteCallback,
584  float $mtime,
585  array $argsByKey,
586  int $flags,
587  &$resByKey = []
588  ) {
589  // Initialize order-preserved per-key results; callbacks mark successful results
590  $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
591 
593  $silenceScope = $this->silenceTransactionProfiler();
594 
595  $argsByKeyByTableByShard = [];
596  foreach ( $argsByKey as $key => $args ) {
597  list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
598  $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
599  }
600 
601  $shardIndexesAffected = [];
602  foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
603  foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
604  try {
605  $db = $this->getConnection( $shardIndex );
606  $shardIndexesAffected[] = $shardIndex;
607  $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
608  } catch ( DBError $e ) {
609  $this->handleDBError( $e, $shardIndex );
610  continue;
611  }
612  }
613  }
614 
615  $success = !in_array( false, $resByKey, true );
616 
617  if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) {
618  foreach ( $shardIndexesAffected as $shardIndex ) {
619  if ( !$this->waitForReplication( $shardIndex ) ) {
620  $success = false;
621  }
622  }
623  }
624 
625  foreach ( $shardIndexesAffected as $shardIndex ) {
626  try {
627  $db = $this->getConnection( $shardIndex );
628  $this->occasionallyGarbageCollect( $db );
629  } catch ( DBError $e ) {
630  $this->handleDBError( $e, $shardIndex );
631  }
632  }
633 
634  return $success;
635  }
636 
653  IDatabase $db,
654  string $ptable,
655  float $mtime,
656  array $argsByKey,
657  array &$resByKey
658  ) {
659  $valueSizesByKey = [];
660 
661  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
662 
663  if ( $this->multiPrimaryModeType !== null ) {
664  // @TODO: use multi-row upsert() with VALUES() once supported in Database
665  foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
666  $serialValue = $this->getSerialized( $value, $key );
667  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
668  $db->upsert(
669  $ptable,
670  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
671  [ [ 'keyname' ] ],
672  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
673  __METHOD__
674  );
675  $resByKey[$key] = true;
676 
677  $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
678  }
679  } else {
680  // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
681  $rows = [];
682  foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
683  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
684  $serialValue = $this->getSerialized( $value, $key );
685  $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
686 
687  $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
688  }
689  $db->replace( $ptable, 'keyname', $rows, __METHOD__ );
690  foreach ( $argsByKey as $key => $unused ) {
691  $resByKey[$key] = true;
692  }
693  }
694 
695  $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
696  }
697 
715  IDatabase $db,
716  string $ptable,
717  float $mtime,
718  array $argsByKey,
719  array &$resByKey
720  ) {
721  if ( $this->isMultiPrimaryModeEnabled() ) {
722  // Tombstone keys in order to respect eventual consistency
723  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
724  $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
725  $rows = [];
726  foreach ( $argsByKey as $key => $arg ) {
727  $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
728  }
729  $db->upsert(
730  $ptable,
731  $rows,
732  [ [ 'keyname' ] ],
733  $this->buildUpsertSetForOverwrite( $db, self::TOMB_SERIAL, $expiry, $mt ),
734  __METHOD__
735  );
736  } else {
737  // Just purge the keys since there is only one primary (e.g. "source of truth")
738  $db->delete( $ptable, [ 'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
739  }
740 
741  foreach ( $argsByKey as $key => $arg ) {
742  $resByKey[$key] = true;
743  }
744 
745  $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
746  }
747 
769  IDatabase $db,
770  string $ptable,
771  float $mtime,
772  array $argsByKey,
773  array &$resByKey
774  ) {
775  $valueSizesByKey = [];
776 
777  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
778 
779  // This check must happen outside the write query to respect eventual consistency
780  $existingKeys = $db->selectFieldValues(
781  $ptable,
782  'keyname',
783  $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
784  __METHOD__
785  );
786  $existingByKey = array_fill_keys( $existingKeys, true );
787 
788  // @TODO: use multi-row upsert() with VALUES() once supported in Database
789  foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
790  if ( isset( $existingByKey[$key] ) ) {
791  $this->logger->debug( __METHOD__ . ": $key already exists" );
792  continue;
793  }
794 
795  $serialValue = $this->getSerialized( $value, $key );
796  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
797  $db->upsert(
798  $ptable,
799  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
800  [ [ 'keyname' ] ],
801  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
802  __METHOD__
803  );
804  $resByKey[$key] = true;
805 
806  $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
807  }
808 
809  $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
810  }
811 
833  IDatabase $db,
834  string $ptable,
835  float $mtime,
836  array $argsByKey,
837  array &$resByKey
838  ) {
839  $valueSizesByKey = [];
840 
841  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
842 
843  // This check must happen outside the write query to respect eventual consistency
844  $res = $db->select(
845  $ptable,
846  $this->addCasTokenFields( $db, [ 'keyname' ] ),
847  $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
848  __METHOD__
849  );
850 
851  $curTokensByKey = [];
852  foreach ( $res as $row ) {
853  $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
854  }
855 
856  // @TODO: use multi-row upsert() with VALUES() once supported in Database
857  foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) {
858  $curToken = $curTokensByKey[$key] ?? null;
859  if ( $curToken === null ) {
860  $this->logger->debug( __METHOD__ . ": $key does not exists" );
861  continue;
862  }
863 
864  if ( $curToken !== $casToken ) {
865  $this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
866  continue;
867  }
868 
869  $serialValue = $this->getSerialized( $value, $key );
870  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
871  $db->upsert(
872  $ptable,
873  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
874  [ [ 'keyname' ] ],
875  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
876  __METHOD__
877  );
878  $resByKey[$key] = true;
879 
880  $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
881  }
882 
883  $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
884  }
885 
906  IDatabase $db,
907  string $ptable,
908  float $mtime,
909  array $argsByKey,
910  array &$resByKey
911  ) {
912  if ( $this->isMultiPrimaryModeEnabled() ) {
913  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
914 
915  $res = $db->select(
916  $ptable,
917  [ 'keyname', 'value' ],
918  $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
919  __METHOD__
920  );
921  // @TODO: use multi-row upsert() with VALUES() once supported in Database
922  foreach ( $res as $curRow ) {
923  $key = $curRow->keyname;
924  $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
925  list( $exptime ) = $argsByKey[$key];
926  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
927 
928  $db->upsert(
929  $ptable,
930  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
931  [ [ 'keyname' ] ],
932  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
933  __METHOD__
934  );
935  $resByKey[$key] = true;
936  }
937  } else {
938  $keysBatchesByExpiry = [];
939  foreach ( $argsByKey as $key => list( $exptime ) ) {
940  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
941  $keysBatchesByExpiry[$expiry][] = $key;
942  }
943 
944  $existingCount = 0;
945  foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
946  $db->update(
947  $ptable,
948  [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
949  $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ),
950  __METHOD__
951  );
952  $existingCount += $db->affectedRows();
953  }
954  if ( $existingCount === count( $argsByKey ) ) {
955  foreach ( $argsByKey as $key => $args ) {
956  $resByKey[$key] = true;
957  }
958  }
959  }
960 
961  $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
962  }
963 
984  IDatabase $db,
985  string $ptable,
986  float $mtime,
987  array $argsByKey,
988  array &$resByKey
989  ) {
990  foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) {
991  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
992  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
993 
994  // Use a transaction so that changes from other threads are not visible due to
995  // "consistent reads". This way, the exact post-increment value can be returned.
996  // The "live key exists" check can go inside the write query and remain safe for
997  // replication since the TTL for such keys is either indefinite or very short.
998  $db->startAtomic( __METHOD__ );
999  $db->upsert(
1000  $ptable,
1001  $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
1002  [ [ 'keyname' ] ],
1003  $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
1004  __METHOD__
1005  );
1006  $affectedCount = $db->affectedRows();
1007  $row = $db->selectRow( $ptable, 'value', [ 'keyname' => $key ], __METHOD__ );
1008  $db->endAtomic( __METHOD__ );
1009 
1010  if ( !$affectedCount || $row === false ) {
1011  $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
1012  continue;
1013  }
1014 
1015  $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
1016  if ( !$this->isInteger( $serialValue ) ) {
1017  $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
1018  continue;
1019  }
1020 
1021  $resByKey[$key] = (int)$serialValue;
1022  }
1023 
1024  $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
1025  }
1026 
1032  private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
1033  $expiry = $this->getExpirationAsTimestamp( $exptime );
1034  // Eventual consistency requires the preservation of recently modified keys.
1035  // Do not create rows with `exptime` fields so low that they might get garbage
1036  // collected before being replicated.
1037  if ( $expiry !== self::TTL_INDEFINITE ) {
1038  $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1039  }
1040 
1041  return $expiry;
1042  }
1043 
1062  private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1063  if ( !$this->lock( $key, 0 ) ) {
1064  return null;
1065  }
1066 
1067  $scope = new ScopedCallback( function () use ( $key ) {
1068  $this->unlock( $key );
1069  } );
1070 
1071  return sprintf( '%.6f', $this->locks[$key][self::LOCK_TIME] );
1072  }
1073 
1083  private function makeTimestampedModificationToken( $mtime, IDatabase $db ) {
1084  // We have reserved space for upto 6 digits in the microsecond portion of the token.
1085  // This is for future use only (maybe CAS tokens) and not currently used.
1086  // It is currently populated by the microsecond portion returned by microtime,
1087  // which generally has fewer than 6 digits of meaningful precision but can still be useful
1088  // in debugging (to see the token continuously change even during rapid testing).
1089  $seconds = (int)$mtime;
1090  list( , $microseconds ) = explode( '.', sprintf( '%.6f', $mtime ) );
1091 
1092  $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );
1093 
1094  $token = implode( '', [
1095  // 67 bit integral portion of UNIX timestamp, qualified
1096  \Wikimedia\base_convert(
1097  // 35 bit integral seconds portion of UNIX timestamp
1098  str_pad( base_convert( $seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
1099  // 32 bit ID of the primary database server handling the write
1100  str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
1101  2,
1102  36,
1103  13
1104  ),
1105  // 20 bit fractional portion of UNIX timestamp, as integral microseconds
1106  str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
1107  ] );
1108 
1109  if ( strlen( $token ) !== 17 ) {
1110  throw new RuntimeException( "Modification timestamp overflow detected" );
1111  }
1112 
1113  return $token;
1114  }
1115 
1124  private function buildExistenceConditions( IDatabase $db, $keys, string $time ) {
1125  // Note that tombstones always have past expiration dates
1126  return [
1127  'keyname' => $keys,
1128  'exptime >= ' . $db->addQuotes( $db->timestamp( (int)$time ) )
1129  ];
1130  }
1131 
1142  private function buildUpsertRow(
1143  IDatabase $db,
1144  $key,
1145  $serialValue,
1146  int $expiry,
1147  string $mt
1148  ) {
1149  $row = [
1150  'keyname' => $key,
1151  'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
1152  'exptime' => $this->encodeDbExpiry( $db, $expiry )
1153  ];
1154  if ( $this->isMultiPrimaryModeEnabled() ) {
1155  $row['modtoken'] = $mt;
1156  }
1157 
1158  return $row;
1159  }
1160 
1170  private function buildUpsertSetForOverwrite(
1171  IDatabase $db,
1172  $serialValue,
1173  int $expiry,
1174  string $mt
1175  ) {
1176  $expressionsByColumn = [
1177  'value' => $db->addQuotes( $this->dbEncodeSerialValue( $db, $serialValue ) ),
1178  'exptime' => $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1179  ];
1180 
1181  $set = [];
1182  if ( $this->isMultiPrimaryModeEnabled() ) {
1183  // The query might take a while to replicate, during which newer values might get
1184  // written. Qualify the query so that it does not override such values. Note that
1185  // duplicate tokens generated around the same time for a key should still result
1186  // in convergence given the use of server_id in modtoken (providing a total order
1187  // among primary DB servers) and MySQL binlog ordering (providing a total order
1188  // for writes replicating from a given primary DB server).
1189  $expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
1190  foreach ( $expressionsByColumn as $column => $updateExpression ) {
1191  $rhs = $db->conditional(
1192  $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= SUBSTR(modtoken,0,13)',
1193  $updateExpression,
1194  $column
1195  );
1196  $set[] = "{$column}=" . trim( $rhs );
1197  }
1198  } else {
1199  foreach ( $expressionsByColumn as $column => $updateExpression ) {
1200  $set[] = "{$column}={$updateExpression}";
1201  }
1202  }
1203 
1204  return $set;
1205  }
1206 
1218  private function buildIncrUpsertSet(
1219  IDatabase $db,
1220  int $step,
1221  int $init,
1222  int $expiry,
1223  string $mt,
1224  int $mtUnixTs
1225  ) {
1226  // Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
1227  $expressionsByColumn = [
1228  'value' => [
1229  $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
1230  $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1231  ],
1232  'exptime' => [
1233  'exptime',
1234  $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1235  ]
1236  ];
1237  if ( $this->isMultiPrimaryModeEnabled() ) {
1238  $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
1239  }
1240 
1241  $set = [];
1242  foreach ( $expressionsByColumn as $column => list( $updateExpression, $initExpression ) ) {
1243  $rhs = $db->conditional(
1244  'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) ),
1245  $updateExpression,
1246  $initExpression
1247  );
1248  $set[] = "{$column}=" . trim( $rhs );
1249  }
1250 
1251  return $set;
1252  }
1253 
1259  private function encodeDbExpiry( IDatabase $db, int $expiry ) {
1260  return ( $expiry === self::TTL_INDEFINITE )
1261  // Use the maximum timestamp that the column can store
1262  ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1263  // Convert the absolute timestamp into the DB timestamp format
1264  : $db->timestamp( $expiry );
1265  }
1266 
1272  private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
1273  return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1274  ? self::TTL_INDEFINITE
1275  : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1276  }
1277 
1283  private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
1284  return is_int( $serialValue ) ? $serialValue : $db->encodeBlob( $serialValue );
1285  }
1286 
1292  private function dbDecodeSerialValue( IDatabase $db, $blob ) {
1293  return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob );
1294  }
1295 
1303  private function addCasTokenFields( IDatabase $db, array $fields ) {
1304  $type = $db->getType();
1305 
1306  if ( $type === 'mysql' ) {
1307  $fields['castoken'] = $db->buildConcat( [
1308  'SHA1(value)',
1309  $db->addQuotes( '@' ),
1310  'exptime'
1311  ] );
1312  } elseif ( $type === 'postgres' ) {
1313  $fields['castoken'] = $db->buildConcat( [
1314  'md5(value)',
1315  $db->addQuotes( '@' ),
1316  'exptime'
1317  ] );
1318  } else {
1319  if ( !in_array( 'value', $fields, true ) ) {
1320  $fields[] = 'value';
1321  }
1322  if ( !in_array( 'exptime', $fields, true ) ) {
1323  $fields[] = 'exptime';
1324  }
1325  }
1326 
1327  return $fields;
1328  }
1329 
1337  private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
1338  if ( isset( $row->castoken ) ) {
1339  $token = $row->castoken;
1340  } else {
1341  $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
1342  $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );
1343  }
1344 
1345  return $token;
1346  }
1347 
1352  private function occasionallyGarbageCollect( IDatabase $db ) {
1353  if (
1354  // Random purging is enabled
1355  $this->purgePeriod &&
1356  // Only purge on one in every $this->purgePeriod writes
1357  mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1358  // Avoid repeating the delete within a few seconds
1359  ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
1360  ) {
1361  $garbageCollector = function () use ( $db ) {
1363  $db,
1364  (int)$this->getCurrentTime(),
1365  $this->purgeLimit
1366  );
1367  $this->lastGarbageCollect = time();
1368  };
1369  if ( $this->asyncHandler ) {
1370  $this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
1371  ( $this->asyncHandler )( $garbageCollector );
1372  } else {
1373  $garbageCollector();
1374  }
1375  }
1376  }
1377 
1378  public function expireAll() {
1379  $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() );
1380  }
1381 
1383  $timestamp,
1384  callable $progress = null,
1385  $limit = INF,
1386  string $tag = null
1387  ) {
1389  $silenceScope = $this->silenceTransactionProfiler();
1390 
1391  if ( $tag !== null ) {
1392  // Purge one server only, to support concurrent purging in large wiki farms (T282761).
1393  $shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
1394  } else {
1395  $shardIndexes = $this->getShardServerIndexes();
1396  shuffle( $shardIndexes );
1397  }
1398 
1399  $ok = true;
1400  $numServers = count( $shardIndexes );
1401 
1402  $keysDeletedCount = 0;
1403  foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1404  try {
1405  $db = $this->getConnection( $shardIndex );
1407  $db,
1408  $timestamp,
1409  $limit,
1410  $keysDeletedCount,
1411  [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
1412  );
1413  } catch ( DBError $e ) {
1414  $this->handleDBError( $e, $shardIndex );
1415  $ok = false;
1416  }
1417  }
1418 
1419  return $ok;
1420  }
1421 
1431  IDatabase $db,
1432  $timestamp,
1433  $limit,
1434  &$keysDeletedCount = 0,
1435  array $progress = null
1436  ) {
1437  $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1438  if ( $this->isMultiPrimaryModeEnabled() ) {
1439  // Eventual consistency requires the preservation of any key that was recently
1440  // modified. The key must exist on this database server long enough for the server
1441  // to receive, via replication, all writes to the key with lower timestamps. Such
1442  // writes should be no-ops since the existing key value should "win". If the network
1443  // partitions between datacenters A and B for 30 minutes, the database servers in
1444  // each datacenter will see an initial burst of writes with "old" timestamps via
1445  // replication. This might include writes with lower timestamps that the existing
1446  // key value. Therefore, clock skew and replication delay are both factors.
1447  $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1448  $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1449  }
1450  $tableIndexes = range( 0, $this->numTableShards - 1 );
1451  shuffle( $tableIndexes );
1452 
1453  $batchSize = min( $this->writeBatchSize, $limit );
1454 
1455  foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1456  // The oldest expiry of a row we have deleted on this shard
1457  // (the first row that we deleted)
1458  $minExpUnix = null;
1459  // The most recent expiry time so far, from a row we have deleted on this shard
1460  $maxExp = null;
1461  // Size of the time range we'll delete, in seconds (for progress estimate)
1462  $totalSeconds = null;
1463 
1464  do {
1465  $res = $db->select(
1466  $this->getTableNameByShard( $tableIndex ),
1467  [ 'keyname', 'exptime' ],
1468  array_merge(
1469  [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
1470  $maxExp ? [ 'exptime >= ' . $db->addQuotes( $maxExp ) ] : []
1471  ),
1472  __METHOD__,
1473  [ 'LIMIT' => $batchSize, 'ORDER BY' => 'exptime ASC' ]
1474  );
1475 
1476  if ( $res->numRows() ) {
1477  $row = $res->current();
1478  if ( $minExpUnix === null ) {
1479  $minExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1480  $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1481  }
1482 
1483  $keys = [];
1484  foreach ( $res as $row ) {
1485  $keys[] = $row->keyname;
1486  $maxExp = $row->exptime;
1487  }
1488 
1489  $db->delete(
1490  $this->getTableNameByShard( $tableIndex ),
1491  [
1492  'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
1493  'keyname' => $keys
1494  ],
1495  __METHOD__
1496  );
1497  $keysDeletedCount += $db->affectedRows();
1498  }
1499 
1500  if ( $progress && is_callable( $progress['fn'] ) ) {
1501  if ( $totalSeconds ) {
1502  $maxExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1503  $remainingSeconds = $cutoffUnix - $maxExpUnix;
1504  $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1505  // For example, if we've done 1.5 table shard, and are thus half-way on the
1506  // 2nd of perhaps 5 tables on this server, then this might be:
1507  // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
1508  $tablesDoneRatio =
1509  ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1510  } else {
1511  $tablesDoneRatio = 1;
1512  }
1513 
1514  // For example, if we're 30% done on the last of 10 servers, then this might be:
1515  // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
1516  $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
1517  ( $tablesDoneRatio / $progress['serversTotal'] );
1518  ( $progress['fn'] )( $overallRatio * 100 );
1519  }
1520  } while ( $res->numRows() && $keysDeletedCount < $limit );
1521  }
1522  }
1523 
1529  public function deleteAll() {
1531  $silenceScope = $this->silenceTransactionProfiler();
1532  foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1533  $db = null; // in case of connection failure
1534  try {
1535  $db = $this->getConnection( $shardIndex );
1536  for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1537  $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
1538  }
1539  } catch ( DBError $e ) {
1540  $this->handleDBError( $e, $shardIndex );
1541  return false;
1542  }
1543  }
1544  return true;
1545  }
1546 
1547  public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1549  $silenceScope = $this->silenceTransactionProfiler();
1550 
1551  $lockTsUnix = null;
1552 
1553  list( $shardIndex ) = $this->getKeyLocation( $key );
1554  try {
1555  $db = $this->getConnection( $shardIndex );
1556  $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1557  } catch ( DBError $e ) {
1558  $this->handleDBError( $e, $shardIndex );
1559  $this->logger->warning(
1560  __METHOD__ . ' failed due to I/O error for {key}.',
1561  [ 'key' => $key ]
1562  );
1563  }
1564 
1565  return $lockTsUnix;
1566  }
1567 
1568  public function doUnlock( $key ) {
1570  $silenceScope = $this->silenceTransactionProfiler();
1571 
1572  list( $shardIndex ) = $this->getKeyLocation( $key );
1573 
1574  try {
1575  $db = $this->getConnection( $shardIndex );
1576  $released = $db->unlock( $key, __METHOD__ );
1577  } catch ( DBError $e ) {
1578  $this->handleDBError( $e, $shardIndex );
1579  $released = false;
1580  }
1581 
1582  return $released;
1583  }
1584 
1585  public function makeKeyInternal( $keyspace, $components ) {
1586  // SQL schema for 'objectcache' specifies keys as varchar(255). From that,
1587  // subtract the number of characters we need for the keyspace and for
1588  // the separator character needed for each argument. To handle some
1589  // custom prefixes used by thing like WANObjectCache, limit to 205.
1590  $keyspace = strtr( $keyspace, ' ', '_' );
1591  $charsLeft = 205 - strlen( $keyspace ) - count( $components );
1592  foreach ( $components as &$component ) {
1593  $component = strtr( $component, [
1594  ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
1595  ':' => '%3A',
1596  ] );
1597 
1598  // 33 = 32 characters for the MD5 + 1 for the '#' prefix.
1599  if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1600  $component = '#' . md5( $component );
1601  }
1602  $charsLeft -= strlen( $component );
1603  }
1604 
1605  if ( $charsLeft < 0 ) {
1606  return $keyspace . ':BagOStuff-long-key:##' . md5( implode( ':', $components ) );
1607  }
1608  return $keyspace . ':' . implode( ':', $components );
1609  }
1610 
1611  protected function serialize( $value ) {
1612  if ( is_int( $value ) ) {
1613  return $value;
1614  }
1615 
1616  $serial = serialize( $value );
1617  if ( function_exists( 'gzdeflate' ) ) {
1618  // On typical message and page data, this can provide a 3X storage savings
1619  $serial = gzdeflate( $serial );
1620  }
1621 
1622  return $serial;
1623  }
1624 
1625  protected function unserialize( $value ) {
1626  if ( $value === self::TOMB_SERIAL ) {
1627  return false; // tombstone
1628  }
1629 
1630  if ( $this->isInteger( $value ) ) {
1631  return (int)$value;
1632  }
1633 
1634  if ( function_exists( 'gzinflate' ) ) {
1635  AtEase::suppressWarnings();
1636  $decompressed = gzinflate( $value );
1637  AtEase::restoreWarnings();
1638 
1639  if ( $decompressed !== false ) {
1640  $value = $decompressed;
1641  }
1642  }
1643 
1644  return unserialize( $value );
1645  }
1646 
1652  private function getConnectionViaLoadBalancer( $shardIndex ) {
1653  $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
1654  if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1655  // Use the main connection to avoid transaction deadlocks
1656  $conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY );
1657  } else {
1658  // If the RDBMs has row/table/page level locking, then use separate auto-commit
1659  // connection to avoid needless contention and deadlocks.
1660  $conn = $lb->getMaintenanceConnectionRef(
1661  $this->replicaOnly ? DB_REPLICA : DB_PRIMARY, [],
1662  false,
1663  $lb::CONN_TRX_AUTOCOMMIT
1664  );
1665  }
1666 
1667  return $conn;
1668  }
1669 
1676  private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1677  if ( !isset( $this->conns[$shardIndex] ) ) {
1679  $conn = Database::factory(
1680  $server['type'],
1681  array_merge(
1682  $server,
1683  [
1684  // Make sure the handle uses autocommit mode
1685  'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1686  'connLogger' => $this->logger,
1687  'queryLogger' => $this->logger
1688  ]
1689  )
1690  );
1691  // Automatically create the objectcache table for sqlite as needed
1692  if ( $conn->getType() === 'sqlite' ) {
1693  if ( !$conn->tableExists( 'objectcache', __METHOD__ ) ) {
1694  $this->initSqliteDatabase( $conn );
1695  }
1696  }
1697  $this->conns[$shardIndex] = $conn;
1698  }
1699 
1700  return $this->conns[$shardIndex];
1701  }
1702 
1709  private function handleDBError( DBError $exception, $shardIndex ) {
1710  if ( $exception instanceof DBConnectionError ) {
1711  $this->markServerDown( $exception, $shardIndex );
1712  }
1713 
1714  $this->setAndLogDBError( $exception );
1715  }
1716 
1720  private function setAndLogDBError( DBError $exception ) {
1721  $this->logger->error( "DBError: {$exception->getMessage()}" );
1722  if ( $exception instanceof DBConnectionError ) {
1723  $this->setLastError( self::ERR_UNREACHABLE );
1724  $this->logger->warning( __METHOD__ . ": ignoring connection error" );
1725  } else {
1726  $this->setLastError( self::ERR_UNEXPECTED );
1727  $this->logger->warning( __METHOD__ . ": ignoring query error" );
1728  }
1729  }
1730 
1737  private function markServerDown( DBError $exception, $shardIndex ) {
1738  unset( $this->conns[$shardIndex] ); // bug T103435
1739 
1740  $now = $this->getCurrentTime();
1741  if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1742  if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1743  unset( $this->connFailureTimes[$shardIndex] );
1744  unset( $this->connFailureErrors[$shardIndex] );
1745  } else {
1746  $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
1747  return;
1748  }
1749  }
1750  $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
1751  $this->connFailureTimes[$shardIndex] = $now;
1752  $this->connFailureErrors[$shardIndex] = $exception;
1753  }
1754 
1759  private function initSqliteDatabase( IMaintainableDatabase $db ) {
1760  if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
1761  return;
1762  }
1763  // Use one table for SQLite; sharding does not seem to have much benefit
1764  $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
1765  $db->startAtomic( __METHOD__ ); // atomic DDL
1766  try {
1767  $encTable = $db->tableName( 'objectcache' );
1768  $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
1769  $db->query(
1770  "CREATE TABLE $encTable (\n" .
1771  " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1772  " value BLOB,\n" .
1773  " exptime BLOB NOT NULL\n" .
1774  ")",
1775  __METHOD__
1776  );
1777  $db->query( "CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1778  $db->endAtomic( __METHOD__ );
1779  } catch ( DBError $e ) {
1780  $db->rollback( __METHOD__ );
1781  throw $e;
1782  }
1783  }
1784 
1788  public function createTables() {
1789  foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1790  $db = $this->getConnection( $shardIndex );
1791  if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
1792  for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1793  $encBaseTable = $db->tableName( 'objectcache' );
1794  $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
1795  $db->query( "CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1796  }
1797  }
1798  }
1799  }
1800 
1804  private function getShardServerIndexes() {
1805  if ( $this->serverTags ) {
1806  // Striped array of database servers
1807  $shardIndexes = array_keys( $this->serverTags );
1808  } else {
1809  // LoadBalancer based configuration
1810  $shardIndexes = [];
1811  if ( $this->localKeyLb ) {
1812  $shardIndexes[] = self::SHARD_LOCAL;
1813  }
1814  if ( $this->globalKeyLb ) {
1815  $shardIndexes[] = self::SHARD_GLOBAL;
1816  }
1817  }
1818 
1819  return $shardIndexes;
1820  }
1821 
1827  private function getShardServerIndexForTag( string $tag ) {
1828  if ( !$this->serverTags ) {
1829  throw new InvalidArgumentException( "Given a tag but no tags are configured" );
1830  }
1831  foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1832  if ( $tag === $serverTag ) {
1833  return $serverShardIndex;
1834  }
1835  }
1836  throw new InvalidArgumentException( "Unknown server tag: $tag" );
1837  }
1838 
1842  private function isMultiPrimaryModeEnabled() {
1843  return ( $this->multiPrimaryModeType !== null );
1844  }
1845 
1852  private function waitForReplication( $shardIndex ) {
1853  if ( is_int( $shardIndex ) ) {
1854  return true; // striped only, no LoadBalancer
1855  }
1856 
1857  $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
1858  if ( !$lb->hasStreamingReplicaServers() ) {
1859  return true;
1860  }
1861 
1862  try {
1863  // Wait for any replica DBs to catch up
1864  $masterPos = $lb->getPrimaryPos();
1865  if ( !$masterPos ) {
1866  return true; // not applicable
1867  }
1868 
1869  $loop = new WaitConditionLoop(
1870  static function () use ( $lb, $masterPos ) {
1871  return $lb->waitForAll( $masterPos, 1 );
1872  },
1875  );
1876 
1877  return ( $loop->invoke() === $loop::CONDITION_REACHED );
1878  } catch ( DBError $e ) {
1879  $this->setAndLogDBError( $e );
1880 
1881  return false;
1882  }
1883  }
1884 
1890  private function silenceTransactionProfiler() {
1891  if ( $this->serverInfos ) {
1892  return null; // no TransactionProfiler injected anyway
1893  }
1894  return Profiler::instance()->getTransactionProfiler()->silenceForScope();
1895  }
1896 }
SqlBagOStuff\modifyTableSpecificBlobsForChangeTTL
modifyTableSpecificBlobsForChangeTTL(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Update the TTL for keys belonging to a partition table on the the given server.
Definition: SqlBagOStuff.php:905
SqlBagOStuff\newLockingWriteSectionModificationTimestamp
newLockingWriteSectionModificationTimestamp( $key, &$scope)
Get a scoped lock and modification timestamp for a critical section of reads/writes.
Definition: SqlBagOStuff.php:1062
SqlBagOStuff\$purgeLimit
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
Definition: SqlBagOStuff.php:67
SqlBagOStuff\__construct
__construct( $params)
Create a new backend instance from configuration.
Definition: SqlBagOStuff.php:159
MediumSpecificBagOStuff\setLastError
setLastError( $err)
Set the "last error" registry.
Definition: MediumSpecificBagOStuff.php:830
Wikimedia\Rdbms\IDatabase\getServerName
getServerName()
Get the readable name for the server.
SqlBagOStuff\doGetMulti
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
Definition: SqlBagOStuff.php:356
SqlBagOStuff\doCas
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
Definition: SqlBagOStuff.php:274
SqlBagOStuff\createTables
createTables()
Create the shard tables on all databases (e.g.
Definition: SqlBagOStuff.php:1788
Wikimedia\Rdbms\Database
Relational database abstraction object.
Definition: Database.php:52
MediumSpecificBagOStuff\isInteger
isInteger( $value)
Check if a value is an integer.
Definition: MediumSpecificBagOStuff.php:953
SqlBagOStuff\incrWithInit
incrWithInit( $key, $exptime, $value=1, $init=null, $flags=0)
Increase the value of the given key (no TTL change) if it exists or create it otherwise.
Definition: SqlBagOStuff.php:300
ArrayUtils\consistentHashSort
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
Definition: ArrayUtils.php:49
Wikimedia\Rdbms\IDatabase\affectedRows
affectedRows()
Get the number of rows affected by the last write query.
SqlBagOStuff\doGet
doGet( $key, $flags=0, &$casToken=null)
Definition: SqlBagOStuff.php:216
SqlBagOStuff\waitForReplication
waitForReplication( $shardIndex)
Wait for replica DBs to catch up to the primary DB.
Definition: SqlBagOStuff.php:1852
SqlBagOStuff\handleDBError
handleDBError(DBError $exception, $shardIndex)
Handle a DBError which occurred during a read operation.
Definition: SqlBagOStuff.php:1709
SqlBagOStuff\SHARD_LOCAL
const SHARD_LOCAL
Definition: SqlBagOStuff.php:86
SqlBagOStuff\makeKeyInternal
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
Definition: SqlBagOStuff.php:1585
SqlBagOStuff\dbDecodeSerialValue
dbDecodeSerialValue(IDatabase $db, $blob)
Definition: SqlBagOStuff.php:1292
Wikimedia\Rdbms\IDatabase\tablePrefix
tablePrefix( $prefix=null)
Get/set the table prefix.
SqlBagOStuff\setAndLogDBError
setAndLogDBError(DBError $exception)
Definition: SqlBagOStuff.php:1720
MediumSpecificBagOStuff\debug
debug( $text)
Definition: MediumSpecificBagOStuff.php:1180
Profiler\instance
static instance()
Singleton.
Definition: Profiler.php:69
Wikimedia\Rdbms\IDatabase\rollback
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
SqlBagOStuff\TOMB_SERIAL
const TOMB_SERIAL
Distinct string for tombstones stored in the "serialized" value column.
Definition: SqlBagOStuff.php:94
Wikimedia\Rdbms\IDatabase\replace
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, deleting all conflicting rows beforehand.
SqlBagOStuff\fetchBlobs
fetchBlobs(array $keys, bool $getCasToken=false)
Definition: SqlBagOStuff.php:507
SqlBagOStuff\$serverInfos
array[] $serverInfos
(server index => server config)
Definition: SqlBagOStuff.php:59
SqlBagOStuff\buildUpsertRow
buildUpsertRow(IDatabase $db, $key, $serialValue, int $expiry, string $mt)
INSERT array for handling key writes/overwrites when no live nor stale key exists.
Definition: SqlBagOStuff.php:1142
SqlBagOStuff\getConnectionViaLoadBalancer
getConnectionViaLoadBalancer( $shardIndex)
Definition: SqlBagOStuff.php:1652
SqlBagOStuff\INF_TIMESTAMP_PLACEHOLDER
const INF_TIMESTAMP_PLACEHOLDER
Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types.
Definition: SqlBagOStuff.php:110
SqlBagOStuff\$numTableShards
int $numTableShards
Number of table shards to use on each server.
Definition: SqlBagOStuff.php:69
SqlBagOStuff\doUnlock
doUnlock( $key)
Definition: SqlBagOStuff.php:1568
Wikimedia\Rdbms\IDatabase\upsert
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert the given row(s) into a table.
Wikimedia\Rdbms\IDatabase\endAtomic
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
SqlBagOStuff\initSqliteDatabase
initSqliteDatabase(IMaintainableDatabase $db)
Definition: SqlBagOStuff.php:1759
SqlBagOStuff\silenceTransactionProfiler
silenceTransactionProfiler()
Silence the transaction profiler until the return value falls out of scope.
Definition: SqlBagOStuff.php:1890
Wikimedia\Rdbms\IMaintainableDatabase\tableName
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
MediumSpecificBagOStuff\unlock
unlock( $key)
Release an advisory lock on a key string.
Definition: MediumSpecificBagOStuff.php:545
SqlBagOStuff\unserialize
unserialize( $value)
Definition: SqlBagOStuff.php:1625
SqlBagOStuff\buildUpsertSetForOverwrite
buildUpsertSetForOverwrite(IDatabase $db, $serialValue, int $expiry, string $mt)
SET array for handling key overwrites when a live or stale key exists.
Definition: SqlBagOStuff.php:1170
SqlBagOStuff\BLOB_CASTOKEN
const BLOB_CASTOKEN
Definition: SqlBagOStuff.php:102
SqlBagOStuff\serialize
serialize( $value)
Definition: SqlBagOStuff.php:1611
Wikimedia\Rdbms\IDatabase\decodeBlob
decodeBlob( $b)
Some DBMSs return a special placeholder object representing blob fields in result objects.
SqlBagOStuff\$connFailureTimes
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
Definition: SqlBagOStuff.php:82
$success
$success
Definition: NoLocalSettings.php:42
SqlBagOStuff\$localKeyLb
ILoadBalancer null $localKeyLb
Definition: SqlBagOStuff.php:54
$res
$res
Definition: testCompression.php:57
Wikimedia\Rdbms\DBError
Database error base class @newable.
Definition: DBError.php:32
SqlBagOStuff\deleteObjectsExpiringBefore
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
Definition: SqlBagOStuff.php:1382
Wikimedia\LightweightObjectStore\StorageAwareness\QOS_DURABILITY_RDBMS
const QOS_DURABILITY_RDBMS
Data is saved to disk and writes usually block on fsync(), like a standard RDBMS.
Definition: StorageAwareness.php:64
DBO_TRX
const DBO_TRX
Definition: defines.php:12
Wikimedia\Rdbms\IDatabase\tableExists
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
Wikimedia\Rdbms\IDatabase\conditional
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
SqlBagOStuff\doSet
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
Definition: SqlBagOStuff.php:237
SqlBagOStuff\dbEncodeSerialValue
dbEncodeSerialValue(IDatabase $db, $serialValue)
Definition: SqlBagOStuff.php:1283
Wikimedia\Rdbms\IDatabase\update
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
MediumSpecificBagOStuff\lock
lock( $key, $timeout=6, $exptime=6, $rclass='')
Definition: MediumSpecificBagOStuff.php:467
SqlBagOStuff\SAFE_PURGE_DELAY_SEC
const SAFE_PURGE_DELAY_SEC
A number of seconds well above any expected clock skew and replication lag.
Definition: SqlBagOStuff.php:92
MediumSpecificBagOStuff\$syncTimeout
int $syncTimeout
Seconds.
Definition: MediumSpecificBagOStuff.php:40
SqlBagOStuff\$lastGarbageCollect
int $lastGarbageCollect
UNIX timestamp.
Definition: SqlBagOStuff.php:63
SqlBagOStuff\getKeyLocation
getKeyLocation( $key)
Get the server index and table name for a given key.
Definition: SqlBagOStuff.php:459
Wikimedia\Rdbms\IDatabase\timestamp
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
SqlBagOStuff\expireAll
expireAll()
Definition: SqlBagOStuff.php:1378
SqlBagOStuff\occasionallyGarbageCollect
occasionallyGarbageCollect(IDatabase $db)
Definition: SqlBagOStuff.php:1352
SqlBagOStuff\getCasTokenFromRow
getCasTokenFromRow(IDatabase $db, stdClass $row)
Get a CAS token from a SELECT result row.
Definition: SqlBagOStuff.php:1337
Wikimedia\Rdbms\IDatabase\encodeBlob
encodeBlob( $b)
Some DBMSs have a special format for inserting into blob fields, they don't allow simple quoted strin...
SqlBagOStuff\deleteAll
deleteAll()
Delete content of shard tables in every server.
Definition: SqlBagOStuff.php:1529
SqlBagOStuff\modifyBlobs
modifyBlobs(callable $tableWriteCallback, float $mtime, array $argsByKey, int $flags, &$resByKey=[])
Definition: SqlBagOStuff.php:582
MediumSpecificBagOStuff\getExpirationAsTimestamp
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
Definition: MediumSpecificBagOStuff.php:913
SqlBagOStuff\SAFE_CLOCK_BOUND_SEC
const SAFE_CLOCK_BOUND_SEC
A number of seconds well above any expected clock skew.
Definition: SqlBagOStuff.php:90
SqlBagOStuff\$connFailureErrors
Exception[] $connFailureErrors
Map of (shard index => Exception)
Definition: SqlBagOStuff.php:84
SqlBagOStuff\doDeleteMulti
doDeleteMulti(array $keys, $flags=0)
Definition: SqlBagOStuff.php:397
$blob
$blob
Definition: testCompression.php:70
BagOStuff\$asyncHandler
callable null $asyncHandler
Definition: BagOStuff.php:92
SqlBagOStuff\getConnectionFromServerInfo
getConnectionFromServerInfo( $shardIndex, array $server)
Definition: SqlBagOStuff.php:1676
SqlBagOStuff\modifyTableSpecificBlobsForIncrInit
modifyTableSpecificBlobsForIncrInit(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Either increment a counter key, if it exists, or initialize it, otherwise.
Definition: SqlBagOStuff.php:983
$args
if( $line===false) $args
Definition: mcc.php:124
SqlBagOStuff\modifyTableSpecificBlobsForDelete
modifyTableSpecificBlobsForDelete(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Purge/tombstone key/value pairs belonging to a partition table on the the given server.
Definition: SqlBagOStuff.php:714
SqlBagOStuff\modifyTableSpecificBlobsForCas
modifyTableSpecificBlobsForCas(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the the given server.
Definition: SqlBagOStuff.php:832
SqlBagOStuff\$tableName
string $tableName
Definition: SqlBagOStuff.php:73
SqlBagOStuff\isMultiPrimaryModeEnabled
isMultiPrimaryModeEnabled()
Definition: SqlBagOStuff.php:1842
MediumSpecificBagOStuff
Storage medium specific cache for storing items (e.g.
Definition: MediumSpecificBagOStuff.php:34
Wikimedia\Rdbms\IDatabase\query
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query and return the result.
DB_REPLICA
const DB_REPLICA
Definition: defines.php:25
SqlBagOStuff\doIncr
doIncr( $key, $value=1, $flags=0)
Definition: SqlBagOStuff.php:325
MediumSpecificBagOStuff\updateOpStats
updateOpStats(string $op, array $keyInfo)
Definition: MediumSpecificBagOStuff.php:1191
SqlBagOStuff\buildExistenceConditions
buildExistenceConditions(IDatabase $db, $keys, string $time)
WHERE conditions that check for existence and liveness of keys.
Definition: SqlBagOStuff.php:1124
SqlBagOStuff\GC_DELAY_SEC
const GC_DELAY_SEC
How many seconds must pass before triggering a garbage collection.
Definition: SqlBagOStuff.php:98
Wikimedia\Rdbms\DBQueryError
@newable
Definition: DBQueryError.php:29
SqlBagOStuff\BLOB_EXPIRY
const BLOB_EXPIRY
Definition: SqlBagOStuff.php:101
SqlBagOStuff\incr
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
Definition: SqlBagOStuff.php:317
SqlBagOStuff\$globalKeyLb
ILoadBalancer null $globalKeyLb
Definition: SqlBagOStuff.php:56
SqlBagOStuff\TOMB_EXPTIME
const TOMB_EXPTIME
Relative seconds-to-live to use for tombstones.
Definition: SqlBagOStuff.php:96
Wikimedia\Rdbms\IDatabase\selectRow
selectRow( $table, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
SqlBagOStuff\addCasTokenFields
addCasTokenFields(IDatabase $db, array $fields)
Either append a 'castoken' field or append the fields needed to compute the CAS token.
Definition: SqlBagOStuff.php:1303
DB_PRIMARY
const DB_PRIMARY
Definition: defines.php:27
SqlBagOStuff\doChangeTTL
doChangeTTL( $key, $exptime, $flags)
Definition: SqlBagOStuff.php:289
Wikimedia\Rdbms\IDatabase\buildConcat
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
SqlBagOStuff\getShardServerIndexes
getShardServerIndexes()
Definition: SqlBagOStuff.php:1804
SqlBagOStuff\getConnection
getConnection( $shardIndex)
Get a connection to the specified database.
Definition: SqlBagOStuff.php:427
SqlBagOStuff\makeTimestampedModificationToken
makeTimestampedModificationToken( $mtime, IDatabase $db)
Make a modtoken column value with the original time and source database server of a write.
Definition: SqlBagOStuff.php:1083
SqlBagOStuff\doLock
doLock( $key, $timeout=6, $exptime=6)
Definition: SqlBagOStuff.php:1547
MediumSpecificBagOStuff\$busyCallbacks
callable[] $busyCallbacks
Definition: MediumSpecificBagOStuff.php:54
MediumSpecificBagOStuff\getSerialized
getSerialized( $value, $key)
Get the serialized form a value, using any applicable prepared value.
Definition: MediumSpecificBagOStuff.php:1026
SqlBagOStuff\doChangeTTLMulti
doChangeTTLMulti(array $keys, $exptime, $flags=0)
Definition: SqlBagOStuff.php:408
Wikimedia\LightweightObjectStore\StorageAwareness\ATTR_DURABILITY
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
Definition: StorageAwareness.php:45
SqlBagOStuff\markServerDown
markServerDown(DBError $exception, $shardIndex)
Mark a server down due to a DBConnectionError exception.
Definition: SqlBagOStuff.php:1737
SqlBagOStuff
RDBMS-based caching module.
Definition: SqlBagOStuff.php:52
SqlBagOStuff\getShardServerIndexForTag
getShardServerIndexForTag(string $tag)
Definition: SqlBagOStuff.php:1827
SqlBagOStuff\$conns
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
Definition: SqlBagOStuff.php:80
SqlBagOStuff\buildIncrUpsertSet
buildIncrUpsertSet(IDatabase $db, int $step, int $init, int $expiry, string $mt, int $mtUnixTs)
SET array for handling key overwrites when a live or stale key exists.
Definition: SqlBagOStuff.php:1218
SqlBagOStuff\deleteServerObjectsExpiringBefore
deleteServerObjectsExpiringBefore(IDatabase $db, $timestamp, $limit, &$keysDeletedCount=0, array $progress=null)
Definition: SqlBagOStuff.php:1430
Wikimedia\Rdbms\IDatabase\addQuotes
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
SqlBagOStuff\decr
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
Definition: SqlBagOStuff.php:321
SqlBagOStuff\doAdd
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
Definition: SqlBagOStuff.php:259
Wikimedia\LightweightObjectStore\StorageAwareness\ATTR_EMULATION
const ATTR_EMULATION
Emulation/fallback mode; see QOS_EMULATION_*; higher is better.
Definition: StorageAwareness.php:43
BagOStuff\fieldHasFlags
fieldHasFlags( $field, $flags)
Definition: BagOStuff.php:584
SqlBagOStuff\BLOB_VALUE
const BLOB_VALUE
Definition: SqlBagOStuff.php:100
Wikimedia\Rdbms\IDatabase\selectFieldValues
selectFieldValues( $table, $var, $cond='', $fname=__METHOD__, $options=[], $join_conds=[])
A SELECT wrapper which returns a list of single field values from result rows.
Wikimedia\Rdbms\DBConnectionError
@newable
Definition: DBConnectionError.php:27
Wikimedia
This program is free software; you can redistribute it and/or modify it under the terms of the GNU Ge...
SqlBagOStuff\decodeDbExpiry
decodeDbExpiry(IDatabase $db, string $dbExpiry)
Definition: SqlBagOStuff.php:1272
Wikimedia\Rdbms\IDatabase\getType
getType()
Get the RDBMS type of the server (e.g.
$keys
$keys
Definition: testCompression.php:72
Wikimedia\Rdbms\IDatabase\select
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
BagOStuff\getCurrentTime
getCurrentTime()
Definition: BagOStuff.php:795
BagOStuff\$keyspace
string $keyspace
Default keyspace; used by makeKey()
Definition: BagOStuff.php:103
SqlBagOStuff\modifyTableSpecificBlobsForSet
modifyTableSpecificBlobsForSet(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Set key/value pairs belonging to a partition table on the the given server.
Definition: SqlBagOStuff.php:652
SqlBagOStuff\SHARD_GLOBAL
const SHARD_GLOBAL
Definition: SqlBagOStuff.php:87
SqlBagOStuff\$serverTags
string[] $serverTags
(server index => tag/host name)
Definition: SqlBagOStuff.php:61
Wikimedia\Rdbms\IDatabase\getTopologyBasedServerId
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
SqlBagOStuff\$writeBatchSize
int $writeBatchSize
Definition: SqlBagOStuff.php:71
Wikimedia\Rdbms\IDatabase\buildIntegerCast
buildIntegerCast( $field)
SqlBagOStuff\$replicaOnly
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
Definition: SqlBagOStuff.php:75
Wikimedia\Rdbms\IDatabase\addIdentifierQuotes
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
SqlBagOStuff\$multiPrimaryModeType
string null $multiPrimaryModeType
Multi-primary mode DB type ("mysql",...); null if not enabled.
Definition: SqlBagOStuff.php:77
SqlBagOStuff\modifyTableSpecificBlobsForAdd
modifyTableSpecificBlobsForAdd(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the the given server.
Definition: SqlBagOStuff.php:768
SqlBagOStuff\makeNewKeyExpiry
makeNewKeyExpiry( $exptime, int $nowTsUnix)
Definition: SqlBagOStuff.php:1032
SqlBagOStuff\doSetMulti
doSetMulti(array $data, $exptime=0, $flags=0)
Definition: SqlBagOStuff.php:381
SqlBagOStuff\encodeDbExpiry
encodeDbExpiry(IDatabase $db, int $expiry)
Definition: SqlBagOStuff.php:1259
Wikimedia\Rdbms\IMaintainableDatabase
Advanced database interface for IDatabase handles that include maintenance methods.
Definition: IMaintainableDatabase.php:38
SqlBagOStuff\$purgePeriod
int $purgePeriod
Average number of writes required to trigger garbage collection.
Definition: SqlBagOStuff.php:65
Wikimedia\Rdbms\IDatabase\delete
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
Wikimedia\Rdbms\ILoadBalancer
Database cluster connection, tracking, load balancing, and transaction manager interface.
Definition: ILoadBalancer.php:81
SqlBagOStuff\getTableNameByShard
getTableNameByShard( $index)
Get the table name for a given shard index.
Definition: SqlBagOStuff.php:492
SqlBagOStuff\doDelete
doDelete( $key, $flags=0)
Delete an item.
Definition: SqlBagOStuff.php:248
Wikimedia\Rdbms\Blob
@newable
Definition: Blob.php:9
$type
$type
Definition: testCompression.php:52
Wikimedia\LightweightObjectStore\StorageAwareness\QOS_EMULATION_SQL
const QOS_EMULATION_SQL
Fallback disk-based SQL store.
Definition: StorageAwareness.php:48
Wikimedia\Rdbms\IDatabase\startAtomic
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.