MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
32 
/** @var DeferrableUpdate[] Updates to be deferred until before request end */
64  private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
66  private static $postSendUpdates = [];
67 
// Stage selectors accepted as the $stage argument of addUpdate(), doUpdates()
// and getPendingUpdates().
68  const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
69  const PRESEND = 1; // for updates that should run before flushing output buffer
70  const POSTSEND = 2; // for updates that should run after flushing output buffer
71 
// Pending-update count at or above which tryOpportunisticExecute() pushes the
// enqueueable updates to the job queue when they cannot be run directly.
72  const BIG_QUEUE_SIZE = 100;
73 
/**
 * @var array|null Information about the current execute() call or null if not running.
 *  While an update runs, handleUpdateQueue() sets this to an array with the keys
 *  'stage' and 'subqueue'.
 */
75  private static $executeContext;
76 
/**
 * Add an update to the deferred list to be run later.
 *
 * If an update is currently executing at the same or a later stage, a
 * non-mergeable update is attached to the running update's subqueue instead
 * of the regular queues. In command-line mode, an opportunistic run of the
 * queues is attempted right after the update was queued.
 *
 * @param DeferrableUpdate $update Update to queue
 * @param int $stage One of the stage constants (PRESEND or POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	$context = self::$executeContext;
	$runsAsSubUpdate = $context !== null
		&& $context['stage'] >= $stage
		&& !( $update instanceof MergeableUpdate );

	if ( $runsAsSubUpdate ) {
		// This is a sub-DeferredUpdate; it runs right after its parent update.
		// While post-send updates are running, "pre-send" updates also land here
		// so that they still get run this round (or at all).
		self::$executeContext['subqueue'][] = $update;

		return;
	}

	if ( $stage !== self::PRESEND ) {
		self::push( self::$postSendUpdates, $update );
	} else {
		self::push( self::$preSendUpdates, $update );
	}

	// In CLI mode, try to run the updates right away if no transaction is active.
	// This covers scripts that don't/barely use the DB but make updates to other stores.
	if ( $wgCommandLineMode ) {
		self::tryOpportunisticExecute( 'run' );
	}
}
114 
/**
 * Add a callable update.
 *
 * The callable is wrapped in an MWCallableUpdate whose origin is the name of
 * the function that called this method, then handed to addUpdate().
 *
 * @param callable $callable
 * @param int $stage One of the stage constants (PRESEND or POSTSEND)
 * @param mixed $dbw Passed through to MWCallableUpdate unchanged
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	// Must be evaluated in this frame so that it names our own caller
	$origin = wfGetCaller();
	$update = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $update, $stage );
}
130 
/**
 * Do any deferred updates and clear the list.
 *
 * @param string $mode 'run' to execute the updates, or 'enqueue' to push the
 *  updates that support it to the job queue instead
 * @param int $stage One of the stage constants (ALL, PRESEND or POSTSEND)
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
	// Stage checks are strict, matching every other stage comparison in this class
	$runPreSend = ( $stage === self::ALL || $stage === self::PRESEND );
	$runPostSend = ( $stage === self::ALL || $stage === self::POSTSEND );

	// For ALL mode, make sure that any PRESEND updates added along the way get run.
	// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
	do {
		if ( $runPreSend ) {
			self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
		}

		if ( $runPostSend ) {
			self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
		}
	} while ( $stage === self::ALL && self::$preSendUpdates );
}
154 
/**
 * Append an update to a queue, merging it into an already queued update of
 * the same class when the update is mergeable.
 *
 * Mergeable updates are stored under their class name; all other updates are
 * appended with the next integer key.
 *
 * @param DeferrableUpdate[] &$queue Queue to modify
 * @param DeferrableUpdate $update Update to add
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	$class = get_class( $update ); // fully-qualified class
	if ( !isset( $queue[$class] ) ) {
		$queue[$class] = $update;

		return;
	}

	$merged = $queue[$class];
	'@phan-var MergeableUpdate $merged';
	$merged->merge( $update );
	// Re-insert the merged update at the end of the queue to handle things like
	// mergeable purge updates that might depend on the prior updates having run
	unset( $queue[$class] );
	$queue[$class] = $merged;
}
178 
/**
 * Immediately run or enqueue a list of updates.
 *
 * The queue is emptied and processed in rounds until no new updates show up
 * in it. Within a round, all DataUpdate items are handled before the other
 * updates. Updates that an executing update adds to the subqueue are run
 * directly after that update.
 *
 * @param DeferrableUpdate[] &$queue Queue to drain
 * @param string $mode With 'enqueue', EnqueueableDataUpdate items are pushed to
 *  the job queue instead of being executed
 * @param int $stage Stage recorded in the execution context (PRESEND or POSTSEND)
 * @throws ErrorPageError The first such error returned by an update, if $stage is
 *  PRESEND and no headers were sent yet
 * @throws Exception|Throwable The first error returned by an update when
 *  MW_PHPUNIT_TEST is defined
 */
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	if ( $services->getMainConfig()->get( 'CommandLineMode' ) ) {
		$httpMethod = 'cli';
	} else {
		$httpMethod = strtolower( RequestContext::getMain()->getRequest()->getMethod() );
	}

	// First ErrorPageError returned by any update, if any
	$guiEx = null;
	// First error of any kind returned by any update, if any
	$exception = null;

	// Keep doing rounds of updates until none get enqueued; each round works on a
	// snapshot of the queue so that newly added entries are picked up by the next one
	for ( $updates = $queue; $updates; $updates = $queue ) {
		$queue = []; // clear the queue

		// Segregate the snapshot into DataUpdate items and everything else
		$dataUpdateQueue = [];
		$genericUpdateQueue = [];
		foreach ( $updates as $update ) {
			if ( $update instanceof DataUpdate ) {
				$dataUpdateQueue[] = $update;
			} else {
				$genericUpdateQueue[] = $update;
			}
		}

		// Handle all DataUpdate items first, followed by the remaining updates
		foreach ( array_merge( $dataUpdateQueue, $genericUpdateQueue ) as $du ) {
			// Enqueue the task into the job queue system instead if applicable
			if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
				self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
				continue;
			}

			// Otherwise, execute the task and any subtasks that it spawns
			self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
			try {
				$e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
				if ( !$guiEx && $e instanceof ErrorPageError ) {
					$guiEx = $e;
				}
				if ( !$exception ) {
					$exception = $e;
				}

				// Run the subqueue updates in FIFO order until there are none left
				while ( self::$executeContext['subqueue'] ) {
					$duChild = array_shift( self::$executeContext['subqueue'] );

					$e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
					if ( !$guiEx && $e instanceof ErrorPageError ) {
						$guiEx = $e;
					}
					if ( !$exception ) {
						$exception = $e;
					}
				}
			} finally {
				// Make sure we always clean up the context.
				// Losing updates while rewinding the stack is acceptable,
				// losing updates that are added later is not.
				self::$executeContext = null;
			}
		}
	}

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $exception;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
		throw $guiEx;
	}
}
272 
/**
 * Run a task and catch/log any exceptions.
 *
 * On failure, the master changes are rolled back and, if the update is an
 * EnqueueableDataUpdate, an attempt is made to push it to the job queue so
 * that it can run later. A failure of that attempt is logged as well.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param LBFactory $lbFactory
 * @param LoggerInterface $logger
 * @param StatsdDataFactoryInterface $stats
 * @param string $httpMethod Used as part of the stats key
 * @return Exception|Throwable|null The error thrown by the update, or null on success
 */
