41 throw new InvalidArgumentException(
"LBFactory required in 'lbFactory' field." );
43 $this->lbFactory =
$params[
'lbFactory'];
58 $ret = $this->fetchBlob( $cluster, $id, $itemID );
60 if ( $itemID !==
false && $ret !==
false ) {
61 return $ret->getItem( $itemID );
78 $batched = $inverseUrlMap = [];
79 foreach ( $urls as
$url ) {
80 [ $cluster, $id, $itemID ] = $this->
parseURL( $url );
81 $batched[$cluster][$id][] = $itemID;
84 $inverseUrlMap[$cluster][$id][$itemID] =
$url;
87 foreach ( $batched as $cluster => $batchByCluster ) {
88 $res = $this->batchFetchBlobs( $cluster, $batchByCluster );
90 foreach ( $res as $id => $blob ) {
91 foreach ( $batchByCluster[$id] as $itemID ) {
92 $url = $inverseUrlMap[$cluster][$id][$itemID];
93 if ( $itemID ===
false ) {
96 $ret[
$url] = $blob->getItem( $itemID );
108 public function store( $location, $data ) {
109 $blobsTable = $this->
getTable( $location );
112 $dbw->newInsertQueryBuilder()
113 ->insertInto( $blobsTable )
114 ->row( [
'blob_text' => $data ] )
115 ->caller( __METHOD__ )->execute();
117 $id = $dbw->insertId();
122 return "DB://$location/$id";
129 if ( parent::isReadOnly( $location ) ) {
133 return ( $this->getLoadBalancer( $location )->getReadOnlyReason() !==
false );
142 private function getLoadBalancer( $cluster ) {
143 return $this->lbFactory->getExternalLB( $cluster );
154 $lb = $this->getLoadBalancer( $cluster );
156 return $lb->getConnection(
160 $lb::CONN_TRX_AUTOCOMMIT
172 $lb = $this->getLoadBalancer( $cluster );
174 return $lb->getMaintenanceConnectionRef(
178 $lb::CONN_TRX_AUTOCOMMIT
186 private function getDomainId( array $server ) {
187 if ( $this->isDbDomainExplicit ) {
191 if ( isset( $server[
'dbname'] ) ) {
197 $domain =
new DatabaseDomain(
199 $server[
'schema'] ??
null,
200 $server[
'tablePrefix'] ??
''
203 return $domain->getId();
220 $lb = $this->getLoadBalancer( $cluster );
223 return $info[
'blobs table'] ??
'blobs';
235 static $supportedTypes = [
'mysql',
'sqlite' ];
238 if ( !in_array( $dbw->getType(), $supportedTypes,
true ) ) {
239 throw new DBUnexpectedError( $dbw,
"RDBMS type '{$dbw->getType()}' not supported." );
242 $sqlFilePath =
"$IP/maintenance/storage/blobs.sql";
243 $sql = file_get_contents( $sqlFilePath );
244 if ( $sql ===
false ) {
245 throw new RuntimeException(
"Failed to read '$sqlFilePath'." );
248 $blobsTable = $this->
getTable( $cluster );
249 $encTable = $dbw->tableName( $blobsTable );
250 $sqlWithReplacedVars = str_replace(
251 [
'/*$wgDBprefix*/blobs',
'/*_*/blobs' ],
252 [ $encTable, $encTable ],
258 $sqlWithReplacedVars,
259 $dbw::QUERY_CHANGE_SCHEMA,
277 private function fetchBlob( $cluster, $id, $itemID ) {
284 static $externalBlobCache = [];
286 $cacheID = ( $itemID === false ) ?
"$cluster/$id" :
"$cluster/$id/";
287 $cacheID =
"$cacheID@{$this->dbDomain}";
289 if ( isset( $externalBlobCache[$cacheID] ) ) {
290 $this->logger->debug( __METHOD__ .
": cache hit on $cacheID" );
292 return $externalBlobCache[$cacheID];
295 $this->logger->debug( __METHOD__ .
": cache miss on $cacheID" );
297 $blobsTable = $this->
getTable( $cluster );
300 $ret = $dbr->newSelectQueryBuilder()
301 ->select(
'blob_text' )
302 ->from( $blobsTable )
303 ->where( [
'blob_id' => $id ] )
304 ->caller( __METHOD__ )->fetchField();
306 if ( $ret ===
false ) {
308 $this->logger->warning( __METHOD__ .
": primary DB fallback on $cacheID" );
309 $trxProfiler = $this->lbFactory->getTransactionProfiler();
310 $scope = $trxProfiler->silenceForScope( $trxProfiler::EXPECTATION_REPLICAS_ONLY );
312 $ret = $dbw->newSelectQueryBuilder()
313 ->select(
'blob_text' )
314 ->from( $blobsTable )
315 ->where( [
'blob_id' => $id ] )
316 ->caller( __METHOD__ )->fetchField();
317 ScopedCallback::consume( $scope );
318 if ( $ret ===
false ) {
319 $this->logger->warning( __METHOD__ .
": primary DB failed to find $cacheID" );
322 if ( $itemID !==
false && $ret !==
false ) {
327 $externalBlobCache = [ $cacheID => $ret ];
340 private function batchFetchBlobs( $cluster, array $ids ) {
341 $blobsTable = $this->
getTable( $cluster );
344 $res = $dbr->newSelectQueryBuilder()
345 ->select( [
'blob_id',
'blob_text' ] )
346 ->from( $blobsTable )
347 ->where( [
'blob_id' => array_keys( $ids ) ] )
348 ->caller( __METHOD__ )
352 $this->mergeBatchResult( $ret, $ids, $res );
356 __METHOD__ .
": primary fallback on '$cluster' for: " .
357 implode(
',', array_keys( $ids ) )
359 $trxProfiler = $this->lbFactory->getTransactionProfiler();
360 $scope = $trxProfiler->silenceForScope( $trxProfiler::EXPECTATION_REPLICAS_ONLY );
362 $res = $dbw->newSelectQueryBuilder()
363 ->select( [
'blob_id',
'blob_text' ] )
364 ->from( $blobsTable )
365 ->where( [
'blob_id' => array_keys( $ids ) ] )
366 ->caller( __METHOD__ )
368 ScopedCallback::consume( $scope );
369 $this->mergeBatchResult( $ret, $ids, $res );
372 $this->logger->error(
373 __METHOD__ .
": primary on '$cluster' failed locating items: " .
374 implode(
',', array_keys( $ids ) )
387 private function mergeBatchResult( array &$ret, array &$ids, $res ) {
388 foreach ( $res as $row ) {
390 $itemIDs = $ids[$id];
392 if ( count( $itemIDs ) === 1 && reset( $itemIDs ) ===
false ) {
394 $ret[$id] = $row->blob_text;
424 $parts = explode(
'/',
$url );
425 return $parts[2] ??
null;
436 $lb = $this->getLoadBalancer( $cluster );
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Base class for general text storage via the "object" flag in old_flags, or two-part external storage ...