MediaWiki REL1_34
trackBlobs.php
Go to the documentation of this file.
<?php

use MediaWiki\MediaWikiServices;
use MediaWiki\Revision\SlotRecord;
use Wikimedia\Rdbms\DBConnectionError;

require __DIR__ . '/../commandLine.inc';

// At least one external storage cluster name is required on the command line.
if ( count( $args ) < 1 ) {
	echo "Usage: php trackBlobs.php <cluster> [... <cluster>]\n";
	echo "Adds blobs from a given ES cluster to the blob_tracking table\n";
	echo "Automatically deletes the tracking table and starts from the start again when restarted.\n";

	exit( 1 );
}

// Every positional argument is treated as a cluster name to track.
$tracker = new TrackBlobs( $args );
$tracker->run();
echo "All done.\n";

/**
 * Records which external storage blobs are referenced by the wiki.
 *
 * The tracker fills the blob_tracking table with one row per "DB://" pointer
 * found in the text table, and (when the gmp extension is available) fills
 * blob_orphans with blob IDs that exist on a cluster but were never referenced.
 */
class TrackBlobs {
	/** @var string[] Names of the external storage clusters to scan */
	public $clusters;

	/** @var string|null Lazily built SQL condition matching text rows on $clusters */
	public $textClause;

	/** @var bool Whether orphan blob detection is enabled (needs the gmp extension) */
	public $doBlobOrphans = false;

	/** @var array Map of cluster name to a GMP bitfield of blob IDs seen so far */
	public $trackedBlobs = [];

	/** @var int Maximum number of rows read per query */
	public $batchSize = 1000;

	/** @var int Number of batches between two progress reports */
	public $reportingInterval = 10;

	/**
	 * @param string[] $clusters Names of the external storage clusters to scan
	 */
	public function __construct( $clusters ) {
		$this->clusters = $clusters;
		if ( extension_loaded( 'gmp' ) ) {
			$this->doBlobOrphans = true;
			// One bitfield per cluster; bit N is set once blob N has been seen.
			foreach ( $clusters as $cluster ) {
				$this->trackedBlobs[$cluster] = gmp_init( 0 );
			}
		} else {
			echo "Warning: the gmp extension is needed to find orphan blobs\n";
		}
	}

	/**
	 * Run every pass in order: integrity check, table reset, revision scan,
	 * orphan text scan and, if enabled, the orphan blob scan.
	 */
	public function run() {
		$this->checkIntegrity();
		$this->initTrackingTable();
		$this->trackRevisions();
		$this->trackOrphanText();
		if ( $this->doBlobOrphans ) {
			$this->findOrphanBlobs();
		}
	}

	/**
	 * Abort the script if the text table still holds HistoryBlobStub objects.
	 *
	 * Exits with status 1 when such a row is found.
	 */
	public function checkIntegrity() {
		echo "Doing integrity check...\n";
		$dbr = wfGetDB( DB_REPLICA );

		// Scan for HistoryBlobStub objects in the text table (T22757)

		$exists = $dbr->selectField( 'text', 1,
			'old_flags LIKE \'%object%\' AND old_flags NOT LIKE \'%external%\' ' .
			'AND LOWER(CONVERT(LEFT(old_text,22) USING latin1)) = \'o:15:"historyblobstub"\'',
			__METHOD__
		);

		if ( $exists ) {
			echo "Integrity check failed: found HistoryBlobStub objects in your text table.\n" .
				"This script could destroy these objects if it continued. Run resolveStubs.php\n" .
				"to fix this.\n";
			exit( 1 );
		}

		echo "Integrity check OK\n";
	}

	/**
	 * Drop any existing tracking tables and recreate them from blob_tracking.sql.
	 */
	public function initTrackingTable() {
		$dbw = wfGetDB( DB_MASTER );
		// Check each table on its own so a half-finished earlier run, which may
		// have left only one of the two tables behind, does not make DROP fail.
		foreach ( [ 'blob_tracking', 'blob_orphans' ] as $table ) {
			if ( $dbw->tableExists( $table, __METHOD__ ) ) {
				$dbw->query( 'DROP TABLE ' . $dbw->tableName( $table ), __METHOD__ );
			}
		}
		$dbw->sourceFile( __DIR__ . '/blob_tracking.sql' );
	}

	/**
	 * Build (once) the SQL condition that matches text rows whose old_text
	 * is a "DB://<cluster>/..." pointer into one of the configured clusters.
	 *
	 * @return string One LIKE expression per cluster, joined with OR
	 */
	public function getTextClause() {
		if ( !$this->textClause ) {
			$dbr = wfGetDB( DB_REPLICA );
			$likes = [];
			foreach ( $this->clusters as $cluster ) {
				$likes[] = 'old_text' . $dbr->buildLike( "DB://$cluster/", $dbr->anyString() );
			}
			$this->textClause = implode( ' OR ', $likes );
		}

		return $this->textClause;
	}

