23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27use Psr\Log\LoggerInterface;
31use Wikimedia\ScopedCallback;
84 private static $scopeStack;
89 private static $preventOpportunisticUpdates = 0;
94 public const PRESEND = 1;
96 public const POSTSEND = 2;
99 public const STAGES = [ self::PRESEND, self::POSTSEND ];
102 private const BIG_QUEUE_SIZE = 100;
125 $commandLineMode = MediaWikiServices::getInstance()->getMainConfig()->get(
'CommandLineMode' );
127 self::getScopeStack()->current()->addUpdate( $update, $stage );
133 if ( $commandLineMode ) {
134 self::tryOpportunisticExecute();
173 public static function doUpdates( $unused =
null, $stage = self::ALL ) {
174 $services = MediaWikiServices::getInstance();
175 $stats = $services->getStatsdDataFactory();
176 $lbf = $services->getDBLoadBalancerFactory();
177 $logger = LoggerFactory::getInstance(
'DeferredUpdates' );
178 $jobQueueGroupFactory = $services->getJobQueueGroupFactory();
179 $httpMethod = $services->getMainConfig()->get(
'CommandLineMode' )
181 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
188 $scope = self::getScopeStack()->current();
191 $activeUpdate = $scope->getActiveUpdate();
192 if ( $activeUpdate ) {
193 $class = get_class( $activeUpdate );
195 throw new LogicException(
196 __METHOD__ .
": reached from $class, which is not TransactionRoundAwareUpdate"
199 if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
200 throw new LogicException(
201 __METHOD__ .
": reached from $class, which does not specify TRX_ROUND_ABSENT"
206 $scope->processUpdates(
209 use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
211 $scopeStack = self::getScopeStack();
212 $childScope = $scopeStack->
descend( $activeStage, $update );
214 $e = self::run( $update, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
215 $guiError = $guiError ?: ( $e instanceof
ErrorPageError ? $e : null );
216 $exception = $exception ?: $e;
222 $childScope->processUpdates(
225 use ( $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod, &$guiError, &$exception )
227 $e = self::run( $subUpdate, $lbf, $logger, $stats, $jobQueueGroupFactory, $httpMethod );
228 $guiError = $guiError ?: ( $e instanceof
ErrorPageError ? $e : null );
229 $exception = $exception ?: $e;
240 if ( $exception && defined(
'MW_PHPUNIT_TEST' ) ) {
248 if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
274 || self::$preventOpportunisticUpdates
280 if ( !self::areDatabaseTransactionsActive() ) {
281 self::doUpdates(
null, self::ALL );
286 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
291 self::getScopeStack()->current()->consumeMatchingUpdates(
293 EnqueueableDataUpdate::class,
296 MediaWikiServices::getInstance()->getJobQueueGroupFactory()
297 ->makeJobQueueGroup( $spec[
'domain'] )->push( $spec[
'job'] );
312 self::$preventOpportunisticUpdates++;
313 return new ScopedCallback(
static function () {
314 self::$preventOpportunisticUpdates--;
328 return self::getScopeStack()->current()->pendingUpdatesCount();
344 return self::getScopeStack()->current()->getPendingUpdates( $stage );
356 self::getScopeStack()->current()->clearPendingUpdates();
366 return self::getScopeStack()->getRecursiveDepth();
381 private static function run(
384 LoggerInterface $logger,
385 StatsdDataFactoryInterface $stats,
390 $type = get_class( $update ) . $suffix;
391 $stats->increment(
"deferred_updates.$httpMethod.$type" );
392 $updateId = spl_object_id( $update );
393 $logger->debug( __METHOD__ .
": started $type #$updateId" );
395 $updateException =
null;
397 $startTime = microtime(
true );
399 self::attemptUpdate( $update, $lbFactory );
400 }
catch ( Throwable $updateException ) {
403 "Deferred update '{deferred_type}' failed to run.",
405 'deferred_type' =>
$type,
406 'exception' => $updateException,
411 $walltime = microtime(
true ) - $startTime;
412 $logger->debug( __METHOD__ .
": ended $type #$updateId, processing time: $walltime" );
418 $spec = $update->getAsJobSpecification();
419 $jobQueueGroupFactory->
makeJobQueueGroup( $spec[
'domain'] )->push( $spec[
'job'] );
420 }
catch ( Throwable $jobException ) {
423 "Deferred update '{deferred_type}' failed to enqueue as a job.",
425 'deferred_type' =>
$type,
426 'exception' => $jobException,
433 return $updateException;
454 $update->setTransactionTicket( $ticket );
459 ? $update->getOrigin()
460 : get_class( $update ) .
'::doUpdate';
462 $useExplicitTrxRound = !(
464 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
468 if ( $useExplicitTrxRound ) {
482 private static function areDatabaseTransactionsActive() {
483 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
488 foreach ( $lbFactory->
getAllLBs() as $lb ) {
489 if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
500 private static function getScopeStack() {
501 if ( self::$scopeStack ===
null ) {
505 return self::$scopeStack;
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for tracking DeferrableUpdate::doUpdate() nesting levels caused by neste...
ascend()
Pop the innermost scope from the stack.
descend( $activeStage, DeferrableUpdate $update)
Make a new child scope, push it onto the stack, and return it.
Class for managing the deferral of updates within the scope of a PHP script invocation.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
static tryOpportunisticExecute()
Consume and execute all pending updates unless an update is already in progress or the ILBFactory ser...
static pendingUpdatesCount()
Get the number of pending updates for the current execution context.
static getRecursiveExecutionStackDepth()
Get the number of in-progress calls to DeferredUpdates::doUpdates()
static clearPendingUpdates()
Cancel all pending updates for the current execution context.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add an update to the pending update queue that invokes the specified callback when run.
static preventOpportunisticUpdates()
Prevent opportunistic updates until the returned ScopedCallback is consumed.
static doUpdates( $unused=null, $stage=self::ALL)
Consume and execute all pending updates.
static getPendingUpdates( $stage=self::ALL)
Get a list of the pending updates for the current execution context.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...