MediaWiki REL1_37
DeferredUpdates.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
32
/** @var DeferredUpdatesScopeStack|null Queue states based on recursion level; created lazily by getScopeStack() */
private static $scopeStack;

/** @var int Stage selector that is the default for doUpdates() and getPendingUpdates(); presumably "every stage" */
public const ALL = 0;
/** @var int Stage whose GUI errors doUpdates() may still re-throw while headers are unsent */
public const PRESEND = 1;
/** @var int Default stage used by addUpdate() and addCallableUpdate() */
public const POSTSEND = 2;

/** @var int[] The two concrete queue stages; the ALL selector is not part of this list */
public const STAGES = [ self::PRESEND, self::POSTSEND ];

/** @var int Pending-update count at which tryOpportunisticExecute() starts moving enqueueable updates to the job queue */
private const BIG_QUEUE_SIZE = 100;
98
/**
 * Add an update to the pending update queue of the current scope.
 *
 * @param DeferrableUpdate $update The update to queue
 * @param int $stage Queue stage to place the update in (default: self::POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	$scope = self::getScopeStack()->current();
	$scope->addUpdate( $update, $stage );

	if ( !$wgCommandLineMode ) {
		return;
	}

	// In CLI mode, attempt to run everything that is pending right away (this is a no-op
	// while an RDBMs transaction round is in the way). Scripts that rarely or never touch
	// the RDBMs layer would otherwise never reach the usual RDBMs hooks that flush pending
	// updates, and the queue of a long-running script could grow without bound.
	self::tryOpportunisticExecute( 'run' );
}
132
/**
 * Add an update to the pending update queue that invokes the given callback when run.
 *
 * The callback is wrapped in an MWCallableUpdate tagged with the name of the calling function.
 *
 * @param callable $callable The callback to defer
 * @param int $stage Queue stage to place the update in (default: self::POSTSEND)
 * @param mixed $dbw Forwarded unchanged to MWCallableUpdate
 *  (NOTE(review): presumably database handle(s) tied to the callback -- confirm)
 */
public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
	// Must be resolved in this frame so that the reported origin is our caller
	$origin = wfGetCaller();
	$wrapped = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $wrapped, $stage );
}
148
/**
 * Consume and execute all pending updates of the current scope.
 *
 * Each update is either pushed to the job queue (when $mode is 'enqueue' and the update is an
 * EnqueueableDataUpdate) or run immediately inside a child scope of its own. Updates that are
 * added while an update is running land in that child scope's sub-queue and are run right
 * after the update that added them.
 *
 * Errors from individual updates are caught and logged by run(). Only the first error (and
 * the first ErrorPageError) is remembered so it can be re-thrown once the queue is drained.
 *
 * @param string $mode Use 'enqueue' to push updates to the job queue where possible;
 *  otherwise updates are run directly (default: 'run')
 * @param int $stage Which updates to process; one of self::PRESEND, self::POSTSEND, self::ALL
 * @throws LogicException If reached from a running update that is not a
 *  TransactionRoundAwareUpdate declaring TRX_ROUND_ABSENT
 * @throws ErrorPageError The first GUI error, if $stage is self::PRESEND and headers are unsent
 * @throws Throwable The first error of any kind, if MW_PHPUNIT_TEST is defined
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	$httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
		? 'cli'
		: strtolower( RequestContext::getMain()->getRequest()->getMethod() );

	/** @var ErrorPageError|null $guiError First presentable client-level error thrown */
	$guiError = null;
	/** @var Throwable|null $exception First of any error thrown */
	$exception = null;

	$scope = self::getScopeStack()->current();

	// T249069: recursion is not possible once explicit transaction rounds are involved
	$activeUpdate = $scope->getActiveUpdate();
	if ( $activeUpdate ) {
		$class = get_class( $activeUpdate );
		if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
			);
		}
		if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
			);
		}
	}

	$scope->processUpdates(
		$stage,
		function ( DeferrableUpdate $update, $activeStage )
			use ( $mode, $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
		{
			// If applicable, just enqueue the update as a job in the job queue system
			if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
				self::jobify( $update, $lbf, $logger, $stats, $httpMethod );

				return;
			}

			// Otherwise, run the update....
			$scopeStack = self::getScopeStack();
			$childScope = $scopeStack->descend( $activeStage, $update );
			try {
				$e = self::run( $update, $lbf, $logger, $stats, $httpMethod );
				$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
				$exception = $exception ?: $e;
				// Any addUpdate() calls between descend() and ascend() used the sub-queue.
				// In rare cases, DeferrableUpdate::doUpdate() will have processed them by
				// calling doUpdates() itself. In any case, run whatever is still left in
				// the sub-queue now, before moving on to the next update of this scope.
				$childScope->processUpdates(
					$activeStage,
					function ( DeferrableUpdate $subUpdate )
						use ( $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
					{
						$e = self::run( $subUpdate, $lbf, $logger, $stats, $httpMethod );
						$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
						$exception = $exception ?: $e;
					}
				);
			} finally {
				// Always pop the child scope pushed by descend(), even on error. Otherwise
				// the stack keeps growing, later addUpdate() calls land in a dead sub-queue
				// and the recursion depth never returns to zero.
				$scopeStack->ascend();
			}
		}
	);

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $exception;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
		throw $guiError;
	}
}
259
/**
 * Consume and execute all pending updates, unless an update is already in progress or
 * database transactions are currently active.
 *
 * When the updates cannot run and the queue has reached self::BIG_QUEUE_SIZE entries, the
 * pending EnqueueableDataUpdate instances are handed to the job queue instead.
 *
 * @param string $mode Passed through to doUpdates() (default: 'run')
 * @return bool True if doUpdates() was invoked, false otherwise
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// An update is already running; its processing loop will get to the new entries
	$depth = self::getRecursiveExecutionStackDepth();
	if ( $depth ) {
		return false;
	}

	// With no transaction in the way, the updates get outer transaction scope: run them
	$blocked = self::areDatabaseTransactionsActive();
	if ( !$blocked ) {
		self::doUpdates( $mode, self::ALL );

		return true;
	}

	if ( self::pendingUpdatesCount() < self::BIG_QUEUE_SIZE ) {
		return false;
	}

	// Many updates are pending and none of them can run yet. Long queues, and long waits
	// while tasks sit in a queue, both raise the odds of losing updates to an error.
	// Mitigate that by migrating updates to the job queue system where applicable.
	$pushAsJob = static function ( EnqueueableDataUpdate $enqueueable ) {
		$jobSpec = $enqueueable->getAsJobSpecification();
		JobQueueGroup::singleton( $jobSpec['domain'] )->push( $jobSpec['job'] );
	};
	self::getScopeStack()->current()->consumeMatchingUpdates(
		self::ALL,
		EnqueueableDataUpdate::class,
		$pushAsJob
	);

	return false;
}
312
/**
 * Get the number of pending updates for the current execution context.
 *
 * @return int Whatever the current scope reports as its pending update count
 */
