MediaWiki  master
SqlBlobStore.php
Go to the documentation of this file.
1 <?php
27 namespace MediaWiki\Storage;
28 
use AppendIterator;
use DBAccessObjectUtils;
use ExternalStoreAccess;
use IDBAccessObject;
use IExpiringStore;
use InvalidArgumentException;
use MWException;
use StatusValue;
use WANObjectCache;
use Wikimedia\Assert\Assert;
use Wikimedia\AtEase\AtEase;
use Wikimedia\Rdbms\ILoadBalancer;
42 
52 
/**
 * Blob store backed by the SQL "text" table, optionally delegating the actual
 * payload to external storage and caching expanded blobs in a WANObjectCache.
 */
class SqlBlobStore implements IDBAccessObject, BlobStore {

	// Note: the name has been taken unchanged from the Revision class.
	const TEXT_CACHE_GROUP = 'revisiontext:10';

	/**
	 * @var ILoadBalancer Source of database connections
	 */
	private $dbLoadBalancer;

	/**
	 * @var ExternalStoreAccess Gateway used when blobs live in external storage
	 */
	private $extStoreAccess;

	/**
	 * @var WANObjectCache Cache holding expanded blobs
	 */
	private $cache;

	/**
	 * @var string|bool DB domain ID of a wiki or false for the local one
	 */
	private $dbDomain;

	/**
	 * @var int Cache TTL in seconds
	 */
	private $cacheExpiry = 604800; // 7 days

	/**
	 * @var bool Whether compressData() should deflate blobs
	 */
	private $compressBlobs = false;

	/**
	 * @var bool|string Legacy encoding assumed for blobs without a utf-8 flag, or false
	 */
	private $legacyEncoding = false;

	/**
	 * @var bool Whether storeBlob() writes payloads to external storage
	 */
	private $useExternalStore = false;
95 
/**
 * @param ILoadBalancer $dbLoadBalancer Load balancer used to obtain database connections
 * @param ExternalStoreAccess $extStoreAccess Access layer for external blob storage
 * @param WANObjectCache $cache Cache used for expanded blobs
 * @param string|bool $dbDomain DB domain ID of a wiki or false for the local one
 */
public function __construct(
	ILoadBalancer $dbLoadBalancer,
	ExternalStoreAccess $extStoreAccess,
	WANObjectCache $cache,
	$dbDomain = false
) {
	$this->dbLoadBalancer = $dbLoadBalancer;
	$this->extStoreAccess = $extStoreAccess;
	$this->cache = $cache;
	$this->dbDomain = $dbDomain;
}
118 
/**
 * @return int Configured cache lifetime for blobs, in seconds
 */
public function getCacheExpiry() {
	return $this->cacheExpiry;
}
125 
/**
 * Change the cache lifetime used for blobs.
 *
 * @param int $cacheExpiry Lifetime in seconds; rejected by the assertion if not an integer
 */
public function setCacheExpiry( $cacheExpiry ) {
	// Validate before touching state so a bad value never gets stored.
	Assert::parameterType( 'integer', $cacheExpiry, '$cacheExpiry' );
	$this->cacheExpiry = $cacheExpiry;
}
134 
/**
 * @return bool Whether compressData() will attempt to deflate blobs
 */
public function getCompressBlobs() {
	return $this->compressBlobs;
}
141 
/**
 * Enable or disable compression of newly stored blobs.
 *
 * @param bool $compressBlobs Stored as given; no type check is performed here
 */
public function setCompressBlobs( $compressBlobs ) {
	$this->compressBlobs = $compressBlobs;
}
148 
/**
 * @return bool|string The legacy encoding assumed for blobs lacking a utf-8 flag,
 *  or false when none has been configured
 */
public function getLegacyEncoding() {
	return $this->legacyEncoding;
}
156 
162  wfDeprecated( __METHOD__ );
163  return null;
164  }
165 
/**
 * Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
 *
 * @param string $legacyEncoding Encoding name; later handed to iconv() by decompressData()
 */
public function setLegacyEncoding( $legacyEncoding ) {
	// Only strings are accepted; the assertion rejects anything else.
	Assert::parameterType( 'string', $legacyEncoding, '$legacyEncoding' );
	$this->legacyEncoding = $legacyEncoding;
}
179 
/**
 * @return bool Whether storeBlob() writes blob payloads to external storage
 */
