MediaWiki master
trackBlobs.php
Go to the documentation of this file.
1<?php
15
16// @codeCoverageIgnoreStart
17require_once __DIR__ . '/../Maintenance.php';
18// @codeCoverageIgnoreEnd
19
20class TrackBlobs extends Maintenance {
22 public $clusters;
28 public $trackedBlobs = [];
29
31 public $batchSize = 1000;
33 public $reportingInterval = 10;
34
/**
 * Registers the command-line interface of the script: one required,
 * repeatable "cluster" argument plus the help text.
 */
public function __construct() {
	parent::__construct();

	$summary = 'Adds blobs from a given ES cluster to the blob_tracking table. ' .
		'Automatically deletes the tracking table and starts from the start again when restarted.';

	// Third and fourth arguments mark the argument as required and multi-valued
	$this->addArg( 'cluster', 'cluster(s) to scan', true, true );
	$this->addDescription( $summary );
}
45
/**
 * Entry point: runs the integrity check, rebuilds the tracking tables,
 * tracks blobs referenced by revisions and by orphan text rows, and, when
 * the gmp extension is available, searches the clusters for orphan blobs.
 */
public function execute() {
	$this->clusters = $this->parameters->getArgs();

	// Assign the flag in both cases: it is read unconditionally further down
	// and by the tracking passes, so it must never be left unset.
	$this->doBlobOrphans = extension_loaded( 'gmp' );
	if ( $this->doBlobOrphans ) {
		// One bitfield per cluster; bit N is set once blob N has been tracked
		foreach ( $this->clusters as $cluster ) {
			$this->trackedBlobs[$cluster] = gmp_init( 0 );
		}
	} else {
		echo "Warning: the gmp extension is needed to find orphan blobs\n";
	}

	$this->checkIntegrity();
	$this->initTrackingTable();
	$this->trackRevisions();
	$this->trackOrphanText();
	if ( $this->doBlobOrphans ) {
		$this->findOrphanBlobs();
	}
	$this->output( "All done.\n" );
}
66
/**
 * Aborts the script (exit status 1) if the text table still contains
 * serialized HistoryBlobStub objects that are not stored externally (T22757).
 */
private function checkIntegrity() {
	echo "Doing integrity check...\n";
	$replica = $this->getReplicaDB();

	// Rows flagged "object" but not "external" whose text starts with a
	// serialized HistoryBlobStub header
	$stubCondition =
		'old_flags LIKE \'%object%\' AND old_flags NOT LIKE \'%external%\' ' .
		'AND LOWER(CONVERT(LEFT(old_text,22) USING latin1)) = \'o:15:"historyblobstub"\'';

	$found = $replica->newSelectQueryBuilder()
		->select( '1' )
		->from( 'text' )
		->where( $stubCondition )
		->caller( __METHOD__ )
		->fetchField();

	if ( !$found ) {
		echo "Integrity check OK\n";

		return;
	}

	echo "Integrity check failed: found HistoryBlobStub objects in your text table.\n" .
		"This script could destroy these objects if it continued. Run resolveStubs.php\n" .
		"to fix this.\n";
	exit( 1 );
}
90
/**
 * Drops any existing blob_tracking / blob_orphans tables on the primary
 * database and recreates the schema from blob_tracking.sql.
 */
private function initTrackingTable() {
	$dbw = $this->getDB( DB_PRIMARY );

	// Check each table on its own: after an interrupted run only one of the
	// two may exist, and an unconditional DROP of the missing one (or a
	// leftover table colliding with the schema file) would abort the script.
	foreach ( [ 'blob_tracking', 'blob_orphans' ] as $table ) {
		if ( $dbw->tableExists( $table, __METHOD__ ) ) {
			$dbw->query( 'DROP TABLE ' . $dbw->tableName( $table ), __METHOD__ );
		}
	}

	$dbw->sourceFile( __DIR__ . '/blob_tracking.sql' );
}
99
/**
 * Returns the condition matching old_text values that start with
 * "DB://<cluster>/" for any of the requested clusters. The expression is
 * built on first use and then reused from $this->textClause.
 */
private function getTextClause(): IExpression {
	if ( $this->textClause ) {
		return $this->textClause;
	}

	$replica = $this->getReplicaDB();
	// One LIKE 'DB://<cluster>/%' expression per cluster
	$perCluster = array_map(
		static fn ( $name ) => $replica->expr(
			'old_text',
			IExpression::LIKE,
			new LikeValue( "DB://{$name}/", $replica->anyString() )
		),
		array_values( $this->clusters )
	);

	$this->textClause = $replica->orExpr( $perCluster );

	return $this->textClause;
}
116
/**
 * Splits an external store pointer of the form
 * "DB://<cluster>/<id>" or "DB://<cluster>/<id>/<hex hash>" into its parts.
 *
 * @param string $text Pointer text to parse
 * @return array|false Array with keys 'cluster' (string), 'id' (int) and
 *  'hash' (string, or null when the pointer has no hash part); false when
 *  the text does not match the pointer format
 */
private function interpretPointer( string $text ) {
	$parts = [];
	$matched = preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $parts );

	if ( $matched ) {
		return [
			'cluster' => $parts[1],
			'id' => (int)$parts[2],
			// The third group is absent from the match array when no hash was given
			'hash' => $parts[3] ?? null
		];
	}

	return false;
}
129
/**
 * Scans main-slot revisions whose content lives in the text table
 * ("tt:" content addresses) and points at one of the requested clusters,
 * and records each one in blob_tracking.
 *
 * Rows are read from the replica in batches of $this->batchSize ordered by
 * rev_id and written to the primary. Progress is printed, and replication
 * is waited for, every $this->reportingInterval batches.
 */
