MediaWiki REL1_31
DeferredUpdates.php
Go to the documentation of this file.
1<?php
26
/** @var DeferrableUpdate[] Updates to be deferred until before request end */
private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
private static $postSendUpdates = [];

/** All updates; in web requests, use only after flushing the output buffer */
const ALL = 0;
/** For updates that should run before flushing output buffer */
const PRESEND = 1;
/** For updates that should run after flushing output buffer */
const POSTSEND = 2;

/**
 * Pending update count at or above which tryOpportunisticExecute() tries to
 * push enqueueable updates to the job queue when it cannot run them directly
 */
const BIG_QUEUE_SIZE = 100;

/** @var array|null Information about the current execute() call or null if not running */
private static $executeContext;
67
/**
 * Add an update to the deferred list to be run later by execute()
 *
 * While execute() is running an update of an equal or later stage, the new
 * update is appended to the running update's subqueue instead of a stage queue.
 *
 * @param DeferrableUpdate $update Some object that implements doUpdate()
 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND)
 */
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
	global $wgCommandLineMode;

	$context = self::$executeContext;
	if ( $context && $context['stage'] >= $stage ) {
		// Sub-update of the one currently executing: it runs right after its parent.
		// This also routes "pre-send" updates added during a post-send run into the
		// active round so that they are not left behind.
		self::$executeContext['subqueue'][] = $update;

		return;
	}

	if ( $stage !== self::PRESEND ) {
		self::push( self::$postSendUpdates, $update );
	} else {
		self::push( self::$preSendUpdates, $update );
	}

	// In CLI mode, attempt to run the queue right away; this helps scripts that
	// barely touch the DB but still make updates to other stores.
	if ( $wgCommandLineMode ) {
		self::tryOpportunisticExecute( 'run' );
	}
}
100
/**
 * Add a callable update
 *
 * The callable is wrapped in an MWCallableUpdate, tagged with the name of the
 * calling function as reported by wfGetCaller(), and passed to addUpdate().
 *
 * @param callable $callable
 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND)
 * @param mixed $dbw Passed through unchanged to the MWCallableUpdate constructor
 */
public static function addCallableUpdate(
	$callable, $stage = self::POSTSEND, $dbw = null
) {
	// Must be evaluated in this method so the reported caller stays the same
	$origin = wfGetCaller();
	$update = new MWCallableUpdate( $callable, $origin, $dbw );

	self::addUpdate( $update, $stage );
}
116
/**
 * Do any deferred updates and clear the list
 *
 * @param string $mode Passed through to execute(); 'enqueue' makes runUpdate()
 *   push enqueueable updates to the job queue instead of running them
 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
 */
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
	$runAll = ( $stage === self::ALL );
	// When running everything, the whole pass is treated as a post-send pass
	$stageEffective = $runAll ? self::POSTSEND : $stage;

	if ( $runAll || $stage === self::PRESEND ) {
		self::execute( self::$preSendUpdates, $mode, $stageEffective );
	}

	// NOTE: loose comparison retained on purpose to keep existing behavior
	if ( $runAll || $stage == self::POSTSEND ) {
		self::execute( self::$postSendUpdates, $mode, $stageEffective );
	}
}
134
/**
 * No-op kept for backwards compatibility; only emits a deprecation warning
 *
 * @param mixed $value Ignored
 * @deprecated since 1.29
 */
public static function setImmediateMode( $value ) {
	wfDeprecated( __METHOD__, '1.29' );
}
143
/**
 * Append an update to a queue, merging it into an already queued update of
 * the same class when it is a MergeableUpdate
 *
 * @param DeferrableUpdate[] &$queue Queue to modify in place
 * @param DeferrableUpdate $update
 */
private static function push( array &$queue, DeferrableUpdate $update ) {
	if ( !( $update instanceof MergeableUpdate ) ) {
		$queue[] = $update;

		return;
	}

	// Mergeable updates are keyed by their fully-qualified class name
	$class = get_class( $update );
	if ( !isset( $queue[$class] ) ) {
		$queue[$class] = $update;

		return;
	}

	$queue[$class]->merge( $update );
}
162
/**
 * Immediately run/queue a list of updates
 *
 * The queue is drained in rounds until a round leaves it empty. Within a
 * round, DataUpdate instances run before all other updates, and each update
 * is followed by the sub-updates that addUpdate() attached to it while it ran.
 *
 * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects; emptied in place
 * @param string $mode Passed through to runUpdate()
 * @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
 * @throws ErrorPageError The first such error reported by runUpdate(), thrown
 *   once the queue has been drained
 */