public function getUseExternalStore() {
	return $this->useExternalStore;
}
186 
/**
 * Enable or disable writing of new blobs to external storage.
 *
 * @param bool $useExternalStore Must be a boolean; enforced by assertion
 */
public function setUseExternalStore( $useExternalStore ) {
	Assert::parameterType( 'boolean', $useExternalStore, '$useExternalStore' );

	$this->useExternalStore = $useExternalStore;
}
195 
/**
 * @return ILoadBalancer The load balancer supplied to the constructor
 */
private function getDBLoadBalancer() {
	return $this->dbLoadBalancer;
}
202 
/**
 * Obtain a connection handle for this store's DB domain.
 *
 * @param int $index Connection index passed through to the load balancer
 *  (storeBlob() passes DB_MASTER; fetchBlobs() passes the index from getDBOptions())
 * @return mixed Whatever getConnectionRef() returns for the configured domain
 */
private function getDBConnection( $index ) {
	return $this->getDBLoadBalancer()->getConnectionRef( $index, [], $this->dbDomain );
}
212 
/**
 * Stores an arbitrary blob of data and returns an address that can be used with
 * getBlob() to retrieve it.
 *
 * The data is first passed through compressData(). When external storage is enabled,
 * the payload goes there and only the returned URL is written to the text table,
 * with the "external" flag appended.
 *
 * @param string $data Raw blob data
 * @param array $hints Not used by this implementation
 * @return string Blob address of the form produced by makeAddressFromTextId()
 * @throws BlobAccessException If external storage fails or an MWException is raised
 */
public function storeBlob( $data, $hints = [] ) {
	try {
		// compressData() may replace $data with its deflated form (by reference).
		$flags = $this->compressData( $data );

		if ( $this->useExternalStore ) {
			// Hand the payload to external storage; what we keep locally is the URL.
			$storedUrl = $this->extStoreAccess->insert( $data, [ 'domain' => $this->dbDomain ] );
			if ( !$storedUrl ) {
				throw new BlobAccessException( "Failed to store text to external storage" );
			}
			$data = $storedUrl;
			$flags = ( $flags ? $flags . ',' : $flags ) . 'external';

			// TODO: we could also return an address for the external store directly here.
			// That would mean bypassing the text table entirely when the external store is
			// used. We'll need to assess expected fallout before doing that.
		}

		$dbw = $this->getDBConnection( DB_MASTER );

		$row = [
			'old_id' => $dbw->nextSequenceValue( 'text_old_id_seq' ),
			'old_text' => $data,
			'old_flags' => $flags,
		];
		$dbw->insert( 'text', $row, __METHOD__ );

		return self::makeAddressFromTextId( $dbw->insertId() );
	} catch ( MWException $e ) {
		// Re-wrap so callers only need to handle the blob-specific exception type.
		throw new BlobAccessException( $e->getMessage(), 0, $e );
	}
}
264 
/**
 * Retrieve a blob, given an address.
 *
 * The lookup goes through the cache; on a miss the blob is loaded via fetchBlobs().
 *
 * @param string $blobAddress Address as returned by storeBlob()
 * @param int $queryFlags Passed through to fetchBlobs()
 * @return string The blob content
 * @throws BlobAccessException If fetchBlobs() reported an error for this address
 */
public function getBlob( $blobAddress, $queryFlags = 0 ) {
	Assert::parameterType( 'string', $blobAddress, '$blobAddress' );

	// Filled in by the loader below so a failure can be reported after the cache call.
	$error = null;

	$loader = function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
		// Ignore $setOpts; blobs are immutable and negatives are not cached
		list( $blobs, $failures ) = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
		// No negative caching; negative hits on text rows may be due to corrupted replica DBs
		$error = $failures[$blobAddress] ?? null;
		return $blobs[$blobAddress];
	};
	$cacheOptions = [
		'pcGroup' => self::TEXT_CACHE_GROUP,
		'pcTTL' => IExpiringStore::TTL_PROC_LONG
	];

	$blob = $this->cache->getWithSetCallback(
		$this->getCacheKey( $blobAddress ),
		$this->getCacheTTL(),
		$loader,
		$cacheOptions
	);

	if ( $error ) {
		throw new BlobAccessException( $error );
	}

	Assert::postcondition( is_string( $blob ), 'Blob must not be null' );
	return $blob;
}
301 
/**
 * A batched version of BlobStore::getBlob.
 *
 * @param string[] $blobAddresses Addresses to load
 * @param int $queryFlags Passed through to fetchBlobs()
 * @return StatusValue Good status whose value maps each address to its blob, or to null
 *  where the blob could not be loaded; every fetch error is attached as a warning
 */
