MediaWiki  master
SqlBlobStore.php
Go to the documentation of this file.
1 <?php
26 namespace MediaWiki\Storage;
27 
28 use AppendIterator;
33 use IDBAccessObject;
34 use InvalidArgumentException;
35 use StatusValue;
36 use WANObjectCache;
37 use Wikimedia\Assert\Assert;
38 use Wikimedia\AtEase\AtEase;
42 
52 
// Process-cache group name; getCacheOptions() passes it as the 'pcGroup' option.
53  // Note: the name has been taken unchanged from the old Revision class.
54  public const TEXT_CACHE_GROUP = 'revisiontext:10';
55 
// Load balancer given to the constructor; source of all text-table connections.
59  private $dbLoadBalancer;
60 
// External store access object given to the constructor; used to insert and fetch external blobs.
64  private $extStoreAccess;
65 
// WANObjectCache given to the constructor; holds expanded blobs keyed by blob address.
69  private $cache;
70 
// DB domain given to the constructor (default false); passed to getConnectionRef() and the external store.
74  private $dbDomain;
75 
// Blob cache TTL used by getCacheTTL(); a falsy value makes blobs uncacheable.
79  private $cacheExpiry = 604800; // 7 days
80 
// When truthy, compressData() attempts to gzdeflate blobs before they are stored.
84  private $compressBlobs = false;
85 
// Legacy encoding name (or false); decompressData() converts blobs lacking a utf-8 flag from it.
89  private $legacyEncoding = false;
90 
// When true, storeBlob() writes the data to the external store and keeps only its URL in the text table.
94  private $useExternalStore = false;
95 
/**
 * Wire up the collaborators this blob store depends on.
 *
 * @param ILoadBalancer $dbLoadBalancer Load balancer used to obtain text-table connections
 * @param ExternalStoreAccess $extStoreAccess Access object for external blob storage
 * @param WANObjectCache $cache Cache used for expanded blobs
 * @param bool|string $dbDomain DB domain handed to the load balancer and the external
 *   store; false is the default
 */
public function __construct(
	ILoadBalancer $dbLoadBalancer,
	ExternalStoreAccess $extStoreAccess,
	WANObjectCache $cache,
	$dbDomain = false
) {
	$this->dbDomain = $dbDomain;
	$this->cache = $cache;
	$this->extStoreAccess = $extStoreAccess;
	$this->dbLoadBalancer = $dbLoadBalancer;
}
118 
/**
 * Get the configured blob cache lifetime.
 *
 * @return int Cache expiry in seconds (the default of 604800 is seven days)
 */
public function getCacheExpiry() {
	return $this->cacheExpiry;
}
125 
/**
 * Set the blob cache lifetime consulted by getCacheTTL().
 *
 * @param int $cacheExpiry Expiry in seconds; 0 makes getCacheTTL() treat blobs as uncacheable
 */
public function setCacheExpiry( int $cacheExpiry ) {
	$this->cacheExpiry = $cacheExpiry;
}
132 
/**
 * Tell whether compressData() will try to compress blobs.
 *
 * @return bool The value last given to setCompressBlobs(), false by default
 */
public function getCompressBlobs() {
	return $this->compressBlobs;
}
139 
/**
 * Enable or disable compression of blobs in compressData().
 *
 * @param bool $compressBlobs Stored as given; only its truthiness is checked later
 */
public function setCompressBlobs( $compressBlobs ) {
	$this->compressBlobs = $compressBlobs;
}
146 
/**
 * Get the legacy encoding assumed for blobs that lack a utf-8 flag.
 *
 * @return false|string Encoding name, or false when none has been set
 */
public function getLegacyEncoding() {
	return $this->legacyEncoding;
}
154 
/**
 * Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
 *
 * decompressData() passes this value to iconv() as the source encoding.
 *
 * @param string $legacyEncoding Encoding name
 */
public function setLegacyEncoding( string $legacyEncoding ) {
	$this->legacyEncoding = $legacyEncoding;
}
166 
/**
 * Tell whether storeBlob() writes blob data to the external store.
 *
 * @return bool
 */
