MediaWiki master
SqlBlobStore.php
Go to the documentation of this file.
1<?php
12namespace MediaWiki\Storage;
13
14use AppendIterator;
16use InvalidArgumentException;
19use StatusValue;
20use Wikimedia\Assert\Assert;
27
36class SqlBlobStore implements BlobStore {
37
// Note: the name has been taken unchanged from the old Revision class.
/** Process-cache group name, passed as 'pcGroup' in the cache options. */
public const TEXT_CACHE_GROUP = 'revisiontext:10';

/** Default value of the blob cache expiry, in seconds (7 days). */
public const DEFAULT_TTL = 7 * 24 * 3600;

/** @var ILoadBalancer Load balancer used to obtain connections and resolve the DB domain */
private $dbLoadBalancer;

/** @var ExternalStoreAccess Used to insert into and fetch from external storage */
private $extStoreAccess;

/** @var WANObjectCache Cache holding expanded blob contents */
private $cache;

/** @var string|bool DB domain handed to the load balancer and external store; false by default */
private $dbDomain;

/** @var int Cache expiry in seconds; a falsy value makes blobs uncacheable (see getCacheTTL()) */
private $cacheExpiry = self::DEFAULT_TTL;

/** @var bool Whether compressData() should try to deflate blobs */
private $compressBlobs = false;

/** @var string|false Encoding assumed by decompressData() for blobs without a utf-8 flag */
private $legacyEncoding = false;

/** @var bool Whether storeBlob() writes to external storage instead of the text table */
private $useExternalStore = false;
83
/**
 * Wire up the services this blob store depends on.
 *
 * @param ILoadBalancer $dbLoadBalancer Load balancer used to acquire database connections
 * @param ExternalStoreAccess $extStoreAccess Access layer for external storage
 * @param WANObjectCache $cache Cache used for expanded blob contents
 * @param string|bool $dbDomain DB domain forwarded to the load balancer and the external
 *  store; defaults to false (presumably meaning the local wiki -- confirm against callers)
 */
public function __construct(
	ILoadBalancer $dbLoadBalancer,
	ExternalStoreAccess $extStoreAccess,
	WANObjectCache $cache,
	$dbDomain = false
) {
	$this->dbDomain = $dbDomain;
	$this->cache = $cache;
	$this->extStoreAccess = $extStoreAccess;
	$this->dbLoadBalancer = $dbLoadBalancer;
}
106
/**
 * Get the configured blob cache expiry.
 *
 * @return int Expiry in seconds; DEFAULT_TTL unless changed via setCacheExpiry()
 */
public function getCacheExpiry() {
	return $this->cacheExpiry;
}
113
/**
 * Set the blob cache expiry.
 *
 * @param int $cacheExpiry Expiry in seconds; a falsy value (0) makes getCacheTTL()
 *  report blobs as uncacheable
 */
public function setCacheExpiry( int $cacheExpiry ) {
	$this->cacheExpiry = $cacheExpiry;
}
120
/**
 * Tell whether blobs are compressed before being stored.
 *
 * @return bool False unless changed via setCompressBlobs()
 */
public function getCompressBlobs() {
	return $this->compressBlobs;
}
127
/**
 * Enable or disable compression of newly stored blobs.
 *
 * @param bool $compressBlobs When truthy, compressData() attempts to gzdeflate() blobs
 */
public function setCompressBlobs( $compressBlobs ) {
	$this->compressBlobs = $compressBlobs;
}
134
/**
 * Get the encoding assumed for blobs that carry no utf-8 flag.
 *
 * @return string|false The legacy encoding name, or false when none has been set
 */
public function getLegacyEncoding() {
	return $this->legacyEncoding;
}
142
/**
 * Set the encoding to assume for blobs that do not have the utf-8 flag set.
 *
 * decompressData() passes this value to iconv() as the source encoding.
 *
 * @param string $legacyEncoding Name of the legacy encoding
 */
public function setLegacyEncoding( string $legacyEncoding ) {
	$this->legacyEncoding = $legacyEncoding;
}
154
/**
 * Tell whether new blobs are written to external storage.
 *
 * @return bool False unless changed via setUseExternalStore()
 */
public function getUseExternalStore() {
	return $this->useExternalStore;
}
161
/**
 * Choose where storeBlob() writes new blobs.
 *
 * @param bool $useExternalStore True to write to external storage, false to write
 *  to the text table
 */
public function setUseExternalStore( bool $useExternalStore ) {
	$this->useExternalStore = $useExternalStore;
}
168
/**
 * Accessor for the load balancer injected through the constructor.
 *
 * @return ILoadBalancer
 */
private function getDBLoadBalancer() {
	return $this->dbLoadBalancer;
}
175
/**
 * Obtain a database connection for this store's DB domain.
 *
 * @param int $index Server index to connect to, e.g. DB_REPLICA or DB_PRIMARY
 * @return mixed Whatever ILoadBalancer::getConnection() returns (a database handle)
 */
private function getDBConnection( $index ) {
	return $this->getDBLoadBalancer()->getConnection( $index, [], $this->dbDomain );
}
185
/**
 * Store a blob of data and return the address under which it can be loaded again.
 *
 * The data is first run through compressData(). Depending on the useExternalStore
 * setting it then goes either to external storage (yielding an "es:" address, with
 * the flags appended as a query parameter) or into a new row of the text table
 * (yielding a "tt:" address).
 *
 * @param string $data The raw blob content
 * @param array $hints Not used by this implementation
 * @return string The blob address
 * @throws BlobAccessException If writing to external storage fails
 */
public function storeBlob( $data, $hints = [] ) {
	// compressData() takes $data by reference and may replace it with the deflated form
	$flags = $this->compressData( $data );

	if ( !$this->useExternalStore ) {
		// Local storage: one new row in the text table
		$dbw = $this->getDBConnection( DB_PRIMARY );
		$dbw->newInsertQueryBuilder()
			->insertInto( 'text' )
			->row( [ 'old_text' => $data, 'old_flags' => $flags ] )
			->caller( __METHOD__ )
			->execute();

		return self::makeAddressFromTextId( $dbw->insertId() );
	}

	// External storage: insert the data and keep the URL that comes back
	try {
		$url = $this->extStoreAccess->insert( $data, [ 'domain' => $this->dbDomain ] );
	} catch ( ExternalStoreException $e ) {
		throw new BlobAccessException( $e->getMessage(), 0, $e );
	}
	if ( !$url ) {
		throw new BlobAccessException( "Failed to store text to external storage" );
	}

	$address = 'es:' . $url;

	return $flags ? $address . '?flags=' . $flags : $address;
}
228
/**
 * Retrieve a single blob, given its address, going through the cache.
 *
 * @param string $blobAddress The blob address as returned by storeBlob()
 * @param int $queryFlags Bitfield of IDBAccessObject::READ_* flags, passed to fetchBlobs()
 * @return string The expanded blob content
 * @throws BadBlobException If fetchBlobs() reports a 'badrevision' error for the address
 * @throws BlobAccessException If fetchBlobs() reports any other error for the address
 */
public function getBlob( $blobAddress, $queryFlags = 0 ) {
	Assert::parameterType( 'string', $blobAddress, '$blobAddress' );

	// Filled in by the loader below so a failure can be reported after the cache call
	$failure = null;
	$loader = function ( $oldValue, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$failure ) {
		// $setOpts is left alone; blobs are immutable and negatives are not cached
		[ $blobs, $errors ] = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
		$failure = $errors[$blobAddress] ?? null;
		if ( $failure ) {
			// No negative caching; a miss on a text row may be due to a corrupted replica DB
			$ttl = WANObjectCache::TTL_UNCACHEABLE;
		}
		return $blobs[$blobAddress];
	};

	$blob = $this->cache->getWithSetCallback(
		$this->getCacheKey( $blobAddress ),
		$this->getCacheTTL(),
		$loader,
		$this->getCacheOptions()
	);

	if ( $failure ) {
		// Each error is a [ type, message ] pair
		if ( $failure[0] === 'badrevision' ) {
			throw new BadBlobException( $failure[1] );
		}
		throw new BlobAccessException( $failure[1] );
	}

	Assert::postcondition( is_string( $blob ), 'Blob must not be null' );

	return $blob;
}
272
/**
 * Batched variant of getBlob().
 *
 * @param string[] $blobAddresses Blob addresses to load
 * @param int $queryFlags Bitfield of IDBAccessObject::READ_* flags, passed to fetchBlobs()
 * @return StatusValue A good status whose value maps each address to its blob content,
 *  or to null where loading failed; every fetch error is attached as a warning
 */
public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
	// FIXME: All caching has temporarily been removed in I94c6f9ba7b9caeeb due to T235188.
	//        Caching behavior should be restored by reverting I94c6f9ba7b9caeeb as soon as
	//        the root cause of T235188 has been resolved.

	[ $fetched, $errors ] = $this->fetchBlobs( $blobAddresses, $queryFlags );

	// fetchBlobs() marks failures with false; callers of this method expect null instead
	$blobsByAddress = [];
	foreach ( $fetched as $address => $blob ) {
		$blobsByAddress[$address] = $blob === false ? null : $blob;
	}

	$status = StatusValue::newGood( $blobsByAddress );
	foreach ( $errors as $error ) {
		// @phan-suppress-next-line PhanParamTooFewUnpack
		$status->warning( ...$error );
	}

	return $status;
}
302
/**
 * Load and expand the blobs for a list of addresses, bypassing the per-address cache
 * lookup done by getBlob().
 *
 * Addresses are dispatched by schema:
 *  - "es": the ID is an external store URL, expanded through expandBlob();
 *  - "bad": known-bad content, yields an empty string plus a 'badrevision' error;
 *  - "tt": the ID is an old_id in the text table; all such rows are loaded in one query,
 *    with an optional second query against the fallback index for rows not found;
 *  - anything else is reported as an error.
 *
 * @param string[] $blobAddresses Blob addresses to load
 * @param int $queryFlags Bitfield of IDBAccessObject::READ_* flags
 * @return array[] A two-element list [ $result, $errors ]:
 *  - $result maps each blob address to the expanded content (string), or false on failure;
 *  - $errors maps each failed blob address to a [ type, message ] pair.
 * @throws BlobAccessException If an address cannot be parsed by splitBlobAddress()
 */
