Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
72.73% covered (warning)
72.73%
88 / 121
76.92% covered (warning)
76.92%
10 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
DeferredUpdates
73.33% covered (warning)
73.33%
88 / 120
76.92% covered (warning)
76.92%
10 / 13
65.38
0.00% covered (danger)
0.00%
0 / 1
 getScopeStack
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 setScopeStack
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addUpdate
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 addCallableUpdate
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 run
41.67% covered (danger)
41.67%
15 / 36
0.00% covered (danger)
0.00%
0 / 1
13.15
 doUpdates
78.38% covered (warning)
78.38%
29 / 37
0.00% covered (danger)
0.00%
0 / 1
17.27
 tryOpportunisticExecute
100.00% covered (success)
100.00%
28 / 28
100.00% covered (success)
100.00%
1 / 1
6
 preventOpportunisticUpdates
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 pendingUpdatesCount
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getPendingUpdates
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 clearPendingUpdates
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getRecursiveExecutionStackDepth
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 attemptUpdate
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21namespace MediaWiki\Deferred;
22
23use ErrorPageError;
24use LogicException;
25use MediaWiki\Logger\LoggerFactory;
26use MWExceptionHandler;
27use Throwable;
28use Wikimedia\Rdbms\IDatabase;
29use Wikimedia\ScopedCallback;
30
31/**
32 * Defer callable updates to run later in the PHP process
33 *
34 * This is a performance feature that enables MediaWiki to produce faster web responses.
35 * It allows you to postpone non-blocking work (e.g. work that does not change the web
36 * response) to after the HTTP response has been sent to the client (i.e. web browser).
37 *
38 * Once the response is finalized and sent to the browser, the webserver process stays
39 * for a little while longer (detached from the web request) to run your POSTSEND tasks.
40 *
41 * There is also a PRESEND option, which runs your task right before the finalized response
42 * is sent to the browser. This is for critical tasks that does need to block the response,
43 * but where you'd like to benefit from other DeferredUpdates features. Such as:
44 *
45 * - MergeableUpdate: batch updates from different components without coupling
46 *   or awareness of each other.
47 * - Automatic cancellation: pass a IDatabase object (for any wiki or database) to
48 *   DeferredUpdates::addCallableUpdate or AtomicSectionUpdate.
49 * - Reducing lock contention: if the response is likely to take several seconds
50 *   (e.g. uploading a large file to FileBackend, or saving an edit to a large article)
51 *   much of that work may overlap with a database transaction that is staying open for
52 *   the entire duration. By moving contentious writes out to a PRESEND update, these
53 *   get their own transaction (after the main one is committed), which give up some
54 *   atomicity for improved throughput.
55 *
56 * ## Expectation and comparison to job queue
57 *
58 * When scheduling a POSTSEND via the DeferredUpdates system you can generally expect
59 * it to complete well before the client makes their next request. Updates runs directly after
60 * the web response is sent, from the same process on the same server. This unlike the JobQueue,
61 * where jobs may need to wait in line for some minutes or hours.
62 *
63 * If your update fails, this failure is not known to the client and gets no retry. For updates
64 * that need re-tries for system consistency or data integrity, it is recommended to implement
65 * it as a job instead and use JobQueueGroup::lazyPush. This has the caveat of being delayed
66 * by default, the same as any other job.
67 *
68 * A hybrid solution is available via the EnqueueableDataUpdate interface. By implementing
69 * this interface, you can queue your update via the DeferredUpdates first, and if it fails,
70 * the system will automatically catch this and queue it as a job instead.
71 *
72 * ## How it works during web requests
73 *
74 * 1. Your request route is executed (e.g. Action or SpecialPage class, or API).
75 * 2. Output is finalized and main database transaction is committed.
76 * 3. PRESEND updates run via DeferredUpdates::doUpdates.
77 * 5. The web response is sent to the browser.
78 * 6. POSTSEND updates run via DeferredUpdates::doUpdates.
79 *
80 * @see MediaWiki::preOutputCommit
81 * @see MediaWiki::restInPeace
82 *
83 * ## How it works for Maintenance scripts
84 *
85 * In CLI mode, no distinction is made between PRESEND and POSTSEND deferred updates,
86 * and the queue is periodically executed throughout the process.
87 *
88 * @see DeferredUpdates::tryOpportunisticExecute
89 *
90 * ## How it works internally
91 *
92 * Each update is added via DeferredUpdates::addUpdate and stored in either the PRESEND or
93 * POSTSEND queue. If an update gets queued while another update is already running, then
94 * we store in a "sub"-queue associated with the current update. This allows nested updates
95 * to be completed before other updates, which improves ordering for process caching.
96 *
97 * @since 1.19
98 */
99class DeferredUpdates {
100    /** @var int Process all updates; in web requests, use only after flushing output buffer */
101    public const ALL = 0;
102    /** @var int Specify/process updates that should run before flushing output buffer */
103    public const PRESEND = 1;
104    /** @var int Specify/process updates that should run after flushing output buffer */
105    public const POSTSEND = 2;
106
107    /** @var int[] List of "defer until" queue stages that can be reached */
108    public const STAGES = [ self::PRESEND, self::POSTSEND ];
109
110    /** @var int Queue size threshold for converting updates into jobs */
111    private const BIG_QUEUE_SIZE = 100;
112
113    /** @var DeferredUpdatesScopeStack|null Queue states based on recursion level */
114    private static $scopeStack;
115
116    /**
117     * @var int Nesting level for preventOpportunisticUpdates()
118     */
119    private static $preventOpportunisticUpdates = 0;
120
121    /**
122     * @return DeferredUpdatesScopeStack
123     */
124    private static function getScopeStack(): DeferredUpdatesScopeStack {
125        self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
126        return self::$scopeStack;
127    }
128
129    /**
130     * @param DeferredUpdatesScopeStack $scopeStack
131     * @internal Only for use in tests.
132     */
133    public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
134        if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
135            throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
136        }
137        self::$scopeStack = $scopeStack;
138    }
139
140    /**
141     * Add an update to the pending update queue for execution at the appropriate time
142     *
143     * In CLI mode, callback magic will also be used to run updates when safe
144     *
145     * If an update is already in progress, then what happens to this update is as follows:
146     *  - If it has a "defer until" stage at/before the actual run stage of the innermost
147     *    in-progress update, then it will go into the sub-queue of that in-progress update.
148     *    As soon as that update completes, MergeableUpdate instances in its sub-queue will be
149     *    merged into the top-queues and the non-MergeableUpdate instances will be executed.
150     *    This is done to better isolate updates from the failures of other updates and reduce
151     *    the chance of race conditions caused by updates not fully seeing the intended changes
152     *    of previously enqueued and executed updates.
153     *  - If it has a "defer until" stage later than the actual run stage of the innermost
154     *    in-progress update, then it will go into the normal top-queue for that stage.
155     *
156     * @param DeferrableUpdate $update Some object that implements doUpdate()
157     * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
158     * @since 1.28 Added the $stage parameter
159     */
160    public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
161        self::getScopeStack()->current()->addUpdate( $update, $stage );
162        self::tryOpportunisticExecute();
163    }
164
165    /**
166     * Add an update to the pending update queue that invokes the specified callback when run
167     *
168     * @param callable $callable
169     * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
170     * @param IDatabase|IDatabase[]|null $dbw Cancel the update if a DB transaction
171     *  is rolled back [optional]
172     * @since 1.27 Added $stage parameter
173     * @since 1.28 Added the $dbw parameter
174     */
175    public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
176        self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
177    }
178
179    /**
180     * Run an update, and, if an error was thrown, catch/log it and enqueue the update as
181     * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
182     *
183     * @param DeferrableUpdate $update
184     * @return Throwable|null
185     */
186    private static function run( DeferrableUpdate $update ): ?Throwable {
187        $logger = LoggerFactory::getInstance( 'DeferredUpdates' );
188
189        $type = get_class( $update )
190            . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
191        $updateId = spl_object_id( $update );
192        $logger->debug( "DeferredUpdates::run: started $type #{updateId}", [ 'updateId' => $updateId ] );
193
194        $updateException = null;
195
196        $startTime = microtime( true );
197        try {
198            self::attemptUpdate( $update );
199        } catch ( Throwable $updateException ) {
200            MWExceptionHandler::logException( $updateException );
201            $logger->error(
202                "Deferred update '{deferred_type}' failed to run.",
203                [
204                    'deferred_type' => $type,
205                    'exception' => $updateException,
206                ]
207            );
208            self::getScopeStack()->onRunUpdateFailed( $update );
209        } finally {
210            $walltime = microtime( true ) - $startTime;
211            $logger->debug( "DeferredUpdates::run: ended $type #{updateId}, processing time: {walltime}", [
212                'updateId' => $updateId,
213                'walltime' => $walltime,
214            ] );
215        }
216
217        // Try to push the update as a job so it can run later if possible
218        if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
219            try {
220                self::getScopeStack()->queueDataUpdate( $update );
221            } catch ( Throwable $jobException ) {
222                MWExceptionHandler::logException( $jobException );
223                $logger->error(
224                    "Deferred update '{deferred_type}' failed to enqueue as a job.",
225                    [
226                        'deferred_type' => $type,
227                        'exception' => $jobException,
228                    ]
229                );
230                self::getScopeStack()->onRunUpdateFailed( $update );
231            }
232        }
233
234        return $updateException;
235    }
236
237    /**
238     * Consume and execute all pending updates
239     *
240     * Note that it is rarely the case that this method should be called outside of a few
241     * select entry points. For simplicity, that kind of recursion is discouraged. Recursion
242     * cannot happen if an explicit transaction round is active, which limits usage to updates
243     * with TRX_ROUND_ABSENT that do not leave open any transactions round of their own during
244     * the call to this method.
245     *
246     * In the less-common case of this being called within an in-progress DeferrableUpdate,
247     * this will not see any top-queue updates (since they were consumed and are being run
248     * inside an outer execution loop). In that case, it will instead operate on the sub-queue
249     * of the innermost in-progress update on the stack.
250     *
251     * @internal For use by MediaWiki, Maintenance, JobRunner, JobExecutor
252     * @param int $stage Which updates to process. One of
253     *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
254     */
255    public static function doUpdates( $stage = self::ALL ) {
256        /** @var ErrorPageError $guiError First presentable client-level error thrown */
257        $guiError = null;
258        /** @var Throwable $exception First of any error thrown */
259        $exception = null;
260
261        $scope = self::getScopeStack()->current();
262
263        // T249069: recursion is not possible once explicit transaction rounds are involved
264        $activeUpdate = $scope->getActiveUpdate();
265        if ( $activeUpdate ) {
266            $class = get_class( $activeUpdate );
267            if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
268                throw new LogicException(
269                    __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
270                );
271            }
272            if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
273                throw new LogicException(
274                    __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
275                );
276            }
277        }
278
279        $scope->processUpdates(
280            $stage,
281            static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
282                $scopeStack = self::getScopeStack();
283                $childScope = $scopeStack->descend( $activeStage, $update );
284                try {
285                    $e = self::run( $update );
286                    $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
287                    $exception = $exception ?: $e;
288                    // Any addUpdate() calls between descend() and ascend() used the sub-queue.
289                    // In rare cases, DeferrableUpdate::doUpdates() will process them by calling
290                    // doUpdates() itself. In any case, process remaining updates in the subqueue.
291                    // them, enqueueing them, or transferring them to the parent scope
292                    // queues as appropriate...
293                    $childScope->processUpdates(
294                        $activeStage,
295                        static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
296                            $e = self::run( $sub );
297                            $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
298                            $exception = $exception ?: $e;
299                        }
300                    );
301                } finally {
302                    $scopeStack->ascend();
303                }
304            }
305        );
306
307        // VW-style hack to work around T190178, so we can make sure
308        // PageMetaDataUpdater doesn't throw exceptions.
309        if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
310            throw $exception;
311        }
312
313        // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
314        // callers should check permissions *before* enqueueing updates. If the main transaction
315        // round actions succeed but some deferred updates fail due to permissions errors then
316        // there is a risk that some secondary data was not properly updated.
317        if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
318            throw $guiError;
319        }
320    }
321
322    /**
323     * Consume and execute pending updates now if possible, instead of waiting.
324     *
325     * In web requests, updates are always deferred until the end of the request.
326     *
327     * In CLI mode, updates run earlier and more often. This is important for long-running
328     * Maintenance scripts that would otherwise grow an excessively large queue, which increases
329     * memory use, and risks losing all updates if the script ends early or crashes.
330     *
331     * The folllowing conditions are required for updates to run early in CLI mode:
332     *
333     * - No update is already in progress (ensure linear flow, recursion guard).
334     * - LBFactory indicates that we don't have any "busy" database connections, i.e.
335     *   there are no pending writes or otherwise active and uncommitted transactions,
336     *   except if the transaction is empty and merely used for primary DB read queries,
337     *   in which case the transaction (and its repeatable-read snapshot) can be safely flushed.
338     *
339     * How this works:
340     *
341     * - When a maintenance script commits a change or waits for replication, such as
342     *   via. IConnectionProvider::commitAndWaitForReplication, then ILBFactory calls
343     *   tryOpportunisticExecute(). This is injected via MWLBFactory::applyGlobalState.
344     *
345     * - For maintenance scripts that don't do much with the database, we also call
346     *   tryOpportunisticExecute() after every addUpdate() call.
347     *
348     * - Upon the completion of Maintenance::execute() via Maintenance::shutdown(),
349     *   any remaining updates are run.
350     *
351     * Note that this method runs both PRESEND and POSTSEND updates and thus should not be called
352     * during web requests. It is only intended for long-running Maintenance scripts.
353     *
354     * @internal For use by Maintenance
355     * @since 1.28
356     * @return bool Whether updates were allowed to run
357     */
358    public static function tryOpportunisticExecute(): bool {
359        // Leave execution up to the current loop if an update is already in progress
360        // or if updates are explicitly disabled
361        if ( self::getRecursiveExecutionStackDepth()
362            || self::$preventOpportunisticUpdates
363        ) {
364            return false;
365        }
366
367        if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
368            self::doUpdates( self::ALL );
369            return true;
370        }
371
372        if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
373            // There are a large number of pending updates and none of them can run yet.
374            // The odds of losing updates due to an error increases when executing long queues
375            // and when large amounts of time pass while tasks are queued. Mitigate this by
376            // trying to eagerly move updates to the JobQueue when possible.
377            //
378            // TODO: Do we still need this now maintenance scripts automatically call
379            // tryOpportunisticExecute from addUpdate, from every commit, and every
380            // waitForReplication call?
381            $enqueuedUpdates = [];
382            self::getScopeStack()->current()->consumeMatchingUpdates(
383                self::ALL,
384                EnqueueableDataUpdate::class,
385                static function ( EnqueueableDataUpdate $update ) use ( &$enqueuedUpdates ) {
386                    self::getScopeStack()->queueDataUpdate( $update );
387                    $type = get_class( $update );
388                    $enqueuedUpdates[$type] ??= 0;
389                    $enqueuedUpdates[$type]++;
390                }
391            );
392            if ( $enqueuedUpdates ) {
393                LoggerFactory::getInstance( 'DeferredUpdates' )->debug(
394                    'Enqueued {enqueuedUpdatesCount} updates as jobs',
395                    [
396                        'enqueuedUpdatesCount' => array_sum( $enqueuedUpdates ),
397                        'enqueuedUpdates' => implode( ', ',
398                            array_map( fn ( $k, $v ) => "$k$v", array_keys( $enqueuedUpdates ), $enqueuedUpdates ) ),
399                    ]
400                );
401            }
402        }
403
404        return false;
405    }
406
407    /**
408     * Prevent opportunistic updates until the returned ScopedCallback is
409     * consumed.
410     *
411     * @return ScopedCallback
412     */
413    public static function preventOpportunisticUpdates() {
414        self::$preventOpportunisticUpdates++;
415        return new ScopedCallback( static function () {
416            self::$preventOpportunisticUpdates--;
417        } );
418    }
419
420    /**
421     * Get the number of pending updates for the current execution context
422     *
423     * If an update is in progress, then this operates on the sub-queues of the
424     * innermost in-progress update. Otherwise, it acts on the top-queues.
425     *
426     * @return int
427     * @since 1.28
428     */
429    public static function pendingUpdatesCount() {
430        return self::getScopeStack()->current()->pendingUpdatesCount();
431    }
432
433    /**
434     * Get a list of the pending updates for the current execution context
435     *
436     * If an update is in progress, then this operates on the sub-queues of the
437     * innermost in-progress update. Otherwise, it acts on the top-queues.
438     *
439     * @param int $stage Look for updates with this "defer until" stage. One of
440     *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
441     * @return DeferrableUpdate[]
442     * @internal This method should only be used for unit tests
443     * @since 1.29
444     */
445    public static function getPendingUpdates( $stage = self::ALL ) {
446        return self::getScopeStack()->current()->getPendingUpdates( $stage );
447    }
448
449    /**
450     * Cancel all pending updates for the current execution context
451     *
452     * If an update is in progress, then this operates on the sub-queues of the
453     * innermost in-progress update. Otherwise, it acts on the top-queues.
454     *
455     * @internal This method should only be used for unit tests
456     */
457    public static function clearPendingUpdates() {
458        self::getScopeStack()->current()->clearPendingUpdates();
459    }
460
461    /**
462     * Get the number of in-progress calls to DeferredUpdates::doUpdates()
463     *
464     * @return int
465     * @internal This method should only be used for unit tests
466     */
467    public static function getRecursiveExecutionStackDepth() {
468        return self::getScopeStack()->getRecursiveDepth();
469    }
470
471    /**
472     * Attempt to run an update with the appropriate transaction round state if needed
473     *
474     * It is allowed for a DeferredUpdate to directly execute one or more other DeferredUpdate
475     * instances without queueing them by calling this method. In that case, the outer update
476     * must use TransactionRoundAwareUpdate::TRX_ROUND_ABSENT, e.g. by extending
477     * TransactionRoundDefiningUpdate, so that this method can give each update its own
478     * transaction round.
479     *
480     * @param DeferrableUpdate $update
481     * @since 1.34
482     */
483    public static function attemptUpdate( DeferrableUpdate $update ) {
484        self::getScopeStack()->onRunUpdateStart( $update );
485
486        $update->doUpdate();
487
488        self::getScopeStack()->onRunUpdateEnd( $update );
489    }
490}
491
492/** @deprecated class alias since 1.42 */
493class_alias( DeferredUpdates::class, 'DeferredUpdates' );