MediaWiki  master
SqlBagOStuff.php
Go to the documentation of this file.
1 <?php
24 use Wikimedia\AtEase\AtEase;
34 use Wikimedia\ScopedCallback;
35 use Wikimedia\Timestamp\ConvertibleTimestamp;
36 
/** NOTE(review): not referenced in the visible part of the file; presumably the load balancer obtained from the 'loadBalancerCallback' parameter — confirm */
54  protected $loadBalancer;
/** DB domain taken from the 'dbDomain' constructor parameter (required with 'loadBalancerCallback') */
56  protected $dbDomain;
/** Whether connections come from the load balancer rather than the direct server list */
58  protected $useLB = false;
59 
/** Server info entries from the 'servers'/'server' constructor parameter, by shard index */
61  protected $serverInfos = [];
/** Server tags by shard index; the string key from 'servers' when given, else "#<index>" */
63  protected $serverTags = [];
/** Timestamp of the last garbage collection run (or of its enqueueing) */
65  protected $lastGarbageCollect = 0;
/** Garbage collection is attempted on roughly one in this many writes; a falsy value disables it */
67  protected $purgePeriod = 10;
/** Limit passed to each occasional garbage collection pass */
69  protected $purgeLimit = 100;
/** Number of table shards; when above 1, keys are spread over index-suffixed tables */
71  protected $numTableShards = 1;
/** NOTE(review): set from the 'writeBatchSize' parameter; not used in the visible part of the file — confirm meaning */
73  protected $writeBatchSize = 100;
/** Base table name; sharded tables append a zero-padded shard index to it */
75  protected $tableName = 'objectcache';
/** Value of the 'replicaOnly' parameter (default false); NOTE(review): not used in the visible part of the file */
77  protected $replicaOnly;
/** Whether writes use "modtoken"-qualified upserts and tombstones (value of the 'multiPrimaryMode' parameter) */
79  protected $multiPrimaryMode;
80 
/** NOTE(review): not referenced in the visible part of the file; presumably cached connections by shard index — confirm */
82  protected $conns;
/** Times of the last connection failure, by shard index */
84  protected $connFailureTimes = [];
/** Exceptions from the last connection failure, by shard index; rethrown while the failure is under 60 seconds old */
86  protected $connFailureErrors = [];
87 
/** Whether the zlib extension is loaded */
89  private $hasZlib;
90 
/** New finite expiries are never set earlier than the write time minus this many seconds */
92  private const SAFE_CLOCK_BOUND_SEC = 15;
/** NOTE(review): not used in the visible part of the file — confirm meaning */
94  private const SAFE_PURGE_DELAY_SEC = 3600;
/** Serialized value stored in tombstone rows */
96  private const TOMB_SERIAL = '';
/** Relative expiry used for tombstone rows */
98  private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
/** Garbage collection is skipped unless more than this many seconds passed since the last one */
100  private const GC_DELAY_SEC = 1;
101 
/** Indexes of the fields in the per-key arrays returned by fetchBlobs() */
102  private const BLOB_VALUE = 0;
103  private const BLOB_EXPIRY = 1;
104  private const BLOB_CASTOKEN = 2;
105 
/** Timestamp stored in the "exptime" column in place of an indefinite expiry */
112  private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
113 
/**
 * Configure the store from either a direct server list or a load balancer callback.
 *
 * Keys of $params read here:
 *   - servers: map or list of server info entries; string keys become server tags
 *   - server: a single server info entry (used when "servers" is not set)
 *   - loadBalancerCallback: used when no direct servers are given; requires "dbDomain"
 *   - dbDomain: DB domain to use together with "loadBalancerCallback"
 *   - purgePeriod, purgeLimit, tableName, shards, writeBatchSize: optional overrides
 *     of the corresponding property defaults
 *   - replicaOnly, multiPrimaryMode: optional flags, both defaulting to false
 *
 * @param array $params
 * @throws InvalidArgumentException If no server source is given, or if
 *   "loadBalancerCallback" is given without "dbDomain"
 */
public function __construct( $params ) {
	parent::__construct( $params );

	$hasDirectServers = isset( $params['servers'] ) || isset( $params['server'] );
	$hasLoadBalancer = isset( $params['loadBalancerCallback'] );
	if ( !$hasDirectServers && !$hasLoadBalancer ) {
		throw new InvalidArgumentException(
			__METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
		);
	}

	if ( $hasDirectServers ) {
		// Direct list of servers; object data is horizontally partitioned via key hash
		$serverList = $params['servers'] ?? [ $params['server'] ];
		$position = 0;
		foreach ( $serverList as $tag => $info ) {
			$this->serverInfos[$position] = $info;
			// Integer-indexed lists are still allowed for b/c; give them synthetic tags
			$this->serverTags[$position] = is_string( $tag ) ? $tag : "#$position";
			$position++;
		}
	} else {
		$this->loadBalancerCallback = $params['loadBalancerCallback'];
		if ( !isset( $params['dbDomain'] ) ) {
			throw new InvalidArgumentException(
				__METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
			);
		}
		$this->dbDomain = $params['dbDomain'];
		$this->useLB = true;
	}

	// Optional tuning parameters; the property defaults apply when a key is absent
	$this->purgePeriod = (int)( $params['purgePeriod'] ?? $this->purgePeriod );
	$this->purgeLimit = (int)( $params['purgeLimit'] ?? $this->purgeLimit );
	$this->numTableShards = (int)( $params['shards'] ?? $this->numTableShards );
	$this->writeBatchSize = (int)( $params['writeBatchSize'] ?? $this->writeBatchSize );
	$this->tableName = $params['tableName'] ?? $this->tableName;
	$this->replicaOnly = $params['replicaOnly'] ?? false;
	$this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;

	$this->hasZlib = extension_loaded( 'zlib' );
}
184 
/**
 * Fetch and unserialize the value of a single key.
 *
 * @param string $key
 * @param int $flags Not used by this method
 * @param mixed &$casToken Pass self::PASS_BY_REF to request a CAS token; set to the
 *   token when the value was found and unserialized, and to null otherwise
 * @return mixed The unserialized value, or false if no live row was found
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$value = false;
	$bytesRead = false;

	$blob = $this->fetchBlobs( [ $key ], $wantToken )[$key];
	if ( $blob ) {
		$serialized = $blob[self::BLOB_VALUE];
		$value = $this->unserialize( $serialized );
		if ( $wantToken && $value !== false ) {
			$casToken = $blob[self::BLOB_CASTOKEN];
		}
		$bytesRead = strlen( $serialized );
	}

	$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $bytesRead ] ] );

	return $value;
}
205 
/**
 * Store a value under a single key, overwriting any existing one.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool Whether the write was marked successful
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$argsByKey = [ $key => [ $value, $exptime ] ];
	$writer = [ $this, 'modifyTableSpecificBlobsForSet' ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
216 
/**
 * Delete a single key.
 *
 * @param string $key
 * @param int $flags
 * @return bool Whether the write was marked successful
 */