private function trackRevisions() {
	$dbw = $this->getPrimaryDB();
	$dbr = $this->getReplicaDB();

	$textClause = $this->getTextClause();
	$startId = 0;
	// Only used for the progress display
	$endId = (int)$dbr->newSelectQueryBuilder()
		->select( 'MAX(rev_id)' )
		->from( 'revision' )
		->caller( __METHOD__ )->fetchField();
	$batchesDone = 0;
	$rowsInserted = 0;

	echo "Finding revisions...\n";

	$slotRoleStore = $this->getServiceContainer()->getSlotRoleStore();
	$conds = [
		'slot_role_id' => $slotRoleStore->getId( SlotRecord::MAIN ),
		'SUBSTRING(content_address, 1, 3)=' . $dbr->addQuotes( 'tt:' ),
		$textClause,
		$dbr->expr(
			'old_flags',
			IExpression::LIKE,
			new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
		)
	];

	while ( true ) {
		$res = $dbr->newSelectQueryBuilder()
			->select( [ 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ] )
			->from( 'revision' )
			->join( 'slots', null, 'rev_id=slot_revision_id' )
			->join( 'content', null, 'content_id=slot_content_id' )
			// The part of the address after "tt:" is matched against old_id
			->join( 'text', null, 'SUBSTRING(content_address, 4)=old_id' )
			->where( $dbr->expr( 'rev_id', '>', $startId ) )
			->andWhere( $conds )
			->orderBy( 'rev_id' )
			->limit( $this->batchSize )
			->caller( __METHOD__ )->fetchResultSet();
		if ( !$res->numRows() ) {
			break;
		}

		$insertBatch = [];
		foreach ( $res as $row ) {
			// Advance the cursor even for rejected rows so the scan moves on
			$startId = (int)$row->rev_id;
			$info = $this->interpretPointer( $row->old_text );
			if ( !$info ) {
				echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
				continue;
			}
			if ( !in_array( $info['cluster'], $this->clusters, true ) ) {
				echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
				continue;
			}
			$insertBatch[] = [
				'bt_page' => $row->rev_page,
				'bt_rev_id' => $row->rev_id,
				'bt_text_id' => $row->old_id,
				'bt_cluster' => $info['cluster'],
				'bt_blob_id' => $info['id'],
				'bt_cgz_hash' => $info['hash']
			];
			if ( $this->doBlobOrphans ) {
				gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
			}
		}

		// Every row of the batch may have been rejected above; skip the
		// insert then instead of executing it with no rows.
		if ( $insertBatch ) {
			$dbw->newInsertQueryBuilder()
				->insertInto( 'blob_tracking' )
				->rows( $insertBatch )
				->caller( __METHOD__ )->execute();
			$rowsInserted += count( $insertBatch );
		}

		++$batchesDone;
		if ( $batchesDone >= $this->reportingInterval ) {
			$batchesDone = 0;
			echo "$startId / $endId\n";
			$this->waitForReplication();
		}
	}
	echo "Found $rowsInserted revisions\n";
}
218
/**
 * Scans the text table for external-store pointers into the requested
 * clusters that have no blob_tracking row yet, and records them in
 * blob_tracking with bt_page and bt_rev_id set to 0.
 *
 * Rows are read from the replica in batches of $this->batchSize ordered by
 * old_id and written to the primary. Progress is printed, and replication
 * is waited for, every $this->reportingInterval batches.
 */
