Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
67.48% covered (warning)
67.48%
83 / 123
30.23% covered (danger)
30.23%
13 / 43
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueue
67.48% covered (warning)
67.48%
83 / 123
30.23% covered (danger)
30.23%
13 / 43
262.33
0.00% covered (danger)
0.00%
0 / 1
 __construct
84.21% covered (warning)
84.21%
16 / 19
0.00% covered (danger)
0.00%
0 / 1
7.19
 factory
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
3.21
 getDomain
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getType
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getOrder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 supportedOrders
n/a
0 / 0
n/a
0 / 0
0
 optimalOrder
n/a
0 / 0
n/a
0 / 0
0
 supportsDelayedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 delayedJobsEnabled
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getReadOnlyReason
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isEmpty
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doIsEmpty
n/a
0 / 0
n/a
0 / 0
0
 getSize
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doGetSize
n/a
0 / 0
n/a
0 / 0
0
 getAcquiredCount
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doGetAcquiredCount
n/a
0 / 0
n/a
0 / 0
0
 getDelayedCount
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 doGetDelayedCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAbandonedCount
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 push
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 batchPush
75.00% covered (warning)
75.00%
9 / 12
0.00% covered (danger)
0.00%
0 / 1
7.77
 doBatchPush
n/a
0 / 0
n/a
0 / 0
0
 pop
66.67% covered (warning)
66.67%
6 / 9
0.00% covered (danger)
0.00%
0 / 1
5.93
 doPop
n/a
0 / 0
n/a
0 / 0
0
 ack
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 doAck
n/a
0 / 0
n/a
0 / 0
0
 deduplicateRootJob
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 doDeduplicateRootJob
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
5.05
 isRootJobOldDuplicate
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doIsRootJobOldDuplicate
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
6.07
 getRootJobCacheKey
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
 delete
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doDelete
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 waitForBackups
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 flushCaches
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllQueuedJobs
n/a
0 / 0
n/a
0 / 0
0
 getAllDelayedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAcquiredJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 factoryJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 assertNotReadOnly
50.00% covered (danger)
50.00%
1 / 2
0.00% covered (danger)
0.00%
0 / 1
2.50
 assertMatchingJobType