protected function doDelete( $key, $flags = 0 ) {
	$now = $this->getCurrentTime();
	// The delete callback takes no per-key arguments
	$argsByKey = [ $key => [] ];
	$writer = [ $this, 'modifyTableSpecificBlobsForDelete' ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
227 
/**
 * Store a value under a key only if the key has no live value yet.
 *
 * The key is locked for the duration of the method; $lockScope holds the lock
 * until it goes out of scope on return.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if the lock could not be acquired or the key was not added
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$lockedTime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $lockedTime !== null ) {
		$argsByKey = [ $key => [ $value, $exptime ] ];

		return $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForAdd' ],
			$lockedTime,
			$argsByKey,
			$flags
		);
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
242 
/**
 * Replace the value of a key only if its current CAS token matches $casToken.
 *
 * The key is locked for the duration of the method; $lockScope holds the lock
 * until it goes out of scope on return.
 *
 * @param mixed $casToken Token obtained from a previous read
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if the lock could not be acquired or the value was not replaced
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$lockedTime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $lockedTime !== null ) {
		$argsByKey = [ $key => [ $value, $exptime, $casToken ] ];

		return $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForCas' ],
			$lockedTime,
			$argsByKey,
			$flags
		);
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
257 
/**
 * Change the expiry of a single key.
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags
 * @return bool Whether the write was marked successful
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$now = $this->getCurrentTime();
	$argsByKey = [ $key => [ $exptime ] ];
	$writer = [ $this, 'modifyTableSpecificBlobsForChangeTTL' ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
268 
/**
 * Increment a key by $step, or initialize it to $init if it has no live value.
 *
 * With self::WRITE_BACKGROUND set in $flags, the asynchronous write callback is
 * used, which reports true instead of the new value on success.
 *
 * @param string $key
 * @param int $exptime
 * @param int $step
 * @param int $init
 * @param int $flags
 * @return int|bool The per-key result reported by the write callback, or false on failure
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$now = $this->getCurrentTime();

	$writerName = ( $flags & self::WRITE_BACKGROUND )
		? 'modifyTableSpecificBlobsForIncrInitAsync'
		: 'modifyTableSpecificBlobsForIncrInit';

	$ok = $this->modifyBlobs(
		[ $this, $writerName ],
		$now,
		[ $key => [ $step, $init, $exptime ] ],
		$flags,
		$resByKey
	);
	if ( !$ok ) {
		return false;
	}

	return $resByKey[$key];
}
288 
/**
 * Increase the integer value of an existing key.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags
 * @return int|bool The new value, or false on failure
 */
public function incr( $key, $value = 1, $flags = 0 ) {
	$step = $value;

	return $this->doIncr( $key, $step, $flags );
}
292 
/**
 * Decrease the integer value of an existing key.
 *
 * @param string $key
 * @param int $value Amount to subtract
 * @param int $flags
 * @return int|bool The new value, or false on failure
 */
public function decr( $key, $value = 1, $flags = 0 ) {
	$step = -$value;

	return $this->doIncr( $key, $step, $flags );
}
296 
/**
 * Add $value (which may be negative) to the integer stored under an existing key.
 *
 * The key is locked while it is read and rewritten. The result is clamped so that
 * it never drops below zero, and the old expiry of the key is preserved.
 *
 * @param string $key
 * @param int $value Signed step
 * @param int $flags
 * @return int|bool The new value; false if the lock could not be acquired, the key
 *   has no live value, the stored value is not an integer, or the write failed
 */
private function doIncr( $key, $value = 1, $flags = 0 ) {
	$lockedTime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $lockedTime === null ) {
		// Timeout or I/O error during lock acquisition
		return false;
	}

	$result = false;

	$blob = $this->fetchBlobs( [ $key ] )[$key];
	if ( !$blob ) {
		$this->logger->debug( __METHOD__ . ": $key does not exists" );
	} elseif ( !$this->isInteger( $blob[self::BLOB_VALUE] ) ) {
		$this->logger->warning( __METHOD__ . ": $key is a non-integer" );
	} else {
		$newValue = max( (int)$blob[self::BLOB_VALUE] + (int)$value, 0 );
		$written = $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForSet' ],
			$lockedTime,
			// Preserve the old expiry timestamp
			[ $key => [ $newValue, $blob[self::BLOB_EXPIRY] ] ],
			$flags
		);
		$result = $written ? $newValue : false;
	}

	$metric = ( $value >= 0 ) ? self::METRIC_OP_INCR : self::METRIC_OP_DECR;
	$this->updateOpStats( $metric, [ $key ] );

	return $result;
}
329 
/**
 * Fetch and unserialize the values of several keys at once.
 *
 * @param string[] $keys
 * @param int $flags Not used by this method
 * @return array Map of (key => value) for the keys that were found and whose value
 *   did not unserialize to false
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$blobsByKey = $this->fetchBlobs( $keys );

	$valuesByKey = [];
	$sizesByKey = [];
	foreach ( $keys as $key ) {
		$blob = $blobsByKey[$key];
		$bytesRead = false;
		if ( $blob ) {
			$serialized = $blob[self::BLOB_VALUE];
			$value = $this->unserialize( $serialized );
			if ( $value !== false ) {
				$valuesByKey[$key] = $value;
			}
			$bytesRead = strlen( $serialized );
		}
		$sizesByKey[$key] = [ 0, $bytesRead ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $sizesByKey );

	return $valuesByKey;
}
354 
/**
 * Store several values at once, all with the same expiry.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime
 * @param int $flags
 * @return bool Whether every write was marked successful
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$now = $this->getCurrentTime();

	// Pair every value with the shared expiry, keeping the keys and their order
	$argsByKey = [];
	foreach ( $data as $key => $value ) {
		$argsByKey[$key] = [ $value, $exptime ];
	}

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForSet' ],
		$now,
		$argsByKey,
		$flags
	);
}
370 
/**
 * Delete several keys at once.
 *
 * @param string[] $keys
 * @param int $flags
 * @return bool Whether every write was marked successful
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$now = $this->getCurrentTime();
	// The delete callback takes no per-key arguments
	$argsByKey = array_fill_keys( $keys, [] );
	$writer = [ $this, 'modifyTableSpecificBlobsForDelete' ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
381 
/**
 * Change the expiry of several keys at once, all to the same value.
 *
 * @param string[] $keys
 * @param int $exptime
 * @param int $flags
 * @return bool Whether every write was marked successful
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$argsByKey = array_fill_keys( $keys, [ $exptime ] );
	$writer = [ $this, 'modifyTableSpecificBlobsForChangeTTL' ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
392 
/**
 * Get a database connection for the given shard index.
 *
 * @param int $shardIndex Server index
 * @return mixed The connection returned by the load balancer path or by
 *   getConnectionFromServerInfo()
 * @throws UnexpectedValueException If there is no server with this index
 */
private function getConnection( $shardIndex ) {
	if ( $this->useLB ) {
		return $this->getConnectionViaLoadBalancer();
	}

	// Don't keep timing out trying to connect if the server is down:
	// rethrow the remembered error while the failure is less than 60 seconds old
	$lastError = $this->connFailureErrors[$shardIndex] ?? null;
	if ( $lastError !== null ) {
		$sinceFailure = $this->getCurrentTime() - $this->connFailureTimes[$shardIndex];
		if ( $sinceFailure < 60 ) {
			throw $lastError;
		}
	}

	if ( !isset( $this->serverInfos[$shardIndex] ) ) {
		throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
	}

	return $this->getConnectionFromServerInfo( $shardIndex, $this->serverInfos[$shardIndex] );
}
423 
/**
 * Work out which server and which table hold a given key.
 *
 * @param string $key
 * @return array (server index, table name)
 */
private function getKeyLocation( $key ) {
	// Load balancer setups and single-server lists always use index 0
	$shardIndex = 0;
	if ( !$this->useLB && count( $this->serverTags ) != 1 ) {
		// Striped array of database servers: take the first tag in the
		// consistent-hash ordering for this key
		$rankedTags = $this->serverTags;
		ArrayUtils::consistentHashSort( $rankedTags, $key );
		$shardIndex = array_key_first( $rankedTags );
	}

	$tableIndex = null;
	if ( $this->numTableShards > 1 ) {
		// First 8 hex digits of the MD5 digest, masked to a non-negative 31-bit integer
		$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
		$tableIndex = $hash % $this->numTableShards;
	}

	return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
}
453 
/**
 * Get the name of the table for a table shard index.
 *
 * @param int|null $index Table shard index, or null for the base table
 * @return string The base table name, followed by the zero-padded index when the
 *   table is sharded
 */
private function getTableNameByShard( $index ) {
	if ( $index === null || $this->numTableShards <= 1 ) {
		return $this->tableName;
	}

	// Pad to the width of the largest possible shard index
	$width = strlen( (string)( $this->numTableShards - 1 ) );
	$suffix = sprintf( '%0' . $width . 'd', $index );

	return $this->tableName . $suffix;
}
468 
/**
 * Fetch the live (non-expired) rows for a list of keys and decode them.
 *
 * Database errors are passed to handleDBError(); the affected keys are then
 * reported as null, exactly like keys that have no live row.
 *
 * @param string[] $keys
 * @param bool $getCasToken Whether to also produce a CAS token for each row
 * @return array Map of (key => array with the self::BLOB_VALUE, self::BLOB_EXPIRY and
 *   self::BLOB_CASTOKEN fields, or null), in the order of $keys
 */
