32require __DIR__ .
'/../commandLine.inc';
34if ( count(
$args ) < 1 ) {
35 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
36Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
37and recompresses them in the process. Restartable.
40 --procs <procs> Set the number of child processes (default 1)
41 --copy-only Copy only, do not update the text table. Restart
42 without this option to complete.
43 --debug-log <file> Log debugging data to the specified file
44 --info-log <file> Log progress messages to the specified file
45 --critical-log <file> Log error messages to the specified file
87 'no-count' =>
'noCount',
88 'procs' =>
'numProcs',
89 'copy-only' =>
'copyOnly',
91 'child-id' =>
'childId',
92 'debug-log' =>
'debugLog',
93 'info-log' =>
'infoLog',
94 'critical-log' =>
'criticalLog',
102 $jobOptions = [
'destClusters' =>
$args ];
103 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
104 if ( isset( $options[$cmdOption] ) ) {
105 $jobOptions[$classOption] = $options[$cmdOption];
109 return new self( $jobOptions );
113 foreach ( $options as $name => $value ) {
114 $this->$name = $value;
116 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
117 $this->store = $esFactory->getStore(
'DB' );
118 if ( !$this->isChild ) {
119 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
120 } elseif ( $this->childId !==
false ) {
121 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
123 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
124 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
125 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
127 $this->blobStore = MediaWikiServices::getInstance()
128 ->getBlobStoreFactory()
134 if ( $this->debugLog ) {
135 $this->
logToFile( $msg, $this->debugLog );
141 if ( $this->infoLog ) {
142 $this->
logToFile( $msg, $this->infoLog );
148 if ( $this->criticalLog ) {
149 $this->
logToFile( $msg, $this->criticalLog );
155 if ( $this->childId !==
false ) {
156 $header .=
"({$this->childId})";
158 $header .=
' ' . WikiMap::getCurrentWikiDbDomain()->getId();
159 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ),
$file );
170 $pos = $dbw->getMasterPos();
171 $dbr->masterPosWait( $pos, 100000 );
178 if ( $this->isChild ) {
206 if ( !
$dbr->tableExists(
'blob_tracking', __METHOD__ ) ) {
207 $this->
critical(
"Error: blob_tracking table does not exist" );
211 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
213 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
228 $wiki = WikiMap::getCurrentWikiId();
230 $cmd =
'php ' . Shell::escape( __FILE__ );
231 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
232 if ( $cmdOption ==
'child-id' ) {
234 } elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
235 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
236 } elseif ( $this->$classOption ) {
237 $cmd .=
" --$cmdOption";
241 ' --wiki ' . Shell::escape( $wiki ) .
242 ' ' . Shell::escape( ...$this->destClusters );
244 $this->childPipes = $this->childProcs = [];
249 [
'file',
'php://stdout',
'w' ],
250 [
'file',
'php://stderr',
'w' ]
252 Wikimedia\suppressWarnings();
253 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
254 Wikimedia\restoreWarnings();
256 $this->
critical(
"Error opening child process: $cmd" );
259 $this->childProcs[$i] = $proc;
260 $this->childPipes[$i] = $pipes[0];
262 $this->prevChildId = -1;
269 $this->
info(
"Waiting for child processes to finish..." );
274 $status = proc_close( $this->childProcs[$i] );
276 $this->
critical(
"Warning: child #$i exited with status $status" );
279 $this->
info(
"Done." );
291 $numPipes = stream_select( $x, $pipes, $y, 3600 );
293 $this->
critical(
"Error waiting to write to child process. Aborting" );
297 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
316 $cmd = implode(
' ',
$args );
317 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
327 if ( $this->noCount ) {
328 $numPages =
'[unknown]';
330 $numPages =
$dbr->selectField(
'blob_tracking',
331 'COUNT(DISTINCT bt_page)',
332 # A condition is required so that
this query uses the index
337 if ( $this->copyOnly ) {
338 $this->
info(
"Copying pages..." );
340 $this->
info(
"Moving pages..." );
343 $res =
$dbr->select(
'blob_tracking',
347 'bt_page > ' .
$dbr->addQuotes( $startId )
352 'ORDER BY' =>
'bt_page',
353 'LIMIT' => $this->batchSize,
356 if ( !
$res->numRows() ) {
359 foreach (
$res as $row ) {
360 $startId = $row->bt_page;
361 $this->
dispatch(
'doPage', $row->bt_page );
364 $this->
report(
'pages', $i, $numPages );
366 $this->
report(
'pages', $i, $numPages );
367 if ( $this->copyOnly ) {
368 $this->
info(
"All page copies queued." );
370 $this->
info(
"All page moves queued." );
380 private function report( $label, $current, $end ) {
382 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
383 $this->numBatches = 0;
384 $this->
info(
"$label: $current / $end" );
385 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
396 if ( $this->noCount ) {
397 $numOrphans =
'[unknown]';
399 $numOrphans =
$dbr->selectField(
'blob_tracking',
400 'COUNT(DISTINCT bt_text_id)',
401 [
'bt_moved' => 0,
'bt_page' => 0 ],
403 if ( !$numOrphans ) {
407 if ( $this->copyOnly ) {
408 $this->
info(
"Copying orphans..." );
410 $this->
info(
"Moving orphans..." );
414 $res =
$dbr->select(
'blob_tracking',
419 'bt_text_id > ' .
$dbr->addQuotes( $startId )
424 'ORDER BY' =>
'bt_text_id',
425 'LIMIT' => $this->batchSize
428 if ( !
$res->numRows() ) {
432 foreach (
$res as $row ) {
433 $startId = $row->bt_text_id;
434 $ids[] = $row->bt_text_id;
440 while ( count( $ids ) > $this->orphanBatchSize ) {
441 $args = array_slice( $ids, 0, $this->orphanBatchSize );
442 $ids = array_slice( $ids, $this->orphanBatchSize );
443 array_unshift(
$args,
'doOrphanList' );
446 if ( count( $ids ) ) {
448 array_unshift(
$args,
'doOrphanList' );
452 $this->
report(
'orphans', $i, $numOrphans );
454 $this->
report(
'orphans', $i, $numOrphans );
455 $this->
info(
"All orphans queued." );
462 $this->
debug(
'starting' );
465 while ( !feof( STDIN ) ) {
466 $line = rtrim( fgets( STDIN ) );
472 $cmd = array_shift(
$args );
483 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
493 $title = Title::newFromID( $pageId );
495 $titleText =
$title->getPrefixedText();
497 $titleText =
'[deleted]';
502 if ( !$this->copyOnly ) {
510 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
513 [
'blob_tracking',
'text' ],
516 'bt_page' => $pageId,
517 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
519 'bt_new_url IS NULL',
524 'ORDER BY' =>
'bt_text_id',
525 'LIMIT' => $this->batchSize
528 if ( !
$res->numRows() ) {
533 foreach (
$res as $row ) {
534 $startId = $row->bt_text_id;
535 if ( $lastTextId == $row->bt_text_id ) {
539 $lastTextId = $row->bt_text_id;
541 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
542 if ( $text ===
false ) {
543 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
548 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
549 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
552 $lbFactory->waitForReplication();
557 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
575 if ( $this->copyOnly ) {
576 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
580 $dbw->begin( __METHOD__ );
581 $dbw->update(
'text',
584 'old_flags' =>
'external,utf-8',
591 $dbw->update(
'blob_tracking',
593 [
'bt_text_id' => $textId ],
596 $dbw->commit( __METHOD__ );
611 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
614 $conds = array_merge( $conds, [
616 'bt_new_url IS NOT NULL'
619 $res =
$dbr->select(
'blob_tracking',
621 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
624 'ORDER BY' =>
'bt_text_id',
625 'LIMIT' => $this->batchSize,
628 if ( !
$res->numRows() ) {
631 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
632 foreach (
$res as $row ) {
633 $startId = $row->bt_text_id;
634 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
635 if ( $row->bt_text_id % 10 == 0 ) {
636 $lbFactory->waitForReplication();
647 $cluster = next( $this->destClusters );
648 if ( $cluster ===
false ) {
649 $cluster = reset( $this->destClusters );
661 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
662 $lb = $lbFactory->getExternalLB( $cluster );
664 return $lb->getMaintenanceConnectionRef(
DB_MASTER );
674 if ( !$this->copyOnly ) {
681 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
683 [
'text',
'blob_tracking' ],
684 [
'old_id',
'old_text',
'old_flags' ],
686 'old_id' => $textIds,
694 foreach (
$res as $row ) {
695 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
696 if ( $text ===
false ) {
697 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
701 if ( !$trx->addItem( $text, $row->old_id ) ) {
702 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
705 $lbFactory->waitForReplication();
708 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
748 $this->cgz =
new $class;
750 $hash = $this->cgz->addItem( $text );
751 $this->referrers[$textId] = $hash;
752 $this->texts[$textId] = $text;
754 return $this->cgz->isHappy();
758 return count( $this->texts );
766 $this->cgz =
new $class;
767 $this->referrers = [];
768 foreach ( $this->texts as $textId => $text ) {
769 $hash = $this->cgz->addItem( $text );
770 $this->referrers[$textId] = $hash;
780 $originalCount = count( $this->texts );
781 if ( !$originalCount ) {
794 $dbw->begin( __METHOD__ );
795 $res = $dbw->select(
'blob_tracking',
796 [
'bt_text_id',
'bt_moved' ],
797 [
'bt_text_id' => array_keys( $this->referrers ) ],
798 __METHOD__, [
'FOR UPDATE' ] );
800 foreach (
$res as $row ) {
801 if ( $row->bt_moved ) {
802 # This row has already been moved, remove it
803 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
804 unset( $this->texts[$row->bt_text_id] );
811 if ( !count( $this->texts ) ) {
813 if ( $originalCount > 1 ) {
815 $this->parent->critical(
816 "Warning: concurrent operation detected, are there two conflicting " .
817 "processes running, doing the same job?" );
826 $targetCluster = $this->parent->getTargetCluster();
827 $store = $this->parent->store;
828 $targetDB = $store->getMaster( $targetCluster );
829 $targetDB->clearFlag(
DBO_TRX );
830 $targetDB->begin( __METHOD__ );
831 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
834 foreach ( $this->referrers as $textId => $hash ) {
835 $url = $baseUrl .
'/' . $hash;
836 $dbw->update(
'blob_tracking',
837 [
'bt_new_url' => $url ],
839 'bt_text_id' => $textId,
840 'bt_moved' => 0, # Check
for concurrent conflicting update
846 $targetDB->commit( __METHOD__ );
849 $dbw->commit( __METHOD__ );
852 if ( !$this->parent->copyOnly ) {
853 foreach ( $this->referrers as $textId => $hash ) {
854 $url = $baseUrl .
'/' . $hash;
855 $this->parent->moveTextRow( $textId, $url );
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfGetDB( $db, $groups=[], $wiki=false)
Get a Database object.
wfHostname()
Get host name of the current machine, for use in error reporting.
Class to represent a recompression operation for a single CGZ blob.
RecompressTracked $parent
ConcatenatedGzipHistoryBlob|false $cgz
addItem( $text, $textId)
Add text.
recompress()
Recompress text after some aberrant modification.
__construct( $parent, $blobClass)
Create a transaction from a RecompressTracked object.
Concatenated gzip (CGZ) storage. Improves compression ratio by concatenating like objects before gzipping.
DB accessible external objects.
Maintenance script that moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process.
executeChild()
Main entry point for worker processes.
moveTextRow( $textId, $url)
Atomic move operation.
killChildProcs()
Gracefully terminate the child processes.
report( $label, $current, $end)
Display a progress report.
execute()
Execute parent or child depending on the isChild option.
getExtDB( $cluster)
Gets a DB master connection for the given external cluster name.
doOrphanList( $textIds)
Move a list of orphan text IDs to the new cluster.
startChildProcs()
Start the worker processes.
doAllOrphans()
Move all orphan text to the new clusters.
static newFromCommandLine( $args, $options)
finishIncompleteMoves( $conds)
Moves are done in two phases: bt_new_url and then bt_moved.
dispatch(... $args)
Dispatch a command to the next available child process.
doPage( $pageId)
Move tracked text in a given page.
syncDBs()
Wait until the selected replica DB has caught up to the master.
dispatchToChild( $childId, $args)
Dispatch a command to a specified child process.
static getOptionsWithArgs()
getTargetCluster()
Returns the name of the next target cluster.
executeParent()
Execute the parent process.
doAllPages()
Move all tracked pages to the new clusters.
checkTrackingTable()
Make sure the tracking table exists and isn't empty.
Advanced database interface for IDatabase handles that include maintenance methods.
if(count( $args)< 1) $job
if(PHP_SAPI !='cli-server') if(!isset( $_SERVER['SCRIPT_FILENAME'])) $file
Item class for a filearchive table row.