protected static function execute( array &$queue, $mode, $stage ) {
	$services = MediaWikiServices::getInstance();
	$stats = $services->getStatsdDataFactory();
	$lbFactory = $services->getDBLoadBalancerFactory();
	$method = RequestContext::getMain()->getRequest()->getMethod();

	$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );

	$reportableError = null;
	$updates = $queue; // snapshot of queue

	// Keep doing rounds of updates until none get enqueued...
	while ( $updates ) {
		$queue = []; // clear the queue

		// Order will be DataUpdate followed by generic DeferrableUpdate tasks
		$updatesByType = [ 'data' => [], 'generic' => [] ];
		foreach ( $updates as $du ) {
			if ( $du instanceof DataUpdate ) {
				$du->setTransactionTicket( $ticket );
				$updatesByType['data'][] = $du;
			} else {
				$updatesByType['generic'][] = $du;
			}

			$name = ( $du instanceof DeferrableCallback )
				? get_class( $du ) . '-' . $du->getOrigin()
				: get_class( $du );
			$stats->increment( 'deferred_updates.' . $method . '.' . $name );
		}

		// Execute all remaining tasks...
		foreach ( $updatesByType as $updatesForType ) {
			foreach ( $updatesForType as $update ) {
				self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
				try {
					$guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
					$reportableError = $reportableError ?: $guiError;
					// Do the subqueue updates for $update until there are none
					while ( self::$executeContext['subqueue'] ) {
						// The subqueue is only ever appended to, so this is FIFO
						$subUpdate = array_shift( self::$executeContext['subqueue'] );

						if ( $subUpdate instanceof DataUpdate ) {
							$subUpdate->setTransactionTicket( $ticket );
						}

						$guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
						$reportableError = $reportableError ?: $guiError;
					}
				} finally {
					// runUpdate() only catches Exception. If anything else propagates,
					// a stale context would make addUpdate() keep feeding a subqueue
					// that never runs and make tryOpportunisticExecute() always bail.
					self::$executeContext = null;
				}
			}
		}

		$updates = $queue; // new snapshot of queue (check for new entries)
	}

	if ( $reportableError ) {
		throw $reportableError; // throw the first of any GUI errors
	}
}
236
/**
 * Run (or, in 'enqueue' mode, push to the job queue) a single update
 *
 * Exceptions raised by the update are rolled back and logged here instead
 * of being propagated.
 *
 * @param DeferrableUpdate $update
 * @param LBFactory $lbFactory
 * @param string $mode 'enqueue' pushes EnqueueableDataUpdate items as jobs
 * @param int $stage Class constant (PRESEND, POSTSEND)
 * @return ErrorPageError|null The caught error, only if it is an
 *   ErrorPageError and $stage is PRESEND
 */
private static function runUpdate(
	DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
) {
	$guiError = null;

	try {
		$enqueueOnly = ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate );

		if ( $enqueueOnly ) {
			// Only push the job that will complete the update later
			$spec = $update->getAsJobSpecification();
			JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
		} elseif ( !( $update instanceof TransactionRoundDefiningUpdate ) ) {
			// Run the bulk of the update now, wrapped in its own transaction round
			$fnameTrxOwner = get_class( $update ) . '::doUpdate';
			$lbFactory->beginMasterChanges( $fnameTrxOwner );
			$update->doUpdate();
			$lbFactory->commitMasterChanges( $fnameTrxOwner );
		} else {
			// No transaction round is opened here for this kind of update
			$update->doUpdate();
		}
	} catch ( Exception $e ) {
		// Reporting GUI exceptions does not work post-send
		$isReportable = ( $e instanceof ErrorPageError && $stage === self::PRESEND );
		if ( $isReportable ) {
			$guiError = $e;
		}
		MWExceptionHandler::rollbackMasterChangesAndLog( $e );
	}

	return $guiError;
}
272
/**
 * Run all deferred updates immediately if there are no DB writes active
 *
 * If transactions are active and the queue has reached BIG_QUEUE_SIZE, the
 * updates that support it are pushed to the job queue instead.
 *
 * @param string $mode Passed through to doUpdates()
 * @return bool Whether updates were allowed to run, or, when they were not,
 *   whether no updates remain pending
 */
public static function tryOpportunisticExecute( $mode = 'run' ) {
	if ( self::$executeContext ) {
		// An execute() loop is already in progress
		return false;
	}

	if ( self::areDatabaseTransactionsActive() ) {
		// Updates must not run inside someone else's transaction scope. For a
		// large backlog, at least hand the enqueueable ones to the job queue.
		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return !self::pendingUpdatesCount();
	}

	self::doUpdates( $mode );

	return true;
}
304
/**
 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
 *
 * String keys of the remaining items are preserved. push() files each
 * MergeableUpdate under its class name and uses that key to find the queued
 * instance to merge into; renumbering the array would stop later updates of
 * the same class from being merged with the one left in the queue.
 *
 * @param DeferrableUpdate[] $updates A list of deferred update instances
 * @return DeferrableUpdate[] Remaining updates that do not support being queued
 */