private function fetchBlobs( array $keys, bool $getCasToken = false ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	// Initialize order-preserved per-key results; set values for live keys below
	$dataByKey = array_fill_keys( $keys, null );

	$readTime = (int)$this->getCurrentTime();
	$keysByTableByShard = [];
	foreach ( $keys as $key ) {
		[ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
		$keysByTableByShard[$shardIndex][$partitionTable][] = $key;
	}

	foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
		try {
			$db = $this->getConnection( $shardIndex );
			foreach ( $serverKeys as $partitionTable => $tableKeys ) {
				$fields = [ 'keyname', 'value', 'exptime' ];
				if ( $getCasToken ) {
					$fields = $this->addCasTokenFields( $db, $fields );
				}
				$res = $db->newSelectQueryBuilder()
					->select( $fields )
					->from( $partitionTable )
					->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
					->caller( __METHOD__ )
					->fetchResultSet();
				foreach ( $res as $row ) {
					// Remember where the row came from, for the decoding pass below
					$row->shardIndex = $shardIndex;
					$row->tableName = $partitionTable;
					$dataByKey[$row->keyname] = $row;
				}
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	foreach ( $keys as $key ) {
		$row = $dataByKey[$key] ?? null;
		if ( !$row ) {
			continue;
		}

		$this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
		try {
			$db = $this->getConnection( $row->shardIndex );
			$dataByKey[$key] = [
				self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
				self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
				self::BLOB_CASTOKEN => $getCasToken
					? $this->getCasTokenFromRow( $db, $row )
					: null
			];
		} catch ( DBError $e ) {
			// Never hand the raw row object back to callers: it is truthy, and
			// callers index non-null entries as arrays
			$dataByKey[$key] = null;
			$this->handleDBError( $e, $row->shardIndex );
		}
	}

	return $dataByKey;
}
535 
/**
 * Run a table-specific write callback for a set of keys, grouped by server and table.
 *
 * The callback is invoked once per (server, table) pair as
 * ( $db, $table, $mtime, $argsByKey, &$resByKey ) and is expected to set the
 * $resByKey entries of the keys it handled. Database errors are passed to
 * handleDBError(), which leaves the affected keys marked as failed. Afterwards,
 * garbage collection is given one chance to run per server that was written to.
 *
 * @param callable $tableWriteCallback
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => list of callback arguments)
 * @param int $flags Not used by this method itself
 * @param array &$resByKey Map of (key => result), in the order of $argsByKey;
 *   every entry starts out as false
 * @return bool Whether no key is left with a result of false
 */
private function modifyBlobs(
	callable $tableWriteCallback,
	float $mtime,
	array $argsByKey,
	int $flags,
	&$resByKey = []
) {
	// Initialize order-preserved per-key results; callbacks mark successful results
	$resByKey = array_fill_keys( array_keys( $argsByKey ), false );

	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$argsByKeyByTableByShard = [];
	foreach ( $argsByKey as $key => $args ) {
		[ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
		$argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
	}

	// Set of server indexes that were connected to; keyed by index so that a server
	// holding several of the affected tables is only listed once
	$shardIndexesAffected = [];
	foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
		foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
			try {
				$db = $this->getConnection( $shardIndex );
				$shardIndexesAffected[$shardIndex] = true;
				$tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
			} catch ( DBError $e ) {
				$this->handleDBError( $e, $shardIndex );
			}
		}
	}

	$success = !in_array( false, $resByKey, true );

	foreach ( array_keys( $shardIndexesAffected ) as $shardIndex ) {
		try {
			$db = $this->getConnection( $shardIndex );
			$this->occasionallyGarbageCollect( $db );
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	return $success;
}
596 
/**
 * Write callback for modifyBlobs(): store values, overwriting existing rows.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (value, exptime))
 * @param array &$resByKey Map of (key => result); set to true for every key here
 */
private function modifyTableSpecificBlobsForSet(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
	$nowUnix = (int)$mtime;

	$rows = [];
	$sizesByKey = [];
	foreach ( $argsByKey as $key => $args ) {
		[ $value, $exptime ] = $args;
		$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
		$serialized = $this->getSerialized( $value, $key );
		$rows[] = $this->buildUpsertRow( $db, $key, $serialized, $expiry, $modToken );

		$sizesByKey[$key] = [ strlen( $serialized ), 0 ];
	}

	if ( !$this->multiPrimaryMode ) {
		// T288998: use REPLACE, if possible, to avoid cluttering the binlogs
		$db->replace( $ptable, 'keyname', $rows, __METHOD__ );
	} else {
		$db->upsert(
			$ptable,
			$rows,
			[ [ 'keyname' ] ],
			$this->buildMultiUpsertSetForOverwrite( $db, $modToken ),
			__METHOD__
		);
	}

	// Only reached when the write query did not throw
	foreach ( array_keys( $argsByKey ) as $key ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_SET, $sizesByKey );
}
651 
/**
 * Write callback for modifyBlobs(): delete keys.
 *
 * With a single primary the rows are removed outright. In multi-primary mode the
 * rows are overwritten with tombstones (empty value, expiry in the past) instead.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => empty list)
 * @param array &$resByKey Map of (key => result); set to true for every key here
 */
private function modifyTableSpecificBlobsForDelete(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$keys = array_keys( $argsByKey );

	if ( !$this->multiPrimaryMode ) {
		// Just purge the keys since there is only one primary (e.g. "source of truth")
		$db->delete( $ptable, [ 'keyname' => $keys ], __METHOD__ );
	} else {
		// Tombstone keys in order to respect eventual consistency
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$tombExpiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
		$rows = [];
		foreach ( $keys as $key ) {
			$rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $tombExpiry, $modToken );
		}
		$db->upsert(
			$ptable,
			$rows,
			[ [ 'keyname' ] ],
			$this->buildMultiUpsertSetForOverwrite( $db, $modToken ),
			__METHOD__
		);
	}

	// Only reached when the write query did not throw
	foreach ( $keys as $key ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
}
701 
/**
 * Write callback for modifyBlobs(): store values only for keys without a live row.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (value, exptime))
 * @param array &$resByKey Map of (key => result); true for the keys that were added,
 *   false for the keys that already existed
 */
private function modifyTableSpecificBlobsForAdd(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
	$nowUnix = (int)$mtime;

	// This check must happen outside the write query to respect eventual consistency
	$liveKeys = $db->newSelectQueryBuilder()
		->select( 'keyname' )
		->from( $ptable )
		->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), $nowUnix ) )
		->caller( __METHOD__ )
		->fetchFieldValues();
	$isLiveByKey = array_fill_keys( $liveKeys, true );

	$rows = [];
	$sizesByKey = [];
	foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
		if ( isset( $isLiveByKey[$key] ) ) {
			$this->logger->debug( __METHOD__ . ": $key already exists" );
		} else {
			$serialized = $this->getSerialized( $value, $key );
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$rows[] = $this->buildUpsertRow( $db, $key, $serialized, $expiry, $modToken );

			$sizesByKey[$key] = [ strlen( $serialized ), 0 ];
		}
	}

	$db->upsert(
		$ptable,
		$rows,
		[ [ 'keyname' ] ],
		$this->buildMultiUpsertSetForOverwrite( $db, $modToken ),
		__METHOD__
	);

	// Results are only recorded once the write query has gone through
	foreach ( array_keys( $argsByKey ) as $key ) {
		$resByKey[$key] = !isset( $isLiveByKey[$key] );
	}

	$this->updateOpStats( self::METRIC_OP_ADD, $sizesByKey );
}
770 
/**
 * Write callback for modifyBlobs(): replace values whose CAS token still matches.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (value, exptime, CAS token))
 * @param array &$resByKey Map of (key => result); true for the keys that were
 *   replaced, false for keys that have no live row or whose token differs
 */
