MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26 use Psr\Log\LoggerInterface;
32 
69  private static $preSendUpdates = [];
76  private static $postSendUpdates = [];
81  private static $executionStack = [];
82 
83  public const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
84  public const PRESEND = 1; // for updates that should run before flushing output buffer
85  public const POSTSEND = 2; // for updates that should run after flushing output buffer
86 
87  private const BIG_QUEUE_SIZE = 100;
88 
106  public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
107  global $wgCommandLineMode;
108 
109  // Special handling for updates pushed while another update is in progress
110  if ( self::$executionStack && !( $update instanceof MergeableUpdate ) ) {
111  // Get the innermost in-progress update
112  end( self::$executionStack );
113  $topStackPos = key( self::$executionStack );
114  if ( self::$executionStack[$topStackPos]['stage'] >= $stage ) {
115  // Put this update into the sub-queue of that in-progress update
116  self::push( self::$executionStack[$topStackPos]['subqueue'], $update );
117 
118  return;
119  }
120  }
121 
122  if ( $stage === self::PRESEND ) {
123  self::push( self::$preSendUpdates, $update );
124  } else {
125  self::push( self::$postSendUpdates, $update );
126  }
127 
128  // Try to run the updates now if in CLI mode and no transaction is active.
129  // This covers scripts that don't/barely use the DB but make updates to other stores.
130  if ( $wgCommandLineMode ) {
132  }
133  }
134 
145  public static function addCallableUpdate(
146  $callable, $stage = self::POSTSEND, $dbw = null
147  ) {
148  self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
149  }
150 
171  public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
172  $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
173  // Special handling for when an in-progress update triggers this method
174  if ( self::$executionStack ) {
175  // Run the sub-queue updates for the innermost in-progress update
176  end( self::$executionStack );
177  $topStackPos = key( self::$executionStack );
179  self::$executionStack[$topStackPos]['subqueue'],
180  $mode,
181  $stageEffective
182  );
183 
184  return;
185  }
186  // For ALL mode, make sure that any PRESEND updates added along the way get run.
187  // Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
188  do {
189  if ( $stage === self::ALL || $stage === self::PRESEND ) {
190  self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
191  }
192 
193  if ( $stage === self::ALL || $stage == self::POSTSEND ) {
194  self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
195  }
196  } while ( $stage === self::ALL && self::$preSendUpdates );
197  }
198 
203  private static function push( array &$queue, DeferrableUpdate $update ) {
204  if ( $update instanceof MergeableUpdate ) {
205  $class = get_class( $update ); // fully-qualified class
206  if ( isset( $queue[$class] ) ) {
208  $existingUpdate = $queue[$class];
209  '@phan-var MergeableUpdate $existingUpdate';
210  $existingUpdate->merge( $update );
211  // Move the update to the end to handle things like mergeable purge
212  // updates that might depend on the prior updates in the queue running
213  unset( $queue[$class] );
214  $queue[$class] = $existingUpdate;
215  } else {
216  $queue[$class] = $update;
217  }
218  } else {
219  $queue[] = $update;
220  }
221  }
222 
234  protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
235  $services = MediaWikiServices::getInstance();
236  $stats = $services->getStatsdDataFactory();
237  $lbf = $services->getDBLoadBalancerFactory();
238  $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
239  $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
240  ? 'cli'
241  : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
242 
244  $guiEx = null;
246  $exception = null;
247 
249  $updates = $queue;
250 
251  // Keep doing rounds of updates until none get enqueued...
252  while ( $updates ) {
253  $queue = []; // clear the queue
254 
255  // Segregate the queue into one for DataUpdate and one for everything else
256  $dataUpdateQueue = [];
257  $genericUpdateQueue = [];
258  foreach ( $updates as $update ) {
259  if ( $update instanceof DataUpdate ) {
260  $dataUpdateQueue[] = $update;
261  } else {
262  $genericUpdateQueue[] = $update;
263  }
264  }
265  // Execute all DataUpdate queue followed by the DeferrableUpdate queue...
266  foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
267  foreach ( $updateQueue as $curUpdate ) {
268  // Enqueue the update into the job queue system instead if applicable
269  if ( $mode === 'enqueue' && $curUpdate instanceof EnqueueableDataUpdate ) {
270  self::jobify( $curUpdate, $lbf, $logger, $stats, $httpMethod );
271  continue;
272  }
273  // Otherwise, execute the update, followed by any sub-updates that it spawns
274  $stackEntry = [ 'stage' => $stage, 'update' => $curUpdate, 'subqueue' => [] ];
275  $stackKey = count( self::$executionStack );
276  self::$executionStack[$stackKey] =& $stackEntry;
277  try {
278  $e = self::run( $curUpdate, $lbf, $logger, $stats, $httpMethod );
279  $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
280  $exception = $exception ?: $e;
281  // Do the subqueue updates for $update until there are none
282  // @phan-suppress-next-line PhanImpossibleConditionInLoop
283  while ( $stackEntry['subqueue'] ) {
284  $duChild = reset( $stackEntry['subqueue'] );
285  $duChildKey = key( $stackEntry['subqueue'] );
286  unset( $stackEntry['subqueue'][$duChildKey] );
287 
288  $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
289  $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
290  $exception = $exception ?: $e;
291  }
292  } finally {
293  // Make sure we always clean up the context.
294  // Losing updates while rewinding the stack is acceptable,
295  // losing updates that are added later is not.
296  unset( self::$executionStack[$stackKey] );
297  }
298  }
299  }
300 
301  $updates = $queue; // new snapshot of queue (check for new entries)
302  }
303 
304  // VW-style hack to work around T190178, so we can make sure
305  // PageMetaDataUpdater doesn't throw exceptions.
306  if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
307  throw $exception;
308  }
309 
310  // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
311  // callers should check permissions *before* enqueueing updates. If the main transaction
312  // round actions succeed but some deferred updates fail due to permissions errors then
313  // there is a risk that some secondary data was not properly updated.
314  if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
315  throw $guiEx;
316  }
317  }
318 
329  private static function run(
330  DeferrableUpdate $update,
331  LBFactory $lbFactory,
332  LoggerInterface $logger,
333  StatsdDataFactoryInterface $stats,
334  $httpMethod
335  ) : ?Throwable {
336  $suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
337  $type = get_class( $update ) . $suffix;
338  $stats->increment( "deferred_updates.$httpMethod.$type" );
339 
340  $updateId = spl_object_id( $update );
341  $logger->debug( __METHOD__ . ": started $type #$updateId" );
342  $e = null;
343  try {
344  self::attemptUpdate( $update, $lbFactory );
345 
346  return null;
347  } catch ( Throwable $e ) {
348  } finally {
349  $logger->debug( __METHOD__ . ": ended $type #$updateId" );
350  }
351 
353  $logger->error(
354  "Deferred update '{deferred_type}' failed to run.",
355  [
356  'deferred_type' => $type,
357  'exception' => $e,
358  ]
359  );
360 
361  $lbFactory->rollbackMasterChanges( __METHOD__ );
362 
363  // Try to push the update as a job so it can run later if possible
364  if ( $update instanceof EnqueueableDataUpdate ) {
365  $jobEx = null;
366  try {
367  $spec = $update->getAsJobSpecification();
368  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
369 
370  return $e;
371  } catch ( Throwable $jobEx ) {
372  }
373 
375  $logger->error(
376  "Deferred update '{deferred_type}' failed to enqueue as a job.",
377  [
378  'deferred_type' => $type,
379  'exception' => $jobEx,
380  ]
381  );
382 
383  $lbFactory->rollbackMasterChanges( __METHOD__ );
384  }
385 
386  return $e;
387  }
388 
398  private static function jobify(
399  EnqueueableDataUpdate $update,
400  LBFactory $lbFactory,
401  LoggerInterface $logger,
402  StatsdDataFactoryInterface $stats,
403  $httpMethod
404  ) {
405  $type = get_class( $update );
406  $stats->increment( "deferred_updates.$httpMethod.$type" );
407 
408  $jobEx = null;
409  try {
410  $spec = $update->getAsJobSpecification();
411  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
412 
413  return;
414  } catch ( Throwable $jobEx ) {
415  }
416 
418  $logger->error(
419  "Deferred update '$type' failed to enqueue as a job.",
420  [
421  'deferred_type' => $type,
422  'exception' => $jobEx,
423  ]
424  );
425 
426  $lbFactory->rollbackMasterChanges( __METHOD__ );
427  }
428 
440  public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
441  $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
442  if ( !$ticket || $lbFactory->hasTransactionRound() ) {
443  throw new DBTransactionError( null, "A database transaction round is pending." );
444  }
445 
446  if ( $update instanceof DataUpdate ) {
447  $update->setTransactionTicket( $ticket );
448  }
449 
450  // Designate $update::doUpdate() as the write round owner
451  $fnameTrxOwner = ( $update instanceof DeferrableCallback )
452  ? $update->getOrigin()
453  : get_class( $update ) . '::doUpdate';
454  // Determine whether the write round will be explicit or implicit
455  $useExplicitTrxRound = !(
456  $update instanceof TransactionRoundAwareUpdate &&
457  $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
458  );
459 
460  // Flush any pending changes left over from an implicit transaction round
461  if ( $useExplicitTrxRound ) {
462  $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
463  } else {
464  $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
465  }
466  // Run the update after any stale master view snapshots have been flushed
467  $update->doUpdate();
468  // Commit any pending changes from the explicit or implicit transaction round
469  $lbFactory->commitMasterChanges( $fnameTrxOwner );
470  }
471 
483  public static function tryOpportunisticExecute( $mode = 'run' ) {
484  // An update is already in progress
485  if ( self::$executionStack ) {
486  return false;
487  }
488 
489  // Avoiding running updates without them having outer scope
490  if ( !self::areDatabaseTransactionsActive() ) {
491  self::doUpdates( $mode );
492  return true;
493  }
494 
495  if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
496  // If we cannot run the updates with outer transaction context, try to
497  // at least enqueue all the updates that support queueing to job queue
498  self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
499  self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
500  }
501 
502  return !self::pendingUpdatesCount();
503  }
504 
511  private static function enqueueUpdates( array $updates ) {
512  $remaining = [];
513 
514  foreach ( $updates as $update ) {
515  if ( $update instanceof EnqueueableDataUpdate ) {
516  $spec = $update->getAsJobSpecification();
517  $domain = $spec['domain'] ?? $spec['wiki'];
518  JobQueueGroup::singleton( $domain )->push( $spec['job'] );
519  } else {
520  $remaining[] = $update;
521  }
522  }
523 
524  return $remaining;
525  }
526 
535  public static function pendingUpdatesCount() {
536  if ( self::$executionStack ) {
537  throw new LogicException( "Called during execution of a DeferrableUpdate" );
538  }
539 
540  return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
541  }
542 
554  public static function getPendingUpdates( $stage = self::ALL ) {
555  $updates = [];
556  if ( $stage === self::ALL || $stage === self::PRESEND ) {
557  $updates = array_merge( $updates, self::$preSendUpdates );
558  }
559  if ( $stage === self::ALL || $stage === self::POSTSEND ) {
560  $updates = array_merge( $updates, self::$postSendUpdates );
561  }
562 
563  return $updates;
564  }
565 
573  public static function clearPendingUpdates() {
574  self::$preSendUpdates = [];
575  self::$postSendUpdates = [];
576  }
577 
581  private static function areDatabaseTransactionsActive() {
582  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
583  if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
584  return true;
585  }
586 
587  $connsBusy = false;
588  $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
589  $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
590  if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
591  $connsBusy = true;
592  }
593  } );
594  } );
595 
596  return $connsBusy;
597  }
598 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a update into the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:398
DeferredUpdates\ALL
const ALL
Definition: DeferredUpdates.php:83
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:152
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:440
MergeableUpdate
Interface that deferrable updates can implement to signal that updates can be combined.
Definition: MergeableUpdate.php:18
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred update queue for execution at the appropriate time.
Definition: DeferredUpdates.php:106
Wikimedia\Rdbms\LoadBalancer\forEachOpenMasterConnection
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Definition: LoadBalancer.php:2144
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Clear all pending updates without performing them.
Definition: DeferredUpdates.php:573
DeferredUpdates\$postSendUpdates
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until just after HTTP response emission.
Definition: DeferredUpdates.php:76
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:8
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Definition: TransactionRoundAwareUpdate.php:9
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:28
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:10
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:656
DeferredUpdates\run
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and fallback to the job queue.
Definition: DeferredUpdates.php:329
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
$wgCommandLineMode
global $wgCommandLineMode
Definition: DevelopmentSettings.php:29
DeferredUpdates
Class for managing the deferred updates.
Definition: DeferredUpdates.php:62
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\handleUpdateQueue
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
Definition: DeferredUpdates.php:234
$queue
$queue
Definition: mergeMessageFileList.php:157
DeferredUpdates\POSTSEND
const POSTSEND
Definition: DeferredUpdates.php:85
DeferredUpdates\$executionStack
static array[] $executionStack
Execution stack of currently running updates -var array<array{stage:int,update:DeferrableUpdate,...
Definition: DeferredUpdates.php:81
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:42
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
DeferredUpdates\BIG_QUEUE_SIZE
const BIG_QUEUE_SIZE
Definition: DeferredUpdates.php:87
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
Definition: DeferredUpdates.php:483
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:581
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:451
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Consume the list of deferred updates and execute them.
Definition: DeferredUpdates.php:171
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Get the number of currently enqueued updates in the top-queues.
Definition: DeferredUpdates.php:535
Wikimedia\Rdbms\ILBFactory\beginMasterChanges
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:10
Wikimedia\Rdbms\DBTransactionError
Definition: DBTransactionError.php:27
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
Wikimedia\Rdbms\LBFactory\rollbackMasterChanges
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:320
DeferredUpdates\PRESEND
const PRESEND
Definition: DeferredUpdates.php:84
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Get the list of pending updates in the top-queues.
Definition: DeferredUpdates.php:554
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:41
Wikimedia\Rdbms\ILBFactory\commitMasterChanges
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
DeferredUpdates\enqueueUpdates
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
Definition: DeferredUpdates.php:511
DeferredUpdates\push
static push(array &$queue, DeferrableUpdate $update)
Definition: DeferredUpdates.php:203
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:9
DeferredUpdates\$preSendUpdates
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until just before HTTP response emission.
Definition: DeferredUpdates.php:69
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:27
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
Definition: DeferredUpdates.php:145
wfGetCaller
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Definition: GlobalFunctions.php:1393
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
$type
$type
Definition: testCompression.php:52
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...