MediaWiki  master
SqlBagOStuff.php
Go to the documentation of this file.
1 <?php
24 use Wikimedia\AtEase\AtEase;
25 use Wikimedia\ObjectFactory\ObjectFactory;
34 use Wikimedia\ScopedCallback;
35 use Wikimedia\Timestamp\ConvertibleTimestamp;
36 use Wikimedia\WaitConditionLoop;
37 
// Load balancer used for local keys; the constructor falls back to the
// global-key load balancer when 'localKeyLB' is not configured
54  protected $localKeyLb;
// Load balancer used for keys starting with "global:"; only set when
// 'globalKeyLB' is configured
56  protected $globalKeyLb;
// Value of the 'globalKeyLbDomain' parameter; required whenever 'globalKeyLB' is set
58  protected $globalKeyLbDomain;
59 
// Per-server configuration entries, indexed by integer shard index
61  protected $serverInfos = [];
// Per-server tags, indexed by integer shard index ("#<index>" for untagged servers)
63  protected $serverTags = [];
// Time of the last garbage collection attempt
65  protected $lastGarbageCollect = 0;
// Garbage collection is attempted on roughly one in every $purgePeriod writes;
// a falsy value disables it
67  protected $purgePeriod = 10;
// Limit handed to the garbage collection routine
69  protected $purgeLimit = 100;
// Number of table partitions; values above 1 append a zero-padded index to $tableName
71  protected $numTableShards = 1;
// Read from the 'writeBatchSize' parameter — NOTE(review): usage is not visible in this listing
73  protected $writeBatchSize = 100;
// Base name of the cache table
75  protected $tableName = 'objectcache';
// Read from the 'replicaOnly' parameter — NOTE(review): usage is not visible in this listing
77  protected $replicaOnly;
80 
// NOTE(review): presumably the open connections by shard index — confirm against
// the connection helpers, which are not visible in this listing
82  protected $conns;
// Time of the remembered connection failure, by shard index
84  protected $connFailureTimes = [];
// Remembered connection error to re-throw for 60 seconds, by shard index
86  protected $connFailureErrors = [];
87 
// Whether the zlib extension is loaded
89  private $hasZlib;
90 
// Shard index values used when connections come from load balancers
91  private const SHARD_LOCAL = 'local';
92  private const SHARD_GLOBAL = 'global';
93 
// New finite expiries are never set lower than (now - this many seconds)
95  private const SAFE_CLOCK_BOUND_SEC = 15;
// NOTE(review): usage is not visible in this listing
97  private const SAFE_PURGE_DELAY_SEC = 3600;
// Serialized value stored in tombstone rows
99  private const TOMB_SERIAL = '';
// Minimum number of seconds between garbage collection attempts
103  private const GC_DELAY_SEC = 1;
104 
// Indexes into the per-key tuples returned by fetchBlobs()
105  private const BLOB_VALUE = 0;
106  private const BLOB_EXPIRY = 1;
107  private const BLOB_CASTOKEN = 2;
108 
// Timestamp stored in the exptime column for keys that never expire
115  private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
116 
/**
 * Set up the store from either a direct server list or LoadBalancer instances.
 *
 * Parameters read here: 'servers' or 'server', 'globalKeyLB', 'globalKeyLbDomain',
 * 'localKeyLB', 'purgePeriod', 'purgeLimit', 'tableName', 'shards',
 * 'writeBatchSize', 'replicaOnly' and 'multiPrimaryMode'.
 *
 * @param array $params
 * @throws InvalidArgumentException If 'globalKeyLB' is given without
 *  'globalKeyLbDomain', or if no server list and no load balancer is configured
 */
165  public function __construct( $params ) {
166  parent::__construct( $params );
167 
168  $dbType = null;
169  if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
170  // Configuration uses a direct list of servers.
171  // Object data is horizontally partitioned via key hash.
172  $index = 0;
173  foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
174  $this->serverInfos[$index] = $info;
175  // Allow integer-indexes arrays for b/c
176  $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
// Overwritten on every iteration, so the last listed server decides the type
177  $dbType = $info['type'];
178  ++$index;
179  }
180  } else {
181  // Configuration uses the servers defined in LoadBalancer instances.
182  // Object data is vertically partitioned via global vs local keys.
183  if ( isset( $params['globalKeyLB'] ) ) {
// Accept either a ready-made load balancer or an ObjectFactory spec
184  $this->globalKeyLb = ( $params['globalKeyLB'] instanceof ILoadBalancer )
185  ? $params['globalKeyLB']
186  : ObjectFactory::getObjectFromSpec( $params['globalKeyLB'] );
187  $this->globalKeyLbDomain = $params['globalKeyLbDomain'] ?? null;
188  if ( $this->globalKeyLbDomain === null ) {
189  throw new InvalidArgumentException(
190  "Config requires 'globalKeyLbDomain' if 'globalKeyLB' is set"
191  );
192  }
193  $writerIndex = $this->globalKeyLb->getWriterIndex();
194  $dbType = $this->globalKeyLb->getServerType( $writerIndex );
195  }
196  if ( isset( $params['localKeyLB'] ) ) {
197  $this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer )
198  ? $params['localKeyLB']
199  : ObjectFactory::getObjectFromSpec( $params['localKeyLB'] );
200  $writerIndex = $this->localKeyLb->getWriterIndex();
// Takes precedence over the type obtained from the global-key load balancer
201  $dbType = $this->localKeyLb->getServerType( $writerIndex );
202  } else {
203  $this->localKeyLb = $this->globalKeyLb;
204  }
205  // When using LoadBalancer instances, one *must* be defined for local keys
206  if ( !$this->localKeyLb ) {
207  throw new InvalidArgumentException(
208  "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'"
209  );
210  }
211  }
212 
// Each option falls back to the default declared on the property
213  $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
214  $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
215  $this->tableName = $params['tableName'] ?? $this->tableName;
216  $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
217  $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
218  $this->replicaOnly = $params['replicaOnly'] ?? false;
219 
// Multi-primary mode is recorded as the detected database type
220  if ( $params['multiPrimaryMode'] ?? false ) {
221  $this->multiPrimaryModeType = $dbType;
222  }
223 
// NOTE(review): the embedded listing numbers jump from 223 to 226 here, so
// statements may be missing from this view — confirm against the real file
226 
227  $this->hasZlib = extension_loaded( 'zlib' );
228  }
229 
/**
 * Read one key, optionally reporting its CAS token.
 *
 * @param string $key
 * @param int $flags Not used by this method
 * @param mixed &$casToken Pass self::PASS_BY_REF to request a token. It is reset to
 *  null and only filled in when a row was found and its value unserialized
 * @return mixed The unserialized value; false if no live row was found
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$blob = $this->fetchBlobs( [ $key ], $wantToken )[$key];
	if ( !$blob ) {
		// Miss: report an unknown value size
		$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, false ] ] );

		return false;
	}

	$serialized = $blob[self::BLOB_VALUE];
	$value = $this->unserialize( $serialized );
	if ( $wantToken && $value !== false ) {
		$casToken = $blob[self::BLOB_CASTOKEN];
	}
	$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, strlen( $serialized ) ] ] );

	return $value;
}
250 
/**
 * Unconditionally store a value under a key.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs()
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$writer = [ $this, 'modifyTableSpecificBlobsForSet' ];
	$argsByKey = [ $key => [ $value, $exptime ] ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
261 
/**
 * Delete a key.
 *
 * @param string $key
 * @param int $flags
 * @return bool Result of modifyBlobs()
 */