private function modifyTableSpecificBlobsForCas(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
	$nowUnix = (int)$mtime;

	// This check must happen outside the write query to respect eventual consistency
	$res = $db->newSelectQueryBuilder()
		->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
		->from( $ptable )
		->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), $nowUnix ) )
		->caller( __METHOD__ )
		->fetchResultSet();

	$storedTokenByKey = [];
	foreach ( $res as $row ) {
		$storedTokenByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
	}

	$rows = [];
	$sizesByKey = [];
	$rejectedByKey = [];
	foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
		$storedToken = $storedTokenByKey[$key] ?? null;
		if ( $storedToken === null ) {
			$rejectedByKey[$key] = true;
			$this->logger->debug( __METHOD__ . ": $key does not exists" );
		} elseif ( $storedToken !== $casToken ) {
			$rejectedByKey[$key] = true;
			$this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
		} else {
			$serialized = $this->getSerialized( $value, $key );
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$rows[] = $this->buildUpsertRow( $db, $key, $serialized, $expiry, $modToken );

			$sizesByKey[$key] = [ strlen( $serialized ), 0 ];
		}
	}

	$db->upsert(
		$ptable,
		$rows,
		[ [ 'keyname' ] ],
		$this->buildMultiUpsertSetForOverwrite( $db, $modToken ),
		__METHOD__
	);

	// Results are only recorded once the write query has gone through
	foreach ( array_keys( $argsByKey ) as $key ) {
		$resByKey[$key] = !isset( $rejectedByKey[$key] );
	}

	$this->updateOpStats( self::METRIC_OP_CAS, $sizesByKey );
}
852 
/**
 * Write callback for modifyBlobs(): change the expiry of keys that have a live row.
 *
 * With a single primary, the "exptime" column is updated in place, one query per
 * distinct new expiry, and the keys are only marked successful when the number of
 * affected rows equals the number of keys. In multi-primary mode, the live rows are
 * read and rewritten with their current value and the new expiry, and each key is
 * marked according to whether it had a live row.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (exptime))
 * @param array &$resByKey Map of (key => result)
 */
private function modifyTableSpecificBlobsForChangeTTL(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	if ( !$this->multiPrimaryMode ) {
		// Group the keys by their new absolute expiry
		$keysByExpiry = [];
		foreach ( $argsByKey as $key => $args ) {
			[ $exptime ] = $args;
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$keysByExpiry[$expiry][] = $key;
		}

		$touchedCount = 0;
		foreach ( $keysByExpiry as $expiry => $batch ) {
			$db->update(
				$ptable,
				[ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
				$this->buildExistenceConditions( $db, $batch, $nowUnix ),
				__METHOD__
			);
			$touchedCount += $db->affectedRows();
		}

		// All-or-nothing: per-key outcomes are not known from the row counts
		if ( $touchedCount === count( $argsByKey ) ) {
			foreach ( array_keys( $argsByKey ) as $key ) {
				$resByKey[$key] = true;
			}
		}
	} else {
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );

		$res = $db->newSelectQueryBuilder()
			->select( [ 'keyname', 'value' ] )
			->from( $ptable )
			->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), $nowUnix ) )
			->caller( __METHOD__ )
			->fetchResultSet();

		$rows = [];
		$isLiveByKey = [];
		foreach ( $res as $liveRow ) {
			$key = $liveRow->keyname;
			$isLiveByKey[$key] = true;
			$serialized = $this->dbDecodeSerialValue( $db, $liveRow->value );
			[ $exptime ] = $argsByKey[$key];
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$rows[] = $this->buildUpsertRow( $db, $key, $serialized, $expiry, $modToken );
		}

		$db->upsert(
			$ptable,
			$rows,
			[ [ 'keyname' ] ],
			$this->buildMultiUpsertSetForOverwrite( $db, $modToken ),
			__METHOD__
		);

		foreach ( array_keys( $argsByKey ) as $key ) {
			$resByKey[$key] = isset( $isLiveByKey[$key] );
		}
	}

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
}
937 
/**
 * Write callback for modifyBlobs(): increment keys, initializing absent/expired ones,
 * and report the resulting values.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (step, init, exptime))
 * @param array &$resByKey Map of (key => result); set to the new integer value for
 *   each key that was written and read back successfully
 * @throws Throwable Anything thrown by the write or read-back query, after the
 *   atomic section has been cancelled
 */
private function modifyTableSpecificBlobsForIncrInit(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	// The token only depends on $mtime and $db, so it is the same for every key
	$mt = $this->makeTimestampedModificationToken( $mtime, $db );

	foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
		$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );

		// Use a transaction so that changes from other threads are not visible due to
		// "consistent reads". This way, the exact post-increment value can be returned.
		// The "live key exists" check can go inside the write query and remain safe for
		// replication since the TTL for such keys is either indefinite or very short.
		$atomic = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
		try {
			$db->upsert(
				$ptable,
				$this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
				[ [ 'keyname' ] ],
				$this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
				__METHOD__
			);
			$affectedCount = $db->affectedRows();
			$row = $db->newSelectQueryBuilder()
				->select( 'value' )
				->from( $ptable )
				->where( [ 'keyname' => $key ] )
				->caller( __METHOD__ )
				->fetchRow();
		} catch ( Throwable $e ) {
			// Also covers Error subclasses (e.g. TypeError from the typed helper
			// parameters), so that the atomic section is never left open
			$db->cancelAtomic( __METHOD__, $atomic );
			throw $e;
		}
		$db->endAtomic( __METHOD__ );

		if ( !$affectedCount || $row === false ) {
			$this->logger->warning( __METHOD__ . ": failed to set new $key value" );
			continue;
		}

		$serialValue = $this->dbDecodeSerialValue( $db, $row->value );
		if ( !$this->isInteger( $serialValue ) ) {
			$this->logger->warning( __METHOD__ . ": got non-integer $key value" );
			continue;
		}

		$resByKey[$key] = (int)$serialValue;
	}

	$this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
}
1010 
/**
 * Write callback for modifyBlobs(): increment keys, initializing absent/expired ones,
 * without reading the resulting values back.
 *
 * @param IDatabase $db
 * @param string $ptable Name of the table holding the keys
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (step, init, exptime))
 * @param array &$resByKey Map of (key => result); set to true for each key whose
 *   write affected at least one row
 */
private function modifyTableSpecificBlobsForIncrInitAsync(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	foreach ( $argsByKey as $key => $args ) {
		[ $step, $init, $exptime ] = $args;
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );

		$newRow = $this->buildUpsertRow( $db, $key, $init, $expiry, $modToken );
		$updateSet = $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix );
		$db->upsert( $ptable, $newRow, [ [ 'keyname' ] ], $updateSet, __METHOD__ );

		if ( $db->affectedRows() ) {
			$resByKey[$key] = true;
		} else {
			$this->logger->warning( __METHOD__ . ": failed to set new $key value" );
		}
	}
}
1046 
/**
 * Convert a caller-supplied expiry into the absolute expiry to store for a key.
 *
 * @param int $exptime Expiry as accepted by getExpirationAsTimestamp()
 * @param int $nowTsUnix Current UNIX timestamp
 * @return int self::TTL_INDEFINITE, or a UNIX timestamp that is no earlier than
 *   $nowTsUnix minus self::SAFE_CLOCK_BOUND_SEC
 */
private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
	$expiry = $this->getExpirationAsTimestamp( $exptime );
	if ( $expiry === self::TTL_INDEFINITE ) {
		return $expiry;
	}

	// Eventual consistency requires the preservation of recently modified keys.
	// Do not create rows with `exptime` fields so low that they might get garbage
	// collected before being replicated.
	$earliestAllowed = $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC;

	return max( $expiry, $earliestAllowed );
}
1063 
/**
 * Lock a key without waiting and get the modification timestamp to use while it is held.
 *
 * @param string $key
 * @param ScopedCallback|null &$scope Set to an object that unlocks the key once it is
 *   called or goes out of scope; left untouched if the lock was not acquired
 * @return float|null The time recorded for the lock, with microsecond precision,
 *   or null if the lock could not be acquired
 */
private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
	$acquired = $this->lock( $key, 0 );
	if ( !$acquired ) {
		return null;
	}

	$release = function () use ( $key ) {
		$this->unlock( $key );
	};
	$scope = new ScopedCallback( $release );

	$lockTime = $this->locks[$key][self::LOCK_TIME];

	// sprintf is used to adjust precision
	return (float)sprintf( '%.6F', $lockTime );
}
1094 
/**
 * Build the 17 character modification token for a write.
 *
 * The first 13 characters are the base 36 form of a 67 bit number made of the
 * seconds of $mtime (35 bits) followed by a server ID (32 bits). The last 4
 * characters are the base 36 form of the microseconds of $mtime.
 *
 * @param float $mtime UNIX modification timestamp
 * @param IDatabase $db Connection handling the write
 * @return string
 * @throws RuntimeException If the token does not come out 17 characters long
 */
