MediaWiki  master
SqlBagOStuff.php
Go to the documentation of this file.
1 <?php
25 use Wikimedia\AtEase\AtEase;
35 use Wikimedia\ScopedCallback;
36 use Wikimedia\Timestamp\ConvertibleTimestamp;
37 
55  protected $loadBalancer;
57  protected $dbDomain;
59  protected $useLB = false;
60 
62  protected $serverInfos = [];
64  protected $serverTags = [];
66  protected $lastGarbageCollect = 0;
68  protected $purgePeriod = 10;
70  protected $purgeLimit = 100;
72  protected $numTableShards = 1;
74  protected $writeBatchSize = 100;
76  protected $tableName = 'objectcache';
78  protected $replicaOnly;
80  protected $multiPrimaryMode;
81 
83  protected $conns;
85  protected $connFailureTimes = [];
87  protected $connFailureErrors = [];
88 
90  private $hasZlib;
91 
93  private const SAFE_CLOCK_BOUND_SEC = 15;
95  private const SAFE_PURGE_DELAY_SEC = 3600;
97  private const TOMB_SERIAL = '';
99  private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
101  private const GC_DELAY_SEC = 1;
102 
103  private const BLOB_VALUE = 0;
104  private const BLOB_EXPIRY = 1;
105  private const BLOB_CASTOKEN = 2;
106 
113  private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
114 
144  public function __construct( $params ) {
145  parent::__construct( $params );
146 
147  if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
148  // Configuration uses a direct list of servers.
149  // Object data is horizontally partitioned via key hash.
150  $index = 0;
151  foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
152  $this->serverInfos[$index] = $info;
153  // Allow integer-indexes arrays for b/c
154  $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
155  ++$index;
156  }
157  } elseif ( isset( $params['loadBalancerCallback'] ) ) {
158  $this->loadBalancerCallback = $params['loadBalancerCallback'];
159  if ( !isset( $params['dbDomain'] ) ) {
160  throw new InvalidArgumentException(
161  __METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
162  );
163  }
164  $this->dbDomain = $params['dbDomain'];
165  $this->useLB = true;
166  } else {
167  throw new InvalidArgumentException(
168  __METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
169  );
170  }
171 
172  $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
173  $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
174  $this->tableName = $params['tableName'] ?? $this->tableName;
175  $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
176  $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
177  $this->replicaOnly = $params['replicaOnly'] ?? false;
178  $this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;
179 
182 
183  $this->hasZlib = extension_loaded( 'zlib' );
184  }
185 
186  protected function doGet( $key, $flags = 0, &$casToken = null ) {
187  $getToken = ( $casToken === self::PASS_BY_REF );
188  $casToken = null;
189 
190  $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
191  if ( $data ) {
192  $result = $this->unserialize( $data[self::BLOB_VALUE] );
193  if ( $getToken && $result !== false ) {
194  $casToken = $data[self::BLOB_CASTOKEN];
195  }
196  $valueSize = strlen( $data[self::BLOB_VALUE] );
197  } else {
198  $result = false;
199  $valueSize = false;
200  }
201 
202  $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
203 
204  return $result;
205  }
206 
207  protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
208  $mtime = $this->getCurrentTime();
209 
210  return $this->modifyBlobs(
211  [ $this, 'modifyTableSpecificBlobsForSet' ],
212  $mtime,
213  [ $key => [ $value, $exptime ] ]
214  );
215  }
216 
217  protected function doDelete( $key, $flags = 0 ) {
218  $mtime = $this->getCurrentTime();
219 
220  return $this->modifyBlobs(
221  [ $this, 'modifyTableSpecificBlobsForDelete' ],
222  $mtime,
223  [ $key => [] ]
224  );
225  }
226 
227  protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
228  $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
229  if ( $mtime === null ) {
230  // Timeout or I/O error during lock acquisition
231  return false;
232  }
233 
234  return $this->modifyBlobs(
235  [ $this, 'modifyTableSpecificBlobsForAdd' ],
236  $mtime,
237  [ $key => [ $value, $exptime ] ]
238  );
239  }
240 
241  protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
242  $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
243  if ( $mtime === null ) {
244  // Timeout or I/O error during lock acquisition
245  return false;
246  }
247 
248  return $this->modifyBlobs(
249  [ $this, 'modifyTableSpecificBlobsForCas' ],
250  $mtime,
251  [ $key => [ $value, $exptime, $casToken ] ]
252  );
253  }
254 
255  protected function doChangeTTL( $key, $exptime, $flags ) {
256  $mtime = $this->getCurrentTime();
257 
258  return $this->modifyBlobs(
259  [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
260  $mtime,
261  [ $key => [ $exptime ] ]
262  );
263  }
264 
265  protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
266  $mtime = $this->getCurrentTime();
267 
268  if ( $flags & self::WRITE_BACKGROUND ) {
269  $callback = [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ];
270  } else {
271  $callback = [ $this, 'modifyTableSpecificBlobsForIncrInit' ];
272  }
273 
274  $result = $this->modifyBlobs(
275  $callback,
276  $mtime,
277  [ $key => [ $step, $init, $exptime ] ],
278  $resByKey
279  ) ? $resByKey[$key] : false;
280 
281  return $result;
282  }
283 
284  protected function doGetMulti( array $keys, $flags = 0 ) {
285  $result = [];
286  $valueSizeByKey = [];
287 
288  $dataByKey = $this->fetchBlobs( $keys );
289  foreach ( $keys as $key ) {
290  $data = $dataByKey[$key];
291  if ( $data ) {
292  $serialValue = $data[self::BLOB_VALUE];
293  $value = $this->unserialize( $serialValue );
294  if ( $value !== false ) {
295  $result[$key] = $value;
296  }
297  $valueSize = strlen( $serialValue );
298  } else {
299  $valueSize = false;
300  }
301  $valueSizeByKey[$key] = [ 0, $valueSize ];
302  }
303 
304  $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
305 
306  return $result;
307  }
308 
309  protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
310  $mtime = $this->getCurrentTime();
311 
312  return $this->modifyBlobs(
313  [ $this, 'modifyTableSpecificBlobsForSet' ],
314  $mtime,
315  array_map(
316  static function ( $value ) use ( $exptime ) {
317  return [ $value, $exptime ];
318  },
319  $data
320  )
321  );
322  }
323 
324  protected function doDeleteMulti( array $keys, $flags = 0 ) {
325  $mtime = $this->getCurrentTime();
326 
327  return $this->modifyBlobs(
328  [ $this, 'modifyTableSpecificBlobsForDelete' ],
329  $mtime,
330  array_fill_keys( $keys, [] )
331  );
332  }
333 
334  public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
335  $mtime = $this->getCurrentTime();
336 
337  return $this->modifyBlobs(
338  [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
339  $mtime,
340  array_fill_keys( $keys, [ $exptime ] )
341  );
342  }
343 
352  private function getConnection( $shardIndex ) {
353  if ( $this->useLB ) {
354  return $this->getConnectionViaLoadBalancer();
355  }
356 
357  // Don't keep timing out trying to connect if the server is down
358  if (
359  isset( $this->connFailureErrors[$shardIndex] ) &&
360  ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
361  ) {
362  throw $this->connFailureErrors[$shardIndex];
363  }
364 
365  if ( isset( $this->serverInfos[$shardIndex] ) ) {
366  $server = $this->serverInfos[$shardIndex];
367  $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
368  } else {
369  throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
370  }
371 
372  return $conn;
373  }
374 
380  private function getKeyLocation( $key ) {
381  if ( $this->useLB ) {
382  // LoadBalancer based configuration
383  $shardIndex = 0;
384  } else {
385  // Striped array of database servers
386  if ( count( $this->serverTags ) == 1 ) {
387  $shardIndex = 0; // short-circuit
388  } else {
389  $sortedServers = $this->serverTags;
390  ArrayUtils::consistentHashSort( $sortedServers, $key );
391  $shardIndex = array_key_first( $sortedServers );
392  }
393  }
394 
395  if ( $this->numTableShards > 1 ) {
396  $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
397  $tableIndex = $hash % $this->numTableShards;
398  } else {
399  $tableIndex = null;
400  }
401 
402  return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
403  }
404 
410  private function getTableNameByShard( $index ) {
411  if ( $index !== null && $this->numTableShards > 1 ) {
412  $decimals = strlen( (string)( $this->numTableShards - 1 ) );
413 
414  return $this->tableName . sprintf( "%0{$decimals}d", $index );
415  }
416 
417  return $this->tableName;
418  }
419 
425  private function fetchBlobs( array $keys, bool $getCasToken = false ) {
427  $silenceScope = $this->silenceTransactionProfiler();
428 
429  // Initialize order-preserved per-key results; set values for live keys below
430  $dataByKey = array_fill_keys( $keys, null );
431 
432  $readTime = (int)$this->getCurrentTime();
433  $keysByTableByShard = [];
434  foreach ( $keys as $key ) {
435  [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
436  $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
437  }
438 
439  foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
440  try {
441  $db = $this->getConnection( $shardIndex );
442  foreach ( $serverKeys as $partitionTable => $tableKeys ) {
443  $res = $db->newSelectQueryBuilder()
444  ->select(
445  $getCasToken
446  ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
447  : [ 'keyname', 'value', 'exptime' ] )
448  ->from( $partitionTable )
449  ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
450  ->caller( __METHOD__ )
451  ->fetchResultSet();
452  foreach ( $res as $row ) {
453  $row->shardIndex = $shardIndex;
454  $row->tableName = $partitionTable;
455  $dataByKey[$row->keyname] = $row;
456  }
457  }
458  } catch ( DBError $e ) {
459  $this->handleDBError( $e, $shardIndex );
460  }
461  }
462 
463  foreach ( $keys as $key ) {
464  $row = $dataByKey[$key] ?? null;
465  if ( !$row ) {
466  continue;
467  }
468 
469  $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
470  try {
471  $db = $this->getConnection( $row->shardIndex );
472  $dataByKey[$key] = [
473  self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
474  self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
475  self::BLOB_CASTOKEN => $getCasToken
476  ? $this->getCasTokenFromRow( $db, $row )
477  : null
478  ];
479  } catch ( DBQueryError $e ) {
480  $this->handleDBError( $e, $row->shardIndex );
481  }
482  }
483 
484  return $dataByKey;
485  }
486 
500  private function modifyBlobs(
501  callable $tableWriteCallback,
502  float $mtime,
503  array $argsByKey,
504  &$resByKey = []
505  ) {
506  // Initialize order-preserved per-key results; callbacks mark successful results
507  $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
508 
510  $silenceScope = $this->silenceTransactionProfiler();
511 
512  $argsByKeyByTableByShard = [];
513  foreach ( $argsByKey as $key => $args ) {
514  [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
515  $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
516  }
517 
518  $shardIndexesAffected = [];
519  foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
520  foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
521  try {
522  $db = $this->getConnection( $shardIndex );
523  $shardIndexesAffected[] = $shardIndex;
524  $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
525  } catch ( DBError $e ) {
526  $this->handleDBError( $e, $shardIndex );
527  continue;
528  }
529  }
530  }
531 
532  $success = !in_array( false, $resByKey, true );
533 
534  foreach ( $shardIndexesAffected as $shardIndex ) {
535  try {
536  $db = $this->getConnection( $shardIndex );
537  $this->occasionallyGarbageCollect( $db );
538  } catch ( DBError $e ) {
539  $this->handleDBError( $e, $shardIndex );
540  }
541  }
542 
543  return $success;
544  }
545 
561  private function modifyTableSpecificBlobsForSet(
562  IDatabase $db,
563  string $ptable,
564  float $mtime,
565  array $argsByKey,
566  array &$resByKey
567  ) {
568  $valueSizesByKey = [];
569 
570  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
571 
572  $rows = [];
573  foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
574  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
575  $serialValue = $this->getSerialized( $value, $key );
576  $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
577 
578  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
579  }
580 
581  if ( $this->multiPrimaryMode ) {
582  $db->upsert(
583  $ptable,
584  $rows,
585  [ [ 'keyname' ] ],
586  $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
587  __METHOD__
588  );
589  } else {
590  // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
591  $db->replace( $ptable, 'keyname', $rows, __METHOD__ );
592  }
593 
594  foreach ( $argsByKey as $key => $unused ) {
595  $resByKey[$key] = true;
596  }
597 
598  $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
599  }
600 
617  private function modifyTableSpecificBlobsForDelete(
618  IDatabase $db,
619  string $ptable,
620  float $mtime,
621  array $argsByKey,
622  array &$resByKey
623  ) {
624  if ( $this->multiPrimaryMode ) {
625  // Tombstone keys in order to respect eventual consistency
626  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
627  $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
628  $rows = [];
629  foreach ( $argsByKey as $key => $arg ) {
630  $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
631  }
632  $db->upsert(
633  $ptable,
634  $rows,
635  [ [ 'keyname' ] ],
636  $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
637  __METHOD__
638  );
639  } else {
640  // Just purge the keys since there is only one primary (e.g. "source of truth")
641  $db->delete( $ptable, [ 'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
642  }
643 
644  foreach ( $argsByKey as $key => $arg ) {
645  $resByKey[$key] = true;
646  }
647 
648  $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
649  }
650 
671  private function modifyTableSpecificBlobsForAdd(
672  IDatabase $db,
673  string $ptable,
674  float $mtime,
675  array $argsByKey,
676  array &$resByKey
677  ) {
678  $valueSizesByKey = [];
679 
680  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
681 
682  // This check must happen outside the write query to respect eventual consistency
683  $existingKeys = $db->newSelectQueryBuilder()
684  ->select( 'keyname' )
685  ->from( $ptable )
686  ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
687  ->caller( __METHOD__ )
688  ->fetchFieldValues();
689  $existingByKey = array_fill_keys( $existingKeys, true );
690 
691  $rows = [];
692  foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
693  if ( isset( $existingByKey[$key] ) ) {
694  $this->logger->debug( __METHOD__ . ": $key already exists" );
695  continue;
696  }
697 
698  $serialValue = $this->getSerialized( $value, $key );
699  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
700  $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
701 
702  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
703  }
704 
705  $db->upsert(
706  $ptable,
707  $rows,
708  [ [ 'keyname' ] ],
709  $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
710  __METHOD__
711  );
712 
713  foreach ( $argsByKey as $key => $unused ) {
714  $resByKey[$key] = !isset( $existingByKey[$key] );
715  }
716 
717  $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
718  }
719 
740  private function modifyTableSpecificBlobsForCas(
741  IDatabase $db,
742  string $ptable,
743  float $mtime,
744  array $argsByKey,
745  array &$resByKey
746  ) {
747  $valueSizesByKey = [];
748 
749  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
750 
751  // This check must happen outside the write query to respect eventual consistency
752  $res = $db->newSelectQueryBuilder()
753  ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
754  ->from( $ptable )
755  ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
756  ->caller( __METHOD__ )
757  ->fetchResultSet();
758 
759  $curTokensByKey = [];
760  foreach ( $res as $row ) {
761  $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
762  }
763 
764  $rows = [];
765  $nonMatchingByKey = [];
766  foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
767  $curToken = $curTokensByKey[$key] ?? null;
768  if ( $curToken === null ) {
769  $nonMatchingByKey[$key] = true;
770  $this->logger->debug( __METHOD__ . ": $key does not exists" );
771  continue;
772  }
773 
774  if ( $curToken !== $casToken ) {
775  $nonMatchingByKey[$key] = true;
776  $this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
777  continue;
778  }
779 
780  $serialValue = $this->getSerialized( $value, $key );
781  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
782  $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
783 
784  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
785  }
786 
787  $db->upsert(
788  $ptable,
789  $rows,
790  [ [ 'keyname' ] ],
791  $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
792  __METHOD__
793  );
794 
795  foreach ( $argsByKey as $key => $unused ) {
796  $resByKey[$key] = !isset( $nonMatchingByKey[$key] );
797  }
798 
799  $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
800  }
801 
821  private function modifyTableSpecificBlobsForChangeTTL(
822  IDatabase $db,
823  string $ptable,
824  float $mtime,
825  array $argsByKey,
826  array &$resByKey
827  ) {
828  if ( $this->multiPrimaryMode ) {
829  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
830 
831  $res = $db->newSelectQueryBuilder()
832  ->select( [ 'keyname', 'value' ] )
833  ->from( $ptable )
834  ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
835  ->caller( __METHOD__ )
836  ->fetchResultSet();
837 
838  $rows = [];
839  $existingKeys = [];
840  foreach ( $res as $curRow ) {
841  $key = $curRow->keyname;
842  $existingKeys[$key] = true;
843  $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
844  [ $exptime ] = $argsByKey[$key];
845  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
846  $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
847  }
848 
849  $db->upsert(
850  $ptable,
851  $rows,
852  [ [ 'keyname' ] ],
853  $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
854  __METHOD__
855  );
856 
857  foreach ( $argsByKey as $key => $unused ) {
858  $resByKey[$key] = isset( $existingKeys[$key] );
859  }
860  } else {
861  $keysBatchesByExpiry = [];
862  foreach ( $argsByKey as $key => [ $exptime ] ) {
863  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
864  $keysBatchesByExpiry[$expiry][] = $key;
865  }
866 
867  $existingCount = 0;
868  foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
869  $db->update(
870  $ptable,
871  [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
872  $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ),
873  __METHOD__
874  );
875  $existingCount += $db->affectedRows();
876  }
877  if ( $existingCount === count( $argsByKey ) ) {
878  foreach ( $argsByKey as $key => $args ) {
879  $resByKey[$key] = true;
880  }
881  }
882  }
883 
884  $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
885  }
886 
906  private function modifyTableSpecificBlobsForIncrInit(
907  IDatabase $db,
908  string $ptable,
909  float $mtime,
910  array $argsByKey,
911  array &$resByKey
912  ) {
913  foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
914  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
915  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
916 
917  // Use a transaction so that changes from other threads are not visible due to
918  // "consistent reads". This way, the exact post-increment value can be returned.
919  // The "live key exists" check can go inside the write query and remain safe for
920  // replication since the TTL for such keys is either indefinite or very short.
921  $atomic = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
922  try {
923  $db->upsert(
924  $ptable,
925  $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
926  [ [ 'keyname' ] ],
927  $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
928  __METHOD__
929  );
930  $affectedCount = $db->affectedRows();
931  $row = $db->newSelectQueryBuilder()
932  ->select( 'value' )
933  ->from( $ptable )
934  ->where( [ 'keyname' => $key ] )
935  ->caller( __METHOD__ )
936  ->fetchRow();
937  } catch ( Exception $e ) {
938  $db->cancelAtomic( __METHOD__, $atomic );
939  throw $e;
940  }
941  $db->endAtomic( __METHOD__ );
942 
943  if ( !$affectedCount || $row === false ) {
944  $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
945  continue;
946  }
947 
948  $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
949  if ( !$this->isInteger( $serialValue ) ) {
950  $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
951  continue;
952  }
953 
954  $resByKey[$key] = (int)$serialValue;
955  }
956 
957  $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
958  }
959 
971  private function modifyTableSpecificBlobsForIncrInitAsync(
972  IDatabase $db,
973  string $ptable,
974  float $mtime,
975  array $argsByKey,
976  array &$resByKey
977  ) {
978  foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
979  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
980  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
981  $db->upsert(
982  $ptable,
983  $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
984  [ [ 'keyname' ] ],
985  $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
986  __METHOD__
987  );
988  if ( !$db->affectedRows() ) {
989  $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
990  } else {
991  $resByKey[$key] = true;
992  }
993  }
994  }
995 
1001  private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
1002  $expiry = $this->getExpirationAsTimestamp( $exptime );
1003  // Eventual consistency requires the preservation of recently modified keys.
1004  // Do not create rows with `exptime` fields so low that they might get garbage
1005  // collected before being replicated.
1006  if ( $expiry !== self::TTL_INDEFINITE ) {
1007  $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1008  }
1009 
1010  return $expiry;
1011  }
1012 
1031  private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1032  if ( !$this->lock( $key, 0 ) ) {
1033  return null;
1034  }
1035 
1036  $scope = new ScopedCallback( function () use ( $key ) {
1037  $this->unlock( $key );
1038  } );
1039 
1040  // sprintf is used to adjust precision
1041  return (float)sprintf( '%.6F', $this->locks[$key][self::LOCK_TIME] );
1042  }
1043 
1053  private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
1054  // We have reserved space for upto 6 digits in the microsecond portion of the token.
1055  // This is for future use only (maybe CAS tokens) and not currently used.
1056  // It is currently populated by the microsecond portion returned by microtime,
1057  // which generally has fewer than 6 digits of meaningful precision but can still be useful
1058  // in debugging (to see the token continuously change even during rapid testing).
1059  $seconds = (int)$mtime;
1060  [ , $microseconds ] = explode( '.', sprintf( '%.6F', $mtime ) );
1061 
1062  $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );
1063 
1064  $token = implode( '', [
1065  // 67 bit integral portion of UNIX timestamp, qualified
1066  \Wikimedia\base_convert(
1067  // 35 bit integral seconds portion of UNIX timestamp
1068  str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
1069  // 32 bit ID of the primary database server handling the write
1070  str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
1071  2,
1072  36,
1073  13
1074  ),
1075  // 20 bit fractional portion of UNIX timestamp, as integral microseconds
1076  str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
1077  ] );
1078 
1079  if ( strlen( $token ) !== 17 ) {
1080  throw new RuntimeException( "Modification timestamp overflow detected" );
1081  }
1082 
1083  return $token;
1084  }
1085 
1094  private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
1095  // Note that tombstones always have past expiration dates
1096  return [
1097  'keyname' => $keys,
1098  'exptime >= ' . $db->addQuotes( $db->timestamp( $time ) )
1099  ];
1100  }
1101 
1112  private function buildUpsertRow(
1113  IDatabase $db,
1114  $key,
1115  $serialValue,
1116  int $expiry,
1117  string $mt
1118  ) {
1119  $row = [
1120  'keyname' => $key,
1121  'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
1122  'exptime' => $this->encodeDbExpiry( $db, $expiry )
1123  ];
1124  if ( $this->multiPrimaryMode ) {
1125  $row['modtoken'] = $mt;
1126  }
1127 
1128  return $row;
1129  }
1130 
1138  private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
1139  $expressionsByColumn = [
1140  'value' => $db->buildExcludedValue( 'value' ),
1141  'exptime' => $db->buildExcludedValue( 'exptime' )
1142  ];
1143 
1144  $set = [];
1145  if ( $this->multiPrimaryMode ) {
1146  // The query might take a while to replicate, during which newer values might get
1147  // written. Qualify the query so that it does not override such values. Note that
1148  // duplicate tokens generated around the same time for a key should still result
1149  // in convergence given the use of server_id in modtoken (providing a total order
1150  // among primary DB servers) and MySQL binlog ordering (providing a total order
1151  // for writes replicating from a given primary DB server).
1152  $expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
1153  foreach ( $expressionsByColumn as $column => $updateExpression ) {
1154  $rhs = $db->conditional(
1155  $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
1156  $db->buildSubString( 'modtoken', 1, 13 ),
1157  $updateExpression,
1158  $column
1159  );
1160  $set[] = "{$column}=" . trim( $rhs );
1161  }
1162  } else {
1163  foreach ( $expressionsByColumn as $column => $updateExpression ) {
1164  $set[] = "{$column}={$updateExpression}";
1165  }
1166  }
1167 
1168  return $set;
1169  }
1170 
1182  private function buildIncrUpsertSet(
1183  IDatabase $db,
1184  int $step,
1185  int $init,
1186  int $expiry,
1187  string $mt,
1188  int $mtUnixTs
1189  ) {
1190  // Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
1191  $expressionsByColumn = [
1192  'value' => [
1193  $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
1194  $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1195  ],
1196  'exptime' => [
1197  'exptime',
1198  $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1199  ]
1200  ];
1201  if ( $this->multiPrimaryMode ) {
1202  $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
1203  }
1204 
1205  $set = [];
1206  foreach ( $expressionsByColumn as $column => [ $updateExpression, $initExpression ] ) {
1207  $rhs = $db->conditional(
1208  'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) ),
1209  $updateExpression,
1210  $initExpression
1211  );
1212  $set[] = "{$column}=" . trim( $rhs );
1213  }
1214 
1215  return $set;
1216  }
1217 
1223  private function encodeDbExpiry( IDatabase $db, int $expiry ) {
1224  return ( $expiry === self::TTL_INDEFINITE )
1225  // Use the maximum timestamp that the column can store
1226  ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1227  // Convert the absolute timestamp into the DB timestamp format
1228  : $db->timestamp( $expiry );
1229  }
1230 
1236  private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
1237  return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1238  ? self::TTL_INDEFINITE
1239  : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1240  }
1241 
1247  private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
1248  return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue );
1249  }
1250 
1256  private function dbDecodeSerialValue( IDatabase $db, $blob ) {
1257  return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob );
1258  }
1259 
1267  private function addCasTokenFields( IDatabase $db, array $fields ) {
1268  $type = $db->getType();
1269 
1270  if ( $type === 'mysql' ) {
1271  $fields['castoken'] = $db->buildConcat( [
1272  'SHA1(value)',
1273  $db->addQuotes( '@' ),
1274  'exptime'
1275  ] );
1276  } elseif ( $type === 'postgres' ) {
1277  $fields['castoken'] = $db->buildConcat( [
1278  'md5(value)',
1279  $db->addQuotes( '@' ),
1280  'exptime'
1281  ] );
1282  } else {
1283  if ( !in_array( 'value', $fields, true ) ) {
1284  $fields[] = 'value';
1285  }
1286  if ( !in_array( 'exptime', $fields, true ) ) {
1287  $fields[] = 'exptime';
1288  }
1289  }
1290 
1291  return $fields;
1292  }
1293 
1301  private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
1302  if ( isset( $row->castoken ) ) {
1303  $token = $row->castoken;
1304  } else {
1305  $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
1306  $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );
1307  }
1308 
1309  return $token;
1310  }
1311 
1316  private function occasionallyGarbageCollect( IDatabase $db ) {
1317  if (
1318  // Random purging is enabled
1319  $this->purgePeriod &&
1320  // Only purge on one in every $this->purgePeriod writes
1321  mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1322  // Avoid repeating the delete within a few seconds
1323  ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
1324  ) {
1325  $garbageCollector = function () use ( $db ) {
1327  $silenceScope = $this->silenceTransactionProfiler();
1328  $this->deleteServerObjectsExpiringBefore(
1329  $db,
1330  (int)$this->getCurrentTime(),
1331  $this->purgeLimit
1332  );
1333  $this->lastGarbageCollect = time();
1334  };
1335  if ( $this->asyncHandler ) {
1336  $this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
1337  ( $this->asyncHandler )( $garbageCollector );
1338  } else {
1339  $garbageCollector();
1340  }
1341  }
1342  }
1343 
1344  public function expireAll() {
1345  $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() );
1346  }
1347 
1349  $timestamp,
1350  callable $progress = null,
1351  $limit = INF,
1352  string $tag = null
1353  ) {
1355  $silenceScope = $this->silenceTransactionProfiler();
1356 
1357  if ( $tag !== null ) {
1358  // Purge one server only, to support concurrent purging in large wiki farms (T282761).
1359  $shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
1360  } else {
1361  $shardIndexes = $this->getShardServerIndexes();
1362  shuffle( $shardIndexes );
1363  }
1364 
1365  $ok = true;
1366  $numServers = count( $shardIndexes );
1367 
1368  $keysDeletedCount = 0;
1369  foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1370  try {
1371  $db = $this->getConnection( $shardIndex );
1372  $this->deleteServerObjectsExpiringBefore(
1373  $db,
1374  $timestamp,
1375  $limit,
1376  $keysDeletedCount,
1377  [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
1378  );
1379  } catch ( DBError $e ) {
1380  $this->handleDBError( $e, $shardIndex );
1381  $ok = false;
1382  }
1383  }
1384 
1385  return $ok;
1386  }
1387 
1397  private function deleteServerObjectsExpiringBefore(
1398  IDatabase $db,
1399  $timestamp,
1400  $limit,
1401  &$keysDeletedCount = 0,
1402  array $progress = null
1403  ) {
1404  $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1405  if ( $this->multiPrimaryMode ) {
1406  // Eventual consistency requires the preservation of any key that was recently
1407  // modified. The key must exist on this database server long enough for the server
1408  // to receive, via replication, all writes to the key with lower timestamps. Such
1409  // writes should be no-ops since the existing key value should "win". If the network
1410  // partitions between datacenters A and B for 30 minutes, the database servers in
1411  // each datacenter will see an initial burst of writes with "old" timestamps via
1412  // replication. This might include writes with lower timestamps that the existing
1413  // key value. Therefore, clock skew and replication delay are both factors.
1414  $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1415  $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1416  }
1417  $tableIndexes = range( 0, $this->numTableShards - 1 );
1418  shuffle( $tableIndexes );
1419 
1420  $batchSize = min( $this->writeBatchSize, $limit );
1421 
1422  foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1423  // The oldest expiry of a row we have deleted on this shard
1424  // (the first row that we deleted)
1425  $minExpUnix = null;
1426  // The most recent expiry time so far, from a row we have deleted on this shard
1427  $maxExp = null;
1428  // Size of the time range we'll delete, in seconds (for progress estimate)
1429  $totalSeconds = null;
1430 
1431  do {
1432  $res = $db->newSelectQueryBuilder()
1433  ->select( [ 'keyname', 'exptime' ] )
1434  ->from( $this->getTableNameByShard( $tableIndex ) )
1435  ->where(
1436  array_merge(
1437  [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
1438  $maxExp ? [ 'exptime >= ' . $db->addQuotes( $maxExp ) ] : []
1439  )
1440  )
1441  ->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
1442  ->limit( $batchSize )
1443  ->caller( __METHOD__ )
1444  ->fetchResultSet();
1445 
1446  if ( $res->numRows() ) {
1447  $row = $res->current();
1448  if ( $minExpUnix === null ) {
1449  $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1450  $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1451  }
1452 
1453  $keys = [];
1454  foreach ( $res as $row ) {
1455  $keys[] = $row->keyname;
1456  $maxExp = $row->exptime;
1457  }
1458 
1459  $db->delete(
1460  $this->getTableNameByShard( $tableIndex ),
1461  [
1462  'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
1463  'keyname' => $keys
1464  ],
1465  __METHOD__
1466  );
1467  $keysDeletedCount += $db->affectedRows();
1468  }
1469 
1470  if ( $progress && is_callable( $progress['fn'] ) ) {
1471  if ( $totalSeconds ) {
1472  $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1473  $remainingSeconds = $cutoffUnix - $maxExpUnix;
1474  $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1475  // For example, if we've done 1.5 table shard, and are thus half-way on the
1476  // 2nd of perhaps 5 tables on this server, then this might be:
1477  // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
1478  $tablesDoneRatio =
1479  ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1480  } else {
1481  $tablesDoneRatio = 1;
1482  }
1483 
1484  // For example, if we're 30% done on the last of 10 servers, then this might be:
1485  // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
1486  $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
1487  ( $tablesDoneRatio / $progress['serversTotal'] );
1488  ( $progress['fn'] )( $overallRatio * 100 );
1489  }
1490  } while ( $res->numRows() && $keysDeletedCount < $limit );
1491  }
1492  }
1493 
1499  public function deleteAll() {
1501  $silenceScope = $this->silenceTransactionProfiler();
1502  foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1503  try {
1504  $db = $this->getConnection( $shardIndex );
1505  for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1506  $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
1507  }
1508  } catch ( DBError $e ) {
1509  $this->handleDBError( $e, $shardIndex );
1510  return false;
1511  }
1512  }
1513  return true;
1514  }
1515 
1516  public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1518  $silenceScope = $this->silenceTransactionProfiler();
1519 
1520  $lockTsUnix = null;
1521 
1522  [ $shardIndex ] = $this->getKeyLocation( $key );
1523  try {
1524  $db = $this->getConnection( $shardIndex );
1525  $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1526  } catch ( DBError $e ) {
1527  $this->handleDBError( $e, $shardIndex );
1528  $this->logger->warning(
1529  __METHOD__ . ' failed due to I/O error for {key}.',
1530  [ 'key' => $key ]
1531  );
1532  }
1533 
1534  return $lockTsUnix;
1535  }
1536 
1537  public function doUnlock( $key ) {
1539  $silenceScope = $this->silenceTransactionProfiler();
1540 
1541  [ $shardIndex ] = $this->getKeyLocation( $key );
1542 
1543  try {
1544  $db = $this->getConnection( $shardIndex );
1545  $released = $db->unlock( $key, __METHOD__ );
1546  } catch ( DBError $e ) {
1547  $this->handleDBError( $e, $shardIndex );
1548  $released = false;
1549  }
1550 
1551  return $released;
1552  }
1553 
1554  protected function makeKeyInternal( $keyspace, $components ) {
1555  // SQL schema for 'objectcache' specifies keys as varchar(255). From that,
1556  // subtract the number of characters we need for the keyspace and for
1557  // the separator character needed for each argument. To handle some
1558  // custom prefixes used by thing like WANObjectCache, limit to 205.
1559  $keyspace = strtr( $keyspace, ' ', '_' );
1560  $charsLeft = 205 - strlen( $keyspace ) - count( $components );
1561  foreach ( $components as &$component ) {
1562  $component = strtr( $component, [
1563  ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
1564  ':' => '%3A',
1565  ] );
1566 
1567  // 33 = 32 characters for the MD5 + 1 for the '#' prefix.
1568  if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1569  $component = '#' . md5( $component );
1570  }
1571  $charsLeft -= strlen( $component );
1572  }
1573 
1574  if ( $charsLeft < 0 ) {
1575  return $keyspace . ':BagOStuff-long-key:##' . md5( implode( ':', $components ) );
1576  }
1577  return $keyspace . ':' . implode( ':', $components );
1578  }
1579 
1580  protected function serialize( $value ) {
1581  if ( is_int( $value ) ) {
1582  return $value;
1583  }
1584 
1585  $serial = serialize( $value );
1586  if ( $this->hasZlib ) {
1587  // On typical message and page data, this can provide a 3X storage savings
1588  $serial = gzdeflate( $serial );
1589  }
1590 
1591  return $serial;
1592  }
1593 
1594  protected function unserialize( $value ) {
1595  if ( $value === self::TOMB_SERIAL ) {
1596  return false; // tombstone
1597  }
1598 
1599  if ( $this->isInteger( $value ) ) {
1600  return (int)$value;
1601  }
1602 
1603  if ( $this->hasZlib ) {
1604  AtEase::suppressWarnings();
1605  $decompressed = gzinflate( $value );
1606  AtEase::restoreWarnings();
1607 
1608  if ( $decompressed !== false ) {
1609  $value = $decompressed;
1610  }
1611  }
1612 
1613  return unserialize( $value );
1614  }
1615 
1616  private function getLoadBalancer(): ILoadBalancer {
1617  if ( !$this->loadBalancer ) {
1618  $this->loadBalancer = ( $this->loadBalancerCallback )();
1619  }
1620  return $this->loadBalancer;
1621  }
1622 
1627  private function getConnectionViaLoadBalancer() {
1628  $lb = $this->getLoadBalancer();
1629 
1630  if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1631  // Use the main connection to avoid transaction deadlocks
1632  $conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
1633  } else {
1634  // If the RDBMS has row/table/page level locking, then use separate auto-commit
1635  // connection to avoid needless contention and deadlocks.
1636  $conn = $lb->getMaintenanceConnectionRef(
1637  $this->replicaOnly ? DB_REPLICA : DB_PRIMARY,
1638  [],
1639  $this->dbDomain,
1640  $lb::CONN_TRX_AUTOCOMMIT
1641  );
1642  }
1643 
1644  // Make sure any errors are thrown now while we can more easily handle them
1645  $conn->ensureConnection();
1646  return $conn;
1647  }
1648 
1655  private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1656  if ( !isset( $this->conns[$shardIndex] ) ) {
1658  $dbFactory = MediaWikiServices::getInstance()->getDatabaseFactory();
1659  $conn = $dbFactory->create(
1660  $server['type'],
1661  array_merge(
1662  $server,
1663  [
1664  // Make sure the handle uses autocommit mode
1665  'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1666  'logger' => $this->logger,
1667  ]
1668  )
1669  );
1670  // Automatically create the objectcache table for sqlite as needed
1671  if ( $conn->getType() === 'sqlite' ) {
1672  $this->initSqliteDatabase( $conn );
1673  }
1674  $this->conns[$shardIndex] = $conn;
1675  }
1676 
1677  // @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
1678  return $this->conns[$shardIndex];
1679  }
1680 
1687  private function handleDBError( DBError $exception, $shardIndex ) {
1688  if ( !$this->useLB && $exception instanceof DBConnectionError ) {
1689  $this->markServerDown( $exception, $shardIndex );
1690  }
1691  $this->setAndLogDBError( $exception );
1692  }
1693 
1697  private function setAndLogDBError( DBError $e ) {
1698  $this->logger->error( "DBError: {$e->getMessage()}", [ 'exception' => $e ] );
1699  if ( $e instanceof DBConnectionError ) {
1700  $this->setLastError( self::ERR_UNREACHABLE );
1701  $this->logger->warning( __METHOD__ . ": ignoring connection error" );
1702  } else {
1703  $this->setLastError( self::ERR_UNEXPECTED );
1704  $this->logger->warning( __METHOD__ . ": ignoring query error" );
1705  }
1706  }
1707 
1714  private function markServerDown( DBError $exception, $shardIndex ) {
1715  unset( $this->conns[$shardIndex] ); // bug T103435
1716 
1717  $now = $this->getCurrentTime();
1718  if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1719  if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1720  unset( $this->connFailureTimes[$shardIndex] );
1721  unset( $this->connFailureErrors[$shardIndex] );
1722  } else {
1723  $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
1724  return;
1725  }
1726  }
1727  $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
1728  $this->connFailureTimes[$shardIndex] = $now;
1729  $this->connFailureErrors[$shardIndex] = $exception;
1730  }
1731 
1736  private function initSqliteDatabase( IMaintainableDatabase $db ) {
1737  if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
1738  return;
1739  }
1740  // Use one table for SQLite; sharding does not seem to have much benefit
1741  $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
1742  $db->startAtomic( __METHOD__ ); // atomic DDL
1743  try {
1744  $encTable = $db->tableName( 'objectcache' );
1745  $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
1746  $db->query(
1747  "CREATE TABLE $encTable (\n" .
1748  " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1749  " value BLOB,\n" .
1750  " exptime BLOB NOT NULL\n" .
1751  ")",
1752  __METHOD__
1753  );
1754  $db->query( "CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1755  $db->endAtomic( __METHOD__ );
1756  } catch ( DBError $e ) {
1757  $db->rollback( __METHOD__ );
1758  throw $e;
1759  }
1760  }
1761 
1777  public function createTables() {
1778  foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1779  $db = $this->getConnection( $shardIndex );
1780  if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
1781  for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1782  $encBaseTable = $db->tableName( 'objectcache' );
1783  $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
1784  $db->query( "CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1785  }
1786  }
1787  }
1788  }
1789 
1793  private function getShardServerIndexes() {
1794  if ( $this->useLB ) {
1795  // LoadBalancer based configuration
1796  $shardIndexes = [ 0 ];
1797  } else {
1798  // Striped array of database servers
1799  $shardIndexes = array_keys( $this->serverTags );
1800  }
1801 
1802  return $shardIndexes;
1803  }
1804 
1810  private function getShardServerIndexForTag( string $tag ) {
1811  if ( !$this->serverTags ) {
1812  throw new InvalidArgumentException( "Given a tag but no tags are configured" );
1813  }
1814  foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1815  if ( $tag === $serverTag ) {
1816  return $serverShardIndex;
1817  }
1818  }
1819  throw new InvalidArgumentException( "Unknown server tag: $tag" );
1820  }
1821 
1827  private function silenceTransactionProfiler() {
1828  if ( $this->serverInfos ) {
1829  return null; // no TransactionProfiler injected anyway
1830  }
1831  return Profiler::instance()->getTransactionProfiler()->silenceForScope();
1832  }
1833 }
$success
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition: WebStart.php:88
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
Definition: ArrayUtils.php:49
string $keyspace
Default keyspace; used by makeKey()
Definition: BagOStuff.php:101
callable null $asyncHandler
Definition: BagOStuff.php:91
getCurrentTime()
Definition: BagOStuff.php:812
Service locator for MediaWiki core services.
Storage medium specific cache for storing items (e.g.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form a value, logging a warning if it involves custom classes.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
static instance()
Singleton.
Definition: Profiler.php:108
RDBMS-based caching module.
bool $multiPrimaryMode
Whether multi-primary mode is enabled.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
ILoadBalancer null $loadBalancer
doDelete( $key, $flags=0)
Delete an item.
float $lastGarbageCollect
UNIX timestamp.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
createTables()
Create the shard tables on all databases.
serialize( $value)
int $purgePeriod
Average number of writes required to trigger garbage collection.
doChangeTTL( $key, $exptime, $flags)
unserialize( $value)
array[] $serverInfos
(server index => server config)
deleteAll()
Delete content of shard tables in every server.
callable null $loadBalancerCallback
Injected function which returns a LoadBalancer.
doDeleteMulti(array $keys, $flags=0)
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from parameters injected by ObjectCache::newFromParams()
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
doLock( $key, $timeout=6, $exptime=6)
Exception[] $connFailureErrors
Map of (shard index => Exception)
doIncrWithInit( $key, $exptime, $step, $init, $flags)
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
string $tableName
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
bool $useLB
Whether to use the LoadBalancer.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Set an item if the current CAS token matches the provided CAS token.
string false null $dbDomain
DB name used for keys using the LoadBalancer.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
Database error base class.
Definition: DBError.php:31
A query builder for SELECT queries with a fluent interface.
const ATTR_EMULATION
Emulation/fallback mode; see QOS_EMULATION_*; higher is better.
const QOS_DURABILITY_RDBMS
Data is saved to disk and writes usually block on fsync(), like a standard RDBMS.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const QOS_EMULATION_SQL
Fallback disk-based SQL store.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:36
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
unlock( $lockName, $method)
Release a lock.
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
lock( $lockName, $method, $timeout=5, $flags=0)
Acquire a named lock.
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
affectedRows()
Get the number of rows affected by the last attempted query statement.
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert row(s) into a table, in the provided order, while updating conflicting rows.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, in the provided order, while deleting conflicting rows.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
cancelAtomic( $fname=__METHOD__, AtomicSectionIdentifier $sectionId=null)
Cancel an atomic section of SQL statements.
This class is a delegate to ILBFactory for a given database cluster.
Advanced database interface for IDatabase handles that include maintenance methods.
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
getType()
Get the RDBMS type of the server (e.g.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
encodeBlob( $b)
Some DBMSs have a special format for inserting into blob fields, they don't allow simple quoted strin...
decodeBlob( $b)
Some DBMSs return a special placeholder object representing blob fields in result objects.
tablePrefix( $prefix=null)
Get/set the table prefix.
getServerName()
Get the readable name for the server.
buildExcludedValue( $column)
Build a reference to a column value from the conflicting proposed upsert() row.
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
buildSubString( $input, $startPosition, $length=null)
Build a SUBSTRING function.
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
const DB_REPLICA
Definition: defines.php:26
const DB_PRIMARY
Definition: defines.php:28
const DBO_TRX
Definition: defines.php:12