protected function doDelete( $key, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$writer = [ $this, 'modifyTableSpecificBlobsForDelete' ];
	// The delete callback takes no per-key arguments
	$argsByKey = [ $key => [] ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
272 
/**
 * Store a value only if the key has no live value, while holding the key lock.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if the lock could not be obtained; else the result of modifyBlobs()
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	// $scope receives the ScopedCallback that unlocks the key; keep it in this scope
	$lockedAt = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
	if ( $lockedAt !== null ) {
		$writer = [ $this, 'modifyTableSpecificBlobsForAdd' ];

		return $this->modifyBlobs( $writer, $lockedAt, [ $key => [ $value, $exptime ] ], $flags );
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
287 
/**
 * Store a value only if the key's current CAS token matches, while holding the key lock.
 *
 * @param mixed $casToken Token obtained from an earlier read
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if the lock could not be obtained; else the result of modifyBlobs()
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	// $scope receives the ScopedCallback that unlocks the key; keep it in this scope
	$lockedAt = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
	if ( $lockedAt !== null ) {
		$writer = [ $this, 'modifyTableSpecificBlobsForCas' ];
		$argsByKey = [ $key => [ $value, $exptime, $casToken ] ];

		return $this->modifyBlobs( $writer, $lockedAt, $argsByKey, $flags );
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
302 
/**
 * Change the expiry of a key.
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs()
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$now = $this->getCurrentTime();
	$writer = [ $this, 'modifyTableSpecificBlobsForChangeTTL' ];
	$argsByKey = [ $key => [ $exptime ] ];

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
313 
/**
 * Increment a key, creating it with an initial value when needed.
 *
 * @param string $key
 * @param int $exptime
 * @param int $step
 * @param int $init
 * @param int $flags
 * @return int|bool The per-key result reported by the write callback; false if
 *  modifyBlobs() reported failure
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$now = $this->getCurrentTime();
	$writer = [ $this, 'modifyTableSpecificBlobsForIncrInit' ];

	// $resByKey is filled by reference with the per-key outcome
	$ok = $this->modifyBlobs( $writer, $now, [ $key => [ $step, $init, $exptime ] ], $flags, $resByKey );
	if ( !$ok ) {
		return false;
	}

	return $resByKey[$key];
}
327 
/**
 * Increase the integer value of an existing key.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags
 * @return int|bool Result of doIncr()
 */
public function incr( $key, $value = 1, $flags = 0 ) {
	$delta = $value;

	return $this->doIncr( $key, $delta, $flags );
}
331 
/**
 * Decrease the integer value of an existing key.
 *
 * @param string $key
 * @param int $value Amount to subtract
 * @param int $flags
 * @return int|bool Result of doIncr()
 */
public function decr( $key, $value = 1, $flags = 0 ) {
	// A decrement is an increment by the negated amount
	$delta = -$value;

	return $this->doIncr( $key, $delta, $flags );
}
335 
/**
 * Add a (possibly negative) amount to the integer stored under an existing key.
 *
 * The key lock is held for the read-modify-write cycle, the result is clamped to
 * zero or above, and the row keeps its previous expiry.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags
 * @return int|bool The new value; false if the lock could not be obtained, the key
 *  has no live value, the value is not an integer, or the write failed
 */
private function doIncr( $key, $value = 1, $flags = 0 ) {
	// $scope receives the ScopedCallback that unlocks the key; keep it in this scope
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
	if ( $mtime === null ) {
		// Timeout or I/O error during lock acquisition
		return false;
	}

	$result = false;
	$current = $this->fetchBlobs( [ $key ] )[$key];
	if ( !$current ) {
		$this->logger->debug( __METHOD__ . ": $key does not exists" );
	} elseif ( !$this->isInteger( $current[self::BLOB_VALUE] ) ) {
		$this->logger->warning( __METHOD__ . ": $key is a non-integer" );
	} else {
		$newValue = max( (int)$current[self::BLOB_VALUE] + (int)$value, 0 );
		$written = $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForSet' ],
			$mtime,
			// Preserve the old expiry timestamp
			[ $key => [ $newValue, $current[self::BLOB_EXPIRY] ] ],
			$flags
		);
		if ( $written ) {
			$result = $newValue;
		}
	}

	$metric = ( $value >= 0 ) ? self::METRIC_OP_INCR : self::METRIC_OP_DECR;
	$this->updateOpStats( $metric, [ $key ] );

	return $result;
}
368 
/**
 * Read several keys at once.
 *
 * @param string[] $keys
 * @param int $flags Not used by this method
 * @return array Map of (key => value) for the keys whose value unserialized to
 *  something other than false
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$values = [];
	$sizeInfoByKey = [];

	$blobs = $this->fetchBlobs( $keys );
	foreach ( $keys as $key ) {
		$blob = $blobs[$key];
		// Misses are reported with an unknown size
		$size = false;
		if ( $blob ) {
			$serialized = $blob[self::BLOB_VALUE];
			$decoded = $this->unserialize( $serialized );
			if ( $decoded !== false ) {
				$values[$key] = $decoded;
			}
			$size = strlen( $serialized );
		}
		$sizeInfoByKey[$key] = [ 0, $size ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $sizeInfoByKey );

	return $values;
}
393 
/**
 * Unconditionally store several values with a shared expiry.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs()
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$now = $this->getCurrentTime();

	// Pair every value with the shared expiry, keeping the keys and their order
	$argsByKey = [];
	foreach ( $data as $key => $value ) {
		$argsByKey[$key] = [ $value, $exptime ];
	}

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForSet' ],
		$now,
		$argsByKey,
		$flags
	);
}
409 
/**
 * Delete several keys.
 *
 * @param string[] $keys
 * @param int $flags
 * @return bool Result of modifyBlobs()
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$writer = [ $this, 'modifyTableSpecificBlobsForDelete' ];
	// The delete callback takes no per-key arguments
	$argsByKey = array_fill_keys( $keys, [] );

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
420 
/**
 * Change the expiry of several keys to a shared value.
 *
 * @param string[] $keys
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs()
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$writer = [ $this, 'modifyTableSpecificBlobsForChangeTTL' ];
	$argsByKey = array_fill_keys( $keys, [ $exptime ] );

	return $this->modifyBlobs( $writer, $now, $argsByKey, $flags );
}
431 
/**
 * Get a database connection for a shard.
 *
 * @param int|string $shardIndex Integer server index, or self::SHARD_LOCAL/self::SHARD_GLOBAL
 * @return mixed The connection returned by the load-balancer or server-info helper
 * @throws UnexpectedValueException If the shard index is not recognized
 */
private function getConnection( $shardIndex ) {
	// Don't keep timing out trying to connect if the server is down;
	// re-throw the remembered error while it is less than 60 seconds old
	if ( isset( $this->connFailureErrors[$shardIndex] ) ) {
		$failureAge = $this->getCurrentTime() - $this->connFailureTimes[$shardIndex];
		if ( $failureAge < 60 ) {
			throw $this->connFailureErrors[$shardIndex];
		}
	}

	// Load balancer based shards
	if ( $shardIndex === self::SHARD_LOCAL || $shardIndex === self::SHARD_GLOBAL ) {
		return $this->getConnectionViaLoadBalancer( $shardIndex );
	}

	if ( !is_int( $shardIndex ) ) {
		throw new UnexpectedValueException( "Invalid server index '$shardIndex'" );
	}

	if ( !isset( $this->serverInfos[$shardIndex] ) ) {
		throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
	}

	// Directly configured server
	return $this->getConnectionFromServerInfo( $shardIndex, $this->serverInfos[$shardIndex] );
}
466 
/**
 * Work out which shard and which partition table hold a key.
 *
 * @param string $key
 * @return array (shard index, table name)
 */
private function getKeyLocation( $key ) {
	if ( !$this->serverTags ) {
		// LoadBalancer based configuration
		$useGlobal = ( strpos( $key, 'global:' ) === 0 && $this->globalKeyLb );
		$shardIndex = $useGlobal ? self::SHARD_GLOBAL : self::SHARD_LOCAL;
	} elseif ( count( $this->serverTags ) == 1 ) {
		// Striped array with a single database server: short-circuit
		$shardIndex = 0;
	} else {
		// Striped array of database servers: take the first index after hash-sorting
		$candidates = $this->serverTags;
		ArrayUtils::consistentHashSort( $candidates, $key );
		reset( $candidates );
		$shardIndex = key( $candidates );
	}

	$tableIndex = null;
	if ( $this->numTableShards > 1 ) {
		// First 8 hex digits of the MD5, masked to 31 bits
		$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
		$tableIndex = $hash % $this->numTableShards;
	}

	return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
}
499 
/**
 * Get the name of a partition table.
 *
 * @param int|null $index Table partition index, or null for the base table
 * @return string The base table name, suffixed with the zero-padded index when
 *  table sharding is in use
 */
private function getTableNameByShard( $index ) {
	if ( $index === null || $this->numTableShards <= 1 ) {
		return $this->tableName;
	}

	// Pad to the number of digits of the highest partition index
	$width = strlen( (string)( $this->numTableShards - 1 ) );

	return $this->tableName . sprintf( '%0' . $width . 'd', $index );
}
514 
/**
 * Fetch the live rows for the given keys and decode them into blob tuples.
 *
 * Keys are grouped by shard and partition table so that each table is queried once.
 * Database errors are passed to handleDBError(); the affected keys are reported as
 * null rather than aborting the whole read.
 *
 * @param string[] $keys
 * @param bool $getCasToken Whether to also obtain a CAS token for each key
 * @return array Map of (key => tuple or null) in the order of $keys; tuples are
 *  indexed by self::BLOB_VALUE, self::BLOB_EXPIRY and self::BLOB_CASTOKEN
 */
private function fetchBlobs( array $keys, bool $getCasToken = false ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	// Initialize order-preserved per-key results; set values for live keys below
	$dataByKey = array_fill_keys( $keys, null );

	$readTime = (int)$this->getCurrentTime();
	$keysByTableByShard = [];
	foreach ( $keys as $key ) {
		list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
		$keysByTableByShard[$shardIndex][$partitionTable][] = $key;
	}

	foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
		try {
			$db = $this->getConnection( $shardIndex );
			foreach ( $serverKeys as $partitionTable => $tableKeys ) {
				$res = $db->select(
					$partitionTable,
					$getCasToken
						? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
						: [ 'keyname', 'value', 'exptime' ],
					$this->buildExistenceConditions( $db, $tableKeys, $readTime ),
					__METHOD__
				);
				foreach ( $res as $row ) {
					// Remember where the row came from for the decoding pass below
					$row->shardIndex = $shardIndex;
					$row->tableName = $partitionTable;
					$dataByKey[$row->keyname] = $row;
				}
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	foreach ( $keys as $key ) {
		$row = $dataByKey[$key] ?? null;
		if ( !$row ) {
			continue;
		}

		$this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
		try {
			$db = $this->getConnection( $row->shardIndex );
			$dataByKey[$key] = [
				self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
				self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
				self::BLOB_CASTOKEN => $getCasToken
					? $this->getCasTokenFromRow( $db, $row )
					: null
			];
		} catch ( DBError $e ) {
			// getConnection() can re-throw a remembered connection error, so catch
			// the same base class as the query loop above
			$this->handleDBError( $e, $row->shardIndex );
			// Never hand the raw row object to callers, which index results as arrays
			$dataByKey[$key] = null;
		}
	}

	return $dataByKey;
}
580 
/**
 * Run a table-specific write callback for a set of keys, grouped by shard and table.
 *
 * Database errors are passed to handleDBError() and leave the affected keys marked
 * as failed. Each shard that was written to is waited on for replication (when
 * self::WRITE_SYNC is set) and offered to the garbage collector once per call.
 *
 * @param callable $tableWriteCallback Invoked as
 *  ( $db, string $table, float $mtime, array $argsByKey, array &$resByKey )
 * @param float $mtime Modification timestamp handed to the callback
 * @param array $argsByKey Map of (key => argument list for the callback)
 * @param int $flags Bitfield; self::WRITE_SYNC is checked here
 * @param array &$resByKey Reset here to (key => false); callbacks mark successes
 * @return bool Whether no key is left with a false result and, with
 *  self::WRITE_SYNC, every replication wait succeeded
 */
private function modifyBlobs(
	callable $tableWriteCallback,
	float $mtime,
	array $argsByKey,
	int $flags,
	&$resByKey = []
) {
	// Initialize order-preserved per-key results; callbacks mark successful results
	$resByKey = array_fill_keys( array_keys( $argsByKey ), false );

	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$argsByKeyByTableByShard = [];
	foreach ( $argsByKey as $key => $args ) {
		list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
		$argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
	}

	// Keyed by shard index so that a shard with several partition tables is only
	// waited on and garbage-collected once
	$shardIndexesAffected = [];
	foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
		foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
			try {
				$db = $this->getConnection( $shardIndex );
				$shardIndexesAffected[$shardIndex] = $shardIndex;
				$tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
			} catch ( DBError $e ) {
				$this->handleDBError( $e, $shardIndex );
			}
		}
	}

	$success = !in_array( false, $resByKey, true );

	if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) {
		foreach ( $shardIndexesAffected as $shardIndex ) {
			if ( !$this->waitForReplication( $shardIndex ) ) {
				$success = false;
			}
		}
	}

	foreach ( $shardIndexesAffected as $shardIndex ) {
		try {
			$db = $this->getConnection( $shardIndex );
			$this->occasionallyGarbageCollect( $db );
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	return $success;
}
649 
666  IDatabase $db,
667  string $ptable,
668  float $mtime,
669  array $argsByKey,
670  array &$resByKey
671  ) {
672  $valueSizesByKey = [];
673 
674  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
675 
676  if ( $this->multiPrimaryModeType !== null ) {
677  // @TODO: use multi-row upsert() with VALUES() once supported in Database
678  foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
679  $serialValue = $this->getSerialized( $value, $key );
680  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
681  $db->upsert(
682  $ptable,
683  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
684  [ [ 'keyname' ] ],
685  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
686  __METHOD__
687  );
688  $resByKey[$key] = true;
689 
690  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
691  }
692  } else {
693  // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
694  $rows = [];
695  foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
696  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
697  $serialValue = $this->getSerialized( $value, $key );
698  $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
699 
700  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
701  }
702  $db->replace( $ptable, 'keyname', $rows, __METHOD__ );
703  foreach ( $argsByKey as $key => $unused ) {
704  $resByKey[$key] = true;
705  }
706  }
707 
708  $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
709  }
710 
728  IDatabase $db,
729  string $ptable,
730  float $mtime,
731  array $argsByKey,
732  array &$resByKey
733  ) {
734  if ( $this->isMultiPrimaryModeEnabled() ) {
735  // Tombstone keys in order to respect eventual consistency
736  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
737  $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
738  $rows = [];
739  foreach ( $argsByKey as $key => $arg ) {
740  $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
741  }
742  $db->upsert(
743  $ptable,
744  $rows,
745  [ [ 'keyname' ] ],
746  $this->buildUpsertSetForOverwrite( $db, self::TOMB_SERIAL, $expiry, $mt ),
747  __METHOD__
748  );
749  } else {
750  // Just purge the keys since there is only one primary (e.g. "source of truth")
751  $db->delete( $ptable, [ 'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
752  }
753 
754  foreach ( $argsByKey as $key => $arg ) {
755  $resByKey[$key] = true;
756  }
757 
758  $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
759  }
760 
782  IDatabase $db,
783  string $ptable,
784  float $mtime,
785  array $argsByKey,
786  array &$resByKey
787  ) {
788  $valueSizesByKey = [];
789 
790  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
791 
792  // This check must happen outside the write query to respect eventual consistency
793  $existingKeys = $db->selectFieldValues(
794  $ptable,
795  'keyname',
796  $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
797  __METHOD__
798  );
799  $existingByKey = array_fill_keys( $existingKeys, true );
800 
801  // @TODO: use multi-row upsert() with VALUES() once supported in Database
802  foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
803  if ( isset( $existingByKey[$key] ) ) {
804  $this->logger->debug( __METHOD__ . ": $key already exists" );
805  continue;
806  }
807 
808  $serialValue = $this->getSerialized( $value, $key );
809  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
810  $db->upsert(
811  $ptable,
812  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
813  [ [ 'keyname' ] ],
814  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
815  __METHOD__
816  );
817  $resByKey[$key] = true;
818 
819  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
820  }
821 
822  $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
823  }
824 
846  IDatabase $db,
847  string $ptable,
848  float $mtime,
849  array $argsByKey,
850  array &$resByKey
851  ) {
852  $valueSizesByKey = [];
853 
854  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
855 
856  // This check must happen outside the write query to respect eventual consistency
857  $res = $db->select(
858  $ptable,
859  $this->addCasTokenFields( $db, [ 'keyname' ] ),
860  $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
861  __METHOD__
862  );
863 
864  $curTokensByKey = [];
865  foreach ( $res as $row ) {
866  $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
867  }
868 
869  // @TODO: use multi-row upsert() with VALUES() once supported in Database
870  foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) {
871  $curToken = $curTokensByKey[$key] ?? null;
872  if ( $curToken === null ) {
873  $this->logger->debug( __METHOD__ . ": $key does not exists" );
874  continue;
875  }
876 
877  if ( $curToken !== $casToken ) {
878  $this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
879  continue;
880  }
881 
882  $serialValue = $this->getSerialized( $value, $key );
883  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
884  $db->upsert(
885  $ptable,
886  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
887  [ [ 'keyname' ] ],
888  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
889  __METHOD__
890  );
891  $resByKey[$key] = true;
892 
893  $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
894  }
895 
896  $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
897  }
898 
919  IDatabase $db,
920  string $ptable,
921  float $mtime,
922  array $argsByKey,
923  array &$resByKey
924  ) {
925  if ( $this->isMultiPrimaryModeEnabled() ) {
926  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
927 
928  $res = $db->select(
929  $ptable,
930  [ 'keyname', 'value' ],
931  $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
932  __METHOD__
933  );
934  // @TODO: use multi-row upsert() with VALUES() once supported in Database
935  foreach ( $res as $curRow ) {
936  $key = $curRow->keyname;
937  $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
938  list( $exptime ) = $argsByKey[$key];
939  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
940 
941  $db->upsert(
942  $ptable,
943  $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
944  [ [ 'keyname' ] ],
945  $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
946  __METHOD__
947  );
948  $resByKey[$key] = true;
949  }
950  } else {
951  $keysBatchesByExpiry = [];
952  foreach ( $argsByKey as $key => list( $exptime ) ) {
953  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
954  $keysBatchesByExpiry[$expiry][] = $key;
955  }
956 
957  $existingCount = 0;
958  foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
959  $db->update(
960  $ptable,
961  [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
962  $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ),
963  __METHOD__
964  );
965  $existingCount += $db->affectedRows();
966  }
967  if ( $existingCount === count( $argsByKey ) ) {
968  foreach ( $argsByKey as $key => $args ) {
969  $resByKey[$key] = true;
970  }
971  }
972  }
973 
974  $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
975  }
976 
997  IDatabase $db,
998  string $ptable,
999  float $mtime,
1000  array $argsByKey,
1001  array &$resByKey
1002  ) {
1003  foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) {
1004  $mt = $this->makeTimestampedModificationToken( $mtime, $db );
1005  $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
1006 
1007  // Use a transaction so that changes from other threads are not visible due to
1008  // "consistent reads". This way, the exact post-increment value can be returned.
1009  // The "live key exists" check can go inside the write query and remain safe for
1010  // replication since the TTL for such keys is either indefinite or very short.
1011  $db->startAtomic( __METHOD__ );
1012  $db->upsert(
1013  $ptable,
1014  $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
1015  [ [ 'keyname' ] ],
1016  $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
1017  __METHOD__
1018  );
1019  $affectedCount = $db->affectedRows();
1020  $row = $db->selectRow( $ptable, 'value', [ 'keyname' => $key ], __METHOD__ );
1021  $db->endAtomic( __METHOD__ );
1022 
1023  if ( !$affectedCount || $row === false ) {
1024  $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
1025  continue;
1026  }
1027 
1028  $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
1029  if ( !$this->isInteger( $serialValue ) ) {
1030  $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
1031  continue;
1032  }
1033 
1034  $resByKey[$key] = (int)$serialValue;
1035  }
1036 
1037  $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
1038  }
1039 
/**
 * Compute the expiry to store for a key being written.
 *
 * @param int $exptime Expiry as accepted by getExpirationAsTimestamp()
 * @param int $nowTsUnix Current UNIX timestamp
 * @return int self::TTL_INDEFINITE, or a UNIX timestamp that is never lower than
 *  ( $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC )
 */