private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
	// We have reserved space for upto 6 digits in the microsecond portion of the token.
	// This is for future use only (maybe CAS tokens) and not currently used.
	// It is currently populated by the microsecond portion returned by microtime,
	// which generally has fewer than 6 digits of meaningful precision but can still be useful
	// in debugging (to see the token continuously change even during rapid testing).
	$seconds = (int)$mtime;
	$microseconds = explode( '.', sprintf( '%.6F', $mtime ) )[1];

	// Fall back to a checksum of the server name when there is no topology-based ID
	$id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );

	// 35 bit integral seconds portion of UNIX timestamp
	$secondsBits = str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT );
	// 32 bit ID of the primary database server handling the write
	$serverBits = str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT );
	// 67 bit integral portion of UNIX timestamp, qualified
	$qualifiedSeconds = \Wikimedia\base_convert( $secondsBits . $serverBits, 2, 36, 13 );
	// 20 bit fractional portion of UNIX timestamp, as integral microseconds
	$fraction = str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT );

	$token = $qualifiedSeconds . $fraction;
	if ( strlen( $token ) !== 17 ) {
		throw new RuntimeException( "Modification timestamp overflow detected" );
	}

	return $token;
}
1136 
/**
 * Build the query conditions that match the live rows of the given keys.
 *
 * @param IDatabase $db
 * @param string[]|string $keys
 * @param int $time UNIX timestamp; rows expiring before it are excluded
 * @return array Conditions for a SELECT or UPDATE
 */
private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
	$quotedCutoff = $db->addQuotes( $db->timestamp( $time ) );

	// Note that tombstones always have past expiration dates
	return [
		'keyname' => $keys,
		'exptime >= ' . $quotedCutoff
	];
}
1152 
/**
 * Build the row to insert for a key.
 *
 * @param IDatabase $db
 * @param string $key
 * @param string|int $serialValue Serialized value
 * @param int $expiry Absolute expiry as produced by makeNewKeyExpiry()
 * @param string $mt Modification token; only stored in multi-primary mode
 * @return array Map of (column => value) with "keyname", "value" and "exptime",
 *   plus "modtoken" in multi-primary mode
 */
private function buildUpsertRow(
	IDatabase $db,
	$key,
	$serialValue,
	int $expiry,
	string $mt
) {
	$columns = [
		'keyname' => $key,
		'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
		'exptime' => $this->encodeDbExpiry( $db, $expiry )
	];

	return $this->multiPrimaryMode
		? $columns + [ 'modtoken' => $mt ]
		: $columns;
}
1181 
/**
 * Build the SET clauses of an upsert that overwrites existing rows with the new ones.
 *
 * In multi-primary mode every assignment is wrapped in a conditional so that a row
 * is only overwritten when the first 13 characters of the new modification token
 * are not lower than those of the stored one.
 *
 * @param IDatabase $db
 * @param string $mt Modification token of the write
 * @return string[] List of "column=expression" strings
 */
private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
	$newValueByColumn = [
		'value' => $db->buildExcludedValue( 'value' ),
		'exptime' => $db->buildExcludedValue( 'exptime' )
	];

	$assignments = [];

	if ( !$this->multiPrimaryMode ) {
		foreach ( $newValueByColumn as $column => $newValue ) {
			$assignments[] = "{$column}={$newValue}";
		}

		return $assignments;
	}

	// The query might take a while to replicate, during which newer values might get
	// written. Qualify the query so that it does not override such values. Note that
	// duplicate tokens generated around the same time for a key should still result
	// in convergence given the use of server_id in modtoken (providing a total order
	// among primary DB servers) and MySQL binlog ordering (providing a total order
	// for writes replicating from a given primary DB server).
	$newValueByColumn['modtoken'] = $db->addQuotes( $mt );
	$incomingIsNotOlder = $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
		$db->buildSubString( 'modtoken', 1, 13 );
	foreach ( $newValueByColumn as $column => $newValue ) {
		// Keep the current column value when the stored row is newer
		$guarded = $db->conditional( $incomingIsNotOlder, $newValue, $column );
		$assignments[] = "{$column}=" . trim( $guarded );
	}

	return $assignments;
}
1221 
/**
 * Build the SET clauses of an upsert that increments a live row or re-initializes
 * an expired one.
 *
 * Each column gets a conditional on whether the stored "exptime" is still at or
 * after $mtUnixTs: live rows have their value increased by $step and keep their
 * expiry (and modification token); expired rows take the initial value, the new
 * expiry (and the new modification token).
 *
 * @param IDatabase $db
 * @param int $step
 * @param int $init
 * @param int $expiry Absolute expiry for (re-)initialized rows
 * @param string $mt Modification token; only used in multi-primary mode
 * @param int $mtUnixTs UNIX timestamp of the write
 * @return string[] List of "column=expression" strings
 */
private function buildIncrUpsertSet(
	IDatabase $db,
	int $step,
	int $init,
	int $expiry,
	string $mt,
	int $mtUnixTs
) {
	$incrementedValue = $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}";
	$initialValue = $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) );
	$initialExpiry = $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) );

	// Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
	$sqlPairsByColumn = [
		'value' => [ $incrementedValue, $initialValue ],
		'exptime' => [ 'exptime', $initialExpiry ]
	];
	if ( $this->multiPrimaryMode ) {
		$sqlPairsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
	}

	$rowIsLive = 'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) );

	$assignments = [];
	foreach ( $sqlPairsByColumn as $column => $sqlPair ) {
		[ $sqlIfLive, $sqlIfExpired ] = $sqlPair;
		$assignments[] = "{$column}=" . trim( $db->conditional( $rowIsLive, $sqlIfLive, $sqlIfExpired ) );
	}

	return $assignments;
}
1268 
/**
 * Convert an absolute expiry into the value to store in the "exptime" column.
 *
 * @param IDatabase $db
 * @param int $expiry UNIX timestamp or self::TTL_INDEFINITE
 * @return string Timestamp in the format of the database
 */
private function encodeDbExpiry( IDatabase $db, int $expiry ) {
	if ( $expiry === self::TTL_INDEFINITE ) {
		// Use the maximum timestamp that the column can store
		return $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
	}

	// Convert the absolute timestamp into the DB timestamp format
	return $db->timestamp( $expiry );
}
1281 
/**
 * Convert a stored "exptime" column value back into an absolute expiry.
 *
 * @param IDatabase $db
 * @param string $dbExpiry Timestamp in the format of the database
 * @return int UNIX timestamp, or self::TTL_INDEFINITE for the placeholder timestamp
 */
private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
	$indefiniteMarker = $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
	if ( $dbExpiry === $indefiniteMarker ) {
		return self::TTL_INDEFINITE;
	}

	return (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
}
1292 
/**
 * Convert a serialized value into the form to store in the "value" column.
 *
 * @param IDatabase $db
 * @param string|int $serialValue
 * @return mixed The decimal string for integers, else the result of encodeBlob()
 */
private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
	if ( is_int( $serialValue ) ) {
		// Integers are stored as plain decimal strings
		return (string)$serialValue;
	}

	return $db->encodeBlob( $serialValue );
}
1301 
/**
 * Convert a stored "value" column value back into a serialized value.
 *
 * @param IDatabase $db
 * @param mixed $blob Column value as returned by the database
 * @return mixed An int when isInteger() accepts the column value, else the
 *   result of decodeBlob()
 */
private function dbDecodeSerialValue( IDatabase $db, $blob ) {
	if ( $this->isInteger( $blob ) ) {
		return (int)$blob;
	}

	return $db->decodeBlob( $blob );
}
1310 
/**
 * Extend a SELECT field list with what is needed to obtain CAS tokens.
 *
 * For MySQL and PostgreSQL a "castoken" field computed by the database is added
 * (a hash of the value, "@", and the expiry). For other database types, the
 * "value" and "exptime" fields are added if missing, so that the token can be
 * computed by the application.
 *
 * @param IDatabase $db
 * @param string[] $fields
 * @return string[] The extended field list
 */
