/**
 * Map from command-line option name (key) to the name of the property
 * that carries the option's value on the job object (value).
 */
110 private static $cmdLineOptionMap = [
111 'no-count' =>
'noCount',
112 'procs' =>
'numProcs',
113 'copy-only' =>
'copyOnly',
// 'child' and 'child-id' describe a worker process rather than the job itself.
114 'child' =>
'isChild',
115 'child-id' =>
'childId',
// Destination log files, one per severity.
116 'debug-log' =>
'debugLog',
117 'info-log' =>
'infoLog',
118 'critical-log' =>
'criticalLog',
// Expose the static list of option names; startChildProcs() consults the
// same list to decide which options are followed by a value.
122 return self::$optionsWithArgs;
// $args is stored as the list of destination clusters.
126 $jobOptions = [
'destClusters' => $args ];
// Translate every supplied command-line option to its property name;
// options that are not set in $options are simply left out.
127 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
128 if ( isset( $options[$cmdOption] ) ) {
129 $jobOptions[$classOption] = $options[$cmdOption];
// Build the job object from the translated option set.
133 return new self( $jobOptions );
// Copy each option onto the property of the same name.
137 foreach ( $options as $name => $value ) {
138 $this->$name = $value;
// External store handle of type 'DB', obtained from the service container.
140 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
141 $this->store = $esFactory->getStore(
'DB' );
// Tag debug log lines: "RCT M: " when this is not a child process,
// otherwise "RCT <childId>: " when a child id has been set.
142 if ( !$this->isChild ) {
143 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
144 } elseif ( $this->childId !==
false ) {
145 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
// Page blobs use DiffHistoryBlob only when the xdiff_string_bdiff()
// function is available; otherwise they fall back to the gzip blob class.
147 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
148 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
// Orphan blobs always use the concatenated gzip blob class.
149 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
151 $this->blobStore = MediaWikiServices::getInstance()
152 ->getBlobStoreFactory()
// Debug messages are written only when a debug log destination is set.
158 if ( $this->debugLog ) {
159 $this->logToFile( $msg, $this->debugLog );
// Same guard for the info log destination.
165 if ( $this->infoLog ) {
166 $this->logToFile( $msg, $this->infoLog );
// Same guard for the critical log destination.
172 if ( $this->criticalLog ) {
173 $this->logToFile( $msg, $this->criticalLog );
/**
 * Emit one formatted log line to the given destination through LegacyLogger.
 *
 * @param string $msg Message text placed after the header
 * @param string $file Destination handed to LegacyLogger::emit()
 */
177 private function logToFile( $msg, $file ) {
// Append "(<childId>)" to the header when a child id has been set.
179 if ( $this->childId !==
false ) {
180 $header .=
"({$this->childId})";
// Append the current wiki's DB domain id, separated by a space.
182 $header .=
' ' . WikiMap::getCurrentWikiDbDomain()->getId();
// "%-50s" left-justifies the header and pads it to 50 characters so
// that messages line up in a column.
183 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ), $file );
/**
 * Wait for replication through the DB load balancer factory, passing an
 * explicit 'timeout' of 100_000.
 * NOTE(review): the unit of 'timeout' is defined by waitForReplication() — confirm.
 */
191 private function syncDBs() {
192 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication( [
'timeout' => 100_000 ] );
// A child process is handled by its own branch.
199 if ( $this->isChild ) {
// The tracking table check must pass before any work is started.
210 if ( !$this->checkTrackingTable() ) {
// Spawn the worker processes, queue the orphan work, then shut the
// workers down again.
215 $this->startChildProcs();
217 $this->doAllOrphans();
218 $this->killChildProcs();
/**
 * Fetch a single row from the blob_tracking table through a replica
 * connection.
 * NOTE(review): the info() warning below presumably fires when no row
 * came back — confirm against the guarding condition.
 */
225 private function checkTrackingTable() {
226 $row = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase()->newSelectQueryBuilder()
228 ->from(
'blob_tracking' )
229 ->caller( __METHOD__ )->fetchRow();
231 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
/**
 * Launch the child worker processes and remember their process handles
 * and the pipe opened for descriptor 0 of each.
 *
 * Each child runs this same script file with the current options
 * re-encoded on the command line, plus its own --child-id.
 */
245 private function startChildProcs() {
246 $wiki = WikiMap::getCurrentWikiId();
// Base command: the PHP binary running this very file.
248 $cmd =
'php ' . Shell::escape( __FILE__ );
249 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
// 'child-id' is special-cased: it is appended per child in proc_open() below.
250 if ( $cmdOption ==
'child-id' ) {
// Options listed in $optionsWithArgs are forwarded with their
// shell-escaped value; any other option is forwarded as a bare flag
// when its property is truthy.
254 if ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
256 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
257 } elseif ( $this->$classOption ) {
258 $cmd .=
" --$cmdOption";
262 ' --wiki ' . Shell::escape( $wiki ) .
263 ' ' . Shell::escape( ...$this->destClusters );
// Start from empty bookkeeping arrays.
265 $this->childPipes = $this->childProcs = [];
// File-type descriptor specs that write to php://stdout and php://stderr.
270 [
'file',
'php://stdout',
'w' ],
271 [
'file',
'php://stderr',
'w' ]
// Warnings are suppressed only for the duration of proc_open().
273 AtEase::suppressWarnings();
274 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
275 AtEase::restoreWarnings();
277 $this->
critical(
"Error opening child process: $cmd" );
// Keep the process handle and the pipe for descriptor 0, indexed by child number.
280 $this->childProcs[$i] = $proc;
281 $this->childPipes[$i] = $pipes[0];
// -1 makes the round-robin calculation in dispatch() begin at child 0.
283 $this->prevChildId = -1;
/**
 * Tell the child processes to quit, close them and log any exit status
 * that is reported as a warning.
 */
289 private function killChildProcs() {
290 $this->
info(
"Waiting for child processes to finish..." );
// Send the 'quit' command to child $i.
292 $this->dispatchToChild( $i,
'quit' );
// proc_close() returns the exit status that is used in the warning below.
295 $status = proc_close( $this->childProcs[$i] );
297 $this->
critical(
"Warning: child #$i exited with status $status" );
300 $this->
info(
"Done." );
/**
 * Send a command to one of the child processes.
 *
 * @param mixed ...$args Command name followed by its arguments; passed
 *   on unchanged to dispatchToChild()
 */
308 private function dispatch( ...$args ) {
// $pipes is passed as the write set, so this waits (up to 3600 seconds)
// for a child pipe to become writable.
312 $numPipes = stream_select( $x, $pipes, $y, 3600 );
314 $this->
critical(
"Error waiting to write to child process. Aborting" );
// Candidate ids start just after the previously used child and wrap
// around at numProcs.
318 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
321 $this->dispatchToChild(
$childId, $args );
/**
 * Write one command line to the pipe of a specific child process.
 *
 * @param int $childId Index into $this->childPipes
 * @param array|string $args Command words; a single string is accepted too
 */
335 private function dispatchToChild(
$childId, $args ) {
// The cast lets callers pass a bare string such as 'quit'.
336 $args = (array)$args;
// Wire format: words joined by single spaces, terminated by a newline.
337 $cmd = implode(
' ', $args );
338 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
/**
 * Queue every page that still has unmoved rows in blob_tracking.
 *
 * Page ids are read from a replica in batches ordered by bt_page, and
 * each id is dispatched to a child as a 'doPage' command.
 */
344 private function doAllPages() {
345 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
// With noCount set, the total is shown as a placeholder instead of being counted.
348 if ( $this->noCount ) {
349 $numPages =
'[unknown]';
// Otherwise count the distinct pages that still have bt_moved = 0.
351 $numPages = $dbr->newSelectQueryBuilder()
352 ->select(
'COUNT(DISTINCT bt_page)' )
353 ->from(
'blob_tracking' )
354 ->where( [
'bt_moved' => 0 ] )
355 ->caller( __METHOD__ )->fetchField();
357 if ( $this->copyOnly ) {
358 $this->
info(
"Copying pages..." );
360 $this->
info(
"Moving pages..." );
// Next batch: unmoved rows with bt_page above the cursor $startId,
// ordered by bt_page and capped at batchSize rows.
363 $res = $dbr->newSelectQueryBuilder()
364 ->select( [
'bt_page' ] )
366 ->from(
'blob_tracking' )
367 ->where( [
'bt_moved' => 0, $dbr->expr(
'bt_page',
'>', $startId ) ] )
368 ->orderBy(
'bt_page' )
369 ->limit( $this->batchSize )
370 ->caller( __METHOD__ )->fetchResultSet();
// An empty batch means there is nothing left to read.
371 if ( !$res->numRows() ) {
// Advance the cursor and hand each page to a child process.
374 foreach ( $res as $row ) {
375 $startId = $row->bt_page;
376 $this->dispatch(
'doPage', $row->bt_page );
379 $this->report(
'pages', $i, $numPages );
381 $this->report(
'pages', $i, $numPages );
382 if ( $this->copyOnly ) {
383 $this->
info(
"All page copies queued." );
385 $this->
info(
"All page moves queued." );
/**
 * Log a progress line and wait for replication, either when the end has
 * been reached or when enough batches have accumulated.
 *
 * @param string $label Prefix of the progress line
 * @param int $current Number of items handled so far
 * @param int|string $end Total; callers pass the placeholder string
 *   '[unknown]' when counting is disabled
 */
395 private function report( $label, $current, $end ) {
397 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
// Restart the batch counter for the next reporting interval.
398 $this->numBatches = 0;
399 $this->
info(
"$label: $current / $end" );
400 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
/**
 * Queue all unmoved blob_tracking rows that have bt_page = 0 (called
 * "orphans" in the log messages).
 *
 * Text ids are read from a replica in batches ordered by bt_text_id and
 * dispatched to children as 'doOrphanList' commands.
 */
407 private function doAllOrphans() {
408 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
// With noCount set, the total is shown as a placeholder instead of being counted.
411 if ( $this->noCount ) {
412 $numOrphans =
'[unknown]';
// Otherwise count the distinct text ids with bt_moved = 0 and bt_page = 0.
414 $numOrphans = $dbr->newSelectQueryBuilder()
415 ->select(
'COUNT(DISTINCT bt_text_id)' )
416 ->from(
'blob_tracking' )
417 ->where( [
'bt_moved' => 0,
'bt_page' => 0 ] )
418 ->caller( __METHOD__ )->fetchField();
// A zero count is handled separately.
419 if ( !$numOrphans ) {
423 if ( $this->copyOnly ) {
424 $this->
info(
"Copying orphans..." );
426 $this->
info(
"Moving orphans..." );
// Next batch: matching rows with bt_text_id above the cursor $startId,
// ordered by bt_text_id and capped at batchSize rows.
430 $res = $dbr->newSelectQueryBuilder()
431 ->select( [
'bt_text_id' ] )
433 ->from(
'blob_tracking' )
434 ->where( [
'bt_moved' => 0,
'bt_page' => 0, $dbr->expr(
'bt_text_id',
'>', $startId ) ] )
435 ->orderBy(
'bt_text_id' )
436 ->limit( $this->batchSize )
437 ->caller( __METHOD__ )->fetchResultSet();
// An empty batch means there is nothing left to read.
438 if ( !$res->numRows() ) {
// Advance the cursor and collect the ids of this batch.
442 foreach ( $res as $row ) {
443 $startId = $row->bt_text_id;
444 $ids[] = $row->bt_text_id;
// While more than orphanBatchSize ids are pending, cut off a chunk of
// exactly orphanBatchSize ids and dispatch it, with the command name
// prepended as the first word.
450 while ( count( $ids ) > $this->orphanBatchSize ) {
451 $args = array_slice( $ids, 0, $this->orphanBatchSize );
452 $ids = array_slice( $ids, $this->orphanBatchSize );
453 array_unshift( $args,
'doOrphanList' );
454 $this->dispatch( ...$args );
// Dispatch whatever is still pending after the chunking above.
456 if ( count( $ids ) ) {
458 array_unshift( $args,
'doOrphanList' );
459 $this->dispatch( ...$args );
462 $this->report(
'orphans', $i, $numOrphans );
464 $this->report(
'orphans', $i, $numOrphans );
465 $this->
info(
"All orphans queued." );
472 $this->
debug(
'starting' );
// Read commands from STDIN one line at a time until end of file.
475 while ( !feof( STDIN ) ) {
476 $line = rtrim( fgets( STDIN ) );
480 $this->
debug( $line );
// A command line is space-separated: the first word is the command
// name, the remaining words are its arguments.
481 $args = explode(
' ', $line );
482 $cmd = array_shift( $args );
// doPage takes the first argument as an integer page id.
485 $this->doPage( intval( $args[0] ) );
// doOrphanList takes every argument as an integer text id.
488 $this->doOrphanList( array_map(
'intval', $args ) );
493 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
/**
 * Process the blob_tracking rows that belong to one page.
 *
 * @param int $pageId Page id; matched against bt_page
 */
502 private function doPage( $pageId ) {
// Title text is only used as a prefix in the debug messages below.
503 $title = Title::newFromID( $pageId );
505 $titleText = $title->getPrefixedText();
// NOTE(review): placeholder presumably used when no title exists for the id — confirm.
507 $titleText =
'[deleted]';
509 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
// Outside copy-only mode, first complete moves of this page's rows that
// were left unfinished.
512 if ( !$this->copyOnly ) {
513 $this->finishIncompleteMoves( [
'bt_page' => $pageId ] );
520 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Next batch: tracking rows of this page joined to their text rows,
// with bt_text_id above the cursor and no bt_new_url yet.
522 $res = $dbr->newSelectQueryBuilder()
524 ->from(
'blob_tracking' )
525 ->join(
'text',
null,
'bt_text_id=old_id' )
527 'bt_page' => $pageId,
528 $dbr->expr(
'bt_text_id',
'>', $startId ),
530 'bt_new_url' =>
null,
532 ->orderBy(
'bt_text_id' )
533 ->limit( $this->batchSize )
534 ->caller( __METHOD__ )->fetchResultSet();
// An empty batch means there is nothing left to read.
535 if ( !$res->numRows() ) {
540 foreach ( $res as $row ) {
541 $startId = $row->bt_text_id;
// Detect a repeat of the previous row's text id.
542 if ( $lastTextId == $row->bt_text_id ) {
546 $lastTextId = $row->bt_text_id;
// Expand the stored blob to plain text; false signals a load failure.
548 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
549 if ( $text ===
false ) {
550 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
// addItem() returning false leads to the commit logged below.
555 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
556 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
559 $lbFactory->waitForReplication();
564 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
// This code path must never run in copy-only mode; log it as an
// internal error if it does.
582 if ( $this->copyOnly ) {
583 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
586 $dbw = MediaWikiServices::getInstance()->getConnectionProvider()->getPrimaryDatabase();
// Both updates run on the primary inside one explicit transaction.
588 $dbw->begin( __METHOD__ );
// First update: sets old_flags to 'external,utf-8'.
589 $dbw->newUpdateQueryBuilder()
593 'old_flags' =>
'external,utf-8',
598 ->caller( __METHOD__ )
// Second update: mark the tracking row for this text id as moved.
600 $dbw->newUpdateQueryBuilder()
601 ->update(
'blob_tracking' )
602 ->set( [
'bt_moved' => 1 ] )
603 ->where( [
'bt_text_id' => $textId ] )
604 ->caller( __METHOD__ )
606 $dbw->commit( __METHOD__ );
/**
 * Call moveTextRow() for blob_tracking rows that match $conds and
 * already have a non-null bt_new_url.
 *
 * @param array $conds Query conditions selecting the rows to look at
 */
619 private function finishIncompleteMoves( $conds ) {
620 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
621 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Narrow the caller's conditions to rows that already have a new URL.
624 $conds = array_merge( $conds, [
626 $dbr->expr(
'bt_new_url',
'!=',
null ),
// Next batch: matching rows with bt_text_id above the cursor $startId,
// ordered by bt_text_id and capped at batchSize rows.
629 $res = $dbr->newSelectQueryBuilder()
631 ->from(
'blob_tracking' )
633 ->andWhere( $dbr->expr(
'bt_text_id',
'>', $startId ) )
634 ->orderBy(
'bt_text_id' )
635 ->limit( $this->batchSize )
636 ->caller( __METHOD__ )->fetchResultSet();
// An empty batch means there is nothing left to read.
637 if ( !$res->numRows() ) {
640 $this->
debug(
'Incomplete: ' . $res->numRows() .
' rows' );
641 foreach ( $res as $row ) {
642 $startId = $row->bt_text_id;
643 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
// Wait for replication whenever the text id is a multiple of 10.
644 if ( $row->bt_text_id % 10 == 0 ) {
645 $lbFactory->waitForReplication();
// Step the array's internal pointer to the following destination
// cluster; when next() reports false, wrap around to the first entry.
656 $cluster = next( $this->destClusters );
657 if ( $cluster ===
false ) {
658 $cluster = reset( $this->destClusters );
/**
 * Process a list of orphan text ids.
 *
 * @param int[] $textIds Text ids; matched against old_id / bt_text_id
 */
669 private function doOrphanList( $textIds ) {
// Outside copy-only mode, first complete moves of these text ids that
// were left unfinished.
671 if ( !$this->copyOnly ) {
672 $this->finishIncompleteMoves( [
'bt_text_id' => $textIds ] );
678 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Load the requested rows that are not yet marked as moved, together
// with their stored text and flags.
679 $res = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase()->newSelectQueryBuilder()
680 ->select( [
'old_id',
'old_text',
'old_flags' ] )
683 ->join(
'blob_tracking',
null,
'bt_text_id=old_id' )
684 ->where( [
'old_id' => $textIds,
'bt_moved' => 0 ] )
685 ->caller( __METHOD__ )->fetchResultSet();
687 foreach ( $res as $row ) {
// Expand the stored blob to plain text; false signals a load failure.
688 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
689 if ( $text ===
false ) {
690 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
// addItem() returning false leads to the commit logged below.
694 if ( !$trx->addItem( $text, $row->old_id ) ) {
695 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
698 $lbFactory->waitForReplication();
701 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );