MediaWiki master
SqlBlobStore.php
Go to the documentation of this file.
1<?php
26namespace MediaWiki\Storage;
27
28use AppendIterator;
34use InvalidArgumentException;
35use StatusValue;
37use Wikimedia\Assert\Assert;
38use Wikimedia\AtEase\AtEase;
41
50class SqlBlobStore implements BlobStore {
51
52 // Note: the name has been taken unchanged from the old Revision class.
// Passed as the 'pcGroup' option of every cache lookup made by this class (see getCacheOptions()).
53 public const TEXT_CACHE_GROUP = 'revisiontext:10';
54
// Default blob cache lifetime in seconds; initial value of $cacheExpiry.
56 public const DEFAULT_TTL = 7 * 24 * 3600; // 7 days
57
// Load balancer (ILoadBalancer) used to obtain connections and to check for read-only mode.
61 private $dbLoadBalancer;
62
// ExternalStoreAccess used to insert blobs into, and fetch blobs from, external storage.
66 private $extStoreAccess;
67
// WANObjectCache in which expanded blobs are cached.
71 private $cache;
72
// Database domain passed to getConnection(), resolveDomainID() and the external store;
// false is the constructor default.
76 private $dbDomain;
77
// Cache TTL in seconds; a falsy value makes getCacheTTL() return TTL_UNCACHEABLE.
81 private $cacheExpiry = self::DEFAULT_TTL;
82
// Whether compressData() should try to gzdeflate() blobs before they are stored.
86 private $compressBlobs = false;
87
// Encoding name handed to iconv() for blobs lacking the utf-8/utf8 flag; false disables conversion.
91 private $legacyEncoding = false;
92
// Whether storeBlob() writes to external storage instead of the text table.
96 private $useExternalStore = false;
97
/**
 * @param ILoadBalancer $dbLoadBalancer Load balancer used to acquire database connections
 * @param ExternalStoreAccess $extStoreAccess Access layer for external blob storage
 * @param WANObjectCache $cache Cache used to hold expanded blobs
 * @param bool|string $dbDomain Database domain handed to the load balancer and to the
 *  external store; defaults to false
 */
public function __construct(
	ILoadBalancer $dbLoadBalancer,
	ExternalStoreAccess $extStoreAccess,
	WANObjectCache $cache,
	$dbDomain = false
) {
	// Storage backends first, then the settings that scope them.
	$this->extStoreAccess = $extStoreAccess;
	$this->dbLoadBalancer = $dbLoadBalancer;
	$this->dbDomain = $dbDomain;
	$this->cache = $cache;
}
120
/**
 * @return int Cache lifetime for blobs, in seconds, as used by getCacheTTL()
 */
public function getCacheExpiry() {
	$expiry = $this->cacheExpiry;

	return $expiry;
}
127
/**
 * @param int $cacheExpiry Cache lifetime for blobs, in seconds. A value of 0 makes
 *  getCacheTTL() treat blobs as uncacheable.
 */
public function setCacheExpiry( int $cacheExpiry ) {
	$this->cacheExpiry = $cacheExpiry;
}
134
/**
 * @return bool Whether compressData() will attempt to compress blobs
 */
public function getCompressBlobs() {
	$enabled = $this->compressBlobs;

	return $enabled;
}
141
/**
 * @param bool $compressBlobs Whether compressData() should attempt to compress blobs
 */
public function setCompressBlobs( $compressBlobs ) {
	$this->compressBlobs = $compressBlobs;
}
148
/**
 * @return false|string The legacy encoding assumed for blobs that are not flagged as
 *  UTF-8, or false if none has been set
 */
public function getLegacyEncoding() {
	$encoding = $this->legacyEncoding;

	return $encoding;
}
156
/**
 * Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
 *
 * @param string $legacyEncoding Encoding name; decompressData() passes it to iconv()
 *  as the source encoding
 */
public function setLegacyEncoding( string $legacyEncoding ) {
	$this->legacyEncoding = $legacyEncoding;
}
168
/**
 * @return bool Whether storeBlob() writes new blobs to external storage
 */
public function getUseExternalStore() {
	$enabled = $this->useExternalStore;

	return $enabled;
}
175
/**
 * @param bool $useExternalStore Whether storeBlob() should write new blobs to
 *  external storage instead of the text table
 */
public function setUseExternalStore( bool $useExternalStore ) {
	$this->useExternalStore = $useExternalStore;
}
182
/**
 * @return ILoadBalancer The load balancer passed to the constructor
 */
private function getDBLoadBalancer() {
	$loadBalancer = $this->dbLoadBalancer;

	return $loadBalancer;
}
189
/**
 * Get a database connection for this store's database domain.
 *
 * @param int $index A database index, such as DB_PRIMARY or DB_REPLICA
 * @return mixed The connection returned by the load balancer's getConnection()
 */
private function getDBConnection( $index ) {
	return $this->getDBLoadBalancer()->getConnection( $index, [], $this->dbDomain );
}
199
/**
 * Stores an arbitrary blob of data and returns an address that can be used with
 * getBlob() to retrieve it.
 *
 * The data is first run through compressData(). Depending on the external store
 * setting it is then either inserted into external storage (yielding an "es:" address
 * that carries the flags as a query parameter) or into a new row of the text table
 * (yielding a "tt:" address).
 *
 * @param string $data The blob to store
 * @param array $hints Not used by this implementation
 * @return string The address of the stored blob
 * @throws BlobAccessException If writing to external storage fails
 */
public function storeBlob( $data, $hints = [] ) {
	$flags = $this->compressData( $data );

	if ( !$this->useExternalStore ) {
		// Local storage: one new row in the text table, addressed by its insert ID.
		$dbw = $this->getDBConnection( DB_PRIMARY );

		$dbw->newInsertQueryBuilder()
			->insertInto( 'text' )
			->row( [ 'old_text' => $data, 'old_flags' => $flags ] )
			->caller( __METHOD__ )
			->execute();

		return self::makeAddressFromTextId( $dbw->insertId() );
	}

	// External storage: store the data and get its URL back.
	try {
		$url = $this->extStoreAccess->insert( $data, [ 'domain' => $this->dbDomain ] );
	} catch ( ExternalStoreException $e ) {
		throw new BlobAccessException( $e->getMessage(), 0, $e );
	}
	if ( !$url ) {
		throw new BlobAccessException( "Failed to store text to external storage" );
	}

	$address = 'es:' . $url;

	return $flags ? $address . '?flags=' . $flags : $address;
}
242
/**
 * Retrieve a blob, given an address.
 *
 * The lookup goes through the cache; on a miss the blob is loaded via fetchBlobs().
 * Failed loads are never cached.
 *
 * @param string $blobAddress
 * @param int $queryFlags Bitfield of IDBAccessObject::READ_* flags
 * @return string The blob content
 * @throws BadBlobException If the load reported a 'badrevision' error
 * @throws BlobAccessException If the load failed for any other reason
 */
public function getBlob( $blobAddress, $queryFlags = 0 ) {
	Assert::parameterType( 'string', $blobAddress, '$blobAddress' );

	// Filled in by the loader below when the fetch reports a problem for this address.
	$failure = null;
	$loader = function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$failure ) {
		// Ignore $setOpts; blobs are immutable and negatives are not cached
		[ $blobs, $problems ] = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
		// No negative caching; negative hits on text rows may be due to corrupted replica DBs
		$failure = $problems[$blobAddress] ?? null;
		if ( $failure ) {
			$ttl = WANObjectCache::TTL_UNCACHEABLE;
		}
		return $blobs[$blobAddress];
	};

	$blob = $this->cache->getWithSetCallback(
		$this->getCacheKey( $blobAddress ),
		$this->getCacheTTL(),
		$loader,
		$this->getCacheOptions()
	);

	if ( $failure ) {
		// Each error is a [ type, message ] pair.
		if ( $failure[0] !== 'badrevision' ) {
			throw new BlobAccessException( $failure[1] );
		}
		throw new BadBlobException( $failure[1] );
	}

	Assert::postcondition( is_string( $blob ), 'Blob must not be null' );

	return $blob;
}
286
/**
 * A batched version of BlobStore::getBlob.
 *
 * @param string[] $blobAddresses
 * @param int $queryFlags Bitfield of IDBAccessObject::READ_* flags
 * @return StatusValue A good status whose value maps each blob address to the blob
 *  content, or to null where the blob could not be loaded. Every load error is
 *  attached to the status as a warning.
 */
