Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 400
0.00% covered (danger)
0.00%
0 / 29
CRAP
0.00% covered (danger)
0.00%
0 / 2
RecompressTracked
0.00% covered (danger)
0.00%
0 / 326
0.00% covered (danger)
0.00%
0 / 24
8010
0.00% covered (danger)
0.00%
0 / 1
 getOptionsWithArgs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 newFromCommandLine
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 __construct
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
30
 debug
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 info
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 critical
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 logToFile
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 syncDBs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 executeParent
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 checkTrackingTable
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 startChildProcs
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
72
 killChildProcs
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
20
 dispatch
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
20
 dispatchToChild
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 doAllPages
0.00% covered (danger)
0.00%
0 / 33
0.00% covered (danger)
0.00%
0 / 1
56
 report
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 doAllOrphans
0.00% covered (danger)
0.00%
0 / 43
0.00% covered (danger)
0.00%
0 / 1
90
 executeChild
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
42
 doPage
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 1
90
 moveTextRow
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
6
 finishIncompleteMoves
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
30
 getTargetCluster
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 doOrphanList
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
30
CgzCopyTransaction
0.00% covered (danger)
0.00%
0 / 66
0.00% covered (danger)
0.00%
0 / 5
272
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 addItem
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 getSize
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 recompress
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 commit
0.00% covered (danger)
0.00%
0 / 48
0.00% covered (danger)
0.00%
0 / 1
110
1<?php
2/**
3 * Moves blobs indexed by trackBlobs.php to a specified list of destination
4 * clusters, and recompresses them in the process.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 * http://www.gnu.org/copyleft/gpl.html
20 *
21 * @file
22 * @ingroup Maintenance ExternalStorage
23 */
24
25use MediaWiki\Logger\LegacyLogger;
26use MediaWiki\MediaWikiServices;
27use MediaWiki\Shell\Shell;
28use MediaWiki\Storage\SqlBlobStore;
29use MediaWiki\Title\Title;
30use MediaWiki\WikiMap\WikiMap;
31use Wikimedia\AtEase\AtEase;
32
33$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
34require __DIR__ . '/../CommandLineInc.php';
35
36if ( count( $args ) < 1 ) {
37    echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
38Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
39and recompresses them in the process. Restartable.
40
41Options:
42    --procs <procs>       Set the number of child processes (default 1)
43    --copy-only           Copy only, do not update the text table. Restart
44                          without this option to complete.
45    --debug-log <file>    Log debugging data to the specified file
46    --info-log <file>     Log progress messages to the specified file
47    --critical-log <file> Log error messages to the specified file
48";
49    exit( 1 );
50}
51
52$job = RecompressTracked::newFromCommandLine( $args, $options );
53$job->execute();
54
55/**
56 * Maintenance script that moves blobs indexed by trackBlobs.php to a specified
57 * list of destination clusters, and recompresses them in the process.
58 *
59 * @ingroup Maintenance ExternalStorage
60 */
61class RecompressTracked {
62    public $destClusters;
63    public $batchSize = 1000;
64    public $orphanBatchSize = 1000;
65    public $reportingInterval = 10;
66    public $numProcs = 1;
67    public $numBatches = 0;
68    public $pageBlobClass;
69    public $orphanBlobClass;
70    public $childPipes;
71    public $childProcs;
72    public $prevChildId;
73    public $copyOnly = false;
74    public $isChild = false;
75    public $childId = false;
76    public $noCount = false;
77    public ?string $debugLog = null;
78    public ?string $infoLog = null;
79    public ?string $criticalLog = null;
80    /** @var ExternalStoreDB */
81    public $store;
82    /** @var SqlBlobStore */
83    private $blobStore;
84
85    private static $optionsWithArgs = [
86        'procs',
87        'child-id',
88        'debug-log',
89        'info-log',
90        'critical-log'
91    ];
92
93    private static $cmdLineOptionMap = [
94        'no-count' => 'noCount',
95        'procs' => 'numProcs',
96        'copy-only' => 'copyOnly',
97        'child' => 'isChild',
98        'child-id' => 'childId',
99        'debug-log' => 'debugLog',
100        'info-log' => 'infoLog',
101        'critical-log' => 'criticalLog',
102    ];
103
104    public static function getOptionsWithArgs() {
105        return self::$optionsWithArgs;
106    }
107
108    public static function newFromCommandLine( $args, $options ) {
109        $jobOptions = [ 'destClusters' => $args ];
110        foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
111            if ( isset( $options[$cmdOption] ) ) {
112                $jobOptions[$classOption] = $options[$cmdOption];
113            }
114        }
115
116        return new self( $jobOptions );
117    }
118
119    public function __construct( $options ) {
120        foreach ( $options as $name => $value ) {
121            $this->$name = $value;
122        }
123        $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
124        $this->store = $esFactory->getStore( 'DB' );
125        if ( !$this->isChild ) {
126            $GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
127        } elseif ( $this->childId !== false ) {
128            $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->childId}";
129        }
130        $this->pageBlobClass = function_exists( 'xdiff_string_bdiff' ) ?
131            DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
132        $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
133
134        $this->blobStore = MediaWikiServices::getInstance()
135            ->getBlobStoreFactory()
136            ->newSqlBlobStore();
137    }
138
139    public function debug( $msg ) {
140        wfDebug( "$msg" );
141        if ( $this->debugLog ) {
142            $this->logToFile( $msg, $this->debugLog );
143        }
144    }
145
146    public function info( $msg ) {
147        echo "$msg\n";
148        if ( $this->infoLog ) {
149            $this->logToFile( $msg, $this->infoLog );
150        }
151    }
152
153    public function critical( $msg ) {
154        echo "$msg\n";
155        if ( $this->criticalLog ) {
156            $this->logToFile( $msg, $this->criticalLog );
157        }
158    }
159
160    private function logToFile( $msg, $file ) {
161        $header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
162        if ( $this->childId !== false ) {
163            $header .= "({$this->childId})";
164        }
165        $header .= ' ' . WikiMap::getCurrentWikiDbDomain()->getId();
166        LegacyLogger::emit( sprintf( "%-50s %s\n", $header, $msg ), $file );
167    }
168
169    /**
170     * Wait until the selected replica DB has caught up to the master.
171     * This allows us to use the replica DB for things that were committed in a
172     * previous part of this batch process.
173     */
174    private function syncDBs() {
175        MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication( [ 'timeout' => 100_000 ] );
176    }
177
178    /**
179     * Execute parent or child depending on the isChild option
180     */
181    public function execute() {
182        if ( $this->isChild ) {
183            $this->executeChild();
184        } else {
185            $this->executeParent();
186        }
187    }
188
189    /**
190     * Execute the parent process
191     */
192    public function executeParent() {
193        if ( !$this->checkTrackingTable() ) {
194            return;
195        }
196
197        $this->syncDBs();
198        $this->startChildProcs();
199        $this->doAllPages();
200        $this->doAllOrphans();
201        $this->killChildProcs();
202    }
203
204    /**
205     * Make sure the tracking table exists and isn't empty
206     * @return bool
207     */
208    private function checkTrackingTable() {
209        $row = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase()->newSelectQueryBuilder()
210            ->select( '*' )
211            ->from( 'blob_tracking' )
212            ->caller( __METHOD__ )->fetchRow();
213        if ( !$row ) {
214            $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
215
216            return false;
217        }
218
219        return true;
220    }
221
222    /**
223     * Start the worker processes.
224     * These processes will listen on stdin for commands.
225     * This necessary because text recompression is slow: loading, compressing and
226     * writing are all slow.
227     */
228    private function startChildProcs() {
229        $wiki = WikiMap::getCurrentWikiId();
230
231        $cmd = 'php ' . Shell::escape( __FILE__ );
232        foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
233            if ( $cmdOption == 'child-id' ) {
234                continue;
235            }
236            if ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
237                // @phan-suppress-next-line PhanTypeMismatchArgument False positive
238                $cmd .= " --$cmdOption " . Shell::escape( $this->$classOption );
239            } elseif ( $this->$classOption ) {
240                $cmd .= " --$cmdOption";
241            }
242        }
243        $cmd .= ' --child' .
244            ' --wiki ' . Shell::escape( $wiki ) .
245            ' ' . Shell::escape( ...$this->destClusters );
246
247        $this->childPipes = $this->childProcs = [];
248        for ( $i = 0; $i < $this->numProcs; $i++ ) {
249            $pipes = [];
250            $spec = [
251                [ 'pipe', 'r' ],
252                [ 'file', 'php://stdout', 'w' ],
253                [ 'file', 'php://stderr', 'w' ]
254            ];
255            AtEase::suppressWarnings();
256            $proc = proc_open( "$cmd --child-id $i", $spec, $pipes );
257            AtEase::restoreWarnings();
258            if ( !$proc ) {
259                $this->critical( "Error opening child process: $cmd" );
260                exit( 1 );
261            }
262            $this->childProcs[$i] = $proc;
263            $this->childPipes[$i] = $pipes[0];
264        }
265        $this->prevChildId = -1;
266    }
267
268    /**
269     * Gracefully terminate the child processes
270     */
271    private function killChildProcs() {
272        $this->info( "Waiting for child processes to finish..." );
273        for ( $i = 0; $i < $this->numProcs; $i++ ) {
274            $this->dispatchToChild( $i, 'quit' );
275        }
276        for ( $i = 0; $i < $this->numProcs; $i++ ) {
277            $status = proc_close( $this->childProcs[$i] );
278            if ( $status ) {
279                $this->critical( "Warning: child #$i exited with status $status" );
280            }
281        }
282        $this->info( "Done." );
283    }
284
285    /**
286     * Dispatch a command to the next available child process.
287     * This may block until a child process finishes its work and becomes available.
288     * @param array|string ...$args
289     */
290    private function dispatch( ...$args ) {
291        $pipes = $this->childPipes;
292        $x = [];
293        $y = [];
294        $numPipes = stream_select( $x, $pipes, $y, 3600 );
295        if ( !$numPipes ) {
296            $this->critical( "Error waiting to write to child process. Aborting" );
297            exit( 1 );
298        }
299        for ( $i = 0; $i < $this->numProcs; $i++ ) {
300            $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
301            if ( isset( $pipes[$childId] ) ) {
302                $this->prevChildId = $childId;
303                $this->dispatchToChild( $childId, $args );
304
305                return;
306            }
307        }
308        $this->critical( "Unreachable" );
309        exit( 1 );
310    }
311
312    /**
313     * Dispatch a command to a specified child process
314     * @param int $childId
315     * @param array|string $args
316     */
317    private function dispatchToChild( $childId, $args ) {
318        $args = (array)$args;
319        $cmd = implode( ' ', $args );
320        fwrite( $this->childPipes[$childId], "$cmd\n" );
321    }
322
323    /**
324     * Move all tracked pages to the new clusters
325     */
326    private function doAllPages() {
327        $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
328        $i = 0;
329        $startId = 0;
330        if ( $this->noCount ) {
331            $numPages = '[unknown]';
332        } else {
333            $numPages = $dbr->newSelectQueryBuilder()
334                ->select( 'COUNT(DISTINCT bt_page)' )
335                ->from( 'blob_tracking' )
336                ->where( [ 'bt_moved' => 0 ] )
337                ->caller( __METHOD__ )->fetchField();
338        }
339        if ( $this->copyOnly ) {
340            $this->info( "Copying pages..." );
341        } else {
342            $this->info( "Moving pages..." );
343        }
344        while ( true ) {
345            $res = $dbr->newSelectQueryBuilder()
346                ->select( [ 'bt_page' ] )
347                ->distinct()
348                ->from( 'blob_tracking' )
349                ->where( [ 'bt_moved' => 0, $dbr->expr( 'bt_page', '>', $startId ) ] )
350                ->orderBy( 'bt_page' )
351                ->limit( $this->batchSize )
352                ->caller( __METHOD__ )->fetchResultSet();
353            if ( !$res->numRows() ) {
354                break;
355            }
356            foreach ( $res as $row ) {
357                $startId = $row->bt_page;
358                $this->dispatch( 'doPage', $row->bt_page );
359                $i++;
360            }
361            $this->report( 'pages', $i, $numPages );
362        }
363        $this->report( 'pages', $i, $numPages );
364        if ( $this->copyOnly ) {
365            $this->info( "All page copies queued." );
366        } else {
367            $this->info( "All page moves queued." );
368        }
369    }
370
371    /**
372     * Display a progress report
373     * @param string $label
374     * @param int $current
375     * @param int $end
376     */
377    private function report( $label, $current, $end ) {
378        $this->numBatches++;
379        if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
380            $this->numBatches = 0;
381            $this->info( "$label$current / $end" );
382            MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
383        }
384    }
385
386    /**
387     * Move all orphan text to the new clusters
388     */
389    private function doAllOrphans() {
390        $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
391        $startId = 0;
392        $i = 0;
393        if ( $this->noCount ) {
394            $numOrphans = '[unknown]';
395        } else {
396            $numOrphans = $dbr->newSelectQueryBuilder()
397                ->select( 'COUNT(DISTINCT bt_text_id)' )
398                ->from( 'blob_tracking' )
399                ->where( [ 'bt_moved' => 0, 'bt_page' => 0 ] )
400                ->caller( __METHOD__ )->fetchField();
401            if ( !$numOrphans ) {
402                return;
403            }
404        }
405        if ( $this->copyOnly ) {
406            $this->info( "Copying orphans..." );
407        } else {
408            $this->info( "Moving orphans..." );
409        }
410
411        while ( true ) {
412            $res = $dbr->newSelectQueryBuilder()
413                ->select( [ 'bt_text_id' ] )
414                ->distinct()
415                ->from( 'blob_tracking' )
416                ->where( [ 'bt_moved' => 0, 'bt_page' => 0, $dbr->expr( 'bt_text_id', '>', $startId ) ] )
417                ->orderBy( 'bt_text_id' )
418                ->limit( $this->batchSize )
419                ->caller( __METHOD__ )->fetchResultSet();
420            if ( !$res->numRows() ) {
421                break;
422            }
423            $ids = [];
424            foreach ( $res as $row ) {
425                $startId = $row->bt_text_id;
426                $ids[] = $row->bt_text_id;
427                $i++;
428            }
429            // Need to send enough orphan IDs to the child at a time to fill a blob,
430            // so orphanBatchSize needs to be at least ~100.
431            // batchSize can be smaller or larger.
432            while ( count( $ids ) > $this->orphanBatchSize ) {
433                $args = array_slice( $ids, 0, $this->orphanBatchSize );
434                $ids = array_slice( $ids, $this->orphanBatchSize );
435                array_unshift( $args, 'doOrphanList' );
436                $this->dispatch( ...$args );
437            }
438            if ( count( $ids ) ) {
439                $args = $ids;
440                array_unshift( $args, 'doOrphanList' );
441                $this->dispatch( ...$args );
442            }
443
444            $this->report( 'orphans', $i, $numOrphans );
445        }
446        $this->report( 'orphans', $i, $numOrphans );
447        $this->info( "All orphans queued." );
448    }
449
450    /**
451     * Main entry point for worker processes
452     */
453    public function executeChild() {
454        $this->debug( 'starting' );
455        $this->syncDBs();
456
457        while ( !feof( STDIN ) ) {
458            $line = rtrim( fgets( STDIN ) );
459            if ( $line == '' ) {
460                continue;
461            }
462            $this->debug( $line );
463            $args = explode( ' ', $line );
464            $cmd = array_shift( $args );
465            switch ( $cmd ) {
466                case 'doPage':
467                    $this->doPage( intval( $args[0] ) );
468                    break;
469                case 'doOrphanList':
470                    $this->doOrphanList( array_map( 'intval', $args ) );
471                    break;
472                case 'quit':
473                    return;
474            }
475            MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
476        }
477    }
478
479    /**
480     * Move tracked text in a given page
481     *
482     * @param int $pageId
483     */
484    private function doPage( $pageId ) {
485        $title = Title::newFromID( $pageId );
486        if ( $title ) {
487            $titleText = $title->getPrefixedText();
488        } else {
489            $titleText = '[deleted]';
490        }
491        $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
492
493        // Finish any incomplete transactions
494        if ( !$this->copyOnly ) {
495            $this->finishIncompleteMoves( [ 'bt_page' => $pageId ] );
496            $this->syncDBs();
497        }
498
499        $startId = 0;
500        $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
501
502        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
503        while ( true ) {
504            $res = $dbr->newSelectQueryBuilder()
505                ->select( '*' )
506                ->from( 'blob_tracking' )
507                ->join( 'text', null, 'bt_text_id=old_id' )
508                ->where( [
509                    'bt_page' => $pageId,
510                    $dbr->expr( 'bt_text_id', '>', $startId ),
511                    'bt_moved' => 0,
512                    'bt_new_url' => null,
513                ] )
514                ->orderBy( 'bt_text_id' )
515                ->limit( $this->batchSize )
516                ->caller( __METHOD__ )->fetchResultSet();
517            if ( !$res->numRows() ) {
518                break;
519            }
520
521            $lastTextId = 0;
522            foreach ( $res as $row ) {
523                $startId = $row->bt_text_id;
524                if ( $lastTextId == $row->bt_text_id ) {
525                    // Duplicate (null edit)
526                    continue;
527                }
528                $lastTextId = $row->bt_text_id;
529                // Load the text
530                $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
531                if ( $text === false ) {
532                    $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
533                    continue;
534                }
535
536                // Queue it
537                if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
538                    $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
539                    $trx->commit();
540                    $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
541                    $lbFactory->waitForReplication();
542                }
543            }
544        }
545
546        $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
547        $trx->commit();
548    }
549
550    /**
551     * Atomic move operation.
552     *
553     * Write the new URL to the text table and set the bt_moved flag.
554     *
555     * This is done in a single transaction to provide restartable behavior
556     * without data loss.
557     *
558     * The transaction is kept short to reduce locking.
559     *
560     * @param int $textId
561     * @param string $url
562     */
563    public function moveTextRow( $textId, $url ) {
564        if ( $this->copyOnly ) {
565            $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
566            exit( 1 );
567        }
568        $dbw = MediaWikiServices::getInstance()->getConnectionProvider()->getPrimaryDatabase();
569
570        $dbw->begin( __METHOD__ );
571        $dbw->newUpdateQueryBuilder()
572            ->update( 'text' )
573            ->set( [
574                'old_text' => $url,
575                'old_flags' => 'external,utf-8',
576            ] )
577            ->where( [
578                'old_id' => $textId
579            ] )
580            ->caller( __METHOD__ )
581            ->execute();
582        $dbw->newUpdateQueryBuilder()
583            ->update( 'blob_tracking' )
584            ->set( [ 'bt_moved' => 1 ] )
585            ->where( [ 'bt_text_id' => $textId ] )
586            ->caller( __METHOD__ )
587            ->execute();
588        $dbw->commit( __METHOD__ );
589    }
590
591    /**
592     * Moves are done in two phases: bt_new_url and then bt_moved.
593     *  - bt_new_url indicates that the text has been copied to the new cluster.
594     *  - bt_moved indicates that the text table has been updated.
595     *
596     * This function completes any moves that only have done bt_new_url. This
597     * can happen when the script is interrupted, or when --copy-only is used.
598     *
599     * @param array $conds
600     */
601    private function finishIncompleteMoves( $conds ) {
602        $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
603        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
604
605        $startId = 0;
606        $conds = array_merge( $conds, [
607            'bt_moved' => 0,
608            'bt_new_url IS NOT NULL'
609        ] );
610        while ( true ) {
611            $res = $dbr->newSelectQueryBuilder()
612                ->select( '*' )
613                ->from( 'blob_tracking' )
614                ->where( $conds )
615                ->andWhere( $dbr->expr( 'bt_text_id', '>', $startId ) )
616                ->orderBy( 'bt_text_id' )
617                ->limit( $this->batchSize )
618                ->caller( __METHOD__ )->fetchResultSet();
619            if ( !$res->numRows() ) {
620                break;
621            }
622            $this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
623            foreach ( $res as $row ) {
624                $startId = $row->bt_text_id;
625                $this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
626                if ( $row->bt_text_id % 10 == 0 ) {
627                    $lbFactory->waitForReplication();
628                }
629            }
630        }
631    }
632
633    /**
634     * Returns the name of the next target cluster
635     * @return string
636     */
637    public function getTargetCluster() {
638        $cluster = next( $this->destClusters );
639        if ( $cluster === false ) {
640            $cluster = reset( $this->destClusters );
641        }
642
643        return $cluster;
644    }
645
646    /**
647     * Move an orphan text_id to the new cluster
648     *
649     * @param array $textIds
650     */
651    private function doOrphanList( $textIds ) {
652        // Finish incomplete moves
653        if ( !$this->copyOnly ) {
654            $this->finishIncompleteMoves( [ 'bt_text_id' => $textIds ] );
655            $this->syncDBs();
656        }
657
658        $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
659
660        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
661        $res = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase()->newSelectQueryBuilder()
662            ->select( [ 'old_id', 'old_text', 'old_flags' ] )
663            ->distinct()
664            ->from( 'text' )
665            ->join( 'blob_tracking', null, 'bt_text_id=old_id' )
666            ->where( [ 'old_id' => $textIds, 'bt_moved' => 0 ] )
667            ->caller( __METHOD__ )->fetchResultSet();
668
669        foreach ( $res as $row ) {
670            $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
671            if ( $text === false ) {
672                $this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
673                continue;
674            }
675
676            if ( !$trx->addItem( $text, $row->old_id ) ) {
677                $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
678                $trx->commit();
679                $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
680                $lbFactory->waitForReplication();
681            }
682        }
683        $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
684        $trx->commit();
685    }
686}
687
688/**
689 * Class to represent a recompression operation for a single CGZ blob
690 */
691class CgzCopyTransaction {
692    /** @var RecompressTracked */
693    public $parent;
694    public $blobClass;
695    /** @var ConcatenatedGzipHistoryBlob|false */
696    public $cgz;
697    public $referrers;
698    /** @var array */
699    private $texts;
700
701    /**
702     * Create a transaction from a RecompressTracked object
703     * @param RecompressTracked $parent
704     * @param string $blobClass
705     */
706    public function __construct( $parent, $blobClass ) {
707        $this->blobClass = $blobClass;
708        $this->cgz = false;
709        $this->texts = [];
710        $this->parent = $parent;
711    }
712
713    /**
714     * Add text.
715     * Returns false if it's ready to commit.
716     * @param string $text
717     * @param int $textId
718     * @return bool
719     */
720    public function addItem( $text, $textId ) {
721        if ( !$this->cgz ) {
722            $class = $this->blobClass;
723            $this->cgz = new $class;
724        }
725        $hash = $this->cgz->addItem( $text );
726        $this->referrers[$textId] = $hash;
727        $this->texts[$textId] = $text;
728
729        return $this->cgz->isHappy();
730    }
731
732    public function getSize() {
733        return count( $this->texts );
734    }
735
736    /**
737     * Recompress text after some aberrant modification
738     */
739    public function recompress() {
740        $class = $this->blobClass;
741        $this->cgz = new $class;
742        $this->referrers = [];
743        foreach ( $this->texts as $textId => $text ) {
744            $hash = $this->cgz->addItem( $text );
745            $this->referrers[$textId] = $hash;
746        }
747    }
748
749    /**
750     * Commit the blob.
751     * Does nothing if no text items have been added.
752     * May skip the move if --copy-only is set.
753     */
754    public function commit() {
755        $originalCount = count( $this->texts );
756        if ( !$originalCount ) {
757            return;
758        }
759
760        /* Check to see if the target text_ids have been moved already.
761         *
762         * We originally read from the replica DB, so this can happen when a single
763         * text_id is shared between multiple pages. It's rare, but possible
764         * if a delete/move/undelete cycle splits up a null edit.
765         *
766         * We do a locking read to prevent closer-run race conditions.
767         */
768        $dbw = MediaWikiServices::getInstance()->getConnectionProvider()->getPrimaryDatabase();
769        $dbw->begin( __METHOD__ );
770        $res = $dbw->newSelectQueryBuilder()
771            ->select( [ 'bt_text_id', 'bt_moved' ] )
772            ->forUpdate()
773            ->from( 'blob_tracking' )
774            ->where( [ 'bt_text_id' => array_keys( $this->referrers ) ] )
775            ->caller( __METHOD__ )->fetchResultSet();
776        $dirty = false;
777        foreach ( $res as $row ) {
778            if ( $row->bt_moved ) {
779                # This row has already been moved, remove it
780                $this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
781                unset( $this->texts[$row->bt_text_id] );
782                $dirty = true;
783            }
784        }
785
786        // Recompress the blob if necessary
787        if ( $dirty ) {
788            if ( !count( $this->texts ) ) {
789                // All have been moved already
790                if ( $originalCount > 1 ) {
791                    // This is suspcious, make noise
792                    $this->parent->critical(
793                        "Warning: concurrent operation detected, are there two conflicting " .
794                        "processes running, doing the same job?" );
795                }
796
797                return;
798            }
799            $this->recompress();
800        }
801
802        // Insert the data into the destination cluster
803        $targetCluster = $this->parent->getTargetCluster();
804        $store = $this->parent->store;
805        $targetDB = $store->getPrimary( $targetCluster );
806        $targetDB->clearFlag( DBO_TRX ); // we manage the transactions
807        $targetDB->begin( __METHOD__ );
808        $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );
809
810        // Write the new URLs to the blob_tracking table
811        foreach ( $this->referrers as $textId => $hash ) {
812            $url = $baseUrl . '/' . $hash;
813            $dbw->newUpdateQueryBuilder()
814                ->update( 'blob_tracking' )
815                ->set( [ 'bt_new_url' => $url ] )
816                ->where( [
817                    'bt_text_id' => $textId,
818                    'bt_moved' => 0, # Check for concurrent conflicting update
819                ] )
820                ->caller( __METHOD__ )
821                ->execute();
822        }
823
824        $targetDB->commit( __METHOD__ );
825        // Critical section here: interruption at this point causes blob duplication
826        // Reversing the order of the commits would cause data loss instead
827        $dbw->commit( __METHOD__ );
828
829        // Write the new URLs to the text table and set the moved flag
830        if ( !$this->parent->copyOnly ) {
831            foreach ( $this->referrers as $textId => $hash ) {
832                $url = $baseUrl . '/' . $hash;
833                $this->parent->moveTextRow( $textId, $url );
834            }
835        }
836    }
837}