public function getUseExternalStore() {
	return $this->useExternalStore;
}
173 
/**
 * Choose whether storeBlob() writes blob data to the external store.
 *
 * @param bool $useExternalStore
 */
public function setUseExternalStore( bool $useExternalStore ) {
	$this->useExternalStore = $useExternalStore;
}
180 
/**
 * Get the load balancer supplied to the constructor.
 *
 * @return ILoadBalancer
 */
private function getDBLoadBalancer() {
	return $this->dbLoadBalancer;
}
187 
/**
 * Obtain a connection for this store's DB domain from the load balancer.
 *
 * @param int $index Server index to connect to, e.g. DB_PRIMARY
 * @return mixed Whatever ILoadBalancer::getConnectionRef() returns (a database handle)
 */
private function getDBConnection( $index ) {
	return $this->getDBLoadBalancer()->getConnectionRef( $index, [], $this->dbDomain );
}
197 
/**
 * Store an arbitrary blob of data and return an address usable with getBlob().
 *
 * The data always ends up referenced from a new row of the text table. When the
 * external store is enabled, the payload itself goes there and the text row only
 * holds the URL returned by the external store, flagged as 'external'.
 *
 * @param string $data Blob payload
 * @param array $hints Not used by this implementation
 * @return string Blob address of the form "tt:<old_id>"
 * @throws BlobAccessException If the external store fails or returns no URL
 */
public function storeBlob( $data, $hints = [] ) {
	// compressData() may replace $data (taken by reference) with its deflated form.
	$flags = $this->compressData( $data );

	if ( $this->useExternalStore ) {
		// Hand the payload to the external store; what comes back is its URL.
		try {
			$data = $this->extStoreAccess->insert( $data, [ 'domain' => $this->dbDomain ] );
		} catch ( ExternalStoreException $e ) {
			throw new BlobAccessException( $e->getMessage(), 0, $e );
		}
		if ( !$data ) {
			throw new BlobAccessException( "Failed to store text to external storage" );
		}
		// Append 'external' to whatever flags compressData() produced.
		$flags = ( $flags ? $flags . ',' : $flags ) . 'external';

		// TODO: we could also return an address for the external store directly here.
		// That would mean bypassing the text table entirely when the external store is
		// used. We'll need to assess expected fallout before doing that.
	}

	$dbw = $this->getDBConnection( DB_PRIMARY );
	$textRow = [ 'old_text' => $data, 'old_flags' => $flags ];

	$dbw->newInsertQueryBuilder()
		->insertInto( 'text' )
		->row( $textRow )
		->caller( __METHOD__ )
		->execute();

	return self::makeAddressFromTextId( $dbw->insertId() );
}
243 
/**
 * Retrieve a blob, given an address.
 *
 * The lookup goes through the cache; on a miss the blob is loaded via fetchBlobs().
 * Failed loads are never cached.
 *
 * @param string $blobAddress
 * @param int $queryFlags Bitfield forwarded to fetchBlobs()
 * @return string The blob content
 * @throws BadBlobException If the load reported a 'badrevision' error
 * @throws BlobAccessException If the load reported any other error
 */