public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
	// FIXME: All caching has temporarily been removed in I94c6f9ba7b9caeeb due to T235188.
	//        Caching behavior should be restored by reverting I94c6f9ba7b9caeeb as soon as
	//        the root cause of T235188 has been resolved.

	[ $fetched, $errors ] = $this->fetchBlobs( $blobAddresses, $queryFlags );

	// fetchBlobs() reports failures as false; callers of this method get null instead.
	$blobsByAddress = [];
	foreach ( $fetched as $address => $blob ) {
		$blobsByAddress[$address] = ( $blob === false ) ? null : $blob;
	}

	$status = StatusValue::newGood( $blobsByAddress );
	foreach ( $errors as $error ) {
		// @phan-suppress-next-line PhanParamTooFewUnpack
		$status->warning( ...$error );
	}

	return $status;
}
316
/**
 * Loads the blobs for a list of addresses.
 *
 * Addresses with the "es" schema are expanded right away via expandBlob(), "bad"
 * addresses yield an empty string plus a 'badrevision' error, and all "tt" addresses
 * are collected and loaded from the text table in one query. If that query does not
 * find every row and the query flags allow a fallback, the missing rows are looked up
 * again on the fallback connection.
 *
 * @param string[] $blobAddresses
 * @param int $queryFlags Bitfield of IDBAccessObject::READ_* flags
 * @return array[] A two-element list [ $result, $errors ]: $result maps each blob
 *  address to the blob content, or to false on failure; $errors maps each failed
 *  address to a [ type, message ] pair.
 * @throws BlobAccessException If an address cannot be parsed at all
 * @throws BadBlobException If a blob's flags contain 'error' (raised by expandBlob())
 */
