MediaWiki master
SqlBlobStore.php
Go to the documentation of this file.
1<?php
26namespace MediaWiki\Storage;
27
28use AppendIterator;
34use InvalidArgumentException;
35use StatusValue;
37use Wikimedia\Assert\Assert;
38use Wikimedia\AtEase\AtEase;
41
50class SqlBlobStore implements BlobStore {
51
/**
 * Process-cache group handed to the object cache as the 'pcGroup' option.
 * Note: the name has been taken unchanged from the old Revision class.
 */
public const TEXT_CACHE_GROUP = 'revisiontext:10';

/** Default lifetime of cached blobs: 7 days, expressed in seconds. */
public const DEFAULT_TTL = 7 * 24 * 3600; // 7 days

/** @var ILoadBalancer Load balancer used to obtain text-table connections */
private $dbLoadBalancer;

/** @var ExternalStoreAccess Gateway used to insert into / fetch from external storage */
private $extStoreAccess;

/** @var WANObjectCache Cache holding expanded blobs */
private $cache;

/** @var string|false DB domain forwarded to the load balancer; false is the constructor default */
private $dbDomain;

/** @var int Blob cache TTL; a falsy value makes getCacheTTL() report "uncacheable" */
private $cacheExpiry = self::DEFAULT_TTL;

/** @var bool Whether compressData() should gzdeflate new blobs */
private $compressBlobs = false;

/** @var string|false Source encoding for blobs lacking the utf-8 flag; false disables conversion */
private $legacyEncoding = false;

/** @var bool Whether storeBlob() writes the payload to external storage */
private $useExternalStore = false;
97
/**
 * Wire up the services this blob store depends on.
 *
 * @param ILoadBalancer $dbLoadBalancer Source of connections to the text table
 * @param ExternalStoreAccess $extStoreAccess Access to external blob storage
 * @param WANObjectCache $cache Cache used for expanded blobs
 * @param bool|string $dbDomain Domain passed on to the load balancer when
 *  requesting connections and resolving the cache key; defaults to false
 */
public function __construct(
    ILoadBalancer $dbLoadBalancer,
    ExternalStoreAccess $extStoreAccess,
    WANObjectCache $cache,
    $dbDomain = false
) {
    $this->dbDomain = $dbDomain;
    $this->cache = $cache;
    $this->extStoreAccess = $extStoreAccess;
    $this->dbLoadBalancer = $dbLoadBalancer;
}
120
/**
 * Report the configured blob cache lifetime.
 *
 * @return int The value last given to setCacheExpiry(), or DEFAULT_TTL
 */
public function getCacheExpiry() {
    $expiry = $this->cacheExpiry;

    return $expiry;
}
127
/**
 * Change the blob cache lifetime.
 *
 * @param int $cacheExpiry New TTL; 0 results in getCacheTTL() treating blobs
 *  as uncacheable
 */
public function setCacheExpiry( int $cacheExpiry ) {
    $ttl = $cacheExpiry;
    $this->cacheExpiry = $ttl;
}
134
/**
 * Tell whether newly stored blobs are compressed by compressData().
 *
 * @return bool The current compression setting
 */
public function getCompressBlobs() {
    $enabled = $this->compressBlobs;

    return $enabled;
}
141
/**
 * Switch compression of newly stored blobs on or off.
 *
 * @param bool $compressBlobs Truthy to have compressData() apply gzdeflate()
 */
public function setCompressBlobs( $compressBlobs ) {
    $enabled = $compressBlobs;
    $this->compressBlobs = $enabled;
}
148
/**
 * Get the encoding assumed for blobs that carry no utf-8 flag.
 *
 * @return string|false The encoding name, or false when none was configured
 */
public function getLegacyEncoding() {
    $encoding = $this->legacyEncoding;

    return $encoding;
}
156
/**
 * Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
 *
 * The value is used as the source charset for iconv() in decompressData().
 *
 * @param string $legacyEncoding Name of the legacy encoding
 */
public function setLegacyEncoding( string $legacyEncoding ) {
    $encoding = $legacyEncoding;
    $this->legacyEncoding = $encoding;
}
168
/**
 * Tell whether storeBlob() writes blob payloads to external storage.
 *
 * @return bool The current external-store setting
 */
public function getUseExternalStore() {
    $enabled = $this->useExternalStore;

    return $enabled;
}
175
/**
 * Enable or disable writing new blobs to external storage.
 *
 * @param bool $useExternalStore True to route storeBlob() payloads through
 *  the external store
 */
public function setUseExternalStore( bool $useExternalStore ) {
    $enabled = $useExternalStore;
    $this->useExternalStore = $enabled;
}
182
/**
 * Accessor for the load balancer injected through the constructor.
 *
 * @return ILoadBalancer
 */
private function getDBLoadBalancer() {
    $loadBalancer = $this->dbLoadBalancer;

    return $loadBalancer;
}
189
/**
 * Obtain a connection handle for this store's DB domain.
 *
 * @param int $index Server index to connect to (callers in this class pass
 *  DB_PRIMARY or DB_REPLICA)
 * @return mixed Whatever ILoadBalancer::getConnectionRef() returns for the
 *  given index, with no group restriction
 */
private function getDBConnection( $index ) {
    return $this->getDBLoadBalancer()->getConnectionRef( $index, [], $this->dbDomain );
}
199
/**
 * Store an arbitrary blob of data and return an address usable with getBlob().
 *
 * The data is first run through compressData(). When the external store is
 * enabled, the (possibly compressed) payload goes there and only the returned
 * URL is written to the text table, with 'external' appended to the flags.
 *
 * @param string $data Blob content
 * @param array $hints Not used by this implementation
 * @throws BlobAccessException If the external store throws or returns a
 *  falsy result
 * @return string Address of the form produced by makeAddressFromTextId()
 */
public function storeBlob( $data, $hints = [] ) {
    // compressData() may replace $data with its deflated form (by reference)
    $flags = $this->compressData( $data );

    # Write to external storage if required
    if ( $this->useExternalStore ) {
        // Store and get the URL
        try {
            $data = $this->extStoreAccess->insert( $data, [ 'domain' => $this->dbDomain ] );
        } catch ( ExternalStoreException $storeFailure ) {
            throw new BlobAccessException( $storeFailure->getMessage(), 0, $storeFailure );
        }
        if ( !$data ) {
            throw new BlobAccessException( "Failed to store text to external storage" );
        }

        $flags = $flags ? $flags . ',' . 'external' : 'external';

        // TODO: we could also return an address for the external store directly here.
        // That would mean bypassing the text table entirely when the external store is
        // used. We'll need to assess expected fallout before doing that.
    }

    $primaryDb = $this->getDBConnection( DB_PRIMARY );

    $textRow = [ 'old_text' => $data, 'old_flags' => $flags ];
    $primaryDb->newInsertQueryBuilder()
        ->insertInto( 'text' )
        ->row( $textRow )
        ->caller( __METHOD__ )->execute();

    return self::makeAddressFromTextId( $primaryDb->insertId() );
}
245
/**
 * Retrieve a blob, given an address.
 *
 * The lookup goes through the object cache; on a miss the blob is loaded via
 * fetchBlobs(). Failed loads are never cached.
 *
 * @param string $blobAddress Address as returned by storeBlob()
 * @param int $queryFlags Query flag bitfield forwarded to fetchBlobs()
 * @throws BadBlobException If the load reported a 'badrevision' error
 * @throws BlobAccessException For any other load error
 * @return string The blob content
 */
public function getBlob( $blobAddress, $queryFlags = 0 ) {
    Assert::parameterType( 'string', $blobAddress, '$blobAddress' );

    // Filled in by the callback so the failure survives the cache layer
    $error = null;

    $loadBlob = function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
        // Ignore $setOpts; blobs are immutable and negatives are not cached
        [ $blobs, $loadErrors ] = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
        // No negative caching; negative hits on text rows may be due to corrupted replica DBs
        $error = $loadErrors[$blobAddress] ?? null;
        if ( $error ) {
            $ttl = WANObjectCache::TTL_UNCACHEABLE;
        }
        return $blobs[$blobAddress];
    };

    $blob = $this->cache->getWithSetCallback(
        $this->getCacheKey( $blobAddress ),
        $this->getCacheTTL(),
        $loadBlob,
        $this->getCacheOptions()
    );

    if ( $error ) {
        // Errors are [ type, message ] pairs
        $message = $error[1];
        if ( $error[0] !== 'badrevision' ) {
            throw new BlobAccessException( $message );
        }
        throw new BadBlobException( $message );
    }

    Assert::postcondition( is_string( $blob ), 'Blob must not be null' );
    return $blob;
}
289
/**
 * A batched version of getBlob().
 *
 * @param string[] $blobAddresses Addresses to load
 * @param int $queryFlags Query flag bitfield forwarded to fetchBlobs()
 * @return StatusValue Good status whose value maps each address to its blob
 *  string, or to null when it could not be loaded; every load error is
 *  attached as a warning
 */
