29 use Wikimedia\AtEase\AtEase;
32 require __DIR__ .
'/../CommandLineInc.php';
34 if ( count(
$args ) < 1 ) {
35 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
36 Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
37 and recompresses them in the process. Restartable.
40 --procs <procs> Set the number of child processes (default 1)
41 --copy-only Copy only, do not update the text table. Restart
42 without this option to complete.
43 --debug-log <file> Log debugging data to the specified file
44 --info-log <file> Log progress messages to the specified file
45 --critical-log <file> Log error messages to the specified file
87 'no-count' =>
'noCount',
88 'procs' =>
'numProcs',
89 'copy-only' =>
'copyOnly',
91 'child-id' =>
'childId',
92 'debug-log' =>
'debugLog',
93 'info-log' =>
'infoLog',
94 'critical-log' =>
'criticalLog',
102 $jobOptions = [
'destClusters' =>
$args ];
103 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
104 if ( isset( $options[$cmdOption] ) ) {
105 $jobOptions[$classOption] = $options[$cmdOption];
109 return new self( $jobOptions );
113 foreach ( $options as $name => $value ) {
114 $this->$name = $value;
116 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
117 $this->store = $esFactory->getStore(
'DB' );
118 if ( !$this->isChild ) {
119 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
120 } elseif ( $this->childId !==
false ) {
121 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
123 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
124 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
125 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
127 $this->blobStore = MediaWikiServices::getInstance()
128 ->getBlobStoreFactory()
134 if ( $this->debugLog ) {
135 $this->
logToFile( $msg, $this->debugLog );
141 if ( $this->infoLog ) {
142 $this->
logToFile( $msg, $this->infoLog );
148 if ( $this->criticalLog ) {
149 $this->
logToFile( $msg, $this->criticalLog );
155 if ( $this->childId !==
false ) {
156 $header .=
"({$this->childId})";
159 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ),
$file );
170 $pos = $dbw->getPrimaryPos();
171 $dbr->primaryPosWait( $pos, 100000 );
178 if ( $this->isChild ) {
206 if ( !
$dbr->tableExists(
'blob_tracking', __METHOD__ ) ) {
207 $this->
critical(
"Error: blob_tracking table does not exist" );
211 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
213 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
230 $cmd =
'php ' . Shell::escape( __FILE__ );
231 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
232 if ( $cmdOption ==
'child-id' ) {
237 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
238 } elseif ( $this->$classOption ) {
239 $cmd .=
" --$cmdOption";
243 ' --wiki ' . Shell::escape( $wiki ) .
244 ' ' . Shell::escape( ...$this->destClusters );
246 $this->childPipes = $this->childProcs = [];
251 [
'file',
'php://stdout',
'w' ],
252 [
'file',
'php://stderr',
'w' ]
254 AtEase::suppressWarnings();
255 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
256 AtEase::restoreWarnings();
258 $this->
critical(
"Error opening child process: $cmd" );
261 $this->childProcs[$i] = $proc;
262 $this->childPipes[$i] = $pipes[0];
264 $this->prevChildId = -1;
271 $this->
info(
"Waiting for child processes to finish..." );
276 $status = proc_close( $this->childProcs[$i] );
278 $this->
critical(
"Warning: child #$i exited with status $status" );
281 $this->
info(
"Done." );
293 $numPipes = stream_select( $x, $pipes, $y, 3600 );
295 $this->
critical(
"Error waiting to write to child process. Aborting" );
299 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
318 $cmd = implode(
' ',
$args );
319 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
329 if ( $this->noCount ) {
330 $numPages =
'[unknown]';
332 $numPages =
$dbr->selectField(
'blob_tracking',
333 'COUNT(DISTINCT bt_page)',
334 # A condition is required so that
this query uses the index
339 if ( $this->copyOnly ) {
340 $this->
info(
"Copying pages..." );
342 $this->
info(
"Moving pages..." );
345 $res =
$dbr->select(
'blob_tracking',
349 'bt_page > ' .
$dbr->addQuotes( $startId )
354 'ORDER BY' =>
'bt_page',
355 'LIMIT' => $this->batchSize,
358 if ( !
$res->numRows() ) {
361 foreach (
$res as $row ) {
362 $startId = $row->bt_page;
363 $this->
dispatch(
'doPage', $row->bt_page );
366 $this->
report(
'pages', $i, $numPages );
368 $this->
report(
'pages', $i, $numPages );
369 if ( $this->copyOnly ) {
370 $this->
info(
"All page copies queued." );
372 $this->
info(
"All page moves queued." );
382 private function report( $label, $current, $end ) {
384 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
385 $this->numBatches = 0;
386 $this->
info(
"$label: $current / $end" );
387 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
398 if ( $this->noCount ) {
399 $numOrphans =
'[unknown]';
401 $numOrphans =
$dbr->selectField(
'blob_tracking',
402 'COUNT(DISTINCT bt_text_id)',
403 [
'bt_moved' => 0,
'bt_page' => 0 ],
405 if ( !$numOrphans ) {
409 if ( $this->copyOnly ) {
410 $this->
info(
"Copying orphans..." );
412 $this->
info(
"Moving orphans..." );
416 $res =
$dbr->select(
'blob_tracking',
421 'bt_text_id > ' .
$dbr->addQuotes( $startId )
426 'ORDER BY' =>
'bt_text_id',
427 'LIMIT' => $this->batchSize
430 if ( !
$res->numRows() ) {
434 foreach (
$res as $row ) {
435 $startId = $row->bt_text_id;
436 $ids[] = $row->bt_text_id;
442 while ( count( $ids ) > $this->orphanBatchSize ) {
443 $args = array_slice( $ids, 0, $this->orphanBatchSize );
444 $ids = array_slice( $ids, $this->orphanBatchSize );
445 array_unshift(
$args,
'doOrphanList' );
448 if ( count( $ids ) ) {
450 array_unshift(
$args,
'doOrphanList' );
454 $this->
report(
'orphans', $i, $numOrphans );
456 $this->
report(
'orphans', $i, $numOrphans );
457 $this->
info(
"All orphans queued." );
464 $this->
debug(
'starting' );
467 while ( !feof( STDIN ) ) {
468 $line = rtrim( fgets( STDIN ) );
474 $cmd = array_shift(
$args );
485 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
497 $titleText =
$title->getPrefixedText();
499 $titleText =
'[deleted]';
504 if ( !$this->copyOnly ) {
512 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
515 [
'blob_tracking',
'text' ],
518 'bt_page' => $pageId,
519 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
521 'bt_new_url IS NULL',
526 'ORDER BY' =>
'bt_text_id',
527 'LIMIT' => $this->batchSize
530 if ( !
$res->numRows() ) {
535 foreach (
$res as $row ) {
536 $startId = $row->bt_text_id;
537 if ( $lastTextId == $row->bt_text_id ) {
541 $lastTextId = $row->bt_text_id;
543 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
544 if ( $text ===
false ) {
545 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
550 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
551 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
554 $lbFactory->waitForReplication();
559 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
577 if ( $this->copyOnly ) {
578 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
582 $dbw->begin( __METHOD__ );
583 $dbw->update(
'text',
586 'old_flags' =>
'external,utf-8',
593 $dbw->update(
'blob_tracking',
595 [
'bt_text_id' => $textId ],
598 $dbw->commit( __METHOD__ );
613 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
616 $conds = array_merge( $conds, [
618 'bt_new_url IS NOT NULL'
621 $res =
$dbr->select(
'blob_tracking',
623 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
626 'ORDER BY' =>
'bt_text_id',
627 'LIMIT' => $this->batchSize,
630 if ( !
$res->numRows() ) {
633 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
634 foreach (
$res as $row ) {
635 $startId = $row->bt_text_id;
636 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
637 if ( $row->bt_text_id % 10 == 0 ) {
638 $lbFactory->waitForReplication();
649 $cluster = next( $this->destClusters );
650 if ( $cluster ===
false ) {
651 $cluster = reset( $this->destClusters );
664 if ( !$this->copyOnly ) {
671 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
673 [
'text',
'blob_tracking' ],
674 [
'old_id',
'old_text',
'old_flags' ],
676 'old_id' => $textIds,
684 foreach (
$res as $row ) {
685 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
686 if ( $text ===
false ) {
687 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
691 if ( !$trx->addItem( $text, $row->old_id ) ) {
692 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
695 $lbFactory->waitForReplication();
698 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
738 $this->cgz =
new $class;
740 $hash = $this->cgz->addItem( $text );
741 $this->referrers[$textId] = $hash;
742 $this->texts[$textId] = $text;
744 return $this->cgz->isHappy();
748 return count( $this->texts );
756 $this->cgz =
new $class;
757 $this->referrers = [];
758 foreach ( $this->texts as $textId => $text ) {
759 $hash = $this->cgz->addItem( $text );
760 $this->referrers[$textId] = $hash;
770 $originalCount = count( $this->texts );
771 if ( !$originalCount ) {
784 $dbw->begin( __METHOD__ );
785 $res = $dbw->select(
'blob_tracking',
786 [
'bt_text_id',
'bt_moved' ],
787 [
'bt_text_id' => array_keys( $this->referrers ) ],
788 __METHOD__, [
'FOR UPDATE' ] );
790 foreach (
$res as $row ) {
791 if ( $row->bt_moved ) {
792 # This row has already been moved, remove it
793 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
794 unset( $this->texts[$row->bt_text_id] );
801 if ( !count( $this->texts ) ) {
803 if ( $originalCount > 1 ) {
805 $this->parent->critical(
806 "Warning: concurrent operation detected, are there two conflicting " .
807 "processes running, doing the same job?" );
816 $targetCluster = $this->parent->getTargetCluster();
817 $store = $this->parent->store;
818 $targetDB = $store->getPrimary( $targetCluster );
819 $targetDB->clearFlag(
DBO_TRX );
820 $targetDB->begin( __METHOD__ );
821 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
824 foreach ( $this->referrers as $textId => $hash ) {
825 $url = $baseUrl .
'/' . $hash;
826 $dbw->update(
'blob_tracking',
827 [
'bt_new_url' => $url ],
829 'bt_text_id' => $textId,
830 'bt_moved' => 0, # Check
for concurrent conflicting update
836 $targetDB->commit( __METHOD__ );
839 $dbw->commit( __METHOD__ );
842 if ( !$this->parent->copyOnly ) {
843 foreach ( $this->referrers as $textId => $hash ) {
844 $url = $baseUrl .
'/' . $hash;
845 $this->parent->moveTextRow( $textId, $url );
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfGetDB( $db, $groups=[], $wiki=false)
Get a Database object.
wfHostname()
Get host name of the current machine, for use in error reporting.
Class to represent a recompression operation for a single CGZ blob.
RecompressTracked $parent
ConcatenatedGzipHistoryBlob false $cgz
addItem( $text, $textId)
Add text.
recompress()
Recompress text after some aberrant modification.
__construct( $parent, $blobClass)
Create a transaction from a RecompressTracked object.
Maintenance script that moves blobs indexed by trackBlobs.php to a specified list of destination clus...
executeChild()
Main entry point for worker processes.
moveTextRow( $textId, $url)
Atomic move operation.
killChildProcs()
Gracefully terminate the child processes.
report( $label, $current, $end)
Display a progress report.
execute()
Execute parent or child depending on the isChild option.
doOrphanList( $textIds)
Move an orphan text_id to the new cluster.
startChildProcs()
Start the worker processes.
doAllOrphans()
Move all orphan text to the new clusters.
static newFromCommandLine( $args, $options)
finishIncompleteMoves( $conds)
Moves are done in two phases: bt_new_url and then bt_moved.
dispatch(... $args)
Dispatch a command to the next available child process.
doPage( $pageId)
Move tracked text in a given page.
syncDBs()
Wait until the selected replica DB has caught up to the master.
dispatchToChild( $childId, $args)
Dispatch a command to a specified child process.
static getOptionsWithArgs()
getTargetCluster()
Returns the name of the next target cluster.
executeParent()
Execute the parent process.
doAllPages()
Move all tracked pages to the new clusters.
checkTrackingTable()
Make sure the tracking table exists and isn't empty.
static newFromID( $id, $flags=0)
Create a new Title from an article ID.
static getCurrentWikiId()
static getCurrentWikiDbDomain()
if(count( $args)< 1) $job
if(PHP_SAPI !='cli-server') if(!isset( $_SERVER['SCRIPT_FILENAME'])) $file
Item class for a filearchive table row.