MediaWiki  master
DeferredUpdates.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
26 use Psr\Log\LoggerInterface;
32 
/** @var DeferrableUpdate[] Updates to be deferred until before request end (PRESEND stage queue) */
64  private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end (POSTSEND stage queue) */
66  private static $postSendUpdates = [];
67 
// Stage selectors accepted by addUpdate(), doUpdates() and getPendingUpdates()
68  const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
69  const PRESEND = 1; // for updates that should run before flushing output buffer
70  const POSTSEND = 2; // for updates that should run after flushing output buffer
71 
// Pending-update count at which tryOpportunisticExecute() starts handing enqueueable
// updates to the job queue when it cannot run them directly
72  const BIG_QUEUE_SIZE = 100;
73 
/**
 * @var array|null Information about the update currently being executed, or null if
 *  none is running. While set, it holds the keys 'stage' and 'subqueue'.
 */
75  private static $executeContext;
76 
/**
 * Add an update to the deferred list so that it is run later
 *
 * If an update is currently executing at an equal or later stage, a non-mergeable
 * update is appended to that update's subqueue instead of the regular queues.
 *
 * In CLI mode the pending updates are attempted right away via
 * tryOpportunisticExecute(), which itself declines to run them while an update
 * is already executing or database transactions are active.
 *
 * @param DeferrableUpdate $update The update to schedule
 * @param int $stage One of the class constants PRESEND or POSTSEND
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	if (
		self::$executeContext !== null &&
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		self::$executeContext['stage'] >= $stage &&
		!( $update instanceof MergeableUpdate )
	) {
		// This is a sub-DeferredUpdate; run it right after its parent update.
		// Also, while post-send updates are running, push any "pre-send" jobs to the
		// active post-send queue to make sure they get run this round (or at all).
		self::$executeContext['subqueue'][] = $update;

		return;
	}

	if ( $stage === self::PRESEND ) {
		self::push( self::$preSendUpdates, $update );
	} else {
		self::push( self::$postSendUpdates, $update );
	}

	// Try to run the updates now if in CLI mode and no transaction is active.
	// This covers scripts that don't/barely use the DB but make updates to other stores.
	if ( $wgCommandLineMode ) {
		self::tryOpportunisticExecute( 'run' );
	}
}
114 
/**
 * Wrap a callable in an MWCallableUpdate and schedule it via addUpdate()
 *
 * The wrapper is given the name reported by wfGetCaller() as its origin.
 *
 * @param callable $callable The code to run later
 * @param int $stage One of the class constants PRESEND or POSTSEND
 * @param mixed $dbw Passed through unchanged to the MWCallableUpdate constructor
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	// wfGetCaller() must be invoked from this method so the reported origin is unchanged
	$wrapped = new MWCallableUpdate( $callable, wfGetCaller(), $dbw );

	self::addUpdate( $wrapped, $stage );
}
130 
/**
 * Process the pending update queues for the requested stage
 *
 * @param string $mode Passed to handleUpdateQueue(); 'enqueue' makes it push
 *  enqueueable updates to the job queue instead of running them
 * @param int $stage One of the class constants ALL, PRESEND or POSTSEND
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$runEverything = ( $stage === self::ALL );
	// Updates processed in ALL mode execute with the POSTSEND stage as their context
	$effectiveStage = $runEverything ? self::POSTSEND : $stage;

	$wantsPreSend = $runEverything || $stage === self::PRESEND;
	// The loose comparison on POSTSEND is intentional here: it mirrors existing behavior
	$wantsPostSend = $runEverything || $stage == self::POSTSEND;

	// For ALL mode, make sure that any PRESEND updates added along the way get run.
	// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
	do {
		if ( $wantsPreSend ) {
			self::handleUpdateQueue( self::$preSendUpdates, $mode, $effectiveStage );
		}

		if ( $wantsPostSend ) {
			self::handleUpdateQueue( self::$postSendUpdates, $mode, $effectiveStage );
		}
	} while ( $runEverything && self::$preSendUpdates );
}
154 
/**
 * Append an update to a queue, merging it into an existing one when possible
 *
 * Mergeable updates are stored under their class name, so a queue holds at most
 * one instance per mergeable class; all other updates are simply appended.
 *
 * @param DeferrableUpdate[] &$queue Queue to modify
 * @param DeferrableUpdate $update Update to add
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	$slot = get_class( $update ); // fully-qualified class
	if ( !isset( $queue[$slot] ) ) {
		$queue[$slot] = $update;

		return;
	}

	$existingUpdate = $queue[$slot];
	'@phan-var MergeableUpdate $existingUpdate';
	$existingUpdate->merge( $update );
	// Re-insert the merged update at the end of the queue to handle things like
	// mergeable purge updates that might depend on the prior updates having run
	unset( $queue[$slot] );
	$queue[$slot] = $existingUpdate;
}
178 
/**
 * Immediately run or enqueue a list of updates
 *
 * The queue is emptied and processed in rounds until no further updates show up
 * in it. Within a round, DataUpdate instances are handled before everything else.
 * Each executed update is followed by the updates that were added to its subqueue
 * while it (or one of its sub-updates) was running.
 *
 * @param DeferrableUpdate[] &$queue Queue to drain
 * @param string $mode 'enqueue' pushes EnqueueableDataUpdate items to the job queue
 *  instead of running them; any other value runs every update
 * @param int $stage Stage recorded in the execution context (PRESEND or POSTSEND)
 * @throws Throwable The first error returned by run(), but only if MW_PHPUNIT_TEST
 *  is defined
 * @throws ErrorPageError The first such error returned by run(), if $stage is PRESEND
 *  and no headers were sent yet
 */
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbf = $services->getDBLoadBalancerFactory();
	$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
	if ( $services->getMainConfig()->get( 'CommandLineMode' ) ) {
		$httpMethod = 'cli';
	} else {
		$httpMethod = strtolower( RequestContext::getMain()->getRequest()->getMethod() );
	}

	/** @var ErrorPageError|null $firstGuiError First GUI error seen, if any */
	$firstGuiError = null;
	/** @var Throwable|null $firstError First error of any kind seen, if any */
	$firstError = null;

	// Keep doing rounds of updates until none get enqueued; every round works on a
	// snapshot of the queue, which is cleared so that new entries can be detected.
	for ( $round = $queue; $round; $round = $queue ) {
		$queue = [];

		// Segregate the snapshot into DataUpdate items and everything else
		$dataUpdates = [];
		$otherUpdates = [];
		foreach ( $round as $item ) {
			if ( $item instanceof DataUpdate ) {
				$dataUpdates[] = $item;
			} else {
				$otherUpdates[] = $item;
			}
		}

		// Handle all DataUpdate items first, followed by the remaining updates
		foreach ( array_merge( $dataUpdates, $otherUpdates ) as $du ) {
			// Enqueue the task into the job queue system instead if applicable
			if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
				self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
				continue;
			}

			// Otherwise, execute the task and any subtasks that it spawns
			self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
			try {
				$current = $du;
				do {
					$e = self::run( $current, $lbf, $logger, $stats, $httpMethod );
					if ( !$firstGuiError && $e instanceof ErrorPageError ) {
						$firstGuiError = $e;
					}
					if ( !$firstError ) {
						$firstError = $e;
					}
					// Continue with the oldest subqueue entry until there are none left
					$current = self::$executeContext['subqueue']
						? array_shift( self::$executeContext['subqueue'] )
						: null;
				} while ( $current );
			} finally {
				// Make sure we always clean up the context.
				// Losing updates while rewinding the stack is acceptable,
				// losing updates that are added later is not.
				self::$executeContext = null;
			}
		}
	}

	// VW-style hack to work around T190178, so we can make sure
	// PageMetaDataUpdater doesn't throw exceptions.
	if ( $firstError && defined( 'MW_PHPUNIT_TEST' ) ) {
		throw $firstError;
	}

	// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
	// callers should check permissions *before* enqueueing updates. If the main transaction
	// round actions succeed but some deferred updates fail due to permissions errors then
	// there is a risk that some secondary data was not properly updated.
	if ( $firstGuiError && $stage === self::PRESEND && !headers_sent() ) {
		throw $firstGuiError;
	}
}
272 
/**
 * Run a task and catch/log any exceptions
 *
 * On failure the error is logged and master changes are rolled back. If the update
 * is an EnqueueableDataUpdate, it is then pushed to the job queue as a fallback;
 * a failure of that push is logged and rolled back as well.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param LBFactory $lbFactory Used for running the update and for rollbacks
 * @param LoggerInterface $logger Receives the error reports
 * @param StatsdDataFactoryInterface $stats Receives a per-type counter increment
 * @param string $httpMethod Label used in the stats key
 * @return Throwable|null The error thrown by the update itself, or null on success
 */