private static function run(
	DeferrableUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
	$type = get_class( $update ) . $suffix;
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	try {
		self::attemptUpdate( $update, $lbFactory );

		return null;
	} catch ( Throwable $e ) {
		// Handled below; Throwable covers both Exception and Error
	}

	$logger->error(
		"Deferred update {type} failed to run: {exception_message}",
		[
			'type' => $type,
			'trace' => $e->getTraceAsString(),
			'exception_message' => $e->getMessage()
		]
	);

	$lbFactory->rollbackMasterChanges( __METHOD__ );

	// Try to push the update as a job so it can run later if possible
	if ( $update instanceof EnqueueableDataUpdate ) {
		try {
			$spec = $update->getAsJobSpecification();
			// Accept the legacy 'wiki' key as well, like enqueueUpdates() does
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );

			return $e;
		} catch ( Throwable $jobEx ) {
			// Handled below
		}

		$logger->error(
			"Job enqueue of deferred update, {type}, failed: {exception_message}",
			[
				'type' => $type,
				'trace' => $jobEx->getTraceAsString(),
				'exception_message' => $jobEx->getMessage()
			]
		);

		$lbFactory->rollbackMasterChanges( __METHOD__ );
	}

	return $e;
}
340 
/**
 * Push a task into the job queue system and catch/log any exceptions.
 *
 * On failure, the error is logged and the master changes are rolled back;
 * nothing is thrown to the caller.
 *
 * @param EnqueueableDataUpdate $update Update to convert into a job
 * @param LBFactory $lbFactory
 * @param LoggerInterface $logger
 * @param StatsdDataFactoryInterface $stats
 * @param string $httpMethod Used as part of the stats key
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	try {
		$spec = $update->getAsJobSpecification();
		// Accept the legacy 'wiki' key as well, like enqueueUpdates() does
		$domain = $spec['domain'] ?? $spec['wiki'];
		JobQueueGroup::singleton( $domain )->push( $spec['job'] );

		return;
	} catch ( Throwable $e ) {
		// Handled below; Throwable covers both Exception and Error
	}

	// Supply every placeholder used by the message, plus the error object itself
	$logger->error(
		"Job enqueue of deferred update, {type}, failed: {exception_message}",
		[
			'type' => $type,
			'exception_message' => $e->getMessage(),
			'exception' => $e
		]
	);

	$lbFactory->rollbackMasterChanges( __METHOD__ );
}
377 
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * Unless the update is a TransactionRoundAwareUpdate that requires the round to
 * be absent, doUpdate() is run inside a new explicit transaction round. In both
 * cases the master changes are committed once doUpdate() returned.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param ILBFactory $lbFactory
 * @throws DBTransactionError If no empty transaction ticket could be obtained or
 *  a transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	if ( $update instanceof DeferrableCallback ) {
		$fnameTrxOwner = $update->getOrigin();
	} else {
		$fnameTrxOwner = get_class( $update ) . '::doUpdate';
	}

	// Determine whether the write round has to be implicit rather than explicit
	$needsImplicitRound = $update instanceof TransactionRoundAwareUpdate
		&& $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT;

	// Flush any pending changes left over from an implicit transaction round
	if ( $needsImplicitRound ) {
		$lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
	} else {
		$lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
	}

	// Run the update after any stale master view snapshots have been flushed
	$update->doUpdate();

	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitMasterChanges( $fnameTrxOwner );
}
420 
/**
 * Run all deferred updates immediately if there are no DB writes active.
 *
 * If the updates cannot be run and at least BIG_QUEUE_SIZE of them are
 * pending, the updates that support it are pushed to the job queue instead.
 *
 * @param string $mode Passed to doUpdates()
 * @return bool Whether no updates are pending afterwards; always false while an
 *  update is being executed
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// An update is being executed right now; do not start a nested run
	if ( self::$executeContext ) {
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// The updates cannot be run with an outer transaction context. If the queue
		// got big, at least enqueue all the updates that support the job queue.
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return self::pendingUpdatesCount() === 0;
	}

	self::doUpdates( $mode );

	return true;
}
453 
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items.
 *
 * The returned items keep their original array keys. push() stores mergeable
 * updates under their class name, so re-indexing the array would prevent later
 * updates of the same class from being merged into the ones that remain queued.
 *
 * @param DeferrableUpdate[] $updates Queue of updates
 * @return DeferrableUpdate[] The updates that could not be enqueued, keys preserved
 */
