Code Coverage

Element | Lines | Functions and Methods | CRAP | Classes and Traits
--------|-------|-----------------------|------|-------------------
Total | 0.00% (0 / 287) | 0.00% (0 / 37) |  | 0.00% (0 / 1)
JobQueueRedis | 0.00% (0 / 286) | 0.00% (0 / 37) | 8742 | 0.00% (0 / 1)
__construct | 0.00% (0 / 10) | 0.00% (0 / 1) | 6 |
supportedOrders | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
optimalOrder | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
supportsDelayedJobs | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
doIsEmpty | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
doGetSize | 0.00% (0 / 4) | 0.00% (0 / 1) | 6 |
doGetAcquiredCount | 0.00% (0 / 7) | 0.00% (0 / 1) | 6 |
doGetDelayedCount | 0.00% (0 / 4) | 0.00% (0 / 1) | 6 |
doGetAbandonedCount | 0.00% (0 / 4) | 0.00% (0 / 1) | 6 |
doBatchPush | 0.00% (0 / 29) | 0.00% (0 / 1) | 90 |
pushBlobs | 0.00% (0 / 23) | 0.00% (0 / 1) | 6 |
doPop | 0.00% (0 / 15) | 0.00% (0 / 1) | 20 |
popAndAcquireBlob | 0.00% (0 / 15) | 0.00% (0 / 1) | 2 |
doAck | 0.00% (0 / 23) | 0.00% (0 / 1) | 20 |
doDeduplicateRootJob | 0.00% (0 / 11) | 0.00% (0 / 1) | 30 |
doIsRootJobOldDuplicate | 0.00% (0 / 8) | 0.00% (0 / 1) | 20 |
doDelete | 0.00% (0 / 11) | 0.00% (0 / 1) | 12 |
getAllQueuedJobs | 0.00% (0 / 5) | 0.00% (0 / 1) | 6 |
getAllDelayedJobs | 0.00% (0 / 5) | 0.00% (0 / 1) | 6 |
getAllAcquiredJobs | 0.00% (0 / 5) | 0.00% (0 / 1) | 6 |
getAllAbandonedJobs | 0.00% (0 / 5) | 0.00% (0 / 1) | 6 |
getJobIterator | 0.00% (0 / 9) | 0.00% (0 / 1) | 2 |
getCoalesceLocationInternal | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
doGetSiblingQueuesWithJobs | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
doGetSiblingQueueSizes | 0.00% (0 / 13) | 0.00% (0 / 1) | 30 |
getJobFromUidInternal | 0.00% (0 / 16) | 0.00% (0 / 1) | 20 |
getServerQueuesWithJobs | 0.00% (0 / 8) | 0.00% (0 / 1) | 12 |
getNewJobFields | 0.00% (0 / 12) | 0.00% (0 / 1) | 12 |
getJobFromFields | 0.00% (0 / 6) | 0.00% (0 / 1) | 2 |
serialize | 0.00% (0 / 8) | 0.00% (0 / 1) | 30 |
unserialize | 0.00% (0 / 6) | 0.00% (0 / 1) | 30 |
getConnection | 0.00% (0 / 5) | 0.00% (0 / 1) | 6 |
handleErrorAndMakeException | 0.00% (0 / 2) | 0.00% (0 / 1) | 2 |
encodeQueueName | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
decodeQueueName | 0.00% (0 / 1) | 0.00% (0 / 1) | 2 |
getGlobalKey | 0.00% (0 / 5) | 0.00% (0 / 1) | 12 |
getQueueKey | 0.00% (0 / 4) | 0.00% (0 / 1) | 6 |

1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | namespace MediaWiki\JobQueue; |
22 | |
23 | use InvalidArgumentException; |
24 | use LogicException; |
25 | use MappedIterator; |
26 | use MediaWiki\JobQueue\Exceptions\JobQueueConnectionError; |
27 | use MediaWiki\JobQueue\Exceptions\JobQueueError; |
28 | use MediaWiki\Logger\LoggerFactory; |
29 | use MediaWiki\WikiMap\WikiMap; |
30 | use Psr\Log\LoggerInterface; |
31 | use Redis; |
32 | use RedisException; |
33 | use UnexpectedValueException; |
34 | use Wikimedia\ObjectCache\RedisConnectionPool; |
35 | use Wikimedia\ObjectCache\RedisConnRef; |
36 | |
37 | /** |
38 | * Redis-backed job queue storage. |
39 | * |
40 | * This is a faster and less resource-intensive job queue than JobQueueDB. |
41 | * All data for a queue using this class is placed into one redis server. |
42 | * |
43 | * When used on a wiki farm, you can optionally use the `redisJobRunnerService` background |
44 | * service from the `mediawiki/services/jobrunner.git` repository to run jobs from a central |
45 | * system rather than per-wiki via one of the default job runners (e.g. maintenance/runJobs.php). |
46 | * |
47 | * There are eight main redis keys (per queue) used to track jobs: |
48 | * - l-unclaimed : A list of job IDs used for ready unclaimed jobs |
49 | * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries |
50 | * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs |
51 | * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs |
52 | * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication |
53 | * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication |
54 | * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries |
55 | * - h-data : A hash of (job ID => serialized blobs) for job storage |
56 | * |
57 | * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. |
58 | * If an ID appears in any of those lists, it should have an h-data entry for its ID. |
59 | * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then |
60 | * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById |
61 | * entry and every h-sha1ById must refer to an ID that is in l-unclaimed. If a job has its |
62 | * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. |
63 | * |
64 | * The following keys are used to track queue states: |
65 | * - s-queuesWithJobs : A set of all queues with non-abandoned jobs |
66 | * |
67 | * The background service takes care of undelaying, recycling, and pruning jobs as well as |
68 | * removing s-queuesWithJobs entries as queues empty. |
69 | * |
70 | * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication. |
71 | * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. |
72 | * All the keys are prefixed with the relevant wiki ID information. |
73 | * |
74 | * This class requires Redis 2.6 or later as it uses Lua scripting for fast atomic operations. |
75 | * Note also that redis offers several persistence modes (RDB snapshots, AOF journaling, or |
76 | * none); configure each server according to the queues it hosts and how much job loss |
77 | * those queues can tolerate. |
78 | * |
79 | * @since 1.22 |
80 | * @ingroup JobQueue |
81 | * @ingroup Redis |
82 | */ |
83 | class JobQueueRedis extends JobQueue { |
84 | /** @var RedisConnectionPool */ |
85 | protected $redisPool; |
86 | /** @var LoggerInterface */ |
87 | protected $logger; |
88 | |
89 | /** @var string Server address */ |
90 | protected $server; |
91 | /** @var string Compression method to use */ |
92 | protected $compression; |
93 | |
94 | private const MAX_PUSH_SIZE = 25; // avoid tying up the server |
95 | |
96 | /** |
97 | * @param array $params Possible keys: |
98 | * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). |
99 | * Note that the serializer option is ignored as "none" is always used. |
100 | * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. |
101 | * If a hostname is specified but no port, the standard port number |
102 | * 6379 will be used. Required. |
103 | * - compression : The type of compression to use; one of (none,gzip). |
104 | * - daemonized : Set to true if the redisJobRunnerService runs in the background. |
105 | * This will disable job recycling/undelaying from the MediaWiki side |
106 | * to avoid redundancy and out-of-sync configuration. |
107 | */ |
108 | public function __construct( array $params ) { |
109 | parent::__construct( $params ); |
110 | $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua |
111 | $this->server = $params['redisServer']; |
112 | $this->compression = $params['compression'] ?? 'none'; |
113 | $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); |
114 | if ( empty( $params['daemonized'] ) ) { |
115 | throw new InvalidArgumentException( |
116 | "Non-daemonized mode is no longer supported. Please install the " . |
117 | "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); |
118 | } |
119 | $this->logger = LoggerFactory::getInstance( 'redis' ); |
120 | } |
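
// Illustrative only: the $params documented above typically arrive via a $wgJobTypeConf
// entry such as the following (server address and password are placeholders; 'daemonized'
// must be true, as enforced by the constructor):
//
//   $wgJobTypeConf['default'] = [
//       'class'       => JobQueueRedis::class,
//       'redisServer' => 'rdb1.example.org:6379',
//       'redisConfig' => [ 'password' => 'secret' ],
//       'compression' => 'gzip',
//       'daemonized'  => true,
//   ];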
121 | |
122 | protected function supportedOrders() { |
123 | return [ 'timestamp', 'fifo' ]; |
124 | } |
125 | |
126 | protected function optimalOrder() { |
127 | return 'fifo'; |
128 | } |
129 | |
130 | protected function supportsDelayedJobs() { |
131 | return true; |
132 | } |
133 | |
134 | /** |
135 | * @see JobQueue::doIsEmpty() |
136 | * @return bool |
137 | * @throws JobQueueError |
138 | */ |
139 | protected function doIsEmpty() { |
140 | return $this->doGetSize() == 0; |
141 | } |
142 | |
143 | /** |
144 | * @see JobQueue::doGetSize() |
145 | * @return int |
146 | * @throws JobQueueError |
147 | */ |
148 | protected function doGetSize() { |
149 | $conn = $this->getConnection(); |
150 | try { |
151 | return $conn->lLen( $this->getQueueKey( 'l-unclaimed' ) ); |
152 | } catch ( RedisException $e ) { |
153 | throw $this->handleErrorAndMakeException( $conn, $e ); |
154 | } |
155 | } |
156 | |
157 | /** |
158 | * @see JobQueue::doGetAcquiredCount() |
159 | * @return int |
160 | * @throws JobQueueError |
161 | */ |
162 | protected function doGetAcquiredCount() { |
163 | $conn = $this->getConnection(); |
164 | try { |
165 | $conn->multi( Redis::PIPELINE ); |
166 | $conn->zCard( $this->getQueueKey( 'z-claimed' ) ); |
167 | $conn->zCard( $this->getQueueKey( 'z-abandoned' ) ); |
168 | |
169 | return array_sum( $conn->exec() ); |
170 | } catch ( RedisException $e ) { |
171 | throw $this->handleErrorAndMakeException( $conn, $e ); |
172 | } |
173 | } |
174 | |
175 | /** |
176 | * @see JobQueue::doGetDelayedCount() |
177 | * @return int |
178 | * @throws JobQueueError |
179 | */ |
180 | protected function doGetDelayedCount() { |
181 | $conn = $this->getConnection(); |
182 | try { |
183 | return $conn->zCard( $this->getQueueKey( 'z-delayed' ) ); |
184 | } catch ( RedisException $e ) { |
185 | throw $this->handleErrorAndMakeException( $conn, $e ); |
186 | } |
187 | } |
188 | |
189 | /** |
190 | * @see JobQueue::doGetAbandonedCount() |
191 | * @return int |
192 | * @throws JobQueueError |
193 | */ |
194 | protected function doGetAbandonedCount() { |
195 | $conn = $this->getConnection(); |
196 | try { |
197 | return $conn->zCard( $this->getQueueKey( 'z-abandoned' ) ); |
198 | } catch ( RedisException $e ) { |
199 | throw $this->handleErrorAndMakeException( $conn, $e ); |
200 | } |
201 | } |
202 | |
203 | /** |
204 | * @see JobQueue::doBatchPush() |
205 | * @param IJobSpecification[] $jobs |
206 | * @param int $flags |
207 | * @return void |
208 | * @throws JobQueueError |
209 | */ |
210 | protected function doBatchPush( array $jobs, $flags ) { |
211 | // Convert the jobs into field maps (de-duplicated against each other) |
212 | $items = []; // (job ID => job fields map) |
213 | foreach ( $jobs as $job ) { |
214 | $item = $this->getNewJobFields( $job ); |
215 | if ( $item['sha1'] !== '' ) { // hash identifier => de-duplicate |
216 | $items[$item['sha1']] = $item; |
217 | } else { |
218 | $items[$item['uuid']] = $item; |
219 | } |
220 | } |
221 | |
222 | if ( $items === [] ) { |
223 | return; // nothing to do |
224 | } |
225 | |
226 | $conn = $this->getConnection(); |
227 | try { |
228 | // Actually push the non-duplicate jobs into the queue... |
229 | if ( $flags & self::QOS_ATOMIC ) { |
230 | $batches = [ $items ]; // all or nothing |
231 | } else { |
232 | $batches = array_chunk( $items, self::MAX_PUSH_SIZE ); |
233 | } |
234 | $failed = 0; |
235 | $pushed = 0; |
236 | foreach ( $batches as $itemBatch ) { |
237 | $added = $this->pushBlobs( $conn, $itemBatch ); |
238 | if ( is_int( $added ) ) { |
239 | $pushed += $added; |
240 | } else { |
241 | $failed += count( $itemBatch ); |
242 | } |
243 | } |
244 | $this->incrStats( 'inserts', $this->type, count( $items ) ); |
245 | $this->incrStats( 'inserts_actual', $this->type, $pushed ); |
246 | $this->incrStats( 'dupe_inserts', $this->type, |
247 | count( $items ) - $failed - $pushed ); |
248 | if ( $failed > 0 ) { |
249 | $err = "Could not insert {$failed} {$this->type} job(s)."; |
250 | wfDebugLog( 'JobQueue', $err ); |
251 | throw new RedisException( $err ); |
252 | } |
253 | } catch ( RedisException $e ) { |
254 | throw $this->handleErrorAndMakeException( $conn, $e ); |
255 | } |
256 | } |
257 | |
258 | /** |
259 | * @param RedisConnRef $conn |
260 | * @param array[] $items List of results from JobQueueRedis::getNewJobFields() |
261 | * @return int Number of jobs inserted (duplicates are ignored) |
262 | * @throws RedisException |
263 | */ |
264 | protected function pushBlobs( RedisConnRef $conn, array $items ) { |
265 | $args = [ $this->encodeQueueName() ]; |
266 | // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) |
267 | foreach ( $items as $item ) { |
268 | $args[] = (string)$item['uuid']; |
269 | $args[] = (string)$item['sha1']; |
270 | $args[] = (string)$item['rtimestamp']; |
271 | $args[] = (string)$this->serialize( $item ); |
272 | } |
273 | static $script = |
274 | /** @lang Lua */ |
275 | <<<LUA |
276 | local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS) |
277 | -- First argument is the queue ID |
278 | local queueId = ARGV[1] |
279 | -- Next arguments all come in 4s (one per job) |
280 | local variadicArgCount = #ARGV - 1 |
281 | if variadicArgCount % 4 ~= 0 then |
282 | return redis.error_reply('Unmatched arguments') |
283 | end |
284 | -- Insert each job into this queue as needed |
285 | local pushed = 0 |
286 | for i = 2,#ARGV,4 do |
287 | local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] |
288 | if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then |
289 | if 1*rtimestamp > 0 then |
290 | -- Insert into delayed queue (release time as score) |
291 | redis.call('zAdd',kDelayed,rtimestamp,id) |
292 | else |
293 | -- Insert into unclaimed queue |
294 | redis.call('lPush',kUnclaimed,id) |
295 | end |
296 | if sha1 ~= '' then |
297 | redis.call('hSet',kSha1ById,id,sha1) |
298 | redis.call('hSet',kIdBySha1,sha1,id) |
299 | end |
300 | redis.call('hSet',kData,id,blob) |
301 | pushed = pushed + 1 |
302 | end |
303 | end |
304 | -- Mark this queue as having jobs |
305 | redis.call('sAdd',kQwJobs,queueId) |
306 | return pushed |
307 | LUA; |
308 | return $conn->luaEval( $script, |
309 | array_merge( |
310 | [ |
311 | $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] |
312 | $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] |
313 | $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] |
314 | $this->getQueueKey( 'z-delayed' ), # KEYS[4] |
315 | $this->getQueueKey( 'h-data' ), # KEYS[5] |
316 | $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6] |
317 | ], |
318 | $args |
319 | ), |
320 | 6 # number of first argument(s) that are keys |
321 | ); |
322 | } |
323 | |
324 | /** |
325 | * @see JobQueue::doPop() |
326 | * @return RunnableJob|false |
327 | * @throws JobQueueError |
328 | */ |
329 | protected function doPop() { |
330 | $job = false; |
331 | |
332 | $conn = $this->getConnection(); |
333 | try { |
334 | do { |
335 | $blob = $this->popAndAcquireBlob( $conn ); |
336 | if ( !is_string( $blob ) ) { |
337 | break; // no jobs; nothing to do |
338 | } |
339 | |
340 | $this->incrStats( 'pops', $this->type ); |
341 | $item = $this->unserialize( $blob ); |
342 | if ( $item === false ) { |
343 | wfDebugLog( 'JobQueue', "Could not unserialize {$this->type} job." ); |
344 | continue; |
345 | } |
346 | |
347 | // If $item is invalid, the runner loop recycling will clean up as needed |
348 | $job = $this->getJobFromFields( $item ); // may be false |
349 | } while ( !$job ); // job may be false if invalid |
350 | } catch ( RedisException $e ) { |
351 | throw $this->handleErrorAndMakeException( $conn, $e ); |
352 | } |
353 | |
354 | return $job; |
355 | } |
356 | |
357 | /** |
358 | * @param RedisConnRef $conn |
359 | * @return string|false Serialized job blob, or false if the queue is empty |
360 | * @throws RedisException |
361 | */ |
362 | protected function popAndAcquireBlob( RedisConnRef $conn ) { |
363 | static $script = |
364 | /** @lang Lua */ |
365 | <<<LUA |
366 | local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS) |
367 | local rTime = unpack(ARGV) |
368 | -- Pop an item off the queue |
369 | local id = redis.call('rPop',kUnclaimed) |
370 | if not id then |
371 | return false |
372 | end |
373 | -- Allow new duplicates of this job |
374 | local sha1 = redis.call('hGet',kSha1ById,id) |
375 | if sha1 then redis.call('hDel',kIdBySha1,sha1) end |
376 | redis.call('hDel',kSha1ById,id) |
377 | -- Mark the job as claimed and return it |
378 | redis.call('zAdd',kClaimed,rTime,id) |
379 | redis.call('hIncrBy',kAttempts,id,1) |
380 | return redis.call('hGet',kData,id) |
381 | LUA; |
382 | return $conn->luaEval( $script, |
383 | [ |
384 | $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] |
385 | $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] |
386 | $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] |
387 | $this->getQueueKey( 'z-claimed' ), # KEYS[4] |
388 | $this->getQueueKey( 'h-attempts' ), # KEYS[5] |
389 | $this->getQueueKey( 'h-data' ), # KEYS[6] |
390 | time(), # ARGV[1] (injected to be replication-safe) |
391 | ], |
392 | 6 # number of first argument(s) that are keys |
393 | ); |
394 | } |
395 | |
396 | /** |
397 | * @see JobQueue::doAck() |
398 | * @param RunnableJob $job |
399 | * @return bool |
400 | * @throws UnexpectedValueException |
401 | * @throws JobQueueError |
402 | */ |
403 | protected function doAck( RunnableJob $job ) { |
404 | $uuid = $job->getMetadata( 'uuid' ); |
405 | if ( $uuid === null ) { |
406 | throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." ); |
407 | } |
408 | |
409 | $conn = $this->getConnection(); |
410 | try { |
411 | static $script = |
412 | /** @lang Lua */ |
413 | <<<LUA |
414 | local kClaimed, kAttempts, kData = unpack(KEYS) |
415 | local id = unpack(ARGV) |
416 | -- Unmark the job as claimed |
417 | local removed = redis.call('zRem',kClaimed,id) |
418 | -- Check if the job was recycled |
419 | if removed == 0 then |
420 | return 0 |
421 | end |
422 | -- Delete the retry data |
423 | redis.call('hDel',kAttempts,id) |
424 | -- Delete the job data itself |
425 | return redis.call('hDel',kData,id) |
426 | LUA; |
427 | $res = $conn->luaEval( $script, |
428 | [ |
429 | $this->getQueueKey( 'z-claimed' ), # KEYS[1] |
430 | $this->getQueueKey( 'h-attempts' ), # KEYS[2] |
431 | $this->getQueueKey( 'h-data' ), # KEYS[3] |
432 | $uuid # ARGV[1] |
433 | ], |
434 | 3 # number of first argument(s) that are keys |
435 | ); |
436 | |
437 | if ( !$res ) { |
438 | wfDebugLog( 'JobQueue', "Could not acknowledge {$this->type} job $uuid." ); |
439 | |
440 | return false; |
441 | } |
442 | |
443 | $this->incrStats( 'acks', $this->type ); |
444 | } catch ( RedisException $e ) { |
445 | throw $this->handleErrorAndMakeException( $conn, $e ); |
446 | } |
447 | |
448 | return true; |
449 | } |
450 | |
451 | /** |
452 | * @see JobQueue::doDeduplicateRootJob() |
453 | * @param IJobSpecification $job |
454 | * @return bool |
455 | * @throws JobQueueError |
456 | */ |
457 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
458 | if ( !$job->hasRootJobParams() ) { |
459 | throw new LogicException( "Cannot register root job; missing parameters." ); |
460 | } |
461 | $params = $job->getRootJobParams(); |
462 | |
463 | $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ); |
464 | |
465 | $conn = $this->getConnection(); |
466 | try { |
467 | $timestamp = $conn->get( $key ); // last known timestamp of such a root job |
468 | if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { |
469 | return true; // a newer version of this root job was enqueued |
470 | } |
471 | |
472 | // Update the timestamp of the last root job started at the location... |
473 | return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks |
474 | } catch ( RedisException $e ) { |
475 | throw $this->handleErrorAndMakeException( $conn, $e ); |
476 | } |
477 | } |
478 | |
479 | /** |
480 | * @see JobQueue::doIsRootJobOldDuplicate() |
481 | * @param IJobSpecification $job |
482 | * @return bool |
483 | * @throws JobQueueError |
484 | */ |
485 | protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { |
486 | if ( !$job->hasRootJobParams() ) { |
487 | return false; // job has no de-duplication info |
488 | } |
489 | $params = $job->getRootJobParams(); |
490 | |
491 | $conn = $this->getConnection(); |
492 | try { |
493 | // Get the last time this root job was enqueued |
494 | $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ) ); |
495 | } catch ( RedisException $e ) { |
496 | throw $this->handleErrorAndMakeException( $conn, $e ); |
497 | } |
498 | |
499 | // Check if a new root job was started at the location after this one's... |
500 | return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); |
501 | } |
502 | |
503 | /** |
504 | * @see JobQueue::doDelete() |
505 | * @return bool |
506 | * @throws JobQueueError |
507 | */ |
508 | protected function doDelete() { |
509 | static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned', |
510 | 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ]; |
511 | |
512 | $conn = $this->getConnection(); |
513 | try { |
514 | $keys = []; |
515 | foreach ( $props as $prop ) { |
516 | $keys[] = $this->getQueueKey( $prop ); |
517 | } |
518 | |
519 | $ok = ( $conn->del( $keys ) !== false ); |
520 | $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() ); |
521 | |
522 | return $ok; |
523 | } catch ( RedisException $e ) { |
524 | throw $this->handleErrorAndMakeException( $conn, $e ); |
525 | } |
526 | } |
527 | |
528 | /** |
529 | * @see JobQueue::getAllQueuedJobs() |
530 | * @return \Iterator<RunnableJob> |
531 | * @throws JobQueueError |
532 | */ |
533 | public function getAllQueuedJobs() { |
534 | $conn = $this->getConnection(); |
535 | try { |
536 | $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ); |
537 | } catch ( RedisException $e ) { |
538 | throw $this->handleErrorAndMakeException( $conn, $e ); |
539 | } |
540 | |
541 | return $this->getJobIterator( $conn, $uids ); |
542 | } |
543 | |
544 | /** |
545 | * @see JobQueue::getAllDelayedJobs() |
546 | * @return \Iterator<RunnableJob> |
547 | * @throws JobQueueError |
548 | */ |
549 | public function getAllDelayedJobs() { |
550 | $conn = $this->getConnection(); |
551 | try { |
552 | $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ); |
553 | } catch ( RedisException $e ) { |
554 | throw $this->handleErrorAndMakeException( $conn, $e ); |
555 | } |
556 | |
557 | return $this->getJobIterator( $conn, $uids ); |
558 | } |
559 | |
560 | /** |
561 | * @see JobQueue::getAllAcquiredJobs() |
562 | * @return \Iterator<RunnableJob> |
563 | * @throws JobQueueError |
564 | */ |
565 | public function getAllAcquiredJobs() { |
566 | $conn = $this->getConnection(); |
567 | try { |
568 | $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 ); |
569 | } catch ( RedisException $e ) { |
570 | throw $this->handleErrorAndMakeException( $conn, $e ); |
571 | } |
572 | |
573 | return $this->getJobIterator( $conn, $uids ); |
574 | } |
575 | |
576 | /** |
577 | * @see JobQueue::getAllAbandonedJobs() |
578 | * @return \Iterator<RunnableJob> |
579 | * @throws JobQueueError |
580 | */ |
581 | public function getAllAbandonedJobs() { |
582 | $conn = $this->getConnection(); |
583 | try { |
584 | $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ); |
585 | } catch ( RedisException $e ) { |
586 | throw $this->handleErrorAndMakeException( $conn, $e ); |
587 | } |
588 | |
589 | return $this->getJobIterator( $conn, $uids ); |
590 | } |
591 | |
592 | /** |
593 | * @param RedisConnRef $conn |
594 | * @param array $uids List of job UUIDs |
595 | * @return MappedIterator<RunnableJob> |
596 | */ |
597 | protected function getJobIterator( RedisConnRef $conn, array $uids ) { |
598 | return new MappedIterator( |
599 | $uids, |
600 | function ( $uid ) use ( $conn ) { |
601 | return $this->getJobFromUidInternal( $uid, $conn ); |
602 | }, |
603 | [ 'accept' => static function ( $job ) { |
604 | return is_object( $job ); |
605 | } ] |
606 | ); |
607 | } |
608 | |
609 | public function getCoalesceLocationInternal() { |
610 | return "RedisServer:" . $this->server; |
611 | } |
612 | |
613 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
614 | return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); |
615 | } |
616 | |
617 | protected function doGetSiblingQueueSizes( array $types ) { |
618 | $sizes = []; // (type => size) |
619 | $types = array_values( $types ); // reindex |
620 | $conn = $this->getConnection(); |
621 | try { |
622 | $conn->multi( Redis::PIPELINE ); |
623 | foreach ( $types as $type ) { |
624 | $conn->lLen( $this->getQueueKey( 'l-unclaimed', $type ) ); |
625 | } |
626 | $res = $conn->exec(); |
627 | if ( is_array( $res ) ) { |
628 | foreach ( $res as $i => $size ) { |
629 | $sizes[$types[$i]] = $size; |
630 | } |
631 | } |
632 | } catch ( RedisException $e ) { |
633 | throw $this->handleErrorAndMakeException( $conn, $e ); |
634 | } |
635 | |
636 | return $sizes; |
637 | } |
638 | |
639 | /** |
640 | * This function should not be called outside JobQueueRedis |
641 | * |
642 | * @param string $uid |
643 | * @param RedisConnRef|Redis $conn |
644 | * @return RunnableJob|false Returns false if the job does not exist |
645 | * @throws JobQueueError |
646 | * @throws UnexpectedValueException |
647 | */ |
648 | public function getJobFromUidInternal( $uid, $conn ) { |
649 | try { |
650 | $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); |
651 | if ( $data === false ) { |
652 | return false; // not found |
653 | } |
654 | $item = $this->unserialize( $data ); |
655 | if ( !is_array( $item ) ) { // this shouldn't happen |
656 | throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." ); |
657 | } |
658 | |
659 | $params = $item['params']; |
660 | $params += [ 'namespace' => $item['namespace'], 'title' => $item['title'] ]; |
661 | $job = $this->factoryJob( $item['type'], $params ); |
662 | $job->setMetadata( 'uuid', $item['uuid'] ); |
663 | $job->setMetadata( 'timestamp', $item['timestamp'] ); |
664 | // Add in attempt count for debugging at showJobs.php |
665 | $job->setMetadata( 'attempts', |
666 | $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ) ); |
667 | |
668 | return $job; |
669 | } catch ( RedisException $e ) { |
670 | throw $this->handleErrorAndMakeException( $conn, $e ); |
671 | } |
672 | } |
673 | |
674 | /** |
675 | * @return array List of (type, wiki) tuples for queues with non-abandoned jobs |
676 | * @throws JobQueueConnectionError |
677 | * @throws JobQueueError |
678 | */ |
679 | public function getServerQueuesWithJobs() { |
680 | $queues = []; |
681 | |
682 | $conn = $this->getConnection(); |
683 | try { |
684 | $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) ); |
685 | foreach ( $set as $queue ) { |
686 | $queues[] = $this->decodeQueueName( $queue ); |
687 | } |
688 | } catch ( RedisException $e ) { |
689 | throw $this->handleErrorAndMakeException( $conn, $e ); |
690 | } |
691 | |
692 | return $queues; |
693 | } |
694 | |
695 | /** |
696 | * @param IJobSpecification $job |
697 | * @return array |
698 | */ |
699 | protected function getNewJobFields( IJobSpecification $job ) { |
700 | return [ |
701 | // Fields that describe the nature of the job |
702 | 'type' => $job->getType(), |
703 | 'namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL, |
704 | 'title' => $job->getParams()['title'] ?? '', |
705 | 'params' => $job->getParams(), |
706 | // Some jobs cannot run until a "release timestamp" |
707 | 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, |
708 | // Additional job metadata |
709 | 'uuid' => $this->idGenerator->newRawUUIDv4(), |
710 | 'sha1' => $job->ignoreDuplicates() |
711 | ? \Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) |
712 | : '', |
713 | 'timestamp' => time() // UNIX timestamp |
714 | ]; |
715 | } |
716 | |
717 | /** |
718 | * @param array $fields |
719 | * @return RunnableJob|false |
720 | */ |
721 | protected function getJobFromFields( array $fields ) { |
722 | $params = $fields['params']; |
723 | $params += [ 'namespace' => $fields['namespace'], 'title' => $fields['title'] ]; |
724 | |
725 | $job = $this->factoryJob( $fields['type'], $params ); |
726 | $job->setMetadata( 'uuid', $fields['uuid'] ); |
727 | $job->setMetadata( 'timestamp', $fields['timestamp'] ); |
728 | |
729 | return $job; |
730 | } |
731 | |
732 | /** |
733 | * @param array $fields |
734 | * @return string Serialized and possibly compressed version of $fields |
735 | */ |
736 | protected function serialize( array $fields ) { |
737 | $blob = serialize( $fields ); |
738 | if ( $this->compression === 'gzip' |
739 | && strlen( $blob ) >= 1024 |
740 | && function_exists( 'gzdeflate' ) |
741 | ) { |
742 | $object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ]; |
743 | $blobz = serialize( $object ); |
744 | |
745 | return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; |
746 | } else { |
747 | return $blob; |
748 | } |
749 | } |
750 | |
751 | /** |
752 | * @param string $blob |
753 | * @return array|false Unserialized version of $blob or false |
754 | */ |
755 | protected function unserialize( $blob ) { |
756 | $fields = unserialize( $blob ); |
757 | if ( is_object( $fields ) ) { |
758 | if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { |
759 | $fields = unserialize( gzinflate( $fields->blob ) ); |
760 | } else { |
761 | $fields = false; |
762 | } |
763 | } |
764 | |
765 | return is_array( $fields ) ? $fields : false; |
766 | } |
767 | |
768 | /** |
769 | * Get a connection to the server that handles all sub-queues for this queue |
770 | * |
771 | * @return RedisConnRef|Redis |
772 | * @throws JobQueueConnectionError |
773 | */ |
774 | protected function getConnection() { |
775 | $conn = $this->redisPool->getConnection( $this->server, $this->logger ); |
776 | if ( !$conn ) { |
777 | throw new JobQueueConnectionError( |
778 | "Unable to connect to redis server {$this->server}." ); |
779 | } |
780 | |
781 | return $conn; |
782 | } |
783 | |
784 | /** |
785 | * @param RedisConnRef $conn |
786 | * @param RedisException $e |
787 | * @return JobQueueError |
788 | */ |
789 | protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) { |
790 | $this->redisPool->handleError( $conn, $e ); |
791 | return new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); |
792 | } |
793 | |
794 | /** |
795 | * @return string JSON |
796 | */ |
797 | private function encodeQueueName() { |
798 | return json_encode( [ $this->type, $this->domain ] ); |
799 | } |
800 | |
801 | /** |
802 | * @param string $name JSON |
803 | * @return array (type, wiki) |
804 | */ |
805 | private function decodeQueueName( $name ) { |
806 | return json_decode( $name ); |
807 | } |
808 | |
809 | /** |
810 | * @param string $name |
811 | * @return string |
812 | */ |
813 | private function getGlobalKey( $name ) { |
814 | $parts = [ 'global', 'jobqueue', $name ]; |
815 | foreach ( $parts as $part ) { |
816 | if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) { |
817 | throw new InvalidArgumentException( "Key part characters are out of range." ); |
818 | } |
819 | } |
820 | |
821 | return implode( ':', $parts ); |
822 | } |
823 | |
824 | /** |
825 | * @param string $prop |
826 | * @param string|null $type Override this for sibling queues |
827 | * @return string |
828 | */ |
829 | private function getQueueKey( $prop, $type = null ) { |
830 | $type = is_string( $type ) ? $type : $this->type; |
831 | |
832 | // Use wiki ID for b/c |
833 | $keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain ); |
834 | |
835 | $parts = [ $keyspace, 'jobqueue', $type, $prop ]; |
836 | |
837 | // Parts are typically ASCII, but encode to escape ":" |
838 | return implode( ':', array_map( 'rawurlencode', $parts ) ); |
839 | } |
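
// Illustrative only (hypothetical wiki ID 'enwiki' and job type 'refreshLinks'): the key
// builders above would yield values such as
//   getQueueKey( 'l-unclaimed' )       => 'enwiki:jobqueue:refreshLinks:l-unclaimed'
//   getGlobalKey( 's-queuesWithJobs' ) => 'global:jobqueue:s-queuesWithJobs'
// while encodeQueueName() would register the queue in that set as '["refreshLinks","enwiki"]'.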
840 | } |
841 | |
842 | /** @deprecated class alias since 1.44 */ |
843 | class_alias( JobQueueRedis::class, 'JobQueueRedis' ); |
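
Jobs are normally enqueued through the JobQueueGroup service rather than by constructing
JobQueueRedis directly. A minimal sketch of a push, assuming a configuration like the one
shown after the constructor and MediaWiki's standard service wiring (the 'null' job type
and its 'lives' parameter are only examples):

    use MediaWiki\MediaWikiServices;

    // Build a job specification and enqueue it; with the redis backend configured for this
    // type, doBatchPush() stores it under l-unclaimed (or z-delayed if the job carries a
    // release timestamp).
    $spec = new JobSpecification( 'null', [ 'lives' => 1 ] );
    MediaWikiServices::getInstance()->getJobQueueGroup()->push( $spec );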