public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
    // FIXME: All caching has temporarily been removed in I94c6f9ba7b9caeeb due to T235188.
    // Caching behavior should be restored by reverting I94c6f9ba7b9caeeb as soon as
    // the root cause of T235188 has been resolved.

    [ $fetched, $errors ] = $this->fetchBlobs( $blobAddresses, $queryFlags );

    // fetchBlobs() marks failures with false; this API exposes them as null
    $blobsByAddress = [];
    foreach ( $fetched as $address => $blob ) {
        $blobsByAddress[$address] = ( $blob === false ) ? null : $blob;
    }

    $status = StatusValue::newGood( $blobsByAddress );
    foreach ( $errors as $error ) {
        // @phan-suppress-next-line PhanParamTooFewUnpack
        $status->warning( ...$error );
    }

    return $status;
}
319
/**
 * Load the given blobs from the text table and expand them.
 *
 * Addresses with the 'bad' schema yield an empty string plus a 'badrevision'
 * error. Addresses with the 'tt' schema are looked up by old_id, first on the
 * connection chosen by getDBOptions() and then, if rows are missing and a
 * fallback is configured, on the fallback connection. Any other schema is
 * reported as an error.
 *
 * Malformed 'tt' addresses (non-numeric, non-canonical or < 1) are reported
 * as errors and are NOT queried. Previously "tt:12abc" was looked up as text
 * row 12, which could replace the failure marker with an unrelated row's
 * content and could clobber the map entry of a valid "tt:12" in the same
 * batch.
 *
 * @param string[] $blobAddresses Addresses to load
 * @param int $queryFlags Query flag bitfield (IDBAccessObject::READ_*)
 * @throws BlobAccessException If an address cannot be split into schema and ID
 * @return array [ $result, $errors ], both keyed by blob address. $result
 *  holds the blob string or false on failure; $errors holds
 *  [ error type, message ] pairs for the addresses that failed.
 */