private static function enqueueUpdates( array $updates ) {
	$remaining = [];

	foreach ( $updates as $key => $update ) {
		if ( $update instanceof EnqueueableDataUpdate ) {
			$spec = $update->getAsJobSpecification();
			// Fall back to the legacy 'wiki' key if 'domain' is not provided
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );
		} else {
			$remaining[$key] = $update;
		}
	}

	return $remaining;
}
475 
/**
 * Get the number of updates that are currently queued.
 *
 * @return int Combined size of the pre-send and the post-send queue
 */
public static function pendingUpdatesCount() {
	$total = 0;
	foreach ( [ self::$preSendUpdates, self::$postSendUpdates ] as $pendingQueue ) {
		$total += count( $pendingQueue );
	}

	return $total;
}
483 
/**
 * Get the updates that are currently queued for a stage.
 *
 * Pre-send updates are listed before post-send updates. Mergeable updates are
 * queued under their class name, so the same key can exist in both queues; in
 * that case the second update is appended with an integer key rather than
 * replacing the first one.
 *
 * @param int $stage One of the stage constants (ALL, PRESEND or POSTSEND)
 * @return DeferrableUpdate[]
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$queues = [];
	if ( $stage === self::ALL || $stage === self::PRESEND ) {
		$queues[] = self::$preSendUpdates;
	}
	if ( $stage === self::ALL || $stage === self::POSTSEND ) {
		$queues[] = self::$postSendUpdates;
	}

	$updates = [];
	foreach ( $queues as $pendingQueue ) {
		foreach ( $pendingQueue as $key => $update ) {
			if ( is_string( $key ) && !isset( $updates[$key] ) ) {
				$updates[$key] = $update;
			} else {
				// Integer keys are renumbered; colliding string keys must not overwrite
				$updates[] = $update;
			}
		}
	}

	return $updates;
}
499 
/**
 * Clear all pending updates without performing them.
 */
