MediaWiki REL1_35
DeferredUpdates.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26use Psr\Log\LoggerInterface;
32
/**
 * @var DeferrableUpdate[] Updates to run just before the HTTP response is emitted.
 *  MergeableUpdate items are keyed by class name; all other items are appended.
 */
private static $preSendUpdates = [];
/**
 * @var DeferrableUpdate[] Updates to run just after the HTTP response is emitted.
 *  Uses the same keying scheme as $preSendUpdates.
 */
private static $postSendUpdates = [];
/**
 * @var array[] Stack of in-progress updates; each entry holds the 'stage' it runs in,
 *  the 'update' itself, and a 'subqueue' of updates added while it was running
 */
private static $executionStack = [];

/** Every update; in web requests, use only after flushing the output buffer */
public const ALL = 0;
/** Updates that should run before flushing the output buffer */
public const PRESEND = 1;
/** Updates that should run after flushing the output buffer */
public const POSTSEND = 2;

/** Pending-update count at which tryOpportunisticExecute() offloads updates to the job queue */
private const BIG_QUEUE_SIZE = 100;
88
/**
 * Queue an update for deferred execution in the given stage.
 *
 * While another update is executing, a non-mergeable update whose stage does not
 * exceed the running update's stage goes into that update's sub-queue instead of the
 * top-level queues. In command-line mode an opportunistic run is attempted afterwards.
 *
 * @param DeferrableUpdate $update
 * @param int $stage DeferredUpdates::PRESEND or DeferredUpdates::POSTSEND; any value
 *  other than PRESEND is queued as post-send
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	if ( self::$executionStack && !( $update instanceof MergeableUpdate ) ) {
		// The innermost in-progress update is the last entry on the stack
		end( self::$executionStack );
		$innermost = key( self::$executionStack );
		if ( $stage <= self::$executionStack[$innermost]['stage'] ) {
			// Run it right after the in-progress update, as part of its sub-queue
			self::push( self::$executionStack[$innermost]['subqueue'], $update );

			return;
		}
	}

	if ( $stage !== self::PRESEND ) {
		self::push( self::$postSendUpdates, $update );
	} else {
		self::push( self::$preSendUpdates, $update );
	}

	// CLI scripts may barely use the DB but still write to other stores, so try to
	// run the queued updates right away when no transaction is active.
	if ( $wgCommandLineMode ) {
		self::tryOpportunisticExecute( 'run' );
	}
}
134
/**
 * Queue a callable as a deferred update.
 *
 * The callable is wrapped in an MWCallableUpdate whose origin is the function that
 * called this method (as reported by wfGetCaller()).
 *
 * @param callable $callable
 * @param int $stage DeferredUpdates::PRESEND or DeferredUpdates::POSTSEND
 * @param mixed|null $dbw Passed unchanged to MWCallableUpdate (presumably the DB
 *  handle(s) the callback depends on — confirm against MWCallableUpdate)
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	$origin = wfGetCaller();
	$wrapped = new MWCallableUpdate( $callable, $origin, $dbw );
	self::addUpdate( $wrapped, $stage );
}
150
/**
 * Consume the deferred update queues and run (or enqueue) their updates.
 *
 * If called while an update is executing, only the sub-queue of the innermost
 * in-progress update is processed.
 *
 * @param string $mode 'enqueue' pushes EnqueueableDataUpdate items to the job queue
 *  instead of running them; any other value (default 'run') runs them
 * @param int $stage DeferredUpdates::ALL, DeferredUpdates::PRESEND or DeferredUpdates::POSTSEND
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	// Updates processed as part of ALL are treated as post-send while they execute
	$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;

	// Special handling for when an in-progress update triggers this method
	if ( self::$executionStack ) {
		// Run the sub-queue updates for the innermost in-progress update
		end( self::$executionStack );
		$topStackPos = key( self::$executionStack );
		self::handleUpdateQueue(
			self::$executionStack[$topStackPos]['subqueue'],
			$mode,
			$stageEffective
		);

		return;
	}

	// Use strict comparisons for every stage check so all branches agree
	$runPreSend = ( $stage === self::ALL || $stage === self::PRESEND );
	$runPostSend = ( $stage === self::ALL || $stage === self::POSTSEND );

	// For ALL mode, make sure that any PRESEND updates added along the way get run.
	// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
	do {
		if ( $runPreSend ) {
			self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
		}

		if ( $runPostSend ) {
			self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
		}
	} while ( $stage === self::ALL && self::$preSendUpdates );
}
198
/**
 * Add an update to a queue, merging it into a queued update of the same class when
 * it is a MergeableUpdate.
 *
 * Mergeable updates are keyed by fully-qualified class name, and a merged entry is moved
 * to the end of the queue. All other updates are appended with an integer key.
 *
 * @param DeferrableUpdate[] &$queue
 * @param DeferrableUpdate $update
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	$class = get_class( $update ); // fully-qualified class
	if ( !isset( $queue[$class] ) ) {
		$queue[$class] = $update;

		return;
	}

	/** @var MergeableUpdate $merged */
	$merged = $queue[$class];
	$merged->merge( $update );
	// Re-insert at the tail: things like mergeable purge updates may depend on
	// the updates queued before them having run first
	unset( $queue[$class] );
	$queue[$class] = $merged;
}
222
/**
 * Immediately run or enqueue a list of updates, including any updates they spawn.
 *
 * Updates are processed in rounds. Each round snapshots $queue, clears it, and runs the
 * DataUpdate items first and then everything else. Rounds repeat while new updates were
 * pushed onto $queue. Updates added while an update runs land in that update's
 * sub-queue (via the execution stack) and are run right after it.
 *
 * @param DeferrableUpdate[] &$queue Queue to consume; it is emptied as it is processed
 * @param string $mode 'enqueue' pushes top-level EnqueueableDataUpdate items to the job
 *  queue instead of running them; sub-queue items are always run
 * @param int $stage Stage recorded on the execution stack while updates run
 * @throws ErrorPageError The first ErrorPageError thrown by an update, when $stage is
 *  DeferredUpdates::PRESEND and headers have not been sent yet
 * @throws Throwable The first failure of any update, when MW_PHPUNIT_TEST is defined
 */
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
	// Nothing to do: skip the service, logger and request lookups below
	if ( !$queue ) {
		return;
	}

	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	$httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
		? 'cli'
		: strtolower( RequestContext::getMain()->getRequest()->getMethod() );

	/** @var ErrorPageError|null $guiEx First ErrorPageError thrown by any update */
	$guiEx = null;
	/** @var Throwable|null $exception First Throwable thrown by any update */
	$exception = null;

	/** @var DeferrableUpdate[] $updates Snapshot of the queue for the current round */
	$updates = $queue;

	// Keep doing rounds of updates until none get enqueued...
	while ( $updates ) {
		$queue = []; // clear the queue

		// Segregate the snapshot into one list for DataUpdate and one for everything else
		$dataUpdateQueue = [];
		$genericUpdateQueue = [];
		foreach ( $updates as $update ) {
			if ( $update instanceof DataUpdate ) {
				$dataUpdateQueue[] = $update;
			} else {
				$genericUpdateQueue[] = $update;
			}
		}
		// Execute all DataUpdate items followed by the remaining DeferrableUpdate items...
		foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
			foreach ( $updateQueue as $curUpdate ) {
				// Enqueue the update into the job queue system instead if applicable
				if ( $mode === 'enqueue' && $curUpdate instanceof EnqueueableDataUpdate ) {
					self::jobify( $curUpdate, $lbf, $logger, $stats, $httpMethod );
					continue;
				}
				// Otherwise, execute the update, followed by any sub-updates that it spawns.
				// The stack entry is shared by reference so that addUpdate() calls made
				// while $curUpdate runs can append to its 'subqueue'.
				$stackEntry = [ 'stage' => $stage, 'update' => $curUpdate, 'subqueue' => [] ];
				$stackKey = count( self::$executionStack );
				self::$executionStack[$stackKey] =& $stackEntry;
				try {
					$e = self::run( $curUpdate, $lbf, $logger, $stats, $httpMethod );
					$guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
					$exception = $exception ?: $e;
					// Drain the sub-queue of $curUpdate; sub-updates run while this stack
					// entry is still on top, so they may append further entries to it
					// @phan-suppress-next-line PhanImpossibleConditionInLoop
					while ( $stackEntry['subqueue'] ) {
						$duChild = reset( $stackEntry['subqueue'] );
						$duChildKey = key( $stackEntry['subqueue'] );
						unset( $stackEntry['subqueue'][$duChildKey] );

						$e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
						$guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
						$exception = $exception ?: $e;
					}
				} finally {
					// Make sure we always clean up the context.
					// Losing updates while rewinding the stack is acceptable,
					// losing updates that are added later is not.
					unset( self::$executionStack[$stackKey] );
				}
			}
		}

		$updates = $queue; // new snapshot of queue (check for new entries)
	}

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $exception;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
		throw $guiEx;
	}
}
318
/**
 * Run an update; on failure, log it, roll back master changes and, when possible,
 * fall back to pushing the update to the job queue.
 *
 * @param DeferrableUpdate $update
 * @param LBFactory $lbFactory
 * @param LoggerInterface $logger
 * @param StatsdDataFactoryInterface $stats
 * @param string $httpMethod Lowercased HTTP method, or 'cli' (used in the stats key)
 * @return Throwable|null The failure thrown while running the update, or null on success
 */