	/**
	 * Split an external storage pointer into its parts.
	 *
	 * @param string $text Pointer of the form DB://cluster/id or DB://cluster/id/hash
	 * @return array|bool Array with keys 'cluster' (string), 'id' (int) and
	 *  'hash' (string, or null when the pointer has no hash part), or false
	 *  when $text does not have the expected form
	 */
	public function interpretPointer( $text ) {
		if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
			return false;
		}

		return [
			'cluster' => $m[1],
			'id' => intval( $m[2] ),
			// The hash group is absent from $m when the pointer has no third part
			'hash' => $m[3] ?? null
		];
	}

	/**
	 * Scan the revision table for rows stored in the specified clusters.
	 *
	 * Each matching revision becomes one blob_tracking row.
	 */
	public function trackRevisions() {
		global $wgMultiContentRevisionSchemaMigrationStage;

		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_REPLICA );

		$textClause = $this->getTextClause();
		$startId = 0;
		$endId = $dbr->selectField( 'revision', 'MAX(rev_id)', '', __METHOD__ );
		$batchesDone = 0;
		$rowsInserted = 0;

		echo "Finding revisions...\n";

		$fields = [ 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ];
		$options = [
			'ORDER BY' => 'rev_id',
			'LIMIT' => $this->batchSize
		];
		// Only external-storage text rows pointing at one of our clusters
		$conds = [
			$textClause,
			'old_flags ' . $dbr->buildLike( $dbr->anyString(), 'external', $dbr->anyString() ),
		];
		if ( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_READ_OLD ) {
			// Old schema: the revision row links to the text row directly
			$tables = [ 'revision', 'text' ];
			$conds = array_merge( [
				'rev_text_id=old_id',
			], $conds );
		} else {
			// New schema: reach the text row through the main slot, whose
			// content address has the form "tt:<old_id>"
			$slotRoleStore = MediaWikiServices::getInstance()->getSlotRoleStore();
			$tables = [ 'revision', 'slots', 'content', 'text' ];
			$conds = array_merge( [
				'rev_id=slot_revision_id',
				'slot_role_id=' . $slotRoleStore->getId( SlotRecord::MAIN ),
				'content_id=slot_content_id',
				'SUBSTRING(content_address, 1, 3)=' . $dbr->addQuotes( 'tt:' ),
				'SUBSTRING(content_address, 4)=old_id',
			], $conds );
		}

		while ( true ) {
			$res = $dbr->select( $tables,
				$fields,
				array_merge( [
					'rev_id > ' . $dbr->addQuotes( $startId ),
				], $conds ),
				__METHOD__,
				$options
			);
			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = [];
			foreach ( $res as $row ) {
				// Advance the cursor even for rows that are skipped below
				$startId = $row->rev_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters, true ) ) {
					echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
					continue;
				}
				$insertBatch[] = [
					'bt_page' => $row->rev_page,
					'bt_rev_id' => $row->rev_id,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				];
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			if ( $insertBatch ) {
				$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
			}
			$rowsInserted += count( $insertBatch );

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				// Let the replica DBs catch up before writing more
				wfWaitForSlaves();
			}
		}
		echo "Found $rowsInserted revisions\n";
	}

	/**
	 * Scan the text table for orphan text.
	 *
	 * A text row is treated as orphan here when no blob_tracking row refers
	 * to it after the revision pass. Such rows are inserted with bt_page and
	 * bt_rev_id set to 0.
	 */
	public function trackOrphanText() {
		# Wait until the blob_tracking table is available in the replica DB
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_REPLICA );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );

		$textClause = $this->getTextClause();
		$startId = 0;
		$endId = $dbr->selectField( 'text', 'MAX(old_id)', '', __METHOD__ );
		$rowsInserted = 0;
		$batchesDone = 0;

		echo "Finding orphan text...\n";

		# Scan the text table for orphan text
		while ( true ) {
			$res = $dbr->select( [ 'text', 'blob_tracking' ],
				[ 'old_id', 'old_flags', 'old_text' ],
				[
					'old_id>' . $dbr->addQuotes( $startId ),
					$textClause,
					'old_flags ' . $dbr->buildLike( $dbr->anyString(), 'external', $dbr->anyString() ),
					// No matching tracking row: not found by the revision pass
					'bt_text_id IS NULL'
				],
				__METHOD__,
				[
					'ORDER BY' => 'old_id',
					'LIMIT' => $this->batchSize
				],
				[ 'blob_tracking' => [ 'LEFT JOIN', 'bt_text_id=old_id' ] ]
			);

			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = [];
			foreach ( $res as $row ) {
				// Advance the cursor even for rows that are skipped below
				$startId = $row->old_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in old_id {$row->old_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters, true ) ) {
					echo "Invalid cluster returned in SQL query\n";
					continue;
				}

				$insertBatch[] = [
					'bt_page' => 0,
					'bt_rev_id' => 0,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				];
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			if ( $insertBatch ) {
				$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
			}

			$rowsInserted += count( $insertBatch );
			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				// Let the replica DBs catch up before writing more
				wfWaitForSlaves();
			}
		}
		echo "Found $rowsInserted orphan text rows\n";
	}

	/**
	 * Scan the blobs table of every cluster for rows not registered in
	 * blob_tracking, and record them in blob_orphans.
	 *
	 * Relies on the bitfields filled by trackRevisions() and trackOrphanText(),
	 * so it is only meaningful after those passes have run.
	 */
	public function findOrphanBlobs() {
		if ( !extension_loaded( 'gmp' ) ) {
			echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";

			return;
		}

		$dbw = wfGetDB( DB_MASTER );
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

		foreach ( $this->clusters as $cluster ) {
			echo "Searching for orphan blobs in $cluster...\n";
			$lb = $lbFactory->getExternalLB( $cluster );
			try {
				$extDB = $lb->getMaintenanceConnectionRef( DB_REPLICA );
			} catch ( DBConnectionError $e ) {
				if ( strpos( $e->getMessage(), 'Unknown database' ) !== false ) {
					echo "No database on $cluster\n";
				} else {
					echo "Error on $cluster: " . $e->getMessage() . "\n";
				}
				continue;
			}
			// Fall back to the table name 'blobs' when the LB info sets none
			$table = $extDB->getLBInfo( 'blobs table' ) ?? 'blobs';
			if ( !$extDB->tableExists( $table ) ) {
				echo "No blobs table on cluster $cluster\n";
				continue;
			}
			$startId = 0;
			$batchesDone = 0;
			$actualBlobs = gmp_init( 0 );
			$endId = $extDB->selectField( $table, 'MAX(blob_id)', '', __METHOD__ );

			// Build a bitmap of actual blob rows
			while ( true ) {
				$res = $extDB->select( $table,
					[ 'blob_id' ],
					[ 'blob_id > ' . $extDB->addQuotes( $startId ) ],
					__METHOD__,
					[ 'LIMIT' => $this->batchSize, 'ORDER BY' => 'blob_id' ]
				);

				if ( !$res->numRows() ) {
					break;
				}

				foreach ( $res as $row ) {
					gmp_setbit( $actualBlobs, (int)$row->blob_id );
					$startId = $row->blob_id;
				}

				++$batchesDone;
				if ( $batchesDone >= $this->reportingInterval ) {
					$batchesDone = 0;
					echo "$startId / $endId\n";
				}
			}

			// Find actual blobs that weren't tracked by the previous passes
			// This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
			$orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );

			// Traverse the orphan list
			$insertBatch = [];
			$id = 0;
			$numOrphans = 0;
			while ( true ) {
				// Position of the next set bit at or after $id, -1 when none is left
				$id = gmp_scan1( $orphans, $id );
				if ( $id === -1 ) {
					break;
				}
				$insertBatch[] = [
					'bo_cluster' => $cluster,
					'bo_blob_id' => $id
				];
				// Flush as soon as a full batch has been collected
				if ( count( $insertBatch ) >= $this->batchSize ) {
					$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
					$insertBatch = [];
				}

				++$id;
				++$numOrphans;
			}
			if ( $insertBatch ) {
				$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
			}
			echo "Found $numOrphans orphan(s) in $cluster\n";
		}
	}
}
int $wgMultiContentRevisionSchemaMigrationStage
RevisionStore table schema migration stage (content, slots, content_models & slot_roles tables).
wfWaitForSlaves( $ifWritesSince=null, $wiki=false, $cluster=false, $timeout=null)
Waits for the replica DBs to catch up to the master position.
wfGetDB( $db, $groups=[], $wiki=false)
Get a Database object.
if( $line===false) $args
Definition cdb.php:64
MediaWikiServices is the service locator for the application scope of MediaWiki.
Value object representing a content slot associated with a page revision.
interpretPointer( $text)
findOrphanBlobs()
Scan the blobs table for rows not registered in blob_tracking (and thus not registered in the text ta...
initTrackingTable()
trackOrphanText()
Scan the text table for orphan text. Orphan text here does not imply DB corruption – deleted text trac...
trackRevisions()
Scan the revision table for rows stored in the specified clusters.
__construct( $clusters)
const SCHEMA_COMPAT_READ_OLD
Definition Defines.php:274
const DB_REPLICA
Definition defines.php:25
const DB_MASTER
Definition defines.php:26
if(count( $args)< 1) $tracker