private function fetchBlobs( $blobAddresses, $queryFlags ) {
    $textIdToBlobAddress = [];
    $result = [];
    $errors = [];
    foreach ( $blobAddresses as $blobAddress ) {
        try {
            [ $schema, $id ] = self::splitBlobAddress( $blobAddress );
        } catch ( InvalidArgumentException $ex ) {
            throw new BlobAccessException(
                $ex->getMessage() . '. Use findBadBlobs.php to remedy.',
                0,
                $ex
            );
        }

        // TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
        if ( $schema === 'bad' ) {
            // Database row was marked as "known bad"
            wfDebug(
                __METHOD__
                . ": loading known-bad content ($blobAddress), returning empty string"
            );
            $result[$blobAddress] = '';
            $errors[$blobAddress] = [
                'badrevision',
                'The content of this revision is missing or corrupted (bad schema)'
            ];
        } elseif ( $schema === 'tt' ) {
            $textId = intval( $id );

            if ( $textId < 1 || $id !== (string)$textId ) {
                $errors[$blobAddress] = [
                    'internalerror',
                    "Bad blob address: $blobAddress. Use findBadBlobs.php to remedy."
                ];
                $result[$blobAddress] = false;
                // Never query the database for an ID we already know is malformed
                continue;
            }

            $textIdToBlobAddress[$textId] = $blobAddress;
        } else {
            $errors[$blobAddress] = [
                'internalerror',
                "Unknown blob address schema: $schema. Use findBadBlobs.php to remedy."
            ];
            $result[$blobAddress] = false;
        }
    }

    $textIds = array_keys( $textIdToBlobAddress );
    if ( !$textIds ) {
        // Nothing to look up in the text table
        return [ $result, $errors ];
    }
    // Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
    // do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
    $queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, IDBAccessObject::READ_LATEST )
        ? IDBAccessObject::READ_LATEST_IMMUTABLE
        : 0;
    [ $index, $options, $fallbackIndex, $fallbackOptions ] =
        self::getDBOptions( $queryFlags );
    // Text data is immutable; check replica DBs first.
    $dbConnection = $this->getDBConnection( $index );
    $rows = $dbConnection->newSelectQueryBuilder()
        ->select( [ 'old_id', 'old_text', 'old_flags' ] )
        ->from( 'text' )
        ->where( [ 'old_id' => $textIds ] )
        ->options( $options )
        ->caller( __METHOD__ )->fetchResultSet();
    $numRows = $rows->numRows();

    // Fallback to DB_PRIMARY in some cases if not all the rows were found, using the appropriate
    // options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
    if ( $numRows !== count( $textIds ) && $fallbackIndex !== null ) {
        $fetchedTextIds = [];
        foreach ( $rows as $row ) {
            $fetchedTextIds[] = $row->old_id;
        }
        $missingTextIds = array_diff( $textIds, $fetchedTextIds );
        $dbConnection = $this->getDBConnection( $fallbackIndex );
        $rowsFromFallback = $dbConnection->newSelectQueryBuilder()
            ->select( [ 'old_id', 'old_text', 'old_flags' ] )
            ->from( 'text' )
            ->where( [ 'old_id' => $missingTextIds ] )
            ->options( $fallbackOptions )
            ->caller( __METHOD__ )->fetchResultSet();
        // Chain both result sets so the loop below sees every row once
        $appendIterator = new AppendIterator();
        $appendIterator->append( $rows );
        $appendIterator->append( $rowsFromFallback );
        $rows = $appendIterator;
    }

    foreach ( $rows as $row ) {
        $blobAddress = $textIdToBlobAddress[$row->old_id];
        $blob = false;
        if ( $row->old_text !== null ) {
            $blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
        }
        if ( $blob === false ) {
            $errors[$blobAddress] = [
                'internalerror',
                "Bad data in text row {$row->old_id}. Use findBadBlobs.php to remedy."
            ];
        }
        $result[$blobAddress] = $blob;
    }

    // If we're still missing some of the rows, set errors for missing blobs.
    if ( count( $result ) !== count( $blobAddresses ) ) {
        foreach ( $blobAddresses as $blobAddress ) {
            if ( !isset( $result[$blobAddress ] ) ) {
                $errors[$blobAddress] = [
                    'internalerror',
                    "Unable to fetch blob at $blobAddress. Use findBadBlobs.php to remedy."
                ];
                $result[$blobAddress] = false;
            }
        }
    }
    return [ $result, $errors ];
}
453
/**
 * Translate a query flag bitfield into connection indexes and query options.
 *
 * - READ_LATEST_IMMUTABLE: read from DB_REPLICA, fall back to DB_PRIMARY;
 *   locking options apply to the fallback query only.
 * - READ_LATEST: read from DB_PRIMARY, no fallback.
 * - otherwise: read from DB_REPLICA, no fallback.
 *
 * @param int $bitfield IDBAccessObject::READ_* flags
 * @return array [ $index, $options, $fallbackIndex, $fallbackOptions ];
 *  $fallbackIndex is null when there is no fallback
 */
