MediaWiki REL1_39
DeferredUpdates.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27use Psr\Log\LoggerInterface;
31use Wikimedia\ScopedCallback;
32
/** @var DeferredUpdatesScopeStack|null Scope stack; created on first use by getScopeStack() */
private static $scopeStack = null;

/**
 * @var int Counter incremented by preventOpportunisticUpdates() and decremented when the
 *  ScopedCallback it returns is consumed; tryOpportunisticExecute() does nothing while non-zero
 */
private static $preventOpportunisticUpdates = 0;

/** @var int Stage selector used as the default for doUpdates() and getPendingUpdates() */
public const ALL = 0;
/** @var int Stage identifier; presumably "before the HTTP response is sent" (confirm) */
public const PRESEND = 1;
/** @var int Stage identifier and the default stage for addUpdate(); presumably "after send" */
public const POSTSEND = 2;

/** @var int[] The concrete stages (excludes the ALL selector) */
public const STAGES = [ self::PRESEND, self::POSTSEND ];

/** @var int Pending-queue length at which tryOpportunisticExecute() pushes updates to the job queue */
private const BIG_QUEUE_SIZE = 100;
103
/**
 * Add an update to the pending update queue for execution at the appropriate time.
 *
 * In command-line mode an opportunistic run of the queue is attempted right after
 * the update has been queued.
 *
 * @param DeferrableUpdate $update Update to queue on the current scope
 * @param int $stage Stage constant to queue the update under (default: self::POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	$config = MediaWikiServices::getInstance()->getMainConfig();
	$isCommandLine = $config->get( 'CommandLineMode' );

	$currentScope = self::getScopeStack()->current();
	$currentScope->addUpdate( $update, $stage );

	if ( !$isCommandLine ) {
		return;
	}

	// CLI scripts may never (or only rarely) touch the RDBMS layer while still changing
	// other systems through deferred updates. Without the usual RDBMS hooks firing, the
	// pending queue of a long-running script could grow without bound, so try to drain
	// it right away whenever no transaction round is in the way.
	self::tryOpportunisticExecute();
}
137
/**
 * Add an update to the pending update queue that invokes the specified callback when run.
 *
 * @param callable $callable Callback to wrap in an MWCallableUpdate
 * @param int $stage Stage constant to queue the update under (default: self::POSTSEND)
 * @param mixed|null $dbw Passed through unchanged as the third MWCallableUpdate argument
 */
public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
	// wfGetCaller() must be invoked directly from this frame so that it keeps
	// resolving to the same caller as before.
	$wrapped = new MWCallableUpdate( $callable, wfGetCaller(), $dbw );

	self::addUpdate( $wrapped, $stage );
}
153
/**
 * Consume and execute all pending updates.
 *
 * Each update of the current scope is run inside a freshly descended child scope;
 * updates that were queued on that child scope while it was active are processed
 * before the child scope is popped again.
 *
 * @param mixed|null $unused Not read by this method
 * @param int $stage Stage constant selecting which updates to process (default: self::ALL)
 * @throws LogicException When reached from an active update that is not a
 *  TransactionRoundAwareUpdate declaring TRX_ROUND_ABSENT
 * @throws ErrorPageError The first GUI error, only for self::PRESEND before headers are sent
 * @throws Throwable The first recorded failure, only when MW_PHPUNIT_TEST is defined
 */