public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
	// FIXME: All caching has temporarily been removed in I94c6f9ba7b9caeeb due to T235188.
	//        Caching behavior should be restored by reverting I94c6f9ba7b9caeeb as soon as
	//        the root cause of T235188 has been resolved.

	list( $fetched, $errors ) = $this->fetchBlobs( $blobAddresses, $queryFlags );

	// fetchBlobs() marks failures with false; callers of this method get null instead.
	$blobs = [];
	foreach ( $fetched as $address => $blob ) {
		$blobs[$address] = ( $blob === false ) ? null : $blob;
	}

	$status = StatusValue::newGood( $blobs );
	foreach ( $errors as $message ) {
		$status->warning( 'internalerror', $message );
	}

	return $status;
}
332 
/**
 * Load and expand the text table rows behind a list of blob addresses.
 *
 * Only the "tt" address schema is supported. Addresses with another schema, or with
 * an ID that is not a canonical positive integer, are reported as errors and are never
 * looked up in the database.
 *
 * @param string[] $blobAddresses Addresses to load
 * @param int $queryFlags Bitfield of IDBAccessObject READ_* flags
 * @return array [ $result, $errors ]: $result maps each address to its blob or false;
 *  $errors maps each failed address to an error message
 * @throws InvalidArgumentException If an address cannot be parsed by splitBlobAddress()
 */
private function fetchBlobs( $blobAddresses, $queryFlags ) {
	$textIdToBlobAddress = [];
	$result = [];
	$errors = [];
	foreach ( $blobAddresses as $blobAddress ) {
		list( $schema, $id ) = self::splitBlobAddress( $blobAddress );
		// TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
		if ( $schema !== 'tt' ) {
			$errors[$blobAddress] = "Unknown blob address schema: $schema";
			$result[$blobAddress] = false;
			continue;
		}

		$textId = intval( $id );
		if ( !$textId || $id !== (string)$textId ) {
			// Reject before registering the ID: a malformed address such as "tt:012"
			// must not be resolved to row 12, and "tt:abc" must not query old_id 0.
			$errors[$blobAddress] = "Bad blob address: $blobAddress";
			$result[$blobAddress] = false;
			continue;
		}

		$textIdToBlobAddress[$textId] = $blobAddress;
	}

	$textIds = array_keys( $textIdToBlobAddress );
	if ( !$textIds ) {
		return [ $result, $errors ];
	}
	// Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
	// do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
	$queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, self::READ_LATEST )
		? self::READ_LATEST_IMMUTABLE
		: 0;
	list( $index, $options, $fallbackIndex, $fallbackOptions ) =
		DBAccessObjectUtils::getDBOptions( $queryFlags );
	// Text data is immutable; check replica DBs first.
	$dbConnection = $this->getDBConnection( $index );
	$rows = $dbConnection->select(
		'text',
		[ 'old_id', 'old_text', 'old_flags' ],
		[ 'old_id' => $textIds ],
		__METHOD__,
		$options
	);

	// Fallback to DB_MASTER in some cases if not all the rows were found, using the appropriate
	// options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
	if ( $dbConnection->numRows( $rows ) !== count( $textIds ) && $fallbackIndex !== null ) {
		$fetchedTextIds = [];
		foreach ( $rows as $row ) {
			$fetchedTextIds[] = $row->old_id;
		}
		$missingTextIds = array_diff( $textIds, $fetchedTextIds );
		$dbConnection = $this->getDBConnection( $fallbackIndex );
		$rowsFromFallback = $dbConnection->select(
			'text',
			[ 'old_id', 'old_text', 'old_flags' ],
			[ 'old_id' => $missingTextIds ],
			__METHOD__,
			$fallbackOptions
		);
		// Iterate over both result sets as if they were one.
		$appendIterator = new AppendIterator();
		$appendIterator->append( $rows );
		$appendIterator->append( $rowsFromFallback );
		$rows = $appendIterator;
	}

	foreach ( $rows as $row ) {
		$blobAddress = $textIdToBlobAddress[$row->old_id];
		$blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
		if ( $blob === false ) {
			$errors[$blobAddress] = "Bad data in text row {$row->old_id}.";
		}
		$result[$blobAddress] = $blob;
	}

	// If we're still missing some of the rows, set errors for missing blobs.
	if ( count( $result ) !== count( $blobAddresses ) ) {
		foreach ( $blobAddresses as $blobAddress ) {
			if ( !isset( $result[$blobAddress] ) ) {
				$errors[$blobAddress] = "Unable to fetch blob at $blobAddress";
				$result[$blobAddress] = false;
			}
		}
	}
	return [ $result, $errors ];
}
428 
/**
 * Get a cache key for a given Blob address.
 *
 * The key combines a fixed prefix, the resolved DB domain ID and the address itself.
 *
 * @param string $blobAddress
 * @return string Global cache key
 */
