MediaWiki REL1_33
DeferredUpdates.php
Go to the documentation of this file.
1<?php
26
/** @var DeferrableUpdate[] Updates to be deferred until before request end */
58 private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
60 private static $postSendUpdates = [];
61
62 const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
63 const PRESEND = 1; // for updates that should run before flushing output buffer
64 const POSTSEND = 2; // for updates that should run after flushing output buffer
65
// Pending-update count at or above which tryOpportunisticExecute() pushes the
// enqueueable updates to the job queue when it cannot run them directly
66 const BIG_QUEUE_SIZE = 100;
67
/** @var array|null Information about the current execute() call or null if not running */
69 private static $executeContext;
70
/**
 * Add an update to the deferred list to be run later by execute()
 *
 * While an execute() round is running, a non-mergeable update whose stage is
 * not later than the running stage is appended to the running update's
 * subqueue instead of the regular queues. In CLI mode an opportunistic run is
 * attempted right after the update has been queued.
 *
 * @param DeferrableUpdate $update
 * @param int $stage self::PRESEND selects the pre-send queue; any other value
 *  selects the post-send queue
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	$context = self::$executeContext;
	$isSubUpdate = $context
		&& $context['stage'] >= $stage
		&& !( $update instanceof MergeableUpdate );

	if ( $isSubUpdate ) {
		// Sub-update of the one currently running: it goes right after its parent.
		// This also routes "pre-send" updates added during a post-send round into
		// the active round, so that they are run this round (or at all).
		self::$executeContext['subqueue'][] = $update;

		return;
	}

	if ( $stage !== self::PRESEND ) {
		self::push( self::$postSendUpdates, $update );
	} else {
		self::push( self::$preSendUpdates, $update );
	}

	// In CLI mode, try to run the updates now if no transaction is active. This
	// covers scripts that don't/barely use the DB but make updates to other stores.
	if ( $wgCommandLineMode ) {
		self::tryOpportunisticExecute( 'run' );
	}
}
107
/**
 * Add a callable update
 *
 * The callable is wrapped in an MWCallableUpdate, labelled with the name
 * reported by wfGetCaller(), and handed to addUpdate().
 *
 * @param callable $callable
 * @param int $stage Stage passed on to addUpdate()
 * @param mixed $dbw Passed through unchanged to the MWCallableUpdate constructor
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	// Must be evaluated in this frame so that the reported caller is unchanged
	$origin = wfGetCaller();
	$update = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $update, $stage );
}
123
/**
 * Do any deferred updates and clear the list
 *
 * @param string $mode Passed on to execute(); "enqueue" makes enqueueable
 *  updates go to the job queue instead of running [Default: "run"]
 * @param int $stage One of self::ALL, self::PRESEND or self::POSTSEND. The
 *  value is compared strictly for every stage, so only the integer class
 *  constants select a queue.
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$runAll = ( $stage === self::ALL );
	// Updates run under ALL are executed with post-send semantics
	$stageEffective = $runAll ? self::POSTSEND : $stage;

	// For ALL mode, make sure that any PRESEND updates added along the way get run.
	// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
	do {
		if ( $runAll || $stage === self::PRESEND ) {
			self::execute( self::$preSendUpdates, $mode, $stageEffective );
		}

		if ( $runAll || $stage === self::POSTSEND ) {
			self::execute( self::$postSendUpdates, $mode, $stageEffective );
		}
	} while ( $runAll && self::$preSendUpdates );
}
147
/**
 * Append an update to a queue, folding mergeable updates of one class together
 *
 * A MergeableUpdate is stored under its class name. If that class is already
 * queued, the new update is merged into the queued one, which then moves to
 * the end of the queue. Any other update is simply appended.
 *
 * @param DeferrableUpdate[] &$queue Queue to modify
 * @param DeferrableUpdate $update
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	$key = get_class( $update ); // fully-qualified class
	$queued = $queue[$key] ?? null;
	if ( $queued === null ) {
		$queue[$key] = $update;

		return;
	}

	$queued->merge( $update );
	// Re-insert at the end to handle things like mergeable purge updates that
	// might depend on the prior updates in the queue running
	unset( $queue[$key] );
	$queue[$key] = $queued;
}
170
/**
 * Immediately run/queue a list of updates
 *
 * The queue is drained in rounds until no new entries show up. Within a
 * round, DataUpdate items run before all other updates, and every update is
 * directly followed by the sub-updates it added while running.
 *
 * @param DeferrableUpdate[] &$queue Queue to drain; emptied by this method
 * @param string $mode Passed on to runUpdate() ("run" or "enqueue")
 * @param int $stage Stage recorded in the execute context and passed to runUpdate()
 * @throws ErrorPageError The first GUI error reported by runUpdate(), if any
 */