public static function pendingUpdatesCount() {
	$scope = self::getScopeStack()->current();

	return $scope->pendingUpdatesCount();
}
325
/**
 * Get a list of the pending updates for the current execution context.
 *
 * @param int $stage Stage to list updates for (default: self::ALL)
 * @return array The list reported by the current scope (presumably DeferrableUpdate[])
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$scope = self::getScopeStack()->current();

	return $scope->getPendingUpdates( $stage );
}
341
/**
 * Cancel all pending updates for the current execution context.
 */
public static function clearPendingUpdates() {
	$scope = self::getScopeStack()->current();
	$scope->clearPendingUpdates();
}
353
/**
 * Get the number of in-progress calls to DeferredUpdates::doUpdates().
 *
 * @return int The recursive depth reported by the scope stack
 */
public static function getRecursiveExecutionStackDepth() {
	$stack = self::getScopeStack();

	return $stack->getRecursiveDepth();
}
363
/**
 * Run an update and, if it throws, catch and log the error, roll back primary DB changes
 * and try to enqueue the update as a job so that it can run later.
 *
 * @param DeferrableUpdate $update The update to run
 * @param ILBFactory $lbFactory Used for running the update and for rolling back on failure
 * @param LoggerInterface $logger Receives start/end debug entries and error entries
 * @param StatsdDataFactoryInterface $stats Incremented once per call, keyed by update type
 * @param string $httpMethod Request method (or 'cli'); only used in the stats key
 * @return Throwable|null The error thrown by the update, or null if it succeeded
 */
