31 require __DIR__ .
'/../commandLine.inc';
34 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
35 Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
36 and recompresses them in the process. Restartable.
39 --procs <procs> Set the number of child processes (default 1)
40 --copy-only Copy only, do not update the text table. Restart
41 without this option to complete.
42 --debug-log <file> Log debugging data to the specified file
43 --info-log <file> Log progress messages to the specified file
44 --critical-log <file> Log error messages to the specified file
83 'no-count' =>
'noCount',
84 'procs' =>
'numProcs',
85 'copy-only' =>
'copyOnly',
87 'replica-id' =>
'replicaId',
88 'debug-log' =>
'debugLog',
89 'info-log' =>
'infoLog',
90 'critical-log' =>
'criticalLog',
98 $jobOptions = [
'destClusters' =>
$args ];
99 foreach ( self::$cmdLineOptionMap
as $cmdOption => $classOption ) {
100 if ( isset(
$options[$cmdOption] ) ) {
101 $jobOptions[$classOption] =
$options[$cmdOption];
105 return new self( $jobOptions );
113 if ( !$this->isChild ) {
114 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
115 } elseif ( $this->replicaId !==
false ) {
116 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->replicaId}: ";
118 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
125 if ( $this->debugLog ) {
126 $this->
logToFile( $msg, $this->debugLog );
132 if ( $this->infoLog ) {
133 $this->
logToFile( $msg, $this->infoLog );
139 if ( $this->criticalLog ) {
140 $this->
logToFile( $msg, $this->criticalLog );
146 if ( $this->replicaId !==
false ) {
147 $header .=
"({$this->replicaId})";
150 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ),
$file );
161 $pos = $dbw->getMasterPos();
162 $dbr->masterPosWait( $pos, 100000 );
169 if ( $this->isChild ) {
197 if ( !
$dbr->tableExists(
'blob_tracking' ) ) {
198 $this->
critical(
"Error: blob_tracking table does not exist" );
202 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
204 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
219 $cmd =
'php ' . Shell::escape( __FILE__ );
220 foreach ( self::$cmdLineOptionMap
as $cmdOption => $classOption ) {
221 if ( $cmdOption ==
'replica-id' ) {
224 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
225 } elseif ( $this->$classOption ) {
226 $cmd .=
" --$cmdOption";
230 ' --wiki ' . Shell::escape(
wfWikiID() ) .
231 ' ' . Shell::escape( ...$this->destClusters );
233 $this->replicaPipes = $this->replicaProcs = [];
238 [
'file',
'php://stdout',
'w' ],
239 [
'file',
'php://stderr',
'w' ]
241 Wikimedia\suppressWarnings();
242 $proc = proc_open(
"$cmd --replica-id $i", $spec, $pipes );
243 Wikimedia\restoreWarnings();
245 $this->
critical(
"Error opening replica DB process: $cmd" );
248 $this->replicaProcs[$i] = $proc;
249 $this->replicaPipes[$i] = $pipes[0];
251 $this->prevReplicaId = -1;
258 $this->
info(
"Waiting for replica DB processes to finish..." );
263 $status = proc_close( $this->replicaProcs[$i] );
265 $this->
critical(
"Warning: child #$i exited with status $status" );
268 $this->
info(
"Done." );
276 $args = func_get_args();
280 $numPipes = stream_select( $x, $pipes, $y, 3600 );
282 $this->
critical(
"Error waiting to write to replica DBs. Aborting" );
286 $replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs;
305 $cmd = implode(
' ',
$args );
306 fwrite( $this->replicaPipes[
$replicaId],
"$cmd\n" );
316 if ( $this->noCount ) {
317 $numPages =
'[unknown]';
319 $numPages =
$dbr->selectField(
'blob_tracking',
320 'COUNT(DISTINCT bt_page)',
326 if ( $this->copyOnly ) {
327 $this->
info(
"Copying pages..." );
329 $this->
info(
"Moving pages..." );
332 $res =
$dbr->select(
'blob_tracking',
336 'bt_page > ' .
$dbr->addQuotes( $startId )
341 'ORDER BY' =>
'bt_page',
345 if ( !
$res->numRows() ) {
348 foreach (
$res as $row ) {
349 $startId = $row->bt_page;
350 $this->
dispatch(
'doPage', $row->bt_page );
353 $this->
report(
'pages', $i, $numPages );
355 $this->
report(
'pages', $i, $numPages );
356 if ( $this->copyOnly ) {
357 $this->
info(
"All page copies queued." );
359 $this->
info(
"All page moves queued." );
369 function report( $label, $current, $end ) {
371 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
372 $this->numBatches = 0;
373 $this->
info(
"$label: $current / $end" );
374 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
385 if ( $this->noCount ) {
386 $numOrphans =
'[unknown]';
388 $numOrphans =
$dbr->selectField(
'blob_tracking',
389 'COUNT(DISTINCT bt_text_id)',
390 [
'bt_moved' => 0,
'bt_page' => 0 ],
392 if ( !$numOrphans ) {
396 if ( $this->copyOnly ) {
397 $this->
info(
"Copying orphans..." );
399 $this->
info(
"Moving orphans..." );
403 $res =
$dbr->select(
'blob_tracking',
408 'bt_text_id > ' .
$dbr->addQuotes( $startId )
413 'ORDER BY' =>
'bt_text_id',
417 if ( !
$res->numRows() ) {
421 foreach (
$res as $row ) {
422 $startId = $row->bt_text_id;
423 $ids[] = $row->bt_text_id;
430 $args = array_slice( $ids, 0, $this->orphanBatchSize );
431 $ids = array_slice( $ids, $this->orphanBatchSize );
432 array_unshift(
$args,
'doOrphanList' );
435 if (
count( $ids ) ) {
437 array_unshift(
$args,
'doOrphanList' );
441 $this->
report(
'orphans', $i, $numOrphans );
443 $this->
report(
'orphans', $i, $numOrphans );
444 $this->
info(
"All orphans queued." );
451 $this->
debug(
'starting' );
454 while ( !feof( STDIN ) ) {
455 $line = rtrim( fgets( STDIN ) );
461 $cmd = array_shift(
$args );
472 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
484 $titleText =
$title->getPrefixedText();
486 $titleText =
'[deleted]';
491 if ( !$this->copyOnly ) {
499 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
502 [
'blob_tracking',
'text' ],
505 'bt_page' => $pageId,
506 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
508 'bt_new_url IS NULL',
513 'ORDER BY' =>
'bt_text_id',
517 if ( !
$res->numRows() ) {
522 foreach (
$res as $row ) {
523 $startId = $row->bt_text_id;
524 if ( $lastTextId == $row->bt_text_id ) {
528 $lastTextId = $row->bt_text_id;
531 if ( $text ===
false ) {
532 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
537 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
538 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
541 $lbFactory->waitForReplication();
546 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
564 if ( $this->copyOnly ) {
565 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
569 $dbw->begin( __METHOD__ );
570 $dbw->update(
'text',
573 'old_flags' =>
'external,utf-8',
580 $dbw->update(
'blob_tracking',
582 [
'bt_text_id' => $textId ],
585 $dbw->commit( __METHOD__ );
600 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
603 $conds = array_merge( $conds, [
605 'bt_new_url IS NOT NULL'
608 $res =
$dbr->select(
'blob_tracking',
610 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
613 'ORDER BY' =>
'bt_text_id',
614 'LIMIT' => $this->batchSize,
617 if ( !
$res->numRows() ) {
620 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
621 foreach (
$res as $row ) {
622 $startId = $row->bt_text_id;
623 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
624 if ( $row->bt_text_id % 10 == 0 ) {
625 $lbFactory->waitForReplication();
636 $cluster = next( $this->destClusters );
637 if ( $cluster ===
false ) {
638 $cluster = reset( $this->destClusters );
650 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
651 $lb = $lbFactory->getExternalLB( $cluster );
663 if ( !$this->copyOnly ) {
670 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
672 [
'text',
'blob_tracking' ],
673 [
'old_id',
'old_text',
'old_flags' ],
675 'old_id' => $textIds,
683 foreach (
$res as $row ) {
685 if ( $text ===
false ) {
686 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
690 if ( !$trx->addItem( $text, $row->old_id ) ) {
691 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
694 $lbFactory->waitForReplication();
697 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
735 $this->cgz =
new $class;
737 $hash = $this->cgz->addItem( $text );
738 $this->referrers[$textId] = $hash;
739 $this->texts[$textId] = $text;
741 return $this->cgz->isHappy();
745 return count( $this->texts );
753 $this->cgz =
new $class;
754 $this->referrers = [];
755 foreach ( $this->texts
as $textId => $text ) {
756 $hash = $this->cgz->addItem( $text );
757 $this->referrers[$textId] = $hash;
767 $originalCount =
count( $this->texts );
768 if ( !$originalCount ) {
781 $dbw->begin( __METHOD__ );
782 $res = $dbw->select(
'blob_tracking',
783 [
'bt_text_id',
'bt_moved' ],
784 [
'bt_text_id' => array_keys( $this->referrers ) ],
785 __METHOD__, [
'FOR UPDATE' ] );
787 foreach (
$res as $row ) {
788 if ( $row->bt_moved ) {
789 # This row has already been moved, remove it
790 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
791 unset( $this->texts[$row->bt_text_id] );
798 if ( !
count( $this->texts ) ) {
800 if ( $originalCount > 1 ) {
802 $this->parent->critical(
803 "Warning: concurrent operation detected, are there two conflicting " .
804 "processes running, doing the same job?" );
813 $targetCluster = $this->parent->getTargetCluster();
814 $store = $this->parent->store;
815 $targetDB = $store->getMaster( $targetCluster );
816 $targetDB->clearFlag(
DBO_TRX );
817 $targetDB->begin( __METHOD__ );
818 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
821 foreach ( $this->referrers
as $textId => $hash ) {
822 $url = $baseUrl .
'/' . $hash;
823 $dbw->update(
'blob_tracking',
824 [
'bt_new_url' => $url ],
826 'bt_text_id' => $textId,
827 'bt_moved' => 0, # Check for concurrent conflicting update
833 $targetDB->commit( __METHOD__ );
836 $dbw->commit( __METHOD__ );
839 if ( !$this->parent->copyOnly ) {
840 foreach ( $this->referrers
as $textId => $hash ) {
841 $url = $baseUrl .
'/' . $hash;
842 $this->parent->moveTextRow( $textId, $url );