MediaWiki REL1_40
JobQueueRedis.php
Go to the documentation of this file.
1<?php
25use Psr\Log\LoggerInterface;
26
69class JobQueueRedis extends JobQueue {
/** @var RedisConnectionPool Pool that connections to the queue server are obtained from */
protected $redisPool;
/** @var LoggerInterface Logger passed to the pool when a connection is requested */
protected $logger;

/** @var string Server address */
protected $server;
/** @var string Compression method to use */
protected $compression;

/** Largest number of jobs sent in one non-atomic push; avoid tying up the server */
private const MAX_PUSH_SIZE = 25;
81
/**
 * Configure the queue from $params and obtain the shared Redis connection pool.
 *
 * Keys read here (in addition to whatever the parent constructor consumes):
 *   - redisConfig : Configuration array given to RedisConnectionPool::singleton().
 *                   Its 'serializer' entry is always forced to 'none'.
 *   - redisServer : Server this queue talks to.
 *   - compression : Compression method for stored job blobs; 'none' when absent.
 *   - daemonized  : Must be non-empty; non-daemonized mode is rejected.
 *
 * @param array $params
 * @throws InvalidArgumentException If 'daemonized' is missing or empty
 */
public function __construct( array $params ) {
	parent::__construct( $params );

	// make it easy to use Lua
	$params['redisConfig']['serializer'] = 'none';

	$this->server = $params['redisServer'];
	$this->compression = $params['compression'] ?? 'none';
	$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );

	$isDaemonized = !empty( $params['daemonized'] );
	if ( !$isDaemonized ) {
		$message = "Non-daemonized mode is no longer supported. Please install the " .
			"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed.";
		throw new InvalidArgumentException( $message );
	}

	$this->logger = LoggerFactory::getInstance( 'redis' );
}
108
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[] Always 'timestamp' and 'fifo', in that order
 */
protected function supportedOrders() {
	$orders = [ 'timestamp', 'fifo' ];

	return $orders;
}
112
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string Always 'fifo'
 */
protected function optimalOrder() {
	$order = 'fifo';

	return $order;
}
116
/**
 * Find out if delayed jobs are supported for configuration validation.
 *
 * @return bool Always true for this queue type
 */
protected function supportsDelayedJobs() {
	$supported = true;

	return $supported;
}
120
/**
 * Report whether the unclaimed list currently holds no jobs.
 *
 * The comparison is deliberately loose (==), matching whatever doGetSize()
 * hands back against zero.
 *
 * @return bool
 * @throws JobQueueError
 */
protected function doIsEmpty() {
	$size = $this->doGetSize();

	return $size == 0;
}
129
/**
 * Get the length of the 'l-unclaimed' list for this queue.
 *
 * @return int Value reported by lLen()
 * @throws JobQueueError On a Redis-level failure
 */