public function getBlob( $blobAddress, $queryFlags = 0 ) {
	Assert::parameterType( 'string', $blobAddress, '$blobAddress' );

	// Filled in by the loader below so the failure can be reported after the cache call.
	$error = null;

	$cacheKey = $this->getCacheKey( $blobAddress );
	$cacheTtl = $this->getCacheTTL();
	$loader = function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
		// Ignore $setOpts; blobs are immutable and negatives are not cached
		[ $blobs, $failures ] = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
		// No negative caching; negative hits on text rows may be due to corrupted replica DBs
		$error = $failures[$blobAddress] ?? null;
		if ( $error ) {
			$ttl = WANObjectCache::TTL_UNCACHEABLE;
		}
		return $blobs[$blobAddress];
	};

	$blob = $this->cache->getWithSetCallback(
		$cacheKey,
		$cacheTtl,
		$loader,
		$this->getCacheOptions()
	);

	if ( $error ) {
		// Errors are [ type, message ] pairs as produced by fetchBlobs().
		if ( $error[0] !== 'badrevision' ) {
			throw new BlobAccessException( $error[1] );
		}
		throw new BadBlobException( $error[1] );
	}

	Assert::postcondition( is_string( $blob ), 'Blob must not be null' );
	return $blob;
}
287 
/**
 * A batched version of getBlob().
 *
 * @param string[] $blobAddresses
 * @param int $queryFlags Bitfield forwarded to fetchBlobs()
 * @return StatusValue Good status whose value maps each address to its blob, or to
 *   null when the blob could not be loaded; every load error is attached as a warning
 */
public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
	// FIXME: All caching has temporarily been removed in I94c6f9ba7b9caeeb due to T235188.
	// Caching behavior should be restored by reverting I94c6f9ba7b9caeeb as soon as
	// the root cause of T235188 has been resolved.

	[ $fetched, $errors ] = $this->fetchBlobs( $blobAddresses, $queryFlags );

	// fetchBlobs() marks failures with false; callers of this method get null instead.
	$blobsByAddress = [];
	foreach ( $fetched as $address => $blob ) {
		$blobsByAddress[$address] = ( $blob === false ) ? null : $blob;
	}

	$status = StatusValue::newGood( $blobsByAddress );
	foreach ( $errors as $error ) {
		// @phan-suppress-next-line PhanParamTooFewUnpack
		$status->warning( ...$error );
	}

	return $status;
}
317 
/**
 * Load the blobs for a list of addresses from the text table.
 *
 * Addresses with the 'bad' schema yield an empty string plus a 'badrevision' error.
 * Addresses with the 'tt' schema are looked up by old_id; anything else is reported
 * as an 'internalerror'. Rows are read from the connection chosen by $queryFlags,
 * and rows missing there are retried on the fallback connection when one is defined.
 *
 * @param string[] $blobAddresses
 * @param int $queryFlags Bitfield interpreted by DBAccessObjectUtils::getDBOptions()
 * @return array[] Two-element list [ $result, $errors ]:
 *   - $result maps each address to its expanded blob (string), or false on failure
 *   - $errors maps each failed address to a [ errorType, message ] pair
 * @throws BlobAccessException If an address cannot be parsed at all
 */