private static function run(
	DeferrableUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	if ( $update instanceof DeferrableCallback ) {
		// Callbacks are told apart by the method they originate from
		$type .= "_{$update->getOrigin()}";
	}
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	try {
		self::attemptUpdate( $update, $lbFactory );

		return null;
	} catch ( Throwable $updateError ) {
		// Reported below
	}

	$summary = get_class( $updateError ) . ': ' . $updateError->getMessage();
	$logger->error(
		"Deferred update '$type' failed to run. $summary",
		[
			'deferred_type' => $type,
			'exception' => $updateError,
		]
	);

	$lbFactory->rollbackMasterChanges( __METHOD__ );

	if ( !( $update instanceof EnqueueableDataUpdate ) ) {
		return $updateError;
	}

	// Try to push the update as a job so it can run later if possible
	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );

		return $updateError;
	} catch ( Throwable $jobError ) {
		// Reported below
	}

	$summary = get_class( $jobError ) . ': ' . $jobError->getMessage();
	$logger->error(
		"Job enqueue of deferred update '$type' failed. $summary",
		[
			'deferred_type' => $type,
			'exception' => $jobError,
		]
	);

	$lbFactory->rollbackMasterChanges( __METHOD__ );

	return $updateError;
}
340 
/**
 * Push a task into the job queue system and catch/log any exceptions
 *
 * A failed push is logged and followed by a rollback of master changes; the
 * error is not propagated to the caller.
 *
 * @param EnqueueableDataUpdate $update Update to convert into a job
 * @param LBFactory $lbFactory Used for the rollback after a failed push
 * @param LoggerInterface $logger Receives the error report
 * @param StatsdDataFactoryInterface $stats Receives a per-type counter increment
 * @param string $httpMethod Label used in the stats key
 */