public static function doUpdates( $unused = null, $stage = self::ALL ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbFactory = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	$jobQueueGroupFactory = $services->getJobQueueGroupFactory();
	if ( $services->getMainConfig()->get( 'CommandLineMode' ) ) {
		$httpMethod = 'cli';
	} else {
		$httpMethod = strtolower( RequestContext::getMain()->getRequest()->getMethod() );
	}

	// First ErrorPageError and first failure of any kind reported by run()
	$firstGuiError = null;
	$firstFailure = null;

	$scope = self::getScopeStack()->current();

	// T249069: recursion is not possible once explicit transaction rounds are involved
	$activeUpdate = $scope->getActiveUpdate();
	if ( $activeUpdate ) {
		$class = get_class( $activeUpdate );
		if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
			);
		}
		if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
			throw new LogicException(
				__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
			);
		}
	}

	// Runs a single update and remembers the first GUI error and the first failure.
	// Shared by the top-level queue and by the sub-queue of each child scope.
	$runAndRecord = static function ( DeferrableUpdate $pending )
		use (
			$lbFactory, $logger, $stats, $jobQueueGroupFactory, $httpMethod,
			&$firstGuiError, &$firstFailure
		)
	{
		$failure = self::run(
			$pending, $lbFactory, $logger, $stats, $jobQueueGroupFactory, $httpMethod
		);
		if ( !$firstGuiError && $failure instanceof ErrorPageError ) {
			$firstGuiError = $failure;
		}
		if ( !$firstFailure ) {
			$firstFailure = $failure;
		}
	};

	$scope->processUpdates(
		$stage,
		static function ( DeferrableUpdate $update, $activeStage ) use ( $runAndRecord ) {
			$scopeStack = self::getScopeStack();
			$childScope = $scopeStack->descend( $activeStage, $update );
			try {
				$runAndRecord( $update );
				// Any addUpdate() calls made between descend() and ascend() landed in
				// the child scope's sub-queue. In rare cases the update processes them
				// itself by calling doUpdates(). Either way, hand whatever is left in
				// the sub-queue to the child scope for processing.
				$childScope->processUpdates( $activeStage, $runAndRecord );
			} finally {
				// Always pop the child scope, even when processing threw
				$scopeStack->ascend();
			}
		}
	);

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $firstFailure && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $firstFailure;
	}

	// Surface the first GUI error only while still in the HTTP pre-send context.
	// Callers are nevertheless expected to check permissions *before* enqueueing
	// updates: if the main transaction round succeeds but a deferred update then
	// fails on a permission error, some secondary data may be left out of date.
	if ( $firstGuiError && $stage === self::PRESEND && !headers_sent() ) {
		throw $firstGuiError;
	}
}
252
/**
 * Consume and execute all pending updates unless an update is already in progress,
 * opportunistic updates are currently prevented, or database transactions are active.
 *
 * When transactions are active and the pending queue has reached BIG_QUEUE_SIZE,
 * the enqueueable updates are pushed to the job queue instead of being run here.
 *
 * @return bool True if doUpdates() was invoked, false otherwise
 */
public static function tryOpportunisticExecute(): bool {
	// Leave execution up to the current loop if an update is already in progress
	// or if updates are explicitly disabled
	if ( self::getRecursiveExecutionStackDepth()
		|| self::$preventOpportunisticUpdates
	) {
		return false;
	}

	// Run the updates for this context if they will have outer transaction scope
	if ( !self::areDatabaseTransactionsActive() ) {
		self::doUpdates( null, self::ALL );

		return true;
	}

	if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
		// There are a large number of pending updates and none of them can run yet.
		// The odds of losing updates due to an error increase when executing long queues
		// and when large amounts of time pass while tasks are queued. Mitigate this by
		// trying to migrate updates to the job queue system (where applicable).
		self::getScopeStack()->current()->consumeMatchingUpdates(
			self::ALL,
			EnqueueableDataUpdate::class,
			static function ( EnqueueableDataUpdate $update ) {
				$spec = $update->getAsJobSpecification();
				MediaWikiServices::getInstance()->getJobQueueGroupFactory()
					->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
			}
		);
	}

	return false;
}
304
/**
 * Prevent opportunistic updates until the returned ScopedCallback is consumed.
 *
 * Calls may nest: each call bumps a counter and each returned callback lowers it again.
 *
 * @return ScopedCallback Callback that decrements the prevention counter when it runs
 */
public static function preventOpportunisticUpdates() {
	++self::$preventOpportunisticUpdates;

	$release = static function () {
		--self::$preventOpportunisticUpdates;
	};

	return new ScopedCallback( $release );
}
317
/**
 * Get the number of pending updates for the current execution context.
 *
 * @return int Presumably an integer count, as reported by the current scope
 */
public static function pendingUpdatesCount() {
	$currentScope = self::getScopeStack()->current();

	return $currentScope->pendingUpdatesCount();
}
330
/**
 * Get a list of the pending updates for the current execution context.
 *
 * @param int $stage Stage constant to filter by (default: self::ALL)
 * @return DeferrableUpdate[] Presumably a list of updates, as reported by the current scope
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$currentScope = self::getScopeStack()->current();

	return $currentScope->getPendingUpdates( $stage );
}
346
/**
 * Cancel all pending updates for the current execution context.
 */
public static function clearPendingUpdates() {
	$currentScope = self::getScopeStack()->current();
	$currentScope->clearPendingUpdates();
}
358
/**
 * Get the number of in-progress calls to DeferredUpdates::doUpdates()
 *
 * @return int Presumably an integer depth, as reported by the scope stack
 */
public static function getRecursiveExecutionStackDepth() {
	$stack = self::getScopeStack();

	return $stack->getRecursiveDepth();
}
368
/**
 * Run one update, log the outcome, and report any failure instead of throwing it.
 *
 * A failed update causes a rollback of primary changes. If the failed update is an
 * EnqueueableDataUpdate, it is additionally pushed to the job queue; a failure to
 * enqueue is logged and rolled back as well, but not returned.
 *
 * @param DeferrableUpdate $update Update to run
 * @param ILBFactory $lbFactory Used for running the update and for rollbacks
 * @param LoggerInterface $logger Receives debug and error messages
 * @param StatsdDataFactoryInterface $stats Receives one counter increment per run
 * @param JobQueueGroupFactory $jobQueueGroupFactory Used for the job queue fallback
 * @param string $httpMethod Used as a component of the stats key
 * @return Throwable|null What the update threw, or null if it completed
 */
