MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26 use Psr\Log\LoggerInterface;
32 
/** @var DeferrableUpdate[] Updates to be deferred until before request end */
private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
private static $postSendUpdates = [];

/** All updates; in web requests, use only after flushing the output buffer */
public const ALL = 0;
/** For updates that should run before flushing output buffer */
public const PRESEND = 1;
/** For updates that should run after flushing output buffer */
public const POSTSEND = 2;

/**
 * Pending-update count at which tryOpportunisticExecute() starts pushing
 * enqueueable updates to the job queue when it cannot run them directly.
 */
public const BIG_QUEUE_SIZE = 100;

/**
 * @var array|null Execution state while an update is being run, or null when idle.
 *  While set, it holds the keys 'stage' (stage being processed) and
 *  'subqueue' (updates added by the running update).
 */
private static $executeContext;
76 
/**
 * Add an update to the deferred list to be run later by doUpdates().
 *
 * If an update is currently executing and its stage is at or beyond $stage, a
 * non-mergeable $update is appended to the running update's subqueue instead of
 * the pending queues, so it runs right after its parent.
 *
 * In CLI mode the pending updates are attempted immediately after queueing.
 *
 * @param DeferrableUpdate $update Some object that implements doUpdate()
 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	if (
		self::$executeContext !== null &&
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		self::$executeContext['stage'] >= $stage &&
		!( $update instanceof MergeableUpdate )
	) {
		// This is a sub-DeferredUpdate; run it right after its parent update.
		// Also, while post-send updates are running, push any "pre-send" jobs to the
		// active post-send queue to make sure they get run this round (or at all).
		self::$executeContext['subqueue'][] = $update;

		return;
	}

	if ( $stage === self::PRESEND ) {
		self::push( self::$preSendUpdates, $update );
	} else {
		self::push( self::$postSendUpdates, $update );
	}

	// Try to run the updates now if in CLI mode and no transaction is active.
	// This covers scripts that don't/barely use the DB but make updates to other stores.
	if ( $wgCommandLineMode ) {
		// tryOpportunisticExecute() itself bails out when an update is already
		// executing or when database transactions are still active.
		self::tryOpportunisticExecute( 'run' );
	}
}
114 
/**
 * Add a callable update.
 *
 * The callable is wrapped in an MWCallableUpdate, tagged with the name of the
 * function that called this method, and handed to addUpdate().
 *
 * @param callable $callable
 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND)
 * @param mixed|null $dbw Passed through to MWCallableUpdate unchanged
 *  (presumably one or more database handles; confirm against MWCallableUpdate)
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	// wfGetCaller() must be invoked directly from this method so that the
	// reported origin is the caller of addCallableUpdate().
	$origin = wfGetCaller();
	$wrapped = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $wrapped, $stage );
}
130 
/**
 * Do any deferred updates and clear the list.
 *
 * @param string $mode Use "enqueue" to push enqueueable updates to the job queue
 *  instead of running them; any other value ("run" by default) executes them
 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	// Updates processed for ALL are executed with POSTSEND as their effective stage
	$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
	// For ALL mode, make sure that any PRESEND updates added along the way get run.
	// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
	do {
		if ( $stage === self::ALL || $stage === self::PRESEND ) {
			self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
		}

		// Strict comparison, matching the PRESEND check above and getPendingUpdates()
		if ( $stage === self::ALL || $stage === self::POSTSEND ) {
			self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
		}
	} while ( $stage === self::ALL && self::$preSendUpdates );
}
154 
/**
 * Append an update to a queue, merging MergeableUpdate instances by class.
 *
 * Mergeable updates are stored under their class name so that at most one
 * instance per class is pending; everything else is appended with an integer key.
 *
 * @param DeferrableUpdate[] &$queue Queue to modify in place
 * @param DeferrableUpdate $update
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	$key = get_class( $update ); // fully-qualified class
	$pending = $queue[$key] ?? null;
	if ( $pending !== null ) {
		'@phan-var MergeableUpdate $pending';
		$pending->merge( $update );
		// Drop the old slot so the re-insert below lands at the end; this handles
		// things like mergeable purge updates that might depend on the prior
		// updates in the queue running
		unset( $queue[$key] );
	} else {
		$pending = $update;
	}

	$queue[$key] = $pending;
}
178 
/**
 * Immediately run or enqueue a list of updates.
 *
 * The queue is drained in rounds until no update adds anything new to it. In
 * each round all DataUpdate instances are handled first, then everything else.
 * Updates that a running update adds to its subqueue are run right after it.
 *
 * @param DeferrableUpdate[] &$queue List of updates; emptied by this method
 * @param string $mode Use "enqueue" to push EnqueueableDataUpdate items to the
 *  job queue instead of running them
 * @param int $stage Stage recorded in the execution context (PRESEND or POSTSEND)
 * @throws ErrorPageError The first such error seen, if $stage is PRESEND and no
 *  headers have been sent
 * @throws Throwable The first error seen, but only when MW_PHPUNIT_TEST is defined
 */
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	if ( $services->getMainConfig()->get( 'CommandLineMode' ) ) {
		$httpMethod = 'cli';
	} else {
		$httpMethod = strtolower( RequestContext::getMain()->getRequest()->getMethod() );
	}

	// First ErrorPageError returned by any update, if any
	$firstGuiError = null;
	// First error of any kind returned by any update, if any
	$firstError = null;

	// Keep doing rounds of updates until none get enqueued; each round works on
	// a snapshot of the queue, which is cleared before the round starts
	for ( $batch = $queue; $batch; $batch = $queue ) {
		$queue = [];

		// Segregate the batch into DataUpdate items and everything else
		$dataUpdates = [];
		$otherUpdates = [];
		foreach ( $batch as $item ) {
			if ( $item instanceof DataUpdate ) {
				$dataUpdates[] = $item;
			} else {
				$otherUpdates[] = $item;
			}
		}

		// Handle every DataUpdate first, followed by the remaining updates
		foreach ( array_merge( $dataUpdates, $otherUpdates ) as $task ) {
			// Enqueue the task into the job queue system instead if applicable
			if ( $mode === 'enqueue' && $task instanceof EnqueueableDataUpdate ) {
				self::jobify( $task, $lbf, $logger, $stats, $httpMethod );
				continue;
			}

			// Otherwise, execute the task and any subtasks that it spawns
			self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
			try {
				$current = $task;
				do {
					$error = self::run( $current, $lbf, $logger, $stats, $httpMethod );
					if ( !$firstGuiError && $error instanceof ErrorPageError ) {
						$firstGuiError = $error;
					}
					if ( !$firstError ) {
						$firstError = $error;
					}

					// Pull the oldest subqueue entry, if any, as the next thing to run
					$hasMore = (bool)self::$executeContext['subqueue'];
					if ( $hasMore ) {
						$current = reset( self::$executeContext['subqueue'] );
						$headKey = key( self::$executeContext['subqueue'] );
						unset( self::$executeContext['subqueue'][$headKey] );
					}
				} while ( $hasMore );
			} finally {
				// Make sure we always clean up the context.
				// Losing updates while rewinding the stack is acceptable,
				// losing updates that are added later is not.
				self::$executeContext = null;
			}
		}
	}

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $firstError && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $firstError;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $firstGuiError && $stage === self::PRESEND && !headers_sent() ) {
		throw $firstGuiError;
	}
}
272 
/**
 * Run a task and catch/log any throwables.
 *
 * On failure the error is logged, master changes are rolled back and, if the
 * update is an EnqueueableDataUpdate, it is pushed to the job queue so it can
 * be retried later. A failure to enqueue is logged and rolled back as well.
 *
 * @param DeferrableUpdate $update
 * @param LBFactory $lbFactory
 * @param LoggerInterface $logger
 * @param StatsdDataFactoryInterface $stats
 * @param string $httpMethod Used as a segment of the stats key
 * @return Throwable|null The error thrown by the update, or null on success
 */