private function fetchBlobs( $blobAddresses, $queryFlags ) {
	$textIdToBlobAddress = [];
	$result = [];
	$errors = [];
	foreach ( $blobAddresses as $blobAddress ) {
		try {
			[ $schema, $id, $params ] = self::splitBlobAddress( $blobAddress );
		} catch ( InvalidArgumentException $ex ) {
			throw new BlobAccessException(
				$ex->getMessage() . '. Use findBadBlobs.php to remedy.',
				0,
				$ex
			);
		}

		if ( $schema === 'es' ) {
			if ( $params && isset( $params['flags'] ) ) {
				$blob = $this->expandBlob( $id, $params['flags'] . ',external', $blobAddress );
			} else {
				$blob = $this->expandBlob( $id, 'external', $blobAddress );
			}

			if ( $blob === false ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Bad data in external store address $id. Use findBadBlobs.php to remedy."
				];
			}
			$result[$blobAddress] = $blob;
		} elseif ( $schema === 'bad' ) {
			// Database row was marked as "known bad"
			wfDebug(
				__METHOD__
				. ": loading known-bad content ($blobAddress), returning empty string"
			);
			$result[$blobAddress] = '';
			$errors[$blobAddress] = [
				'badrevision',
				'The content of this revision is missing or corrupted (bad schema)'
			];
		} elseif ( $schema === 'tt' ) {
			$textId = intval( $id );

			if ( $textId < 1 || $id !== (string)$textId ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Bad blob address: $blobAddress. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
				// Do not queue the malformed ID for the query below. intval() may have
				// turned it into the ID of a real row (e.g. "12x" becomes 12), which would
				// overwrite the failure recorded here and could displace the mapping of a
				// valid "tt:12" address in the same batch.
				continue;
			}

			$textIdToBlobAddress[$textId] = $blobAddress;
		} else {
			$errors[$blobAddress] = [
				'internalerror',
				"Unknown blob address schema: $schema. Use findBadBlobs.php to remedy."
			];
			$result[$blobAddress] = false;
		}
	}

	$textIds = array_keys( $textIdToBlobAddress );
	if ( !$textIds ) {
		return [ $result, $errors ];
	}
	// Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
	// do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
	$queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, IDBAccessObject::READ_LATEST )
		? IDBAccessObject::READ_LATEST_IMMUTABLE
		: 0;
	[ $index, $options, $fallbackIndex, $fallbackOptions ] =
		self::getDBOptions( $queryFlags );
	// Text data is immutable; check replica DBs first.
	$dbConnection = $this->getDBConnection( $index );
	$rows = $dbConnection->newSelectQueryBuilder()
		->select( [ 'old_id', 'old_text', 'old_flags' ] )
		->from( 'text' )
		->where( [ 'old_id' => $textIds ] )
		->options( $options )
		->caller( __METHOD__ )->fetchResultSet();
	$numRows = $rows->numRows();

	// Fallback to DB_PRIMARY in some cases if not all the rows were found, using the appropriate
	// options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
	if ( $numRows !== count( $textIds ) && $fallbackIndex !== null ) {
		$fetchedTextIds = [];
		foreach ( $rows as $row ) {
			$fetchedTextIds[] = $row->old_id;
		}
		$missingTextIds = array_diff( $textIds, $fetchedTextIds );
		$dbConnection = $this->getDBConnection( $fallbackIndex );
		$rowsFromFallback = $dbConnection->newSelectQueryBuilder()
			->select( [ 'old_id', 'old_text', 'old_flags' ] )
			->from( 'text' )
			->where( [ 'old_id' => $missingTextIds ] )
			->options( $fallbackOptions )
			->caller( __METHOD__ )->fetchResultSet();
		// Process the rows of both queries in a single pass below.
		$appendIterator = new AppendIterator();
		$appendIterator->append( $rows );
		$appendIterator->append( $rowsFromFallback );
		$rows = $appendIterator;
	}

	foreach ( $rows as $row ) {
		$blobAddress = $textIdToBlobAddress[$row->old_id];
		$blob = false;
		if ( $row->old_text !== null ) {
			$blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
		}
		if ( $blob === false ) {
			$errors[$blobAddress] = [
				'internalerror',
				"Bad data in text row {$row->old_id}. Use findBadBlobs.php to remedy."
			];
		}
		$result[$blobAddress] = $blob;
	}

	// If we're still missing some of the rows, set errors for missing blobs.
	if ( count( $result ) !== count( $blobAddresses ) ) {
		foreach ( $blobAddresses as $blobAddress ) {
			if ( !isset( $result[$blobAddress ] ) ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Unable to fetch blob at $blobAddress. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
			}
		}
	}
	return [ $result, $errors ];
}
463
/**
 * Translate a bitfield of IDBAccessObject::READ_* flags into connection indexes and
 * query options.
 *
 * @param int $bitfield Bitfield of IDBAccessObject::READ_* flags
 * @return array A list [ $index, $options, $fallbackIndex, $fallbackOptions ]:
 *  the index and options for the first query, and the index (or null for "no
 *  fallback") and options for a fallback query
 */