private static function getDBOptions( $bitfield ) {
    $index = DB_REPLICA;
    $fallbackIndex = null;
    if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST_IMMUTABLE ) ) {
        // override READ_LATEST if set
        $fallbackIndex = DB_PRIMARY;
    } elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST ) ) {
        $index = DB_PRIMARY;
    }

    if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_EXCLUSIVE ) ) {
        $locks = [ 'FOR UPDATE' ];
    } elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LOCKING ) ) {
        $locks = [ 'LOCK IN SHARE MODE' ];
    } else {
        $locks = [];
    }

    if ( $fallbackIndex === null ) {
        // no fallback
        return [ $index, $locks, $fallbackIndex, [] ];
    }

    // locks on DB_REPLICA make no sense
    return [ $index, [], $fallbackIndex, $locks ];
}
483
/**
 * Build the global cache key under which a blob is stored.
 *
 * The key combines a fixed prefix, the resolved DB domain ID and the blob
 * address.
 *
 * @param string $blobAddress
 * @return string Key produced by the cache's makeGlobalKey()
 */
private function getCacheKey( $blobAddress ) {
    $domainId = $this->dbLoadBalancer->resolveDomainID( $this->dbDomain );

    return $this->cache->makeGlobalKey( 'SqlBlobStore-blob', $domainId, $blobAddress );
}
501
/**
 * Options passed to every getWithSetCallback() call made by this class.
 *
 * @return array Option map with the 'pcGroup', 'pcTTL' and 'segmentable' keys
 */
