61 private $dbLoadBalancer;
66 private $extStoreAccess;
86 private $compressBlobs =
false;
91 private $legacyEncoding =
false;
96 private $useExternalStore =
false;
115 $this->dbLoadBalancer = $dbLoadBalancer;
116 $this->extStoreAccess = $extStoreAccess;
117 $this->cache = $cache;
118 $this->dbDomain = $dbDomain;
125 return $this->cacheExpiry;
132 $this->cacheExpiry = $cacheExpiry;
139 return $this->compressBlobs;
146 $this->compressBlobs = $compressBlobs;
154 return $this->legacyEncoding;
166 $this->legacyEncoding = $legacyEncoding;
173 return $this->useExternalStore;
180 $this->useExternalStore = $useExternalStore;
186 private function getDBLoadBalancer() {
187 return $this->dbLoadBalancer;
195 private function getDBConnection( $index ) {
196 $lb = $this->getDBLoadBalancer();
197 return $lb->getConnection( $index, [], $this->dbDomain );
213 # Write to external storage if required
214 if ( $this->useExternalStore ) {
217 $data = $this->extStoreAccess->insert( $data, [
'domain' => $this->dbDomain ] );
225 return 'es:' . $data .
'?flags=' . $flags;
227 return 'es:' . $data;
232 $dbw->newInsertQueryBuilder()
233 ->insertInto(
'text' )
234 ->row( [
'old_text' => $data,
'old_flags' => $flags ] )
235 ->caller( __METHOD__ )->execute();
237 $textId = $dbw->insertId();
255 public function getBlob( $blobAddress, $queryFlags = 0 ) {
256 Assert::parameterType(
'string', $blobAddress,
'$blobAddress' );
259 $blob = $this->cache->getWithSetCallback(
261 $this->getCacheTTL(),
262 function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
264 [ $result, $errors ] = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
266 $error = $errors[$blobAddress] ??
null;
268 $ttl = WANObjectCache::TTL_UNCACHEABLE;
270 return $result[$blobAddress];
272 $this->getCacheOptions()
276 if ( $error[0] ===
'badrevision' ) {
283 Assert::postcondition( is_string( $blob ),
'Blob must not be null' );
303 [ $blobsByAddress, $errors ] = $this->fetchBlobs( $blobAddresses, $queryFlags );
305 $blobsByAddress = array_map(
static function ( $blob ) {
306 return $blob ===
false ? null : $blob;
307 }, $blobsByAddress );
309 $result = StatusValue::newGood( $blobsByAddress );
310 foreach ( $errors as $error ) {
312 $result->warning( ...$error );
331 private function fetchBlobs( $blobAddresses, $queryFlags ) {
332 $textIdToBlobAddress = [];
335 foreach ( $blobAddresses as $blobAddress ) {
338 }
catch ( InvalidArgumentException $ex ) {
339 throw new BlobAccessException(
340 $ex->getMessage() .
'. Use findBadBlobs.php to remedy.',
346 if ( $schema ===
'es' ) {
350 $blob = $this->
expandBlob( $id,
'external', $blobAddress );
353 if ( $blob ===
false ) {
354 $errors[$blobAddress] = [
356 "Bad data in external store address $id. Use findBadBlobs.php to remedy."
359 $result[$blobAddress] = $blob;
360 } elseif ( $schema ===
'bad' ) {
364 .
": loading known-bad content ($blobAddress), returning empty string"
366 $result[$blobAddress] =
'';
367 $errors[$blobAddress] = [
369 'The content of this revision is missing or corrupted (bad schema)'
371 } elseif ( $schema ===
'tt' ) {
372 $textId = intval( $id );
374 if ( $textId < 1 || $id !== (
string)$textId ) {
375 $errors[$blobAddress] = [
377 "Bad blob address: $blobAddress. Use findBadBlobs.php to remedy."
379 $result[$blobAddress] =
false;
382 $textIdToBlobAddress[$textId] = $blobAddress;
384 $errors[$blobAddress] = [
386 "Unknown blob address schema: $schema. Use findBadBlobs.php to remedy."
388 $result[$blobAddress] =
false;
392 $textIds = array_keys( $textIdToBlobAddress );
394 return [ $result, $errors ];
398 $queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, IDBAccessObject::READ_LATEST )
399 ? IDBAccessObject::READ_LATEST_IMMUTABLE
401 [ $index, $options, $fallbackIndex, $fallbackOptions ] =
402 self::getDBOptions( $queryFlags );
404 $dbConnection = $this->getDBConnection( $index );
405 $rows = $dbConnection->newSelectQueryBuilder()
406 ->select( [
'old_id',
'old_text',
'old_flags' ] )
408 ->where( [
'old_id' => $textIds ] )
409 ->options( $options )
410 ->caller( __METHOD__ )->fetchResultSet();
411 $numRows = $rows->numRows();
415 if ( $numRows !== count( $textIds ) && $fallbackIndex !==
null ) {
416 $fetchedTextIds = [];
417 foreach ( $rows as $row ) {
418 $fetchedTextIds[] = $row->old_id;
420 $missingTextIds = array_diff( $textIds, $fetchedTextIds );
421 $dbConnection = $this->getDBConnection( $fallbackIndex );
422 $rowsFromFallback = $dbConnection->newSelectQueryBuilder()
423 ->select( [
'old_id',
'old_text',
'old_flags' ] )
425 ->where( [
'old_id' => $missingTextIds ] )
426 ->options( $fallbackOptions )
427 ->caller( __METHOD__ )->fetchResultSet();
428 $appendIterator =
new AppendIterator();
429 $appendIterator->append( $rows );
430 $appendIterator->append( $rowsFromFallback );
431 $rows = $appendIterator;
434 foreach ( $rows as $row ) {
435 $blobAddress = $textIdToBlobAddress[$row->old_id];
437 if ( $row->old_text !==
null ) {
438 $blob = $this->
expandBlob( $row->old_text, $row->old_flags, $blobAddress );
440 if ( $blob ===
false ) {
441 $errors[$blobAddress] = [
443 "Bad data in text row {$row->old_id}. Use findBadBlobs.php to remedy."
446 $result[$blobAddress] = $blob;
450 if ( count( $result ) !== count( $blobAddresses ) ) {
451 foreach ( $blobAddresses as $blobAddress ) {
452 if ( !isset( $result[$blobAddress ] ) ) {
453 $errors[$blobAddress] = [
455 "Unable to fetch blob at $blobAddress. Use findBadBlobs.php to remedy."
457 $result[$blobAddress] =
false;
461 return [ $result, $errors ];
464 private static function getDBOptions( $bitfield ) {
465 if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST_IMMUTABLE ) ) {
468 } elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LATEST ) ) {
470 $fallbackIndex =
null;
473 $fallbackIndex =
null;
476 $lockingOptions = [];
477 if ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_EXCLUSIVE ) ) {
478 $lockingOptions[] =
'FOR UPDATE';
479 } elseif ( DBAccessObjectUtils::hasFlags( $bitfield, IDBAccessObject::READ_LOCKING ) ) {
480 $lockingOptions[] =
'LOCK IN SHARE MODE';
483 if ( $fallbackIndex !==
null ) {
485 $fallbackOptions = $lockingOptions;
487 $options = $lockingOptions;
488 $fallbackOptions = [];
491 return [ $index, $options, $fallbackIndex, $fallbackOptions ];
505 return $this->cache->makeGlobalKey(
507 $this->dbLoadBalancer->resolveDomainID( $this->dbDomain ),
517 private function getCacheOptions() {
520 'pcTTL' => WANObjectCache::TTL_PROC_LONG,
521 'segmentable' => true
545 public function expandBlob( $raw, $flags, $blobAddress =
null ) {
546 if ( is_string( $flags ) ) {
549 if ( in_array(
'error', $flags ) ) {
551 "The content of this revision is missing or corrupted (error flag)"
556 if ( in_array(
'external', $flags ) ) {
558 $parts = explode(
'://',
$url, 2 );
559 if ( count( $parts ) == 1 || $parts[1] ==
'' ) {
563 if ( $blobAddress ) {
565 return $this->cache->getWithSetCallback(
567 $this->getCacheTTL(),
568 function () use (
$url, $flags, $blobAddress ) {
570 $blob = $this->extStoreAccess
571 ->fetchFromURL(
$url, [
'domain' => $this->dbDomain ] );
573 return $blob ===
false ? false : $this->
decompressData( $blob, $flags, $blobAddress );
575 $this->getCacheOptions()
578 $blob = $this->extStoreAccess->fetchFromURL(
$url, [
'domain' => $this->dbDomain ] );
579 return $blob ===
false ? false : $this->
decompressData( $blob, $flags, $blobAddress );
608 $blobFlags[] =
'utf-8';
610 if ( $this->compressBlobs ) {
611 if ( function_exists(
'gzdeflate' ) ) {
612 $deflated = gzdeflate( $blob );
614 if ( $deflated ===
false ) {
618 $blobFlags[] =
'gzip';
621 wfDebug( __METHOD__ .
" -- no zlib support, not compressing" );
624 return implode(
',', $blobFlags );
643 public function decompressData(
string $blob, array $blobFlags, ?
string $blobAddress =
null ) {
644 if ( in_array(
'error', $blobFlags ) ) {
649 if ( in_array(
'gzip', $blobFlags ) ) {
650 # Deal with optional compression of archived pages.
651 # This can be done periodically via maintenance/compressOld.php, and
652 # as pages are saved if $wgCompressRevisions is set.
653 AtEase::suppressWarnings();
654 $blob = gzinflate( $blob );
655 AtEase::restoreWarnings();
657 if ( $blob ===
false ) {
658 wfWarn( __METHOD__ .
': gzinflate() failed' .
659 ( $blobAddress ?
' (at blob address ' . $blobAddress .
')' :
'' ) );
664 if ( in_array(
'object', $blobFlags ) ) {
665 # Generic compressed storage
671 $blob = $obj->getText();
675 if ( $blob !==
false && $this->legacyEncoding
676 && !in_array(
'utf-8', $blobFlags ) && !in_array(
'utf8', $blobFlags )
678 # Old revisions kept around in a legacy encoding?
679 # Upconvert on demand.
680 # ("utf8" checked for compatibility with some broken
681 # conversion scripts 2008-12-30)
683 # *input* string. We just ignore those too.
686 AtEase::suppressWarnings();
687 $blob = iconv( $this->legacyEncoding,
'UTF-8//IGNORE', $blob );
688 AtEase::restoreWarnings();
701 private function getCacheTTL() {
702 $cache = $this->cache;
704 if ( $cache->
getQoS( $cache::ATTR_DURABILITY ) >= $cache::QOS_DURABILITY_RDBMS ) {
706 $ttl = $cache::TTL_UNCACHEABLE;
708 $ttl = $this->cacheExpiry ?: $cache::TTL_UNCACHEABLE;
737 if ( $schema !==
'tt' ) {
741 $textId = intval( $id );
743 if ( !$textId || $id !== (
string)$textId ) {
744 throw new InvalidArgumentException(
"Malformed text_id: $id" );
774 return $flagsString ===
'' ? [] : explode(
',', $flagsString );
787 if ( !preg_match(
'/^([-+.\w]+):([^\s?]+)(\?([^\s]*))?$/', $address, $m ) ) {
788 throw new InvalidArgumentException(
"Bad blob address: $address" );
791 $schema = strtolower( $m[1] );
795 return [ $schema, $id, $parameters ];
799 if ( $this->useExternalStore && $this->extStoreAccess->isReadOnly() ) {
803 return ( $this->getDBLoadBalancer()->getReadOnlyReason() !==
false );
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.