private static function run(
	DeferrableUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) : ?Throwable {
	$type = get_class( $update );
	if ( $update instanceof DeferrableCallback ) {
		$type .= "_{$update->getOrigin()}";
	}
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	$updateId = spl_object_id( $update );
	$logger->debug( __METHOD__ . ": started $type #$updateId" );
	$failure = null;
	try {
		self::attemptUpdate( $update, $lbFactory );
	} catch ( Throwable $failure ) {
		// Handled below, once the "ended" debug line has been written
	} finally {
		$logger->debug( __METHOD__ . ": ended $type #$updateId" );
	}

	if ( $failure === null ) {
		return null;
	}

	$logger->error(
		"Deferred update '{deferred_type}' failed to run.",
		[
			'deferred_type' => $type,
			'exception' => $failure,
		]
	);
	$lbFactory->rollbackMasterChanges( __METHOD__ );

	// Only updates that can be expressed as a job get a second chance later
	if ( !( $update instanceof EnqueueableDataUpdate ) ) {
		return $failure;
	}

	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
	} catch ( Throwable $jobEx ) {
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobEx,
			]
		);
		$lbFactory->rollbackMasterChanges( __METHOD__ );
	}

	return $failure;
}
388
/**
 * Push an update into the job queue system and catch/log any exception.
 *
 * @param EnqueueableDataUpdate $update
 * @param LBFactory $lbFactory
 * @param LoggerInterface $logger
 * @param StatsdDataFactoryInterface $stats
 * @param string $httpMethod Lowercased HTTP method, or 'cli' (used in the stats key)
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
	} catch ( Throwable $jobEx ) {
		MWExceptionHandler::logException( $jobEx );
		// Use a PSR-3 placeholder, as run() does, so every failure shares one
		// message template and the class name travels in the context instead
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobEx,
			]
		);

		$lbFactory->rollbackMasterChanges( __METHOD__ );
	}
}
428
/**
 * Attempt to run an update with the transaction round state it expects.
 *
 * Updates that declare TRX_ROUND_ABSENT run in an implicit round; all others run inside
 * a new explicit round owned by the update. Pending master changes are committed afterwards.
 *
 * @param DeferrableUpdate $update
 * @param ILBFactory $lbFactory
 * @throws DBTransactionError If no empty transaction ticket is available or a
 *  transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	$roundPending = !$ticket || $lbFactory->hasTransactionRound();
	if ( $roundPending ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// The write round is attributed to the callback's origin or to $update::doUpdate()
	if ( $update instanceof DeferrableCallback ) {
		$fnameTrxOwner = $update->getOrigin();
	} else {
		$fnameTrxOwner = get_class( $update ) . '::doUpdate';
	}

	$wantsNoExplicitRound = $update instanceof TransactionRoundAwareUpdate
		&& $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT;

	if ( $wantsNoExplicitRound ) {
		// Flush leftovers so the update starts a new implicit round
		$lbFactory->commitMasterChanges( $fnameTrxOwner );
	} else {
		// Start a new explicit round owned by the update
		$lbFactory->beginMasterChanges( $fnameTrxOwner );
	}
	// Run only after any stale master snapshots were flushed above
	$update->doUpdate();
	// Commit whatever the explicit or implicit round left pending
	$lbFactory->commitMasterChanges( $fnameTrxOwner );
}
471
/**
 * Run all deferred updates immediately if no DB writes or transactions are active.
 *
 * When the updates cannot run and the backlog has reached BIG_QUEUE_SIZE, the
 * updates that support it are pushed to the job queue instead.
 *
 * @param string $mode Passed to doUpdates() ('run' or 'enqueue')
 * @return bool True if the queues were processed or no updates remain pending;
 *  false if called during an update or if updates are still pending
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	if ( self::$executionStack ) {
		// Re-entrant call from inside a running update
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// Running now would mix the updates into the outer transaction context; for a
		// large backlog, at least offload the queueable ones to the job queue
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return self::pendingUpdatesCount() === 0;
	}

	self::doUpdates( $mode );

	return true;
}
504
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items.
 *
 * The remaining items keep their original keys. push() keys MergeableUpdate items by
 * class name, so preserving keys lets later push() calls on the returned queue still
 * merge into them instead of queueing duplicates.
 *
 * @param DeferrableUpdate[] $updates A queue as built by push()
 * @return DeferrableUpdate[] Items that were not pushed to the job queue, keyed as before
 */
