23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
69 private static $preSendUpdates = [];
76 private static $postSendUpdates = [];
81 private static $executionStack = [];
110 if ( self::$executionStack && !( $update instanceof
MergeableUpdate ) ) {
112 end( self::$executionStack );
113 $topStackPos = key( self::$executionStack );
114 if ( self::$executionStack[$topStackPos][
'stage'] >= $stage ) {
116 self::push( self::$executionStack[$topStackPos][
'subqueue'], $update );
122 if ( $stage === self::PRESEND ) {
123 self::push( self::$preSendUpdates, $update );
125 self::push( self::$postSendUpdates, $update );
131 self::tryOpportunisticExecute(
'run' );
146 $callable, $stage = self::POSTSEND, $dbw =
null
171 public static function doUpdates( $mode =
'run', $stage = self::ALL ) {
172 $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
174 if ( self::$executionStack ) {
176 end( self::$executionStack );
177 $topStackPos = key( self::$executionStack );
178 self::handleUpdateQueue(
179 self::$executionStack[$topStackPos][
'subqueue'],
189 if ( $stage === self::ALL || $stage === self::PRESEND ) {
190 self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
193 if ( $stage === self::ALL || $stage == self::POSTSEND ) {
194 self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
196 }
while ( $stage === self::ALL && self::$preSendUpdates );
205 $class = get_class( $update );
206 if ( isset(
$queue[$class] ) ) {
208 $existingUpdate =
$queue[$class];
209 '@phan-var MergeableUpdate $existingUpdate';
210 $existingUpdate->merge( $update );
214 $queue[$class] = $existingUpdate;
235 $services = MediaWikiServices::getInstance();
236 $stats = $services->getStatsdDataFactory();
237 $lbf = $services->getDBLoadBalancerFactory();
238 $logger = LoggerFactory::getInstance(
'DeferredUpdates' );
239 $httpMethod = $services->getMainConfig()->get(
'CommandLineMode' )
241 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
256 $dataUpdateQueue = [];
257 $genericUpdateQueue = [];
258 foreach ( $updates as $update ) {
260 $dataUpdateQueue[] = $update;
262 $genericUpdateQueue[] = $update;
266 foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
267 foreach ( $updateQueue as $curUpdate ) {
270 self::jobify( $curUpdate, $lbf, $logger, $stats, $httpMethod );
274 $stackEntry = [
'stage' => $stage,
'update' => $curUpdate,
'subqueue' => [] ];
275 $stackKey = count( self::$executionStack );
276 self::$executionStack[$stackKey] =& $stackEntry;
278 $e = self::run( $curUpdate, $lbf, $logger, $stats, $httpMethod );
280 $exception = $exception ?: $e;
283 while ( $stackEntry[
'subqueue'] ) {
284 $duChild = reset( $stackEntry[
'subqueue'] );
285 $duChildKey = key( $stackEntry[
'subqueue'] );
286 unset( $stackEntry[
'subqueue'][$duChildKey] );
288 $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
290 $exception = $exception ?: $e;
296 unset( self::$executionStack[$stackKey] );
306 if ( $exception && defined(
'MW_PHPUNIT_TEST' ) ) {
314 if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
329 private static function run(
332 LoggerInterface $logger,
333 StatsdDataFactoryInterface $stats,
337 $type = get_class( $update ) . $suffix;
338 $stats->increment(
"deferred_updates.$httpMethod.$type" );
340 $updateId = spl_object_id( $update );
341 $logger->debug( __METHOD__ .
": started $type #$updateId" );
344 self::attemptUpdate( $update, $lbFactory );
347 }
catch ( Throwable $e ) {
349 $logger->debug( __METHOD__ .
": ended $type #$updateId" );
354 "Deferred update '{deferred_type}' failed to run.",
356 'deferred_type' =>
$type,
367 $spec = $update->getAsJobSpecification();
371 }
catch ( Throwable $jobEx ) {
376 "Deferred update '{deferred_type}' failed to enqueue as a job.",
378 'deferred_type' =>
$type,
379 'exception' => $jobEx,
401 LoggerInterface $logger,
402 StatsdDataFactoryInterface $stats,
405 $type = get_class( $update );
406 $stats->increment(
"deferred_updates.$httpMethod.$type" );
411 JobQueueGroup::singleton( $spec[
'domain'] )->push( $spec[
'job'] );
414 }
catch ( Throwable $jobEx ) {
417 MWExceptionHandler::logException( $jobEx );
419 "Deferred update '$type' failed to enqueue as a job.",
421 'deferred_type' =>
$type,
422 'exception' => $jobEx,
447 $update->setTransactionTicket( $ticket );
452 ? $update->getOrigin()
453 : get_class( $update ) .
'::doUpdate';
455 $useExplicitTrxRound = !(
457 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
461 if ( $useExplicitTrxRound ) {
485 if ( self::$executionStack ) {
490 if ( !self::areDatabaseTransactionsActive() ) {
491 self::doUpdates( $mode );
495 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
498 self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
499 self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
502 return !self::pendingUpdatesCount();
514 foreach ( $updates as $update ) {
516 $spec = $update->getAsJobSpecification();
517 $domain = $spec[
'domain'] ?? $spec[
'wiki'];
518 JobQueueGroup::singleton( $domain )->push( $spec[
'job'] );
520 $remaining[] = $update;
536 if ( self::$executionStack ) {
537 throw new LogicException(
"Called during execution of a DeferrableUpdate" );
540 return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
556 if ( $stage === self::ALL || $stage === self::PRESEND ) {
557 $updates = array_merge( $updates, self::$preSendUpdates );
559 if ( $stage === self::ALL || $stage === self::POSTSEND ) {
560 $updates = array_merge( $updates, self::$postSendUpdates );
574 self::$preSendUpdates = [];
575 self::$postSendUpdates = [];
582 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function. wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and fallback to the job queue.
static doUpdates( $mode='run', $stage=self::ALL)
Consume the list of deferred updates and execute them.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred update queue for execution at the appropriate time.
static array[] $executionStack
Execution stack of currently running updates; each entry has the shape array{stage:int,update:DeferrableUpdate,subqueue:DeferrableUpdate[]}.
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push an update into the job queue system and catch/log any exceptions.
static pendingUpdatesCount()
Get the number of currently enqueued updates in the top-queues.
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until just before HTTP response emission.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until just after HTTP response emission.
static getPendingUpdates( $stage=self::ALL)
Get the list of pending updates in the top-queues.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
static singleton( $domain=false)
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...