private static function getDBOptions( $bitfield ) {
	if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST_IMMUTABLE ) ) {
		// Try a replica first (overriding READ_LATEST if set), falling back to the primary.
		$index = DB_REPLICA;
		$fallbackIndex = DB_PRIMARY;
	} else {
		$wantsLatest = DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST );
		$index = $wantsLatest ? DB_PRIMARY : DB_REPLICA;
		$fallbackIndex = null;
	}

	if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_EXCLUSIVE ) ) {
		$lockingOptions = [ 'FOR UPDATE' ];
	} elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LOCKING ) ) {
		$lockingOptions = [ 'LOCK IN SHARE MODE' ];
	} else {
		$lockingOptions = [];
	}

	if ( $fallbackIndex === null ) {
		// No fallback: any locking applies to the one and only query.
		return [ $index, $lockingOptions, $fallbackIndex, [] ];
	}

	// Locks on DB_REPLICA make no sense, so only the fallback query takes them.
	return [ $index, [], $fallbackIndex, $lockingOptions ];
}
493
/**
 * Get the cache key under which the blob at the given address is stored.
 *
 * @param string $blobAddress
 * @return string A global cache key built from the resolved database domain ID and
 *  the blob address
 */
private function getCacheKey( $blobAddress ) {
	$domainId = $this->dbLoadBalancer->resolveDomainID( $this->dbDomain );

	return $this->cache->makeGlobalKey( 'SqlBlobStore-blob', $domainId, $blobAddress );
}
511
/**
 * Get the options passed to every getWithSetCallback() call made by this class.
 *
 * @return array
 */