private function getCacheOptions() {
    $options = [];
    $options['pcGroup'] = self::TEXT_CACHE_GROUP;
    $options['pcTTL'] = WANObjectCache::TTL_PROC_LONG;
    $options['segmentable'] = true;

    return $options;
}
514
/**
 * Expand a raw data blob according to the flags given.
 *
 * For rows flagged 'external', $raw is treated as a URL and the payload is
 * fetched from the external store; when a blob address is supplied that fetch
 * goes through the object cache. The fetched or raw data is then passed to
 * decompressData().
 *
 * @param string $raw Raw old_text value
 * @param string|string[] $flags Comma-separated flag string, or a flag list
 * @param string|null $blobAddress Address used as the cache key, if known
 * @throws BadBlobException If the flags contain 'error'
 * @return string|false The expanded blob, or false on failure
 */
public function expandBlob( $raw, $flags, $blobAddress = null ) {
    $flagList = is_string( $flags ) ? self::explodeFlags( $flags ) : $flags;

    if ( in_array( 'error', $flagList ) ) {
        throw new BadBlobException(
            "The content of this revision is missing or corrupted (error flag)"
        );
    }

    if ( !in_array( 'external', $flagList ) ) {
        return $this->decompressData( $raw, $flagList );
    }

    // Use external methods for external objects, text in table is URL-only then
    $urlParts = explode( '://', $raw, 2 );
    if ( count( $urlParts ) == 1 || $urlParts[1] == '' ) {
        return false;
    }

    $loadExternal = function () use ( $raw, $flagList ) {
        // Ignore $setOpts; blobs are immutable and negatives are not cached
        $fetched = $this->extStoreAccess
            ->fetchFromURL( $raw, [ 'domain' => $this->dbDomain ] );

        return $fetched === false ? false : $this->decompressData( $fetched, $flagList );
    };

    if ( !$blobAddress ) {
        return $loadExternal();
    }

    // The cached value should be decompressed, so handle that and return here.
    return $this->cache->getWithSetCallback(
        $this->getCacheKey( $blobAddress ),
        $this->getCacheTTL(),
        $loadExternal,
        $this->getCacheOptions()
    );
}
575
/**
 * Prepare a blob for storage and report the flags describing it.
 *
 * The 'utf-8' flag is always set. If compression is enabled and zlib is
 * available, $blob is replaced in place by its gzdeflate()d form and 'gzip'
 * is added.
 *
 * @param string &$blob Blob content; may be overwritten with compressed data
 * @return string Comma-separated list of flags
 */
public function compressData( &$blob ) {
    // Revisions not marked as UTF-8 will have legacy decoding applied by decompressData().
    // XXX: if $this->legacyEncoding is not set, we could skip this. That would however be
    // risky, since $this->legacyEncoding being set in the future would lead to data corruption.
    $flagList = [ 'utf-8' ];

    if ( $this->compressBlobs ) {
        if ( !function_exists( 'gzdeflate' ) ) {
            wfDebug( __METHOD__ . " -- no zlib support, not compressing" );
        } else {
            $packed = gzdeflate( $blob );

            if ( $packed !== false ) {
                $blob = $packed;
                $flagList[] = 'gzip';
            } else {
                // Keep the uncompressed data and store it without the gzip flag
                wfLogWarning( __METHOD__ . ': gzdeflate() failed' );
            }
        }
    }

    return implode( ',', $flagList );
}
616
/**
 * Re-converts revision text according to its flags.
 *
 * Applies, in order: gzinflate() for 'gzip', HistoryBlob unserialization for
 * 'object', and a legacy-encoding to UTF-8 conversion when a legacy encoding
 * is configured and neither 'utf-8' nor 'utf8' is among the flags.
 *
 * @param string $blob Stored blob data
 * @param string[] $blobFlags Flags describing how $blob was stored
 * @return string|false The decoded text, or false if the row is flagged
 *  'error', inflation fails, or the object cannot be unserialized
 */