protected static function execute( array &$queue, $mode, $stage ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbFactory = $services->getDBLoadBalancerFactory();
	$method = RequestContext::getMain()->getRequest()->getMethod();

	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );

	$firstGuiError = null;

	// Each pass takes a snapshot of the queue; stop once a pass adds nothing new
	for ( $batch = $queue; $batch; $batch = $queue ) {
		$queue = []; // clear the queue

		// Split into DataUpdate items and everything else, counting each update
		$dataUpdates = [];
		$genericUpdates = [];
		foreach ( $batch as $pending ) {
			if ( $pending instanceof DataUpdate ) {
				$pending->setTransactionTicket( $ticket );
				$dataUpdates[] = $pending;
			} else {
				$genericUpdates[] = $pending;
			}

			$name = get_class( $pending );
			if ( $pending instanceof DeferrableCallback ) {
				$name .= '-' . $pending->getOrigin();
			}
			$stats->increment( 'deferred_updates.' . $method . '.' . $name );
		}

		// DataUpdate items first, then the generic DeferrableUpdate tasks
		foreach ( array_merge( $dataUpdates, $genericUpdates ) as $update ) {
			self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
			try {
				$guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
				$firstGuiError = $firstGuiError ?? $guiError;

				// Drain the subqueue of $update, including entries added meanwhile
				while ( self::$executeContext['subqueue'] ) {
					$subUpdate = array_shift( self::$executeContext['subqueue'] );

					if ( $subUpdate instanceof DataUpdate ) {
						$subUpdate->setTransactionTicket( $ticket );
					}

					$guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
					$firstGuiError = $firstGuiError ?? $guiError;
				}
			} finally {
				// Make sure we always clean up the context.
				// Losing updates while rewinding the stack is acceptable,
				// losing updates that are added later is not.
				self::$executeContext = null;
			}
		}
	}

	if ( $firstGuiError !== null ) {
		throw $firstGuiError; // throw the first of any GUI errors
	}
}
250
/**
 * Run (or, in "enqueue" mode, enqueue) a single update
 *
 * Exceptions thrown by the update are rolled back and logged rather than
 * propagated, except when MW_PHPUNIT_TEST is defined.
 *
 * @param DeferrableUpdate $update
 * @param LBFactory $lbFactory
 * @param string $mode "enqueue" pushes an EnqueueableDataUpdate to the job queue
 *  instead of running it
 * @param int $stage
 * @return ErrorPageError|null The caught error if it is an ErrorPageError and
 *  $stage is self::PRESEND, null otherwise
 * @throws Exception Rethrown from the update when MW_PHPUNIT_TEST is defined
 */
private static function runUpdate(
	DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
) {
	$enqueueOnly = ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate );
	$definesOwnRound = ( $update instanceof TransactionRoundDefiningUpdate );

	try {
		if ( $enqueueOnly ) {
			// Run only the job enqueue logic to complete the update later
			$spec = $update->getAsJobSpecification();
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );
		} elseif ( $definesOwnRound ) {
			$update->doUpdate();
		} else {
			// Run the bulk of the update now, inside its own transaction round
			$trxOwner = get_class( $update ) . '::doUpdate';
			$lbFactory->beginMasterChanges( $trxOwner );
			$update->doUpdate();
			$lbFactory->commitMasterChanges( $trxOwner );
		}
	} catch ( Exception $e ) {
		// Reporting GUI exceptions does not work post-send
		$isReportable = ( $e instanceof ErrorPageError && $stage === self::PRESEND );

		MWExceptionHandler::rollbackMasterChangesAndLog( $e );

		// VW-style hack to work around T190178, so we can make sure
		// PageMetaDataUpdater doesn't throw exceptions.
		if ( defined( 'MW_PHPUNIT_TEST' ) ) {
			throw $e;
		}

		return $isReportable ? $e : null;
	}

	return null;
}
293
/**
 * Run all deferred updates immediately if there are no DB writes active
 *
 * If the updates cannot run and at least BIG_QUEUE_SIZE of them are pending,
 * the enqueueable ones are pushed to the job queue instead.
 *
 * @param string $mode Passed on to doUpdates() [Default: "run"]
 * @return bool False while execute() is running; true if doUpdates() was
 *  called; otherwise whether no updates remain pending
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	$alreadyExecuting = (bool)self::$executeContext;
	if ( $alreadyExecuting ) {
		// execute() loop is already running
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// Avoid running updates without them having outer scope
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// We cannot run the updates with outer transaction context, so at
			// least enqueue all the updates that support queueing to job queue
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return self::pendingUpdatesCount() === 0;
	}

	self::doUpdates( $mode );

	return true;
}
326
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
 *
 * String keys of the returned items are kept. push() stores each
 * MergeableUpdate under its class name and finds it again by that key, so
 * re-indexing the survivors would stop later updates of the same class from
 * being merged into them.
 *
 * @param DeferrableUpdate[] $updates A list of deferred update instances
 * @return DeferrableUpdate[] Remaining updates that do not support being queued
 */