private function fetchBlobs( $blobAddresses, $queryFlags ) {
	$textIdToBlobAddress = [];
	$result = [];
	$errors = [];
	foreach ( $blobAddresses as $blobAddress ) {
		try {
			[ $schema, $id ] = self::splitBlobAddress( $blobAddress );
		} catch ( InvalidArgumentException $ex ) {
			throw new BlobAccessException(
				$ex->getMessage() . '. Use findBadBlobs.php to remedy.',
				0,
				$ex
			);
		}

		// TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
		if ( $schema === 'bad' ) {
			// Database row was marked as "known bad"
			wfDebug(
				__METHOD__
				. ": loading known-bad content ($blobAddress), returning empty string"
			);
			$result[$blobAddress] = '';
			$errors[$blobAddress] = [
				'badrevision',
				'The content of this revision is missing or corrupted (bad schema)'
			];
		} elseif ( $schema === 'tt' ) {
			$textId = intval( $id );

			if ( $textId < 1 || $id !== (string)$textId ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Bad blob address: $blobAddress. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
				// Never look up a malformed ID: intval() may have truncated it to the ID
				// of an unrelated row (e.g. "12abc" -> 12), whose content would replace
				// the false result above and whose mapping would shadow a valid "tt:12"
				// address in the same batch.
				continue;
			}

			$textIdToBlobAddress[$textId] = $blobAddress;
		} else {
			$errors[$blobAddress] = [
				'internalerror',
				"Unknown blob address schema: $schema. Use findBadBlobs.php to remedy."
			];
			$result[$blobAddress] = false;
		}
	}

	$textIds = array_keys( $textIdToBlobAddress );
	if ( !$textIds ) {
		// Nothing to query: every address was either known-bad or invalid.
		return [ $result, $errors ];
	}
	// Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
	// do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
	$queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, self::READ_LATEST )
		? self::READ_LATEST_IMMUTABLE
		: 0;
	[ $index, $options, $fallbackIndex, $fallbackOptions ] =
		DBAccessObjectUtils::getDBOptions( $queryFlags );
	// Text data is immutable; check replica DBs first.
	$dbConnection = $this->getDBConnection( $index );
	$rows = $dbConnection->newSelectQueryBuilder()
		->select( [ 'old_id', 'old_text', 'old_flags' ] )
		->from( 'text' )
		->where( [ 'old_id' => $textIds ] )
		->options( $options )
		->caller( __METHOD__ )->fetchResultSet();
	$numRows = 0;
	if ( $rows instanceof IResultWrapper ) {
		$numRows = $rows->numRows();
	}

	// Fallback to DB_PRIMARY in some cases if not all the rows were found, using the appropriate
	// options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
	if ( $numRows !== count( $textIds ) && $fallbackIndex !== null ) {
		$fetchedTextIds = [];
		foreach ( $rows as $row ) {
			$fetchedTextIds[] = $row->old_id;
		}
		$missingTextIds = array_diff( $textIds, $fetchedTextIds );
		$dbConnection = $this->getDBConnection( $fallbackIndex );
		$rowsFromFallback = $dbConnection->newSelectQueryBuilder()
			->select( [ 'old_id', 'old_text', 'old_flags' ] )
			->from( 'text' )
			->where( [ 'old_id' => $missingTextIds ] )
			->options( $fallbackOptions )
			->caller( __METHOD__ )->fetchResultSet();
		// Iterate the rows of both queries as one sequence below.
		$appendIterator = new AppendIterator();
		$appendIterator->append( $rows );
		$appendIterator->append( $rowsFromFallback );
		$rows = $appendIterator;
	}

	foreach ( $rows as $row ) {
		$blobAddress = $textIdToBlobAddress[$row->old_id];
		$blob = false;
		if ( $row->old_text !== null ) {
			$blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
		}
		if ( $blob === false ) {
			$errors[$blobAddress] = [
				'internalerror',
				"Bad data in text row {$row->old_id}. Use findBadBlobs.php to remedy."
			];
		}
		$result[$blobAddress] = $blob;
	}

	// If we're still missing some of the rows, set errors for missing blobs.
	if ( count( $result ) !== count( $blobAddresses ) ) {
		foreach ( $blobAddresses as $blobAddress ) {
			if ( !isset( $result[$blobAddress ] ) ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Unable to fetch blob at $blobAddress. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
			}
		}
	}
	return [ $result, $errors ];
}
454 
/**
 * Build the global cache key under which the blob for an address is stored.
 *
 * The key combines a fixed 'SqlBlobStore-blob' component, the resolved ID of this
 * store's DB domain, and the blob address.
 *
 * @param string $blobAddress
 * @return string
 */
private function getCacheKey( $blobAddress ) {
	$domainId = $this->dbLoadBalancer->resolveDomainID( $this->dbDomain );

	return $this->cache->makeGlobalKey( 'SqlBlobStore-blob', $domainId, $blobAddress );
}
472 
/**
 * Options passed to WANObjectCache::getWithSetCallback() for every blob lookup.
 *
 * @return array Option map with the 'pcGroup', 'pcTTL' and 'segmentable' keys
 */
