MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26 use Psr\Log\LoggerInterface;
32 
/** @var DeferredUpdatesScopeStack|null Queue states based on recursion level; created lazily by getScopeStack() */
84  private static $scopeStack;
85 
/** @var int Stage selector used as the default by doUpdates() and getPendingUpdates(); presumably matches every stage */
87  public const ALL = 0;
/** @var int Stage identifier; doUpdates() only re-throws GUI errors for this stage, and only while headers are unsent */
89  public const PRESEND = 1;
/** @var int Stage identifier; the default stage for addUpdate() and addCallableUpdate() */
91  public const POSTSEND = 2;
92 
/** @var int[] The concrete stage identifiers (PRESEND and POSTSEND) */
94  public const STAGES = [ self::PRESEND, self::POSTSEND ];
95 
/** @var int Pending-queue size at which tryOpportunisticExecute() starts moving enqueueable updates to the job queue */
97  private const BIG_QUEUE_SIZE = 100;
98 
/**
 * Add an update to the pending update queue for execution at the appropriate time.
 *
 * The update is appended to the queue of the current scope. In CLI mode an
 * opportunistic run of the pending updates is attempted right away.
 *
 * @param DeferrableUpdate $update Update to queue
 * @param int $stage One of the stage constants (self::PRESEND or self::POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	self::getScopeStack()->current()->addUpdate( $update, $stage );
	// If CLI mode is active and no RDBMs transaction round is in the way, then run all
	// the pending updates now. This is needed for scripts that never, or rarely, use the
	// RDBMs layer, but that do modify systems via deferred updates. This logic avoids
	// excessive pending update queue sizes when long-running scripts never trigger the
	// basic RDBMs hooks for running pending updates.
	if ( $wgCommandLineMode ) {
		// tryOpportunisticExecute() itself bails out when an update is already running
		// or when database transactions are still active, so this call is safe here.
		self::tryOpportunisticExecute( 'run' );
	}
}
132 
/**
 * Add an update to the pending update queue that invokes the specified callback when run.
 *
 * The callback is wrapped in an MWCallableUpdate tagged with the name of the
 * calling function (as reported by wfGetCaller()).
 *
 * @param callable $callable Callback to invoke when the update runs
 * @param int $stage One of the stage constants (self::PRESEND or self::POSTSEND)
 * @param mixed|null $dbw Passed through to MWCallableUpdate unchanged
 */