private static function enqueueUpdates( array $updates ) {
	$remaining = [];

	foreach ( $updates as $key => $update ) {
		if ( $update instanceof EnqueueableDataUpdate ) {
			$spec = $update->getAsJobSpecification();
			$domain = $spec['domain'] ?? $spec['wiki'];
			JobQueueGroup::singleton( $domain )->push( $spec['job'] );
		} elseif ( is_string( $key ) ) {
			// Class-name key of a MergeableUpdate: keep it so push() can still merge
			$remaining[$key] = $update;
		} else {
			$remaining[] = $update;
		}
	}

	return $remaining;
}
348
/**
 * Count the updates waiting in the pre-send and post-send queues
 *
 * @return int Total number of pending updates
 */
public static function pendingUpdatesCount() {
	$preSendCount = count( self::$preSendUpdates );
	$postSendCount = count( self::$postSendUpdates );

	return $preSendCount + $postSendCount;
}
356
/**
 * Get the pending updates of the given stage(s) without running them
 *
 * The result is a plain list. The queues key MergeableUpdate items by class
 * name, and array_merge() lets a later string key overwrite an earlier one,
 * so merging the raw queues would drop the pre-send instance whenever the
 * same class is also queued post-send.
 *
 * @param int $stage self::ALL, self::PRESEND or self::POSTSEND
 * @return DeferrableUpdate[] Pre-send updates first, then post-send updates
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$updates = [];
	if ( $stage === self::ALL || $stage === self::PRESEND ) {
		$updates = array_merge( $updates, array_values( self::$preSendUpdates ) );
	}
	if ( $stage === self::ALL || $stage === self::POSTSEND ) {
		$updates = array_merge( $updates, array_values( self::$postSendUpdates ) );
	}

	return $updates;
}
372
/**
 * Clear all pending updates without performing them
 *
 * Both the pre-send and the post-send queue are reset to an empty array.
 */
public static function clearPendingUpdates() {
	self::$preSendUpdates = self::$postSendUpdates = [];
}
381
/**
 * Check whether running updates now would collide with active DB transaction state
 *
 * @return bool True if the LBFactory has a transaction round, is not ready for
 *  round operations, or any open master connection has pending
 *  writes/callbacks or an explicit transaction
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}
	if ( !$lbFactory->isReadyForRoundOperations() ) {
		return true;
	}

	$busy = false;
	// Flags $busy as soon as one connection is in the middle of something
	$inspectConn = function ( IDatabase $conn ) use ( &$busy ) {
		if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
			$busy = true;
		}
	};
	$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( $inspectConn ) {
		$lb->forEachOpenMasterConnection( $inspectConn );
	} );

	return $busy;
}
402}
and that you know you can do these things To protect your we need to make restrictions that forbid anyone to deny you these rights or to ask you to surrender the rights These restrictions translate to certain responsibilities for you if you distribute copies of the or if you modify it For if you distribute copies of such a whether gratis or for a you must give the recipients all the rights that you have You must make sure that receive or can get the source code And you must show them these terms so they know their rights We protect your rights with two and(2) offer you this license which gives you legal permission to copy
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static runUpdate(DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage)
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static pendingUpdatesCount()
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static array|null $executeContext
Information about the current execute() call or null if not running.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
static execute(array &$queue, $mode, $stage)
Immediately run/queue a list of updates.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
static getPendingUpdates( $stage=self::ALL)
An error page which can definitely be safely rendered using the OutputPage.
Deferrable Update for closure/callback.
MediaWikiServices is the service locator for the application scope of MediaWiki.
Deferrable update that must run outside of any explicit LBFactory transaction round.
An interface for generating database load balancers.
Definition LBFactory.php:39
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
commitMasterChanges( $fname=__METHOD__, array $options=[])
Commit changes and clear view snapshots on all master connections.
Database connection, tracking, load balancing, and transaction manager for a cluster.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
either a unescaped string or a HtmlArmor object after in associative array form externallinks including delete and has completed for all link tables whether this was an auto creation use $formDescriptor instead default is conds Array Extra conditions for the No matching items in log is displayed if loglist is empty msgKey Array If you want a nice box with a set this to the key of the message First element is the message key
Definition hooks.txt:2163
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title e g db for database replication lag or jobqueue for job queue size converted to pseudo seconds It is possible to add more fields and they will be returned to the user in the API response after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition hooks.txt:2290
returning false will NOT prevent logging $e
Definition hooks.txt:2175
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement to signal that updates can be combined.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
$batch execute()
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))