private static function jobify(
	EnqueueableDataUpdate $update,
	LBFactory $lbFactory,
	LoggerInterface $logger,
	StatsdDataFactoryInterface $stats,
	$httpMethod
) {
	$type = get_class( $update );
	$stats->increment( "deferred_updates.$httpMethod.$type" );

	try {
		$spec = $update->getAsJobSpecification();
		JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
	} catch ( Throwable $failure ) {
		$summary = get_class( $failure ) . ': ' . $failure->getMessage();
		$logger->error(
			"Job enqueue of deferred update '$type' failed. $summary",
			[
				'deferred_type' => $type,
				'exception' => $failure,
			]
		);

		$lbFactory->rollbackMasterChanges( __METHOD__ );
	}
}
381 
/**
 * Attempt to run an update with the appropriate transaction round state it expects
 *
 * Updates run inside a new explicit transaction round unless they implement
 * TransactionRoundAwareUpdate and report TRX_ROUND_ABSENT as their requirement.
 * In both cases master changes are committed once doUpdate() has returned.
 *
 * @param DeferrableUpdate $update Update to execute
 * @param ILBFactory $lbFactory Factory used to manage the transaction round
 * @throws DBTransactionError If no empty transaction ticket can be obtained or a
 *  transaction round is already active
 */
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
	if ( !$ticket || $lbFactory->hasTransactionRound() ) {
		throw new DBTransactionError( null, "A database transaction round is pending." );
	}

	if ( $update instanceof DataUpdate ) {
		$update->setTransactionTicket( $ticket );
	}

	// Designate $update::doUpdate() as the write round owner
	if ( $update instanceof DeferrableCallback ) {
		$roundOwner = $update->getOrigin();
	} else {
		$roundOwner = get_class( $update ) . '::doUpdate';
	}

	// Determine whether the write round will be implicit or explicit
	$wantsImplicitRound = $update instanceof TransactionRoundAwareUpdate
		&& $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT;

	// Flush any pending changes left over from an implicit transaction round
	if ( $wantsImplicitRound ) {
		$lbFactory->commitMasterChanges( $roundOwner ); // new implicit round
	} else {
		$lbFactory->beginMasterChanges( $roundOwner ); // new explicit round
	}

	// Run the update after any stale master view snapshots have been flushed
	$update->doUpdate();

	// Commit any pending changes from the explicit or implicit transaction round
	$lbFactory->commitMasterChanges( $roundOwner );
}
424 
/**
 * Run all deferred updates immediately if there are no DB writes active
 *
 * If database transactions are active, the updates are left alone, except that a
 * queue of at least BIG_QUEUE_SIZE entries has its enqueueable updates handed to
 * the job queue.
 *
 * @param string $mode Passed to doUpdates()
 * @return bool False if an update is currently executing; true if the updates
 *  were run; otherwise whether no updates are left pending
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// An update is already being executed
	if ( self::$executeContext ) {
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// Avoid running updates without them having outer scope. If the queue got
		// big, at least enqueue all the updates that support the job queue.
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return self::pendingUpdatesCount() === 0;
	}

	self::doUpdates( $mode );

	return true;
}
457 
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
 *
 * @param DeferrableUpdate[] $updates Updates to inspect
 * @return DeferrableUpdate[] Updates that could not be enqueued, re-indexed from 0
 */
