34 parent::__construct( $params );
// Fail fast unless the caller supplied an LBFactory instance under the 'lbFactory' key.
35 if ( !isset( $params[
'lbFactory'] ) || !( $params[
'lbFactory'] instanceof
LBFactory ) ) {
36 throw new InvalidArgumentException(
"LBFactory required in 'lbFactory' field." );
// Keep the validated factory on the instance.
38 $this->lbFactory = $params[
'lbFactory'];
// Look up the blob addressed by cluster, blob ID and item ID ($itemID may be false).
53 $ret = $this->fetchBlob( $cluster, $id, $itemID );
// With a real item ID and a non-false lookup result, hand back only that item via getItem().
55 if ( $itemID !==
false && $ret !==
false ) {
56 return $ret->getItem( $itemID );
// $batched collects item IDs per cluster and blob ID; $inverseUrlMap maps the same
// (cluster, blob ID, item ID) combination back to the URL it came from.
73 $batched = $inverseUrlMap = [];
74 foreach ( $urls as
$url ) {
76 $batched[$cluster][$id][] = $itemID;
79 $inverseUrlMap[$cluster][$id][$itemID] =
$url;
// Issue one batch fetch per cluster, then attribute each returned blob to its URLs.
82 foreach ( $batched as $cluster => $batchByCluster ) {
83 $res = $this->batchFetchBlobs( $cluster, $batchByCluster );
85 foreach ( $res as $id => $blob ) {
86 foreach ( $batchByCluster[$id] as $itemID ) {
87 $url = $inverseUrlMap[$cluster][$id][$itemID];
// NOTE(review): the handling for a false item ID is on lines not shown in this excerpt.
88 if ( $itemID ===
false ) {
// Store the single item extracted from the blob under its originating URL.
91 $ret[
$url] = $blob->getItem( $itemID );
/**
 * Insert $data as a new blob row for $location and return its storage URL.
 *
 * @param string $location Identifier passed to getTable() and embedded in the returned URL
 * @param string $data Value written to the blob_text column (presumably a string; confirm against callers)
 * @return string URL of the form "DB://$location/$id", where $id is the insert ID
 */
103 public function store( $location, $data ) {
// Resolve which table holds blobs for this location.
104 $blobsTable = $this->
getTable( $location );
// Write the payload as a single new row.
107 $dbw->newInsertQueryBuilder()
108 ->insertInto( $blobsTable )
109 ->row( [
'blob_text' => $data ] )
110 ->caller( __METHOD__ )->execute();
// The insert ID reported by the connection becomes the last URL segment.
112 $id = $dbw->insertId();
117 return "DB://$location/$id";
// Consult the parent class's read-only check first.
124 if ( parent::isReadOnly( $location ) ) {
// Read-only when the location's load balancer reports a read-only reason (anything other than false).
128 return ( $this->getLoadBalancer( $location )->getReadOnlyReason() !==
false );
/**
 * Get the external load balancer for a cluster from the LBFactory held by this instance.
 *
 * @param string $cluster Cluster name handed to getExternalLB()
 * @return mixed Whatever getExternalLB() returns (presumably a load balancer; the type is not visible here)
 */
137 private function getLoadBalancer( $cluster ) {
138 return $this->lbFactory->getExternalLB( $cluster );
149 $lb = $this->getLoadBalancer( $cluster );
// The domain argument is derived from the writer server's configuration, and the
// load balancer's CONN_TRX_AUTOCOMMIT flag is passed along with it.
151 return $lb->getConnection(
154 $this->getDomainId( $lb->getServerInfo( ServerInfo::WRITER_INDEX ) ),
155 $lb::CONN_TRX_AUTOCOMMIT
167 $lb = $this->getLoadBalancer( $cluster );
// Same domain and CONN_TRX_AUTOCOMMIT arguments as the getConnection() call, but this
// one asks for a maintenance connection reference.
169 return $lb->getMaintenanceConnectionRef(
172 $this->getDomainId( $lb->getServerInfo( ServerInfo::WRITER_INDEX ) ),
173 $lb::CONN_TRX_AUTOCOMMIT
/**
 * Determine the database domain ID to use for a server configuration array.
 *
 * @param array $server Server info; the visible code reads its 'dbname', 'schema' and 'tablePrefix' keys
 * @return mixed Either $this->dbDomain or $domain->getId(); any other return path is not visible in this excerpt
 */
181 private function getDomainId( array $server ) {
// An explicitly configured domain takes precedence over anything in $server.
182 if ( $this->isDbDomainExplicit ) {
183 return $this->dbDomain;
// When the server entry names a database, the ID comes from $domain (built on lines not shown here).
186 if ( isset( $server[
'dbname'] ) ) {
// A missing 'schema' falls back to null and a missing 'tablePrefix' to an empty string.
194 $server[
'schema'] ??
null,
195 $server[
'tablePrefix'] ??
''
198 return $domain->getId();
// Read the writer server's configuration for this cluster.
215 $lb = $this->getLoadBalancer( $cluster );
216 $info = $lb->getServerInfo( ServerInfo::WRITER_INDEX );
// Use that server's 'blobs table' setting, defaulting to 'blobs' when it is unset.
218 return $info[
'blobs table'] ??
'blobs';
// Only these database types are accepted; any other type raises DBUnexpectedError below.
230 static $supportedTypes = [
'mysql',
'sqlite' ];
233 if ( !in_array( $dbw->getType(), $supportedTypes,
true ) ) {
234 throw new DBUnexpectedError( $dbw,
"RDBMS type '{$dbw->getType()}' not supported." );
// Load the table definition SQL from the maintenance directory under $IP.
237 $sqlFilePath =
"$IP/maintenance/storage/blobs.sql";
238 $sql = file_get_contents( $sqlFilePath );
// file_get_contents() returns false on failure; surface that as an exception.
239 if ( $sql ===
false ) {
240 throw new RuntimeException(
"Failed to read '$sqlFilePath'." );
// Both placeholder spellings of the blobs table are replaced with the value that
// $dbw->tableName() returns for this cluster's table.
243 $blobsTable = $this->
getTable( $cluster );
244 $encTable = $dbw->tableName( $blobsTable );
245 $sqlWithReplacedVars = str_replace(
246 [
'/*$wgDBprefix*/blobs',
'/*_*/blobs' ],
247 [ $encTable, $encTable ],
253 $sqlWithReplacedVars,
254 $dbw::QUERY_CHANGE_SCHEMA,
/**
 * Fetch the blob_text of one blob row, querying $dbr first and $dbw as a fallback.
 *
 * The log messages refer to the $dbw query as the "primary DB fallback". The most
 * recent result is kept in a function-static cache that holds a single entry.
 *
 * @param string $cluster Cluster name, used for the table lookup and the cache key
 * @param int|string $id Value matched against the blob_id column
 * @param string|false $itemID false when no item ID applies; it only affects the cache key
 *   shape and the post-processing branch in the visible code
 * @return mixed The cached value on a cache hit; the return after a lookup is on lines
 *   not shown in this excerpt
 */
272 private function fetchBlob( $cluster, $id, $itemID ) {
// Function-static, so the cached value persists across calls.
279 static $externalBlobCache = [];
// A trailing slash marks lookups that carry an item ID; the item ID itself is not part of the key.
281 $cacheID = ( $itemID === false ) ?
"$cluster/$id" :
"$cluster/$id/";
// Qualify the key with this instance's DB domain.
282 $cacheID =
"$cacheID@{$this->dbDomain}";
284 if ( isset( $externalBlobCache[$cacheID] ) ) {
285 $this->logger->debug( __METHOD__ .
": cache hit on $cacheID" );
287 return $externalBlobCache[$cacheID];
290 $this->logger->debug( __METHOD__ .
": cache miss on $cacheID" );
292 $blobsTable = $this->
getTable( $cluster );
// First attempt: look the row up through $dbr.
295 $ret = $dbr->newSelectQueryBuilder()
296 ->select(
'blob_text' )
297 ->from( $blobsTable )
298 ->where( [
'blob_id' => $id ] )
299 ->caller( __METHOD__ )->fetchField();
// A false result is treated as "not found" and triggers a retry through $dbw.
301 if ( $ret ===
false ) {
303 $this->logger->warning( __METHOD__ .
": primary DB fallback on $cacheID" );
// Ask the transaction profiler to silence EXPECTATION_REPLICAS_ONLY for a scope
// covering the fallback query.
304 $trxProfiler = $this->lbFactory->getTransactionProfiler();
305 $scope = $trxProfiler->silenceForScope( $trxProfiler::EXPECTATION_REPLICAS_ONLY );
307 $ret = $dbw->newSelectQueryBuilder()
308 ->select(
'blob_text' )
309 ->from( $blobsTable )
310 ->where( [
'blob_id' => $id ] )
311 ->caller( __METHOD__ )->fetchField();
// Consume the scope once the fallback query has run.
312 ScopedCallback::consume( $scope );
313 if ( $ret ===
false ) {
314 $this->logger->warning( __METHOD__ .
": primary DB failed to find $cacheID" );
// Item requests with a found row get extra handling; the branch body is not shown in this excerpt.
317 if ( $itemID !==
false && $ret !==
false ) {
// Replace the whole cache with this one entry, so only the latest lookup is remembered.
322 $externalBlobCache = [ $cacheID => $ret ];
/**
 * Fetch multiple blob rows from one cluster in bulk.
 *
 * Rows are selected by blob_id using the keys of $ids, first through $dbr and then
 * through $dbw (logged as the "primary fallback"). Each result set is merged into
 * $ret by mergeBatchResult(). The conditions guarding the fallback and the final
 * error log are on lines not shown in this excerpt.
 *
 * @param string $cluster Cluster name, used for the table lookup and in log messages
 * @param array $ids Map keyed by blob ID; each value is a list of item IDs
 * @return mixed Not visible in this excerpt; presumably $ret - TODO confirm
 */
335 private function batchFetchBlobs( $cluster, array $ids ) {
336 $blobsTable = $this->
getTable( $cluster );
// First pass: select every requested blob_id through $dbr.
339 $res = $dbr->newSelectQueryBuilder()
340 ->select( [
'blob_id',
'blob_text' ] )
341 ->from( $blobsTable )
342 ->where( [
'blob_id' => array_keys( $ids ) ] )
343 ->caller( __METHOD__ )
// $ret and $ids are both passed by reference, so mergeBatchResult() is able to modify them.
347 $this->mergeBatchResult( $ret, $ids, $res );
351 __METHOD__ .
": primary fallback on '$cluster' for: " .
352 implode(
',', array_keys( $ids ) )
// Ask the transaction profiler to silence EXPECTATION_REPLICAS_ONLY for a scope
// covering the fallback query.
354 $trxProfiler = $this->lbFactory->getTransactionProfiler();
355 $scope = $trxProfiler->silenceForScope( $trxProfiler::EXPECTATION_REPLICAS_ONLY );
// Second pass: the same selection through $dbw, using the keys $ids holds at this point.
357 $res = $dbw->newSelectQueryBuilder()
358 ->select( [
'blob_id',
'blob_text' ] )
359 ->from( $blobsTable )
360 ->where( [
'blob_id' => array_keys( $ids ) ] )
361 ->caller( __METHOD__ )
// Consume the scope once the fallback query has run, then merge its rows as well.
363 ScopedCallback::consume( $scope );
364 $this->mergeBatchResult( $ret, $ids, $res );
// Log the blob IDs still present as keys of $ids as ones the primary failed to locate.
367 $this->logger->error(
368 __METHOD__ .
": primary on '$cluster' failed locating items: " .
369 implode(
',', array_keys( $ids ) )
/**
 * Fold the rows of a query result into $ret.
 *
 * @param array &$ret Result map being filled, keyed by $id
 * @param array &$ids Map of blob ID to its list of item IDs
 * @param mixed $res Iterable of rows; the visible code reads blob_text from each row
 */
382 private function mergeBatchResult( array &$ret, array &$ids, $res ) {
383 foreach ( $res as $row ) {
// NOTE(review): $id is assigned on a line not shown in this excerpt.
385 $itemIDs = $ids[$id];
// A sole item ID of false: keep the raw blob_text as the result for this blob.
387 if ( count( $itemIDs ) === 1 && reset( $itemIDs ) ===
false ) {
389 $ret[$id] = $row->blob_text;
// Split the URL on '/' and return the element at index 2, or null when it is absent.
// For a URL shaped like "DB://<location>/<id>", index 2 is the <location> segment.
419 $parts = explode(
'/',
$url );
420 return $parts[2] ??
null;
431 $lb = $this->getLoadBalancer( $cluster );
// Resolve the domain ID from the cluster's writer server configuration.
432 return $this->getDomainId( $lb->getServerInfo( ServerInfo::WRITER_INDEX ) );
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Base class for general text storage via the "object" flag in old_flags, or two-part external storage ...