Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 260
0.00% covered (danger)
0.00%
0 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
TrackBlobs
0.00% covered (danger)
0.00%
0 / 257
0.00% covered (danger)
0.00%
0 / 9
1980
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
20
 checkIntegrity
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
6
 initTrackingTable
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 getTextClause
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 interpretPointer
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 trackRevisions
0.00% covered (danger)
0.00%
0 / 69
0.00% covered (danger)
0.00%
0 / 1
72
 trackOrphanText
0.00% covered (danger)
0.00%
0 / 63
0.00% covered (danger)
0.00%
0 / 1
72
 findOrphanBlobs
0.00% covered (danger)
0.00%
0 / 68
0.00% covered (danger)
0.00%
0 / 1
210
1<?php
2/**
3 * Adds blobs from a given external storage cluster to the blob_tracking table.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Maintenance
22 */
23
24use MediaWiki\Revision\SlotRecord;
25use Wikimedia\Rdbms\DBConnectionError;
26use Wikimedia\Rdbms\IExpression;
27use Wikimedia\Rdbms\LikeValue;
28
29require_once __DIR__ . '/../Maintenance.php';
30
31class TrackBlobs extends Maintenance {
32    public $clusters;
33    public $textClause;
34    public $doBlobOrphans;
35    public $trackedBlobs = [];
36
37    public $batchSize = 1000;
38    public $reportingInterval = 10;
39
40    public function __construct() {
41        parent::__construct();
42
43        $this->addArg( 'cluster', 'cluster(s) to scan', true, true );
44
45        $this->addDescription(
46            'Adds blobs from a given ES cluster to the blob_tracking table. ' .
47            'Automatically deletes the tracking table and starts from the start again when restarted.'
48        );
49    }
50
51    public function execute() {
52        $this->clusters = $this->parameters->getArgs();
53        if ( extension_loaded( 'gmp' ) ) {
54            $this->doBlobOrphans = true;
55            foreach ( $this->clusters as $cluster ) {
56                $this->trackedBlobs[$cluster] = gmp_init( 0 );
57            }
58        } else {
59            echo "Warning: the gmp extension is needed to find orphan blobs\n";
60        }
61
62        $this->checkIntegrity();
63        $this->initTrackingTable();
64        $this->trackRevisions();
65        $this->trackOrphanText();
66        if ( $this->doBlobOrphans ) {
67            $this->findOrphanBlobs();
68        }
69        $this->output( "All done.\n" );
70    }
71
72    private function checkIntegrity() {
73        echo "Doing integrity check...\n";
74        $dbr = $this->getReplicaDB();
75
76        // Scan for HistoryBlobStub objects in the text table (T22757)
77
78        $exists = (bool)$dbr->newSelectQueryBuilder()
79            ->select( '1' )
80            ->from( 'text' )
81            ->where(
82                'old_flags LIKE \'%object%\' AND old_flags NOT LIKE \'%external%\' ' .
83                'AND LOWER(CONVERT(LEFT(old_text,22) USING latin1)) = \'o:15:"historyblobstub"\'' )
84            ->caller( __METHOD__ )->fetchField();
85
86        if ( $exists ) {
87            echo "Integrity check failed: found HistoryBlobStub objects in your text table.\n" .
88                "This script could destroy these objects if it continued. Run resolveStubs.php\n" .
89                "to fix this.\n";
90            exit( 1 );
91        }
92
93        echo "Integrity check OK\n";
94    }
95
96    private function initTrackingTable() {
97        $dbw = $this->getDB( DB_PRIMARY );
98        if ( $dbw->tableExists( 'blob_tracking', __METHOD__ ) ) {
99            $dbw->query( 'DROP TABLE ' . $dbw->tableName( 'blob_tracking' ), __METHOD__ );
100            $dbw->query( 'DROP TABLE ' . $dbw->tableName( 'blob_orphans' ), __METHOD__ );
101        }
102        $dbw->sourceFile( __DIR__ . '/blob_tracking.sql' );
103    }
104
105    private function getTextClause() {
106        if ( !$this->textClause ) {
107            $dbr = $this->getReplicaDB();
108            $conds = [];
109            foreach ( $this->clusters as $cluster ) {
110                $conds[] = $dbr->expr(
111                    'old_text',
112                    IExpression::LIKE,
113                    new LikeValue( "DB://$cluster/", $dbr->anyString() )
114                );
115            }
116            $this->textClause = $dbr->orExpr( $conds );
117        }
118
119        return $this->textClause;
120    }
121
122    private function interpretPointer( $text ) {
123        if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
124            return false;
125        }
126
127        return [
128            'cluster' => $m[1],
129            'id' => intval( $m[2] ),
130            'hash' => $m[3] ?? null
131        ];
132    }
133
134    /**
135     *  Scan the revision table for rows stored in the specified clusters
136     */
137    private function trackRevisions() {
138        $dbw = $this->getPrimaryDB();
139        $dbr = $this->getReplicaDB();
140
141        $textClause = $this->getTextClause();
142        $startId = 0;
143        $endId = (int)$dbr->newSelectQueryBuilder()
144            ->select( 'MAX(rev_id)' )
145            ->from( 'revision' )
146            ->caller( __METHOD__ )->fetchField();
147        $batchesDone = 0;
148        $rowsInserted = 0;
149
150        echo "Finding revisions...\n";
151
152        $conds = [
153            $textClause,
154            $dbr->expr(
155                'old_flags',
156                IExpression::LIKE,
157                new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
158            )
159        ];
160        $slotRoleStore = $this->getServiceContainer()->getSlotRoleStore();
161
162        $conds = array_merge( [
163            'slot_role_id=' . $slotRoleStore->getId( SlotRecord::MAIN ),
164            'SUBSTRING(content_address, 1, 3)=' . $dbr->addQuotes( 'tt:' ),
165        ], $conds );
166
167        while ( true ) {
168            $res = $dbr->newSelectQueryBuilder()
169                ->select( [ 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ] )
170                ->from( 'revision' )
171                ->join( 'slots', null, 'rev_id=slot_revision_id' )
172                ->join( 'content', null, 'content_id=slot_content_id' )
173                ->join( 'text', null, 'SUBSTRING(content_address, 4)=old_id' )
174                ->where( $dbr->expr( 'rev_id', '>', $startId ) )
175                ->andWhere( $conds )
176                ->orderBy( 'rev_id' )
177                ->limit( $this->batchSize )
178                ->caller( __METHOD__ )->fetchResultSet();
179            if ( !$res->numRows() ) {
180                break;
181            }
182
183            $insertBatch = [];
184            foreach ( $res as $row ) {
185                $startId = (int)$row->rev_id;
186                $info = $this->interpretPointer( $row->old_text );
187                if ( !$info ) {
188                    echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
189                    continue;
190                }
191                if ( !in_array( $info['cluster'], $this->clusters ) ) {
192                    echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
193                    continue;
194                }
195                $insertBatch[] = [
196                    'bt_page' => $row->rev_page,
197                    'bt_rev_id' => $row->rev_id,
198                    'bt_text_id' => $row->old_id,
199                    'bt_cluster' => $info['cluster'],
200                    'bt_blob_id' => $info['id'],
201                    'bt_cgz_hash' => $info['hash']
202                ];
203                if ( $this->doBlobOrphans ) {
204                    gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
205                }
206            }
207            $dbw->newInsertQueryBuilder()
208                ->insertInto( 'blob_tracking' )
209                ->rows( $insertBatch )
210                ->caller( __METHOD__ )->execute();
211            $rowsInserted += count( $insertBatch );
212
213            ++$batchesDone;
214            if ( $batchesDone >= $this->reportingInterval ) {
215                $batchesDone = 0;
216                echo "$startId / $endId\n";
217                $this->waitForReplication();
218            }
219        }
220        echo "Found $rowsInserted revisions\n";
221    }
222
223    /**
224     * Scan the text table for orphan text
225     * Orphan text here does not imply DB corruption -- deleted text tracked by the
226     * archive table counts as orphan for our purposes.
227     */
228    private function trackOrphanText() {
229        # Wait until the blob_tracking table is available in the replica DB
230        $dbw = $this->getPrimaryDB();
231        $dbr = $this->getReplicaDB();
232        $this->getServiceContainer()->getDBLoadBalancerFactory()->waitForReplication( [ 'timeout' => 100_000 ] );
233
234        $textClause = $this->getTextClause();
235        $startId = 0;
236        $endId = (int)$dbr->newSelectQueryBuilder()
237            ->select( 'MAX(old_id)' )
238            ->from( 'text' )
239            ->caller( __METHOD__ )->fetchField();
240        $rowsInserted = 0;
241        $batchesDone = 0;
242
243        echo "Finding orphan text...\n";
244
245        # Scan the text table for orphan text
246        while ( true ) {
247            $res = $dbr->newSelectQueryBuilder()
248                ->select( [ 'old_id', 'old_flags', 'old_text' ] )
249                ->from( 'text' )
250                ->leftJoin( 'blob_tracking', null, 'bt_text_id=old_id' )
251                ->where( [
252                    $dbr->expr( 'old_id', '>', $startId ),
253                    $textClause,
254                    $dbr->expr(
255                        'old_flags',
256                        IExpression::LIKE,
257                        new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
258                    ),
259                    'bt_text_id' => null,
260                ] )
261                ->orderBy( 'old_id' )
262                ->limit( $this->batchSize )
263                ->caller( __METHOD__ )->fetchResultSet();
264
265            if ( !$res->numRows() ) {
266                break;
267            }
268
269            $insertBatch = [];
270            foreach ( $res as $row ) {
271                $startId = (int)$row->old_id;
272                $info = $this->interpretPointer( $row->old_text );
273                if ( !$info ) {
274                    echo "Invalid DB:// URL in old_id {$row->old_id}\n";
275                    continue;
276                }
277                if ( !in_array( $info['cluster'], $this->clusters ) ) {
278                    echo "Invalid cluster returned in SQL query\n";
279                    continue;
280                }
281
282                $insertBatch[] = [
283                    'bt_page' => 0,
284                    'bt_rev_id' => 0,
285                    'bt_text_id' => $row->old_id,
286                    'bt_cluster' => $info['cluster'],
287                    'bt_blob_id' => $info['id'],
288                    'bt_cgz_hash' => $info['hash']
289                ];
290                if ( $this->doBlobOrphans ) {
291                    gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
292                }
293            }
294            $dbw->newInsertQueryBuilder()
295                ->insertInto( 'blob_tracking' )
296                ->rows( $insertBatch )
297                ->caller( __METHOD__ )->execute();
298
299            $rowsInserted += count( $insertBatch );
300            ++$batchesDone;
301            if ( $batchesDone >= $this->reportingInterval ) {
302                $batchesDone = 0;
303                echo "$startId / $endId\n";
304                $this->waitForReplication();
305            }
306        }
307        echo "Found $rowsInserted orphan text rows\n";
308    }
309
310    /**
311     * Scan the blobs table for rows not registered in blob_tracking (and thus not
312     * registered in the text table).
313     *
314     * Orphan blobs are indicative of DB corruption. They are inaccessible and
315     * should probably be deleted.
316     */
317    private function findOrphanBlobs() {
318        if ( !extension_loaded( 'gmp' ) ) {
319            echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";
320
321            return;
322        }
323
324        $dbw = $this->getPrimaryDB();
325        $lbFactory = $this->getServiceContainer()->getDBLoadBalancerFactory();
326
327        foreach ( $this->clusters as $cluster ) {
328            echo "Searching for orphan blobs in $cluster...\n";
329            $lb = $lbFactory->getExternalLB( $cluster );
330            try {
331                $extDB = $lb->getMaintenanceConnectionRef( DB_REPLICA );
332            } catch ( DBConnectionError $e ) {
333                if ( strpos( $e->getMessage(), 'Unknown database' ) !== false ) {
334                    echo "No database on $cluster\n";
335                } else {
336                    echo "Error on $cluster" . $e->getMessage() . "\n";
337                }
338                continue;
339            }
340            $table = $extDB->getLBInfo( 'blobs table' ) ?? 'blobs';
341            if ( !$extDB->tableExists( $table, __METHOD__ ) ) {
342                echo "No blobs table on cluster $cluster\n";
343                continue;
344            }
345            $startId = 0;
346            $batchesDone = 0;
347            $actualBlobs = gmp_init( 0 );
348            $endId = (int)$extDB->newSelectQueryBuilder()
349                ->select( 'MAX(blob_id)' )
350                ->from( $table )
351                ->caller( __METHOD__ )->fetchField();
352
353            // Build a bitmap of actual blob rows
354            while ( true ) {
355                $res = $extDB->newSelectQueryBuilder()
356                    ->select( [ 'blob_id' ] )
357                    ->from( $table )
358                    ->where( $extDB->expr( 'blob_id', '>', $startId ) )
359                    ->orderBy( 'blob_id' )
360                    ->limit( $this->batchSize )
361                    ->caller( __METHOD__ )->fetchResultSet();
362
363                if ( !$res->numRows() ) {
364                    break;
365                }
366
367                foreach ( $res as $row ) {
368                    gmp_setbit( $actualBlobs, $row->blob_id );
369                    $startId = (int)$row->blob_id;
370                }
371
372                ++$batchesDone;
373                if ( $batchesDone >= $this->reportingInterval ) {
374                    $batchesDone = 0;
375                    echo "$startId / $endId\n";
376                }
377            }
378
379            // Find actual blobs that weren't tracked by the previous passes
380            // This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
381            $orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );
382
383            // Traverse the orphan list
384            $insertBatch = [];
385            $id = 0;
386            $numOrphans = 0;
387            while ( true ) {
388                $id = gmp_scan1( $orphans, $id );
389                if ( $id == -1 ) {
390                    break;
391                }
392                $insertBatch[] = [
393                    'bo_cluster' => $cluster,
394                    'bo_blob_id' => $id
395                ];
396                if ( count( $insertBatch ) > $this->batchSize ) {
397                    $dbw->newInsertQueryBuilder()
398                        ->insertInto( 'blob_orphans' )
399                        ->rows( $insertBatch )
400                        ->caller( __METHOD__ )->execute();
401                    $insertBatch = [];
402                }
403
404                ++$id;
405                ++$numOrphans;
406            }
407            if ( $insertBatch ) {
408                $dbw->newInsertQueryBuilder()
409                    ->insertInto( 'blob_orphans' )
410                    ->rows( $insertBatch )
411                    ->caller( __METHOD__ )->execute();
412            }
413            echo "Found $numOrphans orphan(s) in $cluster\n";
414        }
415    }
416}
417
418$maintClass = TrackBlobs::class;
419require_once RUN_MAINTENANCE_IF_MAIN;