private static function run(
	DeferrableUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) : ?Throwable {
	$suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
	$type = get_class( $update ) . $suffix;
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	$e = null;
	try {
		self::attemptUpdate( $update, $lbFactory );

		return null;
	} catch ( Throwable $e ) {
		// Handled below so the failure path reads top to bottom
	}

	// Record the throwable in the exception log as well as the channel logger
	MWExceptionHandler::logException( $e );
	$logger->error(
		"Deferred update '$type' failed to run.",
		[
			'deferred_type' => $type,
			'exception' => $e,
		]
	);

	$lbFactory->rollbackMasterChanges( __METHOD__ );

	// Try to push the update as a job so it can run later if possible
	if ( $update instanceof EnqueueableDataUpdate ) {
		$jobEx = null;
		try {
			$spec = $update->getAsJobSpecification();
			// Accept the 'wiki' key as a fallback, like enqueueUpdates() does
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );

			return $e;
		} catch ( Throwable $jobEx ) {
			// Handled below
		}

		MWExceptionHandler::logException( $jobEx );
		$logger->error(
			"Deferred update '$type' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobEx,
			]
		);

		$lbFactory->rollbackMasterChanges( __METHOD__ );
	}

	return $e;
}
338 
/**
 * Push a task into the job queue system and catch/log any exceptions.
 *
 * If building or pushing the job fails, the error is logged and master
 * changes are rolled back; nothing is re-thrown.
 *
 * @param EnqueueableDataUpdate $update
 * @param LBFactory $lbFactory
 * @param LoggerInterface $logger
 * @param StatsdDataFactoryInterface $stats
 * @param string $httpMethod Used as a segment of the stats key
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	$jobEx = null;
	try {
		$spec = $update->getAsJobSpecification();
		// Accept the 'wiki' key as a fallback, like enqueueUpdates() does
		$domain = $spec['domain'] ?? $spec['wiki'];
		JobQueueGroup::singleton( $domain )->push( $spec['job'] );

		return;
	} catch ( Throwable $jobEx ) {
		// Handled below
	}

	// Record the throwable in the exception log as well as the channel logger
	MWExceptionHandler::logException( $jobEx );
	$logger->error(
		"Deferred update '$type' failed to enqueue as a job.",
		[
			'deferred_type' => $type,
			'exception' => $jobEx,
		]
	);

	$lbFactory->rollbackMasterChanges( __METHOD__ );
}
378 
/**
 * Attempt to run an update with the appropriate transaction round state it expects.
 *
 * The update's doUpdate() runs inside a new explicit transaction round, unless
 * the update is a TransactionRoundAwareUpdate whose requirement equals
 * TRX_ROUND_ABSENT, in which case pending changes are committed first and no
 * explicit round is started. Pending changes are committed after doUpdate().
 *
 * @param DeferrableUpdate $update
 * @param ILBFactory $lbFactory
 * @throws DBTransactionError If no empty transaction ticket could be obtained
 *  or a transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	$canStartRound = $ticket && !$lbFactory->hasTransactionRound();
	if ( !$canStartRound ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	if ( $update instanceof DeferrableCallback ) {
		$roundOwner = $update->getOrigin();
	} else {
		$roundOwner = get_class( $update ) . '::doUpdate';
	}

	// Determine whether the update asked to run without an explicit write round
	$wantsNoRound = $update instanceof TransactionRoundAwareUpdate &&
		$update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT;

	// Flush any pending changes left over from an implicit transaction round
	if ( $wantsNoRound ) {
		$lbFactory->commitMasterChanges( $roundOwner ); // new implicit round
	} else {
		$lbFactory->beginMasterChanges( $roundOwner ); // new explicit round
	}

	// Run the update after any stale master view snapshots have been flushed
	$update->doUpdate();

	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitMasterChanges( $roundOwner );
}
421 
/**
 * Run all deferred updates immediately if there are no DB writes active.
 *
 * If database transactions are active and at least BIG_QUEUE_SIZE updates are
 * pending, the updates that support it are pushed to the job queue instead.
 *
 * @param string $mode Passed to doUpdates(); use "enqueue" to use the job queue
 *  when possible
 * @return bool False if an update is already executing; true if doUpdates()
 *  was called; otherwise whether the pending queues are now empty
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// An update is already being executed; do not start a nested run
	if ( self::$executeContext ) {
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// Avoid running updates without them having outer scope. If the queue
		// has grown large, at least enqueue all the updates that support
		// queueing to the job queue
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return self::pendingUpdatesCount() === 0;
	}

	self::doUpdates( $mode );

	return true;
}
454 
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items.
 *
 * The returned items keep their original keys. This matters because push()
 * stores MergeableUpdate instances under their class name; re-indexing them
 * would stop later updates of the same class from being merged into them.
 *
 * @param DeferrableUpdate[] $updates A list of deferred update instances
 * @return DeferrableUpdate[] Remaining updates that do not support being queued
 */