private function getCacheOptions() {
	$options = [];
	$options['pcGroup'] = self::TEXT_CACHE_GROUP;
	$options['pcTTL'] = WANObjectCache::TTL_PROC_LONG;
	$options['segmentable'] = true;

	return $options;
}
485 
/**
 * Expand a raw data blob according to the flags given.
 *
 * Blobs flagged 'external' hold a URL in $raw; the real content is fetched from the
 * external store and then decompressed. When a blob address is supplied, that fetch
 * goes through the cache. All other blobs are decompressed directly.
 *
 * @param string $raw Raw text-table content, or an external store URL
 * @param string|string[] $flags Blob flags, as a comma-separated string or a list
 * @param string|null $blobAddress Address used as cache key for external blobs;
 *   no caching is done when it is empty
 * @return false|string The expanded blob, or false on failure
 * @throws BadBlobException If the flags contain 'error'
 */
public function expandBlob( $raw, $flags, $blobAddress = null ) {
	if ( is_string( $flags ) ) {
		$flags = self::explodeFlags( $flags );
	}
	if ( in_array( 'error', $flags ) ) {
		throw new BadBlobException(
			"The content of this revision is missing or corrupted (error flag)"
		);
	}

	if ( !in_array( 'external', $flags ) ) {
		// The content is stored inline in the text table.
		return $this->decompressData( $raw, $flags );
	}

	// Use external methods for external objects, text in table is URL-only then
	$url = $raw;
	$parts = explode( '://', $url, 2 );
	if ( ( $parts[1] ?? '' ) === '' ) {
		// Not of the form "<protocol>://<something>".
		return false;
	}

	$loadExternal = function () use ( $url, $flags ) {
		// Ignore $setOpts; blobs are immutable and negatives are not cached
		$blob = $this->extStoreAccess
			->fetchFromURL( $url, [ 'domain' => $this->dbDomain ] );

		return $blob === false ? false : $this->decompressData( $blob, $flags );
	};

	if ( !$blobAddress ) {
		return $loadExternal();
	}

	// The cached value should be decompressed, so handle that and return here.
	return $this->cache->getWithSetCallback(
		$this->getCacheKey( $blobAddress ),
		$this->getCacheTTL(),
		$loadExternal,
		$this->getCacheOptions()
	);
}
546 
/**
 * Compress the given blob in place if compression is enabled, and report its flags.
 *
 * @param string &$blob Data to store; replaced with its deflated form when
 *   compression is enabled and gzdeflate() succeeds
 * @return string Comma-separated flags: always 'utf-8', plus 'gzip' when compressed
 */
public function compressData( &$blob ) {
	// Revisions not marked as UTF-8 will have legacy decoding applied by decompressData().
	// XXX: if $this->legacyEncoding is not set, we could skip this. That would however be
	// risky, since $this->legacyEncoding being set in the future would lead to data corruption.
	$blobFlags = [ 'utf-8' ];

	if ( $this->compressBlobs ) {
		if ( !function_exists( 'gzdeflate' ) ) {
			wfDebug( __METHOD__ . " -- no zlib support, not compressing" );
		} else {
			$deflated = gzdeflate( $blob );

			if ( $deflated !== false ) {
				$blob = $deflated;
				$blobFlags[] = 'gzip';
			} else {
				// Keep the uncompressed data and omit the 'gzip' flag.
				wfLogWarning( __METHOD__ . ': gzdeflate() failed' );
			}
		}
	}

	return implode( ',', $blobFlags );
}
587 
/**
 * Re-converts revision text according to its flags.
 *
 * Steps, in order: reject 'error' blobs, inflate 'gzip' blobs, unwrap 'object'
 * blobs, and finally convert from the legacy encoding when one is configured and
 * the blob is not flagged as UTF-8.
 *
 * @param string $blob Blob in compressed/encoded form
 * @param string[] $blobFlags Flags describing how $blob is stored
 * @return string|false The decoded text, or false on failure
 */