private function getCacheOptions() {
	$options = [];
	$options['pcGroup'] = self::TEXT_CACHE_GROUP;
	$options['pcTTL'] = WANObjectCache::TTL_PROC_LONG;
	$options['segmentable'] = true;

	return $options;
}
524
/**
 * Expand a raw data blob according to the flags given.
 *
 * If the flags contain 'external', $raw is treated as an external store URL: the
 * blob is fetched from there and then decompressed. When a blob address is given,
 * that fetch goes through the cache. Without the 'external' flag, $raw itself is
 * decompressed.
 *
 * @param string $raw The raw blob contents, or an external store URL
 * @param array|string $flags Blob flags, as a list or as a comma-separated string
 * @param string|null $blobAddress Address used to build the cache key; if omitted,
 *  external blobs are fetched without caching
 * @return false|string The expanded blob, or false on failure
 * @throws BadBlobException If the flags contain 'error'
 */
public function expandBlob( $raw, $flags, $blobAddress = null ) {
	if ( is_string( $flags ) ) {
		$flags = self::explodeFlags( $flags );
	}
	if ( in_array( 'error', $flags ) ) {
		throw new BadBlobException(
			"The content of this revision is missing or corrupted (error flag)"
		);
	}

	if ( !in_array( 'external', $flags ) ) {
		return $this->decompressData( $raw, $flags );
	}

	// Use external methods for external objects, text in table is URL-only then
	$url = $raw;
	$urlParts = explode( '://', $url, 2 );
	if ( count( $urlParts ) < 2 || $urlParts[1] === '' ) {
		return false;
	}

	$fetchAndDecompress = function () use ( $url, $flags ) {
		// Ignore $setOpts; blobs are immutable and negatives are not cached
		$fetched = $this->extStoreAccess
			->fetchFromURL( $url, [ 'domain' => $this->dbDomain ] );

		return $fetched === false ? false : $this->decompressData( $fetched, $flags );
	};

	if ( !$blobAddress ) {
		// No address means no cache key: fetch directly.
		return $fetchAndDecompress();
	}

	// The cached value should be decompressed, so the callback handles that.
	return $this->cache->getWithSetCallback(
		$this->getCacheKey( $blobAddress ),
		$this->getCacheTTL(),
		$fetchAndDecompress,
		$this->getCacheOptions()
	);
}
585
/**
 * If blob compression is enabled, compress the given data in place.
 *
 * @param string &$blob The data; replaced by its deflated form when compression
 *  is enabled and succeeds
 * @return string Comma-separated list of flags describing the stored form: always
 *  'utf-8', plus 'gzip' if the data was compressed
 */
public function compressData( &$blob ) {
	// Revisions not marked as UTF-8 will have legacy decoding applied by decompressData().
	// XXX: if $this->legacyEncoding is not set, we could skip this. That would however be
	// risky, since $this->legacyEncoding being set in the future would lead to data corruption.
	$flagList = [ 'utf-8' ];

	if ( $this->compressBlobs ) {
		if ( !function_exists( 'gzdeflate' ) ) {
			wfDebug( __METHOD__ . " -- no zlib support, not compressing" );
		} else {
			$deflated = gzdeflate( $blob );

			if ( $deflated !== false ) {
				$blob = $deflated;
				$flagList[] = 'gzip';
			} else {
				// Keep the uncompressed data and leave out the gzip flag.
				wfLogWarning( __METHOD__ . ': gzdeflate() failed' );
			}
		}
	}

	return implode( ',', $flagList );
}
626
/**
 * Re-converts revision text according to its flags.
 *
 * Handles, in this order: the 'error' flag, 'gzip' inflation, 'object' unserialization
 * and, if a legacy encoding is configured and the blob is not flagged as UTF-8,
 * conversion to UTF-8.
 *
 * @param string $blob Blob in compressed/encoded form
 * @param string[] $blobFlags The flags describing the blob's stored form
 * @return string|false The decompressed text, or false on failure
 */