private function fetchBlobs( $blobAddresses, $queryFlags ) {
	$textIdToBlobAddress = [];
	$result = [];
	$errors = [];
	foreach ( $blobAddresses as $blobAddress ) {
		try {
			[ $schema, $id, $params ] = self::splitBlobAddress( $blobAddress );
		} catch ( InvalidArgumentException $ex ) {
			throw new BlobAccessException(
				$ex->getMessage() . '. Use findBadBlobs.php to remedy.',
				0,
				$ex
			);
		}

		if ( $schema === 'es' ) {
			if ( $params && isset( $params['flags'] ) ) {
				$blob = $this->expandBlob( $id, $params['flags'] . ',external', $blobAddress );
			} else {
				$blob = $this->expandBlob( $id, 'external', $blobAddress );
			}

			if ( $blob === false ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Bad data in external store address $id. Use findBadBlobs.php to remedy."
				];
			}
			$result[$blobAddress] = $blob;
		} elseif ( $schema === 'bad' ) {
			// Database row was marked as "known bad"
			wfDebug(
				__METHOD__
				. ": loading known-bad content ($blobAddress), returning empty string"
			);
			$result[$blobAddress] = '';
			$errors[$blobAddress] = [
				'badrevision',
				'The content of this revision is missing or corrupted (bad schema)'
			];
		} elseif ( $schema === 'tt' ) {
			$textId = intval( $id );

			if ( $textId < 1 || $id !== (string)$textId ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Bad blob address: $blobAddress. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
				// Do not queue a malformed ID for the text table query: intval() would
				// turn e.g. "12abc" into 12, so the row for a different, valid address
				// could overwrite the false result above, or this entry could take over
				// the map slot of a valid "tt:12" requested in the same batch.
				continue;
			}

			$textIdToBlobAddress[$textId] = $blobAddress;
		} else {
			$errors[$blobAddress] = [
				'internalerror',
				"Unknown blob address schema: $schema. Use findBadBlobs.php to remedy."
			];
			$result[$blobAddress] = false;
		}
	}

	$textIds = array_keys( $textIdToBlobAddress );
	if ( !$textIds ) {
		// Nothing to load from the text table
		return [ $result, $errors ];
	}
	// Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
	// do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
	$queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, IDBAccessObject::READ_LATEST )
		? IDBAccessObject::READ_LATEST_IMMUTABLE
		: 0;
	[ $index, $options, $fallbackIndex, $fallbackOptions ] =
		self::getDBOptions( $queryFlags );
	// Text data is immutable; check replica DBs first.
	$dbConnection = $this->getDBConnection( $index );
	$rows = $dbConnection->newSelectQueryBuilder()
		->select( [ 'old_id', 'old_text', 'old_flags' ] )
		->from( 'text' )
		->where( [ 'old_id' => $textIds ] )
		->options( $options )
		->caller( __METHOD__ )->fetchResultSet();
	$numRows = $rows->numRows();

	// Fallback to DB_PRIMARY in some cases if not all the rows were found, using the appropriate
	// options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
	if ( $numRows !== count( $textIds ) && $fallbackIndex !== null ) {
		$fetchedTextIds = [];
		foreach ( $rows as $row ) {
			$fetchedTextIds[] = $row->old_id;
		}
		$missingTextIds = array_diff( $textIds, $fetchedTextIds );
		$dbConnection = $this->getDBConnection( $fallbackIndex );
		$rowsFromFallback = $dbConnection->newSelectQueryBuilder()
			->select( [ 'old_id', 'old_text', 'old_flags' ] )
			->from( 'text' )
			->where( [ 'old_id' => $missingTextIds ] )
			->options( $fallbackOptions )
			->caller( __METHOD__ )->fetchResultSet();
		// Chain both result sets so the loop below sees all rows
		$appendIterator = new AppendIterator();
		$appendIterator->append( $rows );
		$appendIterator->append( $rowsFromFallback );
		$rows = $appendIterator;
	}

	foreach ( $rows as $row ) {
		$blobAddress = $textIdToBlobAddress[$row->old_id];
		$blob = false;
		if ( $row->old_text !== null ) {
			$blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
		}
		if ( $blob === false ) {
			$errors[$blobAddress] = [
				'internalerror',
				"Bad data in text row {$row->old_id}. Use findBadBlobs.php to remedy."
			];
		}
		$result[$blobAddress] = $blob;
	}

	// If we're still missing some of the rows, set errors for missing blobs.
	if ( count( $result ) !== count( $blobAddresses ) ) {
		foreach ( $blobAddresses as $blobAddress ) {
			if ( !isset( $result[$blobAddress ] ) ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Unable to fetch blob at $blobAddress. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
			}
		}
	}
	return [ $result, $errors ];
}
449
/**
 * Translate a READ_* bitfield into connection indexes and query options.
 *
 * @param int $bitfield Bitfield of IDBAccessObject::READ_* flags
 * @return array A list [ $index, $options, $fallbackIndex, $fallbackOptions ]:
 *  the server index and options for the first query, and the index (or null when there
 *  is no fallback) and options for a follow-up query for rows that were not found
 */
