Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
67.48% covered (warning)
67.48%
83 / 123
30.23% covered (danger)
30.23%
13 / 43
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueue
67.48% covered (warning)
67.48%
83 / 123
30.23% covered (danger)
30.23%
13 / 43
262.33
0.00% covered (danger)
0.00%
0 / 1
 __construct
84.21% covered (warning)
84.21%
16 / 19
0.00% covered (danger)
0.00%
0 / 1
7.19
 factory
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
3.21
 getDomain
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getType
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getOrder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 supportedOrders
n/a
0 / 0
n/a
0 / 0
0
 optimalOrder
n/a
0 / 0
n/a
0 / 0
0
 supportsDelayedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 delayedJobsEnabled
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getReadOnlyReason
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isEmpty
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doIsEmpty
n/a
0 / 0
n/a
0 / 0
0
 getSize
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doGetSize
n/a
0 / 0
n/a
0 / 0
0
 getAcquiredCount
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doGetAcquiredCount
n/a
0 / 0
n/a
0 / 0
0
 getDelayedCount
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 doGetDelayedCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAbandonedCount
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 push
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 batchPush
75.00% covered (warning)
75.00%
9 / 12
0.00% covered (danger)
0.00%
0 / 1
7.77
 doBatchPush
n/a
0 / 0
n/a
0 / 0
0
 pop
66.67% covered (warning)
66.67%
6 / 9
0.00% covered (danger)
0.00%
0 / 1
5.93
 doPop
n/a
0 / 0
n/a
0 / 0
0
 ack
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 doAck
n/a
0 / 0
n/a
0 / 0
0
 deduplicateRootJob
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 doDeduplicateRootJob
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
5.05
 isRootJobOldDuplicate
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doIsRootJobOldDuplicate
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
6.07
 getRootJobCacheKey
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
 delete
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 doDelete
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 waitForBackups
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 flushCaches
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllQueuedJobs
n/a
0 / 0
n/a
0 / 0
0
 getAllDelayedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAcquiredJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 factoryJob
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 assertNotReadOnly
50.00% covered (danger)
50.00%
1 / 2
0.00% covered (danger)
0.00%
0 / 1
2.50
 assertMatchingJobType