private static function run(
	DeferrableUpdate $update,
	ILBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	JobQueueGroupFactory $jobQueueGroupFactory,
	$httpMethod
): ?Throwable {
	// Callback-style updates are told apart by their origin
	$type = get_class( $update );
	if ( $update instanceof DeferrableCallback ) {
		$type .= '_' . $update->getOrigin();
	}
	$stats->increment( "deferred_updates.$httpMethod.$type" );
	$updateId = spl_object_id( $update );
	$logger->debug( __METHOD__ . ": started $type #$updateId" );

	$failure = null;

	$startTime = microtime( true );
	try {
		self::attemptUpdate( $update, $lbFactory );
	} catch ( Throwable $caught ) {
		$failure = $caught;
		MWExceptionHandler::logException( $failure );
		$logger->error(
			"Deferred update '{deferred_type}' failed to run.",
			[
				'deferred_type' => $type,
				'exception' => $failure,
			]
		);
		$lbFactory->rollbackPrimaryChanges( __METHOD__ );
	} finally {
		$walltime = microtime( true ) - $startTime;
		$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
	}

	// Nothing more to do unless the update failed and can be retried as a job
	if ( !$failure || !( $update instanceof EnqueueableDataUpdate ) ) {
		return $failure;
	}

	// Try to push the update as a job so it can run later
	try {
		$spec = $update->getAsJobSpecification();
		$jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
	} catch ( Throwable $jobException ) {
		MWExceptionHandler::logException( $jobException );
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobException,
			]
		);
		$lbFactory->rollbackPrimaryChanges( __METHOD__ );
	}

	return $failure;
}
435
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * Updates run inside a new explicit transaction round unless they are a
 * TransactionRoundAwareUpdate that declares TRX_ROUND_ABSENT; those run in an
 * implicit round instead. Pending changes are committed after the update ran.
 *
 * @param DeferrableUpdate $update Update to run
 * @param ILBFactory $lbFactory Factory used to begin and commit the round
 * @throws DBTransactionError If no empty transaction ticket is available or a
 *  transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	if ( $update instanceof DeferrableCallback ) {
		$roundOwner = $update->getOrigin();
	} else {
		$roundOwner = get_class( $update ) . '::doUpdate';
	}

	// The round is implicit only for updates that explicitly ask for no round
	$wantsNoRound = $update instanceof TransactionRoundAwareUpdate &&
		$update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT;

	// Flush any pending changes left over from an implicit transaction round
	if ( $wantsNoRound ) {
		$lbFactory->commitPrimaryChanges( $roundOwner ); // new implicit round
	} else {
		$lbFactory->beginPrimaryChanges( $roundOwner ); // new explicit round
	}

	// Run the update after any stale primary DB view snapshots have been flushed
	$update->doUpdate();

	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitPrimaryChanges( $roundOwner );
}
478
/**
 * Check whether running updates right now would collide with database transaction state.
 *
 * @return bool True if a transaction round is active, the factory is not ready for
 *  round operations, or any load balancer has primary changes or an explicit
 *  transaction; false otherwise
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	foreach ( $lbFactory->getAllLBs() as $loadBalancer ) {
		if ( $loadBalancer->hasPrimaryChanges() ) {
			return true;
		}
		if ( $loadBalancer->explicitTrxActive() ) {
			return true;
		}
	}

	return false;
}
496
/**
 * Get the scope stack shared by all static methods of this class, creating it on first use.
 *
 * @return DeferredUpdatesScopeStack
 */
private static function getScopeStack() {
	self::$scopeStack = self::$scopeStack ?? new DeferredUpdatesScopeStack();

	return self::$scopeStack;
}
507}
wfGetCaller( $level=2)
Get the name of the function which called this function. wfGetCaller( 1 ) is the function with the wfG...
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition WebStart.php:82
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static tryOpportunisticExecute()
Consume and execute all pending updates unless an update is already in progress or the ILBFactory ser...
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
static doUpdates( $unused=null, $stage=self::ALL)
Consume and execute all pending updates.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Class to construct JobQueueGroups.
PSR-3 logger instance factory.
Service locator for MediaWiki core services.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:39
Manager of ILoadBalancer objects, and indirectly of IDatabase connections.
beginPrimaryChanges( $fname=__METHOD__)
Flush any primary transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitPrimaryChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all primary connections.
rollbackPrimaryChanges( $fname=__METHOD__)
Rollback changes on all primary connections.
getAllLBs()
Get all tracked load balancer instances (generator)
hasTransactionRound()
Check if an explicit transaction round is active.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active on tracked load balancers.
isReadyForRoundOperations()
Check if transaction rounds can be started, committed, or rolled back right now.