MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27 use Psr\Log\LoggerInterface;
31 use Wikimedia\ScopedCallback;
32 
// Lazily-created scope stack; getScopeStack() instantiates a DeferredUpdatesScopeStack
// the first time it is needed.
84  private static $scopeStack;
85 
// Counter that is incremented by preventOpportunisticUpdates() and decremented again when
// the ScopedCallback it returns is consumed; non-zero disables opportunistic execution.
89  private static $preventOpportunisticUpdates = 0;
90 
// Stage selector meaning "every stage"; default for doUpdates() and getPendingUpdates().
92  public const ALL = 0;
// Stage identifier for pre-send updates.
94  public const PRESEND = 1;
// Stage identifier for post-send updates; default stage for addUpdate().
96  public const POSTSEND = 2;
97 
// The concrete stages (excludes the ALL selector).
99  public const STAGES = [ self::PRESEND, self::POSTSEND ];
100 
// Pending-update count at which tryOpportunisticExecute() starts migrating
// enqueueable updates to the job queue.
102  private const BIG_QUEUE_SIZE = 100;
103 
/**
 * Add an update to the pending update queue for execution at the appropriate time.
 *
 * In CLI mode an opportunistic run of the queue is attempted right after the update
 * has been queued; tryOpportunisticExecute() itself declines to run anything while
 * database transactions are active, an update is in progress, or opportunistic
 * updates are disabled.
 *
 * @param DeferrableUpdate $update Update to enqueue
 * @param int $stage Stage to run the update in (self::PRESEND or self::POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	$commandLineMode = MediaWikiServices::getInstance()->getMainConfig()->get( 'CommandLineMode' );

	self::getScopeStack()->current()->addUpdate( $update, $stage );
	// If CLI mode is active and no RDBMs transaction round is in the way, then run all
	// the pending updates now. This is needed for scripts that never, or rarely, use the
	// RDBMs layer, but that do modify systems via deferred updates. This logic avoids
	// excessive pending update queue sizes when long-running scripts never trigger the
	// basic RDBMs hooks for running pending updates.
	if ( $commandLineMode ) {
		self::tryOpportunisticExecute();
	}
}
137 
/**
 * Add an update to the pending update queue that invokes the specified callback when run.
 *
 * The callback is wrapped in an MWCallableUpdate tagged with the name of the calling
 * function (as reported by wfGetCaller()) and then queued through addUpdate().
 *
 * @param callable $callable Callback to invoke when the update runs
 * @param int $stage Stage to run the update in (self::PRESEND or self::POSTSEND)
 * @param mixed $dbw Optional value handed to MWCallableUpdate as its third argument
 *   (presumably one or more primary database handles; verify against MWCallableUpdate)
 */
public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
	// wfGetCaller() must be evaluated in this frame so the recorded origin stays the same
	$wrapped = new MWCallableUpdate( $callable, wfGetCaller(), $dbw );

	self::addUpdate( $wrapped, $stage );
}
153 
/**
 * Consume and execute all pending updates.
 *
 * Every update of the current scope is run inside its own child scope. Updates that
 * get added while an update is running land in that child scope's sub-queue and are
 * processed right after the update that queued them.
 *
 * Failures of individual updates do not abort the loop; the first failure (and the
 * first ErrorPageError) is remembered and possibly re-thrown once the queue is drained.
 *
 * @param int $stage Which updates to process (self::ALL, self::PRESEND or self::POSTSEND)
 * @throws LogicException If called from within an update that is not a
 *   TransactionRoundAwareUpdate declaring TRX_ROUND_ABSENT
 * @throws ErrorPageError The first such error, when $stage is self::PRESEND and
 *   headers have not been sent yet
 * @throws Throwable The first update failure, when MW_PHPUNIT_TEST is defined
 */