private static function getDBOptions( int $bitfield ): array {
	// Default: read from a replica, without any fallback
	$index = DB_REPLICA;
	$fallbackIndex = null;
	if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST_IMMUTABLE ) ) {
		// Stay on the replica (this overrides READ_LATEST if set), with the primary as fallback
		$fallbackIndex = DB_PRIMARY;
	} elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST ) ) {
		$index = DB_PRIMARY;
	}

	if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_EXCLUSIVE ) ) {
		$lockingOptions = [ 'FOR UPDATE' ];
	} elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LOCKING ) ) {
		$lockingOptions = [ 'LOCK IN SHARE MODE' ];
	} else {
		$lockingOptions = [];
	}

	if ( $fallbackIndex === null ) {
		// No fallback: the locking options apply to the only query
		return [ $index, $lockingOptions, $fallbackIndex, [] ];
	}

	// Locks on DB_REPLICA make no sense, so they only apply to the fallback query
	return [ $index, [], $fallbackIndex, $lockingOptions ];
}
479
/**
 * Build the global cache key under which the blob for an address is cached.
 *
 * The key is made from the resolved DB domain ID and the blob address.
 *
 * @param string $blobAddress
 * @return string
 */
private function getCacheKey( $blobAddress ) {
	$domainId = $this->dbLoadBalancer->resolveDomainID( $this->dbDomain );

	return $this->cache->makeGlobalKey( 'SqlBlobStore-blob', $domainId, $blobAddress );
}
497
/**
 * Get the options array passed to WANObjectCache::getWithSetCallback() for blobs.
 *
 * @return array Options with the keys 'pcGroup', 'pcTTL' and 'segmentable'
 */
