MediaWiki master
SqlBlobStore.php
Go to the documentation of this file.
1<?php
26namespace MediaWiki\Storage;
27
28use AppendIterator;
32use InvalidArgumentException;
33use StatusValue;
34use Wikimedia\Assert\Assert;
40
/**
 * Service for storing and loading data blobs, either in the `text` table of
 * the wiki's database or in external storage (addresses with the "es" schema).
 *
 * Blob addresses have the form "<schema>:<id>[?<params>]". Known schemas:
 * - "tt": a row in the `text` table, identified by old_id.
 * - "es": a URL into external storage, optionally with a "flags" parameter.
 * - "bad": content known to be missing or corrupted.
 */
class SqlBlobStore implements BlobStore {

	// Note: the name has been taken unchanged from the old Revision class.
	public const TEXT_CACHE_GROUP = 'revisiontext:10';

	/** Default TTL for cached blobs, in seconds. */
	public const DEFAULT_TTL = 7 * 24 * 3600; // 7 days

	/** @var ILoadBalancer */
	private $dbLoadBalancer;

	/** @var ExternalStoreAccess */
	private $extStoreAccess;

	/** @var WANObjectCache */
	private $cache;

	/** @var string|false DB domain to use; false presumably means the local wiki (confirm) */
	private $dbDomain;

	/** @var int TTL in seconds for cached blobs; 0 disables caching (see getCacheTTL) */
	private $cacheExpiry = self::DEFAULT_TTL;

	/** @var bool Whether compressData() should gzip-deflate blobs */
	private $compressBlobs = false;

	/** @var string|false Encoding assumed for blobs without a utf-8 flag, or false for none */
	private $legacyEncoding = false;

	/** @var bool Whether storeBlob() writes to external storage instead of the text table */
	private $useExternalStore = false;

	/**
	 * @param ILoadBalancer $dbLoadBalancer Load balancer for the target database
	 * @param ExternalStoreAccess $extStoreAccess Access to external storage
	 * @param WANObjectCache $cache Cache used for loaded blobs
	 * @param string|false $dbDomain Domain passed to DB connections and external storage
	 */
	public function __construct(
		ILoadBalancer $dbLoadBalancer,
		ExternalStoreAccess $extStoreAccess,
		WANObjectCache $cache,
		$dbDomain = false
	) {
		$this->dbLoadBalancer = $dbLoadBalancer;
		$this->extStoreAccess = $extStoreAccess;
		$this->cache = $cache;
		$this->dbDomain = $dbDomain;
	}

	/**
	 * @return int Cache TTL in seconds
	 */
	public function getCacheExpiry() {
		return $this->cacheExpiry;
	}

	/**
	 * @param int $cacheExpiry Cache TTL in seconds; 0 disables caching
	 */
	public function setCacheExpiry( int $cacheExpiry ) {
		$this->cacheExpiry = $cacheExpiry;
	}

	/**
	 * @return bool Whether new blobs are gzip-compressed before storing
	 */
	public function getCompressBlobs() {
		return $this->compressBlobs;
	}

	/**
	 * @param bool $compressBlobs Whether new blobs should be gzip-compressed before storing
	 */
	public function setCompressBlobs( $compressBlobs ) {
		$this->compressBlobs = $compressBlobs;
	}

	/**
	 * @return string|false The legacy encoding assumed for blobs lacking the utf-8 flag,
	 *   or false if none is configured
	 */
	public function getLegacyEncoding() {
		return $this->legacyEncoding;
	}

	/**
	 * Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
	 * decompressData() converts such blobs to UTF-8 using iconv.
	 *
	 * @param string $legacyEncoding
	 */
	public function setLegacyEncoding( string $legacyEncoding ) {
		$this->legacyEncoding = $legacyEncoding;
	}

	/**
	 * @return bool Whether storeBlob() writes to external storage
	 */
	public function getUseExternalStore() {
		return $this->useExternalStore;
	}

	/**
	 * @param bool $useExternalStore Whether storeBlob() should write to external storage
	 */
	public function setUseExternalStore( bool $useExternalStore ) {
		$this->useExternalStore = $useExternalStore;
	}

	/**
	 * @return ILoadBalancer
	 */
	private function getDBLoadBalancer() {
		return $this->dbLoadBalancer;
	}

	/**
	 * @param int $index DB_PRIMARY or DB_REPLICA
	 * @return IDatabase
	 */
	private function getDBConnection( $index ) {
		$lb = $this->getDBLoadBalancer();
		return $lb->getConnection( $index, [], $this->dbDomain );
	}