private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
	$expiry = $this->getExpirationAsTimestamp( $exptime );
	if ( $expiry === self::TTL_INDEFINITE ) {
		return $expiry;
	}

	// Eventual consistency requires the preservation of recently modified keys.
	// Do not create rows with `exptime` fields so low that they might get garbage
	// collected before being replicated.
	$lowestSafeExpiry = $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC;

	return max( $expiry, $lowestSafeExpiry );
}
1056 
/**
 * Lock a key and get the modification timestamp to use for the write section.
 *
 * @param string $key
 * @param ScopedCallback|null &$scope Set to a callback that unlocks the key
 * @return float|null The lock time with microsecond precision; null if the lock
 *  could not be obtained
 */
private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
	$acquired = $this->lock( $key, 0 );
	if ( !$acquired ) {
		return null;
	}

	$scope = new ScopedCallback( function () use ( $key ) {
		$this->unlock( $key );
	} );

	$lockTime = $this->locks[$key][self::LOCK_TIME];

	// sprintf is used to adjust precision
	return (float)sprintf( '%.6F', $lockTime );
}
1087 
/**
 * Build the 17 character modification token for a write.
 *
 * The token is 13 base-36 digits encoding the UNIX seconds (35 bits) followed by the
 * server ID (32 bits), then 4 base-36 digits encoding the microseconds.
 *
 * @param float $mtime UNIX modification timestamp
 * @param IDatabase $db Handle to the server receiving the write
 * @return string
 * @throws RuntimeException If the token does not come out 17 characters long
 */
