MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26 use Psr\Log\LoggerInterface;
32 
84  private static $scopeStack;
85 
87  public const ALL = 0;
89  public const PRESEND = 1;
91  public const POSTSEND = 2;
92 
94  public const STAGES = [ self::PRESEND, self::POSTSEND ];
95 
97  private const BIG_QUEUE_SIZE = 100;
98 
119  public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
120  global $wgCommandLineMode;
121 
122  self::getScopeStack()->current()->addUpdate( $update, $stage );
123  // If CLI mode is active and no RDBMs transaction round is in the way, then run all
124  // the pending updates now. This is needed for scripts that never, or rarely, use the
125  // RDBMs layer, but that do modify systems via deferred updates. This logic avoids
126  // excessive pending update queue sizes when long-running scripts never trigger the
127  // basic RDBMs hooks for running pending updates.
128  if ( $wgCommandLineMode ) {
130  }
131  }
132 
145  public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
146  self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
147  }
148 
173  public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
174  $services = MediaWikiServices::getInstance();
175  $stats = $services->getStatsdDataFactory();
176  $lbf = $services->getDBLoadBalancerFactory();
177  $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
178  $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
179  ? 'cli'
180  : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
181 
183  $guiError = null;
185  $exception = null;
186 
187  $scope = self::getScopeStack()->current();
188 
189  // T249069: recursion is not possible once explicit transaction rounds are involved
190  $activeUpdate = $scope->getActiveUpdate();
191  if ( $activeUpdate ) {
192  $class = get_class( $activeUpdate );
193  if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
194  throw new LogicException(
195  __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
196  );
197  }
198  if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
199  throw new LogicException(
200  __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
201  );
202  }
203  }
204 
205  $scope->processUpdates(
206  $stage,
207  function ( DeferrableUpdate $update, $activeStage )
208  use ( $mode, $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
209  {
210  // If applicable, just enqueue the update as a job in the job queue system
211  if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
212  self::jobify( $update, $lbf, $logger, $stats, $httpMethod );
213 
214  return;
215  }
216 
217  // Otherwise, run the update....
219  $childScope = $scopeStack->descend( $activeStage, $update );
220  try {
221  $e = self::run( $update, $lbf, $logger, $stats, $httpMethod );
222  $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
223  $exception = $exception ?: $e;
224  // Any addUpdate() calls between descend() and ascend() used the sub-queue.
225  // In rare cases, DeferrableUpdate::doUpdates() will process them by calling
226  // doUpdates() itself. In any case, process remaining updates in the subqueue.
227  // them, enqueueing them, or transferring them to the parent scope
228  // queues as appropriate...
229  $childScope->processUpdates(
230  $activeStage,
231  function ( DeferrableUpdate $subUpdate )
232  use ( $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
233  {
234  $e = self::run( $subUpdate, $lbf, $logger, $stats, $httpMethod );
235  $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
236  $exception = $exception ?: $e;
237  }
238  );
239  } finally {
240  $scopeStack->ascend();
241  }
242  }
243  );
244 
245  // VW-style hack to work around T190178, so we can make sure
246  // PageMetaDataUpdater doesn't throw exceptions.
247  if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
248  throw $exception;
249  }
250 
251  // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
252  // callers should check permissions *before* enqueueing updates. If the main transaction
253  // round actions succeed but some deferred updates fail due to permissions errors then
254  // there is a risk that some secondary data was not properly updated.
255  if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
256  throw $guiError;
257  }
258  }
259 
282  public static function tryOpportunisticExecute( $mode = 'run' ) {
283  // Leave execution up to the current loop if an update is already in progress
284  if ( self::getRecursiveExecutionStackDepth() ) {
285  return false;
286  }
287 
288  // Run the updates for this context if they will have outer transaction scope
289  if ( !self::areDatabaseTransactionsActive() ) {
290  self::doUpdates( $mode, self::ALL );
291 
292  return true;
293  }
294 
295  if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
296  // There are a large number of pending updates and none of them can run yet.
297  // The odds of losing updates due to an error increase when executing long queues
298  // and when large amounts of time pass while tasks are queued. Mitigate this by
299  // trying to migrate updates to the job queue system (where applicable).
300  self::getScopeStack()->current()->consumeMatchingUpdates(
301  self::ALL,
302  EnqueueableDataUpdate::class,
303  static function ( EnqueueableDataUpdate $update ) {
304  $spec = $update->getAsJobSpecification();
305  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
306  }
307  );
308  }
309 
310  return false;
311  }
312 
322  public static function pendingUpdatesCount() {
323  return self::getScopeStack()->current()->pendingUpdatesCount();
324  }
325 
338  public static function getPendingUpdates( $stage = self::ALL ) {
339  return self::getScopeStack()->current()->getPendingUpdates( $stage );
340  }
341 
350  public static function clearPendingUpdates() {
351  self::getScopeStack()->current()->clearPendingUpdates();
352  }
353 
360  public static function getRecursiveExecutionStackDepth() {
361  return self::getScopeStack()->getRecursiveDepth();
362  }
363 
375  private static function run(
376  DeferrableUpdate $update,
377  ILBFactory $lbFactory,
378  LoggerInterface $logger,
379  StatsdDataFactoryInterface $stats,
380  $httpMethod
381  ) : ?Throwable {
382  $suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
383  $type = get_class( $update ) . $suffix;
384  $stats->increment( "deferred_updates.$httpMethod.$type" );
385 
386  $updateId = spl_object_id( $update );
387  $logger->debug( __METHOD__ . ": started $type #$updateId" );
388  $e = null;
389  try {
390  self::attemptUpdate( $update, $lbFactory );
391 
392  return null;
393  } catch ( Throwable $e ) {
394  } finally {
395  $logger->debug( __METHOD__ . ": ended $type #$updateId" );
396  }
397 
399  $logger->error(
400  "Deferred update '{deferred_type}' failed to run.",
401  [
402  'deferred_type' => $type,
403  'exception' => $e,
404  ]
405  );
406 
407  $lbFactory->rollbackMasterChanges( __METHOD__ );
408 
409  // Try to push the update as a job so it can run later if possible
410  if ( $update instanceof EnqueueableDataUpdate ) {
411  $jobEx = null;
412  try {
413  $spec = $update->getAsJobSpecification();
414  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
415 
416  return $e;
417  } catch ( Throwable $jobEx ) {
418  }
419 
421  $logger->error(
422  "Deferred update '{deferred_type}' failed to enqueue as a job.",
423  [
424  'deferred_type' => $type,
425  'exception' => $jobEx,
426  ]
427  );
428 
429  $lbFactory->rollbackMasterChanges( __METHOD__ );
430  }
431 
432  return $e;
433  }
434 
444  private static function jobify(
445  EnqueueableDataUpdate $update,
446  LBFactory $lbFactory,
447  LoggerInterface $logger,
448  StatsdDataFactoryInterface $stats,
449  $httpMethod
450  ) {
451  $type = get_class( $update );
452  $stats->increment( "deferred_updates.$httpMethod.$type" );
453 
454  $jobEx = null;
455  try {
456  $spec = $update->getAsJobSpecification();
457  JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
458 
459  return;
460  } catch ( Throwable $jobEx ) {
461  }
462 
464  $logger->error(
465  "Deferred update '$type' failed to enqueue as a job.",
466  [
467  'deferred_type' => $type,
468  'exception' => $jobEx,
469  ]
470  );
471 
472  $lbFactory->rollbackMasterChanges( __METHOD__ );
473  }
474 
486  public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
487  $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
488  if ( !$ticket || $lbFactory->hasTransactionRound() ) {
489  throw new DBTransactionError( null, "A database transaction round is pending." );
490  }
491 
492  if ( $update instanceof DataUpdate ) {
493  $update->setTransactionTicket( $ticket );
494  }
495 
496  // Designate $update::doUpdate() as the write round owner
497  $fnameTrxOwner = ( $update instanceof DeferrableCallback )
498  ? $update->getOrigin()
499  : get_class( $update ) . '::doUpdate';
500  // Determine whether the write round will be explicit or implicit
501  $useExplicitTrxRound = !(
502  $update instanceof TransactionRoundAwareUpdate &&
503  $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
504  );
505 
506  // Flush any pending changes left over from an implicit transaction round
507  if ( $useExplicitTrxRound ) {
508  $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
509  } else {
510  $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
511  }
512  // Run the update after any stale master view snapshots have been flushed
513  $update->doUpdate();
514  // Commit any pending changes from the explicit or implicit transaction round
515  $lbFactory->commitMasterChanges( $fnameTrxOwner );
516  }
517 
521  private static function areDatabaseTransactionsActive() {
522  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
523  if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
524  return true;
525  }
526 
527  $connsBusy = false;
528  $lbFactory->forEachLB( static function ( LoadBalancer $lb ) use ( &$connsBusy ) {
529  $lb->forEachOpenMasterConnection( static function ( IDatabase $conn ) use ( &$connsBusy ) {
530  if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
531  $connsBusy = true;
532  }
533  } );
534  } );
535 
536  return $connsBusy;
537  }
538 
542  private static function getScopeStack() {
543  if ( self::$scopeStack === null ) {
544  self::$scopeStack = new DeferredUpdatesScopeStack();
545  }
546 
547  return self::$scopeStack;
548  }
549 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Enqueue an update as a job in the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:444
DeferredUpdatesScopeStack\ascend
ascend()
Pop the innermost scope from the stack.
Definition: DeferredUpdatesScopeStack.php:62
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:173
Wikimedia\Rdbms\ILBFactory\rollbackMasterChanges
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:486
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\$scopeStack
static DeferredUpdatesScopeStack null $scopeStack
Queue states based on recursion level.
Definition: DeferredUpdates.php:84
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Definition: DeferredUpdates.php:119
Wikimedia\Rdbms\LoadBalancer\forEachOpenMasterConnection
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Definition: LoadBalancer.php:2249
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
Definition: DeferredUpdates.php:350
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:10
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Definition: TransactionRoundAwareUpdate.php:11
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:30
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
DeferredUpdatesScopeStack\descend
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Definition: DeferredUpdatesScopeStack.php:50
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:12
DeferredUpdatesScopeStack
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
Definition: DeferredUpdatesScopeStack.php:28
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:666
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
$wgCommandLineMode
global $wgCommandLineMode
Definition: DevelopmentSettings.php:29
DeferredUpdates
Class for managing the deferral of updates within the scope of a PHP script invocation.
Definition: DeferredUpdates.php:82
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\getScopeStack
static getScopeStack()
Definition: DeferredUpdates.php:542
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:42
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Consume and execute all pending updates unless an update is already in progress or the LBFactory serv...
Definition: DeferredUpdates.php:282
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
DeferredUpdates\getRecursiveExecutionStackDepth
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
Definition: DeferredUpdates.php:360
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:521
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:476
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Consume and execute all pending updates.
Definition: DeferredUpdates.php:173
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
Definition: DeferredUpdates.php:322
Wikimedia\Rdbms\ILBFactory\beginMasterChanges
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:10
Wikimedia\Rdbms\DBTransactionError
@newable
Definition: DBTransactionError.php:29
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
Wikimedia\Rdbms\LBFactory\rollbackMasterChanges
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:323
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
Definition: DeferredUpdates.php:338
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:41
Wikimedia\Rdbms\ILBFactory\commitMasterChanges
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:11
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:30
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
Definition: DeferredUpdates.php:145
wfGetCaller
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Definition: GlobalFunctions.php:1408
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
DeferredUpdates\run
static run(DeferrableUpdate $update, ILBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and enqueue the update as a job in the job q...
Definition: DeferredUpdates.php:375
$type
$type
Definition: testCompression.php:52
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...