MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
32 
64  private static $preSendUpdates = [];
66  private static $postSendUpdates = [];
67 
68  const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
69  const PRESEND = 1; // for updates that should run before flushing output buffer
70  const POSTSEND = 2; // for updates that should run after flushing output buffer
71 
72  const BIG_QUEUE_SIZE = 100;
73 
75  private static $executeContext;
76 
85  public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
86  global $wgCommandLineMode;
87 
88  if (
89  self::$executeContext &&
90  self::$executeContext['stage'] >= $stage &&
91  !( $update instanceof MergeableUpdate )
92  ) {
93  // This is a sub-DeferredUpdate; run it right after its parent update.
94  // Also, while post-send updates are running, push any "pre-send" jobs to the
95  // active post-send queue to make sure they get run this round (or at all).
96  self::$executeContext['subqueue'][] = $update;
97 
98  return;
99  }
100 
101  if ( $stage === self::PRESEND ) {
102  self::push( self::$preSendUpdates, $update );
103  } else {
104  self::push( self::$postSendUpdates, $update );
105  }
106 
107  // Try to run the updates now if in CLI mode and no transaction is active.
108  // This covers scripts that don't/barely use the DB but make updates to other stores.
109  if ( $wgCommandLineMode ) {
110  self::tryOpportunisticExecute( 'run' );
111  }
112  }
113 
124  public static function addCallableUpdate(
125  $callable, $stage = self::POSTSEND, $dbw = null
126  ) {
127  self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
128  }
129 
139  public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
140  $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
141  // For ALL mode, make sure that any PRESEND updates added along the way get run.
142  // Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
143  do {
144  if ( $stage === self::ALL || $stage === self::PRESEND ) {
145  self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
146  }
147 
148  if ( $stage === self::ALL || $stage == self::POSTSEND ) {
149  self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
150  }
151  } while ( $stage === self::ALL && self::$preSendUpdates );
152  }
153 
158  private static function push( array &$queue, DeferrableUpdate $update ) {
159  if ( $update instanceof MergeableUpdate ) {
160  $class = get_class( $update ); // fully-qualified class
161  if ( isset( $queue[$class] ) ) {
163  $existingUpdate = $queue[$class];
164  '@phan-var MergeableUpdate $existingUpdate';
165  $existingUpdate->merge( $update );
166  // Move the update to the end to handle things like mergeable purge
167  // updates that might depend on the prior updates in the queue running
168  unset( $queue[$class] );
169  $queue[$class] = $existingUpdate;
170  } else {
171  $queue[$class] = $update;
172  }
173  } else {
174  $queue[] = $update;
175  }
176  }
177 
187  protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
188  $services = MediaWikiServices::getInstance();
189  $stats = $services->getStatsdDataFactory();
190  $lbf = $services->getDBLoadBalancerFactory();
191  $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
192  $httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
193  ? 'cli'
194  : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
195 
197  $guiEx = null;
199  $updates = $queue;
200 
201  // Keep doing rounds of updates until none get enqueued...
202  while ( $updates ) {
203  $queue = []; // clear the queue
204 
205  // Segregate the queue into one for DataUpdate and one for everything else
206  $dataUpdateQueue = [];
207  $genericUpdateQueue = [];
208  foreach ( $updates as $update ) {
209  if ( $update instanceof DataUpdate ) {
210  $dataUpdateQueue[] = $update;
211  } else {
212  $genericUpdateQueue[] = $update;
213  }
214  }
215  // Execute all DataUpdate queue followed by the DeferrableUpdate queue...
216  foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
217  foreach ( $updateQueue as $du ) {
218  // Enqueue the task into the job queue system instead if applicable
219  if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
220  self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
221  continue;
222  }
223  // Otherwise, execute the task and any subtasks that it spawns
224  self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
225  try {
226  $e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
227  $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
228  // Do the subqueue updates for $update until there are none
229  while ( self::$executeContext['subqueue'] ) {
230  $duChild = reset( self::$executeContext['subqueue'] );
231  $firstKey = key( self::$executeContext['subqueue'] );
232  unset( self::$executeContext['subqueue'][$firstKey] );
233 
234  $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
235  $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
236  }
237  } finally {
238  // Make sure we always clean up the context.
239  // Losing updates while rewinding the stack is acceptable,
240  // losing updates that are added later is not.
241  self::$executeContext = null;
242  }
243  }
244  }
245 
246  $updates = $queue; // new snapshot of queue (check for new entries)
247  }
248 
249  // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
250  // callers should check permissions *before* enqueueing updates. If the main transaction
251  // round actions succeed but some deferred updates fail due to permissions errors then
252  // there is a risk that some secondary data was not properly updated.
253  if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
254  throw $guiEx;
255  }
256  }
257 
268  private static function run(
269  DeferrableUpdate $update,
270  LBFactory $lbFactory,
271  LoggerInterface $logger,
272  StatsdDataFactoryInterface $stats,
273  $httpMethod
274  ) {
275  $name = get_class( $update );
276  $suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
277  $stats->increment( "deferred_updates.$httpMethod.{$name}{$suffix}" );
278 
279  $e = null;
280  try {
281  self::attemptUpdate( $update, $lbFactory );
282  } catch ( Exception $e ) {
283  } catch ( Throwable $e ) {
284  }
285 
286  if ( $e ) {
287  $logger->error(
288  "Deferred update {type} failed: {message}",
289  [
290  'type' => $name . $suffix,
291  'message' => $e->getMessage(),
292  'trace' => $e->getTraceAsString()
293  ]
294  );
295  $lbFactory->rollbackMasterChanges( __METHOD__ );
296  // VW-style hack to work around T190178, so we can make sure
297  // PageMetaDataUpdater doesn't throw exceptions.
298  if ( defined( 'MW_PHPUNIT_TEST' ) ) {
299  throw $e;
300  }
301  }
302 
303  return $e;
304  }
305 
315  private static function jobify(
316  EnqueueableDataUpdate $update,
317  LBFactory $lbFactory,
318  LoggerInterface $logger,
319  StatsdDataFactoryInterface $stats,
320  $httpMethod
321  ) {
322  $stats->increment( "deferred_updates.$httpMethod." . get_class( $update ) );
323 
324  $e = null;
325  try {
326  $spec = $update->getAsJobSpecification();
327  JobQueueGroup::singleton( $spec['domain'] ?? $spec['wiki'] )->push( $spec['job'] );
328  } catch ( Exception $e ) {
329  } catch ( Throwable $e ) {
330  }
331 
332  if ( $e ) {
333  $logger->error(
334  "Job insertion of deferred update {type} failed: {message}",
335  [
336  'type' => get_class( $update ),
337  'message' => $e->getMessage(),
338  'trace' => $e->getTraceAsString()
339  ]
340  );
341  $lbFactory->rollbackMasterChanges( __METHOD__ );
342  }
343  }
344 
356  public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
357  $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
358  if ( !$ticket || $lbFactory->hasTransactionRound() ) {
359  throw new DBTransactionError( null, "A database transaction round is pending." );
360  }
361 
362  if ( $update instanceof DataUpdate ) {
363  $update->setTransactionTicket( $ticket );
364  }
365 
366  // Designate $update::doUpdate() as the write round owner
367  $fnameTrxOwner = ( $update instanceof DeferrableCallback )
368  ? $update->getOrigin()
369  : get_class( $update ) . '::doUpdate';
370  // Determine whether the write round will be explicit or implicit
371  $useExplicitTrxRound = !(
372  $update instanceof TransactionRoundAwareUpdate &&
373  $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
374  );
375 
376  // Flush any pending changes left over from an implicit transaction round
377  if ( $useExplicitTrxRound ) {
378  $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
379  } else {
380  $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
381  }
382  // Run the update after any stale master view snapshots have been flushed
383  $update->doUpdate();
384  // Commit any pending changes from the explicit or implicit transaction round
385  $lbFactory->commitMasterChanges( $fnameTrxOwner );
386  }
387 
399  public static function tryOpportunisticExecute( $mode = 'run' ) {
400  // execute() loop is already running
401  if ( self::$executeContext ) {
402  return false;
403  }
404 
405  // Avoiding running updates without them having outer scope
406  if ( !self::areDatabaseTransactionsActive() ) {
407  self::doUpdates( $mode );
408  return true;
409  }
410 
411  if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
412  // If we cannot run the updates with outer transaction context, try to
413  // at least enqueue all the updates that support queueing to job queue
414  self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
415  self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
416  }
417 
418  return !self::pendingUpdatesCount();
419  }
420 
427  private static function enqueueUpdates( array $updates ) {
428  $remaining = [];
429 
430  foreach ( $updates as $update ) {
431  if ( $update instanceof EnqueueableDataUpdate ) {
432  $spec = $update->getAsJobSpecification();
433  $domain = $spec['domain'] ?? $spec['wiki'];
434  JobQueueGroup::singleton( $domain )->push( $spec['job'] );
435  } else {
436  $remaining[] = $update;
437  }
438  }
439 
440  return $remaining;
441  }
442 
447  public static function pendingUpdatesCount() {
448  return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
449  }
450 
456  public static function getPendingUpdates( $stage = self::ALL ) {
457  $updates = [];
458  if ( $stage === self::ALL || $stage === self::PRESEND ) {
459  $updates = array_merge( $updates, self::$preSendUpdates );
460  }
461  if ( $stage === self::ALL || $stage === self::POSTSEND ) {
462  $updates = array_merge( $updates, self::$postSendUpdates );
463  }
464  return $updates;
465  }
466 
471  public static function clearPendingUpdates() {
472  self::$preSendUpdates = [];
473  self::$postSendUpdates = [];
474  }
475 
479  private static function areDatabaseTransactionsActive() {
480  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
481  if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
482  return true;
483  }
484 
485  $connsBusy = false;
486  $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
487  $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
488  if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
489  $connsBusy = true;
490  }
491  } );
492  } );
493 
494  return $connsBusy;
495  }
496 }
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static DeferrableUpdate [] $postSendUpdates
Updates to be deferred until after request end.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
static clearPendingUpdates()
Clear all pending updates without performing them.
An interface for generating database load balancers.
Definition: LBFactory.php:40
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects. ...
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
static array null $executeContext
Information about the current execute() call or null if not running.
static areDatabaseTransactionsActive()
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any exceptions.
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
static pendingUpdatesCount()
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static getMain()
Get the RequestContext object associated with the main request.
An error page which can definitely be safely rendered using the OutputPage.
Callback wrapper that has an originating method.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
hasTransactionRound()
Check if an explicit transaction round is active.
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
static getPendingUpdates( $stage=self::ALL)
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
An interface for generating database load balancers.
Definition: ILBFactory.php:33
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
global $wgCommandLineMode
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:284
static push(array &$queue, DeferrableUpdate $update)
doUpdate()
Perform the actual work.
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
static singleton( $domain=false)
Database connection, tracking, load balancing, and transaction manager for a cluster.
static DeferrableUpdate [] $preSendUpdates
Updates to be deferred until before request end.