MediaWiki REL1_30
DeferredUpdates.php
Go to the documentation of this file.
1<?php
26
55 private static $preSendUpdates = [];
57 private static $postSendUpdates = [];
58
59 const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
60 const PRESEND = 1; // for updates that should run before flushing output buffer
61 const POSTSEND = 2; // for updates that should run after flushing output buffer
62
63 const BIG_QUEUE_SIZE = 100;
64
66 private static $executeContext;
67
76 public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
77 global $wgCommandLineMode;
78
79 if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) {
80 // This is a sub-DeferredUpdate; run it right after its parent update.
81 // Also, while post-send updates are running, push any "pre-send" jobs to the
82 // active post-send queue to make sure they get run this round (or at all).
83 self::$executeContext['subqueue'][] = $update;
84
85 return;
86 }
87
88 if ( $stage === self::PRESEND ) {
89 self::push( self::$preSendUpdates, $update );
90 } else {
91 self::push( self::$postSendUpdates, $update );
92 }
93
94 // Try to run the updates now if in CLI mode and no transaction is active.
95 // This covers scripts that don't/barely use the DB but make updates to other stores.
96 if ( $wgCommandLineMode ) {
97 self::tryOpportunisticExecute( 'run' );
98 }
99 }
100
111 public static function addCallableUpdate(
112 $callable, $stage = self::POSTSEND, IDatabase $dbw = null
113 ) {
114 self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
115 }
116
123 public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
124 $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
125
126 if ( $stage === self::ALL || $stage === self::PRESEND ) {
127 self::execute( self::$preSendUpdates, $mode, $stageEffective );
128 }
129
130 if ( $stage === self::ALL || $stage == self::POSTSEND ) {
131 self::execute( self::$postSendUpdates, $mode, $stageEffective );
132 }
133 }
134
140 public static function setImmediateMode( $value ) {
141 wfDeprecated( __METHOD__, '1.29' );
142 }
143
148 private static function push( array &$queue, DeferrableUpdate $update ) {
149 if ( $update instanceof MergeableUpdate ) {
150 $class = get_class( $update ); // fully-qualified class
151 if ( isset( $queue[$class] ) ) {
153 $existingUpdate = $queue[$class];
154 $existingUpdate->merge( $update );
155 } else {
156 $queue[$class] = $update;
157 }
158 } else {
159 $queue[] = $update;
160 }
161 }
162
172 protected static function execute( array &$queue, $mode, $stage ) {
173 $services = MediaWikiServices::getInstance();
174 $stats = $services->getStatsdDataFactory();
175 $lbFactory = $services->getDBLoadBalancerFactory();
176 $method = RequestContext::getMain()->getRequest()->getMethod();
177
178 $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
179
181 $reportableError = null;
183 $updates = $queue;
184
185 // Keep doing rounds of updates until none get enqueued...
186 while ( $updates ) {
187 $queue = []; // clear the queue
188
189 // Order will be DataUpdate followed by generic DeferrableUpdate tasks
190 $updatesByType = [ 'data' => [], 'generic' => [] ];
191 foreach ( $updates as $du ) {
192 if ( $du instanceof DataUpdate ) {
193 $du->setTransactionTicket( $ticket );
194 $updatesByType['data'][] = $du;
195 } else {
196 $updatesByType['generic'][] = $du;
197 }
198
199 $name = ( $du instanceof DeferrableCallback )
200 ? get_class( $du ) . '-' . $du->getOrigin()
201 : get_class( $du );
202 $stats->increment( 'deferred_updates.' . $method . '.' . $name );
203 }
204
205 // Execute all remaining tasks...
206 foreach ( $updatesByType as $updatesForType ) {
207 foreach ( $updatesForType as $update ) {
208 self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
210 $guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
211 $reportableError = $reportableError ?: $guiError;
212 // Do the subqueue updates for $update until there are none
213 while ( self::$executeContext['subqueue'] ) {
214 $subUpdate = reset( self::$executeContext['subqueue'] );
215 $firstKey = key( self::$executeContext['subqueue'] );
216 unset( self::$executeContext['subqueue'][$firstKey] );
217
218 if ( $subUpdate instanceof DataUpdate ) {
219 $subUpdate->setTransactionTicket( $ticket );
220 }
221
222 $guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
223 $reportableError = $reportableError ?: $guiError;
224 }
225 self::$executeContext = null;
226 }
227 }
228
229 $updates = $queue; // new snapshot of queue (check for new entries)
230 }
231
232 if ( $reportableError ) {
233 throw $reportableError; // throw the first of any GUI errors
234 }
235 }
236
244 private static function runUpdate(
245 DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
246 ) {
247 $guiError = null;
248 try {
249 if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
250 // Run only the job enqueue logic to complete the update later
251 $spec = $update->getAsJobSpecification();
252 JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
253 } else {
254 // Run the bulk of the update now
255 $fnameTrxOwner = get_class( $update ) . '::doUpdate';
256 $lbFactory->beginMasterChanges( $fnameTrxOwner );
257 $update->doUpdate();
258 $lbFactory->commitMasterChanges( $fnameTrxOwner );
259 }
260 } catch ( Exception $e ) {
261 // Reporting GUI exceptions does not work post-send
262 if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
263 $guiError = $e;
264 }
265 MWExceptionHandler::rollbackMasterChangesAndLog( $e );
266 }
267
268 return $guiError;
269 }
270
281 public static function tryOpportunisticExecute( $mode = 'run' ) {
282 // execute() loop is already running
283 if ( self::$executeContext ) {
284 return false;
285 }
286
287 // Avoiding running updates without them having outer scope
288 if ( !self::areDatabaseTransactionsActive() ) {
289 self::doUpdates( $mode );
290 return true;
291 }
292
293 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
294 // If we cannot run the updates with outer transaction context, try to
295 // at least enqueue all the updates that support queueing to job queue
296 self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
297 self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
298 }
299
300 return !self::pendingUpdatesCount();
301 }
302
309 private static function enqueueUpdates( array $updates ) {
310 $remaining = [];
311
312 foreach ( $updates as $update ) {
313 if ( $update instanceof EnqueueableDataUpdate ) {
314 $spec = $update->getAsJobSpecification();
315 JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
316 } else {
317 $remaining[] = $update;
318 }
319 }
320
321 return $remaining;
322 }
323
328 public static function pendingUpdatesCount() {
329 return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
330 }
331
337 public static function getPendingUpdates( $stage = self::ALL ) {
338 $updates = [];
339 if ( $stage === self::ALL || $stage === self::PRESEND ) {
340 $updates = array_merge( $updates, self::$preSendUpdates );
341 }
342 if ( $stage === self::ALL || $stage === self::POSTSEND ) {
343 $updates = array_merge( $updates, self::$postSendUpdates );
344 }
345 return $updates;
346 }
347
352 public static function clearPendingUpdates() {
353 self::$preSendUpdates = [];
354 self::$postSendUpdates = [];
355 }
356
360 private static function areDatabaseTransactionsActive() {
361 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
362 if ( $lbFactory->hasTransactionRound() ) {
363 return true;
364 }
365
366 $connsBusy = false;
367 $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
368 $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
369 if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
370 $connsBusy = true;
371 }
372 } );
373 } );
374
375 return $connsBusy;
376 }
377}
wfGetCaller( $level=2)
Get the name of the function which called this function wfGetCaller( 1 ) is the function with the wfG...
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
global $wgCommandLineMode
Definition Setup.php:526
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static runUpdate(DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage)
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static addCallableUpdate( $callable, $stage=self::POSTSEND, IDatabase $dbw=null)
Add a callable update.
static pendingUpdatesCount()
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static setImmediateMode( $value)
static array null $executeContext
Information about the current execute() call or null if not running.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
static execute(array &$queue, $mode, $stage)
Immediately run/queue a list of updates.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
static getPendingUpdates( $stage=self::ALL)
An error page which can definitely be safely rendered using the OutputPage.
static singleton( $wiki=false)
Deferrable Update for closure/callback.
MediaWikiServices is the service locator for the application scope of MediaWiki.
static getMain()
Static methods.
An interface for generating database load balancers.
Definition LBFactory.php:38
Database connection, tracking, load balancing, and transaction manager for a cluster.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition design.txt:26
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition hooks.txt:2243
returning false will NOT prevent logging $e
Definition hooks.txt:2146
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:40
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
$batch execute()