private static function enqueueUpdates( array $updates ) {
	$remaining = [];

	foreach ( $updates as $key => $update ) {
		if ( $update instanceof EnqueueableDataUpdate ) {
			$spec = $update->getAsJobSpecification();
			// NOTE(review): 'wiki' appears to be a legacy alias of 'domain' — confirm
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );
		} else {
			$remaining[$key] = $update;
		}
	}

	return $remaining;
}
526
/**
 * Count the updates waiting in the top-level pre-send and post-send queues.
 *
 * @return int
 * @throws LogicException If called while a DeferrableUpdate is executing
 */
public static function pendingUpdatesCount() {
	if ( !self::$executionStack ) {
		return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
	}

	throw new LogicException( "Called during execution of a DeferrableUpdate" );
}
542
/**
 * Get the list of pending updates in the top-level queues.
 *
 * Each queue is re-indexed before merging. The queues key MergeableUpdate items by class
 * name, and array_merge() would otherwise let a post-send update overwrite a pre-send
 * update of the same class. That would drop it from the result and make it disagree
 * with pendingUpdatesCount().
 *
 * @param int $stage DeferredUpdates::ALL, DeferredUpdates::PRESEND or DeferredUpdates::POSTSEND
 * @return DeferrableUpdate[] List of pre-send updates followed by post-send updates
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$updates = [];
	if ( $stage === self::ALL || $stage === self::PRESEND ) {
		$updates = array_merge( $updates, array_values( self::$preSendUpdates ) );
	}
	if ( $stage === self::ALL || $stage === self::POSTSEND ) {
		$updates = array_merge( $updates, array_values( self::$postSendUpdates ) );
	}

	return $updates;
}
565
/**
 * Drop every update from the top-level queues without running it.
 *
 * Sub-queues of updates currently on the execution stack are left untouched.
 */