public function decompressData( string $blob, array $blobFlags ) {
    $hasFlag = static function ( string $flag ) use ( $blobFlags ) {
        return in_array( $flag, $blobFlags );
    };

    if ( $hasFlag( 'error' ) ) {
        // Error row, return false
        return false;
    }

    if ( $hasFlag( 'gzip' ) ) {
        # Deal with optional compression of archived pages.
        # This can be done periodically via maintenance/compressOld.php, and
        # as pages are saved if $wgCompressRevisions is set.
        $blob = gzinflate( $blob );

        if ( $blob === false ) {
            wfWarn( __METHOD__ . ': gzinflate() failed' );
            return false;
        }
    }

    if ( $hasFlag( 'object' ) ) {
        # Generic compressed storage
        $historyBlob = HistoryBlobUtils::unserialize( $blob );
        if ( !$historyBlob ) {
            // Invalid object
            return false;
        }
        $blob = $historyBlob->getText();
    }

    // Needed to support old revisions from before MW 1.5.
    $markedUtf8 = $hasFlag( 'utf-8' ) || $hasFlag( 'utf8' );
    if ( $blob !== false && $this->legacyEncoding && !$markedUtf8 ) {
        # Old revisions kept around in a legacy encoding?
        # Upconvert on demand.
        # ("utf8" checked for compatibility with some broken
        # conversion scripts 2008-12-30)
        # Even with //IGNORE iconv can whine about illegal characters in
        # *input* string. We just ignore those too.
        # REF: https://bugs.php.net/bug.php?id=37166
        # REF: https://phabricator.wikimedia.org/T18885
        AtEase::suppressWarnings();
        $blob = iconv( $this->legacyEncoding, 'UTF-8//IGNORE', $blob );
        AtEase::restoreWarnings();
    }

    return $blob;
}
679
/**
 * Work out the TTL to use when caching blobs.
 *
 * @return int TTL_UNCACHEABLE if the cache reports a durability of at least
 *  QOS_DURABILITY_RDBMS or no expiry is configured, otherwise the configured
 *  cache expiry
 */
private function getCacheTTL() {
    $cache = $this->cache;

    // Do not cache RDBMs blobs in...the RDBMs store
    if ( $cache->getQoS( $cache::ATTR_DURABILITY ) >= $cache::QOS_DURABILITY_RDBMS ) {
        return $cache::TTL_UNCACHEABLE;
    }

    return $this->cacheExpiry ?: $cache::TTL_UNCACHEABLE;
}
699
/**
 * Returns an ID corresponding to the old_id field in the text table,
 * corresponding to the given $address.
 *
 * Only addresses using the 'tt' schema map to a text row; for any other
 * schema null is returned.
 *
 * The ID must be a canonical positive integer. Zero, negative values
 * (e.g. "tt:-5"), and non-canonical spellings (e.g. "tt:007", "tt:12abc")
 * are rejected, matching the validation applied when blobs are fetched.
 *
 * @param string $address Blob address
 * @throws InvalidArgumentException If the address cannot be parsed or the
 *  'tt' ID is malformed
 * @return int|null The text row ID, or null for non-'tt' addresses
 */
public function getTextIdFromAddress( $address ) {
    [ $schema, $id, ] = self::splitBlobAddress( $address );

    if ( $schema !== 'tt' ) {
        return null;
    }

    $textId = intval( $id );

    // "< 1" instead of a falsiness check: negative numbers are truthy and
    // round-trip through the string cast, but can never be valid row IDs.
    if ( $textId < 1 || $id !== (string)$textId ) {
        throw new InvalidArgumentException( "Malformed text_id: $id" );
    }

    return $textId;
}
735
/**
 * Returns an address referring to content stored in the text table row with
 * the given ID.
 *
 * @param int $id Text table row ID (old_id)
 * @return string Address consisting of the 'tt:' prefix followed by the ID
 */
