Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 286 |
|
0.00% |
0 / 37 |
CRAP | |
0.00% |
0 / 1 |
JobQueueRedis | |
0.00% |
0 / 286 |
|
0.00% |
0 / 37 |
8742 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
6 | |||
supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportsDelayedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doIsEmpty | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSize | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doGetAcquiredCount | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
doGetDelayedCount | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doGetAbandonedCount | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doBatchPush | |
0.00% |
0 / 29 |
|
0.00% |
0 / 1 |
90 | |||
pushBlobs | |
0.00% |
0 / 23 |
|
0.00% |
0 / 1 |
6 | |||
doPop | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
20 | |||
popAndAcquireBlob | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
2 | |||
doAck | |
0.00% |
0 / 23 |
|
0.00% |
0 / 1 |
20 | |||
doDeduplicateRootJob | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
30 | |||
doIsRootJobOldDuplicate | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
20 | |||
doDelete | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
getAllQueuedJobs | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getAllDelayedJobs | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getAllAcquiredJobs | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getAllAbandonedJobs | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getJobIterator | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
2 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
getJobFromUidInternal | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
20 | |||
getServerQueuesWithJobs | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
getNewJobFields | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
12 | |||
getJobFromFields | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
serialize | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
30 | |||
unserialize | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
30 | |||
getConnection | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
handleErrorAndMakeException | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
encodeQueueName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
decodeQueueName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getGlobalKey | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
getQueueKey | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | use MediaWiki\Logger\LoggerFactory; |
22 | use MediaWiki\WikiMap\WikiMap; |
23 | use Psr\Log\LoggerInterface; |
24 | |
25 | /** |
26 | * Redis-backed job queue storage. |
27 | * |
28 | * This is a faster and less resource-intensive job queue than JobQueueDB. |
29 | * All data for a queue using this class is placed into one redis server. |
30 | * |
31 | * When used on a wiki farm, you can optionally use the `redisJobRunnerService` background |
32 | * service from the `mediawiki/services/jobrunner.git` repository, to run jobs from a central |
33 | * system rather than per-wiki via one of the default job runners (e.g. maintenance/runJobs.php). |
34 | * |
35 | * There are eight main redis keys (per queue) used to track jobs: |
36 | * - l-unclaimed : A list of job IDs used for ready unclaimed jobs |
37 | * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries |
38 | * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs |
39 | * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs |
40 | * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication |
41 | * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication |
42 | * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries |
43 | * - h-data : A hash of (job ID => serialized blobs) for job storage |
44 | * |
45 | * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. |
46 | * If an ID appears in any of those lists, it should have a h-data entry for its ID. |
47 | * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then |
48 | * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById |
49 | * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its |
50 | * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. |
51 | * |
52 | * The following keys are used to track queue states: |
53 | * - s-queuesWithJobs : A set of all queues with non-abandoned jobs |
54 | * |
55 | * The background service takes care of undelaying, recycling, and pruning jobs as well as |
56 | * removing s-queuesWithJobs entries as queues empty. |
57 | * |
58 | * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. |
59 | * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. |
60 | * All the keys are prefixed with the relevant wiki ID information. |
61 | * |
62 | * This class requires Redis 2.6 or later as it uses Lua scripting for fast atomic operations. |
63 | * Additionally, it should be noted that redis has different persistence modes, such |
64 | * as rdb snapshots, journaling, or no persistence. Appropriate configuration should be |
65 | * made on the servers based on what queues are using it and what tolerance they have. |
66 | * |
67 | * @since 1.22 |
68 | * @ingroup JobQueue |
69 | * @ingroup Redis |
70 | */ |
71 | class JobQueueRedis extends JobQueue { |
72 | /** @var RedisConnectionPool */ |
73 | protected $redisPool; |
74 | /** @var LoggerInterface */ |
75 | protected $logger; |
76 | |
77 | /** @var string Server address */ |
78 | protected $server; |
79 | /** @var string Compression method to use */ |
80 | protected $compression; |
81 | |
82 | private const MAX_PUSH_SIZE = 25; // avoid tying up the server |
83 | |
84 | /** |
85 | * @param array $params Possible keys: |
86 | * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). |
87 | * Note that the serializer option is ignored as "none" is always used. |
88 | * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. |
89 | * If a hostname is specified but no port, the standard port number |
90 | * 6379 will be used. Required. |
91 | * - compression : The type of compression to use; one of (none,gzip). |
92 | * - daemonized : Set to true if the redisJobRunnerService runs in the background. |
93 | * This will disable job recycling/undelaying from the MediaWiki side |
94 | * to avoid redundancy and out-of-sync configuration. |
95 | * @throws InvalidArgumentException |
96 | */ |
97 | public function __construct( array $params ) { |
98 | parent::__construct( $params ); |
99 | $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua |
100 | $this->server = $params['redisServer']; |
101 | $this->compression = $params['compression'] ?? 'none'; |
102 | $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); |
103 | if ( empty( $params['daemonized'] ) ) { |
104 | throw new InvalidArgumentException( |
105 | "Non-daemonized mode is no longer supported. Please install the " . |
106 | "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); |
107 | } |
108 | $this->logger = LoggerFactory::getInstance( 'redis' ); |
109 | } |
110 | |
111 | protected function supportedOrders() { |
112 | return [ 'timestamp', 'fifo' ]; |
113 | } |
114 | |
115 | protected function optimalOrder() { |
116 | return 'fifo'; |
117 | } |
118 | |
119 | protected function supportsDelayedJobs() { |
120 | return true; |
121 | } |
122 | |
123 | /** |
124 | * @see JobQueue::doIsEmpty() |
125 | * @return bool |
126 | * @throws JobQueueError |
127 | */ |
128 | protected function doIsEmpty() { |
129 | return $this->doGetSize() == 0; |
130 | } |
131 | |
132 | /** |
133 | * @see JobQueue::doGetSize() |
134 | * @return int |
135 | * @throws JobQueueError |
136 | */ |
137 | protected function doGetSize() { |
138 | $conn = $this->getConnection(); |
139 | try { |
140 | return $conn->lLen( $this->getQueueKey( 'l-unclaimed' ) ); |
141 | } catch ( RedisException $e ) { |
142 | throw $this->handleErrorAndMakeException( $conn, $e ); |
143 | } |
144 | } |
145 | |
146 | /** |
147 | * @see JobQueue::doGetAcquiredCount() |
148 | * @return int |
149 | * @throws JobQueueError |
150 | */ |
151 | protected function doGetAcquiredCount() { |
152 | $conn = $this->getConnection(); |
153 | try { |
154 | $conn->multi( Redis::PIPELINE ); |
155 | $conn->zCard( $this->getQueueKey( 'z-claimed' ) ); |
156 | $conn->zCard( $this->getQueueKey( 'z-abandoned' ) ); |
157 | |
158 | return array_sum( $conn->exec() ); |
159 | } catch ( RedisException $e ) { |
160 | throw $this->handleErrorAndMakeException( $conn, $e ); |
161 | } |
162 | } |
163 | |
164 | /** |
165 | * @see JobQueue::doGetDelayedCount() |
166 | * @return int |
167 | * @throws JobQueueError |
168 | */ |
169 | protected function doGetDelayedCount() { |
170 | $conn = $this->getConnection(); |
171 | try { |
172 | return $conn->zCard( $this->getQueueKey( 'z-delayed' ) ); |
173 | } catch ( RedisException $e ) { |
174 | throw $this->handleErrorAndMakeException( $conn, $e ); |
175 | } |
176 | } |
177 | |
178 | /** |
179 | * @see JobQueue::doGetAbandonedCount() |
180 | * @return int |
181 | * @throws JobQueueError |
182 | */ |
183 | protected function doGetAbandonedCount() { |
184 | $conn = $this->getConnection(); |
185 | try { |
186 | return $conn->zCard( $this->getQueueKey( 'z-abandoned' ) ); |
187 | } catch ( RedisException $e ) { |
188 | throw $this->handleErrorAndMakeException( $conn, $e ); |
189 | } |
190 | } |
191 | |
192 | /** |
193 | * @see JobQueue::doBatchPush() |
194 | * @param IJobSpecification[] $jobs |
195 | * @param int $flags |
196 | * @return void |
197 | * @throws JobQueueError |
198 | */ |
199 | protected function doBatchPush( array $jobs, $flags ) { |
200 | // Convert the jobs into field maps (de-duplicated against each other) |
201 | $items = []; // (job ID => job fields map) |
202 | foreach ( $jobs as $job ) { |
203 | $item = $this->getNewJobFields( $job ); |
204 | if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate |
205 | $items[$item['sha1']] = $item; |
206 | } else { |
207 | $items[$item['uuid']] = $item; |
208 | } |
209 | } |
210 | |
211 | if ( $items === [] ) { |
212 | return; // nothing to do |
213 | } |
214 | |
215 | $conn = $this->getConnection(); |
216 | try { |
217 | // Actually push the non-duplicate jobs into the queue... |
218 | if ( $flags & self::QOS_ATOMIC ) { |
219 | $batches = [ $items ]; // all or nothing |
220 | } else { |
221 | $batches = array_chunk( $items, self::MAX_PUSH_SIZE ); |
222 | } |
223 | $failed = 0; |
224 | $pushed = 0; |
225 | foreach ( $batches as $itemBatch ) { |
226 | $added = $this->pushBlobs( $conn, $itemBatch ); |
227 | if ( is_int( $added ) ) { |
228 | $pushed += $added; |
229 | } else { |
230 | $failed += count( $itemBatch ); |
231 | } |
232 | } |
233 | $this->incrStats( 'inserts', $this->type, count( $items ) ); |
234 | $this->incrStats( 'inserts_actual', $this->type, $pushed ); |
235 | $this->incrStats( 'dupe_inserts', $this->type, |
236 | count( $items ) - $failed - $pushed ); |
237 | if ( $failed > 0 ) { |
238 | $err = "Could not insert {$failed} {$this->type} job(s)."; |
239 | wfDebugLog( 'JobQueue', $err ); |
240 | throw new RedisException( $err ); |
241 | } |
242 | } catch ( RedisException $e ) { |
243 | throw $this->handleErrorAndMakeException( $conn, $e ); |
244 | } |
245 | } |
246 | |
247 | /** |
248 | * @param RedisConnRef $conn |
249 | * @param array[] $items List of results from JobQueueRedis::getNewJobFields() |
250 | * @return int Number of jobs inserted (duplicates are ignored) |
251 | * @throws RedisException |
252 | */ |
253 | protected function pushBlobs( RedisConnRef $conn, array $items ) { |
254 | $args = [ $this->encodeQueueName() ]; |
255 | // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) |
256 | foreach ( $items as $item ) { |
257 | $args[] = (string)$item['uuid']; |
258 | $args[] = (string)$item['sha1']; |
259 | $args[] = (string)$item['rtimestamp']; |
260 | $args[] = (string)$this->serialize( $item ); |
261 | } |
262 | static $script = |
263 | /** @lang Lua */ |
264 | <<<LUA |
265 | local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS) |
266 | -- First argument is the queue ID |
267 | local queueId = ARGV[1] |
268 | -- Next arguments all come in 4s (one per job) |
269 | local variadicArgCount = #ARGV - 1 |
270 | if variadicArgCount % 4 ~= 0 then |
271 | return redis.error_reply('Unmatched arguments') |
272 | end |
273 | -- Insert each job into this queue as needed |
274 | local pushed = 0 |
275 | for i = 2,#ARGV,4 do |
276 | local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] |
277 | if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then |
278 | if 1*rtimestamp > 0 then |
279 | -- Insert into delayed queue (release time as score) |
280 | redis.call('zAdd',kDelayed,rtimestamp,id) |
281 | else |
282 | -- Insert into unclaimed queue |
283 | redis.call('lPush',kUnclaimed,id) |
284 | end |
285 | if sha1 ~= '' then |
286 | redis.call('hSet',kSha1ById,id,sha1) |
287 | redis.call('hSet',kIdBySha1,sha1,id) |
288 | end |
289 | redis.call('hSet',kData,id,blob) |
290 | pushed = pushed + 1 |
291 | end |
292 | end |
293 | -- Mark this queue as having jobs |
294 | redis.call('sAdd',kQwJobs,queueId) |
295 | return pushed |
296 | LUA; |
297 | return $conn->luaEval( $script, |
298 | array_merge( |
299 | [ |
300 | $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] |
301 | $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] |
302 | $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] |
303 | $this->getQueueKey( 'z-delayed' ), # KEYS[4] |
304 | $this->getQueueKey( 'h-data' ), # KEYS[5] |
305 | $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6] |
306 | ], |
307 | $args |
308 | ), |
309 | 6 # number of first argument(s) that are keys |
310 | ); |
311 | } |
312 | |
313 | /** |
314 | * @see JobQueue::doPop() |
315 | * @return RunnableJob|false |
316 | * @throws JobQueueError |
317 | */ |
318 | protected function doPop() { |
319 | $job = false; |
320 | |
321 | $conn = $this->getConnection(); |
322 | try { |
323 | do { |
324 | $blob = $this->popAndAcquireBlob( $conn ); |
325 | if ( !is_string( $blob ) ) { |
326 | break; // no jobs; nothing to do |
327 | } |
328 | |
329 | $this->incrStats( 'pops', $this->type ); |
330 | $item = $this->unserialize( $blob ); |
331 | if ( $item === false ) { |
332 | wfDebugLog( 'JobQueue', "Could not unserialize {$this->type} job." ); |
333 | continue; |
334 | } |
335 | |
336 | // If $item is invalid, the runner loop recycling will cleanup as needed |
337 | $job = $this->getJobFromFields( $item ); // may be false |
338 | } while ( !$job ); // job may be false if invalid |
339 | } catch ( RedisException $e ) { |
340 | throw $this->handleErrorAndMakeException( $conn, $e ); |
341 | } |
342 | |
343 | return $job; |
344 | } |
345 | |
346 | /** |
347 | * @param RedisConnRef $conn |
348 | * @return array Serialized string or false |
349 | * @throws RedisException |
350 | */ |
351 | protected function popAndAcquireBlob( RedisConnRef $conn ) { |
352 | static $script = |
353 | /** @lang Lua */ |
354 | <<<LUA |
355 | local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS) |
356 | local rTime = unpack(ARGV) |
357 | -- Pop an item off the queue |
358 | local id = redis.call('rPop',kUnclaimed) |
359 | if not id then |
360 | return false |
361 | end |
362 | -- Allow new duplicates of this job |
363 | local sha1 = redis.call('hGet',kSha1ById,id) |
364 | if sha1 then redis.call('hDel',kIdBySha1,sha1) end |
365 | redis.call('hDel',kSha1ById,id) |
366 | -- Mark the jobs as claimed and return it |
367 | redis.call('zAdd',kClaimed,rTime,id) |
368 | redis.call('hIncrBy',kAttempts,id,1) |
369 | return redis.call('hGet',kData,id) |
370 | LUA; |
371 | return $conn->luaEval( $script, |
372 | [ |
373 | $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] |
374 | $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] |
375 | $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] |
376 | $this->getQueueKey( 'z-claimed' ), # KEYS[4] |
377 | $this->getQueueKey( 'h-attempts' ), # KEYS[5] |
378 | $this->getQueueKey( 'h-data' ), # KEYS[6] |
379 | time(), # ARGV[1] (injected to be replication-safe) |
380 | ], |
381 | 6 # number of first argument(s) that are keys |
382 | ); |
383 | } |
384 | |
385 | /** |
386 | * @see JobQueue::doAck() |
387 | * @param RunnableJob $job |
388 | * @return RunnableJob|bool |
389 | * @throws UnexpectedValueException |
390 | * @throws JobQueueError |
391 | */ |
392 | protected function doAck( RunnableJob $job ) { |
393 | $uuid = $job->getMetadata( 'uuid' ); |
394 | if ( $uuid === null ) { |
395 | throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." ); |
396 | } |
397 | |
398 | $conn = $this->getConnection(); |
399 | try { |
400 | static $script = |
401 | /** @lang Lua */ |
402 | <<<LUA |
403 | local kClaimed, kAttempts, kData = unpack(KEYS) |
404 | local id = unpack(ARGV) |
405 | -- Unmark the job as claimed |
406 | local removed = redis.call('zRem',kClaimed,id) |
407 | -- Check if the job was recycled |
408 | if removed == 0 then |
409 | return 0 |
410 | end |
411 | -- Delete the retry data |
412 | redis.call('hDel',kAttempts,id) |
413 | -- Delete the job data itself |
414 | return redis.call('hDel',kData,id) |
415 | LUA; |
416 | $res = $conn->luaEval( $script, |
417 | [ |
418 | $this->getQueueKey( 'z-claimed' ), # KEYS[1] |
419 | $this->getQueueKey( 'h-attempts' ), # KEYS[2] |
420 | $this->getQueueKey( 'h-data' ), # KEYS[3] |
421 | $uuid # ARGV[1] |
422 | ], |
423 | 3 # number of first argument(s) that are keys |
424 | ); |
425 | |
426 | if ( !$res ) { |
427 | wfDebugLog( 'JobQueue', "Could not acknowledge {$this->type} job $uuid." ); |
428 | |
429 | return false; |
430 | } |
431 | |
432 | $this->incrStats( 'acks', $this->type ); |
433 | } catch ( RedisException $e ) { |
434 | throw $this->handleErrorAndMakeException( $conn, $e ); |
435 | } |
436 | |
437 | return true; |
438 | } |
439 | |
440 | /** |
441 | * @see JobQueue::doDeduplicateRootJob() |
442 | * @param IJobSpecification $job |
443 | * @return bool |
444 | * @throws JobQueueError |
445 | * @throws LogicException |
446 | */ |
447 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
448 | if ( !$job->hasRootJobParams() ) { |
449 | throw new LogicException( "Cannot register root job; missing parameters." ); |
450 | } |
451 | $params = $job->getRootJobParams(); |
452 | |
453 | $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ); |
454 | |
455 | $conn = $this->getConnection(); |
456 | try { |
457 | $timestamp = $conn->get( $key ); // last known timestamp of such a root job |
458 | if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { |
459 | return true; // a newer version of this root job was enqueued |
460 | } |
461 | |
462 | // Update the timestamp of the last root job started at the location... |
463 | return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks |
464 | } catch ( RedisException $e ) { |
465 | throw $this->handleErrorAndMakeException( $conn, $e ); |
466 | } |
467 | } |
468 | |
469 | /** |
470 | * @see JobQueue::doIsRootJobOldDuplicate() |
471 | * @param IJobSpecification $job |
472 | * @return bool |
473 | * @throws JobQueueError |
474 | */ |
475 | protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { |
476 | if ( !$job->hasRootJobParams() ) { |
477 | return false; // job has no de-duplication info |
478 | } |
479 | $params = $job->getRootJobParams(); |
480 | |
481 | $conn = $this->getConnection(); |
482 | try { |
483 | // Get the last time this root job was enqueued |
484 | $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ) ); |
485 | } catch ( RedisException $e ) { |
486 | throw $this->handleErrorAndMakeException( $conn, $e ); |
487 | } |
488 | |
489 | // Check if a new root job was started at the location after this one's... |
490 | return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); |
491 | } |
492 | |
493 | /** |
494 | * @see JobQueue::doDelete() |
495 | * @return bool |
496 | * @throws JobQueueError |
497 | */ |
498 | protected function doDelete() { |
499 | static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned', |
500 | 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ]; |
501 | |
502 | $conn = $this->getConnection(); |
503 | try { |
504 | $keys = []; |
505 | foreach ( $props as $prop ) { |
506 | $keys[] = $this->getQueueKey( $prop ); |
507 | } |
508 | |
509 | $ok = ( $conn->del( $keys ) !== false ); |
510 | $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() ); |
511 | |
512 | return $ok; |
513 | } catch ( RedisException $e ) { |
514 | throw $this->handleErrorAndMakeException( $conn, $e ); |
515 | } |
516 | } |
517 | |
518 | /** |
519 | * @see JobQueue::getAllQueuedJobs() |
520 | * @return Iterator<RunnableJob> |
521 | * @throws JobQueueError |
522 | */ |
523 | public function getAllQueuedJobs() { |
524 | $conn = $this->getConnection(); |
525 | try { |
526 | $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ); |
527 | } catch ( RedisException $e ) { |
528 | throw $this->handleErrorAndMakeException( $conn, $e ); |
529 | } |
530 | |
531 | return $this->getJobIterator( $conn, $uids ); |
532 | } |
533 | |
534 | /** |
535 | * @see JobQueue::getAllDelayedJobs() |
536 | * @return Iterator<RunnableJob> |
537 | * @throws JobQueueError |
538 | */ |
539 | public function getAllDelayedJobs() { |
540 | $conn = $this->getConnection(); |
541 | try { |
542 | $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ); |
543 | } catch ( RedisException $e ) { |
544 | throw $this->handleErrorAndMakeException( $conn, $e ); |
545 | } |
546 | |
547 | return $this->getJobIterator( $conn, $uids ); |
548 | } |
549 | |
550 | /** |
551 | * @see JobQueue::getAllAcquiredJobs() |
552 | * @return Iterator<RunnableJob> |
553 | * @throws JobQueueError |
554 | */ |
555 | public function getAllAcquiredJobs() { |
556 | $conn = $this->getConnection(); |
557 | try { |
558 | $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 ); |
559 | } catch ( RedisException $e ) { |
560 | throw $this->handleErrorAndMakeException( $conn, $e ); |
561 | } |
562 | |
563 | return $this->getJobIterator( $conn, $uids ); |
564 | } |
565 | |
566 | /** |
567 | * @see JobQueue::getAllAbandonedJobs() |
568 | * @return Iterator<RunnableJob> |
569 | * @throws JobQueueError |
570 | */ |
571 | public function getAllAbandonedJobs() { |
572 | $conn = $this->getConnection(); |
573 | try { |
574 | $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ); |
575 | } catch ( RedisException $e ) { |
576 | throw $this->handleErrorAndMakeException( $conn, $e ); |
577 | } |
578 | |
579 | return $this->getJobIterator( $conn, $uids ); |
580 | } |
581 | |
582 | /** |
583 | * @param RedisConnRef $conn |
584 | * @param array $uids List of job UUIDs |
585 | * @return MappedIterator<RunnableJob> |
586 | */ |
587 | protected function getJobIterator( RedisConnRef $conn, array $uids ) { |
588 | return new MappedIterator( |
589 | $uids, |
590 | function ( $uid ) use ( $conn ) { |
591 | return $this->getJobFromUidInternal( $uid, $conn ); |
592 | }, |
593 | [ 'accept' => static function ( $job ) { |
594 | return is_object( $job ); |
595 | } ] |
596 | ); |
597 | } |
598 | |
599 | public function getCoalesceLocationInternal() { |
600 | return "RedisServer:" . $this->server; |
601 | } |
602 | |
603 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
604 | return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); |
605 | } |
606 | |
607 | protected function doGetSiblingQueueSizes( array $types ) { |
608 | $sizes = []; // (type => size) |
609 | $types = array_values( $types ); // reindex |
610 | $conn = $this->getConnection(); |
611 | try { |
612 | $conn->multi( Redis::PIPELINE ); |
613 | foreach ( $types as $type ) { |
614 | $conn->lLen( $this->getQueueKey( 'l-unclaimed', $type ) ); |
615 | } |
616 | $res = $conn->exec(); |
617 | if ( is_array( $res ) ) { |
618 | foreach ( $res as $i => $size ) { |
619 | $sizes[$types[$i]] = $size; |
620 | } |
621 | } |
622 | } catch ( RedisException $e ) { |
623 | throw $this->handleErrorAndMakeException( $conn, $e ); |
624 | } |
625 | |
626 | return $sizes; |
627 | } |
628 | |
629 | /** |
630 | * This function should not be called outside JobQueueRedis |
631 | * |
632 | * @param string $uid |
633 | * @param RedisConnRef|Redis $conn |
634 | * @return RunnableJob|false Returns false if the job does not exist |
635 | * @throws JobQueueError |
636 | * @throws UnexpectedValueException |
637 | */ |
638 | public function getJobFromUidInternal( $uid, $conn ) { |
639 | try { |
640 | $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); |
641 | if ( $data === false ) { |
642 | return false; // not found |
643 | } |
644 | $item = $this->unserialize( $data ); |
645 | if ( !is_array( $item ) ) { // this shouldn't happen |
646 | throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." ); |
647 | } |
648 | |
649 | $params = $item['params']; |
650 | $params += [ 'namespace' => $item['namespace'], 'title' => $item['title'] ]; |
651 | $job = $this->factoryJob( $item['type'], $params ); |
652 | $job->setMetadata( 'uuid', $item['uuid'] ); |
653 | $job->setMetadata( 'timestamp', $item['timestamp'] ); |
654 | // Add in attempt count for debugging at showJobs.php |
655 | $job->setMetadata( 'attempts', |
656 | $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ) ); |
657 | |
658 | return $job; |
659 | } catch ( RedisException $e ) { |
660 | throw $this->handleErrorAndMakeException( $conn, $e ); |
661 | } |
662 | } |
663 | |
664 | /** |
665 | * @return array List of (wiki,type) tuples for queues with non-abandoned jobs |
666 | * @throws JobQueueConnectionError |
667 | * @throws JobQueueError |
668 | */ |
669 | public function getServerQueuesWithJobs() { |
670 | $queues = []; |
671 | |
672 | $conn = $this->getConnection(); |
673 | try { |
674 | $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) ); |
675 | foreach ( $set as $queue ) { |
676 | $queues[] = $this->decodeQueueName( $queue ); |
677 | } |
678 | } catch ( RedisException $e ) { |
679 | throw $this->handleErrorAndMakeException( $conn, $e ); |
680 | } |
681 | |
682 | return $queues; |
683 | } |
684 | |
685 | /** |
686 | * @param IJobSpecification $job |
687 | * @return array |
688 | */ |
689 | protected function getNewJobFields( IJobSpecification $job ) { |
690 | return [ |
691 | // Fields that describe the nature of the job |
692 | 'type' => $job->getType(), |
693 | 'namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL, |
694 | 'title' => $job->getParams()['title'] ?? '', |
695 | 'params' => $job->getParams(), |
696 | // Some jobs cannot run until a "release timestamp" |
697 | 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, |
698 | // Additional job metadata |
699 | 'uuid' => $this->idGenerator->newRawUUIDv4(), |
700 | 'sha1' => $job->ignoreDuplicates() |
701 | ? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) |
702 | : '', |
703 | 'timestamp' => time() // UNIX timestamp |
704 | ]; |
705 | } |
706 | |
707 | /** |
708 | * @param array $fields |
709 | * @return RunnableJob|false |
710 | */ |
711 | protected function getJobFromFields( array $fields ) { |
712 | $params = $fields['params']; |
713 | $params += [ 'namespace' => $fields['namespace'], 'title' => $fields['title'] ]; |
714 | |
715 | $job = $this->factoryJob( $fields['type'], $params ); |
716 | $job->setMetadata( 'uuid', $fields['uuid'] ); |
717 | $job->setMetadata( 'timestamp', $fields['timestamp'] ); |
718 | |
719 | return $job; |
720 | } |
721 | |
722 | /** |
723 | * @param array $fields |
724 | * @return string Serialized and possibly compressed version of $fields |
725 | */ |
726 | protected function serialize( array $fields ) { |
727 | $blob = serialize( $fields ); |
728 | if ( $this->compression === 'gzip' |
729 | && strlen( $blob ) >= 1024 |
730 | && function_exists( 'gzdeflate' ) |
731 | ) { |
732 | $object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ]; |
733 | $blobz = serialize( $object ); |
734 | |
735 | return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; |
736 | } else { |
737 | return $blob; |
738 | } |
739 | } |
740 | |
741 | /** |
742 | * @param string $blob |
743 | * @return array|false Unserialized version of $blob or false |
744 | */ |
745 | protected function unserialize( $blob ) { |
746 | $fields = unserialize( $blob ); |
747 | if ( is_object( $fields ) ) { |
748 | if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { |
749 | $fields = unserialize( gzinflate( $fields->blob ) ); |
750 | } else { |
751 | $fields = false; |
752 | } |
753 | } |
754 | |
755 | return is_array( $fields ) ? $fields : false; |
756 | } |
757 | |
758 | /** |
759 | * Get a connection to the server that handles all sub-queues for this queue |
760 | * |
761 | * @return RedisConnRef|Redis |
762 | * @throws JobQueueConnectionError |
763 | */ |
764 | protected function getConnection() { |
765 | $conn = $this->redisPool->getConnection( $this->server, $this->logger ); |
766 | if ( !$conn ) { |
767 | throw new JobQueueConnectionError( |
768 | "Unable to connect to redis server {$this->server}." ); |
769 | } |
770 | |
771 | return $conn; |
772 | } |
773 | |
774 | /** |
775 | * @param RedisConnRef $conn |
776 | * @param RedisException $e |
777 | * @return JobQueueError |
778 | */ |
779 | protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) { |
780 | $this->redisPool->handleError( $conn, $e ); |
781 | return new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); |
782 | } |
783 | |
784 | /** |
785 | * @return string JSON |
786 | */ |
787 | private function encodeQueueName() { |
788 | return json_encode( [ $this->type, $this->domain ] ); |
789 | } |
790 | |
791 | /** |
792 | * @param string $name JSON |
793 | * @return array (type, wiki) |
794 | */ |
795 | private function decodeQueueName( $name ) { |
796 | return json_decode( $name ); |
797 | } |
798 | |
799 | /** |
800 | * @param string $name |
801 | * @return string |
802 | */ |
803 | private function getGlobalKey( $name ) { |
804 | $parts = [ 'global', 'jobqueue', $name ]; |
805 | foreach ( $parts as $part ) { |
806 | if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) { |
807 | throw new InvalidArgumentException( "Key part characters are out of range." ); |
808 | } |
809 | } |
810 | |
811 | return implode( ':', $parts ); |
812 | } |
813 | |
814 | /** |
815 | * @param string $prop |
816 | * @param string|null $type Override this for sibling queues |
817 | * @return string |
818 | */ |
819 | private function getQueueKey( $prop, $type = null ) { |
820 | $type = is_string( $type ) ? $type : $this->type; |
821 | |
822 | // Use wiki ID for b/c |
823 | $keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain ); |
824 | |
825 | $parts = [ $keyspace, 'jobqueue', $type, $prop ]; |
826 | |
827 | // Parts are typically ASCII, but encode to escape ":" |
828 | return implode( ':', array_map( 'rawurlencode', $parts ) ); |
829 | } |
830 | } |