	/**
	 * Stores an arbitrary blob of data and returns an address that can be used
	 * with getBlob() to retrieve the same blob of data.
	 *
	 * @param string $data Raw blob data; compressed first if configured
	 * @param array $hints Currently unused by this implementation
	 * @return string An "es:" address when using external storage, otherwise a "tt:" address
	 * @throws BlobAccessException If writing to external storage fails
	 */
	public function storeBlob( $data, $hints = [] ) {
		$flags = $this->compressData( $data );

		# Write to external storage if required
		if ( $this->useExternalStore ) {
			// Store and get the URL
			try {
				$data = $this->extStoreAccess->insert( $data, [ 'domain' => $this->dbDomain ] );
			} catch ( ExternalStoreException $e ) {
				throw new BlobAccessException( $e->getMessage(), 0, $e );
			}
			if ( !$data ) {
				throw new BlobAccessException( "Failed to store text to external storage" );
			}
			if ( $flags ) {
				return 'es:' . $data . '?flags=' . $flags;
			} else {
				return 'es:' . $data;
			}
		} else {
			$dbw = $this->getDBConnection( DB_PRIMARY );

			$dbw->newInsertQueryBuilder()
				->insertInto( 'text' )
				->row( [ 'old_text' => $data, 'old_flags' => $flags ] )
				->caller( __METHOD__ )->execute();

			$textId = $dbw->insertId();

			return self::makeAddressFromTextId( $textId );
		}
	}

	/**
	 * Retrieve a blob, given an address. Successful loads are cached in the WAN cache;
	 * failures are marked uncacheable.
	 *
	 * @param string $blobAddress
	 * @param int $queryFlags IDBAccessObject::READ_* bit field
	 * @return string
	 * @throws BadBlobException If the blob is known to be bad ("bad" schema)
	 * @throws BlobAccessException For any other load failure
	 */
	public function getBlob( $blobAddress, $queryFlags = 0 ) {
		Assert::parameterType( 'string', $blobAddress, '$blobAddress' );

		$error = null;
		$blob = $this->cache->getWithSetCallback(
			$this->getCacheKey( $blobAddress ),
			$this->getCacheTTL(),
			function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
				// Ignore $setOpts; blobs are immutable and negatives are not cached
				[ $result, $errors ] = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
				// No negative caching; negative hits on text rows may be due to corrupted replica DBs
				$error = $errors[$blobAddress] ?? null;
				if ( $error ) {
					$ttl = WANObjectCache::TTL_UNCACHEABLE;
				}
				return $result[$blobAddress];
			},
			$this->getCacheOptions()
		);

		if ( $error ) {
			if ( $error[0] === 'badrevision' ) {
				throw new BadBlobException( $error[1] );
			} else {
				throw new BlobAccessException( $error[1] );
			}
		}