private function getCacheOptions() {
	$options = [];
	$options['pcGroup'] = self::TEXT_CACHE_GROUP;
	$options['pcTTL'] = WANObjectCache::TTL_PROC_LONG;
	$options['segmentable'] = true;

	return $options;
}
510
/**
 * Expand a raw data blob according to the flags given.
 *
 * When the 'external' flag is present, $raw is treated as an external store URL and the
 * content is fetched from there; if a blob address is given, that fetch goes through
 * the cache. The content is then passed through decompressData().
 *
 * @param string $raw The raw blob data, or an external store URL for 'external' blobs
 * @param string|string[] $flags Blob flags, as a comma-separated string or as a list
 * @param string|null $blobAddress Blob address used for caching and in warnings, if known
 * @return string|false The expanded blob, or false on failure
 * @throws BadBlobException If the flags include 'error'
 */
public function expandBlob( $raw, $flags, $blobAddress = null ) {
	if ( is_string( $flags ) ) {
		$flags = self::explodeFlags( $flags );
	}
	if ( in_array( 'error', $flags ) ) {
		throw new BadBlobException(
			"The content of this revision is missing or corrupted (error flag)"
		);
	}

	if ( !in_array( 'external', $flags ) ) {
		// The data is stored inline
		return $this->decompressData( $raw, $flags, $blobAddress );
	}

	// Use external methods for external objects, text in table is URL-only then
	$url = $raw;
	$urlParts = explode( '://', $url, 2 );
	if ( count( $urlParts ) == 1 || $urlParts[1] == '' ) {
		return false;
	}

	// Fetches from external storage and decompresses; takes no arguments so that it can
	// serve both as a direct call and as a cache callback.
	$fetchAndDecompress = function () use ( $url, $flags, $blobAddress ) {
		$blob = $this->extStoreAccess->fetchFromURL( $url, [ 'domain' => $this->dbDomain ] );
		if ( $blob === false ) {
			return false;
		}
		return $this->decompressData( $blob, $flags, $blobAddress );
	};

	if ( !$blobAddress ) {
		// Without an address there is no cache key to use
		return $fetchAndDecompress();
	}

	// The cached value should be decompressed, so the callback handles that too.
	// $setOpts is ignored; blobs are immutable and negatives are not cached.
	return $this->cache->getWithSetCallback(
		$this->getCacheKey( $blobAddress ),
		$this->getCacheTTL(),
		$fetchAndDecompress,
		$this->getCacheOptions()
	);
}
571
/**
 * Compress the given blob in place if compression is enabled, and report the flags
 * describing the resulting data.
 *
 * @param string &$blob The blob content; replaced by its deflated form when compression
 *  is enabled, zlib is available and gzdeflate() succeeds
 * @return string Comma-separated flags: always 'utf-8', plus 'gzip' if the blob was deflated
 */