private function trackOrphanText() {
	# Wait until the blob_tracking table is available in the replica DB
	$dbw = $this->getPrimaryDB();
	$dbr = $this->getReplicaDB();
	$this->getServiceContainer()->getDBLoadBalancerFactory()->waitForReplication( [ 'timeout' => 100_000 ] );

	$textClause = $this->getTextClause();
	$startId = 0;
	// Only used for the progress display
	$endId = (int)$dbr->newSelectQueryBuilder()
		->select( 'MAX(old_id)' )
		->from( 'text' )
		->caller( __METHOD__ )->fetchField();
	$rowsInserted = 0;
	$batchesDone = 0;

	echo "Finding orphan text...\n";

	# Scan the text table for orphan text
	while ( true ) {
		$res = $dbr->newSelectQueryBuilder()
			->select( [ 'old_id', 'old_flags', 'old_text' ] )
			->from( 'text' )
			->leftJoin( 'blob_tracking', null, 'bt_text_id=old_id' )
			->where( [
				$dbr->expr( 'old_id', '>', $startId ),
				$textClause,
				$dbr->expr(
					'old_flags',
					IExpression::LIKE,
					new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
				),
				// Keep only text rows without a matching blob_tracking row
				'bt_text_id' => null,
			] )
			->orderBy( 'old_id' )
			->limit( $this->batchSize )
			->caller( __METHOD__ )->fetchResultSet();

		if ( !$res->numRows() ) {
			break;
		}

		$insertBatch = [];
		foreach ( $res as $row ) {
			// Advance the cursor even for rejected rows so the scan moves on
			$startId = (int)$row->old_id;
			$info = $this->interpretPointer( $row->old_text );
			if ( !$info ) {
				echo "Invalid DB:// URL in old_id {$row->old_id}\n";
				continue;
			}
			if ( !in_array( $info['cluster'], $this->clusters, true ) ) {
				echo "Invalid cluster returned in SQL query\n";
				continue;
			}

			$insertBatch[] = [
				'bt_page' => 0,
				'bt_rev_id' => 0,
				'bt_text_id' => $row->old_id,
				'bt_cluster' => $info['cluster'],
				'bt_blob_id' => $info['id'],
				'bt_cgz_hash' => $info['hash']
			];
			if ( $this->doBlobOrphans ) {
				gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
			}
		}

		// Every row of the batch may have been rejected above; skip the
		// insert then instead of executing it with no rows.
		if ( $insertBatch ) {
			$dbw->newInsertQueryBuilder()
				->insertInto( 'blob_tracking' )
				->rows( $insertBatch )
				->caller( __METHOD__ )->execute();
			$rowsInserted += count( $insertBatch );
		}

		++$batchesDone;
		if ( $batchesDone >= $this->reportingInterval ) {
			$batchesDone = 0;
			echo "$startId / $endId\n";
			$this->waitForReplication();
		}
	}
	echo "Found $rowsInserted orphan text rows\n";
}
305
/**
 * For each requested cluster, builds a bitfield of the blob IDs present in
 * the cluster's blobs table, removes the IDs marked in
 * $this->trackedBlobs[$cluster], and writes the remaining IDs to
 * blob_orphans.
 *
 * Clusters that cannot be connected to, or that have no blobs table, are
 * reported and skipped. Requires the gmp extension.
 */
