MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26 use Psr\Log\LoggerInterface;
32 use Wikimedia\ScopedCallback;
33 
85  private static $scopeStack;
86 
90  private static $preventOpportunisticUpdates = 0;
91 
93  public const ALL = 0;
95  public const PRESEND = 1;
97  public const POSTSEND = 2;
98 
100  public const STAGES = [ self::PRESEND, self::POSTSEND ];
101 
103  private const BIG_QUEUE_SIZE = 100;
104 
125  public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
126  $commandLineMode = MediaWikiServices::getInstance()->getMainConfig()->get( 'CommandLineMode' );
127 
128  self::getScopeStack()->current()->addUpdate( $update, $stage );
129  // If CLI mode is active and no RDBMs transaction round is in the way, then run all
130  // the pending updates now. This is needed for scripts that never, or rarely, use the
131  // RDBMs layer, but that do modify systems via deferred updates. This logic avoids
132  // excessive pending update queue sizes when long-running scripts never trigger the
133  // basic RDBMs hooks for running pending updates.
134  if ( $commandLineMode ) {
136  }
137  }
138 
151  public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
152  self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
153  }
154 
179  public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
180  $services = MediaWikiServices::getInstance();
181  $stats = $services->getStatsdDataFactory();
182  $lbf = $services->getDBLoadBalancerFactory();
183  $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
184  $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
185  ? 'cli'
186  : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
187 
189  $guiError = null;
191  $exception = null;
192 
193  $scope = self::getScopeStack()->current();
194 
195  // T249069: recursion is not possible once explicit transaction rounds are involved
196  $activeUpdate = $scope->getActiveUpdate();
197  if ( $activeUpdate ) {
198  $class = get_class( $activeUpdate );
199  if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
200  throw new LogicException(
201  __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
202  );
203  }
204  if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
205  throw new LogicException(
206  __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
207  );
208  }
209  }
210 
211  $scope->processUpdates(
212  $stage,
213  function ( DeferrableUpdate $update, $activeStage )
214  use ( $mode, $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
215  {
216  // If applicable, just enqueue the update as a job in the job queue system
217  if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
218  self::jobify( $update, $lbf, $logger, $stats, $httpMethod );
219 
220  return;
221  }
222 
223  // Otherwise, run the update....
225  $childScope = $scopeStack->descend( $activeStage, $update );
226  try {
227  $e = self::run( $update, $lbf, $logger, $stats, $httpMethod );
228  $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
229  $exception = $exception ?: $e;
230  // Any addUpdate() calls between descend() and ascend() used the sub-queue.
231  // In rare cases, DeferrableUpdate::doUpdates() will process them by calling
232  // doUpdates() itself. In any case, process remaining updates in the subqueue.
233  // them, enqueueing them, or transferring them to the parent scope
234  // queues as appropriate...
235  $childScope->processUpdates(
236  $activeStage,
237  function ( DeferrableUpdate $subUpdate )
238  use ( $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
239  {
240  $e = self::run( $subUpdate, $lbf, $logger, $stats, $httpMethod );
241  $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
242  $exception = $exception ?: $e;
243  }
244  );
245  } finally {
246  $scopeStack->ascend();
247  }
248  }
249  );
250 
251  // VW-style hack to work around T190178, so we can make sure
252  // PageMetaDataUpdater doesn't throw exceptions.
253  if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
254  throw $exception;
255  }
256 
257  // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
258  // callers should check permissions *before* enqueueing updates. If the main transaction
259  // round actions succeed but some deferred updates fail due to permissions errors then
260  // there is a risk that some secondary data was not properly updated.
261  if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
262  throw $guiError;
263  }
264  }
265 
288  public static function tryOpportunisticExecute( $mode = 'run' ) {
289  // Leave execution up to the current loop if an update is already in progress
290  // or if updates are explicitly disabled
291  if ( self::getRecursiveExecutionStackDepth()
292  || self::$preventOpportunisticUpdates
293  ) {
294  return false;
295  }
296 
297  // Run the updates for this context if they will have outer transaction scope
298  if ( !self::areDatabaseTransactionsActive() ) {
299  self::doUpdates( $mode, self::ALL );
300 
301  return true;
302  }
303 
304  if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
305  // There are a large number of pending updates and none of them can run yet.
306  // The odds of losing updates due to an error increase when executing long queues
307  // and when large amounts of time pass while tasks are queued. Mitigate this by
308  // trying to migrate updates to the job queue system (where applicable).
309  self::getScopeStack()->current()->consumeMatchingUpdates(
310  self::ALL,
311  EnqueueableDataUpdate::class,
312  static function ( EnqueueableDataUpdate $update ) {
313  $spec = $update->getAsJobSpecification();
314  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
315  }
316  );
317  }
318 
319  return false;
320  }
321 
328  public static function preventOpportunisticUpdates() {
329  self::$preventOpportunisticUpdates++;
330  return new ScopedCallback( static function () {
331  self::$preventOpportunisticUpdates--;
332  } );
333  }
334 
344  public static function pendingUpdatesCount() {
345  return self::getScopeStack()->current()->pendingUpdatesCount();
346  }
347 
360  public static function getPendingUpdates( $stage = self::ALL ) {
361  return self::getScopeStack()->current()->getPendingUpdates( $stage );
362  }
363 
372  public static function clearPendingUpdates() {
373  self::getScopeStack()->current()->clearPendingUpdates();
374  }
375 
382  public static function getRecursiveExecutionStackDepth() {
383  return self::getScopeStack()->getRecursiveDepth();
384  }
385 
397  private static function run(
398  DeferrableUpdate $update,
399  ILBFactory $lbFactory,
400  LoggerInterface $logger,
401  StatsdDataFactoryInterface $stats,
402  $httpMethod
403  ): ?Throwable {
404  $suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
405  $type = get_class( $update ) . $suffix;
406  $stats->increment( "deferred_updates.$httpMethod.$type" );
407  $updateId = spl_object_id( $update );
408  $logger->debug( __METHOD__ . ": started $type #$updateId" );
409 
410  $updateException = null;
411 
412  $startTime = microtime( true );
413  try {
414  self::attemptUpdate( $update, $lbFactory );
415  } catch ( Throwable $updateException ) {
416  MWExceptionHandler::logException( $updateException );
417  $logger->error(
418  "Deferred update '{deferred_type}' failed to run.",
419  [
420  'deferred_type' => $type,
421  'exception' => $updateException,
422  ]
423  );
424  $lbFactory->rollbackPrimaryChanges( __METHOD__ );
425  } finally {
426  $walltime = microtime( true ) - $startTime;
427  $logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
428  }
429 
430  // Try to push the update as a job so it can run later if possible
431  if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
432  try {
433  $spec = $update->getAsJobSpecification();
434  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
435  } catch ( Throwable $jobException ) {
436  MWExceptionHandler::logException( $jobException );
437  $logger->error(
438  "Deferred update '{deferred_type}' failed to enqueue as a job.",
439  [
440  'deferred_type' => $type,
441  'exception' => $jobException,
442  ]
443  );
444  $lbFactory->rollbackPrimaryChanges( __METHOD__ );
445  }
446  }
447 
448  return $updateException;
449  }
450 
460  private static function jobify(
461  EnqueueableDataUpdate $update,
462  LBFactory $lbFactory,
463  LoggerInterface $logger,
464  StatsdDataFactoryInterface $stats,
465  $httpMethod
466  ) {
467  $type = get_class( $update );
468  $stats->increment( "deferred_updates.$httpMethod.$type" );
469 
470  $jobEx = null;
471  try {
472  $spec = $update->getAsJobSpecification();
473  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
474 
475  return;
476  } catch ( Throwable $jobEx ) {
477  }
478 
480  $logger->error(
481  "Deferred update '$type' failed to enqueue as a job.",
482  [
483  'deferred_type' => $type,
484  'exception' => $jobEx,
485  ]
486  );
487 
488  $lbFactory->rollbackPrimaryChanges( __METHOD__ );
489  }
490 
502  public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
503  $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
504  if ( !$ticket || $lbFactory->hasTransactionRound() ) {
505  throw new DBTransactionError( null, "A database transaction round is pending." );
506  }
507 
508  if ( $update instanceof DataUpdate ) {
509  $update->setTransactionTicket( $ticket );
510  }
511 
512  // Designate $update::doUpdate() as the write round owner
513  $fnameTrxOwner = ( $update instanceof DeferrableCallback )
514  ? $update->getOrigin()
515  : get_class( $update ) . '::doUpdate';
516  // Determine whether the write round will be explicit or implicit
517  $useExplicitTrxRound = !(
518  $update instanceof TransactionRoundAwareUpdate &&
519  $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
520  );
521 
522  // Flush any pending changes left over from an implicit transaction round
523  if ( $useExplicitTrxRound ) {
524  $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
525  } else {
526  $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
527  }
528  // Run the update after any stale primary DB view snapshots have been flushed
529  $update->doUpdate();
530  // Commit any pending changes from the explicit or implicit transaction round
531  $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
532  }
533 
537  private static function areDatabaseTransactionsActive() {
538  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
539  if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
540  return true;
541  }
542 
543  $connsBusy = false;
544  $lbFactory->forEachLB( static function ( LoadBalancer $lb ) use ( &$connsBusy ) {
545  $lb->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( &$connsBusy ) {
546  if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
547  $connsBusy = true;
548  }
549  } );
550  } );
551 
552  return $connsBusy;
553  }
554 
558  private static function getScopeStack() {
559  if ( self::$scopeStack === null ) {
560  self::$scopeStack = new DeferredUpdatesScopeStack();
561  }
562 
563  return self::$scopeStack;
564  }
565 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Enqueue an update as a job in the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:460
DeferredUpdatesScopeStack\ascend
ascend()
Pop the innermost scope from the stack.
Definition: DeferredUpdatesScopeStack.php:62
DeferredUpdates\$preventOpportunisticUpdates
static int $preventOpportunisticUpdates
Nesting level for preventOpportunisticUpdates()
Definition: DeferredUpdates.php:90
Wikimedia\Rdbms\ILBFactory\commitPrimaryChanges
commitPrimaryChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all primary connections.
Wikimedia\Rdbms\LoadBalancer\forEachOpenPrimaryConnection
forEachOpenPrimaryConnection( $callback, array $params=[])
Call a function with each open connection object to a primary.
Definition: LoadBalancer.php:2349
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:203
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:502
Wikimedia\Rdbms\ILBFactory\beginPrimaryChanges
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\$scopeStack
static DeferredUpdatesScopeStack null $scopeStack
Queue states based on recursion level.
Definition: DeferredUpdates.php:85
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Definition: DeferredUpdates.php:125
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
Definition: DeferredUpdates.php:372
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:10
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Definition: TransactionRoundAwareUpdate.php:11
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:30
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
DeferredUpdatesScopeStack\descend
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Definition: DeferredUpdatesScopeStack.php:50
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:12
DeferredUpdatesScopeStack
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
Definition: DeferredUpdatesScopeStack.php:28
Wikimedia\Rdbms\LBFactory\rollbackPrimaryChanges
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
Definition: LBFactory.php:335
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:700
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
DeferredUpdates
Class for managing the deferral of updates within the scope of a PHP script invocation.
Definition: DeferredUpdates.php:83
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\getScopeStack
static getScopeStack()
Definition: DeferredUpdates.php:558
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:43
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
Wikimedia\Rdbms\ILBFactory\rollbackPrimaryChanges
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Consume and execute all pending updates unless an update is already in progress or the LBFactory serv...
Definition: DeferredUpdates.php:288
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
Check whether there is a transaction open at the specific request of a caller.
DeferredUpdates\getRecursiveExecutionStackDepth
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
Definition: DeferredUpdates.php:382
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:537
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:484
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Consume and execute all pending updates.
Definition: DeferredUpdates.php:179
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
Definition: DeferredUpdates.php:344
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:10
Wikimedia\Rdbms\DBTransactionError
Definition: DBTransactionError.php:29
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:114
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
Definition: DeferredUpdates.php:360
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:42
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:11
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:30
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active on tracked load balancers.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
Definition: DeferredUpdates.php:151
wfGetCaller
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Definition: GlobalFunctions.php:1344
DeferredUpdates\preventOpportunisticUpdates
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
Definition: DeferredUpdates.php:328
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
DeferredUpdates\run
static run(DeferrableUpdate $update, ILBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and enqueue the update as a job in the job q...
Definition: DeferredUpdates.php:397
$type
$type
Definition: testCompression.php:52
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...