private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
	// We have reserved space for upto 6 digits in the microsecond portion of the token.
	// This is for future use only (maybe CAS tokens) and not currently used.
	// It is currently populated by the microsecond portion returned by microtime,
	// which generally has fewer than 6 digits of meaningful precision but can still be useful
	// in debugging (to see the token continuously change even during rapid testing).
	$seconds = (int)$mtime;
	$microseconds = explode( '.', sprintf( '%.6F', $mtime ) )[1];

	// Fall back to a CRC32 of the server name when no topology based ID is available
	$serverId = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );

	// 35 bit integral seconds portion of UNIX timestamp
	$secondsBits = str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT );
	// 32 bit ID of the primary database server handling the write
	$serverIdBits = str_pad( base_convert( $serverId, 10, 2 ), 32, '0', STR_PAD_LEFT );
	// 67 bit integral portion of UNIX timestamp, qualified
	$qualifiedSeconds = \Wikimedia\base_convert( $secondsBits . $serverIdBits, 2, 36, 13 );
	// 20 bit fractional portion of UNIX timestamp, as integral microseconds
	$fraction = str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT );

	$token = $qualifiedSeconds . $fraction;
	if ( strlen( $token ) !== 17 ) {
		throw new RuntimeException( "Modification timestamp overflow detected" );
	}

	return $token;
}
1129 
/**
 * Build the WHERE conditions that match the live rows of the given keys.
 *
 * @param IDatabase $db
 * @param string|string[] $keys
 * @param int $time UNIX timestamp; rows expiring before it are not matched
 * @return array
 */
private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
	$quotedCutoff = $db->addQuotes( $db->timestamp( $time ) );

	// Note that tombstones always have past expiration dates
	$conds = [ 'keyname' => $keys ];
	$conds[] = 'exptime >= ' . $quotedCutoff;

	return $conds;
}
1145 
/**
 * Build the row to insert for a key.
 *
 * @param IDatabase $db
 * @param string $key
 * @param string|int $serialValue
 * @param int $expiry
 * @param string $mt Modification token; only stored in multi-primary mode
 * @return array Map of (column => value)
 */
private function buildUpsertRow(
	IDatabase $db,
	$key,
	$serialValue,
	int $expiry,
	string $mt
) {
	$encodedValue = $this->dbEncodeSerialValue( $db, $serialValue );
	$encodedExpiry = $this->encodeDbExpiry( $db, $expiry );

	$row = [ 'keyname' => $key, 'value' => $encodedValue, 'exptime' => $encodedExpiry ];
	if ( !$this->isMultiPrimaryModeEnabled() ) {
		return $row;
	}

	$row['modtoken'] = $mt;

	return $row;
}
1174 
/**
 * Build the SET clause used when an upsert hits an existing row and should overwrite it.
 *
 * In multi-primary mode every column assignment is guarded so that a row carrying a
 * newer modification token prefix keeps its current contents.
 *
 * @param IDatabase $db
 * @param string|int $serialValue
 * @param int $expiry
 * @param string $mt Modification token of this write
 * @return string[] List of "column=expression" SQL fragments
 */