50.00% covered (danger)
50.00%
2 / 4
0.00% covered (danger)
0.00%
0 / 1
4.12
 incrStats
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 supportsTypeAgnostic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
22use MediaWiki\JobQueue\JobFactory;
23use MediaWiki\MediaWikiServices;
24use Wikimedia\ObjectCache\WANObjectCache;
25use Wikimedia\RequestTimeout\TimeoutException;
26use Wikimedia\Stats\NullStatsdDataFactory;
27use Wikimedia\UUID\GlobalIdGenerator;
28
29/**
30 * @defgroup JobQueue JobQueue
31 *
32 *
33 * See [the architecture doc](@ref jobqueuearch) for more information.
34 */
35
36/**
37 * Base class for queueing and running background jobs from a storage backend.
38 *
39 * See [the architecture doc](@ref jobqueuearch) for more information.
40 *
41 * @ingroup JobQueue
42 * @since 1.21
43 * @stable to extend
44 */
45abstract class JobQueue {
46    /** @var string DB domain ID */
47    protected $domain;
48    /** @var string Job type */
49    protected $type;
50    /** @var string Job priority for pop() */
51    protected $order;
52    /** @var int Time to live in seconds */
53    protected $claimTTL;
54    /** @var int Maximum number of times to try a job */
55    protected $maxTries;
56    /** @var string|false Read only rationale (or false if r/w) */
57    protected $readOnlyReason;
58    /** @var StatsdDataFactoryInterface */
59    protected $stats;
60    /** @var GlobalIdGenerator */
61    protected $idGenerator;
62
63    /** @var WANObjectCache */
64    protected $wanCache;
65
66    /** @var bool */
67    protected $typeAgnostic;
68
69    private JobFactory $jobFactory;
70
71    /* Bit flag for "all-or-nothing" job insertions */
72    protected const QOS_ATOMIC = 1;
73
74    /* Seconds to remember root jobs (28 days) */
75    protected const ROOTJOB_TTL = 28 * 24 * 3600;
76
77    /**
78     * @stable to call
79     *
80     * @param array $params
81     *      - type : A job type, 'default' if typeAgnostic is set
82     *   - domain : A DB domain ID
83     *   - idGenerator : A GlobalIdGenerator instance.
84     *   - wanCache : An instance of WANObjectCache to use for caching [default: none]
85     *   - stats : An instance of StatsdDataFactoryInterface [default: none]
86     *   - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever]
87     *   - maxTries : Total times a job can be tried, assuming claims expire [default: 3]
88     *   - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
89     *   - readOnlyReason : Mark the queue as read-only with this reason [default: false]
90     *   - typeAgnostic : If the jobqueue should operate agnostic to the job types
91     * @throws JobQueueError
92     *
93     */
94    protected function __construct( array $params ) {
95        $this->domain = $params['domain'] ?? $params['wiki']; // b/c
96        $this->type = $params['type'];
97        $this->claimTTL = $params['claimTTL'] ?? 0;
98        $this->maxTries = $params['maxTries'] ?? 3;
99        if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
100            $this->order = $params['order'];
101        } else {
102            $this->order = $this->optimalOrder();
103        }
104        if ( !in_array( $this->order, $this->supportedOrders() ) ) {
105            throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
106        }
107        $this->readOnlyReason = $params['readOnlyReason'] ?? false;
108        $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
109        $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
110        $this->idGenerator = $params['idGenerator'];
111        if ( ( $params['typeAgnostic'] ?? false ) && !$this->supportsTypeAgnostic() ) {
112            throw new JobQueueError( __CLASS__ . " does not support type agnostic queues." );
113        }
114        $this->typeAgnostic = ( $params['typeAgnostic'] ?? false );
115        if ( $this->typeAgnostic ) {
116            $this->type = 'default';
117        }
118
119        $this->jobFactory = MediaWikiServices::getInstance()->getJobFactory();
120    }
121
122    /**
123     * Get a job queue object of the specified type.
124     * $params includes:
125     *   - class : What job class to use (determines job type)
126     *   - domain : Database domain ID of the wiki the jobs are for (defaults to current wiki)
127     *   - type : The name of the job types this queue handles
128     *   - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
129     *      If "fifo" is used, the queue will effectively be FIFO. Note that job
130     *      completion will not appear to be exactly FIFO if there are multiple
131     *      job runners since jobs can take different times to finish once popped.
132     *      If "timestamp" is used, the queue will at least be loosely ordered
133     *      by timestamp, allowing for some jobs to be popped off out of order.
134     *      If "random" is used, pop() will pick jobs in random order.
135     *      Note that it may only be weakly random (e.g. a lottery of the oldest X).
136     *      If "any" is chosen, the queue will use whatever order is the fastest.
137     *      This might be useful for improving concurrency for job acquisition.
138     *   - claimTTL : If supported, the queue will recycle jobs that have been popped
139     *      but not acknowledged as completed after this many seconds. Recycling
140     *      of jobs simply means re-inserting them into the queue. Jobs can be
141     *      attempted up to three times before being discarded.
142     *   - readOnlyReason : Set this to a string to make the queue read-only. [optional]
143     *   - idGenerator : A GlobalIdGenerator instance.
144     *   - stats  : A StatsdDataFactoryInterface. [optional]
145     *
146     * Queue classes should throw an exception if they do not support the options given.
147     *
148     * @param array $params
149     * @return JobQueue
150     * @throws JobQueueError
151     */
152    final public static function factory( array $params ) {
153        $class = $params['class'];
154        if ( !class_exists( $class ) ) {
155            throw new JobQueueError( "Invalid job queue class '$class'." );
156        }
157
158        $obj = new $class( $params );
159        if ( !( $obj instanceof self ) ) {
160            throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
161        }
162
163        return $obj;
164    }
165
166    /**
167     * @return string Database domain ID
168     */
169    final public function getDomain() {
170        return $this->domain;
171    }
172
173    /**
174     * @return string Job type that this queue handles
175     */
176    final public function getType() {
177        return $this->type;
178    }
179
180    /**
181     * @return string One of (random, timestamp, fifo, undefined)
182     */
183    final public function getOrder() {
184        return $this->order;
185    }
186
187    /**
188     * Get the allowed queue orders for configuration validation
189     *
190     * @return array Subset of (random, timestamp, fifo, undefined)
191     */
192    abstract protected function supportedOrders();
193
194    /**
195     * Get the default queue order to use if configuration does not specify one
196     *
197     * @return string One of (random, timestamp, fifo, undefined)
198     */
199    abstract protected function optimalOrder();
200
201    /**
202     * Find out if delayed jobs are supported for configuration validation
203     *
204     * @stable to override
205     * @return bool Whether delayed jobs are supported
206     */
207    protected function supportsDelayedJobs() {
208        return false; // not implemented
209    }
210
211    /**
212     * @return bool Whether delayed jobs are enabled
213     * @since 1.22
214     */
215    final public function delayedJobsEnabled() {
216        return $this->supportsDelayedJobs();
217    }
218
219    /**
220     * @return string|false Read-only rational or false if r/w
221     * @since 1.27
222     */
223    public function getReadOnlyReason() {
224        return $this->readOnlyReason;
225    }
226
227    /**
228     * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
229     * Queue classes should use caching if they are any slower without memcached.
230     *
231     * If caching is used, this might return false when there are actually no jobs.
232     * If pop() is called and returns false then it should correct the cache. Also,
233     * calling flushCaches() first prevents this. However, this effect is typically
234     * not distinguishable from the race condition between isEmpty() and pop().
235     *
236     * @return bool
237     * @throws JobQueueError
238     */
239    final public function isEmpty() {
240        $res = $this->doIsEmpty();
241
242        return $res;
243    }
244
245    /**
246     * @see JobQueue::isEmpty()
247     * @return bool
248     */
249    abstract protected function doIsEmpty();
250
251    /**
252     * Get the number of available (unacquired, non-delayed) jobs in the queue.
253     * Queue classes should use caching if they are any slower without memcached.
254     *
255     * If caching is used, this number might be out of date for a minute.
256     *
257     * @return int
258     * @throws JobQueueError
259     */
260    final public function getSize() {
261        $res = $this->doGetSize();
262
263        return $res;
264    }
265
266    /**
267     * @see JobQueue::getSize()
268     * @return int
269     */
270    abstract protected function doGetSize();
271
272    /**
273     * Get the number of acquired jobs (these are temporarily out of the queue).
274     * Queue classes should use caching if they are any slower without memcached.
275     *
276     * If caching is used, this number might be out of date for a minute.
277     *
278     * @return int
279     * @throws JobQueueError
280     */
281    final public function getAcquiredCount() {
282        $res = $this->doGetAcquiredCount();
283
284        return $res;
285    }
286
287    /**
288     * @see JobQueue::getAcquiredCount()
289     * @return int
290     */
291    abstract protected function doGetAcquiredCount();
292
293    /**
294     * Get the number of delayed jobs (these are temporarily out of the queue).
295     * Queue classes should use caching if they are any slower without memcached.
296     *
297     * If caching is used, this number might be out of date for a minute.
298     *
299     * @return int
300     * @throws JobQueueError
301     * @since 1.22
302     */
303    final public function getDelayedCount() {
304        $res = $this->doGetDelayedCount();
305
306        return $res;
307    }
308
309    /**
310     * @stable to override
311     * @see JobQueue::getDelayedCount()
312     * @return int
313     */
314    protected function doGetDelayedCount() {
315        return 0; // not implemented
316    }
317
318    /**
319     * Get the number of acquired jobs that can no longer be attempted.
320     * Queue classes should use caching if they are any slower without memcached.
321     *
322     * If caching is used, this number might be out of date for a minute.
323     *
324     * @return int
325     * @throws JobQueueError
326     */
327    final public function getAbandonedCount() {
328        $res = $this->doGetAbandonedCount();
329
330        return $res;
331    }
332
333    /**
334     * @stable to override
335     * @see JobQueue::getAbandonedCount()
336     * @return int
337     */
338    protected function doGetAbandonedCount() {
339        return 0; // not implemented
340    }
341
342    /**
343     * Push one or more jobs into the queue.
344     * This does not require $wgJobClasses to be set for the given job type.
345     * Outside callers should use JobQueueGroup::push() instead of this function.
346     *
347     * @param IJobSpecification|IJobSpecification[] $jobs
348     * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
349     * @return void
350     * @throws JobQueueError
351     */
352    final public function push( $jobs, $flags = 0 ) {
353        $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
354        $this->batchPush( $jobs, $flags );
355    }
356
357    /**
358     * Push a batch of jobs into the queue.
359     * This does not require $wgJobClasses to be set for the given job type.
360     * Outside callers should use JobQueueGroup::push() instead of this function.
361     *
362     * @param IJobSpecification[] $jobs
363     * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
364     * @return void
365     * @throws JobQueueError
366     */
367    final public function batchPush( array $jobs, $flags = 0 ) {
368        $this->assertNotReadOnly();
369
370        if ( $jobs === [] ) {
371            return; // nothing to do
372        }
373
374        foreach ( $jobs as $job ) {
375            $this->assertMatchingJobType( $job );
376            if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
377                throw new JobQueueError(
378                    "Got delayed '{$job->getType()}' job; delays are not supported." );
379            }
380        }
381
382        $this->doBatchPush( $jobs, $flags );
383
384        foreach ( $jobs as $job ) {
385            if ( $job->isRootJob() ) {
386                $this->deduplicateRootJob( $job );
387            }
388        }
389    }
390
391    /**
392     * @see JobQueue::batchPush()
393     * @param IJobSpecification[] $jobs
394     * @param int $flags
395     */
396    abstract protected function doBatchPush( array $jobs, $flags );
397
398    /**
399     * Pop a job off of the queue.
400     * This requires $wgJobClasses to be set for the given job type.
401     * Outside callers should use JobQueueGroup::pop() instead of this function.
402     *
403     * @throws JobQueueError
404     * @return RunnableJob|false Returns false if there are no jobs
405     */
406    final public function pop() {
407        $this->assertNotReadOnly();
408
409        $job = $this->doPop();
410
411        // Flag this job as an old duplicate based on its "root" job...
412        try {
413            if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
414                $this->incrStats( 'dupe_pops', $job->getType() );
415                $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
416            }
417        } catch ( TimeoutException $e ) {
418            throw $e;
419        } catch ( Exception $e ) {
420            // don't lose jobs over this
421        }
422
423        return $job;
424    }
425
426    /**
427     * @see JobQueue::pop()
428     * @return RunnableJob|false
429     */
430    abstract protected function doPop();
431
432    /**
433     * Acknowledge that a job was completed.
434     *
435     * This does nothing for certain queue classes or if "claimTTL" is not set.
436     * Outside callers should use JobQueueGroup::ack() instead of this function.
437     *
438     * @param RunnableJob $job
439     * @return void
440     * @throws JobQueueError
441     */
442    final public function ack( RunnableJob $job ) {
443        $this->assertNotReadOnly();
444        $this->assertMatchingJobType( $job );
445
446        $this->doAck( $job );
447    }
448
449    /**
450     * @see JobQueue::ack()
451     * @param RunnableJob $job
452     */
453    abstract protected function doAck( RunnableJob $job );
454
455    /**
456     * Register the "root job" of a given job into the queue for de-duplication.
457     * This should only be called right *after* all the new jobs have been inserted.
458     * This is used to turn older, duplicate, job entries into no-ops. The root job
459     * information will remain in the registry until it simply falls out of cache.
460     *
461     * This requires that $job has two special fields in the "params" array:
462     *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
463     *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
464     *
465     * A "root job" is a conceptual job that consist of potentially many smaller jobs
466     * that are actually inserted into the queue. For example, "refreshLinks" jobs are
467     * spawned when a template is edited. One can think of the task as "update links
468     * of pages that use template X" and an instance of that task as a "root job".
469     * However, what actually goes into the queue are range and leaf job subtypes.
470     * Since these jobs include things like page ID ranges and DB primary positions,
471     * and can morph into smaller jobs recursively, simple duplicate detection
472     * for individual jobs being identical (like that of job_sha1) is not useful.
473     *
474     * In the case of "refreshLinks", if these jobs are still in the queue when the template
475     * is edited again, we want all of these old refreshLinks jobs for that template to become
476     * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
477     * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
478     * previous "root job" for the same task of "update links of pages that use template X".
479     *
480     * This does nothing for certain queue classes.
481     *
482     * @internal For use within JobQueue only
483     * @param IJobSpecification $job
484     * @throws JobQueueError
485     * @return bool
486     */
487    final public function deduplicateRootJob( IJobSpecification $job ) {
488        $this->assertNotReadOnly();
489        $this->assertMatchingJobType( $job );
490
491        return $this->doDeduplicateRootJob( $job );
492    }
493
494    /**
495     * @stable to override
496     * @see JobQueue::deduplicateRootJob()
497     * @param IJobSpecification $job
498     * @throws JobQueueError
499     * @return bool
500     */
501    protected function doDeduplicateRootJob( IJobSpecification $job ) {
502        $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
503        if ( !$params ) {
504            throw new JobQueueError( "Cannot register root job; missing parameters." );
505        }
506
507        $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
508        // Callers should call JobQueueGroup::push() before this method so that if the
509        // insert fails, the de-duplication registration will be aborted. Having only the
510        // de-duplication registration succeed would cause jobs to become no-ops without
511        // any actual jobs that made them redundant.
512        $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
513        if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
514            return true; // a newer version of this root job was enqueued
515        }
516
517        // Update the timestamp of the last root job started at the location...
518        return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
519    }
520
521    /**
522     * Check if the "root" job of a given job has been superseded by a newer one
523     *
524     * @param IJobSpecification $job
525     * @throws JobQueueError
526     * @return bool
527     */
528    final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
529        $this->assertMatchingJobType( $job );
530
531        return $this->doIsRootJobOldDuplicate( $job );
532    }
533
534    /**
535     * @stable to override
536     * @see JobQueue::isRootJobOldDuplicate()
537     * @param IJobSpecification $job
538     * @return bool
539     */
540    protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
541        $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
542        if ( !$params ) {
543            return false; // job has no de-duplication info
544        }
545
546        $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
547        // Get the last time this root job was enqueued
548        $timestamp = $this->wanCache->get( $key );
549        if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
550            // Update the timestamp of the last known root job started at the location...
551            $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
552        }
553
554        // Check if a new root job was started at the location after this one's...
555        return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
556    }
557
558    /**
559     * @param string $signature Hash identifier of the root job
560     * @param string $type job type
561     * @return string
562     */
563    protected function getRootJobCacheKey( $signature, $type ) {
564        return $this->wanCache->makeGlobalKey(
565            'jobqueue',
566            $this->domain,
567            $type,
568            'rootjob',
569            $signature
570        );
571    }
572
573    /**
574     * Delete all unclaimed and delayed jobs from the queue
575     *
576     * @throws JobQueueError
577     * @since 1.22
578     * @return void
579     */
580    final public function delete() {
581        $this->assertNotReadOnly();
582
583        $this->doDelete();
584    }
585
586    /**
587     * @stable to override
588     * @see JobQueue::delete()
589     * @throws JobQueueError
590     */
591    protected function doDelete() {
592        throw new JobQueueError( "This method is not implemented." );
593    }
594
595    /**
596     * Wait for any replica DBs or backup servers to catch up.
597     *
598     * This does nothing for certain queue classes.
599     *
600     * @return void
601     * @throws JobQueueError
602     */
603    final public function waitForBackups() {
604        $this->doWaitForBackups();
605    }
606
607    /**
608     * @stable to override
609     * @see JobQueue::waitForBackups()
610     * @return void
611     */
612    protected function doWaitForBackups() {
613    }
614
615    /**
616     * Clear any process and persistent caches
617     *
618     * @return void
619     */
620    final public function flushCaches() {
621        $this->doFlushCaches();
622    }
623
624    /**
625     * @stable to override
626     * @see JobQueue::flushCaches()
627     * @return void
628     */
629    protected function doFlushCaches() {
630    }
631
632    /**
633     * Get an iterator to traverse over all available jobs in this queue.
634     * This does not include jobs that are currently acquired or delayed.
635     * Note: results may be stale if the queue is concurrently modified.
636     *
637     * @return Iterator<RunnableJob>
638     * @throws JobQueueError
639     */
640    abstract public function getAllQueuedJobs();
641
642    /**
643     * Get an iterator to traverse over all delayed jobs in this queue.
644     * Note: results may be stale if the queue is concurrently modified.
645     *
646     * @stable to override
647     * @return Iterator<RunnableJob>
648     * @throws JobQueueError
649     * @since 1.22
650     */
651    public function getAllDelayedJobs() {
652        return new ArrayIterator( [] ); // not implemented
653    }
654
655    /**
656     * Get an iterator to traverse over all claimed jobs in this queue
657     *
658     * Callers should be quick to iterator over it or few results
659     * will be returned due to jobs being acknowledged and deleted
660     *
661     * @stable to override
662     * @return Iterator<RunnableJob>
663     * @throws JobQueueError
664     * @since 1.26
665     */
666    public function getAllAcquiredJobs() {
667        return new ArrayIterator( [] ); // not implemented
668    }
669
670    /**
671     * Get an iterator to traverse over all abandoned jobs in this queue
672     *
673     * @stable to override
674     * @return Iterator<RunnableJob>
675     * @throws JobQueueError
676     * @since 1.25
677     */
678    public function getAllAbandonedJobs() {
679        return new ArrayIterator( [] ); // not implemented
680    }
681
682    /**
683     * Do not use this function outside of JobQueue/JobQueueGroup
684     *
685     * @stable to override
686     * @return string|null
687     * @since 1.22
688     */
689    public function getCoalesceLocationInternal() {
690        return null;
691    }
692
693    /**
694     * Check whether each of the given queues are empty.
695     * This is used for batching checks for queues stored at the same place.
696     *
697     * @param array $types List of queues types
698     * @return array|null (list of non-empty queue types) or null if unsupported
699     * @throws JobQueueError
700     * @since 1.22
701     */
702    final public function getSiblingQueuesWithJobs( array $types ) {
703        return $this->doGetSiblingQueuesWithJobs( $types );
704    }
705
706    /**
707     * @stable to override
708     * @see JobQueue::getSiblingQueuesWithJobs()
709     * @param array $types List of queues types
710     * @return array|null (list of queue types) or null if unsupported
711     */
712    protected function doGetSiblingQueuesWithJobs( array $types ) {
713        return null; // not supported
714    }
715
716    /**
717     * Check the size of each of the given queues.
718     * For queues not served by the same store as this one, 0 is returned.
719     * This is used for batching checks for queues stored at the same place.
720     *
721     * @param array $types List of queues types
722     * @return array|null (job type => whether queue is empty) or null if unsupported
723     * @throws JobQueueError
724     * @since 1.22
725     */
726    final public function getSiblingQueueSizes( array $types ) {
727        return $this->doGetSiblingQueueSizes( $types );
728    }
729
730    /**
731     * @stable to override
732     * @see JobQueue::getSiblingQueuesSize()
733     * @param array $types List of queues types
734     * @return array|null (list of queue types) or null if unsupported
735     */
736    protected function doGetSiblingQueueSizes( array $types ) {
737        return null; // not supported
738    }
739
740    /**
741     * @param string $command
742     * @param array $params
743     * @return Job
744     */
745    protected function factoryJob( $command, $params ) {
746        return $this->jobFactory->newJob( $command, $params );
747    }
748
749    /**
750     * @throws JobQueueReadOnlyError
751     */
752    protected function assertNotReadOnly() {
753        if ( $this->readOnlyReason !== false ) {
754            throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
755        }
756    }
757
758    /**
759     * @param IJobSpecification $job
760     * @throws JobQueueError
761     */
762    private function assertMatchingJobType( IJobSpecification $job ) {
763        if ( $this->typeAgnostic ) {
764            return;
765        }
766        if ( $job->getType() !== $this->type ) {
767            throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
768        }
769    }
770
771    /**
772     * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type
773     *
774     * @param string $key Event type
775     * @param string $type Job type
776     * @param int $delta
777     * @since 1.22
778     */
779    protected function incrStats( $key, $type, $delta = 1 ) {
780        $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
781        $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
782    }
783
784    /**
785     * Subclasses should set this to true if they support type agnostic queues
786     *
787     * @return bool
788     * @since 1.38
789     */
790    protected function supportsTypeAgnostic(): bool {
791        return false;
792    }
793}