23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27use Psr\Log\LoggerInterface;
31use Wikimedia\ScopedCallback;
84 private static $scopeStack;
89 private static $preventOpportunisticUpdates = 0;
94 public const PRESEND = 1;
96 public const POSTSEND = 2;
99 public const STAGES = [ self::PRESEND, self::POSTSEND ];
102 private const BIG_QUEUE_SIZE = 100;
125 $commandLineMode = MediaWikiServices::getInstance()->getMainConfig()->get(
'CommandLineMode' );
127 self::getScopeStack()->current()->addUpdate( $update, $stage );
133 if ( $commandLineMode ) {
134 self::tryOpportunisticExecute();
172 public static function doUpdates( $stage = self::ALL ) {
173 $services = MediaWikiServices::getInstance();
174 $stats = $services->getStatsdDataFactory();
175 $lbf = $services->getDBLoadBalancerFactory();
176 $logger = LoggerFactory::getInstance(
'DeferredUpdates' );
177 $jobQueueGroupFactory = $services->getJobQueueGroupFactory();
178 $httpMethod = $services->getMainConfig()->get(
'CommandLineMode' )
180 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
187 $scope = self::getScopeStack()->current();
190 $activeUpdate = $scope->getActiveUpdate();
191 if ( $activeUpdate ) {
192 $class = get_class( $activeUpdate );
194 throw new LogicException(
195 __METHOD__ .
": reached from $class, which is not TransactionRoundAwareUpdate"
198 if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
199 throw new LogicException(
200 __METHOD__ .
": reached from $class, which does not specify TRX_ROUND_ABSENT"
205 $scope->processUpdates(
208 use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
210 $scopeStack = self::getScopeStack();
211 $childScope = $scopeStack->
descend( $activeStage, $update );
213 $e = self::run( $update, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
214 $guiError = $guiError ?: ( $e instanceof
ErrorPageError ? $e : null );
215 $exception = $exception ?: $e;
221 $childScope->processUpdates(
224 use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
226 $e = self::run( $subUpdate, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
227 $guiError = $guiError ?: ( $e instanceof
ErrorPageError ? $e : null );
228 $exception = $exception ?: $e;
239 if ( $exception && defined(
'MW_PHPUNIT_TEST' ) ) {
247 if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
273 || self::$preventOpportunisticUpdates
279 if ( !self::areDatabaseTransactionsActive() ) {
280 self::doUpdates( self::ALL );
285 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
290 self::getScopeStack()->current()->consumeMatchingUpdates(
292 EnqueueableDataUpdate::class,
295 MediaWikiServices::getInstance()->getJobQueueGroupFactory()
296 ->makeJobQueueGroup( $spec[
'domain'] )->push( $spec[
'job'] );
311 self::$preventOpportunisticUpdates++;
312 return new ScopedCallback(
static function () {
313 self::$preventOpportunisticUpdates--;
327 return self::getScopeStack()->current()->pendingUpdatesCount();
343 return self::getScopeStack()->current()->getPendingUpdates( $stage );
355 self::getScopeStack()->current()->clearPendingUpdates();
365 return self::getScopeStack()->getRecursiveDepth();
380 private static function run(
383 LoggerInterface $logger,
384 StatsdDataFactoryInterface $stats,
389 $type = get_class( $update ) . $suffix;
390 $stats->increment(
"deferred_updates.$httpMethod.$type" );
391 $updateId = spl_object_id( $update );
392 $logger->debug( __METHOD__ .
": started $type #$updateId" );
394 $updateException =
null;
396 $startTime = microtime(
true );
398 self::attemptUpdate( $update, $lbFactory );
399 }
catch ( Throwable $updateException ) {
402 "Deferred update '{deferred_type}' failed to run.",
404 'deferred_type' =>
$type,
405 'exception' => $updateException,
410 $walltime = microtime(
true ) - $startTime;
411 $logger->debug( __METHOD__ .
": ended $type #$updateId, processing time: $walltime" );
417 $spec = $update->getAsJobSpecification();
418 $jobQueueGroupFactory->
makeJobQueueGroup( $spec[
'domain'] )->push( $spec[
'job'] );
419 }
catch ( Throwable $jobException ) {
422 "Deferred update '{deferred_type}' failed to enqueue as a job.",
424 'deferred_type' =>
$type,
425 'exception' => $jobException,
432 return $updateException;
453 $update->setTransactionTicket( $ticket );
458 ? $update->getOrigin()
459 : get_class( $update ) .
'::doUpdate';
461 $useExplicitTrxRound = !(
463 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
467 if ( $useExplicitTrxRound ) {
481 private static function areDatabaseTransactionsActive() {
482 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
487 foreach ( $lbFactory->
getAllLBs() as $lb ) {
488 if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
499 private static function getScopeStack() {
500 if ( self::$scopeStack ===
null ) {
504 return self::$scopeStack;
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static tryOpportunisticExecute()
Consume and execute all pending updates unless an update is already in progress or the ILBFactory ser...
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static doUpdates( $stage=self::ALL)
Consume and execute all pending updates.
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...