public static function makeAddressFromTextId( $id ) {
    $address = 'tt:';
    $address .= $id;

    return $address;
}
752
/**
 * Split a comma-separated old_flags value into its constituent parts.
 *
 * @param string $flagsString Flags as stored in the old_flags column
 * @return string[] The individual flags; an empty array for an empty string
 */
public static function explodeFlags( string $flagsString ) {
    if ( $flagsString === '' ) {
        // explode() would yield [ '' ] here, which is not an empty flag list
        return [];
    }

    return explode( ',', $flagsString );
}
762
/**
 * Splits a blob address into three parts: the schema, the ID, and
 * parameters/flags.
 *
 * The expected form is "schema:id" with an optional "?query" suffix. The
 * schema is lower-cased; the query part is decoded with wfCgiToArray().
 *
 * @param string $address Blob address
 * @throws InvalidArgumentException If the address does not match that form
 * @return array [ $schema, $id, $parameters ]
 */
public static function splitBlobAddress( $address ) {
    $parts = [];
    $matched = preg_match( '/^([-+.\w]+):([^\s?]+)(\?([^\s]*))?$/', $address, $parts );
    if ( $matched !== 1 ) {
        throw new InvalidArgumentException( "Bad blob address: $address" );
    }

    // Group 4 is absent when the address carries no "?query" part
    $query = $parts[4] ?? '';

    return [ strtolower( $parts[1] ), $parts[2], wfCgiToArray( $query ) ];
}
784
/**
 * Check if the blob metadata or backing blob data store is read-only.
 *
 * @return bool True if the external store is in use and read-only, or if the
 *  load balancer reports a read-only reason
 */
public function isReadOnly() {
    $externalStoreLocked = $this->useExternalStore && $this->extStoreAccess->isReadOnly();

    // The load balancer is only consulted when the external store did not already decide
    return $externalStoreLocked
        || $this->getDBLoadBalancer()->getReadOnlyReason() !== false;
}
792}
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfWarn( $msg, $callerOffset=1, $level=E_USER_NOTICE)
Send a warning either to the debug log or in a PHP error depending on $wgDevelopmentWarnings.
wfLogWarning( $msg, $callerOffset=1, $level=E_USER_WARNING)
Send a warning as a PHP error and the debug log.
wfCgiToArray( $query)
This is the logical opposite of wfArrayToCgi(): it accepts a query string as its argument and returns...
getCacheKey()
Get the cache key used to store status.
Helper class for DAO classes.
static hasFlags( $bitfield, $flags)
This is the main interface for fetching or inserting objects with ExternalStore.
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Exception thrown when a blob has the "bad" content address schema, or has "error" in its old_flags,...
Exception representing a failure to access a data blob.
Service for storing and loading Content objects representing revision data blobs.
static makeAddressFromTextId( $id)
Returns an address referring to content stored in the text table row with the given ID.
getTextIdFromAddress( $address)
Returns an ID corresponding to the old_id field in the text table, corresponding to the given $address.
__construct(ILoadBalancer $dbLoadBalancer, ExternalStoreAccess $extStoreAccess, WANObjectCache $cache, $dbDomain=false)
decompressData(string $blob, array $blobFlags)
Re-converts revision text according to its flags.
getBlob( $blobAddress, $queryFlags=0)
Retrieve a blob, given an address.
setLegacyEncoding(string $legacyEncoding)
Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
compressData(&$blob)
If $wgCompressRevisions is enabled, we will compress data.
static splitBlobAddress( $address)
Splits a blob address into three parts: the schema, the ID, and parameters/flags.
getBlobBatch( $blobAddresses, $queryFlags=0)
A batched version of BlobStore::getBlob.
storeBlob( $data, $hints=[])
Stores an arbitrary blob of data and returns an address that can be used with getBlob() to retrieve t...
setUseExternalStore(bool $useExternalStore)
isReadOnly()
Check if the blob metadata or backing blob data store is read-only.
expandBlob( $raw, $flags, $blobAddress=null)
Expand a raw data blob according to the flags given.
static explodeFlags(string $flagsString)
Split a comma-separated old_flags value into its constituent parts.
Generic operation result class. Has warning/error list, boolean status and arbitrary value.
Multi-datacenter aware caching interface.
Interface for database access objects.
Service for loading and storing data blobs.
Definition BlobStore.php:33
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
This class is a delegate to ILBFactory for a given database cluster.
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28