public function compressData( &$blob ) {
	// Revisions not marked as UTF-8 will have legacy decoding applied by decompressData().
	// XXX: if $this->legacyEncoding is not set, we could skip this. That would however be
	// risky, since $this->legacyEncoding being set in the future would lead to data corruption.
	$flags = 'utf-8';

	if ( !$this->compressBlobs ) {
		return $flags;
	}

	if ( !function_exists( 'gzdeflate' ) ) {
		wfDebug( __METHOD__ . " -- no zlib support, not compressing" );
		return $flags;
	}

	$deflated = gzdeflate( $blob );
	if ( $deflated === false ) {
		// Keep the uncompressed data rather than failing the write
		wfLogWarning( __METHOD__ . ': gzdeflate() failed' );
		return $flags;
	}

	$blob = $deflated;

	return $flags . ',gzip';
}
612
/**
 * Re-convert blob data according to its flags: inflate 'gzip' data, unwrap 'object'
 * data, and convert from the legacy encoding when the blob is not marked as UTF-8.
 *
 * @param string $blob Blob content in its stored form
 * @param string[] $blobFlags Flags describing how the blob was stored
 * @param string|null $blobAddress Blob address, only used in the warning message
 * @return string|false The decoded content, or false if the blob carries the 'error'
 *  flag, cannot be inflated, or cannot be unserialized
 */