public function decompressData( string $blob, array $blobFlags ) {
	if ( in_array( 'error', $blobFlags ) ) {
		// Error row, return false
		return false;
	}

	if ( in_array( 'gzip', $blobFlags ) ) {
		# Deal with optional compression of archived pages.
		# This can be done periodically via maintenance/compressOld.php, and
		# as pages are saved if $wgCompressRevisions is set.
		$inflated = gzinflate( $blob );
		if ( $inflated === false ) {
			wfWarn( __METHOD__ . ': gzinflate() failed' );
			return false;
		}
		$blob = $inflated;
	}

	if ( in_array( 'object', $blobFlags ) ) {
		# Generic compressed storage
		$historyBlob = HistoryBlobUtils::unserialize( $blob );
		if ( !$historyBlob ) {
			// Invalid object
			return false;
		}
		$blob = $historyBlob->getText();
	}

	// Needed to support old revisions from before MW 1.5.
	# ("utf8" checked for compatibility with some broken
	# conversion scripts 2008-12-30)
	$flaggedUtf8 = in_array( 'utf-8', $blobFlags ) || in_array( 'utf8', $blobFlags );
	if ( $blob === false || !$this->legacyEncoding || $flaggedUtf8 ) {
		return $blob;
	}

	# Old revisions kept around in a legacy encoding?
	# Upconvert on demand.
	# Even with //IGNORE iconv can whine about illegal characters in
	# *input* string. We just ignore those too.
	# REF: https://bugs.php.net/bug.php?id=37166
	# REF: https://phabricator.wikimedia.org/T18885
	AtEase::suppressWarnings();
	$converted = iconv( $this->legacyEncoding, 'UTF-8//IGNORE', $blob );
	AtEase::restoreWarnings();

	return $converted;
}
650 
/**
 * Work out the TTL to use when caching blobs.
 *
 * @return int TTL_UNCACHEABLE when the cache reports at least RDBMS-level durability
 *   or when no cache expiry is configured; the configured cache expiry otherwise
 */
private function getCacheTTL() {
	$cache = $this->cache;

	// Do not cache RDBMs blobs in...the RDBMs store
	if ( $cache->getQoS( $cache::ATTR_DURABILITY ) >= $cache::QOS_DURABILITY_RDBMS ) {
		return $cache::TTL_UNCACHEABLE;
	}

	return $this->cacheExpiry ?: $cache::TTL_UNCACHEABLE;
}
670 
/**
 * Returns an ID corresponding to the old_id field in the text table, corresponding
 * to the given $address.
 *
 * Only addresses with the 'tt' schema refer to text-table rows. The ID must be a
 * positive integer in canonical decimal form, the same rule fetchBlobs() applies
 * before it queries the table.
 *
 * @param string $address Blob address
 * @return int|null The text ID, or null if the address does not use the 'tt' schema
 * @throws InvalidArgumentException If the address cannot be parsed, or if its ID
 *   part is not a positive integer
 */
public function getTextIdFromAddress( $address ) {
	[ $schema, $id, ] = self::splitBlobAddress( $address );

	if ( $schema !== 'tt' ) {
		return null;
	}

	$textId = intval( $id );

	// Reject zero, negative values such as "-5", and non-canonical forms such as
	// "007" or "12abc" (where the string round-trip does not match).
	if ( $textId < 1 || $id !== (string)$textId ) {
		throw new InvalidArgumentException( "Malformed text_id: $id" );
	}

	return $textId;
}
706 
/**
 * Returns an address referring to content stored in the text table row with the given ID.
 *
 * @param int $id Value of the old_id field of the text row
 * @return string Address of the form "tt:<id>"
 */
public static function makeAddressFromTextId( $id ) {
	return "tt:$id";
}
723 
/**
 * Split a comma-separated old_flags value into its constituent parts.
 *
 * @param string $flagsString
 * @return string[] The individual flags; an empty list for the empty string
 */
public static function explodeFlags( string $flagsString ) {
	if ( $flagsString === '' ) {
		// explode() would yield [ '' ] here rather than an empty list.
		return [];
	}

	return explode( ',', $flagsString );
}
733 
/**
 * Splits a blob address into three parts: the schema, the ID, and parameters/flags.
 *
 * An address has the form "<schema>:<id>", optionally followed by "?<query>".
 *
 * @param string $address Blob address
 * @return array Three-element list [ $schema, $id, $parameters ]: the lower-cased
 *   schema, the ID as a string, and the query part decoded by wfCgiToArray()
 * @throws InvalidArgumentException If the address does not match the expected form
 */
