29use Wikimedia\AtEase\AtEase;
32require __DIR__ .
'/../CommandLineInc.php';
34if ( count(
$args ) < 1 ) {
35 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
36Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
37and recompresses them in the process. Restartable.
40 --procs <procs> Set the number of child processes (default 1)
41 --copy-only Copy only, do not update the text table. Restart
42 without this option to complete.
43 --debug-log <file> Log debugging data to the specified file
44 --info-log <file> Log progress messages to the specified file
45 --critical-log <file> Log error messages to the specified file
87 'no-count' =>
'noCount',
88 'procs' =>
'numProcs',
89 'copy-only' =>
'copyOnly',
91 'child-id' =>
'childId',
92 'debug-log' =>
'debugLog',
93 'info-log' =>
'infoLog',
94 'critical-log' =>
'criticalLog',
102 $jobOptions = [
'destClusters' =>
$args ];
103 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
104 if ( isset( $options[$cmdOption] ) ) {
105 $jobOptions[$classOption] = $options[$cmdOption];
109 return new self( $jobOptions );
113 foreach ( $options as $name => $value ) {
114 $this->$name = $value;
116 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
117 $this->store = $esFactory->getStore(
'DB' );
118 if ( !$this->isChild ) {
119 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
120 } elseif ( $this->childId !==
false ) {
121 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
123 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
124 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
125 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
127 $this->blobStore = MediaWikiServices::getInstance()
128 ->getBlobStoreFactory()
134 if ( $this->debugLog ) {
135 $this->
logToFile( $msg, $this->debugLog );
141 if ( $this->infoLog ) {
142 $this->
logToFile( $msg, $this->infoLog );
148 if ( $this->criticalLog ) {
149 $this->
logToFile( $msg, $this->criticalLog );
155 if ( $this->childId !==
false ) {
156 $header .=
"({$this->childId})";
158 $header .=
' ' . WikiMap::getCurrentWikiDbDomain()->getId();
159 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ),
$file );
170 $pos = $dbw->getPrimaryPos();
171 $dbr->primaryPosWait( $pos, 100000 );
178 if ( $this->isChild ) {
206 if ( !
$dbr->tableExists(
'blob_tracking', __METHOD__ ) ) {
207 $this->
critical(
"Error: blob_tracking table does not exist" );
211 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
213 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
228 $wiki = WikiMap::getCurrentWikiId();
230 $cmd =
'php ' . Shell::escape( __FILE__ );
231 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
232 if ( $cmdOption ==
'child-id' ) {
235 if ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
236 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
237 } elseif ( $this->$classOption ) {
238 $cmd .=
" --$cmdOption";
242 ' --wiki ' . Shell::escape( $wiki ) .
243 ' ' . Shell::escape( ...$this->destClusters );
245 $this->childPipes = $this->childProcs = [];
250 [
'file',
'php://stdout',
'w' ],
251 [
'file',
'php://stderr',
'w' ]
253 AtEase::suppressWarnings();
254 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
255 AtEase::restoreWarnings();
257 $this->
critical(
"Error opening child process: $cmd" );
260 $this->childProcs[$i] = $proc;
261 $this->childPipes[$i] = $pipes[0];
263 $this->prevChildId = -1;
270 $this->
info(
"Waiting for child processes to finish..." );
275 $status = proc_close( $this->childProcs[$i] );
277 $this->
critical(
"Warning: child #$i exited with status $status" );
280 $this->
info(
"Done." );
292 $numPipes = stream_select( $x, $pipes, $y, 3600 );
294 $this->
critical(
"Error waiting to write to child process. Aborting" );
298 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
317 $cmd = implode(
' ',
$args );
318 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
328 if ( $this->noCount ) {
329 $numPages =
'[unknown]';
331 $numPages =
$dbr->selectField(
'blob_tracking',
332 'COUNT(DISTINCT bt_page)',
333 # A condition is required so that
this query uses the index
338 if ( $this->copyOnly ) {
339 $this->
info(
"Copying pages..." );
341 $this->
info(
"Moving pages..." );
344 $res =
$dbr->select(
'blob_tracking',
348 'bt_page > ' .
$dbr->addQuotes( $startId )
353 'ORDER BY' =>
'bt_page',
354 'LIMIT' => $this->batchSize,
357 if ( !
$res->numRows() ) {
360 foreach (
$res as $row ) {
361 $startId = $row->bt_page;
362 $this->
dispatch(
'doPage', $row->bt_page );
365 $this->
report(
'pages', $i, $numPages );
367 $this->
report(
'pages', $i, $numPages );
368 if ( $this->copyOnly ) {
369 $this->
info(
"All page copies queued." );
371 $this->
info(
"All page moves queued." );
381 private function report( $label, $current, $end ) {
383 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
384 $this->numBatches = 0;
385 $this->
info(
"$label: $current / $end" );
386 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
397 if ( $this->noCount ) {
398 $numOrphans =
'[unknown]';
400 $numOrphans =
$dbr->selectField(
'blob_tracking',
401 'COUNT(DISTINCT bt_text_id)',
402 [
'bt_moved' => 0,
'bt_page' => 0 ],
404 if ( !$numOrphans ) {
408 if ( $this->copyOnly ) {
409 $this->
info(
"Copying orphans..." );
411 $this->
info(
"Moving orphans..." );
415 $res =
$dbr->select(
'blob_tracking',
420 'bt_text_id > ' .
$dbr->addQuotes( $startId )
425 'ORDER BY' =>
'bt_text_id',
426 'LIMIT' => $this->batchSize
429 if ( !
$res->numRows() ) {
433 foreach (
$res as $row ) {
434 $startId = $row->bt_text_id;
435 $ids[] = $row->bt_text_id;
441 while ( count( $ids ) > $this->orphanBatchSize ) {
442 $args = array_slice( $ids, 0, $this->orphanBatchSize );
443 $ids = array_slice( $ids, $this->orphanBatchSize );
444 array_unshift(
$args,
'doOrphanList' );
447 if ( count( $ids ) ) {
449 array_unshift(
$args,
'doOrphanList' );
453 $this->
report(
'orphans', $i, $numOrphans );
455 $this->
report(
'orphans', $i, $numOrphans );
456 $this->
info(
"All orphans queued." );
463 $this->
debug(
'starting' );
466 while ( !feof( STDIN ) ) {
467 $line = rtrim( fgets( STDIN ) );
473 $cmd = array_shift(
$args );
484 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
494 $title = Title::newFromID( $pageId );
496 $titleText =
$title->getPrefixedText();
498 $titleText =
'[deleted]';
503 if ( !$this->copyOnly ) {
511 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
514 [
'blob_tracking',
'text' ],
517 'bt_page' => $pageId,
518 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
520 'bt_new_url IS NULL',
525 'ORDER BY' =>
'bt_text_id',
526 'LIMIT' => $this->batchSize
529 if ( !
$res->numRows() ) {
534 foreach (
$res as $row ) {
535 $startId = $row->bt_text_id;
536 if ( $lastTextId == $row->bt_text_id ) {
540 $lastTextId = $row->bt_text_id;
542 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
543 if ( $text ===
false ) {
544 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
549 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
550 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
553 $lbFactory->waitForReplication();
558 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
576 if ( $this->copyOnly ) {
577 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
581 $dbw->begin( __METHOD__ );
582 $dbw->update(
'text',
585 'old_flags' =>
'external,utf-8',
592 $dbw->update(
'blob_tracking',
594 [
'bt_text_id' => $textId ],
597 $dbw->commit( __METHOD__ );
612 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
615 $conds = array_merge( $conds, [
617 'bt_new_url IS NOT NULL'
620 $res =
$dbr->select(
'blob_tracking',
622 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
625 'ORDER BY' =>
'bt_text_id',
626 'LIMIT' => $this->batchSize,
629 if ( !
$res->numRows() ) {
632 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
633 foreach (
$res as $row ) {
634 $startId = $row->bt_text_id;
635 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
636 if ( $row->bt_text_id % 10 == 0 ) {
637 $lbFactory->waitForReplication();
648 $cluster = next( $this->destClusters );
649 if ( $cluster ===
false ) {
650 $cluster = reset( $this->destClusters );
663 if ( !$this->copyOnly ) {
670 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
672 [
'text',
'blob_tracking' ],
673 [
'old_id',
'old_text',
'old_flags' ],
675 'old_id' => $textIds,
683 foreach (
$res as $row ) {
684 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
685 if ( $text ===
false ) {
686 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
690 if ( !$trx->addItem( $text, $row->old_id ) ) {
691 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
694 $lbFactory->waitForReplication();
697 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
737 $this->cgz =
new $class;
739 $hash = $this->cgz->addItem( $text );
740 $this->referrers[$textId] = $hash;
741 $this->texts[$textId] = $text;
743 return $this->cgz->isHappy();
747 return count( $this->texts );
755 $this->cgz =
new $class;
756 $this->referrers = [];
757 foreach ( $this->texts as $textId => $text ) {
758 $hash = $this->cgz->addItem( $text );
759 $this->referrers[$textId] = $hash;
769 $originalCount = count( $this->texts );
770 if ( !$originalCount ) {
783 $dbw->begin( __METHOD__ );
784 $res = $dbw->select(
'blob_tracking',
785 [
'bt_text_id',
'bt_moved' ],
786 [
'bt_text_id' => array_keys( $this->referrers ) ],
787 __METHOD__, [
'FOR UPDATE' ] );
789 foreach (
$res as $row ) {
790 if ( $row->bt_moved ) {
791 # This row has already been moved, remove it
792 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
793 unset( $this->texts[$row->bt_text_id] );
800 if ( !count( $this->texts ) ) {
802 if ( $originalCount > 1 ) {
804 $this->parent->critical(
805 "Warning: concurrent operation detected, are there two conflicting " .
806 "processes running, doing the same job?" );
815 $targetCluster = $this->parent->getTargetCluster();
816 $store = $this->parent->store;
817 $targetDB = $store->getPrimary( $targetCluster );
818 $targetDB->clearFlag(
DBO_TRX );
819 $targetDB->begin( __METHOD__ );
820 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
823 foreach ( $this->referrers as $textId => $hash ) {
824 $url = $baseUrl .
'/' . $hash;
825 $dbw->update(
'blob_tracking',
826 [
'bt_new_url' => $url ],
828 'bt_text_id' => $textId,
829 'bt_moved' => 0, # Check
for concurrent conflicting update
835 $targetDB->commit( __METHOD__ );
838 $dbw->commit( __METHOD__ );
841 if ( !$this->parent->copyOnly ) {
842 foreach ( $this->referrers as $textId => $hash ) {
843 $url = $baseUrl .
'/' . $hash;
844 $this->parent->moveTextRow( $textId, $url );
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfGetDB( $db, $groups=[], $wiki=false)
Get a Database object.
wfHostname()
Get host name of the current machine, for use in error reporting.
Class to represent a recompression operation for a single CGZ blob.
RecompressTracked $parent
ConcatenatedGzipHistoryBlob false $cgz
addItem( $text, $textId)
Add text.
recompress()
Recompress text after some aberrant modification.
__construct( $parent, $blobClass)
Create a transaction from a RecompressTracked object.
Concatenated gzip (CGZ) storage. Improves compression ratio by concatenating like objects before gzipping.
External storage in a SQL database.
Maintenance script that moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process.
executeChild()
Main entry point for worker processes.
moveTextRow( $textId, $url)
Atomic move operation.
killChildProcs()
Gracefully terminate the child processes.
report( $label, $current, $end)
Display a progress report.
execute()
Execute parent or child depending on the isChild option.
doOrphanList( $textIds)
Move an orphan text_id to the new cluster.
startChildProcs()
Start the worker processes.
doAllOrphans()
Move all orphan text to the new clusters.
static newFromCommandLine( $args, $options)
finishIncompleteMoves( $conds)
Moves are done in two phases: bt_new_url and then bt_moved.
dispatch(... $args)
Dispatch a command to the next available child process.
doPage( $pageId)
Move tracked text in a given page.
syncDBs()
Wait until the selected replica DB has caught up to the master.
dispatchToChild( $childId, $args)
Dispatch a command to a specified child process.
static getOptionsWithArgs()
getTargetCluster()
Returns the name of the next target cluster.
executeParent()
Execute the parent process.
doAllPages()
Move all tracked pages to the new clusters.
checkTrackingTable()
Make sure the tracking table exists and isn't empty.
if(count( $args)< 1) $job
if(PHP_SAPI !='cli-server') if(!isset( $_SERVER['SCRIPT_FILENAME'])) $file
Item class for a filearchive table row.