public function decompressData( string $blob, array $blobFlags, ?string $blobAddress = null ) {
	if ( in_array( 'error', $blobFlags ) ) {
		// Error row
		return false;
	}

	// Deal with optional compression of archived pages.
	// This can be done periodically via maintenance/compressOld.php, and
	// as pages are saved if $wgCompressRevisions is set.
	if ( in_array( 'gzip', $blobFlags ) ) {
		// Silence native warning in favour of more detailed warning (T380347)
		// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
		$inflated = @gzinflate( $blob );
		if ( $inflated === false ) {
			$where = $blobAddress ? ' (at blob address ' . $blobAddress . ')' : '';
			wfWarn( __METHOD__ . ': gzinflate() failed' . $where );
			return false;
		}
		$blob = $inflated;
	}

	if ( in_array( 'object', $blobFlags ) ) {
		// Generic compressed storage
		$obj = HistoryBlobUtils::unserialize( $blob );
		if ( !$obj ) {
			// Invalid object
			return false;
		}
		$blob = $obj->getText();
	}

	// "utf8" is checked for compatibility with some broken conversion scripts 2008-12-30.
	$markedAsUtf8 = in_array( 'utf-8', $blobFlags ) || in_array( 'utf8', $blobFlags );
	if ( $blob === false || !$this->legacyEncoding || $markedAsUtf8 ) {
		return $blob;
	}

	// Needed to support old revisions from before MW 1.5: text kept around in a legacy
	// encoding is upconverted on demand.
	// Even with "//IGNORE" iconv can whine about illegal characters in
	// *input* string. We just ignore those too.
	// Ref https://bugs.php.net/bug.php?id=37166
	// Ref https://phabricator.wikimedia.org/T18885
	//
	// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
	return @iconv( $this->legacyEncoding, 'UTF-8//IGNORE', $blob );
}
678
/**
 * Determine the TTL to use when caching blobs.
 *
 * @return int The configured cache expiry, or TTL_UNCACHEABLE when the cache reports
 *  RDBMS-level durability or when the configured expiry is falsy
 */
private function getCacheTTL() {
	$cache = $this->cache;

	// Do not cache RDBMs blobs in...the RDBMs store
	$isRdbmsBacked =
		$cache->getQoS( BagOStuff::ATTR_DURABILITY ) >= BagOStuff::QOS_DURABILITY_RDBMS;

	if ( $isRdbmsBacked || !$this->cacheExpiry ) {
		return $cache::TTL_UNCACHEABLE;
	}

	return $this->cacheExpiry;
}
698
/**
 * Return the ID of the text table row (old_id) that the given address refers to.
 *
 * @param string $address The blob address
 * @return int|null The text ID, or null if the address does not use the "tt" schema
 * @throws InvalidArgumentException If the address cannot be parsed, or if a "tt" address
 *  does not carry a canonical positive integer ID
 */
public function getTextIdFromAddress( $address ) {
	[ $schema, $id, ] = self::splitBlobAddress( $address );

	if ( $schema !== 'tt' ) {
		return null;
	}

	$textId = intval( $id );

	// Text IDs are positive. Reject zero and negative values as well as non-canonical
	// spellings such as "012" or "12abc", matching the validation done in fetchBlobs().
	if ( $textId < 1 || $id !== (string)$textId ) {
		throw new InvalidArgumentException( "Malformed text_id: $id" );
	}

	return $textId;
}
734
/**
 * Build the blob address that refers to the text table row with the given ID.
 *
 * @param int $id The old_id of the text table row
 * @return string An address of the form "tt:<id>"
 */
public static function makeAddressFromTextId( $id ) {
	return "tt:$id";
}
751
/**
 * Split a comma-separated old_flags value into its constituent parts.
 *
 * @param string $flagsString The flags, e.g. "utf-8,gzip"
 * @return string[] The individual flags; an empty list for the empty string
 */