public static function doUpdates( $stage = self::ALL ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	$jobQueueGroupFactory = $services->getJobQueueGroupFactory();
	$httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
		? 'cli'
		: strtolower( RequestContext::getMain()->getRequest()->getMethod() );

	// First ErrorPageError reported by any update (if any)
	$guiError = null;
	// First failure of any kind reported by any update (if any)
	$exception = null;

	$scope = self::getScopeStack()->current();

	// T249069: recursion is not possible once explicit transaction rounds are involved
	$activeUpdate = $scope->getActiveUpdate();
	if ( $activeUpdate ) {
		$class = get_class( $activeUpdate );
		if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
			);
		}
		if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
			);
		}
	}

	// Runs a single update and remembers the first failures seen so far. Shared by the
	// top-level queue and by the sub-queues of nested updates.
	$runAndRecord = function ( DeferrableUpdate $task )
		use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
	{
		$failure = self::run( $task, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
		if ( !$guiError && $failure instanceof ErrorPageError ) {
			$guiError = $failure;
		}
		if ( !$exception ) {
			$exception = $failure;
		}
	};

	$scope->processUpdates(
		$stage,
		function ( DeferrableUpdate $update, $activeStage ) use ( $runAndRecord ) {
			$stack = self::getScopeStack();
			$subScope = $stack->descend( $activeStage, $update );
			try {
				$runAndRecord( $update );
				// Any addUpdate() calls made between descend() and ascend() went to the
				// sub-queue of the child scope. In rare cases the update already processed
				// them by calling doUpdates() itself. Either way, process whatever is still
				// left in the sub-queue before leaving the child scope.
				$subScope->processUpdates( $activeStage, $runAndRecord );
			} finally {
				$stack->ascend();
			}
		}
	);

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $exception;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
		throw $guiError;
	}
}
251 
/**
 * Consume and execute all pending updates unless an update is already in progress,
 * opportunistic updates are disabled, or database transactions are active.
 *
 * When the updates cannot run because of active database transactions and the queue
 * has reached self::BIG_QUEUE_SIZE entries, the pending EnqueueableDataUpdate
 * instances are pushed to the job queue instead of being kept in memory.
 *
 * @return bool True if doUpdates() was invoked, false otherwise
 */
public static function tryOpportunisticExecute(): bool {
	// Leave execution up to the current loop if an update is already in progress
	// or if updates are explicitly disabled
	if ( self::getRecursiveExecutionStackDepth()
		|| self::$preventOpportunisticUpdates
	) {
		return false;
	}

	// Run the updates for this context if they will have outer transaction scope
	if ( !self::areDatabaseTransactionsActive() ) {
		self::doUpdates( self::ALL );

		return true;
	}

	if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
		// There are a large number of pending updates and none of them can run yet.
		// The odds of losing updates due to an error increase when executing long queues
		// and when large amounts of time pass while tasks are queued. Mitigate this by
		// trying to migrate updates to the job queue system (where applicable).
		self::getScopeStack()->current()->consumeMatchingUpdates(
			self::ALL,
			EnqueueableDataUpdate::class,
			static function ( EnqueueableDataUpdate $update ) {
				$spec = $update->getAsJobSpecification();
				MediaWikiServices::getInstance()->getJobQueueGroupFactory()
					->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
			}
		);
	}

	return false;
}
303 
/**
 * Prevent opportunistic updates until the returned ScopedCallback is consumed.
 *
 * Calls nest: each call bumps an internal counter and the returned callback lowers
 * it again when it is invoked.
 *
 * @return ScopedCallback
 */
public static function preventOpportunisticUpdates() {
	++self::$preventOpportunisticUpdates;

	$release = static function () {
		--self::$preventOpportunisticUpdates;
	};

	return new ScopedCallback( $release );
}
316 
/**
 * Get the number of pending updates for the current execution context.
 *
 * @return int Value reported by the current scope (presumably an int; see
 *   DeferredUpdatesScope::pendingUpdatesCount())
 */
public static function pendingUpdatesCount() {
	$currentScope = self::getScopeStack()->current();

	return $currentScope->pendingUpdatesCount();
}
329 
/**
 * Get a list of the pending updates for the current execution context.
 *
 * @param int $stage Stage to list updates for (self::ALL, self::PRESEND or self::POSTSEND)
 * @return DeferrableUpdate[] Presumably a list of updates, as returned by the current scope
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$currentScope = self::getScopeStack()->current();

	return $currentScope->getPendingUpdates( $stage );
}
345 
/**
 * Cancel all pending updates for the current execution context.
 */
public static function clearPendingUpdates() {
	$currentScope = self::getScopeStack()->current();
	$currentScope->clearPendingUpdates();
}
357 
/**
 * Get the number of in-progress calls to DeferredUpdates::doUpdates()
 *
 * @return int Depth reported by the scope stack (presumably 0 when no update is running)
 */
public static function getRecursiveExecutionStackDepth() {
	$stack = self::getScopeStack();

	return $stack->getRecursiveDepth();
}
367 
/**
 * Run a single update, logging and swallowing any failure.
 *
 * A failed update has its primary DB changes rolled back. If the update is an
 * EnqueueableDataUpdate, an attempt is made to push it to the job queue so that it
 * can be retried later; a failure of that attempt is logged and rolled back as well.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param ILBFactory $lbFactory Used for transaction round handling and rollbacks
 * @param LoggerInterface $logger Receives debug and error messages
 * @param StatsdDataFactoryInterface $stats Receives a per-type counter increment
 * @param JobQueueGroupFactory $jobQueueGroupFactory Used to re-queue failed updates
 * @param string $httpMethod Label used in the stats key ('cli' or a lower-case HTTP method)
 * @return Throwable|null The error thrown by the update itself, or null on success
 */