public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
	// wfGetCaller() must be invoked from this frame so that it reports our caller
	$origin = wfGetCaller();
	$wrapped = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $wrapped, $stage );
}
148 
/**
 * Consume and execute all pending updates.
 *
 * Each pending update of the current scope is either pushed to the job queue
 * (when $mode is 'enqueue' and the update is an EnqueueableDataUpdate) or run
 * inside a child scope, after which any updates queued into that child scope
 * are run as well.
 *
 * @param string $mode Use 'enqueue' to push enqueueable updates as jobs; other values run them
 * @param int $stage Stage selector passed to the scope (self::ALL, self::PRESEND or self::POSTSEND)
 * @throws LogicException If reached from an active update that is not a
 *   TransactionRoundAwareUpdate declaring TRX_ROUND_ABSENT
 * @throws ErrorPageError The first GUI error, when $stage is PRESEND and headers are unsent
 * @throws Throwable The first error of any kind, when MW_PHPUNIT_TEST is defined
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	$httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
		? 'cli'
		: strtolower( RequestContext::getMain()->getRequest()->getMethod() );

	/** @var ErrorPageError|null $guiError First ErrorPageError returned by run(), if any */
	$guiError = null;
	/** @var Throwable|null $exception First error of any kind returned by run(), if any */
	$exception = null;

	$scope = self::getScopeStack()->current();

	// T249069: recursion is not possible once explicit transaction rounds are involved
	$activeUpdate = $scope->getActiveUpdate();
	if ( $activeUpdate ) {
		$class = get_class( $activeUpdate );
		if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
			);
		}
		if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
			);
		}
	}

	$scope->processUpdates(
		$stage,
		function ( DeferrableUpdate $update, $activeStage )
			use ( $mode, $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
		{
			// If applicable, just enqueue the update as a job in the job queue system
			if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
				self::jobify( $update, $lbf, $logger, $stats, $httpMethod );

				return;
			}

			// Otherwise, run the update....
			// The stack has to be fetched here: it is neither a parameter of this closure
			// nor imported through the use() list.
			$scopeStack = self::getScopeStack();
			$childScope = $scopeStack->descend( $activeStage, $update );
			try {
				$e = self::run( $update, $lbf, $logger, $stats, $httpMethod );
				$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
				$exception = $exception ?: $e;
				// Any addUpdate() calls between descend() and ascend() used the sub-queue.
				// In rare cases, DeferrableUpdate::doUpdate() will process them by calling
				// doUpdates() itself. In any case, run whatever updates remain in the
				// sub-queue before leaving the child scope.
				$childScope->processUpdates(
					$activeStage,
					function ( DeferrableUpdate $subUpdate )
						use ( $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
					{
						$e = self::run( $subUpdate, $lbf, $logger, $stats, $httpMethod );
						$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
						$exception = $exception ?: $e;
					}
				);
			} finally {
				// Always pop the child scope, even if processing threw
				$scopeStack->ascend();
			}
		}
	);

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $exception;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
		throw $guiError;
	}
}
259 
/**
 * Consume and execute all pending updates, unless an update is already in
 * progress or database transactions are still active.
 *
 * When the updates cannot run yet and the queue has grown to at least
 * BIG_QUEUE_SIZE entries, the enqueueable updates are pushed to the job queue.
 *
 * @param string $mode Passed through to doUpdates()
 * @return bool Whether doUpdates() was invoked
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// An update is already in progress: leave execution up to the current loop
	if ( self::getRecursiveExecutionStackDepth() ) {
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// The updates would not get outer transaction scope, so they cannot run yet.
		// The odds of losing updates due to an error increase when executing long queues
		// and when large amounts of time pass while tasks are queued. Once the queue is
		// large, mitigate this by migrating updates to the job queue system (where
		// applicable).
		$queueIsBig = self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE;
		if ( $queueIsBig ) {
			$pushAsJob = static function ( EnqueueableDataUpdate $update ) {
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
			};
			self::getScopeStack()->current()->consumeMatchingUpdates(
				self::ALL,
				EnqueueableDataUpdate::class,
				$pushAsJob
			);
		}

		return false;
	}

	// Nothing is in the way: run the updates for this context now
	self::doUpdates( $mode, self::ALL );

	return true;
}
312 
/**
 * Get the number of pending updates for the current execution context.
 *
 * @return int Count reported by the current scope
 */
public static function pendingUpdatesCount() {
	$currentScope = self::getScopeStack()->current();

	return $currentScope->pendingUpdatesCount();
}
325 
/**
 * Get a list of the pending updates for the current execution context.
 *
 * @param int $stage Stage selector (self::ALL, self::PRESEND or self::POSTSEND)
 * @return DeferrableUpdate[] Updates reported by the current scope
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$currentScope = self::getScopeStack()->current();

	return $currentScope->getPendingUpdates( $stage );
}
341 
/**
 * Cancel all pending updates for the current execution context.
 */
public static function clearPendingUpdates() {
	$currentScope = self::getScopeStack()->current();
	$currentScope->clearPendingUpdates();
}
353 
/**
 * Get the number of in-progress calls to DeferredUpdates::doUpdates().
 *
 * @return int Recursive depth reported by the scope stack
 */