public static function splitBlobAddress( $address ) {
	$matched = preg_match( '/^([-+.\w]+):([^\s?]+)(\?([^\s]*))?$/', $address, $m );
	if ( !$matched ) {
		throw new InvalidArgumentException( "Bad blob address: $address" );
	}

	// Group 4 is only present when the address carries a "?query" suffix.
	$query = $m[4] ?? '';

	return [ strtolower( $m[1] ), $m[2], wfCgiToArray( $query ) ];
}
755 
/**
 * Check if the blob metadata or backing blob data store is read-only.
 *
 * @return bool True if the external store is in use and read-only, or if the load
 *   balancer reports a read-only reason
 */
public function isReadOnly() {
	$externalStoreReadOnly = $this->useExternalStore && $this->extStoreAccess->isReadOnly();

	return $externalStoreReadOnly
		|| $this->getDBLoadBalancer()->getReadOnlyReason() !== false;
}
763 }
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfWarn( $msg, $callerOffset=1, $level=E_USER_NOTICE)
Send a warning either to the debug log or in a PHP error depending on $wgDevelopmentWarnings.
wfLogWarning( $msg, $callerOffset=1, $level=E_USER_WARNING)
Send a warning as a PHP error and the debug log.
wfCgiToArray( $query)
This is the logical opposite of wfArrayToCgi(): it accepts a query string as its argument and returns...
Helper class for DAO classes.
static getDBOptions( $bitfield)
Get an appropriate DB index, options, and fallback DB index for a query.
static hasFlags( $bitfield, $flags)
This is the main interface for fetching or inserting objects with ExternalStore.
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Exception thrown when a blob has the "bad" content address schema, or has "error" in its old_flags,...
Exception representing a failure to access a data blob.
Service for storing and loading Content objects representing revision data blobs.
static makeAddressFromTextId( $id)
Returns an address referring to content stored in the text table row with the given ID.
getTextIdFromAddress( $address)
Returns an ID corresponding to the old_id field in the text table, corresponding to the given $address.
__construct(ILoadBalancer $dbLoadBalancer, ExternalStoreAccess $extStoreAccess, WANObjectCache $cache, $dbDomain=false)
decompressData(string $blob, array $blobFlags)
Re-converts revision text according to its flags.
setCacheExpiry(int $cacheExpiry)
getBlob( $blobAddress, $queryFlags=0)
Retrieve a blob, given an address.
setLegacyEncoding(string $legacyEncoding)
Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
compressData(&$blob)
If $wgCompressRevisions is enabled, we will compress data.
static splitBlobAddress( $address)
Splits a blob address into three parts: the schema, the ID, and parameters/flags.
getBlobBatch( $blobAddresses, $queryFlags=0)
A batched version of BlobStore::getBlob.
storeBlob( $data, $hints=[])
Stores an arbitrary blob of data and returns an address that can be used with getBlob() to retrieve t...
setUseExternalStore(bool $useExternalStore)
isReadOnly()
Check if the blob metadata or backing blob data store is read-only.
setCompressBlobs( $compressBlobs)
expandBlob( $raw, $flags, $blobAddress=null)
Expand a raw data blob according to the flags given.
static explodeFlags(string $flagsString)
Split a comma-separated old_flags value into its constituent parts.
Generic operation result class. Has warning/error list, boolean status and arbitrary value.
Definition: StatusValue.php:46
static newGood( $value=null)
Factory function for good results.
Definition: StatusValue.php:85
Multi-datacenter aware caching interface.
Interface for database access objects.
Service for loading and storing data blobs.
Definition: BlobStore.php:33
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:36
This class is a delegate to ILBFactory for a given database cluster.
Result wrapper for grabbing data queried from an IDatabase object.
const DB_PRIMARY
Definition: defines.php:28
return true
Definition: router.php:90