/**
 * Lookup table from command-line option name (key) to the name of the
 * object property that stores that option (value).
 *
 * Code elsewhere in this class iterates the table as
 * `$cmdOption => $classOption`, both when copying parsed options into the
 * job options and when rebuilding the command line for child processes.
 */
110 private static $cmdLineOptionMap = [
111 'no-count' =>
'noCount',
112 'procs' =>
'numProcs',
113 'copy-only' =>
'copyOnly',
114 'child' =>
'isChild',
115 'child-id' =>
'childId',
116 'debug-log' =>
'debugLog',
117 'info-log' =>
'infoLog',
118 'critical-log' =>
'criticalLog',
122 return self::$optionsWithArgs;
126 $jobOptions = [
'destClusters' => $args ];
127 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
128 if ( isset( $options[$cmdOption] ) ) {
129 $jobOptions[$classOption] = $options[$cmdOption];
133 return new self( $jobOptions );
137 foreach ( $options as $name => $value ) {
138 $this->$name = $value;
140 $esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
141 $this->store = $esFactory->getStore(
'DB' );
142 if ( !$this->isChild ) {
143 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT M: ";
144 } elseif ( $this->childId !==
false ) {
145 $GLOBALS[
'wgDebugLogPrefix'] =
"RCT {$this->childId}: ";
147 $this->pageBlobClass = function_exists(
'xdiff_string_bdiff' ) ?
148 DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
149 $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
151 $this->blobStore = MediaWikiServices::getInstance()
152 ->getBlobStoreFactory()
158 if ( $this->debugLog ) {
159 $this->logToFile( $msg, $this->debugLog );
165 if ( $this->infoLog ) {
166 $this->logToFile( $msg, $this->infoLog );
172 if ( $this->criticalLog ) {
173 $this->logToFile( $msg, $this->criticalLog );
/**
 * Emit one formatted line to a log file through LegacyLogger::emit().
 *
 * The emitted text is the header, left-justified and padded to at least
 * 50 characters, followed by a space, the message and a newline.
 *
 * NOTE(review): the statement that first assigns $header is not visible in
 * this excerpt; only the parts appended to it are shown.
 *
 * @param string $msg Message text (interpolated with %s)
 * @param string $file Log destination handed to LegacyLogger::emit();
 *   presumably a file path — confirm against the callers' *Log options
 */
177 private function logToFile( $msg, $file ) {
// Tag the header with the child id when one is set (false means "none").
179 if ( $this->childId !==
false ) {
180 $header .=
"({$this->childId})";
// Append the DB domain id of the current wiki.
182 $header .=
' ' . WikiMap::getCurrentWikiDbDomain()->getId();
183 LegacyLogger::emit( sprintf(
"%-50s %s\n",
$header, $msg ), $file );
/**
 * Wait for database replication through the DB load balancer factory,
 * passing a 'timeout' option of 100000.
 *
 * NOTE(review): the timeout unit is presumably seconds — confirm against
 * the waitForReplication() documentation.
 */
191 private function syncDBs() {
192 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication( [
'timeout' => 100_000 ] );
199 if ( $this->isChild ) {
210 if ( !$this->checkTrackingTable() ) {
215 $this->startChildProcs();
217 $this->doAllOrphans();
218 $this->killChildProcs();
/**
 * Fetch a single row from the blob_tracking table on a replica connection.
 *
 * The warning text below indicates it is logged when the table has no rows.
 *
 * NOTE(review): the selected fields, the condition guarding the warning and
 * the return statements are not visible in this excerpt. The caller negates
 * the result, so a boolean-like value is presumably returned — confirm.
 */
225 private function checkTrackingTable() {
226 $row = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase()->newSelectQueryBuilder()
228 ->from(
'blob_tracking' )
229 ->caller( __METHOD__ )->fetchRow();
// Logged at info level, not through critical().
231 $this->
info(
"Warning: blob_tracking table contains no rows, skipping this wiki." );
/**
 * Build the command line for child processes and start them with
 * proc_open(), recording each process handle and its pipe 0.
 *
 * NOTE(review): the loop headers, the opening of the descriptor spec and the
 * failure condition are not visible in this excerpt.
 */
245 private function startChildProcs() {
// Current wiki id, forwarded to the children through --wiki.
246 $wiki = WikiMap::getCurrentWikiId();
// Children run this same script file.
248 $cmd =
'php ' . Shell::escape( __FILE__ );
// Re-create this object's options on the child command line.
249 foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
// child-id gets special treatment: it is appended per child in the
// proc_open() call below. The body of this branch is not visible here.
// NOTE(review): loose == comparison; === would be the strict form.
250 if ( $cmdOption ==
'child-id' ) {
// Options that take an argument are emitted as "--name <escaped value>".
// NOTE(review): in_array() is called without its strict third argument.
253 if ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
255 $cmd .=
" --$cmdOption " . Shell::escape( $this->$classOption );
// Any other option is emitted as a bare flag when its property is truthy.
256 } elseif ( $this->$classOption ) {
257 $cmd .=
" --$cmdOption";
// Tail of a concatenation whose start is not visible: the escaped wiki id
// followed by the escaped destination clusters.
261 ' --wiki ' . Shell::escape( $wiki ) .
262 ' ' . Shell::escape( ...$this->destClusters );
// Start with empty bookkeeping arrays for child pipes and process handles.
264 $this->childPipes = $this->childProcs = [];
// Descriptor entries that open php://stdout and php://stderr for writing.
269 [
'file',
'php://stdout',
'w' ],
270 [
'file',
'php://stderr',
'w' ]
// Warnings are suppressed only for the duration of the proc_open() call.
272 AtEase::suppressWarnings();
// Each child receives its own number through --child-id.
273 $proc = proc_open(
"$cmd --child-id $i", $spec, $pipes );
274 AtEase::restoreWarnings();
// Names the command in a critical log entry; the guarding condition is not
// visible in this excerpt.
276 $this->
critical(
"Error opening child process: $cmd" );
// Keep the process handle and pipe 0, keyed by child number. Pipe 0 is the
// one commands are later written to.
279 $this->childProcs[$i] = $proc;
280 $this->childPipes[$i] = $pipes[0];
// Starting value of the cursor dispatch() uses when picking a child.
282 $this->prevChildId = -1;
/**
 * Send 'quit' to the child processes, close them with proc_close() and
 * log their exit status.
 *
 * NOTE(review): the loops over the children and the condition that guards
 * the status warning are not visible in this excerpt.
 */
288 private function killChildProcs() {
289 $this->
info(
"Waiting for child processes to finish..." );
// Ask child $i to stop by sending the 'quit' command.
291 $this->dispatchToChild( $i,
'quit' );
// proc_close() returns the termination status of the process.
294 $status = proc_close( $this->childProcs[$i] );
// Worded as a warning but logged through critical().
296 $this->
critical(
"Warning: child #$i exited with status $status" );
299 $this->
info(
"Done." );
/**
 * Send a command to one of the child processes.
 *
 * Waits with stream_select() (3600-second timeout) on $pipes as the write
 * set, then computes a child index by rotating from prevChildId and forwards
 * the arguments through dispatchToChild().
 *
 * NOTE(review): the setup of $pipes/$x/$y, the loop that supplies $i and the
 * error condition are not visible in this excerpt.
 *
 * @param mixed ...$args Command name followed by its arguments; callers in
 *   this class pass e.g. 'doPage' plus a page id, or 'doOrphanList' plus ids
 */
307 private function dispatch( ...$args ) {
// $pipes is the second argument, i.e. the set checked for writability.
311 $numPipes = stream_select( $x, $pipes, $y, 3600 );
313 $this->
critical(
"Error waiting to write to child process. Aborting" );
// Candidate child: offset $i past the previously used child, wrapping
// around at numProcs.
317 $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs;
320 $this->dispatchToChild(
$childId, $args );
/**
 * Write one command line to a specific child's pipe.
 *
 * @param int $childId Key of the target pipe in $this->childPipes
 *   (presumably the integer child number — confirm)
 * @param array|string $args Command parts; a single string such as 'quit'
 *   is accepted and wrapped into a one-element array
 */
334 private function dispatchToChild(
$childId, $args ) {
// Normalise a scalar argument to an array so implode() always works.
335 $args = (array)$args;
// Parts are joined with single spaces to form the command line.
336 $cmd = implode(
' ', $args );
// The command is newline-terminated.
// NOTE(review): the return value of fwrite() is not checked, so a failed
// or short write would go unnoticed here.
337 fwrite( $this->childPipes[
$childId],
"$cmd\n" );
/**
 * Queue a 'doPage' command for page ids found in blob_tracking rows that
 * have bt_moved = 0, reading them from a replica in batches and reporting
 * progress through report().
 *
 * NOTE(review): the initialisation of $startId and $i, the enclosing loop
 * and the else lines are not visible in this excerpt.
 */
343 private function doAllPages() {
344 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
// With noCount set, the total is displayed as '[unknown]' rather than
// being counted.
347 if ( $this->noCount ) {
348 $numPages =
'[unknown]';
// Count the distinct pages that still have rows with bt_moved = 0.
350 $numPages = $dbr->newSelectQueryBuilder()
351 ->select(
'COUNT(DISTINCT bt_page)' )
352 ->from(
'blob_tracking' )
353 ->where( [
'bt_moved' => 0 ] )
354 ->caller( __METHOD__ )->fetchField();
// Announce which mode is running.
356 if ( $this->copyOnly ) {
357 $this->
info(
"Copying pages..." );
359 $this->
info(
"Moving pages..." );
// Fetch up to batchSize page ids greater than $startId, ordered by bt_page.
362 $res = $dbr->newSelectQueryBuilder()
363 ->select( [
'bt_page' ] )
365 ->from(
'blob_tracking' )
366 ->where( [
'bt_moved' => 0, $dbr->expr(
'bt_page',
'>', $startId ) ] )
367 ->orderBy(
'bt_page' )
368 ->limit( $this->batchSize )
369 ->caller( __METHOD__ )->fetchResultSet();
// Empty batch: the body is not visible here; presumably this ends the loop.
370 if ( !$res->numRows() ) {
373 foreach ( $res as $row ) {
// Remember the last id seen so the next batch starts after it.
374 $startId = $row->bt_page;
// Hand the page to a child process.
375 $this->dispatch(
'doPage', $row->bt_page );
// Progress report.
378 $this->report(
'pages', $i, $numPages );
// Second report call with the same arguments.
380 $this->report(
'pages', $i, $numPages );
381 if ( $this->copyOnly ) {
382 $this->
info(
"All page copies queued." );
384 $this->
info(
"All page moves queued." );
/**
 * Log progress as "$label: $current / $end" and wait for replication.
 *
 * This happens when $current equals $end or when numBatches has reached
 * reportingInterval; numBatches is reset to 0 at that point.
 *
 * NOTE(review): the place where numBatches is incremented is not visible in
 * this excerpt.
 *
 * @param string $label Name of the thing counted (callers pass 'pages' or
 *   'orphans')
 * @param int $current Progress so far (presumably an integer counter —
 *   confirm against callers)
 * @param int|string $end Total; callers may pass the string '[unknown]'
 */
394 private function report( $label, $current, $end ) {
// NOTE(review): loose == is used, and $end may be the string '[unknown]'.
396 if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
397 $this->numBatches = 0;
398 $this->
info(
"$label: $current / $end" );
399 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
/**
 * Queue 'doOrphanList' commands for blob_tracking rows that have
 * bt_page = 0 and bt_moved = 0, reading text ids from a replica in batches
 * and dispatching them in chunks of orphanBatchSize.
 *
 * NOTE(review): the initialisation of $startId, $i and $ids, the enclosing
 * loop, the else lines and the body of the "no orphans" check are not
 * visible in this excerpt.
 */
406 private function doAllOrphans() {
407 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
// With noCount set, the total is displayed as '[unknown]' rather than
// being counted.
410 if ( $this->noCount ) {
411 $numOrphans =
'[unknown]';
// Count distinct text ids that are unmoved and have page id 0.
413 $numOrphans = $dbr->newSelectQueryBuilder()
414 ->select(
'COUNT(DISTINCT bt_text_id)' )
415 ->from(
'blob_tracking' )
416 ->where( [
'bt_moved' => 0,
'bt_page' => 0 ] )
417 ->caller( __METHOD__ )->fetchField();
// A zero count is handled specially; the body is not visible here.
418 if ( !$numOrphans ) {
// Announce which mode is running.
422 if ( $this->copyOnly ) {
423 $this->
info(
"Copying orphans..." );
425 $this->
info(
"Moving orphans..." );
// Fetch up to batchSize text ids greater than $startId, ordered by
// bt_text_id.
429 $res = $dbr->newSelectQueryBuilder()
430 ->select( [
'bt_text_id' ] )
432 ->from(
'blob_tracking' )
433 ->where( [
'bt_moved' => 0,
'bt_page' => 0, $dbr->expr(
'bt_text_id',
'>', $startId ) ] )
434 ->orderBy(
'bt_text_id' )
435 ->limit( $this->batchSize )
436 ->caller( __METHOD__ )->fetchResultSet();
// Empty batch: the body is not visible here; presumably this ends the loop.
437 if ( !$res->numRows() ) {
441 foreach ( $res as $row ) {
// Advance the batch cursor and collect the id for dispatching.
442 $startId = $row->bt_text_id;
443 $ids[] = $row->bt_text_id;
// While more than orphanBatchSize ids are pending, cut off one full chunk,
// put the command name in front of it and dispatch it.
449 while ( count( $ids ) > $this->orphanBatchSize ) {
450 $args = array_slice( $ids, 0, $this->orphanBatchSize );
451 $ids = array_slice( $ids, $this->orphanBatchSize );
452 array_unshift( $args,
'doOrphanList' );
453 $this->dispatch( ...$args );
// Dispatch what is left over. The line that prepares $args for this branch
// is not visible in this excerpt.
455 if ( count( $ids ) ) {
457 array_unshift( $args,
'doOrphanList' );
458 $this->dispatch( ...$args );
// Progress report.
461 $this->report(
'orphans', $i, $numOrphans );
// Second report call with the same arguments.
463 $this->report(
'orphans', $i, $numOrphans );
464 $this->
info(
"All orphans queued." );
471 $this->
debug(
'starting' );
474 while ( !feof( STDIN ) ) {
475 $line = rtrim( fgets( STDIN ) );
479 $this->
debug( $line );
480 $args = explode(
' ', $line );
481 $cmd = array_shift( $args );
484 $this->doPage( intval( $args[0] ) );
487 $this->doOrphanList( array_map(
'intval', $args ) );
492 MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
/**
 * Process the tracked text rows of a single page.
 *
 * Reads blob_tracking rows joined with the text table for the page in
 * batches ordered by bt_text_id, expands each stored blob and adds it to
 * $trx with addItem().
 *
 * NOTE(review): the creation of $trx, the commit calls, the select list,
 * the enclosing loop and several branch bodies are not visible in this
 * excerpt.
 *
 * @param int $pageId Page id; the child command loop passes an intval()
 */
501 private function doPage( $pageId ) {
502 $title = Title::newFromID( $pageId );
// Label used in debug messages: the prefixed title text or '[deleted]'.
// The condition choosing between them is not visible here.
504 $titleText = $title->getPrefixedText();
506 $titleText =
'[deleted]';
508 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
// Unless running in copy-only mode, first complete moves recorded for this
// page that were left unfinished.
511 if ( !$this->copyOnly ) {
512 $this->finishIncompleteMoves( [
'bt_page' => $pageId ] );
519 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Next batch of this page's rows with bt_text_id above $startId and no
// bt_new_url yet.
521 $res = $dbr->newSelectQueryBuilder()
523 ->from(
'blob_tracking' )
524 ->join(
'text',
null,
'bt_text_id=old_id' )
526 'bt_page' => $pageId,
527 $dbr->expr(
'bt_text_id',
'>', $startId ),
529 'bt_new_url' =>
null,
531 ->orderBy(
'bt_text_id' )
532 ->limit( $this->batchSize )
533 ->caller( __METHOD__ )->fetchResultSet();
// Empty batch: the body is not visible here; presumably this ends the loop.
534 if ( !$res->numRows() ) {
539 foreach ( $res as $row ) {
// Advance the batch cursor.
540 $startId = $row->bt_text_id;
// Same text id as the previous row (loose ==). The body is not visible;
// presumably the duplicate is skipped.
541 if ( $lastTextId == $row->bt_text_id ) {
545 $lastTextId = $row->bt_text_id;
// Expand the stored blob according to its flags; false signals failure.
547 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
548 if ( $text ===
false ) {
549 $this->
critical(
"Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
// A falsy result from addItem() leads to the "committing blob" path.
554 if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
555 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
558 $lbFactory->waitForReplication();
// Same debug message as in the branch above, logged at a second commit
// point.
563 $this->
debug(
"$titleText: committing blob with " . $trx->getSize() .
" items" );
581 if ( $this->copyOnly ) {
582 $this->
critical(
"Internal error: can't call moveTextRow() in --copy-only mode" );
585 $dbw = MediaWikiServices::getInstance()->getConnectionProvider()->getPrimaryDatabase();
587 $dbw->begin( __METHOD__ );
588 $dbw->newUpdateQueryBuilder()
592 'old_flags' =>
'external,utf-8',
597 ->caller( __METHOD__ )
599 $dbw->newUpdateQueryBuilder()
600 ->update(
'blob_tracking' )
601 ->set( [
'bt_moved' => 1 ] )
602 ->where( [
'bt_text_id' => $textId ] )
603 ->caller( __METHOD__ )
605 $dbw->commit( __METHOD__ );
/**
 * Find blob_tracking rows that match $conds and have a non-null bt_new_url,
 * and call moveTextRow() for each of them, working through the rows in
 * batches ordered by bt_text_id.
 *
 * NOTE(review): part of the merged condition list, the select list, the
 * enclosing loop and the initialisation of $startId are not visible in this
 * excerpt.
 *
 * @param array $conds Query conditions; callers in this class pass
 *   [ 'bt_page' => <page id> ] or [ 'bt_text_id' => <list of text ids> ]
 */
618 private function finishIncompleteMoves( $conds ) {
619 $dbr = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase();
620 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Extend the caller's conditions; the visible addition requires bt_new_url
// to be non-null.
623 $conds = array_merge( $conds, [
625 $dbr->expr(
'bt_new_url',
'!=',
null ),
// Next batch of matching rows with bt_text_id above $startId.
628 $res = $dbr->newSelectQueryBuilder()
630 ->from(
'blob_tracking' )
632 ->andWhere( $dbr->expr(
'bt_text_id',
'>', $startId ) )
633 ->orderBy(
'bt_text_id' )
634 ->limit( $this->batchSize )
635 ->caller( __METHOD__ )->fetchResultSet();
// Empty batch: the body is not visible here; presumably this ends the loop.
636 if ( !$res->numRows() ) {
639 $this->
debug(
'Incomplete: ' . $res->numRows() .
' rows' );
640 foreach ( $res as $row ) {
// Advance the batch cursor, then move the row to its recorded new URL.
641 $startId = $row->bt_text_id;
642 $this->
moveTextRow( $row->bt_text_id, $row->bt_new_url );
// Wait for replication whenever the text id is a multiple of 10.
643 if ( $row->bt_text_id % 10 == 0 ) {
644 $lbFactory->waitForReplication();
655 $cluster = next( $this->destClusters );
656 if ( $cluster ===
false ) {
657 $cluster = reset( $this->destClusters );
668 private function doOrphanList( $textIds ) {
670 if ( !$this->copyOnly ) {
671 $this->finishIncompleteMoves( [
'bt_text_id' => $textIds ] );
677 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
678 $res = MediaWikiServices::getInstance()->getConnectionProvider()->getReplicaDatabase()->newSelectQueryBuilder()
679 ->select( [
'old_id',
'old_text',
'old_flags' ] )
682 ->join(
'blob_tracking',
null,
'bt_text_id=old_id' )
683 ->where( [
'old_id' => $textIds,
'bt_moved' => 0 ] )
684 ->caller( __METHOD__ )->fetchResultSet();
686 foreach ( $res as $row ) {
687 $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags );
688 if ( $text ===
false ) {
689 $this->
critical(
"Error: cannot load revision text for old_id={$row->old_id}" );
693 if ( !$trx->addItem( $text, $row->old_id ) ) {
694 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );
697 $lbFactory->waitForReplication();
700 $this->
debug(
"[orphan]: committing blob with " . $trx->getSize() .
" rows" );