private static function enqueueUpdates( array $updates ) {
	$kept = [];

	foreach ( $updates as $candidate ) {
		if ( !( $candidate instanceof EnqueueableDataUpdate ) ) {
			$kept[] = $candidate;
			continue;
		}

		$jobSpec = $candidate->getAsJobSpecification();
		// Use the 'wiki' key when the specification has no 'domain' key
		$target = $jobSpec['domain'] ?? $jobSpec['wiki'];
		JobQueueGroup::singleton( $target )->push( $jobSpec['job'] );
	}

	return $kept;
}
479 
/**
 * Count the updates waiting in the pre-send and post-send queues
 *
 * @return int Combined number of queued updates
 */
public static function pendingUpdatesCount() {
	$preSendCount = count( self::$preSendUpdates );
	$postSendCount = count( self::$postSendUpdates );

	return $preSendCount + $postSendCount;
}
487 
/**
 * Get the queued updates for the given stage without running them
 *
 * @param int $stage One of the class constants ALL, PRESEND or POSTSEND
 * @return DeferrableUpdate[] Pre-send entries first, then post-send entries, combined
 *  with array_merge()
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$includePreSend = ( $stage === self::ALL || $stage === self::PRESEND );
	$includePostSend = ( $stage === self::ALL || $stage === self::POSTSEND );

	return array_merge(
		$includePreSend ? self::$preSendUpdates : [],
		$includePostSend ? self::$postSendUpdates : []
	);
}
503 
/**
 * Clear all pending updates without performing them
 */