private static function enqueueUpdates( array $updates ) {
	// foreach iterates over a copy of the array, so removing handled entries
	// from $updates inside the loop is safe
	foreach ( $updates as $key => $update ) {
		if ( $update instanceof EnqueueableDataUpdate ) {
			$spec = $update->getAsJobSpecification();
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );
			unset( $updates[$key] );
		}
	}

	return $updates;
}
476 
/**
 * Count the updates waiting in the pre-send and post-send queues.
 *
 * @return int Number of pending updates across both queues
 */
public static function pendingUpdatesCount() {
	$preSendCount = count( self::$preSendUpdates );
	$postSendCount = count( self::$postSendUpdates );

	return $preSendCount + $postSendCount;
}
484 
/**
 * Get the list of pending updates for a stage.
 *
 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
 * @return DeferrableUpdate[] Pending updates as a list, pre-send updates first
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$updates = [];
	// The queues key MergeableUpdate instances by class name, and array_merge()
	// lets a later string key overwrite an earlier one. Strip the keys so that a
	// class pending in both queues is reported twice rather than once.
	if ( $stage === self::ALL || $stage === self::PRESEND ) {
		$updates = array_merge( $updates, array_values( self::$preSendUpdates ) );
	}
	if ( $stage === self::ALL || $stage === self::POSTSEND ) {
		$updates = array_merge( $updates, array_values( self::$postSendUpdates ) );
	}

	return $updates;
}
500 
/**
 * Clear all pending updates without performing them.
 *
 * Both the pre-send and the post-send queue are reset to empty arrays.
 */
