Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
67.48% |
83 / 123 |
|
30.23% |
13 / 43 |
CRAP | |
0.00% |
0 / 1 |
JobQueue | |
67.48% |
83 / 123 |
|
30.23% |
13 / 43 |
262.33 | |
0.00% |
0 / 1 |
__construct | |
84.21% |
16 / 19 |
|
0.00% |
0 / 1 |
7.19 | |||
factory | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
3.21 | |||
getDomain | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getType | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportedOrders | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
optimalOrder | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
supportsDelayedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
delayedJobsEnabled | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getReadOnlyReason | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
isEmpty | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doIsEmpty | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getSize | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doGetSize | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getAcquiredCount | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doGetAcquiredCount | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getDelayedCount | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doGetDelayedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAbandonedCount | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doGetAbandonedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
push | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
batchPush | |
75.00% |
9 / 12 |
|
0.00% |
0 / 1 |
7.77 | |||
doBatchPush | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
pop | |
66.67% |
6 / 9 |
|
0.00% |
0 / 1 |
5.93 | |||
doPop | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
ack | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
doAck | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
deduplicateRootJob | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
doDeduplicateRootJob | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
5.05 | |||
isRootJobOldDuplicate | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doIsRootJobOldDuplicate | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
6.07 | |||
getRootJobCacheKey | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
delete | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doDelete | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
waitForBackups | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doWaitForBackups | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
flushCaches | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
doFlushCaches | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllQueuedJobs | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getAllDelayedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAcquiredJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAbandonedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getSiblingQueuesWithJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getSiblingQueueSizes | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
factoryJob | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
assertNotReadOnly | |
50.00% |
1 / 2 |
|
0.00% |
0 / 1 |
2.50 | |||
assertMatchingJobType | |
50.00% |
2 / 4 |
|
0.00% |
0 / 1 |
4.12 | |||
incrStats | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
supportsTypeAgnostic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
22 | use MediaWiki\JobQueue\JobFactory; |
23 | use MediaWiki\MediaWikiServices; |
24 | use Wikimedia\ObjectCache\WANObjectCache; |
25 | use Wikimedia\RequestTimeout\TimeoutException; |
26 | use Wikimedia\Stats\NullStatsdDataFactory; |
27 | use Wikimedia\UUID\GlobalIdGenerator; |
28 | |
29 | /** |
30 | * @defgroup JobQueue JobQueue |
31 | * |
32 | * |
33 | * See [the architecture doc](@ref jobqueuearch) for more information. |
34 | */ |
35 | |
36 | /** |
37 | * Base class for queueing and running background jobs from a storage backend. |
38 | * |
39 | * See [the architecture doc](@ref jobqueuearch) for more information. |
40 | * |
41 | * @ingroup JobQueue |
42 | * @since 1.21 |
43 | * @stable to extend |
44 | */ |
45 | abstract class JobQueue { |
46 | /** @var string DB domain ID */ |
47 | protected $domain; |
48 | /** @var string Job type */ |
49 | protected $type; |
50 | /** @var string Order in which pop() selects jobs */ |
51 | protected $order; |
52 | /** @var int Seconds a job can stay claimed for exclusive execution (0 means forever) */ |
53 | protected $claimTTL; |
54 | /** @var int Maximum number of times to try a job */ |
55 | protected $maxTries; |
56 | /** @var string|false Read only rationale (or false if r/w) */ |
57 | protected $readOnlyReason; |
58 | /** @var StatsdDataFactoryInterface */ |
59 | protected $stats; |
60 | /** @var GlobalIdGenerator */ |
61 | protected $idGenerator; |
62 | |
63 | /** @var WANObjectCache */ |
64 | protected $wanCache; |
65 | |
66 | /** @var bool */ |
67 | protected $typeAgnostic; |
68 | |
69 | private JobFactory $jobFactory; |
70 | |
71 | /* Bit flag for "all-or-nothing" job insertions */ |
72 | protected const QOS_ATOMIC = 1; |
73 | |
74 | /* Seconds to remember root jobs (28 days) */ |
75 | protected const ROOTJOB_TTL = 28 * 24 * 3600; |
76 | |
77 | /** |
78 | * @stable to call |
79 | * |
80 | * @param array $params |
81 | * - type : A job type, 'default' if typeAgnostic is set |
82 | * - domain : A DB domain ID |
83 | * - idGenerator : A GlobalIdGenerator instance. |
84 | * - wanCache : An instance of WANObjectCache to use for caching [default: none] |
85 | * - stats : An instance of StatsdDataFactoryInterface [default: none] |
86 | * - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever] |
87 | * - maxTries : Total times a job can be tried, assuming claims expire [default: 3] |
88 | * - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable] |
89 | * - readOnlyReason : Mark the queue as read-only with this reason [default: false] |
90 | * - typeAgnostic : If the jobqueue should operate agnostic to the job types |
91 | * @throws JobQueueError |
92 | * |
93 | */ |
94 | protected function __construct( array $params ) { |
95 | $this->domain = $params['domain'] ?? $params['wiki']; // b/c |
96 | $this->type = $params['type']; |
97 | $this->claimTTL = $params['claimTTL'] ?? 0; |
98 | $this->maxTries = $params['maxTries'] ?? 3; |
99 | if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { |
100 | $this->order = $params['order']; |
101 | } else { |
102 | $this->order = $this->optimalOrder(); |
103 | } |
104 | if ( !in_array( $this->order, $this->supportedOrders() ) ) { |
105 | throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." ); |
106 | } |
107 | $this->readOnlyReason = $params['readOnlyReason'] ?? false; |
108 | $this->stats = $params['stats'] ?? new NullStatsdDataFactory(); |
109 | $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty(); |
110 | $this->idGenerator = $params['idGenerator']; |
111 | if ( ( $params['typeAgnostic'] ?? false ) && !$this->supportsTypeAgnostic() ) { |
112 | throw new JobQueueError( __CLASS__ . " does not support type agnostic queues." ); |
113 | } |
114 | $this->typeAgnostic = ( $params['typeAgnostic'] ?? false ); |
115 | if ( $this->typeAgnostic ) { |
116 | $this->type = 'default'; |
117 | } |
118 | |
119 | $this->jobFactory = MediaWikiServices::getInstance()->getJobFactory(); |
120 | } |
121 | |
122 | /** |
123 | * Get a job queue object of the specified type. |
124 | * $params includes: |
125 | * - class : Name of the JobQueue subclass to instantiate |
126 | * - domain : Database domain ID of the wiki the jobs are for (defaults to current wiki) |
127 | * - type : The name of the job types this queue handles |
128 | * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". |
129 | * If "fifo" is used, the queue will effectively be FIFO. Note that job |
130 | * completion will not appear to be exactly FIFO if there are multiple |
131 | * job runners since jobs can take different times to finish once popped. |
132 | * If "timestamp" is used, the queue will at least be loosely ordered |
133 | * by timestamp, allowing for some jobs to be popped off out of order. |
134 | * If "random" is used, pop() will pick jobs in random order. |
135 | * Note that it may only be weakly random (e.g. a lottery of the oldest X). |
136 | * If "any" is chosen, the queue will use whatever order is the fastest. |
137 | * This might be useful for improving concurrency for job acquisition. |
138 | * - claimTTL : If supported, the queue will recycle jobs that have been popped |
139 | * but not acknowledged as completed after this many seconds. Recycling |
140 | * of jobs simply means re-inserting them into the queue. Jobs can be |
141 | * attempted up to three times before being discarded. |
142 | * - readOnlyReason : Set this to a string to make the queue read-only. [optional] |
143 | * - idGenerator : A GlobalIdGenerator instance. |
144 | * - stats : A StatsdDataFactoryInterface. [optional] |
145 | * |
146 | * Queue classes should throw an exception if they do not support the options given. |
147 | * |
148 | * @param array $params |
149 | * @return JobQueue |
150 | * @throws JobQueueError |
151 | */ |
152 | final public static function factory( array $params ) { |
153 | $class = $params['class']; |
154 | if ( !class_exists( $class ) ) { |
155 | throw new JobQueueError( "Invalid job queue class '$class'." ); |
156 | } |
157 | |
158 | $obj = new $class( $params ); |
159 | if ( !( $obj instanceof self ) ) { |
160 | throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." ); |
161 | } |
162 | |
163 | return $obj; |
164 | } |
165 | |
166 | /** |
167 | * @return string Database domain ID |
168 | */ |
169 | final public function getDomain() { |
170 | return $this->domain; |
171 | } |
172 | |
173 | /** |
174 | * @return string Job type that this queue handles |
175 | */ |
176 | final public function getType() { |
177 | return $this->type; |
178 | } |
179 | |
180 | /** |
181 | * @return string One of (random, timestamp, fifo, undefined) |
182 | */ |
183 | final public function getOrder() { |
184 | return $this->order; |
185 | } |
186 | |
187 | /** |
188 | * Get the allowed queue orders for configuration validation |
189 | * |
190 | * @return array Subset of (random, timestamp, fifo, undefined) |
191 | */ |
192 | abstract protected function supportedOrders(); |
193 | |
194 | /** |
195 | * Get the default queue order to use if configuration does not specify one |
196 | * |
197 | * @return string One of (random, timestamp, fifo, undefined) |
198 | */ |
199 | abstract protected function optimalOrder(); |
200 | |
201 | /** |
202 | * Find out if delayed jobs are supported for configuration validation |
203 | * |
204 | * @stable to override |
205 | * @return bool Whether delayed jobs are supported |
206 | */ |
207 | protected function supportsDelayedJobs() { |
208 | return false; // not implemented |
209 | } |
210 | |
211 | /** |
212 | * @return bool Whether delayed jobs are enabled |
213 | * @since 1.22 |
214 | */ |
215 | final public function delayedJobsEnabled() { |
216 | return $this->supportsDelayedJobs(); |
217 | } |
218 | |
219 | /** |
220 | * @return string|false Read-only rationale or false if r/w |
221 | * @since 1.27 |
222 | */ |
223 | public function getReadOnlyReason() { |
224 | return $this->readOnlyReason; |
225 | } |
226 | |
227 | /** |
228 | * Quickly check if the queue has no available (unacquired, non-delayed) jobs. |
229 | * Queue classes should use caching if they are any slower without memcached. |
230 | * |
231 | * If caching is used, this might return false when there are actually no jobs. |
232 | * If pop() is called and returns false then it should correct the cache. Also, |
233 | * calling flushCaches() first prevents this. However, this effect is typically |
234 | * not distinguishable from the race condition between isEmpty() and pop(). |
235 | * |
236 | * @return bool |
237 | * @throws JobQueueError |
238 | */ |
239 | final public function isEmpty() { |
240 | $res = $this->doIsEmpty(); |
241 | |
242 | return $res; |
243 | } |
244 | |
245 | /** |
246 | * @see JobQueue::isEmpty() |
247 | * @return bool |
248 | */ |
249 | abstract protected function doIsEmpty(); |
250 | |
251 | /** |
252 | * Get the number of available (unacquired, non-delayed) jobs in the queue. |
253 | * Queue classes should use caching if they are any slower without memcached. |
254 | * |
255 | * If caching is used, this number might be out of date for a minute. |
256 | * |
257 | * @return int |
258 | * @throws JobQueueError |
259 | */ |
260 | final public function getSize() { |
261 | $res = $this->doGetSize(); |
262 | |
263 | return $res; |
264 | } |
265 | |
266 | /** |
267 | * @see JobQueue::getSize() |
268 | * @return int |
269 | */ |
270 | abstract protected function doGetSize(); |
271 | |
272 | /** |
273 | * Get the number of acquired jobs (these are temporarily out of the queue). |
274 | * Queue classes should use caching if they are any slower without memcached. |
275 | * |
276 | * If caching is used, this number might be out of date for a minute. |
277 | * |
278 | * @return int |
279 | * @throws JobQueueError |
280 | */ |
281 | final public function getAcquiredCount() { |
282 | $res = $this->doGetAcquiredCount(); |
283 | |
284 | return $res; |
285 | } |
286 | |
287 | /** |
288 | * @see JobQueue::getAcquiredCount() |
289 | * @return int |
290 | */ |
291 | abstract protected function doGetAcquiredCount(); |
292 | |
293 | /** |
294 | * Get the number of delayed jobs (these are temporarily out of the queue). |
295 | * Queue classes should use caching if they are any slower without memcached. |
296 | * |
297 | * If caching is used, this number might be out of date for a minute. |
298 | * |
299 | * @return int |
300 | * @throws JobQueueError |
301 | * @since 1.22 |
302 | */ |
303 | final public function getDelayedCount() { |
304 | $res = $this->doGetDelayedCount(); |
305 | |
306 | return $res; |
307 | } |
308 | |
309 | /** |
310 | * @stable to override |
311 | * @see JobQueue::getDelayedCount() |
312 | * @return int |
313 | */ |
314 | protected function doGetDelayedCount() { |
315 | return 0; // not implemented |
316 | } |
317 | |
318 | /** |
319 | * Get the number of acquired jobs that can no longer be attempted. |
320 | * Queue classes should use caching if they are any slower without memcached. |
321 | * |
322 | * If caching is used, this number might be out of date for a minute. |
323 | * |
324 | * @return int |
325 | * @throws JobQueueError |
326 | */ |
327 | final public function getAbandonedCount() { |
328 | $res = $this->doGetAbandonedCount(); |
329 | |
330 | return $res; |
331 | } |
332 | |
333 | /** |
334 | * @stable to override |
335 | * @see JobQueue::getAbandonedCount() |
336 | * @return int |
337 | */ |
338 | protected function doGetAbandonedCount() { |
339 | return 0; // not implemented |
340 | } |
341 | |
342 | /** |
343 | * Push one or more jobs into the queue. |
344 | * This does not require $wgJobClasses to be set for the given job type. |
345 | * Outside callers should use JobQueueGroup::push() instead of this function. |
346 | * |
347 | * @param IJobSpecification|IJobSpecification[] $jobs |
348 | * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) |
349 | * @return void |
350 | * @throws JobQueueError |
351 | */ |
352 | final public function push( $jobs, $flags = 0 ) { |
353 | $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; |
354 | $this->batchPush( $jobs, $flags ); |
355 | } |
356 | |
357 | /** |
358 | * Push a batch of jobs into the queue. |
359 | * This does not require $wgJobClasses to be set for the given job type. |
360 | * Outside callers should use JobQueueGroup::push() instead of this function. |
361 | * |
362 | * @param IJobSpecification[] $jobs |
363 | * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) |
364 | * @return void |
365 | * @throws JobQueueError |
366 | */ |
367 | final public function batchPush( array $jobs, $flags = 0 ) { |
368 | $this->assertNotReadOnly(); |
369 | |
370 | if ( $jobs === [] ) { |
371 | return; // nothing to do |
372 | } |
373 | |
374 | foreach ( $jobs as $job ) { |
375 | $this->assertMatchingJobType( $job ); |
376 | if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) { |
377 | throw new JobQueueError( |
378 | "Got delayed '{$job->getType()}' job; delays are not supported." ); |
379 | } |
380 | } |
381 | |
382 | $this->doBatchPush( $jobs, $flags ); |
383 | |
384 | foreach ( $jobs as $job ) { |
385 | if ( $job->isRootJob() ) { |
386 | $this->deduplicateRootJob( $job ); |
387 | } |
388 | } |
389 | } |
390 | |
391 | /** |
392 | * @see JobQueue::batchPush() |
393 | * @param IJobSpecification[] $jobs |
394 | * @param int $flags |
395 | */ |
396 | abstract protected function doBatchPush( array $jobs, $flags ); |
397 | |
398 | /** |
399 | * Pop a job off of the queue. |
400 | * This requires $wgJobClasses to be set for the given job type. |
401 | * Outside callers should use JobQueueGroup::pop() instead of this function. |
402 | * |
403 | * @throws JobQueueError |
404 | * @return RunnableJob|false Returns false if there are no jobs |
405 | */ |
406 | final public function pop() { |
407 | $this->assertNotReadOnly(); |
408 | |
409 | $job = $this->doPop(); |
410 | |
411 | // Flag this job as an old duplicate based on its "root" job... |
412 | try { |
413 | if ( $job && $this->isRootJobOldDuplicate( $job ) ) { |
414 | $this->incrStats( 'dupe_pops', $job->getType() ); |
415 | $job = DuplicateJob::newFromJob( $job ); // convert to a no-op |
416 | } |
417 | } catch ( TimeoutException $e ) { |
418 | throw $e; |
419 | } catch ( Exception $e ) { |
420 | // don't lose jobs over this |
421 | } |
422 | |
423 | return $job; |
424 | } |
425 | |
426 | /** |
427 | * @see JobQueue::pop() |
428 | * @return RunnableJob|false |
429 | */ |
430 | abstract protected function doPop(); |
431 | |
432 | /** |
433 | * Acknowledge that a job was completed. |
434 | * |
435 | * This does nothing for certain queue classes or if "claimTTL" is not set. |
436 | * Outside callers should use JobQueueGroup::ack() instead of this function. |
437 | * |
438 | * @param RunnableJob $job |
439 | * @return void |
440 | * @throws JobQueueError |
441 | */ |
442 | final public function ack( RunnableJob $job ) { |
443 | $this->assertNotReadOnly(); |
444 | $this->assertMatchingJobType( $job ); |
445 | |
446 | $this->doAck( $job ); |
447 | } |
448 | |
449 | /** |
450 | * @see JobQueue::ack() |
451 | * @param RunnableJob $job |
452 | */ |
453 | abstract protected function doAck( RunnableJob $job ); |
454 | |
455 | /** |
456 | * Register the "root job" of a given job into the queue for de-duplication. |
457 | * This should only be called right *after* all the new jobs have been inserted. |
458 | * This is used to turn older, duplicate, job entries into no-ops. The root job |
459 | * information will remain in the registry until it simply falls out of cache. |
460 | * |
461 | * This requires that $job has two special fields in the "params" array: |
462 | * - rootJobSignature : hash (e.g. SHA1) that identifies the task |
463 | * - rootJobTimestamp : TS_MW timestamp of this instance of the task |
464 | * |
465 | * A "root job" is a conceptual job that consists of potentially many smaller jobs |
466 | * that are actually inserted into the queue. For example, "refreshLinks" jobs are |
467 | * spawned when a template is edited. One can think of the task as "update links |
468 | * of pages that use template X" and an instance of that task as a "root job". |
469 | * However, what actually goes into the queue are range and leaf job subtypes. |
470 | * Since these jobs include things like page ID ranges and DB primary positions, |
471 | * and can morph into smaller jobs recursively, simple duplicate detection |
472 | * for individual jobs being identical (like that of job_sha1) is not useful. |
473 | * |
474 | * In the case of "refreshLinks", if these jobs are still in the queue when the template |
475 | * is edited again, we want all of these old refreshLinks jobs for that template to become |
476 | * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing. |
477 | * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a |
478 | * previous "root job" for the same task of "update links of pages that use template X". |
479 | * |
480 | * This does nothing for certain queue classes. |
481 | * |
482 | * @internal For use within JobQueue only |
483 | * @param IJobSpecification $job |
484 | * @throws JobQueueError |
485 | * @return bool |
486 | */ |
487 | final public function deduplicateRootJob( IJobSpecification $job ) { |
488 | $this->assertNotReadOnly(); |
489 | $this->assertMatchingJobType( $job ); |
490 | |
491 | return $this->doDeduplicateRootJob( $job ); |
492 | } |
493 | |
494 | /** |
495 | * @stable to override |
496 | * @see JobQueue::deduplicateRootJob() |
497 | * @param IJobSpecification $job |
498 | * @throws JobQueueError |
499 | * @return bool |
500 | */ |
501 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
502 | $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; |
503 | if ( !$params ) { |
504 | throw new JobQueueError( "Cannot register root job; missing parameters." ); |
505 | } |
506 | |
507 | $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ); |
508 | // Callers should call JobQueueGroup::push() before this method so that if the |
509 | // insert fails, the de-duplication registration will be aborted. Having only the |
510 | // de-duplication registration succeed would cause jobs to become no-ops without |
511 | // any actual jobs that made them redundant. |
512 | $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job |
513 | if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) { |
514 | return true; // a newer version of this root job was enqueued |
515 | } |
516 | |
517 | // Update the timestamp of the last root job started at the location... |
518 | return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); |
519 | } |
520 | |
521 | /** |
522 | * Check if the "root" job of a given job has been superseded by a newer one |
523 | * |
524 | * @param IJobSpecification $job |
525 | * @throws JobQueueError |
526 | * @return bool |
527 | */ |
528 | final protected function isRootJobOldDuplicate( IJobSpecification $job ) { |
529 | $this->assertMatchingJobType( $job ); |
530 | |
531 | return $this->doIsRootJobOldDuplicate( $job ); |
532 | } |
533 | |
534 | /** |
535 | * @stable to override |
536 | * @see JobQueue::isRootJobOldDuplicate() |
537 | * @param IJobSpecification $job |
538 | * @return bool |
539 | */ |
540 | protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { |
541 | $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; |
542 | if ( !$params ) { |
543 | return false; // job has no de-duplication info |
544 | } |
545 | |
546 | $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ); |
547 | // Get the last time this root job was enqueued |
548 | $timestamp = $this->wanCache->get( $key ); |
549 | if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) { |
550 | // Update the timestamp of the last known root job started at the location... |
551 | $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); |
552 | } |
553 | |
554 | // Check if a new root job was started at the location after this one's... |
555 | return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); |
556 | } |
557 | |
558 | /** |
559 | * @param string $signature Hash identifier of the root job |
560 | * @param string $type job type |
561 | * @return string |
562 | */ |
563 | protected function getRootJobCacheKey( $signature, $type ) { |
564 | return $this->wanCache->makeGlobalKey( |
565 | 'jobqueue', |
566 | $this->domain, |
567 | $type, |
568 | 'rootjob', |
569 | $signature |
570 | ); |
571 | } |
572 | |
573 | /** |
574 | * Delete all unclaimed and delayed jobs from the queue |
575 | * |
576 | * @throws JobQueueError |
577 | * @since 1.22 |
578 | * @return void |
579 | */ |
580 | final public function delete() { |
581 | $this->assertNotReadOnly(); |
582 | |
583 | $this->doDelete(); |
584 | } |
585 | |
586 | /** |
587 | * @stable to override |
588 | * @see JobQueue::delete() |
589 | * @throws JobQueueError |
590 | */ |
591 | protected function doDelete() { |
592 | throw new JobQueueError( "This method is not implemented." ); |
593 | } |
594 | |
595 | /** |
596 | * Wait for any replica DBs or backup servers to catch up. |
597 | * |
598 | * This does nothing for certain queue classes. |
599 | * |
600 | * @return void |
601 | * @throws JobQueueError |
602 | */ |
603 | final public function waitForBackups() { |
604 | $this->doWaitForBackups(); |
605 | } |
606 | |
607 | /** |
608 | * @stable to override |
609 | * @see JobQueue::waitForBackups() |
610 | * @return void |
611 | */ |
612 | protected function doWaitForBackups() { |
613 | } |
614 | |
615 | /** |
616 | * Clear any process and persistent caches |
617 | * |
618 | * @return void |
619 | */ |
620 | final public function flushCaches() { |
621 | $this->doFlushCaches(); |
622 | } |
623 | |
624 | /** |
625 | * @stable to override |
626 | * @see JobQueue::flushCaches() |
627 | * @return void |
628 | */ |
629 | protected function doFlushCaches() { |
630 | } |
631 | |
632 | /** |
633 | * Get an iterator to traverse over all available jobs in this queue. |
634 | * This does not include jobs that are currently acquired or delayed. |
635 | * Note: results may be stale if the queue is concurrently modified. |
636 | * |
637 | * @return Iterator<RunnableJob> |
638 | * @throws JobQueueError |
639 | */ |
640 | abstract public function getAllQueuedJobs(); |
641 | |
642 | /** |
643 | * Get an iterator to traverse over all delayed jobs in this queue. |
644 | * Note: results may be stale if the queue is concurrently modified. |
645 | * |
646 | * @stable to override |
647 | * @return Iterator<RunnableJob> |
648 | * @throws JobQueueError |
649 | * @since 1.22 |
650 | */ |
651 | public function getAllDelayedJobs() { |
652 | return new ArrayIterator( [] ); // not implemented |
653 | } |
654 | |
655 | /** |
656 | * Get an iterator to traverse over all claimed jobs in this queue |
657 | * |
658 | * Callers should be quick to iterate over it or few results |
659 | * will be returned due to jobs being acknowledged and deleted |
660 | * |
661 | * @stable to override |
662 | * @return Iterator<RunnableJob> |
663 | * @throws JobQueueError |
664 | * @since 1.26 |
665 | */ |
666 | public function getAllAcquiredJobs() { |
667 | return new ArrayIterator( [] ); // not implemented |
668 | } |
669 | |
670 | /** |
671 | * Get an iterator to traverse over all abandoned jobs in this queue |
672 | * |
673 | * @stable to override |
674 | * @return Iterator<RunnableJob> |
675 | * @throws JobQueueError |
676 | * @since 1.25 |
677 | */ |
678 | public function getAllAbandonedJobs() { |
679 | return new ArrayIterator( [] ); // not implemented |
680 | } |
681 | |
682 | /** |
683 | * Do not use this function outside of JobQueue/JobQueueGroup |
684 | * |
685 | * @stable to override |
686 | * @return string|null |
687 | * @since 1.22 |
688 | */ |
689 | public function getCoalesceLocationInternal() { |
690 | return null; |
691 | } |
692 | |
693 | /** |
694 | * Check whether each of the given queues is empty. |
695 | * This is used for batching checks for queues stored at the same place. |
696 | * |
697 | * @param array $types List of queue types |
698 | * @return array|null (list of non-empty queue types) or null if unsupported |
699 | * @throws JobQueueError |
700 | * @since 1.22 |
701 | */ |
702 | final public function getSiblingQueuesWithJobs( array $types ) { |
703 | return $this->doGetSiblingQueuesWithJobs( $types ); |
704 | } |
705 | |
706 | /** |
707 | * @stable to override |
708 | * @see JobQueue::getSiblingQueuesWithJobs() |
709 | * @param array $types List of queue types |
710 | * @return array|null (list of non-empty queue types) or null if unsupported |
711 | */ |
712 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
713 | return null; // not supported |
714 | } |
715 | |
716 | /** |
717 | * Check the size of each of the given queues. |
718 | * For queues not served by the same store as this one, 0 is returned. |
719 | * This is used for batching checks for queues stored at the same place. |
720 | * |
721 | * @param array $types List of queue types |
722 | * @return array|null (job type => queue size) or null if unsupported |
723 | * @throws JobQueueError |
724 | * @since 1.22 |
725 | */ |
726 | final public function getSiblingQueueSizes( array $types ) { |
727 | return $this->doGetSiblingQueueSizes( $types ); |
728 | } |
729 | |
730 | /** |
731 | * @stable to override |
732 | * @see JobQueue::getSiblingQueueSizes() |
733 | * @param array $types List of queue types |
734 | * @return array|null (job type => queue size) or null if unsupported |
735 | */ |
736 | protected function doGetSiblingQueueSizes( array $types ) { |
737 | return null; // not supported |
738 | } |
739 | |
740 | /** |
741 | * @param string $command |
742 | * @param array $params |
743 | * @return Job |
744 | */ |
745 | protected function factoryJob( $command, $params ) { |
746 | return $this->jobFactory->newJob( $command, $params ); |
747 | } |
748 | |
749 | /** |
750 | * @throws JobQueueReadOnlyError |
751 | */ |
752 | protected function assertNotReadOnly() { |
753 | if ( $this->readOnlyReason !== false ) { |
754 | throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" ); |
755 | } |
756 | } |
757 | |
758 | /** |
759 | * @param IJobSpecification $job |
760 | * @throws JobQueueError |
761 | */ |
762 | private function assertMatchingJobType( IJobSpecification $job ) { |
763 | if ( $this->typeAgnostic ) { |
764 | return; |
765 | } |
766 | if ( $job->getType() !== $this->type ) { |
767 | throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." ); |
768 | } |
769 | } |
770 | |
771 | /** |
772 | * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type |
773 | * |
774 | * @param string $key Event type |
775 | * @param string $type Job type |
776 | * @param int $delta |
777 | * @since 1.22 |
778 | */ |
779 | protected function incrStats( $key, $type, $delta = 1 ) { |
780 | $this->stats->updateCount( "jobqueue.{$key}.all", $delta ); |
781 | $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta ); |
782 | } |
783 | |
784 | /** |
785 | * Subclasses should override this to return true if they support type agnostic queues |
786 | * |
787 | * @return bool |
788 | * @since 1.38 |
789 | */ |
790 | protected function supportsTypeAgnostic(): bool { |
791 | return false; |
792 | } |
793 | } |