public static function clearPendingUpdates() {
	self::$preSendUpdates = self::$postSendUpdates = [];
}
512 
/**
 * Check whether database transaction activity stands in the way of running updates
 *
 * @return bool True if a transaction round is active, the load balancer factory is
 *  not ready for round operations, or any open master connection has pending
 *  writes/callbacks or an explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

	$roundActive = $lbFactory->hasTransactionRound();
	if ( $roundActive || !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	$busy = false;
	// Flags the result as soon as a single connection turns out to be in use
	$inspectConnection = function ( IDatabase $conn ) use ( &$busy ) {
		if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
			$busy = true;
		}
	};
	$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( $inspectConnection ) {
		$lb->forEachOpenMasterConnection( $inspectConnection );
	} );

	return $busy;
}
533 }
DeferredUpdates\jobify
static jobify(EnqueueableDataUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Push a task into the job queue system and catch/log any exceptions.
Definition: DeferredUpdates.php:350
DeferredUpdates\ALL
const ALL
Definition: DeferredUpdates.php:68
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:130
DeferredUpdates\attemptUpdate
static attemptUpdate(DeferrableUpdate $update, ILBFactory $lbFactory)
Attempt to run an update with the appropriate transaction round state it expects.
Definition: DeferredUpdates.php:393
MergeableUpdate
Interface that deferrable updates can implement to signal that updates can be combined.
Definition: MergeableUpdate.php:18
DeferrableUpdate\doUpdate
doUpdate()
Perform the actual work.
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
Definition: DeferredUpdates.php:85
Wikimedia\Rdbms\LoadBalancer\forEachOpenMasterConnection
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
Definition: LoadBalancer.php:2139
DeferredUpdates\clearPendingUpdates
static clearPendingUpdates()
Clear all pending updates without performing them.
Definition: DeferredUpdates.php:508
DeferredUpdates\$postSendUpdates
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
Definition: DeferredUpdates.php:66
DeferrableCallback
Callback wrapper that has an originating method.
Definition: DeferrableCallback.php:8
TransactionRoundAwareUpdate
Deferrable update that specifies whether it must run outside of any explicit LBFactory transaction round.
Definition: TransactionRoundAwareUpdate.php:9
DataUpdate
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:28
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
EnqueueableDataUpdate
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Definition: EnqueueableDataUpdate.php:10
DeferredUpdates\run
static run(DeferrableUpdate $update, LBFactory $lbFactory, LoggerInterface $logger, StatsdDataFactoryInterface $stats, $httpMethod)
Run a task and catch/log any exceptions.
Definition: DeferredUpdates.php:283
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
$wgCommandLineMode
global $wgCommandLineMode
Definition: DevelopmentSettings.php:28
DeferredUpdates
Class for managing the deferred updates.
Definition: DeferredUpdates.php:62
EnqueueableDataUpdate\getAsJobSpecification
getAsJobSpecification()
DeferredUpdates\handleUpdateQueue
static handleUpdateQueue(array &$queue, $mode, $stage)
Immediately run or enqueue a list of updates.
Definition: DeferredUpdates.php:191
$queue
$queue
Definition: mergeMessageFileList.php:157
DeferredUpdates\POSTSEND
const POSTSEND
Definition: DeferredUpdates.php:70
Wikimedia\Rdbms\LoadBalancer
Database connection, tracking, load balancing, and transaction manager for a cluster.
Definition: LoadBalancer.php:42
Wikimedia\Rdbms\ILBFactory\hasTransactionRound
hasTransactionRound()
Check if an explicit transaction round is active.
DeferredUpdates\BIG_QUEUE_SIZE
const BIG_QUEUE_SIZE
Definition: DeferredUpdates.php:72
DeferredUpdates\tryOpportunisticExecute
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
Definition: DeferredUpdates.php:436
Wikimedia\Rdbms\IDatabase\explicitTrxActive
explicitTrxActive()
DeferredUpdates\$executeContext
static array|null $executeContext
Information about the current execute() call or null if not running.
Definition: DeferredUpdates.php:75
DeferredUpdates\areDatabaseTransactionsActive
static areDatabaseTransactionsActive()
Definition: DeferredUpdates.php:516
RequestContext\getMain
static getMain()
Get the RequestContext object associated with the main request.
Definition: RequestContext.php:447
DeferredUpdates\doUpdates
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
Definition: DeferredUpdates.php:140
DeferredUpdates\pendingUpdatesCount
static pendingUpdatesCount()
Definition: DeferredUpdates.php:484
Wikimedia\Rdbms\ILBFactory\beginMasterChanges
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
MWCallableUpdate
Deferrable Update for closure/callback.
Definition: MWCallableUpdate.php:8
Wikimedia\Rdbms\DBTransactionError
Definition: DBTransactionError.php:27
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
Wikimedia\Rdbms\LBFactory\rollbackMasterChanges
rollbackMasterChanges( $fname=__METHOD__)
Rollback changes on all master connections.
Definition: LBFactory.php:287
DeferredUpdates\PRESEND
const PRESEND
Definition: DeferredUpdates.php:69
DeferredUpdates\getPendingUpdates
static getPendingUpdates( $stage=self::ALL)
Definition: DeferredUpdates.php:493
Wikimedia\Rdbms\LBFactory
An interface for generating database load balancers.
Definition: LBFactory.php:40
Wikimedia\Rdbms\ILBFactory\commitMasterChanges
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
DeferredUpdates\enqueueUpdates
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
Definition: DeferredUpdates.php:464
DeferredUpdates\push
static push(array &$queue, DeferrableUpdate $update)
Definition: DeferredUpdates.php:159
DeferrableUpdate
Interface that deferrable updates should implement.
Definition: DeferrableUpdate.php:9
DeferredUpdates\$preSendUpdates
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
Definition: DeferredUpdates.php:64
ErrorPageError
An error page which can definitely be safely rendered using the OutputPage.
Definition: ErrorPageError.php:27
Wikimedia\Rdbms\ILBFactory\getEmptyTransactionTicket
getEmptyTransactionTicket( $fname)
Get a token asserting that no transaction writes are active.
DeferredUpdates\addCallableUpdate
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
Definition: DeferredUpdates.php:125
wfGetCaller
wfGetCaller( $level=2)
Get the name of the function which called this function. wfGetCaller( 1 ) is the function with the wfGetCaller() call itself; the default level of 2 is the caller of that function.
Definition: GlobalFunctions.php:1453
Wikimedia\Rdbms\ILBFactory
An interface for generating database load balancers.
Definition: ILBFactory.php:33
$type
$type
Definition: testCompression.php:50
Wikimedia\Rdbms\IDatabase\writesOrCallbacksPending
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...