Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
67.48% |
83 / 123 |
|
30.23% |
13 / 43 |
CRAP | |
0.00% |
0 / 1 |
JobQueue | |
67.48% |
83 / 123 |
|
30.23% |
13 / 43 |
262.33 | |
0.00% |
0 / 1 |
__construct | |
84.21% |
16 / 19 |
|
0.00% |
0 / 1 |
7.19 | |||
factory | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
3.21 | |||
getDomain | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getType | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportedOrders | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
optimalOrder | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
supportsDelayedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
delayedJobsEnabled | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getReadOnlyReason | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
isEmpty | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doIsEmpty | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getSize | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doGetSize | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getAcquiredCount | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doGetAcquiredCount | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getDelayedCount | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doGetDelayedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAbandonedCount | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doGetAbandonedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
push | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
batchPush | |
75.00% |
9 / 12 |
|
0.00% |
0 / 1 |
7.77 | |||
doBatchPush | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
pop | |
66.67% |
6 / 9 |
|
0.00% |
0 / 1 |
5.93 | |||
doPop | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
ack | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
doAck | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
deduplicateRootJob | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
doDeduplicateRootJob | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
5.05 | |||
isRootJobOldDuplicate | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doIsRootJobOldDuplicate | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
6.07 | |||
getRootJobCacheKey | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
delete | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
doDelete | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
waitForBackups | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doWaitForBackups | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
flushCaches | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
doFlushCaches | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllQueuedJobs | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
getAllDelayedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAcquiredJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAbandonedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getSiblingQueuesWithJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getSiblingQueueSizes | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
factoryJob | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
assertNotReadOnly | |
50.00% |
1 / 2 |
|
0.00% |
0 / 1 |
2.50 | |||
assertMatchingJobType | |
50.00% |
2 / 4 |
|
0.00% |
0 / 1 |
4.12 | |||
incrStats | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
supportsTypeAgnostic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
22 | use MediaWiki\JobQueue\JobFactory; |
23 | use MediaWiki\MediaWikiServices; |
24 | use Wikimedia\RequestTimeout\TimeoutException; |
25 | use Wikimedia\UUID\GlobalIdGenerator; |
26 | |
27 | /** |
28 | * @defgroup JobQueue JobQueue |
29 | * |
30 | * |
31 | * See [the architecture doc](@ref jobqueuearch) for more information. |
32 | */ |
33 | |
34 | /** |
35 | * Base class for queueing and running background jobs from a storage backend. |
36 | * |
37 | * See [the architecture doc](@ref jobqueuearch) for more information. |
38 | * |
39 | * @ingroup JobQueue |
40 | * @since 1.21 |
41 | * @stable to extend |
42 | */ |
43 | abstract class JobQueue { |
44 | /** @var string DB domain ID */ |
45 | protected $domain; |
46 | /** @var string Job type */ |
47 | protected $type; |
48 | /** @var string Job priority for pop() */ |
49 | protected $order; |
50 | /** @var int Seconds a job can be claimed for exclusive execution */ |
51 | protected $claimTTL; |
52 | /** @var int Maximum number of times to try a job */ |
53 | protected $maxTries; |
54 | /** @var string|false Read only rationale (or false if r/w) */ |
55 | protected $readOnlyReason; |
56 | /** @var StatsdDataFactoryInterface */ |
57 | protected $stats; |
58 | /** @var GlobalIdGenerator */ |
59 | protected $idGenerator; |
60 | |
61 | /** @var WANObjectCache */ |
62 | protected $wanCache; |
63 | |
64 | /** @var bool */ |
65 | protected $typeAgnostic; |
66 | |
67 | private JobFactory $jobFactory; |
68 | |
69 | /* Bit flag for "all-or-nothing" job insertions */ |
70 | protected const QOS_ATOMIC = 1; |
71 | |
72 | /* Seconds to remember root jobs (28 days) */ |
73 | protected const ROOTJOB_TTL = 28 * 24 * 3600; |
74 | |
75 | /** |
76 | * @stable to call |
77 | * |
78 | * @param array $params |
79 | * - type : A job type, 'default' if typeAgnostic is set |
80 | * - domain : A DB domain ID |
81 | * - idGenerator : A GlobalIdGenerator instance. |
82 | * - wanCache : An instance of WANObjectCache to use for caching [default: none] |
83 | * - stats : An instance of StatsdDataFactoryInterface [default: none] |
84 | * - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever] |
85 | * - maxTries : Total times a job can be tried, assuming claims expire [default: 3] |
86 | * - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable] |
87 | * - readOnlyReason : Mark the queue as read-only with this reason [default: false] |
88 | * - typeAgnostic : Whether the job queue should operate agnostic of job types [default: false] |
89 | * @throws JobQueueError |
90 | * |
91 | */ |
92 | protected function __construct( array $params ) { |
93 | $this->domain = $params['domain'] ?? $params['wiki']; // b/c |
94 | $this->type = $params['type']; |
95 | $this->claimTTL = $params['claimTTL'] ?? 0; |
96 | $this->maxTries = $params['maxTries'] ?? 3; |
97 | if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { |
98 | $this->order = $params['order']; |
99 | } else { |
100 | $this->order = $this->optimalOrder(); |
101 | } |
102 | if ( !in_array( $this->order, $this->supportedOrders() ) ) { |
103 | throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." ); |
104 | } |
105 | $this->readOnlyReason = $params['readOnlyReason'] ?? false; |
106 | $this->stats = $params['stats'] ?? new NullStatsdDataFactory(); |
107 | $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty(); |
108 | $this->idGenerator = $params['idGenerator']; |
109 | if ( ( $params['typeAgnostic'] ?? false ) && !$this->supportsTypeAgnostic() ) { |
110 | throw new JobQueueError( __CLASS__ . " does not support type agnostic queues." ); |
111 | } |
112 | $this->typeAgnostic = ( $params['typeAgnostic'] ?? false ); |
113 | if ( $this->typeAgnostic ) { |
114 | $this->type = 'default'; |
115 | } |
116 | |
117 | $this->jobFactory = MediaWikiServices::getInstance()->getJobFactory(); |
118 | } |
119 | |
120 | /** |
121 | * Get a job queue object of the specified type. |
122 | * $params includes: |
123 | * - class : What job class to use (determines job type) |
124 | * - domain : Database domain ID of the wiki the jobs are for (defaults to current wiki) |
125 | * - type : The name of the job type this queue handles |
126 | * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". |
127 | * If "fifo" is used, the queue will effectively be FIFO. Note that job |
128 | * completion will not appear to be exactly FIFO if there are multiple |
129 | * job runners since jobs can take different times to finish once popped. |
130 | * If "timestamp" is used, the queue will at least be loosely ordered |
131 | * by timestamp, allowing for some jobs to be popped off out of order. |
132 | * If "random" is used, pop() will pick jobs in random order. |
133 | * Note that it may only be weakly random (e.g. a lottery of the oldest X). |
134 | * If "any" is chosen, the queue will use whatever order is the fastest. |
135 | * This might be useful for improving concurrency for job acquisition. |
136 | * - claimTTL : If supported, the queue will recycle jobs that have been popped |
137 | * but not acknowledged as completed after this many seconds. Recycling |
138 | * of jobs simply means re-inserting them into the queue. Jobs can be |
139 | * attempted up to three times before being discarded. |
140 | * - readOnlyReason : Set this to a string to make the queue read-only. [optional] |
141 | * - idGenerator : A GlobalIdGenerator instance. |
142 | * - stats : A StatsdDataFactoryInterface. [optional] |
143 | * |
144 | * Queue classes should throw an exception if they do not support the options given. |
145 | * |
146 | * @param array $params |
147 | * @return JobQueue |
148 | * @throws JobQueueError |
149 | */ |
150 | final public static function factory( array $params ) { |
151 | $class = $params['class']; |
152 | if ( !class_exists( $class ) ) { |
153 | throw new JobQueueError( "Invalid job queue class '$class'." ); |
154 | } |
155 | |
156 | $obj = new $class( $params ); |
157 | if ( !( $obj instanceof self ) ) { |
158 | throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." ); |
159 | } |
160 | |
161 | return $obj; |
162 | } |
163 | |
164 | /** |
165 | * @return string Database domain ID |
166 | */ |
167 | final public function getDomain() { |
168 | return $this->domain; |
169 | } |
170 | |
171 | /** |
172 | * @return string Job type that this queue handles |
173 | */ |
174 | final public function getType() { |
175 | return $this->type; |
176 | } |
177 | |
178 | /** |
179 | * @return string One of (random, timestamp, fifo, undefined) |
180 | */ |
181 | final public function getOrder() { |
182 | return $this->order; |
183 | } |
184 | |
185 | /** |
186 | * Get the allowed queue orders for configuration validation |
187 | * |
188 | * @return array Subset of (random, timestamp, fifo, undefined) |
189 | */ |
190 | abstract protected function supportedOrders(); |
191 | |
192 | /** |
193 | * Get the default queue order to use if configuration does not specify one |
194 | * |
195 | * @return string One of (random, timestamp, fifo, undefined) |
196 | */ |
197 | abstract protected function optimalOrder(); |
198 | |
199 | /** |
200 | * Find out if delayed jobs are supported for configuration validation |
201 | * |
202 | * @stable to override |
203 | * @return bool Whether delayed jobs are supported |
204 | */ |
205 | protected function supportsDelayedJobs() { |
206 | return false; // not implemented |
207 | } |
208 | |
209 | /** |
210 | * @return bool Whether delayed jobs are enabled |
211 | * @since 1.22 |
212 | */ |
213 | final public function delayedJobsEnabled() { |
214 | return $this->supportsDelayedJobs(); |
215 | } |
216 | |
217 | /** |
218 | * @return string|false Read-only rationale or false if r/w |
219 | * @since 1.27 |
220 | */ |
221 | public function getReadOnlyReason() { |
222 | return $this->readOnlyReason; |
223 | } |
224 | |
225 | /** |
226 | * Quickly check if the queue has no available (unacquired, non-delayed) jobs. |
227 | * Queue classes should use caching if they are any slower without memcached. |
228 | * |
229 | * If caching is used, this might return false when there are actually no jobs. |
230 | * If pop() is called and returns false then it should correct the cache. Also, |
231 | * calling flushCaches() first prevents this. However, this effect is typically |
232 | * not distinguishable from the race condition between isEmpty() and pop(). |
233 | * |
234 | * @return bool |
235 | * @throws JobQueueError |
236 | */ |
237 | final public function isEmpty() { |
238 | $res = $this->doIsEmpty(); |
239 | |
240 | return $res; |
241 | } |
242 | |
243 | /** |
244 | * @see JobQueue::isEmpty() |
245 | * @return bool |
246 | */ |
247 | abstract protected function doIsEmpty(); |
248 | |
249 | /** |
250 | * Get the number of available (unacquired, non-delayed) jobs in the queue. |
251 | * Queue classes should use caching if they are any slower without memcached. |
252 | * |
253 | * If caching is used, this number might be out of date for a minute. |
254 | * |
255 | * @return int |
256 | * @throws JobQueueError |
257 | */ |
258 | final public function getSize() { |
259 | $res = $this->doGetSize(); |
260 | |
261 | return $res; |
262 | } |
263 | |
264 | /** |
265 | * @see JobQueue::getSize() |
266 | * @return int |
267 | */ |
268 | abstract protected function doGetSize(); |
269 | |
270 | /** |
271 | * Get the number of acquired jobs (these are temporarily out of the queue). |
272 | * Queue classes should use caching if they are any slower without memcached. |
273 | * |
274 | * If caching is used, this number might be out of date for a minute. |
275 | * |
276 | * @return int |
277 | * @throws JobQueueError |
278 | */ |
279 | final public function getAcquiredCount() { |
280 | $res = $this->doGetAcquiredCount(); |
281 | |
282 | return $res; |
283 | } |
284 | |
285 | /** |
286 | * @see JobQueue::getAcquiredCount() |
287 | * @return int |
288 | */ |
289 | abstract protected function doGetAcquiredCount(); |
290 | |
291 | /** |
292 | * Get the number of delayed jobs (these are temporarily out of the queue). |
293 | * Queue classes should use caching if they are any slower without memcached. |
294 | * |
295 | * If caching is used, this number might be out of date for a minute. |
296 | * |
297 | * @return int |
298 | * @throws JobQueueError |
299 | * @since 1.22 |
300 | */ |
301 | final public function getDelayedCount() { |
302 | $res = $this->doGetDelayedCount(); |
303 | |
304 | return $res; |
305 | } |
306 | |
307 | /** |
308 | * @stable to override |
309 | * @see JobQueue::getDelayedCount() |
310 | * @return int |
311 | */ |
312 | protected function doGetDelayedCount() { |
313 | return 0; // not implemented |
314 | } |
315 | |
316 | /** |
317 | * Get the number of acquired jobs that can no longer be attempted. |
318 | * Queue classes should use caching if they are any slower without memcached. |
319 | * |
320 | * If caching is used, this number might be out of date for a minute. |
321 | * |
322 | * @return int |
323 | * @throws JobQueueError |
324 | */ |
325 | final public function getAbandonedCount() { |
326 | $res = $this->doGetAbandonedCount(); |
327 | |
328 | return $res; |
329 | } |
330 | |
331 | /** |
332 | * @stable to override |
333 | * @see JobQueue::getAbandonedCount() |
334 | * @return int |
335 | */ |
336 | protected function doGetAbandonedCount() { |
337 | return 0; // not implemented |
338 | } |
339 | |
340 | /** |
341 | * Push one or more jobs into the queue. |
342 | * This does not require $wgJobClasses to be set for the given job type. |
343 | * Outside callers should use JobQueueGroup::push() instead of this function. |
344 | * |
345 | * @param IJobSpecification|IJobSpecification[] $jobs |
346 | * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) |
347 | * @return void |
348 | * @throws JobQueueError |
349 | */ |
350 | final public function push( $jobs, $flags = 0 ) { |
351 | $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; |
352 | $this->batchPush( $jobs, $flags ); |
353 | } |
354 | |
355 | /** |
356 | * Push a batch of jobs into the queue. |
357 | * This does not require $wgJobClasses to be set for the given job type. |
358 | * Outside callers should use JobQueueGroup::push() instead of this function. |
359 | * |
360 | * @param IJobSpecification[] $jobs |
361 | * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) |
362 | * @return void |
363 | * @throws JobQueueError |
364 | */ |
365 | final public function batchPush( array $jobs, $flags = 0 ) { |
366 | $this->assertNotReadOnly(); |
367 | |
368 | if ( $jobs === [] ) { |
369 | return; // nothing to do |
370 | } |
371 | |
372 | foreach ( $jobs as $job ) { |
373 | $this->assertMatchingJobType( $job ); |
374 | if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) { |
375 | throw new JobQueueError( |
376 | "Got delayed '{$job->getType()}' job; delays are not supported." ); |
377 | } |
378 | } |
379 | |
380 | $this->doBatchPush( $jobs, $flags ); |
381 | |
382 | foreach ( $jobs as $job ) { |
383 | if ( $job->isRootJob() ) { |
384 | $this->deduplicateRootJob( $job ); |
385 | } |
386 | } |
387 | } |
388 | |
389 | /** |
390 | * @see JobQueue::batchPush() |
391 | * @param IJobSpecification[] $jobs |
392 | * @param int $flags |
393 | */ |
394 | abstract protected function doBatchPush( array $jobs, $flags ); |
395 | |
396 | /** |
397 | * Pop a job off of the queue. |
398 | * This requires $wgJobClasses to be set for the given job type. |
399 | * Outside callers should use JobQueueGroup::pop() instead of this function. |
400 | * |
401 | * @throws JobQueueError |
402 | * @return RunnableJob|false Returns false if there are no jobs |
403 | */ |
404 | final public function pop() { |
405 | $this->assertNotReadOnly(); |
406 | |
407 | $job = $this->doPop(); |
408 | |
409 | // Flag this job as an old duplicate based on its "root" job... |
410 | try { |
411 | if ( $job && $this->isRootJobOldDuplicate( $job ) ) { |
412 | $this->incrStats( 'dupe_pops', $job->getType() ); |
413 | $job = DuplicateJob::newFromJob( $job ); // convert to a no-op |
414 | } |
415 | } catch ( TimeoutException $e ) { |
416 | throw $e; |
417 | } catch ( Exception $e ) { |
418 | // don't lose jobs over this |
419 | } |
420 | |
421 | return $job; |
422 | } |
423 | |
424 | /** |
425 | * @see JobQueue::pop() |
426 | * @return RunnableJob|false |
427 | */ |
428 | abstract protected function doPop(); |
429 | |
430 | /** |
431 | * Acknowledge that a job was completed. |
432 | * |
433 | * This does nothing for certain queue classes or if "claimTTL" is not set. |
434 | * Outside callers should use JobQueueGroup::ack() instead of this function. |
435 | * |
436 | * @param RunnableJob $job |
437 | * @return void |
438 | * @throws JobQueueError |
439 | */ |
440 | final public function ack( RunnableJob $job ) { |
441 | $this->assertNotReadOnly(); |
442 | $this->assertMatchingJobType( $job ); |
443 | |
444 | $this->doAck( $job ); |
445 | } |
446 | |
447 | /** |
448 | * @see JobQueue::ack() |
449 | * @param RunnableJob $job |
450 | */ |
451 | abstract protected function doAck( RunnableJob $job ); |
452 | |
453 | /** |
454 | * Register the "root job" of a given job into the queue for de-duplication. |
455 | * This should only be called right *after* all the new jobs have been inserted. |
456 | * This is used to turn older, duplicate, job entries into no-ops. The root job |
457 | * information will remain in the registry until it simply falls out of cache. |
458 | * |
459 | * This requires that $job has two special fields in the "params" array: |
460 | * - rootJobSignature : hash (e.g. SHA1) that identifies the task |
461 | * - rootJobTimestamp : TS_MW timestamp of this instance of the task |
462 | * |
463 | * A "root job" is a conceptual job that consists of potentially many smaller jobs |
464 | * that are actually inserted into the queue. For example, "refreshLinks" jobs are |
465 | * spawned when a template is edited. One can think of the task as "update links |
466 | * of pages that use template X" and an instance of that task as a "root job". |
467 | * However, what actually goes into the queue are range and leaf job subtypes. |
468 | * Since these jobs include things like page ID ranges and DB primary positions, |
469 | * and can morph into smaller jobs recursively, simple duplicate detection |
470 | * for individual jobs being identical (like that of job_sha1) is not useful. |
471 | * |
472 | * In the case of "refreshLinks", if these jobs are still in the queue when the template |
473 | * is edited again, we want all of these old refreshLinks jobs for that template to become |
474 | * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing. |
475 | * Essentially, the new batch of jobs belongs to a new "root job" and the older ones to a |
476 | * previous "root job" for the same task of "update links of pages that use template X". |
477 | * |
478 | * This does nothing for certain queue classes. |
479 | * |
480 | * @internal For use within JobQueue only |
481 | * @param IJobSpecification $job |
482 | * @throws JobQueueError |
483 | * @return bool |
484 | */ |
485 | final public function deduplicateRootJob( IJobSpecification $job ) { |
486 | $this->assertNotReadOnly(); |
487 | $this->assertMatchingJobType( $job ); |
488 | |
489 | return $this->doDeduplicateRootJob( $job ); |
490 | } |
491 | |
492 | /** |
493 | * @stable to override |
494 | * @see JobQueue::deduplicateRootJob() |
495 | * @param IJobSpecification $job |
496 | * @throws JobQueueError |
497 | * @return bool |
498 | */ |
499 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
500 | $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; |
501 | if ( !$params ) { |
502 | throw new JobQueueError( "Cannot register root job; missing parameters." ); |
503 | } |
504 | |
505 | $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ); |
506 | // Callers should call JobQueueGroup::push() before this method so that if the |
507 | // insert fails, the de-duplication registration will be aborted. Having only the |
508 | // de-duplication registration succeed would cause jobs to become no-ops without |
509 | // any actual jobs that made them redundant. |
510 | $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job |
511 | if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) { |
512 | return true; // a newer version of this root job was enqueued |
513 | } |
514 | |
515 | // Update the timestamp of the last root job started at the location... |
516 | return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); |
517 | } |
518 | |
519 | /** |
520 | * Check if the "root" job of a given job has been superseded by a newer one |
521 | * |
522 | * @param IJobSpecification $job |
523 | * @throws JobQueueError |
524 | * @return bool |
525 | */ |
526 | final protected function isRootJobOldDuplicate( IJobSpecification $job ) { |
527 | $this->assertMatchingJobType( $job ); |
528 | |
529 | return $this->doIsRootJobOldDuplicate( $job ); |
530 | } |
531 | |
532 | /** |
533 | * @stable to override |
534 | * @see JobQueue::isRootJobOldDuplicate() |
535 | * @param IJobSpecification $job |
536 | * @return bool |
537 | */ |
538 | protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { |
539 | $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; |
540 | if ( !$params ) { |
541 | return false; // job has no de-duplication info |
542 | } |
543 | |
544 | $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ); |
545 | // Get the last time this root job was enqueued |
546 | $timestamp = $this->wanCache->get( $key ); |
547 | if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) { |
548 | // Update the timestamp of the last known root job started at the location... |
549 | $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); |
550 | } |
551 | |
552 | // Check if a new root job was started at the location after this one's... |
553 | return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); |
554 | } |
555 | |
556 | /** |
557 | * @param string $signature Hash identifier of the root job |
558 | * @param string $type job type |
559 | * @return string |
560 | */ |
561 | protected function getRootJobCacheKey( $signature, $type ) { |
562 | return $this->wanCache->makeGlobalKey( |
563 | 'jobqueue', |
564 | $this->domain, |
565 | $type, |
566 | 'rootjob', |
567 | $signature |
568 | ); |
569 | } |
570 | |
571 | /** |
572 | * Delete all unclaimed and delayed jobs from the queue |
573 | * |
574 | * @throws JobQueueError |
575 | * @since 1.22 |
576 | * @return void |
577 | */ |
578 | final public function delete() { |
579 | $this->assertNotReadOnly(); |
580 | |
581 | $this->doDelete(); |
582 | } |
583 | |
584 | /** |
585 | * @stable to override |
586 | * @see JobQueue::delete() |
587 | * @throws JobQueueError |
588 | */ |
589 | protected function doDelete() { |
590 | throw new JobQueueError( "This method is not implemented." ); |
591 | } |
592 | |
593 | /** |
594 | * Wait for any replica DBs or backup servers to catch up. |
595 | * |
596 | * This does nothing for certain queue classes. |
597 | * |
598 | * @return void |
599 | * @throws JobQueueError |
600 | */ |
601 | final public function waitForBackups() { |
602 | $this->doWaitForBackups(); |
603 | } |
604 | |
605 | /** |
606 | * @stable to override |
607 | * @see JobQueue::waitForBackups() |
608 | * @return void |
609 | */ |
610 | protected function doWaitForBackups() { |
611 | } |
612 | |
613 | /** |
614 | * Clear any process and persistent caches |
615 | * |
616 | * @return void |
617 | */ |
618 | final public function flushCaches() { |
619 | $this->doFlushCaches(); |
620 | } |
621 | |
622 | /** |
623 | * @stable to override |
624 | * @see JobQueue::flushCaches() |
625 | * @return void |
626 | */ |
627 | protected function doFlushCaches() { |
628 | } |
629 | |
630 | /** |
631 | * Get an iterator to traverse over all available jobs in this queue. |
632 | * This does not include jobs that are currently acquired or delayed. |
633 | * Note: results may be stale if the queue is concurrently modified. |
634 | * |
635 | * @return Iterator<RunnableJob> |
636 | * @throws JobQueueError |
637 | */ |
638 | abstract public function getAllQueuedJobs(); |
639 | |
640 | /** |
641 | * Get an iterator to traverse over all delayed jobs in this queue. |
642 | * Note: results may be stale if the queue is concurrently modified. |
643 | * |
644 | * @stable to override |
645 | * @return Iterator<RunnableJob> |
646 | * @throws JobQueueError |
647 | * @since 1.22 |
648 | */ |
649 | public function getAllDelayedJobs() { |
650 | return new ArrayIterator( [] ); // not implemented |
651 | } |
652 | |
653 | /** |
654 | * Get an iterator to traverse over all claimed jobs in this queue |
655 | * |
656 | * Callers should be quick to iterate over it or few results |
657 | * will be returned due to jobs being acknowledged and deleted |
658 | * |
659 | * @stable to override |
660 | * @return Iterator<RunnableJob> |
661 | * @throws JobQueueError |
662 | * @since 1.26 |
663 | */ |
664 | public function getAllAcquiredJobs() { |
665 | return new ArrayIterator( [] ); // not implemented |
666 | } |
667 | |
668 | /** |
669 | * Get an iterator to traverse over all abandoned jobs in this queue |
670 | * |
671 | * @stable to override |
672 | * @return Iterator<RunnableJob> |
673 | * @throws JobQueueError |
674 | * @since 1.25 |
675 | */ |
676 | public function getAllAbandonedJobs() { |
677 | return new ArrayIterator( [] ); // not implemented |
678 | } |
679 | |
680 | /** |
681 | * Do not use this function outside of JobQueue/JobQueueGroup |
682 | * |
683 | * @stable to override |
684 | * @return string|null |
685 | * @since 1.22 |
686 | */ |
687 | public function getCoalesceLocationInternal() { |
688 | return null; |
689 | } |
690 | |
691 | /** |
692 | * Check whether each of the given queues is empty. |
693 | * This is used for batching checks for queues stored at the same place. |
694 | * |
695 | * @param array $types List of queue types |
696 | * @return array|null (list of non-empty queue types) or null if unsupported |
697 | * @throws JobQueueError |
698 | * @since 1.22 |
699 | */ |
700 | final public function getSiblingQueuesWithJobs( array $types ) { |
701 | return $this->doGetSiblingQueuesWithJobs( $types ); |
702 | } |
703 | |
704 | /** |
705 | * @stable to override |
706 | * @see JobQueue::getSiblingQueuesWithJobs() |
707 | * @param array $types List of queue types |
708 | * @return array|null (list of non-empty queue types) or null if unsupported |
709 | */ |
710 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
711 | return null; // not supported |
712 | } |
713 | |
714 | /** |
715 | * Check the size of each of the given queues. |
716 | * For queues not served by the same store as this one, 0 is returned. |
717 | * This is used for batching checks for queues stored at the same place. |
718 | * |
719 | * @param array $types List of queue types |
720 | * @return array|null (job type => queue size) or null if unsupported |
721 | * @throws JobQueueError |
722 | * @since 1.22 |
723 | */ |
724 | final public function getSiblingQueueSizes( array $types ) { |
725 | return $this->doGetSiblingQueueSizes( $types ); |
726 | } |
727 | |
728 | /** |
729 | * @stable to override |
730 | * @see JobQueue::getSiblingQueueSizes() |
731 | * @param array $types List of queue types |
732 | * @return array|null (job type => queue size) or null if unsupported |
733 | */ |
734 | protected function doGetSiblingQueueSizes( array $types ) { |
735 | return null; // not supported |
736 | } |
737 | |
738 | /** |
739 | * @param string $command |
740 | * @param array $params |
741 | * @return Job |
742 | */ |
743 | protected function factoryJob( $command, $params ) { |
744 | return $this->jobFactory->newJob( $command, $params ); |
745 | } |
746 | |
747 | /** |
748 | * @throws JobQueueReadOnlyError |
749 | */ |
750 | protected function assertNotReadOnly() { |
751 | if ( $this->readOnlyReason !== false ) { |
752 | throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" ); |
753 | } |
754 | } |
755 | |
756 | /** |
757 | * @param IJobSpecification $job |
758 | * @throws JobQueueError |
759 | */ |
760 | private function assertMatchingJobType( IJobSpecification $job ) { |
761 | if ( $this->typeAgnostic ) { |
762 | return; |
763 | } |
764 | if ( $job->getType() !== $this->type ) { |
765 | throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." ); |
766 | } |
767 | } |
768 | |
769 | /** |
770 | * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type |
771 | * |
772 | * @param string $key Event type |
773 | * @param string $type Job type |
774 | * @param int $delta |
775 | * @since 1.22 |
776 | */ |
777 | protected function incrStats( $key, $type, $delta = 1 ) { |
778 | $this->stats->updateCount( "jobqueue.{$key}.all", $delta ); |
779 | $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta ); |
780 | } |
781 | |
782 | /** |
783 | * Subclasses should override this to return true if they support type agnostic queues |
784 | * |
785 | * @return bool |
786 | * @since 1.38 |
787 | */ |
788 | protected function supportsTypeAgnostic(): bool { |
789 | return false; |
790 | } |
791 | } |