private function buildUpsertSetForOverwrite(
	IDatabase $db,
	$serialValue,
	int $expiry,
	string $mt
) {
	$expressionsByColumn = [
		'value' => $db->addQuotes( $this->dbEncodeSerialValue( $db, $serialValue ) ),
		'exptime' => $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
	];

	$set = [];
	if ( $this->isMultiPrimaryModeEnabled() ) {
		// The query might take a while to replicate, during which newer values might get
		// written. Qualify the query so that it does not override such values. Note that
		// duplicate tokens generated around the same time for a key should still result
		// in convergence given the use of server_id in modtoken (providing a total order
		// among primary DB servers) and MySQL binlog ordering (providing a total order
		// for writes replicating from a given primary DB server).
		$expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
		// SQL substring positions start at 1. A start position of 0 makes MySQL return
		// an empty string, which would make the guard below always true.
		$isNotOlder = $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
			$db->buildSubString( 'modtoken', 1, 13 );
		foreach ( $expressionsByColumn as $column => $updateExpression ) {
			$rhs = $db->conditional( $isNotOlder, $updateExpression, $column );
			$set[] = "{$column}=" . trim( $rhs );
		}
	} else {
		foreach ( $expressionsByColumn as $column => $updateExpression ) {
			$set[] = "{$column}={$updateExpression}";
		}
	}

	return $set;
}
1220 
/**
 * Build the SET clause for an increment-or-initialize upsert that hits an existing row.
 *
 * Rows that have not expired get their value incremented and keep their expiry (and
 * modification token); expired rows are reset to the initial value and new expiry.
 *
 * @param IDatabase $db
 * @param int $step Amount to add to a live value
 * @param int $init Value to store over an expired row
 * @param int $expiry Expiry to store over an expired row
 * @param string $mt Modification token; only used in multi-primary mode
 * @param int $mtUnixTs UNIX timestamp that decides whether a row has expired
 * @return string[] List of "column=expression" SQL fragments
 */
private function buildIncrUpsertSet(
	IDatabase $db,
	int $step,
	int $init,
	int $expiry,
	string $mt,
	int $mtUnixTs
) {
	$incremented = $db->buildIntegerCast( 'value' ) . ' + ' . $db->addQuotes( $step );
	$initialized = $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) );
	$newExpiry = $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) );

	// Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
	$pairsByColumn = [
		'value' => [ $incremented, $initialized ],
		'exptime' => [ 'exptime', $newExpiry ]
	];
	if ( $this->isMultiPrimaryModeEnabled() ) {
		$pairsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
	}

	$set = [];
	foreach ( $pairsByColumn as $column => $pair ) {
		$isLive = 'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) );
		$set[] = $column . '=' . trim( $db->conditional( $isLive, $pair[0], $pair[1] ) );
	}

	return $set;
}
1267 
/**
 * Convert an expiry into the value stored in the exptime column.
 *
 * @param IDatabase $db
 * @param int $expiry UNIX timestamp or self::TTL_INDEFINITE
 * @return string Timestamp in the format produced by $db->timestamp()
 */
private function encodeDbExpiry( IDatabase $db, int $expiry ) {
	if ( $expiry === self::TTL_INDEFINITE ) {
		// Use the maximum timestamp that the column can store
		return $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
	}

	// Convert the absolute timestamp into the DB timestamp format
	return $db->timestamp( $expiry );
}
1280 
/**
 * Convert an exptime column value back into an expiry.
 *
 * @param IDatabase $db
 * @param string $dbExpiry Value read from the exptime column
 * @return int UNIX timestamp, or self::TTL_INDEFINITE for the placeholder timestamp
 */
private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
	$neverExpires = ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) );
	if ( $neverExpires ) {
		return self::TTL_INDEFINITE;
	}

	return (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
}
1291 
/**
 * Prepare a serialized value for storage in the value column.
 *
 * @param IDatabase $db
 * @param string|int $serialValue
 * @return mixed The decimal string for integers; otherwise the result of
 *  $db->encodeBlob()
 */
private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
	if ( is_int( $serialValue ) ) {
		return (string)$serialValue;
	}

	return $db->encodeBlob( $serialValue );
}
1300 
/**
 * Turn a value column field back into a serialized value.
 *
 * @param IDatabase $db
 * @param mixed $blob Field read from the value column
 * @return mixed An integer when isInteger() accepts the field; otherwise the
 *  result of $db->decodeBlob()
 */
private function dbDecodeSerialValue( IDatabase $db, $blob ) {
	if ( $this->isInteger( $blob ) ) {
		return (int)$blob;
	}

	return $db->decodeBlob( $blob );
}
1309 
/**
 * Extend a SELECT field list with what is needed to obtain a CAS token.
 *
 * On MySQL and PostgreSQL a "castoken" field is computed by the database; for other
 * database types the "value" and "exptime" fields are selected instead.
 *
 * @param IDatabase $db
 * @param array $fields SELECT field list
 * @return array The extended field list
 */
private function addCasTokenFields( IDatabase $db, array $fields ) {
	// SQL hash expression to use, by database type
	$hashExpressionByType = [
		'mysql' => 'SHA1(value)',
		'postgres' => 'md5(value)'
	];

	$type = $db->getType();
	if ( isset( $hashExpressionByType[$type] ) ) {
		$fields['castoken'] = $db->buildConcat( [
			$hashExpressionByType[$type],
			$db->addQuotes( '@' ),
			'exptime'
		] );

		return $fields;
	}

	// Make sure the columns used as a fallback are part of the selection
	foreach ( [ 'value', 'exptime' ] as $column ) {
		if ( !in_array( $column, $fields, true ) ) {
			$fields[] = $column;
		}
	}

	return $fields;
}
1343 
/**
 * Get the CAS token for a row selected with the fields from addCasTokenFields().
 *
 * @param IDatabase $db
 * @param stdClass $row
 * @return string The "castoken" field when set; otherwise the SHA-1 of the decoded
 *  value, an "@" and the exptime field
 */
private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
	if ( isset( $row->castoken ) ) {
		return $row->castoken;
	}

	$valueHash = sha1( $this->dbDecodeSerialValue( $db, $row->value ) );
	$token = $valueHash . '@' . $row->exptime;
	$this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );

	return $token;
}
1361 
/**
 * Randomly trigger a purge of expired rows on the given server
 *
 * A purge happens on average once every $this->purgePeriod calls and is skipped
 * if the previous one was started within the last GC_DELAY_SEC seconds. At most
 * $this->purgeLimit is passed as the row limit for the purge. When an async
 * handler is configured, the purge is handed to it instead of running inline.
 *
 * @param IDatabase $db Handle to the server to purge
 */
private function occasionallyGarbageCollect( IDatabase $db ) {
	if ( !$this->purgePeriod ) {
		return; // random purging is disabled
	}

	// Only purge on one in every $this->purgePeriod writes
	if ( mt_rand( 0, $this->purgePeriod - 1 ) !== 0 ) {
		return;
	}

	// Avoid repeating the delete within a few seconds
	if ( ( $this->getCurrentTime() - $this->lastGarbageCollect ) <= self::GC_DELAY_SEC ) {
		return;
	}

	$garbageCollector = function () use ( $db ) {
		$this->deleteServerObjectsExpiringBefore(
			$db,
			(int)$this->getCurrentTime(),
			$this->purgeLimit
		);
		// Use the same clock as the throttle check above, rather than time(),
		// so that the comparison stays consistent when the clock is overridden
		$this->lastGarbageCollect = $this->getCurrentTime();
	};

	if ( $this->asyncHandler ) {
		$this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
		( $this->asyncHandler )( $garbageCollector );
	} else {
		$garbageCollector();
	}
}
1391 
/**
 * Delete every object whose expiry is earlier than the current time
 */
public function expireAll() {
	$nowUnix = (int)$this->getCurrentTime();
	$this->deleteObjectsExpiringBefore( $nowUnix );
}
1395 
/**
 * Delete all objects expiring before a certain date
 *
 * @param string|int $timestamp Cutoff, passed through to the per-server purge
 * @param callable|null $progress Optional callback that receives the percentage done
 * @param int|float $limit Maximum number of keys to delete [default: INF]
 * @param string|null $tag Tag of the only server to purge; all servers are purged if null
 * @return bool False if any server failed with a DBError; true otherwise
 */