public function decompressData( string $blob, array $blobFlags ) {
	$hasFlag = static function ( $flag ) use ( $blobFlags ) {
		return in_array( $flag, $blobFlags );
	};

	// Error row, return false
	if ( $hasFlag( 'error' ) ) {
		return false;
	}

	if ( $hasFlag( 'gzip' ) ) {
		# Deal with optional compression of archived pages.
		# This can be done periodically via maintenance/compressOld.php, and
		# as pages are saved if $wgCompressRevisions is set.
		$blob = gzinflate( $blob );

		if ( $blob === false ) {
			wfWarn( __METHOD__ . ': gzinflate() failed' );
			return false;
		}
	}

	if ( $hasFlag( 'object' ) ) {
		# Generic compressed storage
		$historyBlob = HistoryBlobUtils::unserialize( $blob );
		if ( !$historyBlob ) {
			// Invalid object
			return false;
		}
		$blob = $historyBlob->getText();
	}

	// Needed to support old revisions from before MW 1.5.
	// ("utf8" checked for compatibility with some broken conversion scripts 2008-12-30)
	$needsUpconversion = $blob !== false
		&& $this->legacyEncoding
		&& !$hasFlag( 'utf-8' )
		&& !$hasFlag( 'utf8' );

	if ( $needsUpconversion ) {
		# Old revisions kept around in a legacy encoding? Upconvert on demand.
		# Even with //IGNORE iconv can whine about illegal characters in
		# *input* string. We just ignore those too.
		# REF: https://bugs.php.net/bug.php?id=37166
		# REF: https://phabricator.wikimedia.org/T18885
		AtEase::suppressWarnings();
		$blob = iconv( $this->legacyEncoding, 'UTF-8//IGNORE', $blob );
		AtEase::restoreWarnings();
	}

	return $blob;
}
689
/**
 * Get the TTL to use when caching blobs.
 *
 * @return int The configured cache expiry, or TTL_UNCACHEABLE if the cache's
 *  durability is at least that of an RDBMS or no expiry is configured
 */
private function getCacheTTL() {
	$cache = $this->cache;

	if ( $cache->getQoS( $cache::ATTR_DURABILITY ) >= $cache::QOS_DURABILITY_RDBMS ) {
		// Do not cache RDBMs blobs in...the RDBMs store
		return $cache::TTL_UNCACHEABLE;
	}

	return $this->cacheExpiry ?: $cache::TTL_UNCACHEABLE;
}
709
/**
 * Returns an ID corresponding to the old_id field in the text table, corresponding
 * to the given address.
 *
 * @param string $address A blob address
 * @return int|null The text ID, or null if the address does not use the "tt" schema
 * @throws InvalidArgumentException If the address cannot be parsed, or if its ID part
 *  is not the canonical decimal form of a positive integer
 */
public function getTextIdFromAddress( $address ) {
	[ $schema, $id, ] = self::splitBlobAddress( $address );

	if ( $schema !== 'tt' ) {
		return null;
	}

	$textId = intval( $id );

	// Reject zero, negative IDs and anything intval() had to coerce (e.g. "12x", "012").
	// This is the same validity rule fetchBlobs() applies to "tt" addresses.
	if ( $textId < 1 || $id !== (string)$textId ) {
		throw new InvalidArgumentException( "Malformed text_id: $id" );
	}

	return $textId;
}
745
/**
 * Returns an address referring to content stored in the text table row with the
 * given ID.
 *
 * @param int $id The text table row ID
 * @return string The address, consisting of the "tt:" prefix followed by the ID
 */
public static function makeAddressFromTextId( $id ) {
	$address = 'tt:' . $id;

	return $address;
}
762
/**
 * Split a comma-separated old_flags value into its constituent parts.
 *
 * @param string $flagsString
 * @return string[] The individual flags; an empty list for an empty string
 */