public static function getRecursiveExecutionStackDepth() {
	$stack = self::getScopeStack();

	return $stack->getRecursiveDepth();
}
363 
/**
 * Run an update and, if an error was thrown, catch and log it, roll back
 * primary DB changes, and try to enqueue the update as a job when possible.
 *
 * @param DeferrableUpdate $update Update to run
 * @param ILBFactory $lbFactory Used for running the update and for rollbacks
 * @param LoggerInterface $logger Receives debug timing and error records
 * @param StatsdDataFactoryInterface $stats Receives a per-type counter increment
 * @param string $httpMethod Label used in the stats key
 * @return Throwable|null The error thrown by the update, or null on success
 */
private static function run(
	DeferrableUpdate $update,
	ILBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
): ?Throwable {
	// Callback updates get their origin appended so they can be told apart
	$type = get_class( $update );
	if ( $update instanceof DeferrableCallback ) {
		$type .= "_{$update->getOrigin()}";
	}
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	$updateId = spl_object_id( $update );
	$logger->debug( __METHOD__ . ": started $type #$updateId" );
	$startedAt = microtime( true );

	$failure = null;
	try {
		self::attemptUpdate( $update, $lbFactory );
	} catch ( Throwable $failure ) {
		// Dealt with after the timing record below has been written
	} finally {
		$executionTime = microtime( true ) - $startedAt;
		$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $executionTime" );
	}

	if ( $failure === null ) {
		return null;
	}

	$logger->error(
		"Deferred update '{deferred_type}' failed to run.",
		[
			'deferred_type' => $type,
			'exception' => $failure,
		]
	);

	$lbFactory->rollbackPrimaryChanges( __METHOD__ );

	// Only enqueueable updates can be retried later as a job
	if ( !( $update instanceof EnqueueableDataUpdate ) ) {
		return $failure;
	}

	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
	} catch ( Throwable $jobFailure ) {
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobFailure,
			]
		);

		$lbFactory->rollbackPrimaryChanges( __METHOD__ );
	}

	return $failure;
}
436 
/**
 * Enqueue an update as a job in the job queue system and catch/log any exceptions.
 *
 * On failure the error is logged and primary DB changes are rolled back;
 * nothing is re-thrown to the caller.
 *
 * @param EnqueueableDataUpdate $update Update to convert into a job
 * @param ILBFactory $lbFactory Used to roll back primary changes on failure
 * @param LoggerInterface $logger Receives the error record on failure
 * @param StatsdDataFactoryInterface $stats Receives a per-type counter increment
 * @param string $httpMethod Label used in the stats key
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	ILBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	$jobEx = null;
	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );

		return;
	} catch ( Throwable $jobEx ) {
		// Logged below
	}

	// Use the PSR-3 placeholder (filled from the context array) rather than
	// interpolating the type, matching the equivalent message emitted by run().
	$logger->error(
		"Deferred update '{deferred_type}' failed to enqueue as a job.",
		[
			'deferred_type' => $type,
			'exception' => $jobEx,
		]
	);

	$lbFactory->rollbackPrimaryChanges( __METHOD__ );
}
476 
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * Updates run inside a new explicit transaction round unless they are a
 * TransactionRoundAwareUpdate that declares TRX_ROUND_ABSENT, in which case
 * pending changes are committed first and the update runs in an implicit round.
 * Changes are committed again after the update has run.
 *
 * @param DeferrableUpdate $update Update to run
 * @param ILBFactory $lbFactory Load balancer factory that owns the transaction round
 * @throws DBTransactionError If no empty transaction ticket could be obtained or a
 *   transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	$fnameTrxOwner = ( $update instanceof DeferrableCallback )
		? $update->getOrigin()
		: get_class( $update ) . '::doUpdate';
	// Determine whether the write round will be explicit or implicit. The comparison
	// is strict, like the TRX_ROUND_ABSENT check performed in doUpdates().
	$useExplicitTrxRound = !(
		$update instanceof TransactionRoundAwareUpdate &&
		$update->getTransactionRoundRequirement() === $update::TRX_ROUND_ABSENT
	);

	// Flush any pending changes left over from an implicit transaction round
	if ( $useExplicitTrxRound ) {
		$lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
	} else {
		$lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
	}
	// Run the update after any stale primary DB view snapshots have been flushed
	$update->doUpdate();
	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
}
519 
/**
 * Check whether database transaction state currently prevents running updates.
 *
 * @return bool True if an explicit transaction round is active, the factory is
 *   not ready for round operations, or any open primary connection has pending
 *   writes/callbacks or an explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	// Inspect every open primary connection of every load balancer
	$anyBusy = false;
	$inspectConn = static function ( IDatabase $conn ) use ( &$anyBusy ) {
		$isBusy = $conn->writesOrCallbacksPending() || $conn->explicitTrxActive();
		if ( $isBusy ) {
			$anyBusy = true;
		}
	};
	$lbFactory->forEachLB( static function ( LoadBalancer $lb ) use ( $inspectConn ) {
		$lb->forEachOpenPrimaryConnection( $inspectConn );
	} );

	return $anyBusy;
}
540 
/**
 * Get the scope stack shared by all static methods, creating it on first use.
 *
 * @return DeferredUpdatesScopeStack
 */