protected function doGetSize() {
	$conn = $this->getConnection();
	try {
		$unclaimedKey = $this->getQueueKey( 'l-unclaimed' );
		$length = $conn->lLen( $unclaimedKey );

		return $length;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
143
/**
 * Get the combined cardinality of the 'z-claimed' and 'z-abandoned' sets.
 *
 * Both zCard() calls are queued in a single pipeline and their replies summed.
 *
 * @return int|float Sum of the two replies
 * @throws JobQueueError On a Redis-level failure
 */
protected function doGetAcquiredCount() {
	$conn = $this->getConnection();
	try {
		$conn->multi( Redis::PIPELINE );
		foreach ( [ 'z-claimed', 'z-abandoned' ] as $prop ) {
			$conn->zCard( $this->getQueueKey( $prop ) );
		}
		$counts = $conn->exec();

		return array_sum( $counts );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
161
/**
 * Get the cardinality of the 'z-delayed' sorted set for this queue.
 *
 * @return int Value reported by zCard()
 * @throws JobQueueError On a Redis-level failure
 */
protected function doGetDelayedCount() {
	$conn = $this->getConnection();
	try {
		$delayedKey = $this->getQueueKey( 'z-delayed' );
		$count = $conn->zCard( $delayedKey );

		return $count;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
175
/**
 * Get the cardinality of the 'z-abandoned' sorted set for this queue.
 *
 * @return int Value reported by zCard()
 * @throws JobQueueError On a Redis-level failure
 */
protected function doGetAbandonedCount() {
	$conn = $this->getConnection();
	try {
		$abandonedKey = $this->getQueueKey( 'z-abandoned' );
		$count = $conn->zCard( $abandonedKey );

		return $count;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
189
/**
 * Insert a batch of jobs into the queue.
 *
 * Jobs are first converted to field maps and de-duplicated against each other:
 * a job with a non-empty 'sha1' is keyed by that hash (a later job with the same
 * hash replaces an earlier one), any other job is keyed by its 'uuid'.
 *
 * With the QOS_ATOMIC flag all jobs go to Redis in a single call; otherwise they
 * are sent in chunks of at most MAX_PUSH_SIZE.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bit field; only self::QOS_ATOMIC is inspected here
 * @throws JobQueueError If any chunk could not be inserted or Redis failed
 */
protected function doBatchPush( array $jobs, $flags ) {
	// (job key => job fields map)
	$fieldsByKey = [];
	foreach ( $jobs as $job ) {
		$fields = $this->getNewJobFields( $job );
		// hash identifier => de-duplicate; otherwise fall back to the unique ID
		$key = strlen( $fields['sha1'] ) ? $fields['sha1'] : $fields['uuid'];
		$fieldsByKey[$key] = $fields;
	}

	if ( $fieldsByKey === [] ) {
		return; // nothing to do
	}

	$total = count( $fieldsByKey );
	$conn = $this->getConnection();
	try {
		// Actually push the non-duplicate jobs into the queue...
		$batches = ( $flags & self::QOS_ATOMIC )
			? [ $fieldsByKey ] // all or nothing
			: array_chunk( $fieldsByKey, self::MAX_PUSH_SIZE );

		$pushed = 0;
		$failed = 0;
		foreach ( $batches as $batch ) {
			$added = $this->pushBlobs( $conn, $batch );
			if ( !is_int( $added ) ) {
				// Anything but an integer reply counts the whole chunk as failed
				$failed += count( $batch );
				continue;
			}
			$pushed += $added;
		}

		$this->incrStats( 'inserts', $this->type, $total );
		$this->incrStats( 'inserts_actual', $this->type, $pushed );
		// Jobs that neither failed nor were inserted were skipped as duplicates
		$this->incrStats( 'dupe_inserts', $this->type, $total - $failed - $pushed );

		if ( $failed > 0 ) {
			$err = "Could not insert {$failed} {$this->type} job(s).";
			wfDebugLog( 'JobQueueRedis', $err );
			// Caught just below so that it goes through the common error path
			throw new RedisException( $err );
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
244
/**
 * Send a set of job field maps to Redis through one Lua script call.
 *
 * The script skips a job whose non-empty SHA-1 is already registered in the
 * 'h-idBySha1' hash. Accepted jobs go to the delayed sorted set when their release
 * timestamp is positive, and to the unclaimed list otherwise. The script's reply
 * is the number of jobs it inserted.
 *
 * @param RedisConnRef $conn
 * @param array[] $items List of results from JobQueueRedis::getNewJobFields()
 * @return mixed Result of luaEval(); doBatchPush() treats a non-integer as failure
 * @throws RedisException
 */
protected function pushBlobs( RedisConnRef $conn, array $items ) {
	// First script argument is the encoded queue name
	$scriptArgs = [ $this->encodeQueueName() ];
	// Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
	foreach ( $items as $fields ) {
		array_push(
			$scriptArgs,
			(string)$fields['uuid'],
			(string)$fields['sha1'],
			(string)$fields['rtimestamp'],
			(string)$this->serialize( $fields )
		);
	}

	static $script =
<<<LUA
	local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
	-- First argument is the queue ID
	local queueId = ARGV[1]
	-- Next arguments all come in 4s (one per job)
	local variadicArgCount = #ARGV - 1
	if variadicArgCount % 4 ~= 0 then
		return redis.error_reply('Unmatched arguments')
	end
	-- Insert each job into this queue as needed
	local pushed = 0
	for i = 2,#ARGV,4 do
		local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
		if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
			if 1*rtimestamp > 0 then
				-- Insert into delayed queue (release time as score)
				redis.call('zAdd',kDelayed,rtimestamp,id)
			else
				-- Insert into unclaimed queue
				redis.call('lPush',kUnclaimed,id)
			end
			if sha1 ~= '' then
				redis.call('hSet',kSha1ById,id,sha1)
				redis.call('hSet',kIdBySha1,sha1,id)
			end
			redis.call('hSet',kData,id,blob)
			pushed = pushed + 1
		end
	end
	-- Mark this queue as having jobs
	redis.call('sAdd',kQwJobs,queueId)
	return pushed
LUA;

	$scriptKeys = [
		$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
		$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
		$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
		$this->getQueueKey( 'z-delayed' ), # KEYS[4]
		$this->getQueueKey( 'h-data' ), # KEYS[5]
		$this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
	];

	return $conn->luaEval(
		$script,
		array_merge( $scriptKeys, $scriptArgs ),
		6 # number of first argument(s) that are keys
	);
}
310
/**
 * Claim the next usable job from the queue.
 *
 * Blobs are popped until one yields a job or the queue reports nothing to pop.
 * A blob that cannot be unserialized is logged and skipped.
 *
 * @return RunnableJob|false False when no job could be obtained
 * @throws JobQueueError On a Redis-level failure
 */
protected function doPop() {
	$job = false;

	$conn = $this->getConnection();
	try {
		// $job starts out false, so the body always runs at least once
		while ( !$job ) {
			$blob = $this->popAndAcquireBlob( $conn );
			if ( !is_string( $blob ) ) {
				break; // no jobs; nothing to do
			}

			$this->incrStats( 'pops', $this->type );
			$fields = $this->unserialize( $blob );
			if ( $fields === false ) {
				wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
				continue;
			}

			// If $fields is invalid, the runner loop recycling will cleanup as needed
			$job = $this->getJobFromFields( $fields ); // may be false; then try the next blob
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $job;
}
343
/**
 * Pop one job ID off the unclaimed list, mark it claimed, and fetch its blob.
 *
 * In one Lua script call: the ID is popped from the right end of the list, its
 * SHA-1 bookkeeping entries are removed (so new duplicates are allowed), the ID is
 * added to the claimed set scored with the current time, its attempt counter is
 * incremented, and the stored blob is returned.
 *
 * @param RedisConnRef $conn
 * @return mixed Result of luaEval(); doPop() treats any non-string as "no job"
 * @throws RedisException
 */
protected function popAndAcquireBlob( RedisConnRef $conn ) {
	static $script =
<<<LUA
	local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
	local rTime = unpack(ARGV)
	-- Pop an item off the queue
	local id = redis.call('rPop',kUnclaimed)
	if not id then
		return false
	end
	-- Allow new duplicates of this job
	local sha1 = redis.call('hGet',kSha1ById,id)
	if sha1 then redis.call('hDel',kIdBySha1,sha1) end
	redis.call('hDel',kSha1ById,id)
	-- Mark the jobs as claimed and return it
	redis.call('zAdd',kClaimed,rTime,id)
	redis.call('hIncrBy',kAttempts,id,1)
	return redis.call('hGet',kData,id)
LUA;

	$scriptParams = [
		$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
		$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
		$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
		$this->getQueueKey( 'z-claimed' ), # KEYS[4]
		$this->getQueueKey( 'h-attempts' ), # KEYS[5]
		$this->getQueueKey( 'h-data' ), # KEYS[6]
		time(), # ARGV[1] (injected to be replication-safe)
	];

	return $conn->luaEval(
		$script,
		$scriptParams,
		6 # number of first argument(s) that are keys
	);
}
382
/**
 * Acknowledge a finished job so that its stored data is removed.
 *
 * The Lua script removes the job ID from the claimed set. If the ID was no
 * longer there it returns 0 and nothing else is touched; otherwise the attempt
 * counter and the job data are deleted as well.
 *
 * @param RunnableJob $job
 * @return bool False when the script reply was falsy, true otherwise
 * @throws UnexpectedValueException If the job carries no 'uuid' metadata
 * @throws JobQueueError On a Redis-level failure
 */
protected function doAck( RunnableJob $job ) {
	$uuid = $job->getMetadata( 'uuid' );
	if ( $uuid === null ) {
		throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
	}

	static $script =
<<<LUA
	local kClaimed, kAttempts, kData = unpack(KEYS)
	local id = unpack(ARGV)
	-- Unmark the job as claimed
	local removed = redis.call('zRem',kClaimed,id)
	-- Check if the job was recycled
	if removed == 0 then
		return 0
	end
	-- Delete the retry data
	redis.call('hDel',kAttempts,id)
	-- Delete the job data itself
	return redis.call('hDel',kData,id)
LUA;

	$conn = $this->getConnection();
	try {
		$scriptParams = [
			$this->getQueueKey( 'z-claimed' ), # KEYS[1]
			$this->getQueueKey( 'h-attempts' ), # KEYS[2]
			$this->getQueueKey( 'h-data' ), # KEYS[3]
			$uuid # ARGV[1]
		];
		$res = $conn->luaEval(
			$script,
			$scriptParams,
			3 # number of first argument(s) that are keys
		);

		if ( $res ) {
			$this->incrStats( 'acks', $this->type );

			return true;
		}

		wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );

		return false;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
437
/**
 * Record the root job timestamp of $job unless an equal or newer one is stored.
 *
 * NOTE(review): the declaration line was absent from this listing and has been
 * restored from the member index (name and parameter). The "protected" visibility
 * is assumed by analogy with the other do*() methods of this class - confirm.
 *
 * @param IJobSpecification $job
 * @return bool True if an equal or newer timestamp was already stored; otherwise
 *  the result of the set() call
 * @throws LogicException If the job has no root job parameters
 * @throws JobQueueError On a Redis-level failure
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		throw new LogicException( "Cannot register root job; missing parameters." );
	}
	$params = $job->getRootJobParams();

	$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );

	$conn = $this->getConnection();
	try {
		$timestamp = $conn->get( $key ); // last known timestamp of such a root job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
466
/**
 * Check whether a newer root job than the one $job belongs to has been recorded.
 *
 * NOTE(review): the declaration line was absent from this listing and has been
 * restored from the member index (name and parameter). The "protected" visibility
 * is assumed by analogy with the other do*() methods of this class - confirm.
 *
 * @param IJobSpecification $job
 * @return bool False if the job has no root job parameters or no strictly newer
 *  timestamp is stored
 * @throws JobQueueError On a Redis-level failure
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		return false; // job has no de-duplication info
	}
	$params = $job->getRootJobParams();

	$conn = $this->getConnection();
	try {
		// Get the last time this root job was enqueued
		$timestamp = $conn->get(
			$this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() )
		);
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	// Check if a new root job was started at the location after this one's...
	return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
490
/**
 * Delete every Redis key belonging to this queue and unregister the queue.
 *
 * The queue is also removed from the global 's-queuesWithJobs' set; the outcome
 * of that removal does not influence the return value.
 *
 * @return bool False only if del() itself returned false
 * @throws JobQueueError On a Redis-level failure
 */
protected function doDelete() {
	static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned',
		'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ];

	$conn = $this->getConnection();
	try {
		$keys = array_map(
			function ( $prop ) {
				return $this->getQueueKey( $prop );
			},
			$props
		);

		$deleted = $conn->del( $keys );
		$conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );

		return $deleted !== false;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
515
/**
 * Get an iterator over the jobs whose IDs are in the 'l-unclaimed' list.
 *
 * @return MappedIterator Jobs are loaded lazily through getJobIterator()
 * @throws JobQueueError On a Redis-level failure
 */
public function getAllQueuedJobs() {
	$conn = $this->getConnection();
	try {
		$listKey = $this->getQueueKey( 'l-unclaimed' );
		// 0..-1 selects the entire list
		$uids = $conn->lRange( $listKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $uids );
}
531
/**
 * Get an iterator over the jobs whose IDs are in the 'z-delayed' sorted set.
 *
 * @return MappedIterator Jobs are loaded lazily through getJobIterator()
 * @throws JobQueueError On a Redis-level failure
 */
public function getAllDelayedJobs() {
	$conn = $this->getConnection();
	try {
		$setKey = $this->getQueueKey( 'z-delayed' );
		// 0..-1 selects the entire set
		$uids = $conn->zRange( $setKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $uids );
}
547
/**
 * Get an iterator over the jobs whose IDs are in the 'z-claimed' sorted set.
 *
 * @return MappedIterator Jobs are loaded lazily through getJobIterator()
 * @throws JobQueueError On a Redis-level failure
 */
public function getAllAcquiredJobs() {
	$conn = $this->getConnection();
	try {
		$setKey = $this->getQueueKey( 'z-claimed' );
		// 0..-1 selects the entire set
		$uids = $conn->zRange( $setKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $uids );
}
563
/**
 * Get an iterator over the jobs whose IDs are in the 'z-abandoned' sorted set.
 *
 * @return MappedIterator Jobs are loaded lazily through getJobIterator()
 * @throws JobQueueError On a Redis-level failure
 */
public function getAllAbandonedJobs() {
	$conn = $this->getConnection();
	try {
		$setKey = $this->getQueueKey( 'z-abandoned' );
		// 0..-1 selects the entire set
		$uids = $conn->zRange( $setKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $uids );
}
579
/**
 * Wrap a list of job IDs in an iterator that loads each job on demand.
 *
 * Each ID is mapped through getJobFromUidInternal(); results that are not
 * objects (for example false for a job that is no longer stored) are rejected
 * by the 'accept' callback.
 *
 * @param RedisConnRef $conn
 * @param array $uids List of job UUIDs
 * @return MappedIterator
 */
protected function getJobIterator( RedisConnRef $conn, array $uids ) {
	$loadJob = function ( $uid ) use ( $conn ) {
		return $this->getJobFromUidInternal( $uid, $conn );
	};
	$isJobObject = static function ( $job ) {
		return is_object( $job );
	};

	return new MappedIterator( $uids, $loadJob, [ 'accept' => $isJobObject ] );
}
596
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string "RedisServer:" followed by the configured server address
 */
public function getCoalesceLocationInternal() {
	$location = "RedisServer:" . $this->server;

	return $location;
}
600
/**
 * List those of the given job types whose queue size is truthy (non-zero).
 *
 * @param array $types List of queue types
 * @return array Types taken from the keys of doGetSiblingQueueSizes()
 * @throws JobQueueError On a Redis-level failure
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$sizeByType = $this->doGetSiblingQueueSizes( $types );
	// array_filter() without a callback drops entries with falsy sizes
	$nonEmpty = array_filter( $sizeByType );

	return array_keys( $nonEmpty );
}
604
/**
 * Get the unclaimed-list length for each of the given job types.
 *
 * All lLen() calls are queued in one pipeline. If exec() does not return an
 * array, an empty map is returned.
 *
 * @param array $types List of queue types
 * @return array Map of (type => size)
 * @throws JobQueueError On a Redis-level failure
 */
protected function doGetSiblingQueueSizes( array $types ) {
	// Reindex so that reply positions line up with type positions
	$typeList = array_values( $types );
	$sizeByType = []; // (type => size)

	$conn = $this->getConnection();
	try {
		$conn->multi( Redis::PIPELINE );
		foreach ( $typeList as $queueType ) {
			$conn->lLen( $this->getQueueKey( 'l-unclaimed', $queueType ) );
		}
		$replies = $conn->exec();
		if ( is_array( $replies ) ) {
			foreach ( $replies as $idx => $length ) {
				$sizeByType[$typeList[$idx]] = $length;
			}
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $sizeByType;
}
626
/**
 * This function should not be called outside JobQueueRedis.
 *
 * Loads the stored blob for a job ID from the 'h-data' hash, rebuilds the job via
 * getJobFromFields() and attaches the current attempt count as metadata.
 *
 * @param string $uid Job ID
 * @param RedisConnRef|Redis $conn Connection to read from
 * @return RunnableJob|false False if no data is stored under $uid
 * @throws UnexpectedValueException If the stored blob cannot be unserialized
 * @throws JobQueueError On a Redis-level failure
 */
public function getJobFromUidInternal( $uid, $conn ) {
	try {
		$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
		if ( $data === false ) {
			return false; // not found
		}

		$item = $this->unserialize( $data );
		if ( !is_array( $item ) ) { // this shouldn't happen
			throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." );
		}

		// Same construction steps as for popped jobs (params, uuid, timestamp)
		$job = $this->getJobFromFields( $item );
		// Add in attempt count for debugging at showJobs.php
		$job->setMetadata(
			'attempts',
			$conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid )
		);

		return $job;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
661
/**
 * List the queues registered in the global 's-queuesWithJobs' set.
 *
 * Each member is decoded with decodeQueueName(); encodeQueueName() stores them
 * as JSON [ type, domain ] pairs.
 *
 * @return array[] List of decoded set members
 * @throws JobQueueError On a Redis-level failure
 */
public function getServerQueuesWithJobs() {
	$decoded = [];

	$conn = $this->getConnection();
	try {
		$members = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
		foreach ( $members as $encodedName ) {
			$decoded[] = $this->decodeQueueName( $encodedName );
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $decoded;
}
682
/**
 * Build the field map stored in Redis for a job that is about to be pushed.
 *
 * @param IJobSpecification $job
 * @return array Map with the keys type, namespace, title, params, rtimestamp,
 *  uuid, sha1 and timestamp
 */
protected function getNewJobFields( IJobSpecification $job ) {
	$fields = [];

	// Fields that describe the nature of the job
	$fields['type'] = $job->getType();
	$fields['namespace'] = $job->getParams()['namespace'] ?? NS_SPECIAL;
	$fields['title'] = $job->getParams()['title'] ?? '';
	$fields['params'] = $job->getParams();

	// Some jobs cannot run until a "release timestamp"; 0 when none is set
	$fields['rtimestamp'] = $job->getReleaseTimestamp() ?: 0;

	// Additional job metadata
	$fields['uuid'] = $this->idGenerator->newRawUUIDv4();
	if ( $job->ignoreDuplicates() ) {
		// Hash of the de-duplication info, converted from base 16 to base 36
		$fields['sha1'] =
			Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 );
	} else {
		// Empty hash => never de-duplicated
		$fields['sha1'] = '';
	}
	$fields['timestamp'] = time(); // UNIX timestamp

	return $fields;
}
704
/**
 * Rebuild a job object from a stored field map.
 *
 * 'namespace' and 'title' from the field map are added to the job parameters
 * only where the parameters do not already define them.
 *
 * @param array $fields Map as produced by getNewJobFields()
 * @return RunnableJob|false Whatever factoryJob() returns for the type/params
 */
protected function getJobFromFields( array $fields ) {
	$pageDefaults = [ 'namespace' => $fields['namespace'], 'title' => $fields['title'] ];
	// Array union: keys already present in the job params win
	$jobParams = $fields['params'] + $pageDefaults;

	$job = $this->factoryJob( $fields['type'], $jobParams );
	$job->setMetadata( 'uuid', $fields['uuid'] );
	$job->setMetadata( 'timestamp', $fields['timestamp'] );

	return $job;
}
719
/**
 * Turn a job field map into the string blob stored in Redis.
 *
 * When compression is 'gzip', the plain blob is at least 1024 bytes and
 * gzdeflate() is available, the deflated data is wrapped in an object with the
 * properties 'blob' and 'enc' and serialized again. The wrapped form is used only
 * if it is strictly shorter than the plain blob.
 *
 * @param array $fields
 * @return string Serialized and possibly compressed version of $fields
 */
protected function serialize( array $fields ) {
	$blob = serialize( $fields );

	$tryGzip = $this->compression === 'gzip'
		&& strlen( $blob ) >= 1024
		&& function_exists( 'gzdeflate' );
	if ( !$tryGzip ) {
		return $blob;
	}

	$wrapper = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ];
	$blobz = serialize( $wrapper );
	if ( strlen( $blobz ) < strlen( $blob ) ) {
		return $blobz;
	}

	return $blob;
}
738
/**
 * Decode a blob produced by serialize() back into a job field map.
 *
 * A blob that unserializes to an object is treated as the compression wrapper
 * written by serialize(): it must carry enc === 'gzip' and a string 'blob', and
 * gzinflate() must be available. Wrappers without these properties and blobs
 * that fail to inflate now yield false directly, instead of reading undefined
 * properties or passing a failed gzinflate() result on to unserialize().
 *
 * @param string $blob
 * @return array|false Unserialized version of $blob, or false on any failure
 */
protected function unserialize( $blob ) {
	$fields = unserialize( $blob );

	if ( is_object( $fields ) ) {
		$isGzipWrapper = isset( $fields->enc, $fields->blob )
			&& $fields->enc === 'gzip'
			&& is_string( $fields->blob );
		if ( !$isGzipWrapper || !function_exists( 'gzinflate' ) ) {
			return false;
		}

		$inflated = gzinflate( $fields->blob );
		if ( $inflated === false ) {
			return false; // corrupt compressed payload
		}

		$fields = unserialize( $inflated );
	}

	return is_array( $fields ) ? $fields : false;
}
755
/**
 * Get a connection to the server that handles all sub-queues for this queue.
 *
 * @return RedisConnRef|Redis Truthy handle returned by the connection pool
 * @throws JobQueueConnectionError If the pool returns no connection
 */
protected function getConnection() {
	$conn = $this->redisPool->getConnection( $this->server, $this->logger );
	if ( $conn ) {
		return $conn;
	}

	throw new JobQueueConnectionError(
		"Unable to connect to redis server {$this->server}." );
}
771
/**
 * Report a Redis failure to the connection pool and build the exception to throw.
 *
 * The exception is returned rather than thrown; callers throw the result.
 *
 * @param RedisConnRef $conn
 * @param RedisException $e
 * @return JobQueueError
 */
protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) {
	$this->redisPool->handleError( $conn, $e );

	$message = "Redis server error: {$e->getMessage()}\n";

	return new JobQueueError( $message );
}
781
/**
 * Encode this queue's identity as a JSON array of [ type, domain ].
 *
 * @return string JSON text
 */
private function encodeQueueName() {
	$identity = [ $this->type, $this->domain ];

	return json_encode( $identity );
}
788
/**
 * Decode a queue name written by encodeQueueName().
 *
 * @param string $name JSON text
 * @return mixed Result of json_decode(); a [ type, domain ] list for names
 *  written by encodeQueueName()
 */
private function decodeQueueName( $name ) {
	$decoded = json_decode( $name );

	return $decoded;
}
796
/**
 * Build a Redis key in the global (not per-wiki) job queue keyspace.
 *
 * The result has the form "global:jobqueue:<name>". Every part must consist
 * solely of ASCII letters, digits, "_" and "-". The pattern is anchored at both
 * ends: without anchors any part containing a single allowed character passed,
 * so a name such as "a:b" would have been let through into the key.
 *
 * @param string $name Key suffix
 * @return string
 * @throws InvalidArgumentException If a part is empty or has a disallowed character
 */
private function getGlobalKey( $name ) {
	$parts = [ 'global', 'jobqueue', $name ];
	foreach ( $parts as $part ) {
		// "D" keeps "$" from matching before a trailing newline
		if ( !preg_match( '/^[a-zA-Z0-9_-]+$/D', $part ) ) {
			throw new InvalidArgumentException( "Key part characters are out of range." );
		}
	}

	return implode( ':', $parts );
}
811
/**
 * Build the Redis key for one property of a queue in this wiki's keyspace.
 *
 * The key has the form "<wiki ID>:jobqueue:<type>:<prop>", with every part
 * passed through rawurlencode().
 *
 * @param string $prop Property name, such as 'l-unclaimed' or 'h-data'
 * @param string|null $type Job type; this queue's own type when not a string
 * @return string
 */
private function getQueueKey( $prop, $type = null ) {
	if ( !is_string( $type ) ) {
		$type = $this->type;
	}

	// Use wiki ID for b/c
	$keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain );

	// Parts are typically ASCII, but encode to escape ":"
	$encodedParts = [];
	foreach ( [ $keyspace, 'jobqueue', $type, $prop ] as $part ) {
		$encodedParts[] = rawurlencode( $part );
	}

	return implode( ':', $encodedParts );
}
828}
const NS_SPECIAL
Definition Defines.php:53
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Class to handle job queues stored in Redis.
doDeduplicateRootJob(IJobSpecification $job)
__construct(array $params)
popAndAcquireBlob(RedisConnRef $conn)
doAck(RunnableJob $job)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
getJobFromUidInternal( $uid, $conn)
This function should not be called outside JobQueueRedis.
doGetSiblingQueuesWithJobs(array $types)
doGetSiblingQueueSizes(array $types)
RedisConnectionPool $redisPool
pushBlobs(RedisConnRef $conn, array $items)
string $server
Server address.
supportedOrders()
Get the allowed queue orders for configuration validation.
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
string $compression
Compression method to use.
LoggerInterface $logger
serialize(array $fields)
doIsRootJobOldDuplicate(IJobSpecification $job)
doBatchPush(array $jobs, $flags)
getJobIterator(RedisConnRef $conn, array $uids)
getConnection()
Get a connection to the server that handles all sub-queues for this queue.
getJobFromFields(array $fields)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
handleErrorAndMakeException(RedisConnRef $conn, $e)
getNewJobFields(IJobSpecification $job)
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:39
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition JobQueue.php:780
string $type
Job type.
Definition JobQueue.php:43
factoryJob( $command, $params)
Definition JobQueue.php:746
getRootJobCacheKey( $signature, $type)
Definition JobQueue.php:564
Convenience class for generating iterators from iterators.
PSR-3 logger instance factory.
Helper tools for dealing with other locally-hosted wikis.
Definition WikiMap.php:33
Helper class to handle automatically marking connections as reusable (via RAII pattern)
luaEval( $script, array $params, $numKeys)
Helper class to manage Redis connections.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job