29 require __DIR__ .
'/../commandLine.inc';
32 echo
"Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
33 Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
34 and recompresses them in the process. Restartable.
37 --procs <procs> Set the number of child processes (default 1)
38 --copy-only Copy only, do not update the text table. Restart
39 without this option to complete.
40 --debug-log <file> Log debugging data to the specified file
41 --info-log <file> Log progress messages to the specified file
42 --critical-log <file> Log error messages to the specified file
81 'no-count' =>
'noCount',
82 'procs' =>
'numProcs',
83 'copy-only' =>
'copyOnly',
85 'replica-id' =>
'replicaId',
86 'debug-log' =>
'debugLog',
87 'info-log' =>
'infoLog',
88 'critical-log' =>
'criticalLog',
96 $jobOptions = [
'destClusters' =>
$args ];
97 foreach ( self::$cmdLineOptionMap
as $cmdOption => $classOption ) {
98 if ( isset(
$options[$cmdOption] ) ) {
99 $jobOptions[$classOption] =
$options[$cmdOption];
103 return new self( $jobOptions );
111 if ( !$this->isChild ) {
112 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
113 } elseif ( $this->replicaId !==
false ) {
114 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->replicaId}: ";
116 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
117 'DiffHistoryBlob' :
'ConcatenatedGzipHistoryBlob';
118 $this->orphanBlobClass =
'ConcatenatedGzipHistoryBlob';
123 if ( $this->debugLog ) {
124 $this->
logToFile( $msg, $this->debugLog );
130 if ( $this->infoLog ) {
131 $this->
logToFile( $msg, $this->infoLog );
137 if ( $this->criticalLog ) {
138 $this->
logToFile( $msg, $this->criticalLog );
144 if ( $this->replicaId !==
false ) {
145 $header .=
"({$this->replicaId})";
148 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ), $file );
159 $pos = $dbw->getMasterPos();
160 $dbr->masterPosWait( $pos, 100000 );
167 if ( $this->isChild ) {
195 if ( !
$dbr->tableExists(
'blob_tracking' ) ) {
196 $this->
critical(
"Error: blob_tracking table does not exist" );
200 $row =
$dbr->selectRow(
'blob_tracking',
'*',
'', __METHOD__ );
202 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
218 foreach ( self::$cmdLineOptionMap
as $cmdOption => $classOption ) {
219 if ( $cmdOption ==
'replica-id' ) {
223 } elseif ( $this->$classOption ) {
224 $cmd .=
" --$cmdOption";
229 ' ' . call_user_func_array(
'wfEscapeShellArg', $this->destClusters );
231 $this->replicaPipes = $this->replicaProcs = [];
236 [
'file',
'php://stdout',
'w' ],
237 [
'file',
'php://stderr',
'w' ]
239 MediaWiki\suppressWarnings();
240 $proc = proc_open(
"$cmd --replica-id $i", $spec, $pipes );
241 MediaWiki\restoreWarnings();
243 $this->
critical(
"Error opening replica DB process: $cmd" );
246 $this->replicaProcs[$i] = $proc;
247 $this->replicaPipes[$i] = $pipes[0];
249 $this->prevReplicaId = -1;
256 $this->
info(
"Waiting for replica DB processes to finish..." );
261 $status = proc_close( $this->replicaProcs[$i] );
263 $this->
critical(
"Warning: child #$i exited with status $status" );
266 $this->
info(
"Done." );
274 $args = func_get_args();
// stream_select() receives its read/write/except arrays by reference, so
// every argument has to be a real variable. Passing the assignment
// expressions `$x = []` / `$y = []` inline raises "Only variables should be
// passed by reference". $pipes is supplied as the write set; the read and
// except sets are deliberately empty. 3600 is the timeout in seconds.
$x = [];
$y = [];
$numPipes = stream_select( $x, $pipes, $y, 3600 );
278 $this->
critical(
"Error waiting to write to replica DBs. Aborting" );
282 $replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs;
301 $cmd = implode(
' ',
$args );
302 fwrite( $this->replicaPipes[
$replicaId],
"$cmd\n" );
312 if ( $this->noCount ) {
313 $numPages =
'[unknown]';
315 $numPages =
$dbr->selectField(
'blob_tracking',
316 'COUNT(DISTINCT bt_page)',
322 if ( $this->copyOnly ) {
323 $this->
info(
"Copying pages..." );
325 $this->
info(
"Moving pages..." );
328 $res =
$dbr->select(
'blob_tracking',
332 'bt_page > ' .
$dbr->addQuotes( $startId )
337 'ORDER BY' =>
'bt_page',
341 if ( !
$res->numRows() ) {
344 foreach (
$res as $row ) {
345 $startId = $row->bt_page;
346 $this->
dispatch(
'doPage', $row->bt_page );
349 $this->
report(
'pages', $i, $numPages );
351 $this->
report(
'pages', $i, $numPages );
352 if ( $this->copyOnly ) {
353 $this->
info(
"All page copies queued." );
355 $this->
info(
"All page moves queued." );
365 function report( $label, $current, $end ) {
367 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
368 $this->numBatches = 0;
369 $this->
info(
"$label: $current / $end" );
370 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
381 if ( $this->noCount ) {
382 $numOrphans =
'[unknown]';
384 $numOrphans =
$dbr->selectField(
'blob_tracking',
385 'COUNT(DISTINCT bt_text_id)',
386 [
'bt_moved' => 0,
'bt_page' => 0 ],
388 if ( !$numOrphans ) {
392 if ( $this->copyOnly ) {
393 $this->
info(
"Copying orphans..." );
395 $this->
info(
"Moving orphans..." );
399 $res =
$dbr->select(
'blob_tracking',
404 'bt_text_id > ' .
$dbr->addQuotes( $startId )
409 'ORDER BY' =>
'bt_text_id',
413 if ( !
$res->numRows() ) {
417 foreach (
$res as $row ) {
418 $startId = $row->bt_text_id;
419 $ids[] = $row->bt_text_id;
426 $args = array_slice( $ids, 0, $this->orphanBatchSize );
427 $ids = array_slice( $ids, $this->orphanBatchSize );
428 array_unshift(
$args,
'doOrphanList' );
429 call_user_func_array( [ $this,
'dispatch' ],
$args );
431 if (
count( $ids ) ) {
433 array_unshift(
$args,
'doOrphanList' );
434 call_user_func_array( [ $this,
'dispatch' ],
$args );
437 $this->
report(
'orphans', $i, $numOrphans );
439 $this->
report(
'orphans', $i, $numOrphans );
440 $this->
info(
"All orphans queued." );
447 $this->
debug(
'starting' );
450 while ( !feof( STDIN ) ) {
451 $line = rtrim( fgets( STDIN ) );
457 $cmd = array_shift(
$args );
468 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
480 $titleText =
$title->getPrefixedText();
482 $titleText =
'[deleted]';
487 if ( !$this->copyOnly ) {
495 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
498 [
'blob_tracking',
'text' ],
501 'bt_page' => $pageId,
502 'bt_text_id > ' .
$dbr->addQuotes( $startId ),
504 'bt_new_url IS NULL',
509 'ORDER BY' =>
'bt_text_id',
513 if ( !
$res->numRows() ) {
518 foreach (
$res as $row ) {
519 $startId = $row->bt_text_id;
520 if ( $lastTextId == $row->bt_text_id ) {
524 $lastTextId = $row->bt_text_id;
527 if ( $text ===
false ) {
528 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
533 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
534 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
542 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
560 if ( $this->copyOnly ) {
561 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
565 $dbw->begin( __METHOD__ );
566 $dbw->update(
'text',
569 'old_flags' =>
'external,utf-8',
576 $dbw->update(
'blob_tracking',
578 [
'bt_text_id' => $textId ],
581 $dbw->commit( __METHOD__ );
596 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
599 $conds = array_merge( $conds, [
601 'bt_new_url IS NOT NULL'
604 $res =
$dbr->select(
'blob_tracking',
606 array_merge( $conds, [
'bt_text_id > ' .
$dbr->addQuotes( $startId ) ] ),
609 'ORDER BY' =>
'bt_text_id',
610 'LIMIT' => $this->batchSize,
613 if ( !
$res->numRows() ) {
616 $this->
debug(
'Incomplete: ' .
$res->numRows() .
' rows' );
617 foreach (
$res as $row ) {
618 $startId = $row->bt_text_id;
619 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
620 if ( $row->bt_text_id % 10 == 0 ) {
632 $cluster = next( $this->destClusters );
633 if ( $cluster ===
false ) {
634 $cluster = reset( $this->destClusters );
658 if ( !$this->copyOnly ) {
665 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
667 [
'text',
'blob_tracking' ],
668 [
'old_id',
'old_text',
'old_flags' ],
670 'old_id' => $textIds,
678 foreach (
$res as $row ) {
680 if ( $text ===
false ) {
681 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
685 if ( !$trx->addItem( $text, $row->old_id ) ) {
686 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
692 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
730 $this->cgz =
new $class;
732 $hash = $this->cgz->addItem( $text );
733 $this->referrers[$textId] = $hash;
734 $this->texts[$textId] = $text;
736 return $this->cgz->isHappy();
740 return count( $this->texts );
748 $this->cgz =
new $class;
749 $this->referrers = [];
750 foreach ( $this->texts
as $textId => $text ) {
751 $hash = $this->cgz->addItem( $text );
752 $this->referrers[$textId] = $hash;
762 $originalCount =
count( $this->texts );
763 if ( !$originalCount ) {
776 $dbw->begin( __METHOD__ );
777 $res = $dbw->select(
'blob_tracking',
778 [
'bt_text_id',
'bt_moved' ],
779 [
'bt_text_id' => array_keys( $this->referrers ) ],
780 __METHOD__, [
'FOR UPDATE' ] );
782 foreach (
$res as $row ) {
783 if ( $row->bt_moved ) {
784 # This row has already been moved, remove it
785 $this->parent->debug(
"TRX: conflict detected in old_id={$row->bt_text_id}" );
786 unset( $this->texts[$row->bt_text_id] );
793 if ( !
count( $this->texts ) ) {
795 if ( $originalCount > 1 ) {
797 $this->parent->critical(
798 "Warning: concurrent operation detected, are there two conflicting " .
799 "processes running, doing the same job?" );
808 $targetCluster = $this->parent->getTargetCluster();
809 $store = $this->parent->store;
810 $targetDB = $store->getMaster( $targetCluster );
811 $targetDB->clearFlag(
DBO_TRX );
812 $targetDB->begin( __METHOD__ );
813 $baseUrl = $this->parent->store->store( $targetCluster,
serialize( $this->cgz ) );
816 foreach ( $this->referrers
as $textId => $hash ) {
817 $url = $baseUrl .
'/' . $hash;
818 $dbw->update(
'blob_tracking',
819 [
'bt_new_url' => $url ],
821 'bt_text_id' => $textId,
822 'bt_moved' => 0, # Check for concurrent conflicting update
828 $targetDB->commit( __METHOD__ );
831 $dbw->commit( __METHOD__ );
834 if ( !$this->parent->copyOnly ) {
835 foreach ( $this->referrers
as $textId => $hash ) {
836 $url = $baseUrl .
'/' . $hash;
837 $this->parent->moveTextRow( $textId, $url );