31require __DIR__ .
'/../CommandLineInc.php';
33if ( count(
$args ) < 1 ) {
34 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
35Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
36and recompresses them in the process. Restartable.
39 --procs <procs> Set the number of child processes (default 1)
40 --copy-only Copy only, do not update the text table. Restart
41 without this option to complete.
42 --debug-log <file> Log debugging data to the specified file
43 --info-log <file> Log progress messages to the specified file
44 --critical-log <file> Log error messages to the specified file
86 'no-count' =>
'noCount',
87 'procs' =>
'numProcs',
88 'copy-only' =>
'copyOnly',
90 'child-id' =>
'childId',
91 'debug-log' =>
'debugLog',
92 'info-log' =>
'infoLog',
93 'critical-log' =>
'criticalLog',
101 $jobOptions = [
'destClusters' =>
$args ];
102 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
103 if ( isset( $options[$cmdOption] ) ) {
104 $jobOptions[$classOption] = $options[$cmdOption];
108 return new self( $jobOptions );
112 foreach ( $options as $name => $value ) {
113 $this->$name = $value;
115 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
116 $this->store = $esFactory->getStore(
'DB' );
117 if ( !$this->isChild ) {
118 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
119 } elseif ( $this->childId !==
false ) {
120 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
122 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
123 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
124 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
126 $this->blobStore = MediaWikiServices::getInstance()
127 ->getBlobStoreFactory()
133 if ( $this->debugLog ) {
134 $this->
logToFile( $msg, $this->debugLog );
140 if ( $this->infoLog ) {
141 $this->
logToFile( $msg, $this->infoLog );
147 if ( $this->criticalLog ) {
148 $this->
logToFile( $msg, $this->criticalLog );
154 if ( $this->childId !==
false ) {
155 $header .=
"({$this->childId})";
157 $header .=
' ' . WikiMap::getCurrentWikiDbDomain()->getId();
158 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ),
$file );
169 $pos = $dbw->getPrimaryPos();
170 $dbr->primaryPosWait( $pos, 100000 );
177 if ( $this->isChild ) {
205 if ( !
$dbr->tableExists(
'blob_tracking', __METHOD__ ) ) {
206 $this->
critical(
"Error: blob_tracking table does not exist" );
210 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
212 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
227 $wiki = WikiMap::getCurrentWikiId();
229 $cmd =
'php ' . Shell::escape( __FILE__ );
230 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
231 if ( $cmdOption ==
'child-id' ) {
234 if ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
235 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
236 } elseif ( $this->$classOption ) {
237 $cmd .=
" --$cmdOption";
241 ' --wiki ' . Shell::escape( $wiki ) .
242 ' ' . Shell::escape( ...$this->destClusters );
244 $this->childPipes = $this->childProcs = [];
249 [
'file',
'php://stdout',
'w' ],
250 [
'file',
'php://stderr',
'w' ]
252 Wikimedia\suppressWarnings();
253 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
254 Wikimedia\restoreWarnings();
256 $this->
critical(
"Error opening child process: $cmd" );
259 $this->childProcs[$i] = $proc;
260 $this->childPipes[$i] = $pipes[0];
262 $this->prevChildId = -1;
269 $this->
info(
"Waiting for child processes to finish..." );
274 $status = proc_close( $this->childProcs[$i] );
276 $this->
critical(
"Warning: child #$i exited with status $status" );
279 $this->
info(
"Done." );
291 $numPipes = stream_select( $x, $pipes, $y, 3600 );
293 $this->
critical(
"Error waiting to write to child process. Aborting" );
297 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
316 $cmd = implode(
' ',
$args );
317 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
327 if ( $this->noCount ) {
328 $numPages =
'[unknown]';
330 $numPages =
$dbr->selectField(
'blob_tracking',
331 'COUNT(DISTINCT bt_page)',
332 # A condition is required so that this query uses the index
337 if ( $this->copyOnly ) {
338 $this->
info(
"Copying pages..." );
340 $this->
info(
"Moving pages..." );
343 $res =
$dbr->select(
'blob_tracking',
347 'bt_page > ' .
$dbr->addQuotes( $startId )
352 'ORDER BY' =>
'bt_page',
353 'LIMIT' => $this->batchSize,
356 if ( !
$res->numRows() ) {
359 foreach (
$res as $row ) {
360 $startId = $row->bt_page;
361 $this->
dispatch(
'doPage', $row->bt_page );
364 $this->
report(
'pages', $i, $numPages );
366 $this->
report(
'pages', $i, $numPages );
367 if ( $this->copyOnly ) {
368 $this->
info(
"All page copies queued." );
370 $this->
info(
"All page moves queued." );
380 private function report( $label, $current, $end ) {
382 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
383 $this->numBatches = 0;
384 $this->
info(
"$label: $current / $end" );
385 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
396 if ( $this->noCount ) {
397 $numOrphans =
'[unknown]';
399 $numOrphans =
$dbr->selectField(
'blob_tracking',
400 'COUNT(DISTINCT bt_text_id)',
401 [
'bt_moved' => 0,
'bt_page' => 0 ],
403 if ( !$numOrphans ) {
407 if ( $this->copyOnly ) {
408 $this->
info(
"Copying orphans..." );
410 $this->
info(
"Moving orphans..." );
414 $res =
$dbr->select(
'blob_tracking',
419 'bt_text_id > ' .
$dbr->addQuotes( $startId )
424 'ORDER BY' =>
'bt_text_id',
425 'LIMIT' => $this->batchSize
428 if ( !
$res->numRows() ) {
432 foreach (
$res as $row ) {
433 $startId = $row->bt_text_id;
434 $ids[] = $row->bt_text_id;
440 while ( count( $ids ) > $this->orphanBatchSize ) {
441 $args = array_slice( $ids, 0, $this->orphanBatchSize );
442 $ids = array_slice( $ids, $this->orphanBatchSize );
443 array_unshift(
$args,
'doOrphanList' );
446 if ( count( $ids ) ) {
448 array_unshift(
$args,
'doOrphanList' );
452 $this->
report(
'orphans', $i, $numOrphans );
454 $this->
report(
'orphans', $i, $numOrphans );
455 $this->
info(
"All orphans queued." );
462 $this->
debug(
'starting' );
465 while ( !feof( STDIN ) ) {
466 $line = rtrim( fgets( STDIN ) );
472 $cmd = array_shift(
$args );
483 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
493 $title = Title::newFromID( $pageId );
495 $titleText =
$title->getPrefixedText();
497 $titleText =
'[deleted]';
502 if ( !$this->copyOnly ) {
510 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
513 [
'blob_tracking',
'text' ],
516 'bt_page' => $pageId,
517 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
519 'bt_new_url IS NULL',
524 'ORDER BY' =>
'bt_text_id',
525 'LIMIT' => $this->batchSize
528 if ( !
$res->numRows() ) {
533 foreach (
$res as $row ) {
534 $startId = $row->bt_text_id;
535 if ( $lastTextId == $row->bt_text_id ) {
539 $lastTextId = $row->bt_text_id;
541 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
542 if ( $text ===
false ) {
543 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
548 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
549 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
552 $lbFactory->waitForReplication();
557 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
575 if ( $this->copyOnly ) {
576 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
580 $dbw->begin( __METHOD__ );
581 $dbw->update(
'text',
584 'old_flags' =>
'external,utf-8',
591 $dbw->update(
'blob_tracking',
593 [
'bt_text_id' => $textId ],
596 $dbw->commit( __METHOD__ );
611 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
614 $conds = array_merge( $conds, [
616 'bt_new_url IS NOT NULL'
619 $res =
$dbr->select(
'blob_tracking',
621 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
624 'ORDER BY' =>
'bt_text_id',
625 'LIMIT' => $this->batchSize,
628 if ( !
$res->numRows() ) {
631 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
632 foreach (
$res as $row ) {
633 $startId = $row->bt_text_id;
634 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
635 if ( $row->bt_text_id % 10 == 0 ) {
636 $lbFactory->waitForReplication();
647 $cluster = next( $this->destClusters );
648 if ( $cluster ===
false ) {
649 $cluster = reset( $this->destClusters );
662 if ( !$this->copyOnly ) {
669 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
671 [
'text',
'blob_tracking' ],
672 [
'old_id',
'old_text',
'old_flags' ],
674 'old_id' => $textIds,
682 foreach (
$res as $row ) {
683 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
684 if ( $text ===
false ) {
685 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
689 if ( !$trx->addItem( $text, $row->old_id ) ) {
690 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
693 $lbFactory->waitForReplication();
696 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
736 $this->cgz =
new $class;
738 $hash = $this->cgz->addItem( $text );
739 $this->referrers[$textId] = $hash;
740 $this->texts[$textId] = $text;
742 return $this->cgz->isHappy();
746 return count( $this->texts );
754 $this->cgz =
new $class;
755 $this->referrers = [];
756 foreach ( $this->texts as $textId => $text ) {
757 $hash = $this->cgz->addItem( $text );
758 $this->referrers[$textId] = $hash;
768 $originalCount = count( $this->texts );
769 if ( !$originalCount ) {
782 $dbw->begin( __METHOD__ );
783 $res = $dbw->select(
'blob_tracking',
784 [
'bt_text_id',
'bt_moved' ],
785 [
'bt_text_id' => array_keys( $this->referrers ) ],
786 __METHOD__, [
'FOR UPDATE' ] );
788 foreach (
$res as $row ) {
789 if ( $row->bt_moved ) {
790 # This row has already been moved, remove it
791 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
792 unset( $this->texts[$row->bt_text_id] );
799 if ( !count( $this->texts ) ) {
801 if ( $originalCount > 1 ) {
803 $this->parent->critical(
804 "Warning: concurrent operation detected, are there two conflicting " .
805 "processes running, doing the same job?" );
814 $targetCluster = $this->parent->getTargetCluster();
815 $store = $this->parent->store;
816 $targetDB = $store->getPrimary( $targetCluster );
817 $targetDB->clearFlag(
DBO_TRX );
818 $targetDB->begin( __METHOD__ );
819 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
822 foreach ( $this->referrers as $textId => $hash ) {
823 $url = $baseUrl .
'/' . $hash;
824 $dbw->update(
'blob_tracking',
825 [
'bt_new_url' => $url ],
827 'bt_text_id' => $textId,
828 'bt_moved' => 0, # Check
for concurrent conflicting update
834 $targetDB->commit( __METHOD__ );
837 $dbw->commit( __METHOD__ );
840 if ( !$this->parent->copyOnly ) {
841 foreach ( $this->referrers as $textId => $hash ) {
842 $url = $baseUrl .
'/' . $hash;
843 $this->parent->moveTextRow( $textId, $url );
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfGetDB( $db, $groups=[], $wiki=false)
Get a Database object.
wfHostname()
Get host name of the current machine, for use in error reporting.
Class to represent a recompression operation for a single CGZ blob.
RecompressTracked $parent
ConcatenatedGzipHistoryBlob false $cgz
addItem( $text, $textId)
Add text.
recompress()
Recompress text after some aberrant modification.
__construct( $parent, $blobClass)
Create a transaction from a RecompressTracked object.
Concatenated gzip (CGZ) storage. Improves compression ratio by concatenating like objects before gzipping.
DB accessible external objects.
Maintenance script that moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process.
executeChild()
Main entry point for worker processes.
moveTextRow( $textId, $url)
Atomic move operation.
killChildProcs()
Gracefully terminate the child processes.
report( $label, $current, $end)
Display a progress report.
execute()
Execute parent or child depending on the isChild option.
doOrphanList( $textIds)
Move an orphan text_id to the new cluster.
startChildProcs()
Start the worker processes.
doAllOrphans()
Move all orphan text to the new clusters.
static newFromCommandLine( $args, $options)
finishIncompleteMoves( $conds)
Moves are done in two phases: bt_new_url and then bt_moved.
dispatch(... $args)
Dispatch a command to the next available child process.
doPage( $pageId)
Move tracked text in a given page.
syncDBs()
Wait until the selected replica DB has caught up to the primary DB.
dispatchToChild( $childId, $args)
Dispatch a command to a specified child process.
static getOptionsWithArgs()
getTargetCluster()
Returns the name of the next target cluster.
executeParent()
Execute the parent process.
doAllPages()
Move all tracked pages to the new clusters.
checkTrackingTable()
Make sure the tracking table exists and isn't empty.
if(count( $args)< 1) $job
if(PHP_SAPI !='cli-server') if(!isset( $_SERVER['SCRIPT_FILENAME'])) $file
Item class for a filearchive table row.