private function addCasTokenFields( IDatabase $db, array $fields ) {
	// SQL hash expression to use, per database type that has one
	$hashSqlByType = [
		'mysql' => 'SHA1(value)',
		'postgres' => 'md5(value)'
	];

	$type = $db->getType();
	if ( isset( $hashSqlByType[$type] ) ) {
		$fields['castoken'] = $db->buildConcat( [
			$hashSqlByType[$type],
			$db->addQuotes( '@' ),
			'exptime'
		] );

		return $fields;
	}

	foreach ( [ 'value', 'exptime' ] as $requiredField ) {
		if ( !in_array( $requiredField, $fields, true ) ) {
			$fields[] = $requiredField;
		}
	}

	return $fields;
}
1344 
/**
 * Get the CAS token of a row selected with the fields from addCasTokenFields().
 *
 * @param IDatabase $db
 * @param stdClass $row
 * @return string The "castoken" field if the row has one, else a token computed here
 *   from the SHA-1 hash of the decoded value, "@", and the "exptime" field
 */
private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
	if ( isset( $row->castoken ) ) {
		return $row->castoken;
	}

	$valueHash = sha1( $this->dbDecodeSerialValue( $db, $row->value ) );
	$token = $valueHash . '@' . $row->exptime;
	$this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );

	return $token;
}
1362 
/**
 * Occasionally purge expired rows from a server, on a random subset of calls.
 *
 * A purge is attempted on roughly one in every $this->purgePeriod calls, and never
 * within self::GC_DELAY_SEC seconds of the previous one. A non-positive purge period
 * disables purging. When an async handler is set, the purge is handed to it instead
 * of being run directly.
 *
 * @param IDatabase $db Connection to the server to purge
 */
private function occasionallyGarbageCollect( IDatabase $db ) {
	if (
		// Random purging is enabled (this also keeps the mt_rand() range below valid)
		$this->purgePeriod > 0 &&
		// Only purge on one in every $this->purgePeriod writes
		mt_rand( 0, $this->purgePeriod - 1 ) === 0 &&
		// Avoid repeating the delete within a few seconds
		( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
	) {
		$garbageCollector = function () use ( $db ) {
			/** @noinspection PhpUnusedLocalVariableInspection */
			$silenceScope = $this->silenceTransactionProfiler();
			$this->deleteServerObjectsExpiringBefore(
				$db,
				(int)$this->getCurrentTime(),
				$this->purgeLimit
			);
			// Use the same clock as the throttling check above
			$this->lastGarbageCollect = $this->getCurrentTime();
		};
		if ( $this->asyncHandler ) {
			$this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
			( $this->asyncHandler )( $garbageCollector );
		} else {
			$garbageCollector();
		}
	}
}
1394 
/**
 * Delete every object whose expiry lies before the current time.
 */
public function expireAll() {
	$now = (int)$this->getCurrentTime();
	$this->deleteObjectsExpiringBefore( $now );
}
1398 
/**
 * Delete all objects expiring before a certain date.
 *
 * @param string|int $timestamp Cutoff; rows with an older expiry are deleted
 * @param callable|null $progress Optional callback; see deleteServerObjectsExpiringBefore()
 * @param int|float $limit Maximum number of rows to delete (INF for no limit)
 * @param string|null $tag Purge only the server with this tag; null purges all servers
 *  in random order
 * @return bool False if a DBError was raised for any server, true otherwise
 */
public function deleteObjectsExpiringBefore(
	$timestamp,
	callable $progress = null,
	$limit = INF,
	string $tag = null
) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	if ( $tag !== null ) {
		// Purge one server only, to support concurrent purging in large wiki farms (T282761).
		$shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
	} else {
		$shardIndexes = $this->getShardServerIndexes();
		shuffle( $shardIndexes );
	}

	$ok = true;
	$numServers = count( $shardIndexes );

	// Running total across servers; updated by reference in the per-server purge
	$keysDeletedCount = 0;
	foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
		try {
			$db = $this->getConnection( $shardIndex );
			$this->deleteServerObjectsExpiringBefore(
				$db,
				$timestamp,
				$limit,
				$keysDeletedCount,
				[ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
			);
		} catch ( DBError $e ) {
			// Keep going with the remaining servers, but report the failure
			$this->handleDBError( $e, $shardIndex );
			$ok = false;
		}
	}

	return $ok;
}
1438 
/**
 * Delete rows with an expiry older than a cutoff from the table shards of one server.
 *
 * Table shards are visited in random order. Within a shard, rows are selected in
 * ascending exptime order and deleted in batches of min( writeBatchSize, $limit ) keys.
 * Once $keysDeletedCount reaches $limit, no further table shard is touched (and the
 * progress callback is not invoked for the skipped shards).
 *
 * @param IDatabase $db Connection to the server to purge
 * @param string|int $timestamp Cutoff, in a format accepted by ConvertibleTimestamp::convert()
 * @param int|float $limit Maximum number of rows to delete
 * @param int &$keysDeletedCount Running count of deleted rows; incremented in place
 * @param array|null $progress Optional map with 'fn' (callable|null), 'serversDone' and
 *  'serversTotal'; when 'fn' is callable it receives the overall completion percentage
 */
private function deleteServerObjectsExpiringBefore(
	IDatabase $db,
	$timestamp,
	$limit,
	&$keysDeletedCount = 0,
	array $progress = null
) {
	$cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
	if ( $this->multiPrimaryMode ) {
		// Eventual consistency requires the preservation of any key that was recently
		// modified. The key must exist on this database server long enough for the server
		// to receive, via replication, all writes to the key with lower timestamps. Such
		// writes should be no-ops since the existing key value should "win". If the network
		// partitions between datacenters A and B for 30 minutes, the database servers in
		// each datacenter will see an initial burst of writes with "old" timestamps via
		// replication. This might include writes with lower timestamps than the existing
		// key value. Therefore, clock skew and replication delay are both factors.
		$cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
		$cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
	}
	$tableIndexes = range( 0, $this->numTableShards - 1 );
	shuffle( $tableIndexes );

	$batchSize = min( $this->writeBatchSize, $limit );

	foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
		if ( $keysDeletedCount >= $limit ) {
			// The deletion quota is used up. Without this check, the do/while below
			// would still run one batch on every remaining table shard.
			break;
		}

		// The oldest expiry of a row we have deleted on this shard
		// (the first row that we deleted)
		$minExpUnix = null;
		// The most recent expiry time so far, from a row we have deleted on this shard
		$maxExp = null;
		// Size of the time range we'll delete, in seconds (for progress estimate)
		$totalSeconds = null;

		do {
			$res = $db->newSelectQueryBuilder()
				->select( [ 'keyname', 'exptime' ] )
				->from( $this->getTableNameByShard( $tableIndex ) )
				->where(
					array_merge(
						[ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
						// Resume from the newest expiry seen in the previous batch
						$maxExp ? [ 'exptime >= ' . $db->addQuotes( $maxExp ) ] : []
					)
				)
				->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
				->limit( $batchSize )
				->caller( __METHOD__ )
				->fetchResultSet();

			if ( $res->numRows() ) {
				$row = $res->current();
				if ( $minExpUnix === null ) {
					$minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
					$totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
				}

				$keys = [];
				foreach ( $res as $row ) {
					$keys[] = $row->keyname;
					$maxExp = $row->exptime;
				}

				// Re-check exptime in the DELETE so that a row whose expiry was
				// changed since the SELECT is left alone
				$db->delete(
					$this->getTableNameByShard( $tableIndex ),
					[
						'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
						'keyname' => $keys
					],
					__METHOD__
				);
				$keysDeletedCount += $db->affectedRows();
			}

			if ( $progress && is_callable( $progress['fn'] ) ) {
				if ( $totalSeconds ) {
					$maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
					$remainingSeconds = $cutoffUnix - $maxExpUnix;
					$processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
					// For example, if we've done 1.5 table shard, and are thus half-way on the
					// 2nd of perhaps 5 tables on this server, then this might be:
					// `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
					$tablesDoneRatio =
						( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
				} else {
					// Nothing was found to delete on this shard
					$tablesDoneRatio = 1;
				}

				// For example, if we're 30% done on the last of 10 servers, then this might be:
				// `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
				$overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
					( $tablesDoneRatio / $progress['serversTotal'] );
				( $progress['fn'] )( $overallRatio * 100 );
			}
		} while ( $res->numRows() && $keysDeletedCount < $limit );
	}
}
1544 
/**
 * Delete content of shard tables in every server.
 *
 * Stops at the first server that raises a DBError.
 *
 * @return bool True if every table on every server was emptied, false on DBError
 */
public function deleteAll() {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		try {
			$conn = $this->getConnection( $shardIndex );
			$tableIndex = 0;
			while ( $tableIndex < $this->numTableShards ) {
				$conn->delete( $this->getTableNameByShard( $tableIndex ), '*', __METHOD__ );
				++$tableIndex;
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );

			return false;
		}
	}

	return true;
}
1567 
/**
 * Acquire a named database lock for the key on the server that the key maps to.
 *
 * @param string $key
 * @param int $timeout Passed to the database lock() call
 * @param int $exptime Not used by this implementation
 * @return mixed Result of $db->lock() called with LOCK_TIMESTAMP; null if a DBError occurred
 */