private static function getScopeStack() {
	$stack = self::$scopeStack;
	if ( $stack === null ) {
		// First access: create the stack and remember it for later calls
		$stack = new DeferredUpdatesScopeStack();
		self::$scopeStack = $stack;
	}

	return $stack;
}
551 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Enqueue an update as a job in the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:446
DeferredUpdatesScopeStack\ascend
ascend()
Pop the innermost scope from the stack.
Definition: DeferredUpdatesScopeStack.php:62
Wikimedia\Rdbms\ILBFactory\commitPrimaryChanges
commitPrimaryChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all primary connections.
Wikimedia\Rdbms\LoadBalancer\forEachOpenPrimaryConnection
forEachOpenPrimaryConnection( $callback, array $params=[])
Call a function with each open connection object to a primary.
Definition: LoadBalancer.php:2338
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:200
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:488
Wikimedia\Rdbms\ILBFactory\beginPrimaryChanges
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\$scopeStack
static DeferredUpdatesScopeStack null $scopeStack
Queue states based on recursion level.
Definition: DeferredUpdates.php:84
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Definition: DeferredUpdates.php:119
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
Definition: DeferredUpdates.php:350
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:10
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction round.
Definition: TransactionRoundAwareUpdate.php:11
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:30
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
DeferredUpdatesScopeStack\descend
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Definition: DeferredUpdatesScopeStack.php:50
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:12
DeferredUpdatesScopeStack
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
Definition: DeferredUpdatesScopeStack.php:28
Wikimedia\Rdbms\LBFactory\rollbackPrimaryChanges
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
Definition: LBFactory.php:335
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:700
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
$wgCommandLineMode
global $wgCommandLineMode
Definition: DevelopmentSettings.php:29
DeferredUpdates
Class for managing the deferral of updates within the scope of a PHP script invocation.
Definition: DeferredUpdates.php:82
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\getScopeStack
static getScopeStack()
Definition: DeferredUpdates.php:544
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:43
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
Wikimedia\Rdbms\ILBFactory\rollbackPrimaryChanges
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Consume and execute all pending updates unless an update is already in progress or the LBFactory serv...
Definition: DeferredUpdates.php:282
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
DeferredUpdates\getRecursiveExecutionStackDepth
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
Definition: DeferredUpdates.php:360
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:523
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:484
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Consume and execute all pending updates.
Definition: DeferredUpdates.php:173
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
Definition: DeferredUpdates.php:322
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:10
Wikimedia\Rdbms\DBTransactionError
@newable
Definition: DBTransactionError.php:29
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:114
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
Definition: DeferredUpdates.php:338
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:42
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:11
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:30
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active on tracked load balancers.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
Definition: DeferredUpdates.php:145
wfGetCaller
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Definition: GlobalFunctions.php:1352
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
DeferredUpdates\run
static run(DeferrableUpdate $update, ILBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and enqueue the update as a job in the job q...
Definition: DeferredUpdates.php:375
$type
$type
Definition: testCompression.php:52
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...