23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use Psr\Log\LoggerInterface;
89 self::$executeContext &&
90 self::$executeContext[
'stage'] >= $stage &&
96 self::$executeContext[
'subqueue'][] = $update;
101 if ( $stage === self::PRESEND ) {
102 self::push( self::$preSendUpdates, $update );
104 self::push( self::$postSendUpdates, $update );
110 self::tryOpportunisticExecute(
'run' );
125 $callable, $stage = self::POSTSEND, $dbw =
null
139 public static function doUpdates( $mode =
'run', $stage = self::ALL ) {
140 $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
144 if ( $stage === self::ALL || $stage === self::PRESEND ) {
145 self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
148 if ( $stage === self::ALL || $stage == self::POSTSEND ) {
149 self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
151 }
while ( $stage === self::ALL && self::$preSendUpdates );
160 $class = get_class( $update );
161 if ( isset(
$queue[$class] ) ) {
163 $existingUpdate =
$queue[$class];
164 '@phan-var MergeableUpdate $existingUpdate';
165 $existingUpdate->merge( $update );
169 $queue[$class] = $existingUpdate;
188 $services = MediaWikiServices::getInstance();
189 $stats = $services->getStatsdDataFactory();
190 $lbf = $services->getDBLoadBalancerFactory();
191 $logger = LoggerFactory::getInstance(
'DeferredUpdates' );
192 $httpMethod = $services->getMainConfig()->get(
'CommandLineMode' )
194 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
206 $dataUpdateQueue = [];
207 $genericUpdateQueue = [];
208 foreach ( $updates as $update ) {
210 $dataUpdateQueue[] = $update;
212 $genericUpdateQueue[] = $update;
216 foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
217 foreach ( $updateQueue as $du ) {
220 self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
224 self::$executeContext = [
'stage' => $stage,
'subqueue' => [] ];
226 $e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
229 while ( self::$executeContext[
'subqueue'] ) {
230 $duChild = reset( self::$executeContext[
'subqueue'] );
231 $firstKey = key( self::$executeContext[
'subqueue'] );
232 unset( self::$executeContext[
'subqueue'][$firstKey] );
234 $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
241 self::$executeContext =
null;
253 if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
268 private static function run(
271 LoggerInterface $logger,
272 StatsdDataFactoryInterface $stats,
275 $name = get_class( $update );
277 $stats->increment(
"deferred_updates.$httpMethod.{$name}{$suffix}" );
281 self::attemptUpdate( $update, $lbFactory );
282 }
catch ( Exception $e ) {
283 }
catch ( Throwable $e ) {
288 "Deferred update {type} failed: {message}",
290 'type' => $name . $suffix,
291 'message' => $e->getMessage(),
292 'trace' => $e->getTraceAsString()
298 if ( defined(
'MW_PHPUNIT_TEST' ) ) {
318 LoggerInterface $logger,
319 StatsdDataFactoryInterface $stats,
322 $stats->increment(
"deferred_updates.$httpMethod." . get_class( $update ) );
327 JobQueueGroup::singleton( $spec[
'domain'] ?? $spec[
'wiki'] )->push( $spec[
'job'] );
328 }
catch ( Exception $e ) {
329 }
catch ( Throwable $e ) {
334 "Job insertion of deferred update {type} failed: {message}",
336 'type' => get_class( $update ),
337 'message' => $e->getMessage(),
338 'trace' => $e->getTraceAsString()
363 $update->setTransactionTicket( $ticket );
368 ? $update->getOrigin()
369 : get_class( $update ) .
'::doUpdate';
371 $useExplicitTrxRound = !(
373 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
377 if ( $useExplicitTrxRound ) {
401 if ( self::$executeContext ) {
406 if ( !self::areDatabaseTransactionsActive() ) {
407 self::doUpdates( $mode );
411 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
414 self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
415 self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
418 return !self::pendingUpdatesCount();
430 foreach ( $updates as $update ) {
432 $spec = $update->getAsJobSpecification();
433 $domain = $spec[
'domain'] ?? $spec[
'wiki'];
434 JobQueueGroup::singleton( $domain )->push( $spec[
'job'] );
436 $remaining[] = $update;
448 return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
458 if ( $stage === self::ALL || $stage === self::PRESEND ) {
459 $updates = array_merge( $updates, self::$preSendUpdates );
461 if ( $stage === self::ALL || $stage === self::POSTSEND ) {
462 $updates = array_merge( $updates, self::$postSendUpdates );
472 self::$preSendUpdates = [];
473 self::$postSendUpdates = [];
480 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
481 if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
486 $lbFactory->forEachLB(
function (
LoadBalancer $lb ) use ( &$connsBusy ) {
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any exceptions.
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
static pendingUpdatesCount()
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static array null $executeContext
Information about the current execute() call or null if not running.
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
static getPendingUpdates( $stage=self::ALL)
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...