private static function run(
	DeferrableUpdate $update,
	ILBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	JobQueueGroupFactory $jobQueueGroupFactory,
	$httpMethod
): ?Throwable {
	// Callback-style updates are distinguished by where they were created
	$type = get_class( $update );
	if ( $update instanceof DeferrableCallback ) {
		$type .= '_' . $update->getOrigin();
	}

	$stats->increment( "deferred_updates.$httpMethod.$type" );
	$updateId = spl_object_id( $update );
	$logger->debug( __METHOD__ . ": started $type #$updateId" );

	$failure = null;
	$begin = microtime( true );
	try {
		self::attemptUpdate( $update, $lbFactory );
	} catch ( Throwable $caught ) {
		$failure = $caught;
		MWExceptionHandler::logException( $failure );
		$logger->error(
			"Deferred update '{deferred_type}' failed to run.",
			[
				'deferred_type' => $type,
				'exception' => $failure,
			]
		);
		$lbFactory->rollbackPrimaryChanges( __METHOD__ );
	} finally {
		$walltime = microtime( true ) - $begin;
		$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
	}

	// Nothing more to do unless the update failed and can be retried as a job
	if ( !$failure || !( $update instanceof EnqueueableDataUpdate ) ) {
		return $failure;
	}

	// Try to push the update as a job so it can run later
	try {
		$jobSpec = $update->getAsJobSpecification();
		$jobQueueGroupFactory->makeJobQueueGroup( $jobSpec['domain'] )->push( $jobSpec['job'] );
	} catch ( Throwable $enqueueFailure ) {
		MWExceptionHandler::logException( $enqueueFailure );
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $enqueueFailure,
			]
		);
		$lbFactory->rollbackPrimaryChanges( __METHOD__ );
	}

	return $failure;
}
434 
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * Unless the update is a TransactionRoundAwareUpdate that asks for TRX_ROUND_ABSENT,
 * it is run inside a new explicit transaction round owned by the update. Otherwise
 * pending implicit changes are committed first and the update runs without an
 * explicit round. In both cases primary changes are committed after doUpdate().
 *
 * @param DeferrableUpdate $update Update to execute
 * @param ILBFactory $lbFactory Factory used to begin/commit the transaction round
 * @throws DBTransactionError If no empty transaction ticket could be obtained or an
 *   explicit transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	$fnameTrxOwner = ( $update instanceof DeferrableCallback )
		? $update->getOrigin()
		: get_class( $update ) . '::doUpdate';
	// Determine whether the write round will be explicit or implicit.
	// Strict comparison, matching the TRX_ROUND_ABSENT check done in doUpdates().
	$useExplicitTrxRound = !(
		$update instanceof TransactionRoundAwareUpdate &&
		$update->getTransactionRoundRequirement() === $update::TRX_ROUND_ABSENT
	);

	// Flush any pending changes left over from an implicit transaction round
	if ( $useExplicitTrxRound ) {
		$lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
	} else {
		$lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
	}
	// Run the update after any stale primary DB view snapshots have been flushed
	$update->doUpdate();
	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
}
477 
/**
 * Check whether running updates right now would collide with database transaction state.
 *
 * @return bool True if an explicit transaction round is active, the factory is not
 *   ready for round operations, or any tracked load balancer has primary changes
 *   or an explicit transaction open; false otherwise
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	foreach ( $lbFactory->getAllLBs() as $loadBalancer ) {
		if ( $loadBalancer->hasPrimaryChanges() ) {
			return true;
		}
		if ( $loadBalancer->explicitTrxActive() ) {
			return true;
		}
	}

	return false;
}
495 
/**
 * Get the shared scope stack, creating it on first use.
 *
 * @return DeferredUpdatesScopeStack
 */
private static function getScopeStack() {
	if ( self::$scopeStack !== null ) {
		return self::$scopeStack;
	}

	self::$scopeStack = new DeferredUpdatesScopeStack();

	return self::$scopeStack;
}
506 }
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
if(!defined('MW_SETUP_CALLBACK'))
Definition: WebStart.php:88
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:30
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static tryOpportunisticExecute()
Consume and execute all pending updates unless an update is already in progress or the ILBFactory ser...
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static doUpdates( $stage=self::ALL)
Consume and execute all pending updates.
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Class to construct JobQueueGroups.
PSR-3 logger instance factory.
Service locator for MediaWiki core services.
static getMain()
Get the RequestContext object associated with the main request.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
getEmptyTransactionTicket( $fname)
Get a token asserting that no write transactions are active on tracked connections.
Basic database interface for live and lazy-loaded relational database handles.
Definition: IDatabase.php:36
Manager of ILoadBalancer objects and, indirectly, IDatabase connections.
Definition: ILBFactory.php:46
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitPrimaryChanges( $fname=__METHOD__, int $maxWriteDuration=0)
Commit changes and clear view snapshots on all primary connections.
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
getAllLBs()
Get all tracked load balancer instances (generator)
hasTransactionRound()
Check if an explicit transaction round is active.
isReadyForRoundOperations()
Check if transaction rounds can be started, committed, or rolled back right now.