23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
89 public const PRESEND = 1;
91 public const POSTSEND = 2;
94 public const STAGES = [ self::PRESEND, self::POSTSEND ];
97 private const BIG_QUEUE_SIZE = 100;
122 self::getScopeStack()->current()->addUpdate( $update, $stage );
129 self::tryOpportunisticExecute(
'run' );
173 public static function doUpdates( $mode =
'run', $stage = self::ALL ) {
174 $services = MediaWikiServices::getInstance();
175 $stats = $services->getStatsdDataFactory();
176 $lbf = $services->getDBLoadBalancerFactory();
177 $logger = LoggerFactory::getInstance(
'DeferredUpdates' );
178 $httpMethod = $services->getMainConfig()->get(
'CommandLineMode' )
180 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
187 $scope = self::getScopeStack()->current();
190 $activeUpdate = $scope->getActiveUpdate();
191 if ( $activeUpdate ) {
192 $class = get_class( $activeUpdate );
194 throw new LogicException(
195 __METHOD__ .
": reached from $class, which is not TransactionRoundAwareUpdate"
198 if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
199 throw new LogicException(
200 __METHOD__ .
": reached from $class, which does not specify TRX_ROUND_ABSENT"
205 $scope->processUpdates(
208 use ( $mode, $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
212 self::jobify( $update, $lbf, $logger, $stats, $httpMethod );
221 $e = self::run( $update, $lbf, $logger, $stats, $httpMethod );
222 $guiError = $guiError ?: ( $e instanceof
ErrorPageError ? $e : null );
223 $exception = $exception ?: $e;
229 $childScope->processUpdates(
232 use ( $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception )
234 $e = self::run( $subUpdate, $lbf, $logger, $stats, $httpMethod );
235 $guiError = $guiError ?: ( $e instanceof
ErrorPageError ? $e : null );
236 $exception = $exception ?: $e;
247 if ( $exception && defined(
'MW_PHPUNIT_TEST' ) ) {
255 if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
284 if ( self::getRecursiveExecutionStackDepth() ) {
289 if ( !self::areDatabaseTransactionsActive() ) {
290 self::doUpdates( $mode, self::ALL );
295 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
300 self::getScopeStack()->current()->consumeMatchingUpdates(
302 EnqueueableDataUpdate::class,
305 JobQueueGroup::singleton( $spec[
'domain'] )->push( $spec[
'job'] );
323 return self::getScopeStack()->current()->pendingUpdatesCount();
339 return self::getScopeStack()->current()->getPendingUpdates( $stage );
351 self::getScopeStack()->current()->clearPendingUpdates();
361 return self::getScopeStack()->getRecursiveDepth();
375 private static function run(
378 LoggerInterface $logger,
379 StatsdDataFactoryInterface $stats,
383 $type = get_class( $update ) . $suffix;
384 $stats->increment(
"deferred_updates.$httpMethod.$type" );
386 $updateId = spl_object_id( $update );
387 $logger->debug( __METHOD__ .
": started $type #$updateId" );
388 $startTime = microtime(
true );
391 self::attemptUpdate( $update, $lbFactory );
394 }
catch ( Throwable $e ) {
396 $executionTime = microtime(
true ) - $startTime;
397 $logger->debug( __METHOD__ .
": ended $type #$updateId, processing time: $executionTime" );
402 "Deferred update '{deferred_type}' failed to run.",
404 'deferred_type' =>
$type,
415 $spec = $update->getAsJobSpecification();
419 }
catch ( Throwable $jobEx ) {
424 "Deferred update '{deferred_type}' failed to enqueue as a job.",
426 'deferred_type' =>
$type,
427 'exception' => $jobEx,
449 LoggerInterface $logger,
450 StatsdDataFactoryInterface $stats,
453 $type = get_class( $update );
454 $stats->increment(
"deferred_updates.$httpMethod.$type" );
459 JobQueueGroup::singleton( $spec[
'domain'] )->push( $spec[
'job'] );
462 }
catch ( Throwable $jobEx ) {
465 MWExceptionHandler::logException( $jobEx );
467 "Deferred update '$type' failed to enqueue as a job.",
469 'deferred_type' =>
$type,
470 'exception' => $jobEx,
495 $update->setTransactionTicket( $ticket );
500 ? $update->getOrigin()
501 : get_class( $update ) .
'::doUpdate';
503 $useExplicitTrxRound = !(
505 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
509 if ( $useExplicitTrxRound ) {
524 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
545 if ( self::$scopeStack ===
null ) {
549 return self::$scopeStack;
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static doUpdates( $mode='run', $stage=self::ALL)
Consume and execute all pending updates.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static run(DeferrableUpdate $update, ILBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and enqueue the update as a job in the job q...
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Enqueue an update as a job in the job queue system and catch/log any exceptions.
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static tryOpportunisticExecute( $mode='run')
Consume and execute all pending updates unless an update is already in progress or the LBFactory serv...
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static DeferredUpdatesScopeStack null $scopeStack
Queue states based on recursion level.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
static singleton( $domain=false)
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...