		Assert::postcondition( is_string( $blob ), 'Blob must not be null' );
		return $blob;
	}

	/**
	 * A batched version of BlobStore::getBlob.
	 *
	 * @param string[] $blobAddresses
	 * @param int $queryFlags IDBAccessObject::READ_* bit field
	 * @return StatusValue A good status whose value maps each address to its blob, or to
	 *   null if it could not be loaded; per-address errors are attached as warnings.
	 * @throws BlobAccessException If an address cannot be parsed at all
	 */
	public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
		// FIXME: All caching has temporarily been removed in I94c6f9ba7b9caeeb due to T235188.
		// Caching behavior should be restored by reverting I94c6f9ba7b9caeeb as soon as
		// the root cause of T235188 has been resolved.

		[ $blobsByAddress, $errors ] = $this->fetchBlobs( $blobAddresses, $queryFlags );

		$blobsByAddress = array_map( static function ( $blob ) {
			return $blob === false ? null : $blob;
		}, $blobsByAddress );

		$result = StatusValue::newGood( $blobsByAddress );
		foreach ( $errors as $error ) {
			// @phan-suppress-next-line PhanParamTooFewUnpack
			$result->warning( ...$error );
		}
		return $result;
	}

	/**
	 * Load blobs without going through the cache.
	 *
	 * @param string[] $blobAddresses
	 * @param int $queryFlags IDBAccessObject::READ_* bit field
	 * @return array A pair: [ blobs keyed by address (false on failure),
	 *   errors keyed by address as [ messageKey, messageText ] ]
	 * @throws BlobAccessException If an address cannot be parsed at all
	 */
	private function fetchBlobs( $blobAddresses, $queryFlags ) {
		$textIdToBlobAddress = [];
		$result = [];
		$errors = [];
		foreach ( $blobAddresses as $blobAddress ) {
			try {
				[ $schema, $id, $params ] = self::splitBlobAddress( $blobAddress );
			} catch ( InvalidArgumentException $ex ) {
				throw new BlobAccessException(
					$ex->getMessage() . '. Use findBadBlobs.php to remedy.',
					0,
					$ex
				);
			}

			if ( $schema === 'es' ) {
				if ( isset( $params['flags'] ) ) {
					$blob = $this->expandBlob( $id, $params['flags'] . ',external', $blobAddress );
				} else {
					$blob = $this->expandBlob( $id, 'external', $blobAddress );
				}

				if ( $blob === false ) {
					$errors[$blobAddress] = [
						'internalerror',
						"Bad data in external store address $id. Use findBadBlobs.php to remedy."
					];
				}
				$result[$blobAddress] = $blob;
			} elseif ( $schema === 'bad' ) {
				// Database row was marked as "known bad"
				wfDebug(
					__METHOD__
					. ": loading known-bad content ($blobAddress), returning empty string"
				);
				$result[$blobAddress] = '';
				$errors[$blobAddress] = [
					'badrevision',
					'The content of this revision is missing or corrupted (bad schema)'
				];
			} elseif ( $schema === 'tt' ) {
				$textId = intval( $id );

				if ( $textId < 1 || $id !== (string)$textId ) {
					$errors[$blobAddress] = [
						'internalerror',
						"Bad blob address: $blobAddress. Use findBadBlobs.php to remedy."
					];
					$result[$blobAddress] = false;
					// Do not look up a malformed address: it would query a bogus row and
					// could clobber the mapping of a valid address with the same numeric id.
					continue;
				}

				$textIdToBlobAddress[$textId] = $blobAddress;
			} else {
				$errors[$blobAddress] = [
					'internalerror',
					"Unknown blob address schema: $schema. Use findBadBlobs.php to remedy."
				];
				$result[$blobAddress] = false;
			}
		}

		$textIds = array_keys( $textIdToBlobAddress );
		if ( !$textIds ) {
			return [ $result, $errors ];
		}
		// Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
		// do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
		$queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, IDBAccessObject::READ_LATEST )
			? IDBAccessObject::READ_LATEST_IMMUTABLE
			: 0;
		[ $index, $options, $fallbackIndex, $fallbackOptions ] =
			self::getDBOptions( $queryFlags );
		// Text data is immutable; check replica DBs first.
		$dbConnection = $this->getDBConnection( $index );
		$rows = $dbConnection->newSelectQueryBuilder()
			->select( [ 'old_id', 'old_text', 'old_flags' ] )
			->from( 'text' )
			->where( [ 'old_id' => $textIds ] )
			->options( $options )
			->caller( __METHOD__ )->fetchResultSet();
		$numRows = $rows->numRows();

		// Fallback to DB_PRIMARY in some cases if not all the rows were found, using the appropriate
		// options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
		if ( $numRows !== count( $textIds ) && $fallbackIndex !== null ) {
			$fetchedTextIds = [];
			foreach ( $rows as $row ) {
				$fetchedTextIds[] = $row->old_id;
			}
			$missingTextIds = array_diff( $textIds, $fetchedTextIds );
			$dbConnection = $this->getDBConnection( $fallbackIndex );
			$rowsFromFallback = $dbConnection->newSelectQueryBuilder()
				->select( [ 'old_id', 'old_text', 'old_flags' ] )
				->from( 'text' )
				->where( [ 'old_id' => $missingTextIds ] )
				->options( $fallbackOptions )
				->caller( __METHOD__ )->fetchResultSet();
			$appendIterator = new AppendIterator();
			$appendIterator->append( $rows );
			$appendIterator->append( $rowsFromFallback );
			$rows = $appendIterator;
		}

		foreach ( $rows as $row ) {
			$blobAddress = $textIdToBlobAddress[$row->old_id];
			$blob = false;
			if ( $row->old_text !== null ) {
				$blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
			}
			if ( $blob === false ) {
				$errors[$blobAddress] = [
					'internalerror',
					"Bad data in text row {$row->old_id}. Use findBadBlobs.php to remedy."
				];
			}
			$result[$blobAddress] = $blob;
		}

		// If we're still missing some of the rows, set errors for missing blobs.
		if ( count( $result ) !== count( $blobAddresses ) ) {
			foreach ( $blobAddresses as $blobAddress ) {
				if ( !isset( $result[$blobAddress] ) ) {
					$errors[$blobAddress] = [
						'internalerror',
						"Unable to fetch blob at $blobAddress. Use findBadBlobs.php to remedy."
					];
					$result[$blobAddress] = false;
				}
			}
		}
		return [ $result, $errors ];
	}

	/**
	 * Translate a READ_* bit field into DB selection and locking options.
	 *
	 * @param int $bitfield IDBAccessObject::READ_* bit field
	 * @return array [ index, options, fallbackIndex (or null), fallbackOptions ]
	 */
	private static function getDBOptions( $bitfield ) {
		if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST_IMMUTABLE ) ) {
			$index = DB_REPLICA; // override READ_LATEST if set
			$fallbackIndex = DB_PRIMARY;
		} elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST ) ) {
			$index = DB_PRIMARY;
			$fallbackIndex = null;
		} else {
			$index = DB_REPLICA;
			$fallbackIndex = null;
		}

		$lockingOptions = [];
		if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_EXCLUSIVE ) ) {
			$lockingOptions[] = 'FOR UPDATE';
		} elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LOCKING ) ) {
			$lockingOptions[] = 'LOCK IN SHARE MODE';
		}

		if ( $fallbackIndex !== null ) {
			$options = []; // locks on DB_REPLICA make no sense
			$fallbackOptions = $lockingOptions;
		} else {
			$options = $lockingOptions;
			$fallbackOptions = []; // no fallback
		}

		return [ $index, $options, $fallbackIndex, $fallbackOptions ];
	}

	/**
	 * Get the global WAN cache key for a blob address, scoped to the resolved DB domain.
	 *
	 * @param string $blobAddress
	 * @return string
	 */
	private function getCacheKey( $blobAddress ) {
		return $this->cache->makeGlobalKey(
			'SqlBlobStore-blob',
			$this->dbLoadBalancer->resolveDomainID( $this->dbDomain ),
			$blobAddress
		);
	}

	/**
	 * Options passed to WANObjectCache::getWithSetCallback() for blob loads.
	 *
	 * @return array
	 */
	private function getCacheOptions() {
		return [
			'pcGroup' => self::TEXT_CACHE_GROUP,
			'pcTTL' => WANObjectCache::TTL_PROC_LONG,
			'segmentable' => true
		];
	}

	/**
	 * Expand a raw data blob according to the flags given.
	 *
	 * @param string $raw The raw blob, or an external store URL if $flags contains "external"
	 * @param string[]|string $flags Flags as an array or comma-separated string
	 * @param string|null $blobAddress Used as cache key for external blobs and in warnings
	 * @return false|string The expanded blob, or false on failure
	 * @throws BadBlobException If $flags contains "error"
	 */
	public function expandBlob( $raw, $flags, $blobAddress = null ) {
		if ( is_string( $flags ) ) {
			$flags = self::explodeFlags( $flags );
		}
		if ( in_array( 'error', $flags, true ) ) {
			throw new BadBlobException(
				"The content of this revision is missing or corrupted (error flag)"
			);
		}

		// Use external methods for external objects, text in table is URL-only then
		if ( in_array( 'external', $flags, true ) ) {
			$url = $raw;
			$parts = explode( '://', $url, 2 );
			if ( count( $parts ) === 1 || $parts[1] === '' ) {
				return false;
			}

			if ( $blobAddress ) {
				// The cached value should be decompressed, so handle that and return here.
				return $this->cache->getWithSetCallback(
					$this->getCacheKey( $blobAddress ),
					$this->getCacheTTL(),
					function () use ( $url, $flags, $blobAddress ) {
						// Blobs are immutable, so the decompressed value can be cached as-is.
						// NOTE(review): a false (failed) result is presumably not stored by
						// WANObjectCache; confirm against its getWithSetCallback() contract.
						$blob = $this->extStoreAccess
							->fetchFromURL( $url, [ 'domain' => $this->dbDomain ] );

						return $blob === false ? false : $this->decompressData( $blob, $flags, $blobAddress );
					},
					$this->getCacheOptions()
				);
			} else {
				$blob = $this->extStoreAccess->fetchFromURL( $url, [ 'domain' => $this->dbDomain ] );
				return $blob === false ? false : $this->decompressData( $blob, $flags, $blobAddress );
			}
		} else {
			return $this->decompressData( $raw, $flags, $blobAddress );
		}
	}

	/**
	 * If compression is enabled, gzip-deflate the data in place.
	 *
	 * @param mixed &$blob Data to store; replaced by its compressed form when compressing
	 * @return string Comma-separated flags: always "utf-8", plus "gzip" if compressed
	 */
	public function compressData( &$blob ) {
		$blobFlags = [];

		// Revisions not marked as UTF-8 will have legacy decoding applied by decompressData().
		// XXX: if $this->legacyEncoding is not set, we could skip this. That would however be
		// risky, since $this->legacyEncoding being set in the future would lead to data corruption.
		$blobFlags[] = 'utf-8';

		if ( $this->compressBlobs ) {
			if ( function_exists( 'gzdeflate' ) ) {
				$deflated = gzdeflate( $blob );

				if ( $deflated === false ) {
					wfLogWarning( __METHOD__ . ': gzdeflate() failed' );
				} else {
					$blob = $deflated;
					$blobFlags[] = 'gzip';
				}
			} else {
				wfDebug( __METHOD__ . " -- no zlib support, not compressing" );
			}
		}
		return implode( ',', $blobFlags );
	}

	/**
	 * Re-converts revision text according to its flags.
	 *
	 * @param string $blob Blob in compressed/encoded form
	 * @param string[] $blobFlags Flags, e.g. "gzip", "object", "utf-8", "error"
	 * @param string|null $blobAddress Only used in the gzinflate() failure warning
	 * @return string|false Decompressed text, or false on failure
	 */
	public function decompressData( string $blob, array $blobFlags, ?string $blobAddress = null ) {
		if ( in_array( 'error', $blobFlags, true ) ) {
			// Error row, return false
			return false;
		}

		// Deal with optional compression of archived pages.
		// This can be done periodically via maintenance/compressOld.php, and
		// as pages are saved if $wgCompressRevisions is set.
		if ( in_array( 'gzip', $blobFlags, true ) ) {
			// Silence native warning in favour of more detailed warning (T380347)
			// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
			$blob = @gzinflate( $blob );
			if ( $blob === false ) {
				wfWarn( __METHOD__ . ': gzinflate() failed' .
					( $blobAddress ? ' (at blob address ' . $blobAddress . ')' : '' ) );
				return false;
			}
		}

		if ( in_array( 'object', $blobFlags, true ) ) {
			# Generic compressed storage
			$obj = HistoryBlobUtils::unserialize( $blob );
			if ( !$obj ) {
				// Invalid object
				return false;
			}
			$blob = $obj->getText();
		}

		// Needed to support old revisions from before MW 1.5.
		if ( $blob !== false && $this->legacyEncoding
			&& !in_array( 'utf-8', $blobFlags, true ) && !in_array( 'utf8', $blobFlags, true )
		) {
			// - Old revisions kept around in a legacy encoding?
			//   Upconvert on demand.
			// - "utf8" checked for compatibility with some broken
			//   conversion scripts 2008-12-30.
			// - Even with "//IGNORE" iconv can whine about illegal characters in
			//   *input* string. We just ignore those too.
			//   Ref https://bugs.php.net/bug.php?id=37166
			//   Ref https://phabricator.wikimedia.org/T18885
			//
			// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
			$blob = @iconv( $this->legacyEncoding, 'UTF-8//IGNORE', $blob );
		}

		return $blob;
	}

	/**
	 * Get the TTL to use when caching blobs in the WAN cache.
	 *
	 * @return int TTL in seconds, or WANObjectCache::TTL_UNCACHEABLE
	 */
	private function getCacheTTL() {
		$cache = $this->cache;

		if ( $cache->getQoS( $cache::ATTR_DURABILITY ) >= $cache::QOS_DURABILITY_RDBMS ) {
			// Do not cache RDBMs blobs in...the RDBMs store
			$ttl = $cache::TTL_UNCACHEABLE;
		} else {
			$ttl = $this->cacheExpiry ?: $cache::TTL_UNCACHEABLE;
		}

		return $ttl;
	}

	/**
	 * Returns the old_id value of the `text` table row that the given address refers to.
	 *
	 * @param string $address
	 * @return int|null The text ID, or null if the address does not use the "tt" schema
	 * @throws InvalidArgumentException If the address is malformed or the ID is not a
	 *   positive integer in canonical form
	 */
	public function getTextIdFromAddress( $address ) {
		[ $schema, $id, ] = self::splitBlobAddress( $address );

		if ( $schema !== 'tt' ) {
			return null;
		}

		$textId = intval( $id );

		// Same validity rule as fetchBlobs(): ids must be positive and canonical.
		if ( $textId < 1 || $id !== (string)$textId ) {
			throw new InvalidArgumentException( "Malformed text_id: $id" );
		}

		return $textId;
	}

	/**
	 * Returns an address referring to content stored in the text table row with the given ID.
	 *
	 * @param int $id
	 * @return string
	 */
	public static function makeAddressFromTextId( $id ) {
		return 'tt:' . $id;
	}

	/**
	 * Split a comma-separated old_flags value into its constituent parts.
	 *
	 * @param string $flagsString
	 * @return string[] Empty array for an empty string
	 */
	public static function explodeFlags( string $flagsString ) {
		return $flagsString === '' ? [] : explode( ',', $flagsString );
	}

	/**
	 * Splits a blob address into three parts: the schema, the ID, and parameters/flags.
	 *
	 * @param string $address
	 * @return array [ string $schema (lower-cased), string $id, array $parameters ]
	 * @throws InvalidArgumentException If the address does not match the expected syntax
	 */
	public static function splitBlobAddress( $address ) {
		if ( !preg_match( '/^([-+.\w]+):([^\s?]+)(\?([^\s]*))?$/', $address, $m ) ) {
			throw new InvalidArgumentException( "Bad blob address: $address" );
		}

		$schema = strtolower( $m[1] );
		$id = $m[2];
		$parameters = wfCgiToArray( $m[4] ?? '' );

		return [ $schema, $id, $parameters ];
	}

	/**
	 * Check if the blob metadata or backing blob data store is read-only.
	 *
	 * @return bool
	 */
	public function isReadOnly() {
		if ( $this->useExternalStore && $this->extStoreAccess->isReadOnly() ) {
			return true;
		}

		return ( $this->getDBLoadBalancer()->getReadOnlyReason() !== false );
	}
}
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfWarn( $msg, $callerOffset=1, $level=E_USER_NOTICE)
Send a warning either to the debug log or in a PHP error depending on $wgDevelopmentWarnings.
wfLogWarning( $msg, $callerOffset=1, $level=E_USER_WARNING)
Send a warning as a PHP error and the debug log.
wfCgiToArray( $query)
This is the logical opposite of wfArrayToCgi(): it accepts a query string as its argument and returns...
array $params
The job parameters.
getCacheKey()
Get the WAN cache key under which the blob at a given address is cached.
This is the main interface for fetching or inserting objects with ExternalStore.
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Exception thrown when a blob has the "bad" content address schema, or has "error" in its old_flags,...
Exception representing a failure to access a data blob.
Service for storing and loading Content objects representing revision data blobs.
static makeAddressFromTextId( $id)
Returns an address referring to content stored in the text table row with the given ID.
decompressData(string $blob, array $blobFlags, ?string $blobAddress=null)
Re-converts revision text according to its flags.
getTextIdFromAddress( $address)
Returns the ID, stored in the old_id field of the text table, that the given $address refers to.
__construct(ILoadBalancer $dbLoadBalancer, ExternalStoreAccess $extStoreAccess, WANObjectCache $cache, $dbDomain=false)
getBlob( $blobAddress, $queryFlags=0)
Retrieve a blob, given an address.
setLegacyEncoding(string $legacyEncoding)
Set the legacy encoding to assume for blobs that do not have the utf-8 flag set.
compressData(&$blob)
If $wgCompressRevisions is enabled, we will compress data.
static splitBlobAddress( $address)
Splits a blob address into three parts: the schema, the ID, and parameters/flags.
getBlobBatch( $blobAddresses, $queryFlags=0)
A batched version of BlobStore::getBlob.
storeBlob( $data, $hints=[])
Stores an arbitrary blob of data and returns an address that can be used with getBlob() to retrieve the same blob of data.
setUseExternalStore(bool $useExternalStore)
isReadOnly()
Check if the blob metadata or backing blob data store is read-only.
expandBlob( $raw, $flags, $blobAddress=null)
Expand a raw data blob according to the flags given.
static explodeFlags(string $flagsString)
Split a comma-separated old_flags value into its constituent parts.
Generic operation result class Has warning/error list, boolean status and arbitrary value.
Multi-datacenter aware caching interface.
Service for loading and storing data blobs.
Definition BlobStore.php:33
Interface for database access objects.
Interface to a relational database.
Definition IDatabase.php:45
This class is a delegate to ILBFactory for a given database cluster.
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28