50 throw new InvalidArgumentException(
"LBFactory required in 'lbFactory' field." );
52 $this->lbFactory =
$params[
'lbFactory'];
67 $ret = $this->fetchBlob( $cluster, $id, $itemID );
69 if ( $itemID !==
false && $ret !==
false ) {
70 return $ret->getItem( $itemID );
87 $batched = $inverseUrlMap = [];
88 foreach ( $urls as
$url ) {
89 [ $cluster, $id, $itemID ] = $this->
parseURL( $url );
90 $batched[$cluster][$id][] = $itemID;
93 $inverseUrlMap[$cluster][$id][$itemID] =
$url;
96 foreach ( $batched as $cluster => $batchByCluster ) {
97 $res = $this->batchFetchBlobs( $cluster, $batchByCluster );
99 foreach ( $res as $id => $blob ) {
100 foreach ( $batchByCluster[$id] as $itemID ) {
101 $url = $inverseUrlMap[$cluster][$id][$itemID];
102 if ( $itemID ===
false ) {
105 $ret[
$url] = $blob->getItem( $itemID );
117 public function store( $location, $data ) {
119 $dbw->newInsertQueryBuilder()
120 ->insertInto( $this->
getTable( $dbw, $location ) )
121 ->row( [
'blob_text' => $data ] )
122 ->caller( __METHOD__ )->execute();
123 $id = $dbw->insertId();
128 return "DB://$location/$id";
135 if ( parent::isReadOnly( $location ) ) {
139 return ( $this->getLoadBalancer( $location )->getReadOnlyReason() !==
false );
148 private function getLoadBalancer( $cluster ) {
149 return $this->lbFactory->getExternalLB( $cluster );
160 $lb = $this->getLoadBalancer( $cluster );
162 return $lb->getConnection(
165 $this->getDomainId( $lb->getServerInfo( ServerInfo::WRITER_INDEX ) ),
166 $lb::CONN_TRX_AUTOCOMMIT
178 $lb = $this->getLoadBalancer( $cluster );
180 return $lb->getMaintenanceConnectionRef(
183 $this->getDomainId( $lb->getServerInfo( ServerInfo::WRITER_INDEX ) ),
184 $lb::CONN_TRX_AUTOCOMMIT
192 private function getDomainId( array $server ) {
193 if ( $this->isDbDomainExplicit ) {
197 if ( isset( $server[
'dbname'] ) ) {
205 $server[
'schema'] ??
null,
206 $server[
'tablePrefix'] ??
''
209 return $domain->getId();
223 if ( $cluster !==
null ) {
224 $lb = $this->getLoadBalancer( $cluster );
225 $info = $lb->getServerInfo( ServerInfo::WRITER_INDEX );
226 if ( isset( $info[
'blobs table'] ) ) {
227 return $info[
'blobs table'];
244 static $supportedTypes = [
'mysql',
'sqlite' ];
247 if ( !in_array( $dbw->getType(), $supportedTypes,
true ) ) {
248 throw new DBUnexpectedError( $dbw,
"RDBMS type '{$dbw->getType()}' not supported." );
251 $sqlFilePath =
"$IP/maintenance/storage/blobs.sql";
252 $sql = file_get_contents( $sqlFilePath );
253 if ( $sql ===
false ) {
254 throw new RuntimeException(
"Failed to read '$sqlFilePath'." );
257 $rawTable = $this->
getTable( $dbw, $cluster );
258 $encTable = $dbw->tableName( $rawTable );
260 $sqlWithReplacedVars = str_replace(
261 [
'/*$wgDBprefix*/blobs',
'/*_*/blobs' ],
262 [ $encTable, $encTable ],
268 $sqlWithReplacedVars,
269 $dbw::QUERY_CHANGE_SCHEMA,
287 private function fetchBlob( $cluster, $id, $itemID ) {
294 static $externalBlobCache = [];
296 $cacheID = ( $itemID === false ) ?
"$cluster/$id" :
"$cluster/$id/";
297 $cacheID =
"$cacheID@{$this->dbDomain}";
299 if ( isset( $externalBlobCache[$cacheID] ) ) {
300 $this->logger->debug( __METHOD__ .
": cache hit on $cacheID" );
302 return $externalBlobCache[$cacheID];
305 $this->logger->debug( __METHOD__ .
": cache miss on $cacheID" );
308 $ret = $dbr->newSelectQueryBuilder()
309 ->select(
'blob_text' )
310 ->from( $this->
getTable( $dbr, $cluster ) )
311 ->where( [
'blob_id' => $id ] )
312 ->caller( __METHOD__ )->fetchField();
313 if ( $ret ===
false ) {
315 $this->logger->warning( __METHOD__ .
": primary DB fallback on $cacheID" );
316 $trxProfiler = $this->lbFactory->getTransactionProfiler();
317 $scope = $trxProfiler->silenceForScope( $trxProfiler::EXPECTATION_REPLICAS_ONLY );
319 $ret = $dbw->newSelectQueryBuilder()
320 ->select(
'blob_text' )
321 ->from( $this->
getTable( $dbw, $cluster ) )
322 ->where( [
'blob_id' => $id ] )
323 ->caller( __METHOD__ )->fetchField();
324 ScopedCallback::consume( $scope );
325 if ( $ret ===
false ) {
326 $this->logger->warning( __METHOD__ .
": primary DB failed to find $cacheID" );
329 if ( $itemID !==
false && $ret !==
false ) {
334 $externalBlobCache = [ $cacheID => $ret ];
347 private function batchFetchBlobs( $cluster, array $ids ) {
349 $res = $dbr->newSelectQueryBuilder()
350 ->select( [
'blob_id',
'blob_text' ] )
351 ->from( $this->
getTable( $dbr, $cluster ) )
352 ->where( [
'blob_id' => array_keys( $ids ) ] )
353 ->caller( __METHOD__ )
357 $this->mergeBatchResult( $ret, $ids, $res );
361 __METHOD__ .
": primary fallback on '$cluster' for: " .
362 implode(
',', array_keys( $ids ) )
364 $trxProfiler = $this->lbFactory->getTransactionProfiler();
365 $scope = $trxProfiler->silenceForScope( $trxProfiler::EXPECTATION_REPLICAS_ONLY );
367 $res = $dbw->newSelectQueryBuilder()
368 ->select( [
'blob_id',
'blob_text' ] )
369 ->from( $this->
getTable( $dbr, $cluster ) )
370 ->where( [
'blob_id' => array_keys( $ids ) ] )
371 ->caller( __METHOD__ )
373 ScopedCallback::consume( $scope );
374 $this->mergeBatchResult( $ret, $ids, $res );
377 $this->logger->error(
378 __METHOD__ .
": primary on '$cluster' failed locating items: " .
379 implode(
',', array_keys( $ids ) )
392 private function mergeBatchResult( array &$ret, array &$ids, $res ) {
393 foreach ( $res as $row ) {
395 $itemIDs = $ids[$id];
397 if ( count( $itemIDs ) === 1 && reset( $itemIDs ) ===
false ) {
399 $ret[$id] = $row->blob_text;
static unserialize(string $str, bool $allowDouble=false)
Unserialize a HistoryBlob.
Base class for general text storage via the "object" flag in old_flags, or two-part external storage ...