public function doLock( $key, $timeout = 6, $exptime = 6 ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	[ $shardIndex ] = $this->getKeyLocation( $key );

	try {
		$conn = $this->getConnection( $shardIndex );

		return $conn->lock( $key, __METHOD__, $timeout, $conn::LOCK_TIMESTAMP );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
		$this->logger->warning(
			__METHOD__ . ' failed due to I/O error for {key}.',
			[ 'key' => $key ]
		);
	}

	return null;
}
1588 
/**
 * Release the named database lock for the key on the server that the key maps to.
 *
 * @param string $key
 * @return mixed Result of $db->unlock(); false if a DBError occurred
 */
public function doUnlock( $key ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	[ $shardIndex ] = $this->getKeyLocation( $key );

	try {
		return $this->getConnection( $shardIndex )->unlock( $key, __METHOD__ );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
	}

	return false;
}
1605 
/**
 * Make a cache key for the given keyspace and components.
 *
 * Spaces become '_' and ':' inside components becomes '%3A'. A component that does not
 * fit in the remaining character budget is replaced by '#' plus its MD5. If the budget
 * is still exceeded, the whole component list is collapsed into a single MD5-based key.
 *
 * @param string $keyspace
 * @param array $components
 * @return string
 */
public function makeKeyInternal( $keyspace, $components ) {
	// SQL schema for 'objectcache' specifies keys as varchar(255). From that,
	// subtract the number of characters we need for the keyspace and for
	// the separator character needed for each argument. To handle some
	// custom prefixes used by things like WANObjectCache, limit to 205.
	$keyspace = strtr( $keyspace, ' ', '_' );
	$budget = 205 - strlen( $keyspace ) - count( $components );

	foreach ( $components as $position => $rawComponent ) {
		$encoded = strtr( $rawComponent, [
			' ' => '_', // Avoid unnecessary misses from pre-1.35 code
			':' => '%3A',
		] );

		// 33 = 32 characters for the MD5 + 1 for the '#' prefix.
		if ( $budget > 33 && strlen( $encoded ) > $budget ) {
			$encoded = '#' . md5( $encoded );
		}

		$budget -= strlen( $encoded );
		$components[$position] = $encoded;
	}

	$joined = implode( ':', $components );

	return ( $budget < 0 )
		? $keyspace . ':BagOStuff-long-key:##' . md5( $joined )
		: $keyspace . ':' . $joined;
}
1631 
/**
 * Turn a value into its storable form.
 *
 * @param mixed $value
 * @return string|int Integers are returned untouched; anything else is PHP-serialized
 *  and, when $this->hasZlib is set, deflated
 */
protected function serialize( $value ) {
	if ( is_int( $value ) ) {
		return $value;
	}

	$blob = serialize( $value );

	// On typical message and page data, this can provide a 3X storage savings
	return $this->hasZlib ? gzdeflate( $blob ) : $blob;
}
1645 
/**
 * Turn a stored form back into a value.
 *
 * @param string|int $value Stored form
 * @return mixed False for the tombstone marker; an int for integer values; otherwise
 *  the result of PHP unserialize(), applied to the inflated data when inflation succeeds
 */
protected function unserialize( $value ) {
	if ( $value === self::TOMB_SERIAL ) {
		return false; // tombstone
	}

	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$serial = $value;
	if ( $this->hasZlib ) {
		// Inflation may fail; warnings are silenced and the input is then used as-is
		AtEase::suppressWarnings();
		$inflated = gzinflate( $value );
		AtEase::restoreWarnings();

		if ( $inflated !== false ) {
			$serial = $inflated;
		}
	}

	return unserialize( $serial );
}
1667 
/**
 * Get the load balancer, creating it through the injected callback on first use.
 *
 * @return ILoadBalancer
 */
private function getLoadBalancer(): ILoadBalancer {
	if ( $this->loadBalancer ) {
		return $this->loadBalancer;
	}

	$this->loadBalancer = ( $this->loadBalancerCallback )();

	return $this->loadBalancer;
}
1674 
/**
 * Get a connection handle from the load balancer and make sure it is connected.
 *
 * @return mixed Handle returned by ILoadBalancer::getMaintenanceConnectionRef()
 */
private function getConnectionViaLoadBalancer() {
	$lb = $this->getLoadBalancer();
	$writerAttributes = $lb->getServerAttributes( $lb->getWriterIndex() );

	if ( !$writerAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// If the RDBMS has row/table/page level locking, then use separate auto-commit
		// connection to avoid needless contention and deadlocks.
		$role = $this->replicaOnly ? DB_REPLICA : DB_PRIMARY;
		$conn = $lb->getMaintenanceConnectionRef(
			$role,
			[],
			$this->dbDomain,
			$lb::CONN_TRX_AUTOCOMMIT
		);
	} else {
		// Use the main connection to avoid transaction deadlocks
		$conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
	}

	// Make sure any errors are thrown now while we can more easily handle them
	$conn->ensureConnection();

	return $conn;
}
1700 
/**
 * Get the connection for a directly configured server, creating and caching it on first use.
 *
 * @param int $shardIndex Server index used as the key into $this->conns
 * @param array $server Server configuration passed on to Database::factory()
 * @return mixed Handle created by Database::factory()
 */
private function getConnectionFromServerInfo( $shardIndex, array $server ) {
	if ( isset( $this->conns[$shardIndex] ) ) {
		return $this->conns[$shardIndex];
	}

	$overrides = [
		// Make sure the handle uses autocommit mode
		'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
		'connLogger' => $this->logger,
		'queryLogger' => $this->logger
	];
	$conn = Database::factory( $server['type'], array_merge( $server, $overrides ) );

	// Automatically create the objectcache table for sqlite as needed
	if ( $conn->getType() === 'sqlite' ) {
		$this->initSqliteDatabase( $conn );
	}
	$this->conns[$shardIndex] = $conn;

	// @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
	return $this->conns[$shardIndex];
}
1732 
/**
 * Handle a DBError raised while talking to a server.
 *
 * When not using the load balancer, a connection error also marks the server as down.
 * The error is always logged and recorded as the last error.
 *
 * @param DBError $exception
 * @param int $shardIndex Server index
 */
private function handleDBError( DBError $exception, $shardIndex ) {
	$isConnectionError = $exception instanceof DBConnectionError;
	if ( $isConnectionError && !$this->useLB ) {
		$this->markServerDown( $exception, $shardIndex );
	}

	$this->setAndLogDBError( $exception );
}
1745 
/**
 * Log a DBError and record the matching last-error code.
 *
 * @param DBError $e Connection errors map to ERR_UNREACHABLE, all others to ERR_UNEXPECTED
 */
private function setAndLogDBError( DBError $e ) {
	$this->logger->error( "DBError: {$e->getMessage()}", [ 'exception' => $e ] );

	if ( !( $e instanceof DBConnectionError ) ) {
		$this->setLastError( self::ERR_UNEXPECTED );
		$this->logger->warning( __METHOD__ . ": ignoring query error" );

		return;
	}

	$this->setLastError( self::ERR_UNREACHABLE );
	$this->logger->warning( __METHOD__ . ": ignoring connection error" );
}
1759 
/**
 * Drop the cached connection for a server and record when and why it failed.
 *
 * A failure recorded less than 60 seconds ago is kept as-is; an older record is
 * replaced by the current time and exception.
 *
 * @param DBError $exception
 * @param int $shardIndex Server index
 */