public static function clearPendingUpdates() {
	self::$postSendUpdates = [];
	self::$preSendUpdates = [];
}
577
/**
 * Check whether database activity makes it unsafe to run deferred updates right now.
 *
 * @return bool True if an explicit transaction round is active, round operations are not
 *  currently possible, or any open master connection has pending writes/callbacks or an
 *  explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	$busy = false;
	$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$busy ) {
		$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$busy ) {
			if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
				$busy = true;
			}
		} );
	} );

	return $busy;
}
598}
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run an update, and, if an error was thrown, catch/log it and fallback to the job queue.
static doUpdates( $mode='run', $stage=self::ALL)
Consume the list of deferred updates and execute them.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred update queue for execution at the appropriate time.
static array[] $executionStack
Execution stack of currently running updates -var array<array{stage:int,update:DeferrableUpdate,...
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push an update into the job queue system and catch/log any exceptions.
static pendingUpdatesCount()
Get the number of currently enqueued updates in the top-queues.
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until just before HTTP response emission.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until just after HTTP response emission.
static getPendingUpdates( $stage=self::ALL)
Get the list of pending updates in the top-queues.
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
static singleton( $domain=false)
Deferrable Update for closure/callback.
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
PSR-3 logger instance factory.
MediaWikiServices is the service locator for the application scope of MediaWiki.
An interface for generating database load balancers.
Definition LBFactory.php:41
isReadyForRoundOperations()
Check if transaction rounds can be started, committed, or rolled back right now.
hasTransactionRound()
Check if an explicit transaction round is active.
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Database connection, tracking, load balancing, and transaction manager for a cluster.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction round.
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:38
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
An interface for generating database load balancers.
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
hasTransactionRound()
Check if an explicit transaction round is active.
forEachLB( $callback, array $params=[])
Execute a function for each currently tracked (instantiated) load balancer.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.