Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 261
0.00% covered (danger)
0.00%
0 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
TrackBlobs
0.00% covered (danger)
0.00%
0 / 258
0.00% covered (danger)
0.00%
0 / 9
1980
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
20
 checkIntegrity
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
6
 initTrackingTable
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 getTextClause
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 interpretPointer
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 trackRevisions
0.00% covered (danger)
0.00%
0 / 69
0.00% covered (danger)
0.00%
0 / 1
72
 trackOrphanText
0.00% covered (danger)
0.00%
0 / 64
0.00% covered (danger)
0.00%
0 / 1
72
 findOrphanBlobs
0.00% covered (danger)
0.00%
0 / 68
0.00% covered (danger)
0.00%
0 / 1
210
1<?php
2/**
3 * Adds blobs from a given external storage cluster to the blob_tracking table.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Maintenance
22 */
23
24use MediaWiki\Revision\SlotRecord;
25use Wikimedia\Rdbms\DBConnectionError;
26use Wikimedia\Rdbms\IExpression;
27use Wikimedia\Rdbms\LikeValue;
28use Wikimedia\Rdbms\OrExpressionGroup;
29
30require_once __DIR__ . '/../Maintenance.php';
31
32class TrackBlobs extends Maintenance {
33    public $clusters, $textClause;
34    public $doBlobOrphans;
35    public $trackedBlobs = [];
36
37    public $batchSize = 1000;
38    public $reportingInterval = 10;
39
40    public function __construct() {
41        parent::__construct();
42
43        $this->addArg( 'cluster', 'cluster(s) to scan', true, true );
44
45        $this->addDescription(
46            'Adds blobs from a given ES cluster to the blob_tracking table. ' .
47            'Automatically deletes the tracking table and starts from the start again when restarted.'
48        );
49    }
50
51    public function execute() {
52        $this->clusters = $this->parameters->getArgs();
53        if ( extension_loaded( 'gmp' ) ) {
54            $this->doBlobOrphans = true;
55            foreach ( $this->clusters as $cluster ) {
56                $this->trackedBlobs[$cluster] = gmp_init( 0 );
57            }
58        } else {
59            echo "Warning: the gmp extension is needed to find orphan blobs\n";
60        }
61
62        $this->checkIntegrity();
63        $this->initTrackingTable();
64        $this->trackRevisions();
65        $this->trackOrphanText();
66        if ( $this->doBlobOrphans ) {
67            $this->findOrphanBlobs();
68        }
69        $this->output( "All done.\n" );
70    }
71
72    private function checkIntegrity() {
73        echo "Doing integrity check...\n";
74        $dbr = $this->getReplicaDB();
75
76        // Scan for HistoryBlobStub objects in the text table (T22757)
77
78        $exists = (bool)$dbr->newSelectQueryBuilder()
79            ->select( '1' )
80            ->from( 'text' )
81            ->where(
82                'old_flags LIKE \'%object%\' AND old_flags NOT LIKE \'%external%\' ' .
83                'AND LOWER(CONVERT(LEFT(old_text,22) USING latin1)) = \'o:15:"historyblobstub"\'' )
84            ->caller( __METHOD__ )->fetchField();
85
86        if ( $exists ) {
87            echo "Integrity check failed: found HistoryBlobStub objects in your text table.\n" .
88                "This script could destroy these objects if it continued. Run resolveStubs.php\n" .
89                "to fix this.\n";
90            exit( 1 );
91        }
92
93        echo "Integrity check OK\n";
94    }
95
96    private function initTrackingTable() {
97        $dbw = $this->getDB( DB_PRIMARY );
98        if ( $dbw->tableExists( 'blob_tracking', __METHOD__ ) ) {
99            $dbw->query( 'DROP TABLE ' . $dbw->tableName( 'blob_tracking' ), __METHOD__ );
100            $dbw->query( 'DROP TABLE ' . $dbw->tableName( 'blob_orphans' ), __METHOD__ );
101        }
102        $dbw->sourceFile( __DIR__ . '/blob_tracking.sql' );
103    }
104
105    private function getTextClause() {
106        if ( !$this->textClause ) {
107            $dbr = $this->getReplicaDB();
108            $conds = [];
109            foreach ( $this->clusters as $cluster ) {
110                $conds[] = $dbr->expr(
111                    'old_text',
112                    IExpression::LIKE,
113                    new LikeValue( "DB://$cluster/", $dbr->anyString() )
114                );
115            }
116            $this->textClause = new OrExpressionGroup( ...$conds );
117        }
118
119        return $this->textClause;
120    }
121
122    private function interpretPointer( $text ) {
123        if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
124            return false;
125        }
126
127        return [
128            'cluster' => $m[1],
129            'id' => intval( $m[2] ),
130            'hash' => $m[3] ?? null
131        ];
132    }
133
134    /**
135     *  Scan the revision table for rows stored in the specified clusters
136     */
137    private function trackRevisions() {
138        $dbw = $this->getPrimaryDB();
139        $dbr = $this->getReplicaDB();
140
141        $textClause = $this->getTextClause();
142        $startId = 0;
143        $endId = (int)$dbr->newSelectQueryBuilder()
144            ->select( 'MAX(rev_id)' )
145            ->from( 'revision' )
146            ->caller( __METHOD__ )->fetchField();
147        $batchesDone = 0;
148        $rowsInserted = 0;
149
150        echo "Finding revisions...\n";
151
152        $conds = [
153            $textClause,
154            $dbr->expr(
155                'old_flags',
156                IExpression::LIKE,
157                new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
158            )
159        ];
160        $slotRoleStore = $this->getServiceContainer()->getSlotRoleStore();
161
162        $conds = array_merge( [
163            'slot_role_id=' . $slotRoleStore->getId( SlotRecord::MAIN ),
164            'SUBSTRING(content_address, 1, 3)=' . $dbr->addQuotes( 'tt:' ),
165        ], $conds );
166
167        while ( true ) {
168            $res = $dbr->newSelectQueryBuilder()
169                ->select( [ 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ] )
170                ->from( 'revision' )
171                ->join( 'slots', null, 'rev_id=slot_revision_id' )
172                ->join( 'content', null, 'content_id=slot_content_id' )
173                ->join( 'text', null, 'SUBSTRING(content_address, 4)=old_id' )
174                ->where( $dbr->expr( 'rev_id', '>', $startId ) )
175                ->andWhere( $conds )
176                ->orderBy( 'rev_id' )
177                ->limit( $this->batchSize )
178                ->caller( __METHOD__ )->fetchResultSet();
179            if ( !$res->numRows() ) {
180                break;
181            }
182
183            $insertBatch = [];
184            foreach ( $res as $row ) {
185                $startId = (int)$row->rev_id;
186                $info = $this->interpretPointer( $row->old_text );
187                if ( !$info ) {
188                    echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
189                    continue;
190                }
191                if ( !in_array( $info['cluster'], $this->clusters ) ) {
192                    echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
193                    continue;
194                }
195                $insertBatch[] = [
196                    'bt_page' => $row->rev_page,
197                    'bt_rev_id' => $row->rev_id,
198                    'bt_text_id' => $row->old_id,
199                    'bt_cluster' => $info['cluster'],
200                    'bt_blob_id' => $info['id'],
201                    'bt_cgz_hash' => $info['hash']
202                ];
203                if ( $this->doBlobOrphans ) {
204                    gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
205                }
206            }
207            $dbw->newInsertQueryBuilder()
208                ->insertInto( 'blob_tracking' )
209                ->rows( $insertBatch )
210                ->caller( __METHOD__ )->execute();
211            $rowsInserted += count( $insertBatch );
212
213            ++$batchesDone;
214            if ( $batchesDone >= $this->reportingInterval ) {
215                $batchesDone = 0;
216                echo "$startId / $endId\n";
217                $this->waitForReplication();
218            }
219        }
220        echo "Found $rowsInserted revisions\n";
221    }
222
223    /**
224     * Scan the text table for orphan text
225     * Orphan text here does not imply DB corruption -- deleted text tracked by the
226     * archive table counts as orphan for our purposes.
227     */
228    private function trackOrphanText() {
229        # Wait until the blob_tracking table is available in the replica DB
230        $dbw = $this->getPrimaryDB();
231        $dbr = $this->getReplicaDB();
232        $pos = $dbw->getPrimaryPos();
233        $dbr->primaryPosWait( $pos, 100_000 );
234
235        $textClause = $this->getTextClause();
236        $startId = 0;
237        $endId = (int)$dbr->newSelectQueryBuilder()
238            ->select( 'MAX(old_id)' )
239            ->from( 'text' )
240            ->caller( __METHOD__ )->fetchField();
241        $rowsInserted = 0;
242        $batchesDone = 0;
243
244        echo "Finding orphan text...\n";
245
246        # Scan the text table for orphan text
247        while ( true ) {
248            $res = $dbr->newSelectQueryBuilder()
249                ->select( [ 'old_id', 'old_flags', 'old_text' ] )
250                ->from( 'text' )
251                ->leftJoin( 'blob_tracking', null, 'bt_text_id=old_id' )
252                ->where( [
253                    $dbr->expr( 'old_id', '>', $startId ),
254                    $textClause,
255                    $dbr->expr(
256                        'old_flags',
257                        IExpression::LIKE,
258                        new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
259                    ),
260                    'bt_text_id' => null,
261                ] )
262                ->orderBy( 'old_id' )
263                ->limit( $this->batchSize )
264                ->caller( __METHOD__ )->fetchResultSet();
265
266            if ( !$res->numRows() ) {
267                break;
268            }
269
270            $insertBatch = [];
271            foreach ( $res as $row ) {
272                $startId = (int)$row->old_id;
273                $info = $this->interpretPointer( $row->old_text );
274                if ( !$info ) {
275                    echo "Invalid DB:// URL in old_id {$row->old_id}\n";
276                    continue;
277                }
278                if ( !in_array( $info['cluster'], $this->clusters ) ) {
279                    echo "Invalid cluster returned in SQL query\n";
280                    continue;
281                }
282
283                $insertBatch[] = [
284                    'bt_page' => 0,
285                    'bt_rev_id' => 0,
286                    'bt_text_id' => $row->old_id,
287                    'bt_cluster' => $info['cluster'],
288                    'bt_blob_id' => $info['id'],
289                    'bt_cgz_hash' => $info['hash']
290                ];
291                if ( $this->doBlobOrphans ) {
292                    gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
293                }
294            }
295            $dbw->newInsertQueryBuilder()
296                ->insertInto( 'blob_tracking' )
297                ->rows( $insertBatch )
298                ->caller( __METHOD__ )->execute();
299
300            $rowsInserted += count( $insertBatch );
301            ++$batchesDone;
302            if ( $batchesDone >= $this->reportingInterval ) {
303                $batchesDone = 0;
304                echo "$startId / $endId\n";
305                $this->waitForReplication();
306            }
307        }
308        echo "Found $rowsInserted orphan text rows\n";
309    }
310
311    /**
312     * Scan the blobs table for rows not registered in blob_tracking (and thus not
313     * registered in the text table).
314     *
315     * Orphan blobs are indicative of DB corruption. They are inaccessible and
316     * should probably be deleted.
317     */
318    private function findOrphanBlobs() {
319        if ( !extension_loaded( 'gmp' ) ) {
320            echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";
321
322            return;
323        }
324
325        $dbw = $this->getPrimaryDB();
326        $lbFactory = $this->getServiceContainer()->getDBLoadBalancerFactory();
327
328        foreach ( $this->clusters as $cluster ) {
329            echo "Searching for orphan blobs in $cluster...\n";
330            $lb = $lbFactory->getExternalLB( $cluster );
331            try {
332                $extDB = $lb->getMaintenanceConnectionRef( DB_REPLICA );
333            } catch ( DBConnectionError $e ) {
334                if ( strpos( $e->getMessage(), 'Unknown database' ) !== false ) {
335                    echo "No database on $cluster\n";
336                } else {
337                    echo "Error on $cluster" . $e->getMessage() . "\n";
338                }
339                continue;
340            }
341            $table = $extDB->getLBInfo( 'blobs table' ) ?? 'blobs';
342            if ( !$extDB->tableExists( $table, __METHOD__ ) ) {
343                echo "No blobs table on cluster $cluster\n";
344                continue;
345            }
346            $startId = 0;
347            $batchesDone = 0;
348            $actualBlobs = gmp_init( 0 );
349            $endId = (int)$extDB->newSelectQueryBuilder()
350                ->select( 'MAX(blob_id)' )
351                ->from( $table )
352                ->caller( __METHOD__ )->fetchField();
353
354            // Build a bitmap of actual blob rows
355            while ( true ) {
356                $res = $extDB->newSelectQueryBuilder()
357                    ->select( [ 'blob_id' ] )
358                    ->from( $table )
359                    ->where( $extDB->expr( 'blob_id', '>', $startId ) )
360                    ->orderBy( 'blob_id' )
361                    ->limit( $this->batchSize )
362                    ->caller( __METHOD__ )->fetchResultSet();
363
364                if ( !$res->numRows() ) {
365                    break;
366                }
367
368                foreach ( $res as $row ) {
369                    gmp_setbit( $actualBlobs, $row->blob_id );
370                    $startId = (int)$row->blob_id;
371                }
372
373                ++$batchesDone;
374                if ( $batchesDone >= $this->reportingInterval ) {
375                    $batchesDone = 0;
376                    echo "$startId / $endId\n";
377                }
378            }
379
380            // Find actual blobs that weren't tracked by the previous passes
381            // This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
382            $orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );
383
384            // Traverse the orphan list
385            $insertBatch = [];
386            $id = 0;
387            $numOrphans = 0;
388            while ( true ) {
389                $id = gmp_scan1( $orphans, $id );
390                if ( $id == -1 ) {
391                    break;
392                }
393                $insertBatch[] = [
394                    'bo_cluster' => $cluster,
395                    'bo_blob_id' => $id
396                ];
397                if ( count( $insertBatch ) > $this->batchSize ) {
398                    $dbw->newInsertQueryBuilder()
399                        ->insertInto( 'blob_orphans' )
400                        ->rows( $insertBatch )
401                        ->caller( __METHOD__ )->execute();
402                    $insertBatch = [];
403                }
404
405                ++$id;
406                ++$numOrphans;
407            }
408            if ( $insertBatch ) {
409                $dbw->newInsertQueryBuilder()
410                    ->insertInto( 'blob_orphans' )
411                    ->rows( $insertBatch )
412                    ->caller( __METHOD__ )->execute();
413            }
414            echo "Found $numOrphans orphan(s) in $cluster\n";
415        }
416    }
417}
418
419$maintClass = TrackBlobs::class;
420require_once RUN_MAINTENANCE_IF_MAIN;