MediaWiki REL1_40
DeferredUpdates.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27use Psr\Log\LoggerInterface;
31use Wikimedia\ScopedCallback;
32
84 private static $scopeStack;
85
89 private static $preventOpportunisticUpdates = 0;
90
92 public const ALL = 0;
94 public const PRESEND = 1;
96 public const POSTSEND = 2;
97
99 public const STAGES = [ self::PRESEND, self::POSTSEND ];
100
102 private const BIG_QUEUE_SIZE = 100;
103
124 public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
125 $commandLineMode = MediaWikiServices::getInstance()->getMainConfig()->get( 'CommandLineMode' );
126
127 self::getScopeStack()->current()->addUpdate( $update, $stage );
128 // If CLI mode is active and no RDBMs transaction round is in the way, then run all
129 // the pending updates now. This is needed for scripts that never, or rarely, use the
130 // RDBMs layer, but that do modify systems via deferred updates. This logic avoids
131 // excessive pending update queue sizes when long-running scripts never trigger the
132 // basic RDBMs hooks for running pending updates.
133 if ( $commandLineMode ) {
134 self::tryOpportunisticExecute();
135 }
136 }
137
150 public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
151 self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
152 }
153
172 public static function doUpdates( $stage = self::ALL ) {
173 $services = MediaWikiServices::getInstance();
174 $stats = $services->getStatsdDataFactory();
175 $lbf = $services->getDBLoadBalancerFactory();
176 $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
177 $jobQueueGroupFactory = $services->getJobQueueGroupFactory();
178 $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
179 ? 'cli'
180 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
181
183 $guiError = null;
185 $exception = null;
186
187 $scope = self::getScopeStack()->current();
188
189 // T249069: recursion is not possible once explicit transaction rounds are involved
190 $activeUpdate = $scope->getActiveUpdate();
191 if ( $activeUpdate ) {
192 $class = get_class( $activeUpdate );
193 if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
194 throw new LogicException(
195 __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
196 );
197 }
198 if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
199 throw new LogicException(
200 __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
201 );
202 }
203 }
204
205 $scope->processUpdates(
206 $stage,
207 function ( DeferrableUpdate $update, $activeStage )
208 use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
209 {
210 $scopeStack = self::getScopeStack();
211 $childScope = $scopeStack->descend( $activeStage, $update );
212 try {
213 $e = self::run( $update, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
214 $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
215 $exception = $exception ?: $e;
216 // Any addUpdate() calls between descend() and ascend() used the sub-queue.
217 // In rare cases, DeferrableUpdate::doUpdates() will process them by calling
218 // doUpdates() itself. In any case, process remaining updates in the subqueue.
219 // them, enqueueing them, or transferring them to the parent scope
220 // queues as appropriate...
221 $childScope->processUpdates(
222 $activeStage,
223 function ( DeferrableUpdate $subUpdate )
224 use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
225 {
226 $e = self::run( $subUpdate, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
227 $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
228 $exception = $exception ?: $e;
229 }
230 );
231 } finally {
232 $scopeStack->ascend();
233 }
234 }
235 );
236
237 // VW-style hack to work around T190178, so we can make sure
238 // PageMetaDataUpdater doesn't throw exceptions.
239 if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
240 throw $exception;
241 }
242
243 // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
244 // callers should check permissions *before* enqueueing updates. If the main transaction
245 // round actions succeed but some deferred updates fail due to permissions errors then
246 // there is a risk that some secondary data was not properly updated.
247 if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
248 throw $guiError;
249 }
250 }
251
269 public static function tryOpportunisticExecute(): bool {
270 // Leave execution up to the current loop if an update is already in progress
271 // or if updates are explicitly disabled
273 || self::$preventOpportunisticUpdates
274 ) {
275 return false;
276 }
277
278 // Run the updates for this context if they will have outer transaction scope
279 if ( !self::areDatabaseTransactionsActive() ) {
280 self::doUpdates( self::ALL );
281
282 return true;
283 }
284
285 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
286 // There are a large number of pending updates and none of them can run yet.
287 // The odds of losing updates due to an error increase when executing long queues
288 // and when large amounts of time pass while tasks are queued. Mitigate this by
289 // trying to migrate updates to the job queue system (where applicable).
290 self::getScopeStack()->current()->consumeMatchingUpdates(
291 self::ALL,
292 EnqueueableDataUpdate::class,
293 static function ( EnqueueableDataUpdate $update ) {
294 $spec = $update->getAsJobSpecification();
295 MediaWikiServices::getInstance()->getJobQueueGroupFactory()
296 ->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
297 }
298 );
299 }
300
301 return false;
302 }
303
310 public static function preventOpportunisticUpdates() {
311 self::$preventOpportunisticUpdates++;
312 return new ScopedCallback( static function () {
313 self::$preventOpportunisticUpdates--;
314 } );
315 }
316
326 public static function pendingUpdatesCount() {
327 return self::getScopeStack()->current()->pendingUpdatesCount();
328 }
329
342 public static function getPendingUpdates( $stage = self::ALL ) {
343 return self::getScopeStack()->current()->getPendingUpdates( $stage );
344 }
345
354 public static function clearPendingUpdates() {
355 self::getScopeStack()->current()->clearPendingUpdates();
356 }
357
364 public static function getRecursiveExecutionStackDepth() {
365 return self::getScopeStack()->getRecursiveDepth();
366 }
367
380 private static function run(
381 DeferrableUpdate $update,
382 ILBFactory $lbFactory,
383 LoggerInterface $logger,
384 StatsdDataFactoryInterface $stats,
385 JobQueueGroupFactory $jobQueueGroupFactory,
386 $httpMethod
387 ): ?Throwable {
388 $suffix = $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '';
389 $type = get_class( $update ) . $suffix;
390 $stats->increment( "deferred_updates.$httpMethod.$type" );
391 $updateId = spl_object_id( $update );
392 $logger->debug( __METHOD__ . ": started $type #$updateId" );
393
394 $updateException = null;
395
396 $startTime = microtime( true );
397 try {
398 self::attemptUpdate( $update, $lbFactory );
399 } catch ( Throwable $updateException ) {
400 MWExceptionHandler::logException( $updateException );
401 $logger->error(
402 "Deferred update '{deferred_type}' failed to run.",
403 [
404 'deferred_type' => $type,
405 'exception' => $updateException,
406 ]
407 );
408 $lbFactory->rollbackPrimaryChanges( __METHOD__ );
409 } finally {
410 $walltime = microtime( true ) - $startTime;
411 $logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
412 }
413
414 // Try to push the update as a job so it can run later if possible
415 if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
416 try {
417 $spec = $update->getAsJobSpecification();
418 $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
419 } catch ( Throwable $jobException ) {
420 MWExceptionHandler::logException( $jobException );
421 $logger->error(
422 "Deferred update '{deferred_type}' failed to enqueue as a job.",
423 [
424 'deferred_type' => $type,
425 'exception' => $jobException,
426 ]
427 );
428 $lbFactory->rollbackPrimaryChanges( __METHOD__ );
429 }
430 }
431
432 return $updateException;
433 }
434
446 public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
447 $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
448 if ( !$ticket || $lbFactory->hasTransactionRound() ) {
449 throw new DBTransactionError( null, "A database transaction round is pending." );
450 }
451
452 if ( $update instanceof DataUpdate ) {
453 $update->setTransactionTicket( $ticket );
454 }
455
456 // Designate $update::doUpdate() as the write round owner
457 $fnameTrxOwner = ( $update instanceof DeferrableCallback )
458 ? $update->getOrigin()
459 : get_class( $update ) . '::doUpdate';
460 // Determine whether the write round will be explicit or implicit
461 $useExplicitTrxRound = !(
462 $update instanceof TransactionRoundAwareUpdate &&
463 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
464 );
465
466 // Flush any pending changes left over from an implicit transaction round
467 if ( $useExplicitTrxRound ) {
468 $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
469 } else {
470 $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
471 }
472 // Run the update after any stale primary DB view snapshots have been flushed
473 $update->doUpdate();
474 // Commit any pending changes from the explicit or implicit transaction round
475 $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
476 }
477
481 private static function areDatabaseTransactionsActive() {
482 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
483 if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
484 return true;
485 }
486
487 foreach ( $lbFactory->getAllLBs() as $lb ) {
488 if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
489 return true;
490 }
491 }
492
493 return false;
494 }
495
499 private static function getScopeStack() {
500 if ( self::$scopeStack === null ) {
501 self::$scopeStack = new DeferredUpdatesScopeStack();
502 }
503
504 return self::$scopeStack;
505 }
506}
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition WebStart.php:88
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static tryOpportunisticExecute()
Consume and execute all pending updates unless an update is already in progress or the ILBFactory ser...
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static doUpdates( $stage=self::ALL)
Consume and execute all pending updates.
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Class to construct JobQueueGroups.
PSR-3 logger instance factory.
Service locator for MediaWiki core services.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
getEmptyTransactionTicket( $fname)
Get a token asserting that no write transactions are active on tracked connections.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
Manager of ILoadBalancer objects and, indirectly, IDatabase connections.
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitPrimaryChanges( $fname=__METHOD__, int $maxWriteDuration=0)
Commit changes and clear view snapshots on all primary connections.
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
getAllLBs()
Get all tracked load balancer instances (generator)
hasTransactionRound()
Check if an explicit transaction round is active.
isReadyForRoundOperations()
Check if transaction rounds can be started, committed, or rolled back right now.