32 require __DIR__ .
'/../commandLine.inc';
// Usage guard: when fewer than one positional argument (destination cluster)
// was supplied, the help text below is echoed.
// NOTE(review): the help string and the rest of this branch continue past the
// lines visible here; what happens after the echo cannot be confirmed.
34 if ( count(
$args ) < 1 ) {
35 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
36 Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
37 and recompresses them in the process. Restartable.
40 --procs <procs> Set the number of child processes (default 1)
41 --copy-only Copy only, do not update the text table. Restart
42 without this option to complete.
43 --debug-log <file> Log debugging data to the specified file
44 --info-log <file> Log progress messages to the specified file
45 --critical-log <file> Log error messages to the specified file
// Entries mapping a command-line option name (key) to the name of the object
// property that receives its value (value). The loops over
// self::$cmdLineOptionMap elsewhere in this file consume these pairs.
// NOTE(review): the array declaration and possibly further entries are not
// visible here - presumably this is the body of self::$cmdLineOptionMap.
87 'no-count' =>
'noCount',
88 'procs' =>
'numProcs',
89 'copy-only' =>
'copyOnly',
91 'child-id' =>
'childId',
92 'debug-log' =>
'debugLog',
93 'info-log' =>
'infoLog',
94 'critical-log' =>
'criticalLog',
// Builds the option array for a new instance: the positional arguments become
// the list of destination clusters.
// NOTE(review): the enclosing function header is not visible; $args and
// $options are presumably its parameters.
102 $jobOptions = [
'destClusters' =>
$args ];
// Copy every recognised command-line option that was actually supplied,
// renaming it from its CLI spelling to the matching property name.
103 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
104 if ( isset( $options[$cmdOption] ) ) {
105 $jobOptions[$classOption] = $options[$cmdOption];
// Hand the assembled options to the constructor.
109 return new self( $jobOptions );
// Initialisation fragment: every key of $options is assigned to the property
// of the same name, so callers control arbitrary properties of this object.
// NOTE(review): presumably the constructor body; its header is not visible.
113 foreach ( $options as $name => $value ) {
114 $this->$name = $value;
// Fetch the 'DB' external store from the service container.
116 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
117 $this->store = $esFactory->getStore(
'DB' );
// Tag debug log output so the parent ("M") can be told apart from the
// numbered children.
118 if ( !$this->isChild ) {
119 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
120 } elseif ( $this->childId !==
false ) {
121 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
// Page blobs use DiffHistoryBlob only when the xdiff_string_bdiff() function
// is available; otherwise they fall back to ConcatenatedGzipHistoryBlob.
// Orphan blobs always use ConcatenatedGzipHistoryBlob.
123 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
124 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
125 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
// NOTE(review): the remainder of this call chain is outside the visible lines.
127 $this->blobStore = MediaWikiServices::getInstance()
128 ->getBlobStoreFactory()
// Writes $msg to the debug log file, but only when a debug log path was
// configured. NOTE(review): presumably the body of debug(); header not visible.
134 if ( $this->debugLog ) {
135 $this->
logToFile( $msg, $this->debugLog );
// Writes $msg to the info log file, but only when an info log path was
// configured. NOTE(review): presumably the body of info(); header not visible.
141 if ( $this->infoLog ) {
142 $this->
logToFile( $msg, $this->infoLog );
// Writes $msg to the critical log file, but only when a critical log path was
// configured. NOTE(review): presumably the body of critical(); header not
// visible.
148 if ( $this->criticalLog ) {
149 $this->
logToFile( $msg, $this->criticalLog );
// Appends the child id in parentheses to the log line header when this
// process has one (childId is false otherwise).
// NOTE(review): the initial value of $header is set on a line not visible here.
155 if ( $this->childId !==
false ) {
156 $header .=
"({$this->childId})";
// Emit one line: the header left-justified and padded to 50 characters,
// then the message, then a newline.
159 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ),
$file );
// Reads the master's current replication position and blocks until the
// replica connection has reached it.
// NOTE(review): 100000 is the wait timeout - presumably seconds; confirm
// against the database abstraction's masterPosWait() documentation.
170 $pos = $dbw->getMasterPos();
171 $dbr->masterPosWait( $pos, 100000 );
// Branch taken when this process was started as a child worker.
// NOTE(review): both branch bodies are outside the visible lines.
178 if ( $this->isChild ) {
// Sanity checks on the blob_tracking table before any work is queued.
// First: the table must exist, otherwise a critical error is logged.
206 if ( !
$dbr->tableExists(
'blob_tracking' ) ) {
207 $this->
critical(
"Error: blob_tracking table does not exist" );
// Second: fetch a single arbitrary row (no conditions) to detect an empty table.
211 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
// NOTE(review): the condition guarding this message is not visible;
// presumably it tests that $row is falsy.
213 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
// Builds the shell command used to spawn child workers: this same script
// (__FILE__) is re-invoked through the php binary.
230 $cmd =
'php ' . Shell::escape( __FILE__ );
// Forward the parent's options to the children. 'child-id' gets its own
// branch (body not visible here); it is appended per child further below.
231 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
232 if ( $cmdOption ==
'child-id' ) {
// Option that carries a value: append it shell-escaped.
// NOTE(review): the condition selecting this branch is not visible.
235 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
// Otherwise append the bare flag only when the property is truthy.
236 } elseif ( $this->$classOption ) {
237 $cmd .=
" --$cmdOption";
// Append the wiki id and every destination cluster, each shell-escaped.
241 ' --wiki ' . Shell::escape( $wiki ) .
242 ' ' . Shell::escape( ...$this->destClusters );
244 $this->childPipes = $this->childProcs = [];
// Descriptor spec entries routing to this process's own stdout and stderr.
// NOTE(review): the entry for descriptor 0 is on a line not visible here.
249 [
'file',
'php://stdout',
'w' ],
250 [
'file',
'php://stderr',
'w' ]
// Warnings from proc_open() are suppressed; failure is reported explicitly
// through critical() below.
252 Wikimedia\suppressWarnings();
253 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
254 Wikimedia\restoreWarnings();
256 $this->
critical(
"Error opening child process: $cmd" );
// Keep the process handle and the pipe for descriptor 0, indexed by child
// number. Presumably this is the write end of the child's stdin - confirm
// against the descriptor-0 spec.
259 $this->childProcs[$i] = $proc;
260 $this->childPipes[$i] = $pipes[0];
// -1 so that the round-robin in the dispatch code starts with child 0.
262 $this->prevChildId = -1;
// Shutdown of the worker pool: announce the wait, then close each child.
269 $this->
info(
"Waiting for child processes to finish..." );
// proc_close() waits for the process to terminate and returns its exit code.
274 $status = proc_close( $this->childProcs[$i] );
// NOTE(review): the condition guarding this warning is not visible;
// presumably it fires on a non-zero status.
276 $this->
critical(
"Warning: child #$i exited with status $status" );
279 $this->
info(
"Done." );
// Wait (up to 3600 seconds) until at least one child pipe is writable.
// $pipes is passed as the "write" set; $x and $y are the read and except sets.
291 $numPipes = stream_select( $x, $pipes, $y, 3600 );
293 $this->
critical(
"Error waiting to write to child process. Aborting" );
// Round-robin: candidate $i is offset so the search starts with the child
// after the one used last time, wrapping at numProcs.
297 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
// Commands are sent as one space-separated line terminated by "\n".
316 $cmd = implode(
' ',
$args );
317 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
// Queues one 'doPage' job per tracked page.
// With the no-count option the total is not queried; the placeholder string
// '[unknown]' is used in progress reports instead.
327 if ( $this->noCount ) {
328 $numPages =
'[unknown]';
330 $numPages =
$dbr->selectField(
'blob_tracking',
331 'COUNT(DISTINCT bt_page)',
332 # A condition is required so that
this query uses the index
337 if ( $this->copyOnly ) {
338 $this->
info(
"Copying pages..." );
340 $this->
info(
"Moving pages..." );
// Batched scan: each query resumes after the last bt_page seen ($startId),
// ordered by bt_page.
343 $res =
$dbr->select(
'blob_tracking',
347 'bt_page > ' .
$dbr->addQuotes( $startId )
352 'ORDER BY' =>
'bt_page',
// An empty batch means there is nothing left to scan.
// NOTE(review): the body of this branch is not visible.
356 if ( !
$res->numRows() ) {
359 foreach (
$res as $row ) {
360 $startId = $row->bt_page;
361 $this->
dispatch(
'doPage', $row->bt_page );
364 $this->
report(
'pages', $i, $numPages );
366 $this->
report(
'pages', $i, $numPages );
367 if ( $this->copyOnly ) {
368 $this->
info(
"All page copies queued." );
370 $this->
info(
"All page moves queued." );
// Progress reporting helper.
// @param string $label Name of the item kind shown in the log line
// @param int $current Number of items handled so far
// @param int|string $end Total item count; callers in this file may pass the
//   placeholder string '[unknown]' when counting was skipped
// NOTE(review): at least one line of the body (between the header and the
// condition) and the closing braces are outside the visible lines.
380 function report( $label, $current, $end ) {
// Log when the end is reached or enough batches have accumulated.
// NOTE(review): `==` is a loose comparison; with $end = '[unknown]' and
// $current = 0 this is true on PHP < 8 - confirm whether that is intended.
382 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
383 $this->numBatches = 0;
384 $this->
info(
"$label: $current / $end" );
// Let replicas catch up before more work is queued.
385 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
// Queues 'doOrphanList' jobs for tracked text rows that belong to no page
// (bt_page = 0) and have not been moved yet (bt_moved = 0).
// With the no-count option the total is the placeholder '[unknown]'.
396 if ( $this->noCount ) {
397 $numOrphans =
'[unknown]';
399 $numOrphans =
$dbr->selectField(
'blob_tracking',
400 'COUNT(DISTINCT bt_text_id)',
401 [
'bt_moved' => 0,
'bt_page' => 0 ],
// A count of zero short-circuits here. NOTE(review): branch body not visible.
403 if ( !$numOrphans ) {
407 if ( $this->copyOnly ) {
408 $this->
info(
"Copying orphans..." );
410 $this->
info(
"Moving orphans..." );
// Batched scan resuming after the last bt_text_id seen, ordered by bt_text_id.
414 $res =
$dbr->select(
'blob_tracking',
419 'bt_text_id > ' .
$dbr->addQuotes( $startId )
424 'ORDER BY' =>
'bt_text_id',
428 if ( !
$res->numRows() ) {
// Accumulate text ids from the batch.
432 foreach (
$res as $row ) {
433 $startId = $row->bt_text_id;
434 $ids[] = $row->bt_text_id;
// Peel off up to orphanBatchSize ids as one job's arguments, keep the rest,
// and put the command name in front of the argument list.
441 $args = array_slice( $ids, 0, $this->orphanBatchSize );
442 $ids = array_slice( $ids, $this->orphanBatchSize );
443 array_unshift(
$args,
'doOrphanList' );
// Leftover ids that did not fill a whole batch are queued as a final job.
446 if ( count( $ids ) ) {
448 array_unshift(
$args,
'doOrphanList' );
452 $this->
report(
'orphans', $i, $numOrphans );
454 $this->
report(
'orphans', $i, $numOrphans );
455 $this->
info(
"All orphans queued." );
// Child worker loop: commands arrive one per line on STDIN until end-of-file.
462 $this->
debug(
'starting' );
465 while ( !feof( STDIN ) ) {
// NOTE(review): fgets() returns false at EOF or on error; rtrim() then
// receives false rather than a string - confirm the handling on the
// following (not visible) lines.
466 $line = rtrim( fgets( STDIN ) );
// The first element of the parsed line is the command name; the remaining
// elements stay in $args.
472 $cmd = array_shift(
$args );
// Let replicas catch up between commands.
483 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
// Recompresses the tracked text rows of one page into new blobs.
// $titleText is only used as a log prefix; pages without a title object are
// labelled '[deleted]'. NOTE(review): the condition choosing between the two
// assignments is not visible.
495 $titleText =
$title->getPrefixedText();
497 $titleText =
'[deleted]';
// NOTE(review): the body of this non-copy-only branch is not visible.
502 if ( !$this->copyOnly ) {
510 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Batched query over blob_tracking and text for this page, restricted to rows
// without a new URL yet, resuming after the last bt_text_id seen.
513 [
'blob_tracking',
'text' ],
516 'bt_page' => $pageId,
517 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
519 'bt_new_url IS NULL',
524 'ORDER BY' =>
'bt_text_id',
528 if ( !
$res->numRows() ) {
533 foreach (
$res as $row ) {
534 $startId = $row->bt_text_id;
// Consecutive rows with the same text id are detected here.
// NOTE(review): the branch body is not visible; presumably duplicates are
// skipped.
535 if ( $lastTextId == $row->bt_text_id ) {
539 $lastTextId = $row->bt_text_id;
// Expand the stored text using its flags; false signals a load failure.
541 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
542 if ( $text ===
false ) {
543 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
// A false return from addItem() triggers committing the current blob.
548 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
549 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
552 $lbFactory->waitForReplication();
// Final commit message for whatever remains after the loop.
557 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
// Points one text row at its new external location and records the move.
// Guard: in copy-only mode this is logged as an internal error.
// NOTE(review): whether the function returns after the message is not visible.
575 if ( $this->copyOnly ) {
576 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
// Both updates run inside one explicit transaction on $dbw.
580 $dbw->begin( __METHOD__ );
581 $dbw->update(
'text',
584 'old_flags' =>
'external,utf-8',
591 $dbw->update(
'blob_tracking',
593 [
'bt_text_id' => $textId ],
596 $dbw->commit( __METHOD__ );
// Completes moves for rows that already have a new URL recorded
// (bt_new_url IS NOT NULL). NOTE(review): the other conditions merged into
// $conds are on lines not visible here.
611 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
614 $conds = array_merge( $conds, [
616 'bt_new_url IS NOT NULL'
// Batched scan: at most batchSize rows per query, resuming after the last
// bt_text_id seen, ordered by bt_text_id.
619 $res =
$dbr->select(
'blob_tracking',
621 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
624 'ORDER BY' =>
'bt_text_id',
625 'LIMIT' => $this->batchSize,
628 if ( !
$res->numRows() ) {
631 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
632 foreach (
$res as $row ) {
633 $startId = $row->bt_text_id;
634 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
// Throttle: wait for replication whenever the text id is a multiple of 10.
635 if ( $row->bt_text_id % 10 == 0 ) {
636 $lbFactory->waitForReplication();
// Rotates through the destination clusters using the array's internal
// pointer: next() advances it and returns false past the last element, in
// which case reset() rewinds to the first cluster.
647 $cluster = next( $this->destClusters );
648 if ( $cluster ===
false ) {
649 $cluster = reset( $this->destClusters );
// Returns a maintenance connection (DB_MASTER) from the external-storage load
// balancer of the given cluster.
661 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
662 $lb = $lbFactory->getExternalLB( $cluster );
664 return $lb->getMaintenanceConnectionRef(
DB_MASTER );
// Recompresses a list of orphaned text rows (ids in $textIds) into new blobs.
// NOTE(review): the body of this non-copy-only branch is not visible.
674 if ( !$this->copyOnly ) {
681 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Query over text and blob_tracking for the requested ids, fetching the raw
// text and its flags.
683 [
'text',
'blob_tracking' ],
684 [
'old_id',
'old_text',
'old_flags' ],
686 'old_id' => $textIds,
694 foreach (
$res as $row ) {
// Expand the stored text using its flags; false signals a load failure.
695 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
696 if ( $text ===
false ) {
697 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
// A false return from addItem() triggers committing the current blob.
701 if ( !$trx->addItem( $text, $row->old_id ) ) {
702 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
705 $lbFactory->waitForReplication();
// Final commit message for whatever remains after the loop.
708 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
// Adds one text to the blob being assembled.
// The blob object is instantiated from the class name held in $class.
// NOTE(review): the condition guarding this instantiation is not visible;
// presumably it runs only when no blob object exists yet.
748 $this->cgz =
new $class;
// Record the hash returned by the blob for this text id and keep the raw
// text as well.
750 $hash = $this->cgz->addItem( $text );
751 $this->referrers[$textId] = $hash;
752 $this->texts[$textId] = $text;
// The return value is whatever the blob's isHappy() reports.
754 return $this->cgz->isHappy();
// Number of texts currently held by this transaction.
758 return count( $this->texts );
// Rebuilds the blob from scratch: a fresh blob object is created, the
// referrer map is emptied, and every text still held is added again with its
// newly returned hash.
766 $this->cgz =
new $class;
767 $this->referrers = [];
768 foreach ( $this->texts as $textId => $text ) {
769 $hash = $this->cgz->addItem( $text );
770 $this->referrers[$textId] = $hash;
// Commits the assembled blob to the target cluster and records the new URLs.
// Nothing to do when no texts were added. NOTE(review): branch body not visible.
780 $originalCount = count( $this->texts );
781 if ( !$originalCount ) {
// Lock the tracking rows of all referenced text ids (FOR UPDATE) so that a
// concurrent move of the same rows can be detected.
794 $dbw->begin( __METHOD__ );
795 $res = $dbw->select(
'blob_tracking',
796 [
'bt_text_id',
'bt_moved' ],
797 [
'bt_text_id' => array_keys( $this->referrers ) ],
798 __METHOD__, [
'FOR UPDATE' ] );
800 foreach (
$res as $row ) {
801 if ( $row->bt_moved ) {
802 # This row has already been moved, remove it
803 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
804 unset( $this->texts[$row->bt_text_id] );
// If every text was dropped as a conflict there is nothing left to store;
// losing more than one item this way is reported as a likely duplicate run.
811 if ( !count( $this->texts ) ) {
813 if ( $originalCount > 1 ) {
815 $this->parent->critical(
816 "Warning: concurrent operation detected, are there two conflicting " .
817 "processes running, doing the same job?" );
// Store the serialized blob on the next target cluster inside an explicit
// transaction on that cluster's master; the DBO_TRX flag is cleared first.
826 $targetCluster = $this->parent->getTargetCluster();
827 $store = $this->parent->store;
828 $targetDB = $store->getMaster( $targetCluster );
829 $targetDB->clearFlag(
DBO_TRX );
830 $targetDB->begin( __METHOD__ );
831 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
// Each text's URL is the blob's base URL plus "/" plus the item hash.
834 foreach ( $this->referrers as $textId => $hash ) {
835 $url = $baseUrl .
'/' . $hash;
836 $dbw->update(
'blob_tracking',
837 [
'bt_new_url' => $url ],
839 'bt_text_id' => $textId,
840 'bt_moved' => 0, # Check
for concurrent conflicting update
// The external-store transaction is committed before the tracking-table one.
846 $targetDB->commit( __METHOD__ );
849 $dbw->commit( __METHOD__ );
// Unless running in copy-only mode, switch every text row to its new URL.
852 if ( !$this->parent->copyOnly ) {
853 foreach ( $this->referrers as $textId => $hash ) {
854 $url = $baseUrl .
'/' . $hash;
855 $this->parent->moveTextRow( $textId, $url );