public static function explodeFlags( string $flagsString ) {
	if ( $flagsString === '' ) {
		// explode() would yield [ '' ] here
		return [];
	}

	return explode( ',', $flagsString );
}
761
/**
 * Split a blob address of the form "<schema>:<id>[?<parameters>]" into its parts.
 *
 * @param string $address The blob address
 * @return array A list [ $schema, $id, $parameters ]: the lower-cased schema, the ID
 *  as a string, and the parameters as parsed by wfCgiToArray()
 * @throws InvalidArgumentException If the address does not match the expected format
 */
public static function splitBlobAddress( $address ) {
	$matched = preg_match( '/^([-+.\w]+):([^\s?]+)(\?([^\s]*))?$/', $address, $parts );
	if ( !$matched ) {
		throw new InvalidArgumentException( "Bad blob address: $address" );
	}

	// Group 4 is absent when the address has no "?..." suffix
	$query = $parts[4] ?? '';

	return [ strtolower( $parts[1] ), $parts[2], wfCgiToArray( $query ) ];
}
782
/**
 * Check if the blob metadata or backing blob data store is read-only.
 *
 * @return bool True if external storage is in use and read-only, or if the load
 *  balancer reports a read-only reason
 */
public function isReadOnly() {
	$externalStoreReadOnly = $this->useExternalStore && $this->extStoreAccess->isReadOnly();

	return $externalStoreReadOnly
		|| $this->getDBLoadBalancer()->getReadOnlyReason() !== false;
}
791}
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfWarn( $msg, $callerOffset=1, $level=E_USER_NOTICE)
Send a warning either to the debug log or in a PHP error depending on $wgDevelopmentWarnings.
wfLogWarning( $msg, $callerOffset=1, $level=E_USER_WARNING)
Send a warning as a PHP error and the debug log.
wfCgiToArray( $query)
This is the logical opposite of wfArrayToCgi(): it accepts a query string as its argument and returns...
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28
if(!defined('MW_SETUP_CALLBACK'))
Definition WebStart.php:69
This is the main interface for fetching or inserting objects with ExternalStore.
Exception thrown when a blob has the "bad" content address schema, or has "error" in its old_flags,...
Exception representing a failure to access a data blob.
Service for storing and loading Content objects representing revision data blobs.
static makeAddressFromTextId( $id)
Returns an address referring to content stored in the text table row with the given ID.
decompressData(string $blob, array $blobFlags, ?string $blobAddress=null)
Re-converts revision text according to its flags.
getTextIdFromAddress( $address)
Returns an ID corresponding to the old_id field in the text table, corresponding to the given $addres...
__construct(ILoadBalancer $dbLoadBalancer, ExternalStoreAccess $extStoreAccess, WANObjectCache $cache, $dbDomain=false)
getBlob( $blobAddress, $queryFlags=0)
Retrieve a blob, given an address.
setLegacyEncoding(string $legacyEncoding)
Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
compressData(&$blob)
If $wgCompressRevisions is enabled, we will compress data.
static splitBlobAddress( $address)
Splits a blob address into three parts: the schema, the ID, and parameters/flags.
getBlobBatch( $blobAddresses, $queryFlags=0)
A batched version of BlobStore::getBlob.
storeBlob( $data, $hints=[])
Stores an arbitrary blob of data and returns an address that can be used with getBlob() to retrieve t...
setUseExternalStore(bool $useExternalStore)
isReadOnly()
Check if the blob metadata or backing blob data store is read-only. Returns bool.
expandBlob( $raw, $flags, $blobAddress=null)
Expand a raw data blob according to the flags given.
static explodeFlags(string $flagsString)
Split a comma-separated old_flags value into its constituent parts.
Generic operation result class Has warning/error list, boolean status and arbitrary value.
Abstract class for any ephemeral data store.
Definition BagOStuff.php:73
Multi-datacenter aware caching interface.
Service for loading and storing data blobs.
Definition BlobStore.php:19
Interface for database access objects.
Interface to a relational database.
Definition IDatabase.php:31
This class is a delegate to ILBFactory for a given database cluster.
getCacheKey()
Get the cache key used to store status.