public function deleteObjectsExpiringBefore(
	$timestamp,
	callable $progress = null,
	$limit = INF,
	string $tag = null
) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	if ( $tag === null ) {
		$shardIndexes = $this->getShardServerIndexes();
		shuffle( $shardIndexes );
	} else {
		// Purge one server only, to support concurrent purging in large wiki farms (T282761).
		$shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
	}

	$numServers = count( $shardIndexes );
	$keysDeletedCount = 0;
	$ok = true;

	foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
		$progressInfo = [
			'fn' => $progress,
			'serversDone' => $numServersDone,
			'serversTotal' => $numServers
		];
		try {
			$db = $this->getConnection( $shardIndex );
			$this->deleteServerObjectsExpiringBefore(
				$db,
				$timestamp,
				$limit,
				$keysDeletedCount,
				$progressInfo
			);
		} catch ( DBError $e ) {
			// Keep going with the remaining servers, but report the failure
			$this->handleDBError( $e, $shardIndex );
			$ok = false;
		}
	}

	return $ok;
}
1435 
/**
 * Delete expired rows from every table shard of one database server
 *
 * Table shards are visited in random order. Within a shard, rows are selected in
 * batches by ascending expiry and then deleted by key. A shard is finished once a
 * batch comes back empty or the running delete count reaches $limit.
 *
 * @param IDatabase $db Handle to the server to purge
 * @param string|int $timestamp Cutoff in a format accepted by ConvertibleTimestamp
 * @param int|float $limit Row count at which batching stops (may be INF)
 * @param int &$keysDeletedCount Running count of deleted rows; incremented in place
 * @param array|null $progress Optional map with the keys "fn" (callable or null),
 *  "serversDone" and "serversTotal"; used to report the overall percentage done
 * @throws DBError
 */
private function deleteServerObjectsExpiringBefore(
	IDatabase $db,
	$timestamp,
	$limit,
	&$keysDeletedCount = 0,
	array $progress = null
) {
	$cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
	if ( $this->isMultiPrimaryModeEnabled() ) {
		// Eventual consistency requires the preservation of any key that was recently
		// modified. The key must exist on this database server long enough for the server
		// to receive, via replication, all writes to the key with lower timestamps. Such
		// writes should be no-ops since the existing key value should "win". If the network
		// partitions between datacenters A and B for 30 minutes, the database servers in
		// each datacenter will see an initial burst of writes with "old" timestamps via
		// replication. This might include writes with lower timestamps than the existing
		// key value. Therefore, clock skew and replication delay are both factors.
		$cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
		$cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
	}

	$tableIndexes = range( 0, $this->numTableShards - 1 );
	shuffle( $tableIndexes );

	$batchSize = min( $this->writeBatchSize, $limit );

	// Neither of these change between batches
	$expiredCond = 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) );
	$reportProgress = ( $progress && is_callable( $progress['fn'] ) );

	foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
		$table = $this->getTableNameByShard( $tableIndex );
		// The oldest expiry of a row we have deleted on this shard
		// (the first row that we deleted)
		$minExpUnix = null;
		// The most recent expiry time so far, from a row we have deleted on this shard
		$maxExp = null;
		// Size of the time range we'll delete, in seconds (for progress estimate)
		$totalSeconds = null;

		do {
			$conds = [ $expiredCond ];
			if ( $maxExp ) {
				// Resume from the newest expiry seen in the previous batch
				$conds[] = 'exptime >= ' . $db->addQuotes( $maxExp );
			}

			$res = $db->select(
				$table,
				[ 'keyname', 'exptime' ],
				$conds,
				__METHOD__,
				[ 'LIMIT' => $batchSize, 'ORDER BY' => 'exptime ASC' ]
			);
			$numRows = $res->numRows();

			if ( $numRows ) {
				$row = $res->current();
				if ( $minExpUnix === null ) {
					$minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
					$totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
				}

				$keys = [];
				foreach ( $res as $row ) {
					$keys[] = $row->keyname;
					$maxExp = $row->exptime;
				}

				// The expiry condition is repeated so that rows which were
				// given a later expiry since the SELECT are left alone
				$db->delete(
					$table,
					[
						$expiredCond,
						'keyname' => $keys
					],
					__METHOD__
				);
				$keysDeletedCount += $db->affectedRows();
			}

			if ( $reportProgress ) {
				if ( $totalSeconds ) {
					$maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
					$remainingSeconds = $cutoffUnix - $maxExpUnix;
					$processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
					// For example, if we've done 1.5 table shard, and are thus half-way on the
					// 2nd of perhaps 5 tables on this server, then this might be:
					// `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
					$tablesDoneRatio =
						( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
				} else {
					$tablesDoneRatio = 1;
				}

				// For example, if we're 30% done on the last of 10 servers, then this might be:
				// `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
				$overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
					( $tablesDoneRatio / $progress['serversTotal'] );
				( $progress['fn'] )( $overallRatio * 100 );
			}
		} while ( $numRows && $keysDeletedCount < $limit );
	}
}
1538 
/**
 * Delete content of shard tables in every server
 *
 * Stops at the first server that raises a DBError.
 *
 * @return bool True if every table on every server was emptied; false on a DBError
 */
public function deleteAll() {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		try {
			$db = $this->getConnection( $shardIndex );
			$tableIndex = 0;
			while ( $tableIndex < $this->numTableShards ) {
				$db->delete( $this->getTableNameByShard( $tableIndex ), '*', __METHOD__ );
				++$tableIndex;
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );

			return false;
		}
	}

	return true;
}
1561 
/**
 * Acquire a lock for a key on the database server that the key maps to
 *
 * @param string $key
 * @param int $timeout Lock wait timeout passed to IDatabase::lock()
 * @param int $exptime Not used by this backend
 * @return mixed|null Result of IDatabase::lock() in LOCK_TIMESTAMP mode (presumably the
 *  UNIX timestamp of acquisition); null if a DBError occurred
 */
public function doLock( $key, $timeout = 6, $exptime = 6 ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	list( $shardIndex ) = $this->getKeyLocation( $key );

	try {
		$db = $this->getConnection( $shardIndex );

		return $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
		$this->logger->warning(
			__METHOD__ . ' failed due to I/O error for {key}.',
			[ 'key' => $key ]
		);
	}

	return null;
}
1582 
/**
 * Release a lock for a key on the database server that the key maps to
 *
 * @param string $key
 * @return mixed Result of IDatabase::unlock(); false if a DBError occurred
 */
public function doUnlock( $key ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	list( $shardIndex ) = $this->getKeyLocation( $key );

	try {
		return $this->getConnection( $shardIndex )->unlock( $key, __METHOD__ );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );

		return false;
	}
}
1599 
/**
 * Make a cache key for the given keyspace and components
 *
 * Spaces become underscores and colons inside components are percent-encoded.
 * Components that do not fit into the remaining length budget are replaced by
 * '#' plus their MD5 hash; if the key is still too long, all of the components
 * are collapsed into a single MD5 hash.
 *
 * @param string $keyspace
 * @param array $components
 * @return string Key of the form "<keyspace>:<component>:<component>..."
 */
public function makeKeyInternal( $keyspace, $components ) {
	// SQL schema for 'objectcache' specifies keys as varchar(255). From that,
	// subtract the number of characters we need for the keyspace and for
	// the separator character needed for each argument. To handle some
	// custom prefixes used by thing like WANObjectCache, limit to 205.
	$keyspace = strtr( $keyspace, ' ', '_' );
	$charsLeft = 205 - strlen( $keyspace ) - count( $components );

	foreach ( $components as $index => $rawComponent ) {
		$encoded = strtr( $rawComponent, [
			' ' => '_', // Avoid unnecessary misses from pre-1.35 code
			':' => '%3A',
		] );

		// 33 = 32 characters for the MD5 + 1 for the '#' prefix.
		if ( $charsLeft > 33 && strlen( $encoded ) > $charsLeft ) {
			$encoded = '#' . md5( $encoded );
		}

		$charsLeft -= strlen( $encoded );
		$components[$index] = $encoded;
	}

	$joined = implode( ':', $components );
	if ( $charsLeft < 0 ) {
		// Still over budget; fall back to hashing all of the components together
		return $keyspace . ':BagOStuff-long-key:##' . md5( $joined );
	}

	return $keyspace . ':' . $joined;
}
1625 
/**
 * Convert a value into its storage form
 *
 * @param mixed $value
 * @return string|int Integers are returned unchanged; anything else is serialized
 *  and, if zlib is available, deflated
 */
protected function serialize( $value ) {
	if ( is_int( $value ) ) {
		return $value;
	}

	$serial = serialize( $value );

	// On typical message and page data, this can provide a 3X storage savings
	return $this->hasZlib ? gzdeflate( $serial ) : $serial;
}
1639 
/**
 * Convert a stored value back into the original PHP value
 *
 * @param string|int $value Storage form as produced by serialize()
 * @return mixed Original value; false for tombstones
 */