public static function explodeFlags( string $flagsString ) {
	// explode() on an empty string would yield [ '' ] rather than an empty list.
	if ( $flagsString === '' ) {
		return [];
	}

	return explode( ',', $flagsString );
}
772
/**
 * Splits a blob address into three parts: the schema, the ID, and parameters/flags.
 *
 * @param string $address A blob address of the form "schema:id" or "schema:id?query"
 * @return array A list [ $schema, $id, $parameters ]: the lower-cased schema, the ID
 *  part as a string, and the query part parsed by wfCgiToArray()
 * @throws InvalidArgumentException If the address does not have the expected form
 */
public static function splitBlobAddress( $address ) {
	$isValid = preg_match( '/^([-+.\w]+):([^\s?]+)(\?([^\s]*))?$/', $address, $parts );
	if ( !$isValid ) {
		throw new InvalidArgumentException( "Bad blob address: $address" );
	}

	// $parts[4] is absent when the address carries no "?..." part.
	return [
		strtolower( $parts[1] ),
		$parts[2],
		wfCgiToArray( $parts[4] ?? '' ),
	];
}
794
/**
 * Check if the blob metadata or backing blob data store is read-only.
 *
 * @return bool True if external storage is in use and read-only, or if the load
 *  balancer reports a read-only reason
 */
public function isReadOnly() {
	$externalStoreReadOnly = $this->useExternalStore && $this->extStoreAccess->isReadOnly();

	return $externalStoreReadOnly
		|| $this->getDBLoadBalancer()->getReadOnlyReason() !== false;
}
802}
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfWarn( $msg, $callerOffset=1, $level=E_USER_NOTICE)
Send a warning either to the debug log or in a PHP error depending on $wgDevelopmentWarnings.
wfLogWarning( $msg, $callerOffset=1, $level=E_USER_WARNING)
Send a warning as a PHP error and the debug log.
wfCgiToArray( $query)
This is the logical opposite of wfArrayToCgi(): it accepts a query string as its argument and returns...
array $params
The job parameters.
getCacheKey()
Get the cache key used to store status.
Helper class for DAO classes.
static hasFlags( $bitfield, $flags)
This is the main interface for fetching or inserting objects with ExternalStore.
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Exception thrown when a blob has the "bad" content address schema, or has "error" in its old_flags,...
Exception representing a failure to access a data blob.
Service for storing and loading Content objects representing revision data blobs.
static makeAddressFromTextId( $id)
Returns an address referring to content stored in the text table row with the given ID.
getTextIdFromAddress( $address)
Returns an ID corresponding to the old_id field in the text table, corresponding to the given $addres...
__construct(ILoadBalancer $dbLoadBalancer, ExternalStoreAccess $extStoreAccess, WANObjectCache $cache, $dbDomain=false)
decompressData(string $blob, array $blobFlags)
Re-converts revision text according to its flags.
getBlob( $blobAddress, $queryFlags=0)
Retrieve a blob, given an address.
setLegacyEncoding(string $legacyEncoding)
Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
compressData(&$blob)
If $wgCompressRevisions is enabled, we will compress data.
static splitBlobAddress( $address)
Splits a blob address into three parts: the schema, the ID, and parameters/flags.
getBlobBatch( $blobAddresses, $queryFlags=0)
A batched version of BlobStore::getBlob.
storeBlob( $data, $hints=[])
Stores an arbitrary blob of data and returns an address that can be used with getBlob() to retrieve t...
setUseExternalStore(bool $useExternalStore)
isReadOnly()
Check if the blob metadata or backing blob data store is read-only.
expandBlob( $raw, $flags, $blobAddress=null)
Expand a raw data blob according to the flags given.
static explodeFlags(string $flagsString)
Split a comma-separated old_flags value into its constituent parts.
Generic operation result class Has warning/error list, boolean status and arbitrary value.
Multi-datacenter aware caching interface.
Interface for database access objects.
Service for loading and storing data blobs.
Definition BlobStore.php:33
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
This class is a delegate to ILBFactory for a given database cluster.
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28