50.00% covered (danger)
50.00%
2 / 4
0.00% covered (danger)
0.00%
0 / 1
4.12
 incrStats
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 supportsTypeAgnostic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
22use MediaWiki\JobQueue\JobFactory;
23use MediaWiki\MediaWikiServices;
24use Wikimedia\RequestTimeout\TimeoutException;
25use Wikimedia\UUID\GlobalIdGenerator;
26
27/**
28 * @defgroup JobQueue JobQueue
29 *
30 *
31 * See [the architecture doc](@ref jobqueuearch) for more information.
32 */
33
34/**
35 * Base class for queueing and running background jobs from a storage backend.
36 *
37 * See [the architecture doc](@ref jobqueuearch) for more information.
38 *
39 * @ingroup JobQueue
40 * @since 1.21
41 * @stable to extend
42 */
43abstract class JobQueue {
44    /** @var string DB domain ID */
45    protected $domain;
46    /** @var string Job type */
47    protected $type;
48    /** @var string Job priority for pop() */
49    protected $order;
50    /** @var int Time to live in seconds */
51    protected $claimTTL;
52    /** @var int Maximum number of times to try a job */
53    protected $maxTries;
54    /** @var string|false Read only rationale (or false if r/w) */
55    protected $readOnlyReason;
56    /** @var StatsdDataFactoryInterface */
57    protected $stats;
58    /** @var GlobalIdGenerator */
59    protected $idGenerator;
60
61    /** @var WANObjectCache */
62    protected $wanCache;
63
64    /** @var bool */
65    protected $typeAgnostic;
66
67    private JobFactory $jobFactory;
68
69    /* Bit flag for "all-or-nothing" job insertions */
70    protected const QOS_ATOMIC = 1;
71
72    /* Seconds to remember root jobs (28 days) */
73    protected const ROOTJOB_TTL = 28 * 24 * 3600;
74
75    /**
76     * @stable to call
77     *
78     * @param array $params
79     *      - type : A job type, 'default' if typeAgnostic is set
80     *   - domain : A DB domain ID
81     *   - idGenerator : A GlobalIdGenerator instance.
82     *   - wanCache : An instance of WANObjectCache to use for caching [default: none]
83     *   - stats : An instance of StatsdDataFactoryInterface [default: none]
84     *   - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever]
85     *   - maxTries : Total times a job can be tried, assuming claims expire [default: 3]
86     *   - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
87     *   - readOnlyReason : Mark the queue as read-only with this reason [default: false]
88     *   - typeAgnostic : If the jobqueue should operate agnostic to the job types
89     * @throws JobQueueError
90     *
91     */
92    protected function __construct( array $params ) {
93        $this->domain = $params['domain'] ?? $params['wiki']; // b/c
94        $this->type = $params['type'];
95        $this->claimTTL = $params['claimTTL'] ?? 0;
96        $this->maxTries = $params['maxTries'] ?? 3;
97        if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
98            $this->order = $params['order'];
99        } else {
100            $this->order = $this->optimalOrder();
101        }
102        if ( !in_array( $this->order, $this->supportedOrders() ) ) {
103            throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
104        }
105        $this->readOnlyReason = $params['readOnlyReason'] ?? false;
106        $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
107        $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
108        $this->idGenerator = $params['idGenerator'];
109        if ( ( $params['typeAgnostic'] ?? false ) && !$this->supportsTypeAgnostic() ) {
110            throw new JobQueueError( __CLASS__ . " does not support type agnostic queues." );
111        }
112        $this->typeAgnostic = ( $params['typeAgnostic'] ?? false );
113        if ( $this->typeAgnostic ) {
114            $this->type = 'default';
115        }
116
117        $this->jobFactory = MediaWikiServices::getInstance()->getJobFactory();
118    }
119
120    /**
121     * Get a job queue object of the specified type.
122     * $params includes:
123     *   - class : What job class to use (determines job type)
124     *   - domain : Database domain ID of the wiki the jobs are for (defaults to current wiki)
125     *   - type : The name of the job types this queue handles
126     *   - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
127     *      If "fifo" is used, the queue will effectively be FIFO. Note that job
128     *      completion will not appear to be exactly FIFO if there are multiple
129     *      job runners since jobs can take different times to finish once popped.
130     *      If "timestamp" is used, the queue will at least be loosely ordered
131     *      by timestamp, allowing for some jobs to be popped off out of order.
132     *      If "random" is used, pop() will pick jobs in random order.
133     *      Note that it may only be weakly random (e.g. a lottery of the oldest X).
134     *      If "any" is chosen, the queue will use whatever order is the fastest.
135     *      This might be useful for improving concurrency for job acquisition.
136     *   - claimTTL : If supported, the queue will recycle jobs that have been popped
137     *      but not acknowledged as completed after this many seconds. Recycling
138     *      of jobs simply means re-inserting them into the queue. Jobs can be
139     *      attempted up to three times before being discarded.
140     *   - readOnlyReason : Set this to a string to make the queue read-only. [optional]
141     *   - idGenerator : A GlobalIdGenerator instance.
142     *   - stats  : A StatsdDataFactoryInterface. [optional]
143     *
144     * Queue classes should throw an exception if they do not support the options given.
145     *
146     * @param array $params
147     * @return JobQueue
148     * @throws JobQueueError
149     */
150    final public static function factory( array $params ) {
151        $class = $params['class'];
152        if ( !class_exists( $class ) ) {
153            throw new JobQueueError( "Invalid job queue class '$class'." );
154        }
155
156        $obj = new $class( $params );
157        if ( !( $obj instanceof self ) ) {
158            throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
159        }
160
161        return $obj;
162    }
163
164    /**
165     * @return string Database domain ID
166     */
167    final public function getDomain() {
168        return $this->domain;
169    }
170
171    /**
172     * @return string Job type that this queue handles
173     */
174    final public function getType() {
175        return $this->type;
176    }
177
178    /**
179     * @return string One of (random, timestamp, fifo, undefined)
180     */
181    final public function getOrder() {
182        return $this->order;
183    }
184
185    /**
186     * Get the allowed queue orders for configuration validation
187     *
188     * @return array Subset of (random, timestamp, fifo, undefined)
189     */
190    abstract protected function supportedOrders();
191
192    /**
193     * Get the default queue order to use if configuration does not specify one
194     *
195     * @return string One of (random, timestamp, fifo, undefined)
196     */
197    abstract protected function optimalOrder();
198
199    /**
200     * Find out if delayed jobs are supported for configuration validation
201     *
202     * @stable to override
203     * @return bool Whether delayed jobs are supported
204     */
205    protected function supportsDelayedJobs() {
206        return false; // not implemented
207    }
208
209    /**
210     * @return bool Whether delayed jobs are enabled
211     * @since 1.22
212     */
213    final public function delayedJobsEnabled() {
214        return $this->supportsDelayedJobs();
215    }
216
217    /**
218     * @return string|false Read-only rational or false if r/w
219     * @since 1.27
220     */
221    public function getReadOnlyReason() {
222        return $this->readOnlyReason;
223    }
224
225    /**
226     * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
227     * Queue classes should use caching if they are any slower without memcached.
228     *
229     * If caching is used, this might return false when there are actually no jobs.
230     * If pop() is called and returns false then it should correct the cache. Also,
231     * calling flushCaches() first prevents this. However, this effect is typically
232     * not distinguishable from the race condition between isEmpty() and pop().
233     *
234     * @return bool
235     * @throws JobQueueError
236     */
237    final public function isEmpty() {
238        $res = $this->doIsEmpty();
239
240        return $res;
241    }
242
243    /**
244     * @see JobQueue::isEmpty()
245     * @return bool
246     */
247    abstract protected function doIsEmpty();
248
249    /**
250     * Get the number of available (unacquired, non-delayed) jobs in the queue.
251     * Queue classes should use caching if they are any slower without memcached.
252     *
253     * If caching is used, this number might be out of date for a minute.
254     *
255     * @return int
256     * @throws JobQueueError
257     */
258    final public function getSize() {
259        $res = $this->doGetSize();
260
261        return $res;
262    }
263
264    /**
265     * @see JobQueue::getSize()
266     * @return int
267     */
268    abstract protected function doGetSize();
269
270    /**
271     * Get the number of acquired jobs (these are temporarily out of the queue).
272     * Queue classes should use caching if they are any slower without memcached.
273     *
274     * If caching is used, this number might be out of date for a minute.
275     *
276     * @return int
277     * @throws JobQueueError
278     */
279    final public function getAcquiredCount() {
280        $res = $this->doGetAcquiredCount();
281
282        return $res;
283    }
284
285    /**
286     * @see JobQueue::getAcquiredCount()
287     * @return int
288     */
289    abstract protected function doGetAcquiredCount();
290
291    /**
292     * Get the number of delayed jobs (these are temporarily out of the queue).
293     * Queue classes should use caching if they are any slower without memcached.
294     *
295     * If caching is used, this number might be out of date for a minute.
296     *
297     * @return int
298     * @throws JobQueueError
299     * @since 1.22
300     */
301    final public function getDelayedCount() {
302        $res = $this->doGetDelayedCount();
303
304        return $res;
305    }
306
307    /**
308     * @stable to override
309     * @see JobQueue::getDelayedCount()
310     * @return int
311     */
312    protected function doGetDelayedCount() {
313        return 0; // not implemented
314    }
315
316    /**
317     * Get the number of acquired jobs that can no longer be attempted.
318     * Queue classes should use caching if they are any slower without memcached.
319     *
320     * If caching is used, this number might be out of date for a minute.
321     *
322     * @return int
323     * @throws JobQueueError
324     */
325    final public function getAbandonedCount() {
326        $res = $this->doGetAbandonedCount();
327
328        return $res;
329    }
330
331    /**
332     * @stable to override
333     * @see JobQueue::getAbandonedCount()
334     * @return int
335     */
336    protected function doGetAbandonedCount() {
337        return 0; // not implemented
338    }
339
340    /**
341     * Push one or more jobs into the queue.
342     * This does not require $wgJobClasses to be set for the given job type.
343     * Outside callers should use JobQueueGroup::push() instead of this function.
344     *
345     * @param IJobSpecification|IJobSpecification[] $jobs
346     * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
347     * @return void
348     * @throws JobQueueError
349     */
350    final public function push( $jobs, $flags = 0 ) {
351        $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
352        $this->batchPush( $jobs, $flags );
353    }
354
355    /**
356     * Push a batch of jobs into the queue.
357     * This does not require $wgJobClasses to be set for the given job type.
358     * Outside callers should use JobQueueGroup::push() instead of this function.
359     *
360     * @param IJobSpecification[] $jobs
361     * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
362     * @return void
363     * @throws JobQueueError
364     */
365    final public function batchPush( array $jobs, $flags = 0 ) {
366        $this->assertNotReadOnly();
367
368        if ( $jobs === [] ) {
369            return; // nothing to do
370        }
371
372        foreach ( $jobs as $job ) {
373            $this->assertMatchingJobType( $job );
374            if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
375                throw new JobQueueError(
376                    "Got delayed '{$job->getType()}' job; delays are not supported." );
377            }
378        }
379
380        $this->doBatchPush( $jobs, $flags );
381
382        foreach ( $jobs as $job ) {
383            if ( $job->isRootJob() ) {
384                $this->deduplicateRootJob( $job );
385            }
386        }
387    }
388
389    /**
390     * @see JobQueue::batchPush()
391     * @param IJobSpecification[] $jobs
392     * @param int $flags
393     */
394    abstract protected function doBatchPush( array $jobs, $flags );
395
396    /**
397     * Pop a job off of the queue.
398     * This requires $wgJobClasses to be set for the given job type.
399     * Outside callers should use JobQueueGroup::pop() instead of this function.
400     *
401     * @throws JobQueueError
402     * @return RunnableJob|false Returns false if there are no jobs
403     */
404    final public function pop() {
405        $this->assertNotReadOnly();
406
407        $job = $this->doPop();
408
409        // Flag this job as an old duplicate based on its "root" job...
410        try {
411            if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
412                $this->incrStats( 'dupe_pops', $job->getType() );
413                $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
414            }
415        } catch ( TimeoutException $e ) {
416            throw $e;
417        } catch ( Exception $e ) {
418            // don't lose jobs over this
419        }
420
421        return $job;
422    }
423
424    /**
425     * @see JobQueue::pop()
426     * @return RunnableJob|false
427     */
428    abstract protected function doPop();
429
430    /**
431     * Acknowledge that a job was completed.
432     *
433     * This does nothing for certain queue classes or if "claimTTL" is not set.
434     * Outside callers should use JobQueueGroup::ack() instead of this function.
435     *
436     * @param RunnableJob $job
437     * @return void
438     * @throws JobQueueError
439     */
440    final public function ack( RunnableJob $job ) {
441        $this->assertNotReadOnly();
442        $this->assertMatchingJobType( $job );
443
444        $this->doAck( $job );
445    }
446
447    /**
448     * @see JobQueue::ack()
449     * @param RunnableJob $job
450     */
451    abstract protected function doAck( RunnableJob $job );
452
453    /**
454     * Register the "root job" of a given job into the queue for de-duplication.
455     * This should only be called right *after* all the new jobs have been inserted.
456     * This is used to turn older, duplicate, job entries into no-ops. The root job
457     * information will remain in the registry until it simply falls out of cache.
458     *
459     * This requires that $job has two special fields in the "params" array:
460     *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
461     *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
462     *
463     * A "root job" is a conceptual job that consist of potentially many smaller jobs
464     * that are actually inserted into the queue. For example, "refreshLinks" jobs are
465     * spawned when a template is edited. One can think of the task as "update links
466     * of pages that use template X" and an instance of that task as a "root job".
467     * However, what actually goes into the queue are range and leaf job subtypes.
468     * Since these jobs include things like page ID ranges and DB primary positions,
469     * and can morph into smaller jobs recursively, simple duplicate detection
470     * for individual jobs being identical (like that of job_sha1) is not useful.
471     *
472     * In the case of "refreshLinks", if these jobs are still in the queue when the template
473     * is edited again, we want all of these old refreshLinks jobs for that template to become
474     * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
475     * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
476     * previous "root job" for the same task of "update links of pages that use template X".
477     *
478     * This does nothing for certain queue classes.
479     *
480     * @internal For use within JobQueue only
481     * @param IJobSpecification $job
482     * @throws JobQueueError
483     * @return bool
484     */
485    final public function deduplicateRootJob( IJobSpecification $job ) {
486        $this->assertNotReadOnly();
487        $this->assertMatchingJobType( $job );
488
489        return $this->doDeduplicateRootJob( $job );
490    }
491
492    /**
493     * @stable to override
494     * @see JobQueue::deduplicateRootJob()
495     * @param IJobSpecification $job
496     * @throws JobQueueError
497     * @return bool
498     */
499    protected function doDeduplicateRootJob( IJobSpecification $job ) {
500        $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
501        if ( !$params ) {
502            throw new JobQueueError( "Cannot register root job; missing parameters." );
503        }
504
505        $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
506        // Callers should call JobQueueGroup::push() before this method so that if the
507        // insert fails, the de-duplication registration will be aborted. Having only the
508        // de-duplication registration succeed would cause jobs to become no-ops without
509        // any actual jobs that made them redundant.
510        $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
511        if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
512            return true; // a newer version of this root job was enqueued
513        }
514
515        // Update the timestamp of the last root job started at the location...
516        return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
517    }
518
519    /**
520     * Check if the "root" job of a given job has been superseded by a newer one
521     *
522     * @param IJobSpecification $job
523     * @throws JobQueueError
524     * @return bool
525     */
526    final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
527        $this->assertMatchingJobType( $job );
528
529        return $this->doIsRootJobOldDuplicate( $job );
530    }
531
532    /**
533     * @stable to override
534     * @see JobQueue::isRootJobOldDuplicate()
535     * @param IJobSpecification $job
536     * @return bool
537     */
538    protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
539        $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
540        if ( !$params ) {
541            return false; // job has no de-duplication info
542        }
543
544        $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
545        // Get the last time this root job was enqueued
546        $timestamp = $this->wanCache->get( $key );
547        if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
548            // Update the timestamp of the last known root job started at the location...
549            $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
550        }
551
552        // Check if a new root job was started at the location after this one's...
553        return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
554    }
555
556    /**
557     * @param string $signature Hash identifier of the root job
558     * @param string $type job type
559     * @return string
560     */
561    protected function getRootJobCacheKey( $signature, $type ) {
562        return $this->wanCache->makeGlobalKey(
563            'jobqueue',
564            $this->domain,
565            $type,
566            'rootjob',
567            $signature
568        );
569    }
570
571    /**
572     * Delete all unclaimed and delayed jobs from the queue
573     *
574     * @throws JobQueueError
575     * @since 1.22
576     * @return void
577     */
578    final public function delete() {
579        $this->assertNotReadOnly();
580
581        $this->doDelete();
582    }
583
584    /**
585     * @stable to override
586     * @see JobQueue::delete()
587     * @throws JobQueueError
588     */
589    protected function doDelete() {
590        throw new JobQueueError( "This method is not implemented." );
591    }
592
593    /**
594     * Wait for any replica DBs or backup servers to catch up.
595     *
596     * This does nothing for certain queue classes.
597     *
598     * @return void
599     * @throws JobQueueError
600     */
601    final public function waitForBackups() {
602        $this->doWaitForBackups();
603    }
604
605    /**
606     * @stable to override
607     * @see JobQueue::waitForBackups()
608     * @return void
609     */
610    protected function doWaitForBackups() {
611    }
612
613    /**
614     * Clear any process and persistent caches
615     *
616     * @return void
617     */
618    final public function flushCaches() {
619        $this->doFlushCaches();
620    }
621
622    /**
623     * @stable to override
624     * @see JobQueue::flushCaches()
625     * @return void
626     */
627    protected function doFlushCaches() {
628    }
629
630    /**
631     * Get an iterator to traverse over all available jobs in this queue.
632     * This does not include jobs that are currently acquired or delayed.
633     * Note: results may be stale if the queue is concurrently modified.
634     *
635     * @return Iterator<RunnableJob>
636     * @throws JobQueueError
637     */
638    abstract public function getAllQueuedJobs();
639
640    /**
641     * Get an iterator to traverse over all delayed jobs in this queue.
642     * Note: results may be stale if the queue is concurrently modified.
643     *
644     * @stable to override
645     * @return Iterator<RunnableJob>
646     * @throws JobQueueError
647     * @since 1.22
648     */
649    public function getAllDelayedJobs() {
650        return new ArrayIterator( [] ); // not implemented
651    }
652
653    /**
654     * Get an iterator to traverse over all claimed jobs in this queue
655     *
656     * Callers should be quick to iterator over it or few results
657     * will be returned due to jobs being acknowledged and deleted
658     *
659     * @stable to override
660     * @return Iterator<RunnableJob>
661     * @throws JobQueueError
662     * @since 1.26
663     */
664    public function getAllAcquiredJobs() {
665        return new ArrayIterator( [] ); // not implemented
666    }
667
668    /**
669     * Get an iterator to traverse over all abandoned jobs in this queue
670     *
671     * @stable to override
672     * @return Iterator<RunnableJob>
673     * @throws JobQueueError
674     * @since 1.25
675     */
676    public function getAllAbandonedJobs() {
677        return new ArrayIterator( [] ); // not implemented
678    }
679
680    /**
681     * Do not use this function outside of JobQueue/JobQueueGroup
682     *
683     * @stable to override
684     * @return string|null
685     * @since 1.22
686     */
687    public function getCoalesceLocationInternal() {
688        return null;
689    }
690
691    /**
692     * Check whether each of the given queues are empty.
693     * This is used for batching checks for queues stored at the same place.
694     *
695     * @param array $types List of queues types
696     * @return array|null (list of non-empty queue types) or null if unsupported
697     * @throws JobQueueError
698     * @since 1.22
699     */
700    final public function getSiblingQueuesWithJobs( array $types ) {
701        return $this->doGetSiblingQueuesWithJobs( $types );
702    }
703
704    /**
705     * @stable to override
706     * @see JobQueue::getSiblingQueuesWithJobs()
707     * @param array $types List of queues types
708     * @return array|null (list of queue types) or null if unsupported
709     */
710    protected function doGetSiblingQueuesWithJobs( array $types ) {
711        return null; // not supported
712    }
713
714    /**
715     * Check the size of each of the given queues.
716     * For queues not served by the same store as this one, 0 is returned.
717     * This is used for batching checks for queues stored at the same place.
718     *
719     * @param array $types List of queues types
720     * @return array|null (job type => whether queue is empty) or null if unsupported
721     * @throws JobQueueError
722     * @since 1.22
723     */
724    final public function getSiblingQueueSizes( array $types ) {
725        return $this->doGetSiblingQueueSizes( $types );
726    }
727
728    /**
729     * @stable to override
730     * @see JobQueue::getSiblingQueuesSize()
731     * @param array $types List of queues types
732     * @return array|null (list of queue types) or null if unsupported
733     */
734    protected function doGetSiblingQueueSizes( array $types ) {
735        return null; // not supported
736    }
737
738    /**
739     * @param string $command
740     * @param array $params
741     * @return Job
742     */
743    protected function factoryJob( $command, $params ) {
744        return $this->jobFactory->newJob( $command, $params );
745    }
746
747    /**
748     * @throws JobQueueReadOnlyError
749     */
750    protected function assertNotReadOnly() {
751        if ( $this->readOnlyReason !== false ) {
752            throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
753        }
754    }
755
756    /**
757     * @param IJobSpecification $job
758     * @throws JobQueueError
759     */
760    private function assertMatchingJobType( IJobSpecification $job ) {
761        if ( $this->typeAgnostic ) {
762            return;
763        }
764        if ( $job->getType() !== $this->type ) {
765            throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
766        }
767    }
768
769    /**
770     * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type
771     *
772     * @param string $key Event type
773     * @param string $type Job type
774     * @param int $delta
775     * @since 1.22
776     */
777    protected function incrStats( $key, $type, $delta = 1 ) {
778        $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
779        $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
780    }
781
782    /**
783     * Subclasses should set this to true if they support type agnostic queues
784     *
785     * @return bool
786     * @since 1.38
787     */
788    protected function supportsTypeAgnostic(): bool {
789        return false;
790    }
791}