MediaWiki REL1_34
DeferredUpdates.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25use Psr\Log\LoggerInterface;
32
64 private static $preSendUpdates = [];
66 private static $postSendUpdates = [];
67
68 const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
69 const PRESEND = 1; // for updates that should run before flushing output buffer
70 const POSTSEND = 2; // for updates that should run after flushing output buffer
71
72 const BIG_QUEUE_SIZE = 100;
73
75 private static $executeContext;
76
85 public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
86 global $wgCommandLineMode;
87
88 if (
89 self::$executeContext &&
90 self::$executeContext['stage'] >= $stage &&
91 !( $update instanceof MergeableUpdate )
92 ) {
93 // This is a sub-DeferredUpdate; run it right after its parent update.
94 // Also, while post-send updates are running, push any "pre-send" jobs to the
95 // active post-send queue to make sure they get run this round (or at all).
96 self::$executeContext['subqueue'][] = $update;
97
98 return;
99 }
100
101 if ( $stage === self::PRESEND ) {
102 self::push( self::$preSendUpdates, $update );
103 } else {
104 self::push( self::$postSendUpdates, $update );
105 }
106
107 // Try to run the updates now if in CLI mode and no transaction is active.
108 // This covers scripts that don't/barely use the DB but make updates to other stores.
109 if ( $wgCommandLineMode ) {
110 self::tryOpportunisticExecute( 'run' );
111 }
112 }
113
124 public static function addCallableUpdate(
125 $callable, $stage = self::POSTSEND, $dbw = null
126 ) {
127 self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
128 }
129
139 public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
140 $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
141 // For ALL mode, make sure that any PRESEND updates added along the way get run.
142 // Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
143 do {
144 if ( $stage === self::ALL || $stage === self::PRESEND ) {
145 self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
146 }
147
148 if ( $stage === self::ALL || $stage == self::POSTSEND ) {
149 self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
150 }
151 } while ( $stage === self::ALL && self::$preSendUpdates );
152 }
153
158 private static function push( array &$queue, DeferrableUpdate $update ) {
159 if ( $update instanceof MergeableUpdate ) {
160 $class = get_class( $update ); // fully-qualified class
161 if ( isset( $queue[$class] ) ) {
163 $existingUpdate = $queue[$class];
164 '@phan-var MergeableUpdate $existingUpdate';
165 $existingUpdate->merge( $update );
166 // Move the update to the end to handle things like mergeable purge
167 // updates that might depend on the prior updates in the queue running
168 unset( $queue[$class] );
169 $queue[$class] = $existingUpdate;
170 } else {
171 $queue[$class] = $update;
172 }
173 } else {
174 $queue[] = $update;
175 }
176 }
177
187 protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
188 $services = MediaWikiServices::getInstance();
189 $stats = $services->getStatsdDataFactory();
190 $lbf = $services->getDBLoadBalancerFactory();
191 $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
192 $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
193 ? 'cli'
194 : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
195
197 $guiEx = null;
199 $updates = $queue;
200
201 // Keep doing rounds of updates until none get enqueued...
202 while ( $updates ) {
203 $queue = []; // clear the queue
204
205 // Segregate the queue into one for DataUpdate and one for everything else
206 $dataUpdateQueue = [];
207 $genericUpdateQueue = [];
208 foreach ( $updates as $update ) {
209 if ( $update instanceof DataUpdate ) {
210 $dataUpdateQueue[] = $update;
211 } else {
212 $genericUpdateQueue[] = $update;
213 }
214 }
215 // Execute all DataUpdate queue followed by the DeferrableUpdate queue...
216 foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
217 foreach ( $updateQueue as $du ) {
218 // Enqueue the task into the job queue system instead if applicable
219 if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
220 self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
221 continue;
222 }
223 // Otherwise, execute the task and any subtasks that it spawns
224 self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
225 try {
226 $e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
227 $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
228 // Do the subqueue updates for $update until there are none
229 while ( self::$executeContext['subqueue'] ) {
230 $duChild = reset( self::$executeContext['subqueue'] );
231 $firstKey = key( self::$executeContext['subqueue'] );
232 unset( self::$executeContext['subqueue'][$firstKey] );
233
234 $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
235 $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
236 }
237 } finally {
238 // Make sure we always clean up the context.
239 // Losing updates while rewinding the stack is acceptable,
240 // losing updates that are added later is not.
241 self::$executeContext = null;
242 }
243 }
244 }
245
246 $updates = $queue; // new snapshot of queue (check for new entries)
247 }
248
249 // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
250 // callers should check permissions *before* enqueueing updates. If the main transaction
251 // round actions succeed but some deferred updates fail due to permissions errors then
252 // there is a risk that some secondary data was not properly updated.
253 if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
254 throw $guiEx;
255 }
256 }
257
268 private static function run(
269 DeferrableUpdate $update,
270 LBFactory $lbFactory,
271 LoggerInterface $logger,
272 StatsdDataFactoryInterface $stats,
273 $httpMethod
274 ) {
275 $name = get_class( $update );
276 $suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
277 $stats->increment( "deferred_updates.$httpMethod.{$name}{$suffix}" );
278
279 $e = null;
280 try {
281 self::attemptUpdate( $update, $lbFactory );
282 } catch ( Exception $e ) {
283 } catch ( Throwable $e ) {
284 }
285
286 if ( $e ) {
287 $logger->error(
288 "Deferred update {type} failed: {message}",
289 [
290 'type' => $name . $suffix,
291 'message' => $e->getMessage(),
292 'trace' => $e->getTraceAsString()
293 ]
294 );
295 $lbFactory->rollbackMasterChanges( __METHOD__ );
296 // VW-style hack to work around T190178, so we can make sure
297 // PageMetaDataUpdater doesn't throw exceptions.
298 if ( defined( 'MW_PHPUNIT_TEST' ) ) {
299 throw $e;
300 }
301 }
302
303 return $e;
304 }
305
315 private static function jobify(
316 EnqueueableDataUpdate $update,
317 LBFactory $lbFactory,
318 LoggerInterface $logger,
319 StatsdDataFactoryInterface $stats,
320 $httpMethod
321 ) {
322 $stats->increment( "deferred_updates.$httpMethod." . get_class( $update ) );
323
324 $e = null;
325 try {
326 $spec = $update->getAsJobSpecification();
327 JobQueueGroup::singleton( $spec['domain'] ?? $spec['wiki'] )->push( $spec['job'] );
328 } catch ( Exception $e ) {
329 } catch ( Throwable $e ) {
330 }
331
332 if ( $e ) {
333 $logger->error(
334 "Job insertion of deferred update {type} failed: {message}",
335 [
336 'type' => get_class( $update ),
337 'message' => $e->getMessage(),
338 'trace' => $e->getTraceAsString()
339 ]
340 );
341 $lbFactory->rollbackMasterChanges( __METHOD__ );
342 }
343 }
344
356 public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
357 $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
358 if ( !$ticket || $lbFactory->hasTransactionRound() ) {
359 throw new DBTransactionError( null, "A database transaction round is pending." );
360 }
361
362 if ( $update instanceof DataUpdate ) {
363 $update->setTransactionTicket( $ticket );
364 }
365
366 // Designate $update::doUpdate() as the write round owner
367 $fnameTrxOwner = ( $update instanceof DeferrableCallback )
368 ? $update->getOrigin()
369 : get_class( $update ) . '::doUpdate';
370 // Determine whether the write round will be explicit or implicit
371 $useExplicitTrxRound = !(
372 $update instanceof TransactionRoundAwareUpdate &&
373 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
374 );
375
376 // Flush any pending changes left over from an implicit transaction round
377 if ( $useExplicitTrxRound ) {
378 $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
379 } else {
380 $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
381 }
382 // Run the update after any stale master view snapshots have been flushed
383 $update->doUpdate();
384 // Commit any pending changes from the explicit or implicit transaction round
385 $lbFactory->commitMasterChanges( $fnameTrxOwner );
386 }
387
399 public static function tryOpportunisticExecute( $mode = 'run' ) {
400 // execute() loop is already running
401 if ( self::$executeContext ) {
402 return false;
403 }
404
405 // Avoiding running updates without them having outer scope
406 if ( !self::areDatabaseTransactionsActive() ) {
407 self::doUpdates( $mode );
408 return true;
409 }
410
411 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
412 // If we cannot run the updates with outer transaction context, try to
413 // at least enqueue all the updates that support queueing to job queue
414 self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
415 self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
416 }
417
418 return !self::pendingUpdatesCount();
419 }
420
427 private static function enqueueUpdates( array $updates ) {
428 $remaining = [];
429
430 foreach ( $updates as $update ) {
431 if ( $update instanceof EnqueueableDataUpdate ) {
432 $spec = $update->getAsJobSpecification();
433 $domain = $spec['domain'] ?? $spec['wiki'];
434 JobQueueGroup::singleton( $domain )->push( $spec['job'] );
435 } else {
436 $remaining[] = $update;
437 }
438 }
439
440 return $remaining;
441 }
442
447 public static function pendingUpdatesCount() {
448 return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
449 }
450
456 public static function getPendingUpdates( $stage = self::ALL ) {
457 $updates = [];
458 if ( $stage === self::ALL || $stage === self::PRESEND ) {
459 $updates = array_merge( $updates, self::$preSendUpdates );
460 }
461 if ( $stage === self::ALL || $stage === self::POSTSEND ) {
462 $updates = array_merge( $updates, self::$postSendUpdates );
463 }
464 return $updates;
465 }
466
471 public static function clearPendingUpdates() {
472 self::$preSendUpdates = [];
473 self::$postSendUpdates = [];
474 }
475
479 private static function areDatabaseTransactionsActive() {
480 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
481 if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
482 return true;
483 }
484
485 $connsBusy = false;
486 $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
487 $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
488 if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
489 $connsBusy = true;
490 }
491 } );
492 } );
493
494 return $connsBusy;
495 }
496}
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any exceptions.
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
static pendingUpdatesCount()
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static array null $executeContext
Information about the current execute() call or null if not running.
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
static getPendingUpdates( $stage=self::ALL)
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
PSR-3 logger instance factory.
MediaWikiServices is the service locator for the application scope of MediaWiki.
An interface for generating database load balancers.
Definition LBFactory.php:40
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Database connection, tracking, load balancing, and transaction manager for a cluster.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
doUpdate()
Perform the actual work.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
An interface for generating database load balancers.
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
hasTransactionRound()
Check if an explicit transaction round is active.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.