public static function clearPendingUpdates() {
	// Reset both queues to an empty list
	self::$postSendUpdates = [];
	self::$preSendUpdates = [];
}
508 
/**
 * Check whether database activity is pending that rules out running updates now.
 *
 * @return bool True if a transaction round is active, the LBFactory is not ready
 *  for round operations, or any open master connection has pending writes or
 *  callbacks or an explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	$busy = false;
	// Flags the result as soon as one connection is found to be in use
	$inspectConn = function ( IDatabase $conn ) use ( &$busy ) {
		if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
			$busy = true;
		}
	};
	$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( $inspectConn ) {
		$lb->forEachOpenMasterConnection( $inspectConn );
	} );

	return $busy;
}
529 }
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static DeferrableUpdate [] $postSendUpdates
Updates to be deferred until after request end.
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
static clearPendingUpdates()
Clear all pending updates without performing them.
An interface for generating database load balancers.
Definition: LBFactory.php:40
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects. ...
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
static array null $executeContext
Information about the current execute() call or null if not running.
static areDatabaseTransactionsActive()
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any exceptions.
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
static pendingUpdatesCount()
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static getMain()
Get the RequestContext object associated with the main request.
An error page which can definitely be safely rendered using the OutputPage.
Callback wrapper that has an originating method.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
hasTransactionRound()
Check if an explicit transaction round is active.
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
static getPendingUpdates( $stage=self::ALL)
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
An interface for generating database load balancers.
Definition: ILBFactory.php:33
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
global $wgCommandLineMode
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:287
static push(array &$queue, DeferrableUpdate $update)
doUpdate()
Perform the actual work.
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
static singleton( $domain=false)
Database connection, tracking, load balancing, and transaction manager for a cluster.
static DeferrableUpdate [] $preSendUpdates
Updates to be deferred until before request end.