private static function run(
	DeferrableUpdate $update,
	ILBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
): ?Throwable {
	$suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
	$type = get_class( $update ) . $suffix;
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	$updateId = spl_object_id( $update );
	$logger->debug( __METHOD__ . ": started $type #$updateId" );
	$startTime = microtime( true );
	$e = null;
	try {
		self::attemptUpdate( $update, $lbFactory );
	} catch ( Throwable $e ) {
		// Handled below, once the timing entry has been written
	} finally {
		$executionTime = microtime( true ) - $startTime;
		$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $executionTime" );
	}

	if ( $e === null ) {
		return null;
	}

	// Report the failure to the exception log too, the same way jobify() does
	MWExceptionHandler::logException( $e );
	$logger->error(
		"Deferred update '{deferred_type}' failed to run.",
		[
			'deferred_type' => $type,
			'exception' => $e,
		]
	);

	$lbFactory->rollbackPrimaryChanges( __METHOD__ );

	// Try to push the update as a job so it can run later if possible
	if ( $update instanceof EnqueueableDataUpdate ) {
		try {
			$spec = $update->getAsJobSpecification();
			JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
		} catch ( Throwable $jobEx ) {
			MWExceptionHandler::logException( $jobEx );
			$logger->error(
				"Deferred update '{deferred_type}' failed to enqueue as a job.",
				[
					'deferred_type' => $type,
					'exception' => $jobEx,
				]
			);

			$lbFactory->rollbackPrimaryChanges( __METHOD__ );
		}
	}

	// The caller always gets the original failure, whether or not the fallback worked
	return $e;
}
436
/**
 * Enqueue an update as a job in the job queue system and catch/log any exceptions.
 *
 * On failure the error is logged and primary DB changes are rolled back; nothing is thrown
 * for errors raised while building or pushing the job.
 *
 * @param EnqueueableDataUpdate $update The update to convert into a job
 * @param ILBFactory $lbFactory Used to roll back primary DB changes if enqueueing fails
 * @param LoggerInterface $logger Receives an error entry if enqueueing fails
 * @param StatsdDataFactoryInterface $stats Incremented once per call, keyed by update class
 * @param string $httpMethod Request method (or 'cli'); only used in the stats key
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	ILBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
	} catch ( Throwable $jobEx ) {
		MWExceptionHandler::logException( $jobEx );
		// Use the placeholder rather than interpolating the class name, so the message
		// template stays constant and matches the one used by run()
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobEx,
			]
		);

		$lbFactory->rollbackPrimaryChanges( __METHOD__ );
	}
}
476
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * Updates run inside a new explicit transaction round, unless they are a
 * TransactionRoundAwareUpdate that declares TRX_ROUND_ABSENT. In that case pending changes
 * are committed first and the update runs in an implicit round.
 *
 * @param DeferrableUpdate $update The update to run
 * @param ILBFactory $lbFactory Load balancer factory that manages the transaction round
 * @throws DBTransactionError If no empty transaction ticket can be obtained or an explicit
 *  transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	$fnameTrxOwner = ( $update instanceof DeferrableCallback )
		? $update->getOrigin()
		: get_class( $update ) . '::doUpdate';
	// Determine whether the write round will be explicit or implicit. The comparison is
	// strict, matching the check doUpdates() performs on the same requirement value.
	$useExplicitTrxRound = !(
		$update instanceof TransactionRoundAwareUpdate &&
		$update->getTransactionRoundRequirement() === $update::TRX_ROUND_ABSENT
	);

	// Flush any pending changes left over from an implicit transaction round
	if ( $useExplicitTrxRound ) {
		$lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
	} else {
		$lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
	}
	// Run the update after any stale primary DB view snapshots have been flushed
	$update->doUpdate();
	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
}
519
/**
 * Check whether database transaction state currently prevents updates from running.
 *
 * @return bool True if an explicit transaction round is active, if round operations are
 *  not possible right now, or if any open primary connection has pending writes/callbacks
 *  or an explicit transaction; false otherwise
 */
private static function areDatabaseTransactionsActive() {
	$factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

	$roundInTheWay = $factory->hasTransactionRound() || !$factory->isReadyForRoundOperations();
	if ( $roundInTheWay ) {
		return true;
	}

	// Set by reference from the nested callbacks below
	$anyBusy = false;
	$inspectConn = static function ( IDatabase $db ) use ( &$anyBusy ) {
		if ( $db->writesOrCallbacksPending() || $db->explicitTrxActive() ) {
			$anyBusy = true;
		}
	};
	$factory->forEachLB( static function ( LoadBalancer $balancer ) use ( $inspectConn ) {
		$balancer->forEachOpenPrimaryConnection( $inspectConn );
	} );

	return $anyBusy;
}
540
/**
 * Get the scope stack, creating it on first use.
 *
 * @return DeferredUpdatesScopeStack The instance held in self::$scopeStack
 */
private static function getScopeStack() {
	return self::$scopeStack ?? ( self::$scopeStack = new DeferredUpdatesScopeStack() );
}
551}
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static doUpdates( $mode='run', $stage=self::ALL)
Consume and execute all pending updates.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static run(DeferrableUpdate $update, ILBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and enqueue the update as a job in the job q...
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Enqueue an update as a job in the job queue system and catch/log any exceptions.
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static tryOpportunisticExecute( $mode='run')
Consume and execute all pending updates unless an update is already in progress or the LBFactory serv...
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static DeferredUpdatesScopeStack null $scopeStack
Queue states based on recursion level.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
static singleton( $domain=false)
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
PSR-3 logger instance factory.
MediaWikiServices is the service locator for the application scope of MediaWiki.
An interface for generating database load balancers.
Definition LBFactory.php:42
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
Database connection, tracking, load balancing, and transaction manager for a cluster.
forEachOpenPrimaryConnection( $callback, array $params=[])
Call a function with each open connection object to a primary.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
An interface for generating database load balancers.
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitPrimaryChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all primary connections.
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
hasTransactionRound()
Check if an explicit transaction round is active.
forEachLB( $callback, array $params=[])
Execute a function for each instantiated tracked load balancer instance.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active on tracked load balancers.
isReadyForRoundOperations()
Check if transaction rounds can be started, committed, or rolled back right now.