30 require __DIR__ .
'/../commandLine.inc';
33 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
34 Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
35 and recompresses them in the process. Restartable.
38 --procs <procs> Set the number of child processes (default 1)
39 --copy-only Copy only, do not update the text table. Restart
40 without this option to complete.
41 --debug-log <file> Log debugging data to the specified file
42 --info-log <file> Log progress messages to the specified file
43 --critical-log <file> Log error messages to the specified file
82 'no-count' =>
'noCount',
83 'procs' =>
'numProcs',
84 'copy-only' =>
'copyOnly',
86 'replica-id' =>
'replicaId',
87 'debug-log' =>
'debugLog',
88 'info-log' =>
'infoLog',
89 'critical-log' =>
'criticalLog',
97 $jobOptions = [
'destClusters' =>
$args ];
98 foreach ( self::$cmdLineOptionMap
as $cmdOption => $classOption ) {
99 if ( isset(
$options[$cmdOption] ) ) {
100 $jobOptions[$classOption] =
$options[$cmdOption];
104 return new self( $jobOptions );
112 if ( !$this->isChild ) {
113 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
114 } elseif ( $this->replicaId !==
false ) {
115 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->replicaId}: ";
117 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
124 if ( $this->debugLog ) {
125 $this->
logToFile( $msg, $this->debugLog );
131 if ( $this->infoLog ) {
132 $this->
logToFile( $msg, $this->infoLog );
138 if ( $this->criticalLog ) {
139 $this->
logToFile( $msg, $this->criticalLog );
145 if ( $this->replicaId !==
false ) {
146 $header .=
"({$this->replicaId})";
149 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ), $file );
160 $pos = $dbw->getMasterPos();
161 $dbr->masterPosWait( $pos, 100000 );
168 if ( $this->isChild ) {
196 if ( !
$dbr->tableExists(
'blob_tracking' ) ) {
197 $this->
critical(
"Error: blob_tracking table does not exist" );
201 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
203 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
219 foreach ( self::$cmdLineOptionMap
as $cmdOption => $classOption ) {
220 if ( $cmdOption ==
'replica-id' ) {
224 } elseif ( $this->$classOption ) {
225 $cmd .=
" --$cmdOption";
232 $this->replicaPipes = $this->replicaProcs = [];
237 [
'file',
'php://stdout',
'w' ],
238 [
'file',
'php://stderr',
'w' ]
240 Wikimedia\suppressWarnings();
241 $proc = proc_open(
"$cmd --replica-id $i", $spec, $pipes );
242 Wikimedia\restoreWarnings();
244 $this->
critical(
"Error opening replica DB process: $cmd" );
247 $this->replicaProcs[$i] = $proc;
248 $this->replicaPipes[$i] = $pipes[0];
250 $this->prevReplicaId = -1;
257 $this->
info(
"Waiting for replica DB processes to finish..." );
262 $status = proc_close( $this->replicaProcs[$i] );
264 $this->
critical(
"Warning: child #$i exited with status $status" );
267 $this->
info(
"Done." );
275 $args = func_get_args();
277 $numPipes = stream_select( $x = [], $pipes, $y = [], 3600 );
279 $this->
critical(
"Error waiting to write to replica DBs. Aborting" );
283 $replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs;
302 $cmd = implode(
' ',
$args );
303 fwrite( $this->replicaPipes[
$replicaId],
"$cmd\n" );
313 if ( $this->noCount ) {
314 $numPages =
'[unknown]';
316 $numPages =
$dbr->selectField(
'blob_tracking',
317 'COUNT(DISTINCT bt_page)',
323 if ( $this->copyOnly ) {
324 $this->
info(
"Copying pages..." );
326 $this->
info(
"Moving pages..." );
329 $res =
$dbr->select(
'blob_tracking',
333 'bt_page > ' .
$dbr->addQuotes( $startId )
338 'ORDER BY' =>
'bt_page',
342 if ( !
$res->numRows() ) {
345 foreach (
$res as $row ) {
346 $startId = $row->bt_page;
347 $this->
dispatch(
'doPage', $row->bt_page );
350 $this->
report(
'pages', $i, $numPages );
352 $this->
report(
'pages', $i, $numPages );
353 if ( $this->copyOnly ) {
354 $this->
info(
"All page copies queued." );
356 $this->
info(
"All page moves queued." );
366 function report( $label, $current, $end ) {
368 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
369 $this->numBatches = 0;
370 $this->
info(
"$label: $current / $end" );
371 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
382 if ( $this->noCount ) {
383 $numOrphans =
'[unknown]';
385 $numOrphans =
$dbr->selectField(
'blob_tracking',
386 'COUNT(DISTINCT bt_text_id)',
387 [
'bt_moved' => 0,
'bt_page' => 0 ],
389 if ( !$numOrphans ) {
393 if ( $this->copyOnly ) {
394 $this->
info(
"Copying orphans..." );
396 $this->
info(
"Moving orphans..." );
400 $res =
$dbr->select(
'blob_tracking',
405 'bt_text_id > ' .
$dbr->addQuotes( $startId )
410 'ORDER BY' =>
'bt_text_id',
414 if ( !
$res->numRows() ) {
418 foreach (
$res as $row ) {
419 $startId = $row->bt_text_id;
420 $ids[] = $row->bt_text_id;
427 $args = array_slice( $ids, 0, $this->orphanBatchSize );
428 $ids = array_slice( $ids, $this->orphanBatchSize );
429 array_unshift(
$args,
'doOrphanList' );
432 if (
count( $ids ) ) {
434 array_unshift(
$args,
'doOrphanList' );
438 $this->
report(
'orphans', $i, $numOrphans );
440 $this->
report(
'orphans', $i, $numOrphans );
441 $this->
info(
"All orphans queued." );
448 $this->
debug(
'starting' );
451 while ( !feof( STDIN ) ) {
452 $line = rtrim( fgets( STDIN ) );
458 $cmd = array_shift(
$args );
469 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
481 $titleText =
$title->getPrefixedText();
483 $titleText =
'[deleted]';
488 if ( !$this->copyOnly ) {
496 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
499 [
'blob_tracking',
'text' ],
502 'bt_page' => $pageId,
503 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
505 'bt_new_url IS NULL',
510 'ORDER BY' =>
'bt_text_id',
514 if ( !
$res->numRows() ) {
519 foreach (
$res as $row ) {
520 $startId = $row->bt_text_id;
521 if ( $lastTextId == $row->bt_text_id ) {
525 $lastTextId = $row->bt_text_id;
528 if ( $text ===
false ) {
529 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
534 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
535 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
538 $lbFactory->waitForReplication();
543 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
561 if ( $this->copyOnly ) {
562 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
566 $dbw->begin( __METHOD__ );
567 $dbw->update(
'text',
570 'old_flags' =>
'external,utf-8',
577 $dbw->update(
'blob_tracking',
579 [
'bt_text_id' => $textId ],
582 $dbw->commit( __METHOD__ );
597 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
600 $conds = array_merge( $conds, [
602 'bt_new_url IS NOT NULL'
605 $res =
$dbr->select(
'blob_tracking',
607 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
610 'ORDER BY' =>
'bt_text_id',
611 'LIMIT' => $this->batchSize,
614 if ( !
$res->numRows() ) {
617 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
618 foreach (
$res as $row ) {
619 $startId = $row->bt_text_id;
620 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
621 if ( $row->bt_text_id % 10 == 0 ) {
622 $lbFactory->waitForReplication();
633 $cluster = next( $this->destClusters );
634 if ( $cluster ===
false ) {
635 $cluster = reset( $this->destClusters );
647 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
648 $lb = $lbFactory->getExternalLB( $cluster );
660 if ( !$this->copyOnly ) {
667 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
669 [
'text',
'blob_tracking' ],
670 [
'old_id',
'old_text',
'old_flags' ],
672 'old_id' => $textIds,
680 foreach (
$res as $row ) {
682 if ( $text ===
false ) {
683 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
687 if ( !$trx->addItem( $text, $row->old_id ) ) {
688 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
691 $lbFactory->waitForReplication();
694 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
732 $this->cgz =
new $class;
734 $hash = $this->cgz->addItem( $text );
735 $this->referrers[$textId] = $hash;
736 $this->texts[$textId] = $text;
738 return $this->cgz->isHappy();
742 return count( $this->texts );
750 $this->cgz =
new $class;
751 $this->referrers = [];
752 foreach ( $this->texts
as $textId => $text ) {
753 $hash = $this->cgz->addItem( $text );
754 $this->referrers[$textId] = $hash;
764 $originalCount =
count( $this->texts );
765 if ( !$originalCount ) {
778 $dbw->begin( __METHOD__ );
779 $res = $dbw->select(
'blob_tracking',
780 [
'bt_text_id',
'bt_moved' ],
781 [
'bt_text_id' => array_keys( $this->referrers ) ],
782 __METHOD__, [
'FOR UPDATE' ] );
784 foreach (
$res as $row ) {
785 if ( $row->bt_moved ) {
786 # This row has already been moved, remove it
787 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
788 unset( $this->texts[$row->bt_text_id] );
795 if ( !
count( $this->texts ) ) {
797 if ( $originalCount > 1 ) {
799 $this->parent->critical(
800 "Warning: concurrent operation detected, are there two conflicting " .
801 "processes running, doing the same job?" );
810 $targetCluster = $this->parent->getTargetCluster();
811 $store = $this->parent->store;
812 $targetDB = $store->getMaster( $targetCluster );
813 $targetDB->clearFlag(
DBO_TRX );
814 $targetDB->begin( __METHOD__ );
815 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
818 foreach ( $this->referrers
as $textId => $hash ) {
819 $url = $baseUrl .
'/' . $hash;
820 $dbw->update(
'blob_tracking',
821 [
'bt_new_url' => $url ],
823 'bt_text_id' => $textId,
824 'bt_moved' => 0, # Check for concurrent conflicting update
830 $targetDB->commit( __METHOD__ );
833 $dbw->commit( __METHOD__ );
836 if ( !$this->parent->copyOnly ) {
837 foreach ( $this->referrers
as $textId => $hash ) {
838 $url = $baseUrl .
'/' . $hash;
839 $this->parent->moveTextRow( $textId, $url );