private function markServerDown( DBError $exception, $shardIndex ) {
	unset( $this->conns[$shardIndex] ); // bug T103435

	$now = $this->getCurrentTime();
	$downSince = $this->connFailureTimes[$shardIndex] ?? null;

	if ( $downSince !== null ) {
		if ( $now - $downSince < 60 ) {
			$this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );

			return;
		}
		// The previous failure record is stale; discard it before re-recording
		unset( $this->connFailureTimes[$shardIndex] );
		unset( $this->connFailureErrors[$shardIndex] );
	}

	$this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
	$this->connFailureTimes[$shardIndex] = $now;
	$this->connFailureErrors[$shardIndex] = $exception;
}
1783 
/**
 * Create the 'objectcache' table and its exptime index on an SQLite database if missing.
 *
 * @param IMaintainableDatabase $db
 * @throws DBError Re-thrown after rolling back when table or index creation fails
 */
private function initSqliteDatabase( IMaintainableDatabase $db ) {
	if ( !$db->tableExists( 'objectcache', __METHOD__ ) ) {
		// Use one table for SQLite; sharding does not seem to have much benefit
		$db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
		$db->startAtomic( __METHOD__ ); // atomic DDL
		try {
			$quotedTable = $db->tableName( 'objectcache' );
			$quotedIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );

			$createTableSql =
				"CREATE TABLE $quotedTable (\n" .
				" keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
				" value BLOB,\n" .
				" exptime BLOB NOT NULL\n" .
				")";
			$db->query( $createTableSql, __METHOD__ );

			$createIndexSql = "CREATE INDEX $quotedIndex ON $quotedTable (exptime)";
			$db->query( $createIndexSql, __METHOD__ );

			$db->endAtomic( __METHOD__ );
		} catch ( DBError $e ) {
			$db->rollback( __METHOD__ );
			throw $e;
		}
	}
}
1813 
/**
 * Create the shard tables on all databases.
 *
 * Only 'mysql' and 'postgres' servers are handled; each shard table is created with
 * the structure of the base 'objectcache' table. Servers of other types are skipped.
 */
public function createTables() {
	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		$db = $this->getConnection( $shardIndex );
		$type = $db->getType();
		if ( !in_array( $type, [ 'mysql', 'postgres' ], true ) ) {
			continue;
		}

		// The base table name does not depend on the shard number
		$encBaseTable = $db->tableName( 'objectcache' );
		for ( $i = 0; $i < $this->numTableShards; $i++ ) {
			$encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
			// "CREATE TABLE x LIKE y" is MySQL syntax. PostgreSQL requires the LIKE
			// clause inside the column list parentheses; INCLUDING ALL also copies
			// defaults, constraints and indexes.
			$sql = ( $type === 'postgres' )
				? "CREATE TABLE $encShardTable (LIKE $encBaseTable INCLUDING ALL)"
				: "CREATE TABLE $encShardTable LIKE $encBaseTable";
			$db->query( $sql, __METHOD__ );
		}
	}
}
1841 
/**
 * List the server indexes that hold cache data.
 *
 * @return int[] [ 0 ] for a LoadBalancer based configuration; otherwise the keys of
 *  $this->serverTags (striped array of database servers)
 */
private function getShardServerIndexes() {
	return $this->useLB
		? [ 0 ]
		: array_keys( $this->serverTags );
}
1856 
/**
 * Look up the server index that carries the given tag.
 *
 * @param string $tag
 * @return int Server index of the first server whose tag equals $tag
 * @throws InvalidArgumentException If no tags are configured or the tag is unknown
 */
private function getShardServerIndexForTag( string $tag ) {
	if ( !$this->serverTags ) {
		throw new InvalidArgumentException( "Given a tag but no tags are configured" );
	}

	// Strict search returns the key of the first exact match, or false
	$serverShardIndex = array_search( $tag, $this->serverTags, true );
	if ( $serverShardIndex === false ) {
		throw new InvalidArgumentException( "Unknown server tag: $tag" );
	}

	return $serverShardIndex;
}
1873 
/**
 * Silence the TransactionProfiler for as long as the returned object stays in scope.
 *
 * @return mixed Result of TransactionProfiler::silenceForScope(), or null when servers
 *  are configured directly
 */
private function silenceTransactionProfiler() {
	if ( !$this->serverInfos ) {
		return Profiler::instance()->getTransactionProfiler()->silenceForScope();
	}

	return null; // no TransactionProfiler injected anyway
}
1885 }
$success
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition: WebStart.php:82
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
Definition: ArrayUtils.php:49
string $keyspace
Default keyspace; used by makeKey()
Definition: BagOStuff.php:101
callable null $asyncHandler
Definition: BagOStuff.php:91
getCurrentTime()
Definition: BagOStuff.php:834
Storage medium specific cache for storing items (e.g.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form of a value, logging a warning if it involves custom classes.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
static instance()
Singleton.
Definition: Profiler.php:94
RDBMS-based caching module.
bool $multiPrimaryMode
Whether multi-primary mode is enabled.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
ILoadBalancer null $loadBalancer
doDelete( $key, $flags=0)
Delete an item.
float $lastGarbageCollect
UNIX timestamp.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
createTables()
Create the shard tables on all databases.
serialize( $value)
int $purgePeriod
Average number of writes required to trigger garbage collection.
doChangeTTL( $key, $exptime, $flags)
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
unserialize( $value)
array[] $serverInfos
(server index => server config)
deleteAll()
Delete content of shard tables in every server.
callable null $loadBalancerCallback
Injected function which returns a LoadBalancer.
doDeleteMulti(array $keys, $flags=0)
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from parameters injected by ObjectCache::newFromParams()
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
doLock( $key, $timeout=6, $exptime=6)
Exception[] $connFailureErrors
Map of (shard index => Exception)
doIncrWithInit( $key, $exptime, $step, $init, $flags)
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
string $tableName
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
bool $useLB
Whether to use the LoadBalancer.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Set an item if the current CAS token matches the provided CAS token.
string false null $dbDomain
DB name used for keys using the LoadBalancer.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
Database error base class.
Definition: DBError.php:31
A query builder for SELECT queries with a fluent interface.
const ATTR_EMULATION
Emulation/fallback mode; see QOS_EMULATION_*; higher is better.
const QOS_DURABILITY_RDBMS
Data is saved to disk and writes usually block on fsync(), like a standard RDBMS.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const QOS_EMULATION_SQL
Fallback disk-based SQL store.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relational database handles.
Definition: IDatabase.php:40
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
unlock( $lockName, $method)
Release a lock.
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
lock( $lockName, $method, $timeout=5, $flags=0)
Acquire a named lock.
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
affectedRows()
Get the number of rows affected by the last write query.
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert row(s) into a table, in the provided order, while updating conflicting rows.
getType()
Get the RDBMS type of the server (e.g.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
tablePrefix( $prefix=null)
Get/set the table prefix.
decodeBlob( $b)
Some DBMSs return a special placeholder object representing blob fields in result objects.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
encodeBlob( $b)
Some DBMSs have a special format for inserting into blob fields, they don't allow simple quoted strin...
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, in the provided order, while deleting conflicting rows.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
getServerName()
Get the readable name for the server.
cancelAtomic( $fname=__METHOD__, AtomicSectionIdentifier $sectionId=null)
Cancel an atomic section of SQL statements.
Create and track the database connections and transactions for a given database cluster.
Advanced database interface for IDatabase handles that include maintenance methods.
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
buildExcludedValue( $column)
Build a reference to a column value from the conflicting proposed upsert() row.
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
buildSubString( $input, $startPosition, $length=null)
Build a SUBSTRING function.
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
if( $line===false) $args
Definition: mcc.php:124
const DB_REPLICA
Definition: defines.php:26
const DB_PRIMARY
Definition: defines.php:28
const DBO_TRX
Definition: defines.php:12