private function findOrphanBlobs() {
	if ( !extension_loaded( 'gmp' ) ) {
		echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";

		return;
	}

	$dbw = $this->getPrimaryDB();
	$lbFactory = $this->getServiceContainer()->getDBLoadBalancerFactory();
	$dbStore = $this->getServiceContainer()->getExternalStoreFactory()->getDatabaseStore();

	foreach ( $this->clusters as $cluster ) {
		echo "Searching for orphan blobs in $cluster...\n";
		$lb = $lbFactory->getExternalLB( $cluster );
		try {
			$extDB = $lb->getMaintenanceConnectionRef( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			if ( str_contains( $e->getMessage(), 'Unknown database' ) ) {
				echo "No database on $cluster\n";
			} else {
				echo "Error on $cluster: " . $e->getMessage() . "\n";
			}
			continue;
		}
		$table = $dbStore->getTable( $cluster );
		if ( !$extDB->tableExists( $table, __METHOD__ ) ) {
			echo "No blobs table on cluster $cluster\n";
			continue;
		}
		$startId = 0;
		$batchesDone = 0;
		$actualBlobs = gmp_init( 0 );
		// Only used for the progress display
		$endId = (int)$extDB->newSelectQueryBuilder()
			->select( 'MAX(blob_id)' )
			->from( $table )
			->caller( __METHOD__ )->fetchField();

		// Build a bitmap of actual blob rows
		while ( true ) {
			$res = $extDB->newSelectQueryBuilder()
				->select( [ 'blob_id' ] )
				->from( $table )
				->where( $extDB->expr( 'blob_id', '>', $startId ) )
				->orderBy( 'blob_id' )
				->limit( $this->batchSize )
				->caller( __METHOD__ )->fetchResultSet();

			if ( !$res->numRows() ) {
				break;
			}

			foreach ( $res as $row ) {
				// Cast once and use the integer both as bit index and as cursor
				$startId = (int)$row->blob_id;
				gmp_setbit( $actualBlobs, $startId );
			}

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
			}
		}

		// Find actual blobs that weren't tracked by the previous passes
		// This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
		$orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );

		// Traverse the orphan list
		$insertBatch = [];
		$id = 0;
		$numOrphans = 0;
		while ( true ) {
			// Index of the next set bit at or after $id, or -1 when none is left
			$id = gmp_scan1( $orphans, $id );
			if ( $id === -1 ) {
				break;
			}
			$insertBatch[] = [
				'bo_cluster' => $cluster,
				'bo_blob_id' => $id
			];
			// Flush as soon as the batch is full so no insert exceeds batchSize rows
			if ( count( $insertBatch ) >= $this->batchSize ) {
				$dbw->newInsertQueryBuilder()
					->insertInto( 'blob_orphans' )
					->rows( $insertBatch )
					->caller( __METHOD__ )->execute();
				$insertBatch = [];
			}

			++$id;
			++$numOrphans;
		}
		// Write whatever is left over from the last, partial batch
		if ( $insertBatch ) {
			$dbw->newInsertQueryBuilder()
				->insertInto( 'blob_orphans' )
				->rows( $insertBatch )
				->caller( __METHOD__ )->execute();
		}
		echo "Found $numOrphans orphan(s) in $cluster\n";
	}
}
413}
414
415// @codeCoverageIgnoreStart
416$maintClass = TrackBlobs::class;
417require_once RUN_MAINTENANCE_IF_MAIN;
418// @codeCoverageIgnoreEnd
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28
if(!defined('MW_SETUP_CALLBACK'))
Definition WebStart.php:68
Abstract maintenance class for quickly writing and churning out maintenance scripts with minimal effort.
addArg( $arg, $description, $required=true, $multi=false)
Add some args that are needed.
output( $out, $channel=null)
Throw some output to the user.
getDB( $db, $groups=[], $dbDomain=false)
Returns a database to be used by current maintenance script.
getReplicaDB(string|false $virtualDomain=false)
addDescription( $text)
Set the description text.
Value object representing a content slot associated with a page revision.
int $reportingInterval
IExpression|null $textClause
execute()
Do the actual work.
bool $doBlobOrphans
__construct()
Default constructor.
string[] $clusters
array $trackedBlobs
Content of like value.
Definition LikeValue.php:14
$maintClass