public static function clearPendingUpdates() {
	self::$postSendUpdates = [];
	self::$preSendUpdates = [];
}
509 
/**
 * Check whether database transaction state prevents running updates right now.
 *
 * @return bool True if a transaction round is active, the load balancer factory
 *  is not ready for round operations, or any open master connection has
 *  pending writes/callbacks or an explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	// Set by reference from the nested callbacks below
	$anyBusy = false;
	$inspectConn = function ( IDatabase $conn ) use ( &$anyBusy ) {
		$isBusy = $conn->writesOrCallbacksPending() || $conn->explicitTrxActive();
		if ( $isBusy ) {
			$anyBusy = true;
		}
	};
	$inspectLB = function ( LoadBalancer $lb ) use ( $inspectConn ) {
		$lb->forEachOpenMasterConnection( $inspectConn );
	};
	$lbFactory->forEachLB( $inspectLB );

	return $anyBusy;
}
530 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:348
DeferredUpdates\ALL
const ALL
Definition: DeferredUpdates.php:68
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:137
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:390
MergeableUpdate
Interface that deferrable updates can implement to signal that updates can be combined.
Definition: MergeableUpdate.php:18
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by doUpdates()
Definition: DeferredUpdates.php:85
Wikimedia\Rdbms\LoadBalancer\forEachOpenMasterConnection
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Definition: LoadBalancer.php:2144
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Clear all pending updates without performing them.
Definition: DeferredUpdates.php:505
DeferredUpdates\$postSendUpdates
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
Definition: DeferredUpdates.php:66
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:8
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction ro...
Definition: TransactionRoundAwareUpdate.php:9
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:28
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:10
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:635
DeferredUpdates\run
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any throwables.
Definition: DeferredUpdates.php:283
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
$wgCommandLineMode
global $wgCommandLineMode
Definition: DevelopmentSettings.php:28
DeferredUpdates
Class for managing the deferred updates.
Definition: DeferredUpdates.php:62
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\handleUpdateQueue
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
Definition: DeferredUpdates.php:191
$queue
$queue
Definition: mergeMessageFileList.php:157
DeferredUpdates\POSTSEND
const POSTSEND
Definition: DeferredUpdates.php:70
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:42
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
DeferredUpdates\BIG_QUEUE_SIZE
const BIG_QUEUE_SIZE
Definition: DeferredUpdates.php:72
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
Definition: DeferredUpdates.php:433
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
DeferredUpdates\$executeContext
static array|null $executeContext
Information about the current execute() call or null if not running.
Definition: DeferredUpdates.php:75
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:513
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:451
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
Definition: DeferredUpdates.php:140
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Definition: DeferredUpdates.php:481
Wikimedia\Rdbms\ILBFactory\beginMasterChanges
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:8
Wikimedia\Rdbms\DBTransactionError
Definition: DBTransactionError.php:27
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
Wikimedia\Rdbms\LBFactory\rollbackMasterChanges
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:319
DeferredUpdates\PRESEND
const PRESEND
Definition: DeferredUpdates.php:69
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Definition: DeferredUpdates.php:490
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:40
Wikimedia\Rdbms\ILBFactory\commitMasterChanges
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
DeferredUpdates\enqueueUpdates
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
Definition: DeferredUpdates.php:461
DeferredUpdates\push
static push(array &$queue, DeferrableUpdate $update)
Definition: DeferredUpdates.php:159
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:9
DeferredUpdates\$preSendUpdates
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
Definition: DeferredUpdates.php:64
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:27
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
Definition: DeferredUpdates.php:125
wfGetCaller
wfGetCaller( $level=2)
Get the name of the function which called this function. wfGetCaller( 1 ) is the function containing the wfGetCaller() call itself; the default level of 2 is that function's caller.
Definition: GlobalFunctions.php:1391
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
$type
$type
Definition: testCompression.php:52
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...