MediaWiki  1.34.0
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 use Psr\Log\LoggerInterface;
32 
/** @var int All updates; in web requests, use only after flushing the output buffer */
const ALL = 0;
/** @var int For updates that should run before flushing output buffer */
const PRESEND = 1;
/** @var int For updates that should run after flushing output buffer */
const POSTSEND = 2;

/**
 * @var int Pending-update count at or above which tryOpportunisticExecute()
 *  pushes enqueueable updates to the job queue when it cannot run them directly
 */
const BIG_QUEUE_SIZE = 100;

/** @var DeferrableUpdate[] Updates to be deferred until before request end */
private static $preSendUpdates = [];

/** @var DeferrableUpdate[] Updates to be deferred until after request end */
private static $postSendUpdates = [];

/**
 * @var array|null Information about the update currently being executed
 *  (keys 'stage' and 'subqueue'), or null if no update is running
 */
private static $executeContext;
76 
/**
 * Add an update to the deferred list to be run later.
 *
 * While another update is executing, a non-mergeable update whose stage is not
 * later than the stage being executed is attached to the running update's
 * sub-queue, so that it runs right after its parent instead of waiting for a
 * later round (or never running at all).
 *
 * In CLI mode, this also tries to run the pending updates right away, which
 * covers scripts that barely use the DB but make updates to other stores.
 *
 * @param DeferrableUpdate $update Update to schedule
 * @param int $stage One of self::PRESEND or self::POSTSEND
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	// MergeableUpdate items never use the sub-queue; they always go through
	// push() so that they can be merged with a queued update of the same class.
	$isSubUpdate = self::$executeContext
		&& self::$executeContext['stage'] >= $stage
		&& !( $update instanceof MergeableUpdate );
	if ( $isSubUpdate ) {
		// This is a sub-DeferredUpdate; run it right after its parent update.
		// Also, while post-send updates are running, push any "pre-send" jobs to the
		// active post-send queue to make sure they get run this round (or at all).
		self::$executeContext['subqueue'][] = $update;

		return;
	}

	if ( $stage === self::PRESEND ) {
		self::push( self::$preSendUpdates, $update );
	} else {
		self::push( self::$postSendUpdates, $update );
	}

	// Try to run the updates now if in CLI mode and no transaction is active.
	// This covers scripts that don't/barely use the DB but make updates to other stores.
	if ( $wgCommandLineMode ) {
		// This call was missing, leaving the branch empty so CLI scripts never
		// flushed their updates opportunistically.
		self::tryOpportunisticExecute( 'run' );
	}
}
113 
/**
 * Add a callable update.
 *
 * The callable is wrapped in an MWCallableUpdate, labelled with the name
 * reported by wfGetCaller(), and then handed to addUpdate().
 *
 * @param callable $callable Callback to wrap
 * @param int $stage One of self::PRESEND or self::POSTSEND
 * @param mixed $dbw Passed through unchanged to the MWCallableUpdate constructor
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	// Must be resolved inside this method so the reported caller stays the same
	$origin = wfGetCaller();
	$wrapped = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $wrapped, $stage );
}
129 
/**
 * Do any deferred updates and clear the list.
 *
 * @param string $mode Passed on to handleUpdateQueue(); "enqueue" pushes
 *  enqueueable updates to the job queue instead of running them
 * @param int $stage One of self::ALL, self::PRESEND or self::POSTSEND
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$runEverything = ( $stage === self::ALL );
	$runPreSend = $runEverything || $stage === self::PRESEND;
	// NOTE(review): loose comparison, unlike the strict checks above — confirm
	// whether callers rely on it before tightening.
	$runPostSend = $runEverything || $stage == self::POSTSEND;
	// When everything is requested, the queues are processed as POSTSEND
	$stageEffective = $runEverything ? self::POSTSEND : $stage;

	// For ALL mode, make sure that any PRESEND updates added along the way get run.
	// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
	do {
		if ( $runPreSend ) {
			self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
		}

		if ( $runPostSend ) {
			self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
		}
	} while ( $runEverything && self::$preSendUpdates );
}
153 
/**
 * Append an update to a queue, merging it into a queued update of the same
 * class when it is a MergeableUpdate.
 *
 * @param DeferrableUpdate[] &$queue Queue to modify
 * @param DeferrableUpdate $update Update to add
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	// Plain updates are simply appended
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	// Mergeable updates are keyed by their fully-qualified class name
	$class = get_class( $update );
	if ( !isset( $queue[$class] ) ) {
		$queue[$class] = $update;

		return;
	}

	$existingUpdate = $queue[$class];
	'@phan-var MergeableUpdate $existingUpdate';
	$existingUpdate->merge( $update );
	// Move the update to the end to handle things like mergeable purge
	// updates that might depend on the prior updates in the queue running
	unset( $queue[$class] );
	$queue[$class] = $existingUpdate;
}
177 
/**
 * Immediately run or enqueue a list of updates.
 *
 * The queue is drained in rounds until a round leaves it empty. Within a round,
 * DataUpdate items run before all other updates, and each update is followed
 * immediately by the sub-updates it spawned while running.
 *
 * @param DeferrableUpdate[] &$queue Queue to drain; it is emptied at the start of each round
 * @param string $mode "enqueue" pushes EnqueueableDataUpdate items to the job queue
 *  instead of running them; any other value runs everything
 * @param int $stage Stage recorded in the execute context (self::PRESEND or self::POSTSEND)
 * @throws ErrorPageError The first one returned by an update, only when $stage is
 *  self::PRESEND and headers have not been sent yet
 */
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	if ( $services->getMainConfig()->get( 'CommandLineMode' ) ) {
		$httpMethod = 'cli';
	} else {
		$httpMethod = strtolower( RequestContext::getMain()->getRequest()->getMethod() );
	}

	/** @var ErrorPageError|null $firstGuiError */
	$firstGuiError = null;

	// Keep doing rounds of updates until none get enqueued...
	while ( $queue ) {
		// Take a snapshot and clear the queue so that new entries are noticed
		$round = $queue;
		$queue = [];

		// Segregate the snapshot into DataUpdate items and everything else
		$dataUpdates = [];
		$otherUpdates = [];
		foreach ( $round as $item ) {
			if ( $item instanceof DataUpdate ) {
				$dataUpdates[] = $item;
			} else {
				$otherUpdates[] = $item;
			}
		}

		// Execute all DataUpdate items followed by the remaining updates...
		foreach ( array_merge( $dataUpdates, $otherUpdates ) as $task ) {
			// Enqueue the task into the job queue system instead if applicable
			if ( $mode === 'enqueue' && $task instanceof EnqueueableDataUpdate ) {
				self::jobify( $task, $lbf, $logger, $stats, $httpMethod );
				continue;
			}

			// Otherwise, execute the task and any subtasks that it spawns
			self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
			try {
				$error = self::run( $task, $lbf, $logger, $stats, $httpMethod );
				if ( $firstGuiError === null && $error instanceof ErrorPageError ) {
					$firstGuiError = $error;
				}
				// Do the subqueue updates for this task until there are none
				while ( self::$executeContext['subqueue'] ) {
					$child = array_shift( self::$executeContext['subqueue'] );

					$error = self::run( $child, $lbf, $logger, $stats, $httpMethod );
					if ( $firstGuiError === null && $error instanceof ErrorPageError ) {
						$firstGuiError = $error;
					}
				}
			} finally {
				// Make sure we always clean up the context.
				// Losing updates while rewinding the stack is acceptable,
				// losing updates that are added later is not.
				self::$executeContext = null;
			}
		}
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $firstGuiError !== null && $stage === self::PRESEND && !headers_sent() ) {
		throw $firstGuiError;
	}
}
257 
/**
 * Run a task and catch/log any exceptions.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param LBFactory $lbFactory Used to run the update and to roll back on failure
 * @param LoggerInterface $logger Receives an error entry on failure
 * @param StatsdDataFactoryInterface $stats Receives one counter increment per call
 * @param string $httpMethod Used as a segment of the stats key
 * @return Exception|Throwable|null What the update threw, or null if it succeeded
 * @throws Exception|Throwable Re-thrown only when MW_PHPUNIT_TEST is defined
 */
private static function run(
	DeferrableUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$name = get_class( $update );
	if ( $update instanceof DeferrableCallback ) {
		$suffix = "_{$update->getOrigin()}";
	} else {
		$suffix = '';
	}
	$stats->increment( "deferred_updates.$httpMethod.{$name}{$suffix}" );

	$failure = null;
	try {
		self::attemptUpdate( $update, $lbFactory );
	} catch ( Throwable $caught ) {
		$failure = $caught;
	}

	if ( $failure === null ) {
		return null;
	}

	$logger->error(
		"Deferred update {type} failed: {message}",
		[
			'type' => $name . $suffix,
			'message' => $failure->getMessage(),
			'trace' => $failure->getTraceAsString()
		]
	);
	$lbFactory->rollbackMasterChanges( __METHOD__ );
	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $failure;
	}

	return $failure;
}
305 
/**
 * Push a task into the job queue system and catch/log any exceptions.
 *
 * @param EnqueueableDataUpdate $update Update to convert into a job
 * @param LBFactory $lbFactory Used to roll back on failure
 * @param LoggerInterface $logger Receives an error entry on failure
 * @param StatsdDataFactoryInterface $stats Receives one counter increment per call
 * @param string $httpMethod Used as a segment of the stats key
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod." . $type );

	$failure = null;
	try {
		$spec = $update->getAsJobSpecification();
		// The 'wiki' key is used when the specification has no 'domain'
		$domain = $spec['domain'] ?? $spec['wiki'];
		JobQueueGroup::singleton( $domain )->push( $spec['job'] );
	} catch ( Throwable $caught ) {
		$failure = $caught;
	}

	if ( $failure === null ) {
		return;
	}

	$logger->error(
		"Job insertion of deferred update {type} failed: {message}",
		[
			'type' => $type,
			'message' => $failure->getMessage(),
			'trace' => $failure->getTraceAsString()
		]
	);
	$lbFactory->rollbackMasterChanges( __METHOD__ );
}
344 
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param ILBFactory $lbFactory Used to obtain the ticket and to begin/commit master changes
 * @throws DBTransactionError If no empty transaction ticket is available or a
 *  transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	if ( $update instanceof DeferrableCallback ) {
		$fnameTrxOwner = $update->getOrigin();
	} else {
		$fnameTrxOwner = get_class( $update ) . '::doUpdate';
	}

	// Determine whether the update asked to run without an explicit round
	$wantsNoExplicitRound = $update instanceof TransactionRoundAwareUpdate
		&& $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT;

	// Flush any pending changes left over from an implicit transaction round
	if ( $wantsNoExplicitRound ) {
		$lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
	} else {
		$lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
	}
	// Run the update after any stale master view snapshots have been flushed
	$update->doUpdate();
	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitMasterChanges( $fnameTrxOwner );
}
387 
/**
 * Run all deferred updates immediately if there are no DB writes active.
 *
 * If the updates cannot be run and the queues hold at least BIG_QUEUE_SIZE
 * items, the enqueueable ones are pushed to the job queue instead.
 *
 * @param string $mode Passed on to doUpdates()
 * @return bool False if an update is already executing; true if the updates
 *  were run; otherwise whether no pending updates remain
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// An update is already being executed
	if ( self::$executeContext ) {
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// Avoid running updates without them having outer scope
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// If we cannot run the updates with outer transaction context, try to
			// at least enqueue all the updates that support queueing to job queue
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return self::pendingUpdatesCount() === 0;
	}

	self::doUpdates( $mode );

	return true;
}
420 
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items.
 *
 * String keys of the remaining items are preserved. push() stores each
 * MergeableUpdate under its class name and looks it up by that key, so
 * re-indexing the queue would stop later updates of the same class from being
 * merged into the one that is still queued.
 *
 * @param DeferrableUpdate[] $updates A list of deferred update instances
 * @return DeferrableUpdate[] Remaining updates that do not support being queued
 */
private static function enqueueUpdates( array $updates ) {
	$remaining = [];

	foreach ( $updates as $key => $update ) {
		if ( $update instanceof EnqueueableDataUpdate ) {
			$spec = $update->getAsJobSpecification();
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );
		} elseif ( is_string( $key ) ) {
			// Keep the class-name key so that push() can still merge into this item
			$remaining[$key] = $update;
		} else {
			$remaining[] = $update;
		}
	}

	return $remaining;
}
442 
/**
 * Count the updates waiting in the pre-send and post-send queues.
 *
 * @return int Number of enqueued updates
 */
public static function pendingUpdatesCount() {
	$preSendCount = count( self::$preSendUpdates );
	$postSendCount = count( self::$postSendUpdates );

	return $preSendCount + $postSendCount;
}
450 
/**
 * Get the queued updates for a stage without running or removing them.
 *
 * The selected queues are combined with array_merge(), pre-send first.
 *
 * @param int $stage One of self::ALL, self::PRESEND or self::POSTSEND
 * @return DeferrableUpdate[]
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$wantAll = ( $stage === self::ALL );

	$selected = [];
	if ( $wantAll || $stage === self::PRESEND ) {
		$selected[] = self::$preSendUpdates;
	}
	if ( $wantAll || $stage === self::POSTSEND ) {
		$selected[] = self::$postSendUpdates;
	}

	// The leading empty array keeps the call valid when nothing was selected
	return array_merge( [], ...$selected );
}
466 
/**
 * Clear all pending updates without performing them.
 */
public static function clearPendingUpdates() {
	self::$postSendUpdates = [];
	self::$preSendUpdates = [];
}
475 
/**
 * Check whether deferred updates must wait because of database transaction state.
 *
 * @return bool True if a transaction round is active, the load balancer factory is
 *  not ready for round operations, or any open master connection has pending
 *  writes/callbacks or an explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	$anyBusy = false;
	// Flags the result when a master connection is in the middle of something
	$inspectConn = function ( IDatabase $conn ) use ( &$anyBusy ) {
		if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
			$anyBusy = true;
		}
	};
	$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( $inspectConn ) {
		$lb->forEachOpenMasterConnection( $inspectConn );
	} );

	return $anyBusy;
}
496 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:315
DeferredUpdates\ALL
const ALL
Definition: DeferredUpdates.php:68
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:117
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:356
MergeableUpdate
Interface that deferrable updates can implement to signal that updates can be combined.
Definition: MergeableUpdate.php:18
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
Definition: DeferredUpdates.php:85
Wikimedia\Rdbms\LoadBalancer\forEachOpenMasterConnection
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Definition: LoadBalancer.php:2103
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Clear all pending updates without performing them.
Definition: DeferredUpdates.php:471
DeferredUpdates\$postSendUpdates
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
Definition: DeferredUpdates.php:66
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:8
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction round.
Definition: TransactionRoundAwareUpdate.php:9
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article.
Definition: DataUpdate.php:28
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:10
DeferredUpdates\run
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any exceptions.
Definition: DeferredUpdates.php:268
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
$wgCommandLineMode
global $wgCommandLineMode
Definition: DevelopmentSettings.php:28
DeferredUpdates
Class for managing the deferred updates.
Definition: DeferredUpdates.php:62
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\handleUpdateQueue
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
Definition: DeferredUpdates.php:187
$queue
$queue
Definition: mergeMessageFileList.php:157
DeferredUpdates\POSTSEND
const POSTSEND
Definition: DeferredUpdates.php:70
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:42
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
DeferredUpdates\BIG_QUEUE_SIZE
const BIG_QUEUE_SIZE
Definition: DeferredUpdates.php:72
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
Definition: DeferredUpdates.php:399
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
DeferredUpdates\$executeContext
static array|null $executeContext
Information about the current execute() call or null if not running.
Definition: DeferredUpdates.php:75
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:479
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:431
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
Definition: DeferredUpdates.php:139
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Definition: DeferredUpdates.php:447
Wikimedia\Rdbms\ILBFactory\beginMasterChanges
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:8
Wikimedia\Rdbms\DBTransactionError
Definition: DBTransactionError.php:27
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
Wikimedia\Rdbms\LBFactory\rollbackMasterChanges
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:284
DeferredUpdates\PRESEND
const PRESEND
Definition: DeferredUpdates.php:69
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Definition: DeferredUpdates.php:456
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:40
Wikimedia\Rdbms\ILBFactory\commitMasterChanges
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
DeferredUpdates\enqueueUpdates
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
Definition: DeferredUpdates.php:427
DeferredUpdates\push
static push(array &$queue, DeferrableUpdate $update)
Definition: DeferredUpdates.php:158
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:9
DeferredUpdates\$preSendUpdates
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
Definition: DeferredUpdates.php:64
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:27
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
Definition: DeferredUpdates.php:124
wfGetCaller
wfGetCaller( $level=2)
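Get the name of the function which called this function. wfGetCaller( 1 ) is the function with the wfGetCaller() call (i.e. this function); wfGetCaller( 2 ) [default] is the caller of the function running wfGetCaller().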
Definition: GlobalFunctions.php:1454
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...