Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
67.00% covered (warning)
67.00%
67 / 100
76.92% covered (warning)
76.92%
10 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
DeferredUpdates
67.68% covered (warning)
67.68%
67 / 99
76.92% covered (warning)
76.92%
10 / 13
79.77
0.00% covered (danger)
0.00%
0 / 1
 getScopeStack
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 setScopeStack
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addUpdate
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 addCallableUpdate
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 run
41.67% covered (danger)
41.67%
15 / 36
0.00% covered (danger)
0.00%
0 / 1
13.15
 doUpdates
78.38% covered (warning)
78.38%
29 / 37
0.00% covered (danger)
0.00%
0 / 1
17.27
 tryOpportunisticExecute
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
4
 preventOpportunisticUpdates
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 pendingUpdatesCount
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPendingUpdates
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 clearPendingUpdates
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getRecursiveExecutionStackDepth
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 attemptUpdate
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6
7namespace MediaWiki\Deferred;
8
9use LogicException;
10use MediaWiki\Exception\ErrorPageError;
11use MediaWiki\Exception\MWExceptionHandler;
12use MediaWiki\Logger\LoggerFactory;
13use Throwable;
14use Wikimedia\Rdbms\IDatabase;
15use Wikimedia\ScopedCallback;
16
17/**
18 * Defer callable updates to run later in the PHP process
19 *
20 * This is a performance feature that enables MediaWiki to produce faster web responses.
21 * It allows you to postpone non-blocking work (e.g. work that does not change the web
22 * response) to after the HTTP response has been sent to the client (i.e. web browser).
23 *
24 * Once the response is finalized and sent to the browser, the webserver process stays
25 * for a little while longer (detached from the web request) to run your POSTSEND tasks.
26 *
27 * There is also a PRESEND option, which runs your task right before the finalized response
28 * is sent to the browser. This is for critical tasks that does need to block the response,
29 * but where you'd like to benefit from other DeferredUpdates features. Such as:
30 *
31 * - MergeableUpdate: batch updates from different components without coupling
32 *   or awareness of each other.
33 * - Automatic cancellation: pass a IDatabase object (for any wiki or database) to
34 *   DeferredUpdates::addCallableUpdate or AtomicSectionUpdate.
35 * - Reducing lock contention: if the response is likely to take several seconds
36 *   (e.g. uploading a large file to FileBackend, or saving an edit to a large article)
37 *   much of that work may overlap with a database transaction that is staying open for
38 *   the entire duration. By moving contentious writes out to a PRESEND update, these
39 *   get their own transaction (after the main one is committed), which give up some
40 *   atomicity for improved throughput.
41 *
42 * ## Expectation and comparison to job queue
43 *
44 * When scheduling a POSTSEND via the DeferredUpdates system you can generally expect
45 * it to complete well before the client makes their next request. Updates runs directly after
46 * the web response is sent, from the same process on the same server. This unlike the JobQueue,
47 * where jobs may need to wait in line for some minutes or hours.
48 *
49 * If your update fails, this failure is not known to the client and gets no retry. For updates
50 * that need re-tries for system consistency or data integrity, it is recommended to implement
51 * it as a job instead and use JobQueueGroup::lazyPush. This has the caveat of being delayed
52 * by default, the same as any other job.
53 *
54 * A hybrid solution is available via the EnqueueableDataUpdate interface. By implementing
55 * this interface, you can queue your update via the DeferredUpdates first, and if it fails,
56 * the system will automatically catch this and queue it as a job instead.
57 *
58 * ## How it works during web requests
59 *
60 * 1. Your request route is executed (e.g. Action or SpecialPage class, or API).
61 * 2. Output is finalized and main database transaction is committed.
62 * 3. PRESEND updates run via DeferredUpdates::doUpdates.
63 * 5. The web response is sent to the browser.
64 * 6. POSTSEND updates run via DeferredUpdates::doUpdates.
65 *
66 * @see MediaWiki::preOutputCommit
67 * @see MediaWiki::restInPeace
68 *
69 * ## How it works for Maintenance scripts
70 *
71 * In CLI mode, no distinction is made between PRESEND and POSTSEND deferred updates,
72 * and the queue is periodically executed throughout the process.
73 *
74 * @see DeferredUpdates::tryOpportunisticExecute
75 *
76 * ## How it works internally
77 *
78 * Each update is added via DeferredUpdates::addUpdate and stored in either the PRESEND or
79 * POSTSEND queue. If an update gets queued while another update is already running, then
80 * we store in a "sub"-queue associated with the current update. This allows nested updates
81 * to be completed before other updates, which improves ordering for process caching.
82 *
83 * @since 1.19
84 */
85class DeferredUpdates {
86    /** @var int Process all updates; in web requests, use only after flushing output buffer */
87    public const ALL = 0;
88    /** @var int Specify/process updates that should run before flushing output buffer */
89    public const PRESEND = 1;
90    /** @var int Specify/process updates that should run after flushing output buffer */
91    public const POSTSEND = 2;
92
93    /** @var int[] List of "defer until" queue stages that can be reached */
94    public const STAGES = [ self::PRESEND, self::POSTSEND ];
95
96    /** @var DeferredUpdatesScopeStack|null Queue states based on recursion level */
97    private static $scopeStack;
98
99    /**
100     * @var int Nesting level for preventOpportunisticUpdates()
101     */
102    private static $preventOpportunisticUpdates = 0;
103
104    private static function getScopeStack(): DeferredUpdatesScopeStack {
105        self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
106        return self::$scopeStack;
107    }
108
109    /**
110     * @param DeferredUpdatesScopeStack $scopeStack
111     * @internal Only for use in tests.
112     */
113    public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
114        if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
115            throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
116        }
117        self::$scopeStack = $scopeStack;
118    }
119
120    /**
121     * Add an update to the pending update queue for execution at the appropriate time
122     *
123     * In CLI mode, callback magic will also be used to run updates when safe
124     *
125     * If an update is already in progress, then what happens to this update is as follows:
126     *  - If it has a "defer until" stage at/before the actual run stage of the innermost
127     *    in-progress update, then it will go into the sub-queue of that in-progress update.
128     *    As soon as that update completes, MergeableUpdate instances in its sub-queue will be
129     *    merged into the top-queues and the non-MergeableUpdate instances will be executed.
130     *    This is done to better isolate updates from the failures of other updates and reduce
131     *    the chance of race conditions caused by updates not fully seeing the intended changes
132     *    of previously enqueued and executed updates.
133     *  - If it has a "defer until" stage later than the actual run stage of the innermost
134     *    in-progress update, then it will go into the normal top-queue for that stage.
135     *
136     * @param DeferrableUpdate $update Some object that implements doUpdate()
137     * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
138     * @since 1.28 Added the $stage parameter
139     */
140    public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
141        self::getScopeStack()->current()->addUpdate( $update, $stage );
142        self::tryOpportunisticExecute();
143    }
144
145    /**
146     * Add an update to the pending update queue that invokes the specified callback when run
147     *
148     * @param callable $callable One of the following:
149     *  - A Closure callback that takes the caller name as its argument
150     *  - A non-Closure callback that takes no arguments
151     * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
152     * @param IDatabase|IDatabase[] $dependeeDbws DB handles which might have pending writes
153     *  upon which this update depends. If any of the handles already has an open transaction,
154     *  a rollback thereof will cause this update to be cancelled (if it has not already run).
155     *  [optional] (since 1.28)
156     * @since 1.27 Added $stage parameter
157     * @since 1.28 Added the $dbw parameter
158     * @since 1.43 Closures are now given the caller name parameter
159     */
160    public static function addCallableUpdate(
161        $callable,
162        $stage = self::POSTSEND,
163        $dependeeDbws = []
164    ) {
165        self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dependeeDbws ), $stage );
166    }
167
168    /**
169     * Run an update, and, if an error was thrown, catch/log it and enqueue the update as
170     * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
171     *
172     * @param DeferrableUpdate $update
173     * @return Throwable|null
174     */
175    private static function run( DeferrableUpdate $update ): ?Throwable {
176        $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
177
178        $type = get_class( $update )
179            . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
180        $updateId = spl_object_id( $update );
181        $logger->debug( "DeferredUpdates::run: started $type #{updateId}", [ 'updateId' => $updateId ] );
182
183        $updateException = null;
184
185        $startTime = microtime( true );
186        try {
187            self::attemptUpdate( $update );
188        } catch ( Throwable $updateException ) {
189            MWExceptionHandler::logException( $updateException );
190            $logger->error(
191                "Deferred update '{deferred_type}' failed to run.",
192                [
193                    'deferred_type' => $type,
194                    'exception' => $updateException,
195                ]
196            );
197            self::getScopeStack()->onRunUpdateFailed( $update );
198        } finally {
199            $walltime = microtime( true ) - $startTime;
200            $logger->debug( "DeferredUpdates::run: ended $type #{updateId}, processing time: {walltime}", [
201                'updateId' => $updateId,
202                'walltime' => $walltime,
203            ] );
204        }
205
206        // Try to push the update as a job so it can run later if possible
207        if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
208            try {
209                self::getScopeStack()->queueDataUpdate( $update );
210            } catch ( Throwable $jobException ) {
211                MWExceptionHandler::logException( $jobException );
212                $logger->error(
213                    "Deferred update '{deferred_type}' failed to enqueue as a job.",
214                    [
215                        'deferred_type' => $type,
216                        'exception' => $jobException,
217                    ]
218                );
219                self::getScopeStack()->onRunUpdateFailed( $update );
220            }
221        }
222
223        return $updateException;
224    }
225
226    /**
227     * Consume and execute all pending updates
228     *
229     * Note that it is rarely the case that this method should be called outside of a few
230     * select entry points. For simplicity, that kind of recursion is discouraged. Recursion
231     * cannot happen if an explicit transaction round is active, which limits usage to updates
232     * with TRX_ROUND_ABSENT that do not leave open any transactions round of their own during
233     * the call to this method.
234     *
235     * In the less-common case of this being called within an in-progress DeferrableUpdate,
236     * this will not see any top-queue updates (since they were consumed and are being run
237     * inside an outer execution loop). In that case, it will instead operate on the sub-queue
238     * of the innermost in-progress update on the stack.
239     *
240     * @internal For use by MediaWiki, Maintenance, JobRunner, JobExecutor
241     * @param int $stage Which updates to process. One of
242     *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
243     */
244    public static function doUpdates( $stage = self::ALL ) {
245        /** @var ErrorPageError $guiError First presentable client-level error thrown */
246        $guiError = null;
247        /** @var Throwable $exception First of any error thrown */
248        $exception = null;
249
250        $scope = self::getScopeStack()->current();
251
252        // T249069: recursion is not possible once explicit transaction rounds are involved
253        $activeUpdate = $scope->getActiveUpdate();
254        if ( $activeUpdate ) {
255            $class = get_class( $activeUpdate );
256            if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
257                throw new LogicException(
258                    __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
259                );
260            }
261            if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
262                throw new LogicException(
263                    __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
264                );
265            }
266        }
267
268        $scope->processUpdates(
269            $stage,
270            static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
271                $scopeStack = self::getScopeStack();
272                $childScope = $scopeStack->descend( $activeStage, $update );
273                try {
274                    $e = self::run( $update );
275                    $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
276                    $exception = $exception ?: $e;
277                    // Any addUpdate() calls between descend() and ascend() used the sub-queue.
278                    // In rare cases, DeferrableUpdate::doUpdates() will process them by calling
279                    // doUpdates() itself. In any case, process remaining updates in the subqueue.
280                    // them, enqueueing them, or transferring them to the parent scope
281                    // queues as appropriate...
282                    $childScope->processUpdates(
283                        $activeStage,
284                        static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
285                            $e = self::run( $sub );
286                            $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
287                            $exception = $exception ?: $e;
288                        }
289                    );
290                } finally {
291                    $scopeStack->ascend();
292                }
293            }
294        );
295
296        // VW-style hack to work around T190178, so we can make sure
297        // PageMetaDataUpdater doesn't throw exceptions.
298        if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
299            throw $exception;
300        }
301
302        // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
303        // callers should check permissions *before* enqueueing updates. If the main transaction
304        // round actions succeed but some deferred updates fail due to permissions errors then
305        // there is a risk that some secondary data was not properly updated.
306        if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
307            throw $guiError;
308        }
309    }
310
311    /**
312     * Consume and execute pending updates now if possible, instead of waiting.
313     *
314     * In web requests, updates are always deferred until the end of the request.
315     *
316     * In CLI mode, updates run earlier and more often. This is important for long-running
317     * Maintenance scripts that would otherwise grow an excessively large queue, which increases
318     * memory use, and risks losing all updates if the script ends early or crashes.
319     *
320     * The folllowing conditions are required for updates to run early in CLI mode:
321     *
322     * - No update is already in progress (ensure linear flow, recursion guard).
323     * - LBFactory indicates that we don't have any "busy" database connections, i.e.
324     *   there are no pending writes or otherwise active and uncommitted transactions,
325     *   except if the transaction is empty and merely used for primary DB read queries,
326     *   in which case the transaction (and its repeatable-read snapshot) can be safely flushed.
327     *
328     * How this works:
329     *
330     * - When a maintenance script calls {@link Maintenance::commitTransaction()},
331     *   tryOpportunisticExecute() will be called after commit.
332     *
333     * - When a maintenance script calls {@link Maintenance::commitTransactionRound()},
334     *   tryOpportunisticExecute() will be called after all the commits.
335     *
336     * - For maintenance scripts that don't do much with the database, we also call
337     *   tryOpportunisticExecute() after every addUpdate() call.
338     *
339     * - Upon the completion of Maintenance::execute() via Maintenance::shutdown(),
340     *   any remaining updates are run.
341     *
342     * Note that this method runs both PRESEND and POSTSEND updates and thus should not be called
343     * during web requests. It is only intended for long-running Maintenance scripts.
344     *
345     * @internal For use by Maintenance
346     * @since 1.28
347     * @return bool Whether updates were allowed to run
348     */
349    public static function tryOpportunisticExecute(): bool {
350        // Leave execution up to the current loop if an update is already in progress
351        // or if updates are explicitly disabled
352        if ( self::getRecursiveExecutionStackDepth()
353            || self::$preventOpportunisticUpdates
354        ) {
355            return false;
356        }
357
358        if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
359            self::doUpdates( self::ALL );
360            return true;
361        }
362
363        return false;
364    }
365
366    /**
367     * Prevent opportunistic updates until the returned ScopedCallback is
368     * consumed.
369     */
370    #[\NoDiscard]
371    public static function preventOpportunisticUpdates(): ScopedCallback {
372        self::$preventOpportunisticUpdates++;
373        return new ScopedCallback( static function () {
374            self::$preventOpportunisticUpdates--;
375        } );
376    }
377
378    /**
379     * Get the number of pending updates for the current execution context
380     *
381     * If an update is in progress, then this operates on the sub-queues of the
382     * innermost in-progress update. Otherwise, it acts on the top-queues.
383     *
384     * @return int
385     * @since 1.28
386     */
387    public static function pendingUpdatesCount() {
388        return self::getScopeStack()->current()->pendingUpdatesCount();
389    }
390
391    /**
392     * Get a list of the pending updates for the current execution context
393     *
394     * If an update is in progress, then this operates on the sub-queues of the
395     * innermost in-progress update. Otherwise, it acts on the top-queues.
396     *
397     * @param int $stage Look for updates with this "defer until" stage. One of
398     *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
399     * @return DeferrableUpdate[]
400     * @internal This method should only be used for unit tests
401     * @since 1.29
402     */
403    public static function getPendingUpdates( $stage = self::ALL ) {
404        return self::getScopeStack()->current()->getPendingUpdates( $stage );
405    }
406
407    /**
408     * Cancel all pending updates for the current execution context
409     *
410     * If an update is in progress, then this operates on the sub-queues of the
411     * innermost in-progress update. Otherwise, it acts on the top-queues.
412     *
413     * @internal This method should only be used for unit tests
414     */
415    public static function clearPendingUpdates() {
416        self::getScopeStack()->current()->clearPendingUpdates();
417    }
418
419    /**
420     * Get the number of in-progress calls to DeferredUpdates::doUpdates()
421     *
422     * @return int
423     * @internal This method should only be used for unit tests
424     */
425    public static function getRecursiveExecutionStackDepth() {
426        return self::getScopeStack()->getRecursiveDepth();
427    }
428
429    /**
430     * Attempt to run an update with the appropriate transaction round state if needed
431     *
432     * It is allowed for a DeferredUpdate to directly execute one or more other DeferredUpdate
433     * instances without queueing them by calling this method. In that case, the outer update
434     * must use TransactionRoundAwareUpdate::TRX_ROUND_ABSENT, e.g. by extending
435     * TransactionRoundDefiningUpdate, so that this method can give each update its own
436     * transaction round.
437     *
438     * @param DeferrableUpdate $update
439     * @since 1.34
440     */
441    public static function attemptUpdate( DeferrableUpdate $update ) {
442        self::getScopeStack()->onRunUpdateStart( $update );
443
444        $update->doUpdate();
445
446        self::getScopeStack()->onRunUpdateEnd( $update );
447    }
448}
449
450/** @deprecated class alias since 1.42 */
451class_alias( DeferredUpdates::class, 'DeferredUpdates' );