private static function enqueueUpdates( array $updates ) {
	$remaining = [];

	foreach ( $updates as $key => $update ) {
		if ( $update instanceof EnqueueableDataUpdate ) {
			$spec = $update->getAsJobSpecification();
			JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
		} elseif ( is_string( $key ) ) {
			// Keep the class-name key so that push() can still merge into it
			$remaining[$key] = $update;
		} else {
			$remaining[] = $update;
		}
	}

	return $remaining;
}
325
/**
 * @return int Number of updates currently held in the pre-send and post-send queues
 */
public static function pendingUpdatesCount() {
	$preSendCount = count( self::$preSendUpdates );
	$postSendCount = count( self::$postSendUpdates );

	return $preSendCount + $postSendCount;
}
333
/**
 * Get the queued updates for a stage without running or removing them
 *
 * The result is a plain list. The queues key each MergeableUpdate by its
 * class name (see push()), and array_merge() lets a later string key
 * overwrite an earlier one, so merging the raw queues would silently drop
 * the pre-send instance whenever the same class is queued for both stages.
 *
 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
 * @return DeferrableUpdate[] Pre-send updates first, then post-send updates
 */
public static function getPendingUpdates( $stage = self::ALL ) {
	$updates = [];
	if ( $stage === self::ALL || $stage === self::PRESEND ) {
		$updates = array_merge( $updates, array_values( self::$preSendUpdates ) );
	}
	if ( $stage === self::ALL || $stage === self::POSTSEND ) {
		$updates = array_merge( $updates, array_values( self::$postSendUpdates ) );
	}

	return $updates;
}
349
/**
 * Clear all pending updates without performing them
 *
 * Both the pre-send and the post-send queue are emptied.
 */
public static function clearPendingUpdates() {
	self::$postSendUpdates = [];
	self::$preSendUpdates = [];
}
358
/**
 * Check whether a transaction round is open or any open master connection
 * has pending writes/callbacks or an explicit transaction
 *
 * @return bool
 */
private static function areDatabaseTransactionsActive() {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $lbFactory->hasTransactionRound() ) {
		return true;
	}

	$connsBusy = false;
	// Flags the shared result as soon as one busy connection is seen
	$markIfBusy = function ( IDatabase $conn ) use ( &$connsBusy ) {
		if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
			$connsBusy = true;
		}
	};
	$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( $markIfBusy ) {
		$lb->forEachOpenMasterConnection( $markIfBusy );
	} );

	return $connsBusy;
}
379}
global $wgCommandLineMode
wfGetCaller( $level=2)
Get the name of the function which called this function. wfGetCaller( 1 ) is the function with the wfG...
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
Abstract base class for update jobs that do something with some secondary data extracted from article...
Class for managing the deferred updates.
static enqueueUpdates(array $updates)
Enqueue a job for each EnqueueableDataUpdate item and return the other items.
static runUpdate(DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage)
static doUpdates( $mode='run', $stage=self::ALL)
Do any deferred updates and clear the list.
static areDatabaseTransactionsActive()
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static pendingUpdatesCount()
static tryOpportunisticExecute( $mode='run')
Run all deferred updates immediately if there are no DB writes active.
static push(array &$queue, DeferrableUpdate $update)
static clearPendingUpdates()
Clear all pending updates without performing them.
static setImmediateMode( $value)
static addCallableUpdate( $callable, $stage=self::POSTSEND, $dbw=null)
Add a callable update.
static array null $executeContext
Information about the current execute() call or null if not running.
static DeferrableUpdate[] $preSendUpdates
Updates to be deferred until before request end.
static execute(array &$queue, $mode, $stage)
Immediately run/queue a list of updates.
static DeferrableUpdate[] $postSendUpdates
Updates to be deferred until after request end.
static getPendingUpdates( $stage=self::ALL)
An error page which can definitely be safely rendered using the OutputPage.
static singleton( $domain=false)
Deferrable Update for closure/callback.
MediaWikiServices is the service locator for the application scope of MediaWiki.
static getMain()
Get the RequestContext object associated with the main request.
Deferrable update that must run outside of any explicit LBFactory transaction round.
An interface for generating database load balancers.
Definition LBFactory.php:39
Database connection, tracking, load balancing, and transaction manager for a cluster.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition design.txt:26
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition hooks.txt:2273
returning false will NOT prevent logging $e
Definition hooks.txt:2176
Callback wrapper that has an originating method.
Interface that deferrable updates should implement.
Interface that marks a DataUpdate as enqueuable via the JobQueue.
Interface that deferrable updates can implement.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
writesOrCallbacksPending()
Returns true if there is a transaction/round open with possible write queries or transaction pre-comm...
$batch execute()