protected function unserialize( $value ) {
	if ( $value === self::TOMB_SERIAL ) {
		return false; // tombstone
	}

	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$serial = $value;
	if ( $this->hasZlib ) {
		// The value might not be deflated; silence the resulting warning
		// and fall back to treating it as a plain serialized string
		AtEase::suppressWarnings();
		$inflated = gzinflate( $value );
		AtEase::restoreWarnings();

		if ( $inflated !== false ) {
			$serial = $inflated;
		}
	}

	return unserialize( $serial );
}
1661 
/**
 * Get a database handle from the LoadBalancer that serves the given shard
 *
 * @param string|int $shardIndex self::SHARD_GLOBAL selects the "global key" LoadBalancer;
 *  any other value selects the "local key" LoadBalancer
 * @return IDatabase Handle returned by ILoadBalancer::getMaintenanceConnectionRef()
 */
private function getConnectionViaLoadBalancer( $shardIndex ) {
	$isGlobalShard = ( $shardIndex === self::SHARD_GLOBAL );
	$lb = $isGlobalShard ? $this->globalKeyLb : $this->localKeyLb;
	$dbDomain = $isGlobalShard ? $this->globalKeyLbDomain : false;

	$writerAttributes = $lb->getServerAttributes( $lb->getWriterIndex() );
	if ( $writerAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// Use the main connection to avoid transaction deadlocks
		// @phan-suppress-next-line PhanTypeMismatchArgumentNullable dbDomain should be not null here
		return $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $dbDomain );
	}

	// If the RDBMs has row/table/page level locking, then use separate auto-commit
	// connection to avoid needless contention and deadlocks.
	$serverIndex = $this->replicaOnly ? DB_REPLICA : DB_PRIMARY;

	return $lb->getMaintenanceConnectionRef(
		$serverIndex,
		[],
		// @phan-suppress-next-line PhanTypeMismatchArgumentNullable dbDomain should be not null here
		$dbDomain,
		$lb::CONN_TRX_AUTOCOMMIT
	);
}
1694 
/**
 * Get the auto-commit handle for a directly configured server, connecting on first use
 *
 * @param int $shardIndex Key into $this->conns
 * @param array $server Server configuration passed to Database::factory()
 * @return IMaintainableDatabase Cached handle for the server
 */
private function getConnectionFromServerInfo( $shardIndex, array $server ) {
	if ( isset( $this->conns[$shardIndex] ) ) {
		return $this->conns[$shardIndex];
	}

	$dbType = $server['type'];
	$overrides = [
		// Make sure the handle uses autocommit mode
		'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
		'connLogger' => $this->logger,
		'queryLogger' => $this->logger
	];

	/** @var IMaintainableDatabase $conn Auto-commit connection to the server */
	$conn = Database::factory( $dbType, array_merge( $server, $overrides ) );
	// Automatically create the objectcache table for sqlite as needed
	if ( $conn->getType() === 'sqlite' ) {
		$this->initSqliteDatabase( $conn );
	}
	$this->conns[$shardIndex] = $conn;

	// @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
	return $this->conns[$shardIndex];
}
1726 
/**
 * Handle a DBError raised while talking to a shard server
 *
 * Connection errors additionally mark the server as down.
 *
 * @param DBError $exception
 * @param string|int $shardIndex Server shard index
 */
private function handleDBError( DBError $exception, $shardIndex ) {
	$isConnectionError = ( $exception instanceof DBConnectionError );
	if ( $isConnectionError ) {
		$this->markServerDown( $exception, $shardIndex );
	}

	$this->setAndLogDBError( $exception );
}
1740 
/**
 * Log a DBError and record the matching "last error" code
 *
 * @param DBError $exception
 */
private function setAndLogDBError( DBError $exception ) {
	$this->logger->error( "DBError: {$exception->getMessage()}" );

	$isConnectionError = ( $exception instanceof DBConnectionError );
	$this->setLastError( $isConnectionError ? self::ERR_UNREACHABLE : self::ERR_UNEXPECTED );
	$this->logger->warning(
		__METHOD__ . ( $isConnectionError ? ": ignoring connection error" : ": ignoring query error" )
	);
}
1754 
/**
 * Mark a server down due to a DBConnectionError exception
 *
 * The cached handle is dropped. A failure that was recorded less than 60 seconds
 * ago is left as-is; otherwise the failure time and exception are (re-)recorded.
 *
 * @param DBError $exception
 * @param string|int $shardIndex Server shard index
 */
private function markServerDown( DBError $exception, $shardIndex ) {
	unset( $this->conns[$shardIndex] ); // bug T103435

	$now = $this->getCurrentTime();
	$lastFailure = $this->connFailureTimes[$shardIndex] ?? null;
	if ( $lastFailure !== null ) {
		if ( $now - $lastFailure < 60 ) {
			$this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );

			return;
		}
		// The previous failure record is stale; discard it
		unset( $this->connFailureTimes[$shardIndex] );
		unset( $this->connFailureErrors[$shardIndex] );
	}

	$this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
	$this->connFailureTimes[$shardIndex] = $now;
	$this->connFailureErrors[$shardIndex] = $exception;
}
1778 
/**
 * Create the "objectcache" table and its expiry index on an SQLite handle, if missing
 *
 * @param IMaintainableDatabase $db
 * @throws DBError Re-thrown after rolling back if the schema statements fail
 */
private function initSqliteDatabase( IMaintainableDatabase $db ) {
	if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
		return;
	}

	// Use one table for SQLite; sharding does not seem to have much benefit
	$db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
	$db->startAtomic( __METHOD__ ); // atomic DDL
	try {
		$encTable = $db->tableName( 'objectcache' );
		$encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );

		$createTableSql =
			"CREATE TABLE $encTable (\n" .
			" keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
			" value BLOB,\n" .
			" exptime BLOB NOT NULL\n" .
			")";
		$createIndexSql = "CREATE INDEX $encExptimeIndex ON $encTable (exptime)";

		$db->query( $createTableSql, __METHOD__ );
		$db->query( $createIndexSql, __METHOD__ );
		$db->endAtomic( __METHOD__ );
	} catch ( DBError $e ) {
		$db->rollback( __METHOD__ );
		throw $e;
	}
}
1808 
/**
 * Create the shard tables on all databases
 *
 * Only servers of type "mysql" or "postgres" are touched. Each shard table is
 * created as a copy of the structure of the base "objectcache" table.
 */
public function createTables() {
	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		$db = $this->getConnection( $shardIndex );
		if ( !in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
			continue;
		}

		for ( $tableIndex = 0; $tableIndex < $this->numTableShards; $tableIndex++ ) {
			$encBaseTable = $db->tableName( 'objectcache' );
			$encShardTable = $db->tableName( $this->getTableNameByShard( $tableIndex ) );
			$db->query( "CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
		}
	}
}
1836 
/**
 * Get the shard index of every configured server
 *
 * @return string[]|int[] Integer indexes for a striped server list; otherwise the
 *  SHARD_LOCAL and/or SHARD_GLOBAL constants, depending on which LoadBalancers are set
 */
private function getShardServerIndexes() {
	if ( $this->serverTags ) {
		// Striped array of database servers
		return array_keys( $this->serverTags );
	}

	// LoadBalancer based configuration
	$shardIndexes = [];
	if ( $this->localKeyLb ) {
		$shardIndexes[] = self::SHARD_LOCAL;
	}
	if ( $this->globalKeyLb ) {
		$shardIndexes[] = self::SHARD_GLOBAL;
	}

	return $shardIndexes;
}
1857 
/**
 * Look up the shard index of the server with the given tag
 *
 * @param string $tag Server tag, as found in $this->serverTags
 * @return int Server shard index
 * @throws InvalidArgumentException If no tags are configured or the tag is unknown
 */
private function getShardServerIndexForTag( string $tag ) {
	if ( !$this->serverTags ) {
		throw new InvalidArgumentException( "Given a tag but no tags are configured" );
	}

	// Strict search; yields the first matching index, or false if there is none
	$serverShardIndex = array_search( $tag, $this->serverTags, true );
	if ( $serverShardIndex === false ) {
		throw new InvalidArgumentException( "Unknown server tag: $tag" );
	}

	return $serverShardIndex;
}
1874 
/**
 * @return bool Whether a multi-primary mode DB type is configured
 */
