MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27 use Psr\Log\LoggerInterface;
31 use Wikimedia\ScopedCallback;
32 
84  private static $scopeStack;
85 
89  private static $preventOpportunisticUpdates = 0;
90 
92  public const ALL = 0;
94  public const PRESEND = 1;
96  public const POSTSEND = 2;
97 
99  public const STAGES = [ self::PRESEND, self::POSTSEND ];
100 
102  private const BIG_QUEUE_SIZE = 100;
103 
124  public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
125  $commandLineMode = MediaWikiServices::getInstance()->getMainConfig()->get( 'CommandLineMode' );
126 
127  self::getScopeStack()->current()->addUpdate( $update, $stage );
128  // If CLI mode is active and no RDBMs transaction round is in the way, then run all
129  // the pending updates now. This is needed for scripts that never, or rarely, use the
130  // RDBMs layer, but that do modify systems via deferred updates. This logic avoids
131  // excessive pending update queue sizes when long-running scripts never trigger the
132  // basic RDBMs hooks for running pending updates.
133  if ( $commandLineMode ) {
135  }
136  }
137 
150  public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
151  self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
152  }
153 
173  public static function doUpdates( $unused = null, $stage = self::ALL ) {
174  $services = MediaWikiServices::getInstance();
175  $stats = $services->getStatsdDataFactory();
176  $lbf = $services->getDBLoadBalancerFactory();
177  $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
178  $jobQueueGroupFactory = $services->getJobQueueGroupFactory();
179  $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
180  ? 'cli'
181  : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
182 
184  $guiError = null;
186  $exception = null;
187 
188  $scope = self::getScopeStack()->current();
189 
190  // T249069: recursion is not possible once explicit transaction rounds are involved
191  $activeUpdate = $scope->getActiveUpdate();
192  if ( $activeUpdate ) {
193  $class = get_class( $activeUpdate );
194  if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
195  throw new LogicException(
196  __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
197  );
198  }
199  if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
200  throw new LogicException(
201  __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
202  );
203  }
204  }
205 
206  $scope->processUpdates(
207  $stage,
208  function ( DeferrableUpdate $update, $activeStage )
209  use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
210  {
211  $scopeStack = self::getScopeStack();
212  $childScope = $scopeStack->descend( $activeStage, $update );
213  try {
214  $e = self::run( $update, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
215  $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
216  $exception = $exception ?: $e;
217  // Any addUpdate() calls between descend() and ascend() used the sub-queue.
218  // In rare cases, DeferrableUpdate::doUpdates() will process them by calling
219  // doUpdates() itself. In any case, process remaining updates in the subqueue.
220  // them, enqueueing them, or transferring them to the parent scope
221  // queues as appropriate...
222  $childScope->processUpdates(
223  $activeStage,
224  function ( DeferrableUpdate $subUpdate )
225  use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
226  {
227  $e = self::run( $subUpdate, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
228  $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
229  $exception = $exception ?: $e;
230  }
231  );
232  } finally {
233  $scopeStack->ascend();
234  }
235  }
236  );
237 
238  // VW-style hack to work around T190178, so we can make sure
239  // PageMetaDataUpdater doesn't throw exceptions.
240  if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
241  throw $exception;
242  }
243 
244  // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
245  // callers should check permissions *before* enqueueing updates. If the main transaction
246  // round actions succeed but some deferred updates fail due to permissions errors then
247  // there is a risk that some secondary data was not properly updated.
248  if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
249  throw $guiError;
250  }
251  }
252 
270  public static function tryOpportunisticExecute(): bool {
271  // Leave execution up to the current loop if an update is already in progress
272  // or if updates are explicitly disabled
274  || self::$preventOpportunisticUpdates
275  ) {
276  return false;
277  }
278 
279  // Run the updates for this context if they will have outer transaction scope
280  if ( !self::areDatabaseTransactionsActive() ) {
281  self::doUpdates( null, self::ALL );
282 
283  return true;
284  }
285 
286  if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
287  // There are a large number of pending updates and none of them can run yet.
288  // The odds of losing updates due to an error increase when executing long queues
289  // and when large amounts of time pass while tasks are queued. Mitigate this by
290  // trying to migrate updates to the job queue system (where applicable).
291  self::getScopeStack()->current()->consumeMatchingUpdates(
292  self::ALL,
293  EnqueueableDataUpdate::class,
294  static function ( EnqueueableDataUpdate $update ) {
295  $spec = $update->getAsJobSpecification();
296  MediaWikiServices::getInstance()->getJobQueueGroupFactory()
297  ->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
298  }
299  );
300  }
301 
302  return false;
303  }
304 
311  public static function preventOpportunisticUpdates() {
312  self::$preventOpportunisticUpdates++;
313  return new ScopedCallback( static function () {
314  self::$preventOpportunisticUpdates--;
315  } );
316  }
317 
327  public static function pendingUpdatesCount() {
328  return self::getScopeStack()->current()->pendingUpdatesCount();
329  }
330 
343  public static function getPendingUpdates( $stage = self::ALL ) {
344  return self::getScopeStack()->current()->getPendingUpdates( $stage );
345  }
346 
355  public static function clearPendingUpdates() {
356  self::getScopeStack()->current()->clearPendingUpdates();
357  }
358 
365  public static function getRecursiveExecutionStackDepth() {
366  return self::getScopeStack()->getRecursiveDepth();
367  }
368 
381  private static function run(
382  DeferrableUpdate $update,
383  ILBFactory $lbFactory,
384  LoggerInterface $logger,
385  StatsdDataFactoryInterface $stats,
386  JobQueueGroupFactory $jobQueueGroupFactory,
387  $httpMethod
388  ): ?Throwable {
389  $suffix = $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '';
390  $type = get_class( $update ) . $suffix;
391  $stats->increment( "deferred_updates.$httpMethod.$type" );
392  $updateId = spl_object_id( $update );
393  $logger->debug( __METHOD__ . ": started $type #$updateId" );
394 
395  $updateException = null;
396 
397  $startTime = microtime( true );
398  try {
399  self::attemptUpdate( $update, $lbFactory );
400  } catch ( Throwable $updateException ) {
401  MWExceptionHandler::logException( $updateException );
402  $logger->error(
403  "Deferred update '{deferred_type}' failed to run.",
404  [
405  'deferred_type' => $type,
406  'exception' => $updateException,
407  ]
408  );
409  $lbFactory->rollbackPrimaryChanges( __METHOD__ );
410  } finally {
411  $walltime = microtime( true ) - $startTime;
412  $logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
413  }
414 
415  // Try to push the update as a job so it can run later if possible
416  if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
417  try {
418  $spec = $update->getAsJobSpecification();
419  $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
420  } catch ( Throwable $jobException ) {
421  MWExceptionHandler::logException( $jobException );
422  $logger->error(
423  "Deferred update '{deferred_type}' failed to enqueue as a job.",
424  [
425  'deferred_type' => $type,
426  'exception' => $jobException,
427  ]
428  );
429  $lbFactory->rollbackPrimaryChanges( __METHOD__ );
430  }
431  }
432 
433  return $updateException;
434  }
435 
447  public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
448  $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
449  if ( !$ticket || $lbFactory->hasTransactionRound() ) {
450  throw new DBTransactionError( null, "A database transaction round is pending." );
451  }
452 
453  if ( $update instanceof DataUpdate ) {
454  $update->setTransactionTicket( $ticket );
455  }
456 
457  // Designate $update::doUpdate() as the write round owner
458  $fnameTrxOwner = ( $update instanceof DeferrableCallback )
459  ? $update->getOrigin()
460  : get_class( $update ) . '::doUpdate';
461  // Determine whether the write round will be explicit or implicit
462  $useExplicitTrxRound = !(
463  $update instanceof TransactionRoundAwareUpdate &&
464  $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
465  );
466 
467  // Flush any pending changes left over from an implicit transaction round
468  if ( $useExplicitTrxRound ) {
469  $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
470  } else {
471  $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
472  }
473  // Run the update after any stale primary DB view snapshots have been flushed
474  $update->doUpdate();
475  // Commit any pending changes from the explicit or implicit transaction round
476  $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
477  }
478 
482  private static function areDatabaseTransactionsActive() {
483  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
484  if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
485  return true;
486  }
487 
488  foreach ( $lbFactory->getAllLBs() as $lb ) {
489  if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
490  return true;
491  }
492  }
493 
494  return false;
495  }
496 
500  private static function getScopeStack() {
501  if ( self::$scopeStack === null ) {
502  self::$scopeStack = new DeferredUpdatesScopeStack();
503  }
504 
505  return self::$scopeStack;
506  }
507 }
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition: WebStart.php:82
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:30
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static tryOpportunisticExecute()
Consume and execute all pending updates unless an update is already in progress or the ILBFactory ser...
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
static doUpdates( $unused=null, $stage=self::ALL)
Consume and execute all pending updates.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Class to construct JobQueueGroups.
PSR-3 logger instance factory.
Service locator for MediaWiki core services.
static getMain()
Get the RequestContext object associated with the main request.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:40
Manager of ILoadBalancer objects, and indirectly of IDatabase connections.
Definition: ILBFactory.php:31
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitPrimaryChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all primary connections.
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
getAllLBs()
Get all tracked load balancer instances (generator)
hasTransactionRound()
Check if an explicit transaction round is active.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active on tracked load balancers.
isReadyForRoundOperations()
Check if transaction rounds can be started, committed, or rolled back right now.