private function getCacheKey( $blobAddress ) {
	$domainId = $this->dbLoadBalancer->resolveDomainID( $this->dbDomain );

	return $this->cache->makeGlobalKey( 'SqlBlobStore-blob', $domainId, $blobAddress );
}
446 
/**
 * Expand a raw data blob according to the flags given.
 *
 * Blobs flagged "external" hold a URL that is resolved through external storage;
 * everything else is passed straight to decompressData().
 *
 * @param string $raw Raw blob content, or the external store URL for external blobs
 * @param string|string[] $flags Blob flags, as a comma-separated string or a list
 * @param string|null $cacheKey When given, the expanded external blob is read from
 *  and written to the cache under a key derived from this value
 * @return string|false The expanded blob, or false on failure
 */
public function expandBlob( $raw, $flags, $cacheKey = null ) {
	if ( is_string( $flags ) ) {
		$flags = explode( ',', $flags );
	}

	// Locally stored blob: nothing to resolve, just undo compression/encoding.
	if ( !in_array( 'external', $flags ) ) {
		return $this->decompressData( $raw, $flags );
	}

	// Use external methods for external objects, text in table is URL-only then
	$url = $raw;
	$urlParts = explode( '://', $url, 2 );
	if ( count( $urlParts ) < 2 || $urlParts[1] == '' ) {
		return false;
	}

	$fetchExternal = function () use ( $url, $flags ) {
		// Ignore $setOpts; blobs are immutable and negatives are not cached
		$fetched = $this->extStoreAccess
			->fetchFromURL( $url, [ 'domain' => $this->dbDomain ] );

		return $fetched === false ? false : $this->decompressData( $fetched, $flags );
	};

	if ( !$cacheKey ) {
		return $fetchExternal();
	}

	// The cached value should be decompressed, so the loader handles that itself.
	return $this->cache->getWithSetCallback(
		$this->getCacheKey( $cacheKey ),
		$this->getCacheTTL(),
		$fetchExternal,
		[ 'pcGroup' => self::TEXT_CACHE_GROUP, 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
	);
}
501 
/**
 * Compress the given blob in place if compression is enabled, and report the
 * flags describing how it was encoded.
 *
 * @param string &$blob Blob data; replaced by its deflated form when compression succeeds
 * @return string Comma-separated flags: always "utf-8", plus "gzip" when deflated
 */
public function compressData( &$blob ) {
	// Revisions not marked as UTF-8 will have legacy decoding applied by decompressData().
	// XXX: if $this->legacyEncoding is not set, we could skip this. That would however be
	// risky, since $this->legacyEncoding being set in the future would lead to data corruption.
	$blobFlags = [ 'utf-8' ];

	if ( $this->compressBlobs ) {
		if ( !function_exists( 'gzdeflate' ) ) {
			wfDebug( __METHOD__ . " -- no zlib support, not compressing\n" );
		} else {
			$deflated = gzdeflate( $blob );

			if ( $deflated !== false ) {
				$blob = $deflated;
				$blobFlags[] = 'gzip';
			} else {
				// Leave the blob uncompressed rather than failing the save.
				wfLogWarning( __METHOD__ . ': gzdeflate() failed' );
			}
		}
	}

	return implode( ',', $blobFlags );
}
542 
/**
 * Re-converts revision text according to its flags.
 *
 * Handles, in order: the "error" flag, "gzip" inflation, "object" unserialization,
 * and conversion from the configured legacy encoding for blobs not flagged as UTF-8.
 *
 * @param string $blob Blob as stored
 * @param string[] $blobFlags Flags describing how the blob was stored
 * @return string|false The decoded blob, or false on failure
 */
public function decompressData( $blob, array $blobFlags ) {
	// Revision::decompressRevisionText accepted false here, so defend against that
	Assert::parameterType( 'string', $blob, '$blob' );

	// Error row, nothing usable to return
	if ( in_array( 'error', $blobFlags ) ) {
		return false;
	}

	if ( in_array( 'gzip', $blobFlags ) ) {
		# Deal with optional compression of archived pages.
		# This can be done periodically via maintenance/compressOld.php, and
		# as pages are saved if $wgCompressRevisions is set.
		$inflated = gzinflate( $blob );

		if ( $inflated === false ) {
			wfWarn( __METHOD__ . ': gzinflate() failed' );
			return false;
		}
		$blob = $inflated;
	}

	if ( in_array( 'object', $blobFlags ) ) {
		# Generic compressed storage
		$storedObject = unserialize( $blob );
		if ( !is_object( $storedObject ) ) {
			// Invalid object
			return false;
		}
		$blob = $storedObject->getText();
	}

	// Needed to support old revisions left over from the 1.4 / 1.5 migration.
	// ("utf8" checked for compatibility with some broken conversion scripts 2008-12-30)
	$markedUtf8 = in_array( 'utf-8', $blobFlags ) || in_array( 'utf8', $blobFlags );
	if ( $blob === false || !$this->legacyEncoding || $markedUtf8 ) {
		return $blob;
	}

	# Old revisions kept around in a legacy encoding? Upconvert on demand.
	# Even with //IGNORE iconv can whine about illegal characters in
	# *input* string. We just ignore those too.
	# REF: https://bugs.php.net/bug.php?id=37166
	# REF: https://phabricator.wikimedia.org/T18885
	AtEase::suppressWarnings();
	$converted = iconv( $this->legacyEncoding, 'UTF-8//IGNORE', $blob );
	AtEase::restoreWarnings();

	return $converted;
}
608 
/**
 * Get the text cache TTL.
 *
 * @return int TTL_UNCACHEABLE when the cache is emulated by the SQL store or when no
 *  expiry is configured; otherwise the configured cache expiry
 */
private function getCacheTTL() {
	$emulationLevel = $this->cache->getQoS( WANObjectCache::ATTR_EMULATION );

	if ( $emulationLevel <= WANObjectCache::QOS_EMULATION_SQL ) {
		// Do not cache RDBMs blobs in...the RDBMs store
		return WANObjectCache::TTL_UNCACHEABLE;
	}

	// An expiry of 0 (or any other falsy value) means "do not cache".
	return $this->cacheExpiry ?: WANObjectCache::TTL_UNCACHEABLE;
}
628 
/**
 * Returns an ID corresponding to the old_id field in the text table, corresponding
 * to the given $address.
 *
 * @param string $address Blob address
 * @return int|null The text ID, or null if the address does not use the "tt" schema
 * @throws InvalidArgumentException If the address cannot be parsed, or its ID part is
 *  not a canonical non-zero integer
 */
public function getTextIdFromAddress( $address ) {
	list( $schema, $id, ) = self::splitBlobAddress( $address );

	if ( $schema === 'tt' ) {
		$textId = intval( $id );

		// Round-trip through string to reject forms like "012" or "12abc".
		if ( $textId && $id === (string)$textId ) {
			return $textId;
		}

		throw new InvalidArgumentException( "Malformed text_id: $id" );
	}

	return null;
}
664 
/**
 * Returns an address referring to content stored in the text table row with the given ID.
 *
 * @param int $id Text table row ID (old_id)
 * @return string Address of the form "tt:<id>"
 */
public static function makeAddressFromTextId( $id ) {
	$schemaPrefix = 'tt:';

	return $schemaPrefix . $id;
}
680 
/**
 * Splits a blob address into three parts: the schema, the ID, and parameters/flags.
 *
 * @param string $address Address of the form "<schema>:<id>" with an optional "?<query>" suffix
 * @return array [ $schema, $id, $parameters ]: the lower-cased schema, the ID string,
 *  and the parameters parsed from the query part (empty array when there is none)
 * @throws InvalidArgumentException If the address does not match the expected form
 */
public static function splitBlobAddress( $address ) {
	$matches = [];
	$matched = preg_match( '/^(\w+):(\w+)(\?(.*))?$/', $address, $matches );
	if ( !$matched ) {
		throw new InvalidArgumentException( "Bad blob address: $address" );
	}

	// Group 4 is only present when the address carries a "?..." suffix.
	$parameters = [];
	if ( isset( $matches[4] ) ) {
		$parameters = wfCgiToArray( $matches[4] );
	}

	return [ strtolower( $matches[1] ), $matches[2], $parameters ];
}
702 
/**
 * Check if the blob metadata or backing blob data store is read-only.
 *
 * @return bool True if external storage is in use and read-only, or if the load
 *  balancer reports a read-only reason
 */
public function isReadOnly() {
	$externalIsReadOnly = $this->useExternalStore && $this->extStoreAccess->isReadOnly();

	// Short-circuit keeps the load balancer untouched when external storage already says no.
	return $externalIsReadOnly
		|| $this->getDBLoadBalancer()->getReadOnlyReason() !== false;
}
710 }
Service for storing and loading Content objects.
fetchBlobs( $blobAddresses, $queryFlags)
MCR migration note: this corresponds to Revision::fetchText.
string|bool $dbDomain
DB domain ID of a wiki or false for the local one.
wfWarn( $msg, $callerOffset=1, $level=E_USER_NOTICE)
Send a warning either to the debug log or in a PHP error depending on $wgDevelopmentWarnings.
getBlob( $blobAddress, $queryFlags=0)
Retrieve a blob, given an address.
isReadOnly()
Check if the blob metadata or backing blob data store is read-only.
static makeAddressFromTextId( $id)
Returns an address referring to content stored in the text table row with the given ID...
ExternalStoreAccess $extStoreAccess
setCompressBlobs( $compressBlobs)
__construct(ILoadBalancer $dbLoadBalancer, ExternalStoreAccess $extStoreAccess, WANObjectCache $cache, $dbDomain=false)
getCacheTTL()
Get the text cache TTL.
static splitBlobAddress( $address)
Splits a blob address into three parts: the schema, the ID, and parameters/flags. ...
wfLogWarning( $msg, $callerOffset=1, $level=E_USER_WARNING)
Send a warning as a PHP error and the debug log.
storeBlob( $data, $hints=[])
Stores an arbitrary blob of data and returns an address that can be used with getBlob() to retrieve t...
Exception representing a failure to access a data blob.
const DB_MASTER
Definition: defines.php:26
static getDBOptions( $bitfield)
Get an appropriate DB index, options, and fallback DB index for a query.
setUseExternalStore( $useExternalStore)
wfCgiToArray( $query)
This is the logical opposite of wfArrayToCgi(): it accepts a query string as its argument and returns...
getCacheKey( $blobAddress)
Get a cache key for a given Blob address.
static newGood( $value=null)
Factory function for good results.
Definition: StatusValue.php:81
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
expandBlob( $raw, $flags, $cacheKey=null)
Expand a raw data blob according to the flags given.
unserialize( $serialized)
static hasFlags( $bitfield, $flags)
decompressData( $blob, array $blobFlags)
Re-converts revision text according to its flags.
Database cluster connection, tracking, load balancing, and transaction manager interface.
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
getTextIdFromAddress( $address)
Returns an ID corresponding to the old_id field in the text table, corresponding to the given $addres...
getBlobBatch( $blobAddresses, $queryFlags=0)
A batched version of BlobStore::getBlob.
Service for loading and storing data blobs.
Definition: BlobStore.php:35
compressData(&$blob)
If $wgCompressRevisions is enabled, we will compress data.
setLegacyEncoding( $legacyEncoding)
Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.