private function isMultiPrimaryModeEnabled() {
	return !is_null( $this->multiPrimaryModeType );
}
1881 
1888  private function waitForReplication( $shardIndex ) {
1889  if ( is_int( $shardIndex ) ) {
1890  return true; // striped only, no LoadBalancer
1891  }
1892 
1893  $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
1894  if ( !$lb->hasStreamingReplicaServers() ) {
1895  return true;
1896  }
1897 
1898  try {
1899  // Wait for any replica DBs to catch up
1900  $primaryPos = $lb->getPrimaryPos();
1901  if ( !$primaryPos ) {
1902  return true; // not applicable
1903  }
1904 
1905  $loop = new WaitConditionLoop(
1906  static function () use ( $lb, $primaryPos ) {
1907  return $lb->waitForAll( $primaryPos, 1 );
1908  },
1911  );
1912 
1913  return ( $loop->invoke() === $loop::CONDITION_REACHED );
1914  } catch ( DBError $e ) {
1915  $this->setAndLogDBError( $e );
1916 
1917  return false;
1918  }
1919  }
1920 
/**
 * Silence the transaction profiler until the return value falls out of scope
 *
 * @return ScopedCallback|null Null when a direct server list is configured
 */
private function silenceTransactionProfiler() {
	if ( !$this->serverInfos ) {
		return Profiler::instance()->getTransactionProfiler()->silenceForScope();
	}

	return null; // no TransactionProfiler injected anyway
}
1932 }
$success
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
Definition: ArrayUtils.php:49
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Definition: BagOStuff.php:515
string $keyspace
Default keyspace; used by makeKey()
Definition: BagOStuff.php:104
fieldHasFlags( $field, $flags)
Definition: BagOStuff.php:623
callable null $asyncHandler
Definition: BagOStuff.php:93
getCurrentTime()
Definition: BagOStuff.php:846
Storage medium specific cache for storing items (e.g.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form a value, using any applicable prepared value.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
static instance()
Singleton.
Definition: Profiler.php:69
RDBMS-based caching module.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
const SHARD_GLOBAL
decodeDbExpiry(IDatabase $db, string $dbExpiry)
modifyTableSpecificBlobsForChangeTTL(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Update the TTL for keys belonging to a partition table on the given server.
doDelete( $key, $flags=0)
Delete an item.
buildUpsertSetForOverwrite(IDatabase $db, $serialValue, int $expiry, string $mt)
SET array for handling key overwrites when a live or stale key exists.
const INF_TIMESTAMP_PLACEHOLDER
Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types.
float $lastGarbageCollect
UNIX timestamp.
makeTimestampedModificationToken(float $mtime, IDatabase $db)
Make a modtoken column value with the original time and source database server of a write.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
makeNewKeyExpiry( $exptime, int $nowTsUnix)
buildUpsertRow(IDatabase $db, $key, $serialValue, int $expiry, string $mt)
INSERT array for handling key writes/overwrites when no live nor stale key exists.
createTables()
Create the shard tables on all databases.
encodeDbExpiry(IDatabase $db, int $expiry)
addCasTokenFields(IDatabase $db, array $fields)
Either append a 'castoken' field or append the fields needed to compute the CAS token.
serialize( $value)
deleteServerObjectsExpiringBefore(IDatabase $db, $timestamp, $limit, &$keysDeletedCount=0, array $progress=null)
modifyTableSpecificBlobsForIncrInit(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Either increment a counter key, if it exists, or initialize it, otherwise.
modifyBlobs(callable $tableWriteCallback, float $mtime, array $argsByKey, int $flags, &$resByKey=[])
int $purgePeriod
Average number of writes required to trigger garbage collection.
getConnectionViaLoadBalancer( $shardIndex)
bool $hasZlib
Whether zlib methods are available to PHP.
doChangeTTL( $key, $exptime, $flags)
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
unserialize( $value)
waitForReplication( $shardIndex)
Wait for replica DBs to catch up to the primary DB.
array[] $serverInfos
(server index => server config)
fetchBlobs(array $keys, bool $getCasToken=false)
modifyTableSpecificBlobsForCas(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the given server.
deleteAll()
Delete content of shard tables in every server.
doDeleteMulti(array $keys, $flags=0)
getConnectionFromServerInfo( $shardIndex, array $server)
string null $multiPrimaryModeType
Multi-primary mode DB type ("mysql",...); null if not enabled.
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from configuration.
markServerDown(DBError $exception, $shardIndex)
Mark a server down due to a DBConnectionError exception.
doIncr( $key, $value=1, $flags=0)
modifyTableSpecificBlobsForAdd(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the given server.
const SAFE_PURGE_DELAY_SEC
A number of seconds well above any expected clock skew and replication lag.
doGet( $key, $flags=0, &$casToken=null)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
const TOMB_SERIAL
Distinct string for tombstones stored in the "serialized" value column.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
buildExistenceConditions(IDatabase $db, $keys, int $time)
WHERE conditions that check for existence and liveness of keys.
handleDBError(DBError $exception, $shardIndex)
Handle a DBError which occurred during a read operation.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
modifyTableSpecificBlobsForDelete(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Purge/tombstone key/value pairs belonging to a partition table on the given server.
const BLOB_CASTOKEN
dbEncodeSerialValue(IDatabase $db, $serialValue)
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
ILoadBalancer null $localKeyLb
getCasTokenFromRow(IDatabase $db, stdClass $row)
Get a CAS token from a SELECT result row.
silenceTransactionProfiler()
Silence the transaction profiler until the return value falls out of scope.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
getConnection( $shardIndex)
Get a connection to the specified database.
const SHARD_LOCAL
doLock( $key, $timeout=6, $exptime=6)
ILoadBalancer null $globalKeyLb
Exception[] $connFailureErrors
Map of (shard index => Exception)
doIncrWithInit( $key, $exptime, $step, $init, $flags)
setAndLogDBError(DBError $exception)
const TOMB_EXPTIME
Relative seconds-to-live to use for tombstones.
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
newLockingWriteSectionModificationTimestamp( $key, &$scope)
Get a scoped lock and modification timestamp for a critical section of reads/writes.
getShardServerIndexForTag(string $tag)
string $tableName
modifyTableSpecificBlobsForSet(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Set key/value pairs belonging to a partition table on the given server.
getKeyLocation( $key)
Get the server index and table name for a given key.
getTableNameByShard( $index)
Get the table name for a given shard index.
const SAFE_CLOCK_BOUND_SEC
A number of seconds well above any expected clock skew.
const GC_DELAY_SEC
How many seconds must pass before triggering a garbage collection.
initSqliteDatabase(IMaintainableDatabase $db)
dbDecodeSerialValue(IDatabase $db, $blob)
string false null $globalKeyLbDomain
DB name used for keys using the "global key" LoadBalancer.
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
occasionallyGarbageCollect(IDatabase $db)
buildIncrUpsertSet(IDatabase $db, int $step, int $init, int $expiry, string $mt, int $mtUnixTs)
SET array for handling key overwrites when a live or stale key exists.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
Database error base class.
Definition: DBError.php:32
const ATTR_EMULATION
Emulation/fallback mode; see QOS_EMULATION_*; higher is better.
const QOS_DURABILITY_RDBMS
Data is saved to disk and writes usually block on fsync(), like a standard RDBMS.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const QOS_EMULATION_SQL
Fallback disk-based SQL store.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:40
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
selectRow( $table, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
affectedRows()
Get the number of rows affected by the last write query.
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert row(s) into a table, in the provided order, while updating conflicting rows.
getType()
Get the RDBMS type of the server (e.g.
tablePrefix( $prefix=null)
Get/set the table prefix.
decodeBlob( $b)
Some DBMSs return a special placeholder object representing blob fields in result objects.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
encodeBlob( $b)
Some DBMSs have a special format for inserting into blob fields, they don't allow simple quoted strin...
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, in the provided order, while deleting conflicting rows.
selectFieldValues( $table, $var, $cond='', $fname=__METHOD__, $options=[], $join_conds=[])
A SELECT wrapper which returns a list of single field values from result rows.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
getServerName()
Get the readable name for the server.
Database cluster connection, tracking, load balancing, and transaction manager interface.
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle, suitable for migrations and schema changes, for a server index.
Advanced database interface for IDatabase handles that include maintenance methods.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
if( $line===false) $args
Definition: mcc.php:124
const DB_REPLICA
Definition: defines.php:25
const DB_PRIMARY
Definition: defines.php:27
const DBO_TRX
Definition: defines.php:12