MediaWiki master
JobQueueRedis.php
Go to the documentation of this file.
1<?php
21namespace MediaWiki\JobQueue;
22
23use InvalidArgumentException;
24use LogicException;
30use Psr\Log\LoggerInterface;
31use Redis;
32use RedisException;
33use UnexpectedValueException;
36
83class JobQueueRedis extends JobQueue {
/** @var RedisConnectionPool Pool obtained from RedisConnectionPool::singleton() in the constructor */
85 protected $redisPool;
/** @var LoggerInterface Logger for the 'redis' channel; handed to the pool when connecting */
87 protected $logger;
88
/** @var string Server address (the 'redisServer' constructor parameter) */
90 protected $server;
/** @var string Compression method to use (the 'compression' parameter, 'none' if unset) */
92 protected $compression;
93
/** Chunk size used by doBatchPush() when a push does not have to be atomic */
94 private const MAX_PUSH_SIZE = 25; // avoid tying up the server
95
/**
 * Set up the queue from its configuration parameters.
 *
 * Besides whatever the parent constructor consumes, this reads:
 *   - redisConfig : settings handed to RedisConnectionPool::singleton();
 *                   its 'serializer' entry is always forced to 'none'.
 *   - redisServer : stored as the server to connect to.
 *   - compression : stored compression method; 'none' when absent.
 *   - daemonized  : must be non-empty, otherwise construction fails.
 *
 * @param array $params
 * @throws InvalidArgumentException When 'daemonized' is empty or missing
 */
public function __construct( array $params ) {
	parent::__construct( $params );

	// Values must stay plain strings so that the Lua scripts can work with them
	$params['redisConfig']['serializer'] = 'none';

	$this->server = $params['redisServer'];
	$this->compression = $params['compression'] ?? 'none';
	$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );

	$isDaemonized = !empty( $params['daemonized'] );
	if ( !$isDaemonized ) {
		$message = "Non-daemonized mode is no longer supported. Please install the " .
			"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed.";
		throw new InvalidArgumentException( $message );
	}

	$this->logger = LoggerFactory::getInstance( 'redis' );
}
121
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'timestamp', 'fifo' ];

	return $orders;
}
125
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string Always 'fifo' for this backend
 */
protected function optimalOrder() {
	$order = 'fifo';

	return $order;
}
129
/**
 * Find out if delayed jobs are supported for configuration validation.
 *
 * @return bool Always true for this backend
 */
protected function supportsDelayedJobs() {
	$supported = true;

	return $supported;
}
133
/**
 * Tell whether the unclaimed part of the queue currently holds no jobs.
 *
 * @return bool
 * @throws JobQueueError
 */
protected function doIsEmpty() {
	$size = $this->doGetSize();

	// Loose comparison on purpose: mirrors how the size value has always been tested
	return $size == 0;
}
142
/**
 * Get the length of the list of unclaimed (ready) jobs.
 *
 * @return int
 * @throws JobQueueError When the Redis call fails
 */
protected function doGetSize() {
	$conn = $this->getConnection();
	$listKey = $this->getQueueKey( 'l-unclaimed' );

	try {
		$length = $conn->lLen( $listKey );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $length;
}
156
/**
 * Count the jobs that have been popped but not acknowledged.
 *
 * This is the combined cardinality of the claimed and abandoned sorted sets,
 * fetched in a single pipelined round trip.
 *
 * @return int|float Sum of the two cardinalities
 * @throws JobQueueError When the Redis call fails
 */
protected function doGetAcquiredCount() {
	$conn = $this->getConnection();

	try {
		$conn->multi( Redis::PIPELINE );
		foreach ( [ 'z-claimed', 'z-abandoned' ] as $prop ) {
			$conn->zCard( $this->getQueueKey( $prop ) );
		}
		$cardinalities = $conn->exec();

		return array_sum( $cardinalities );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
174
/**
 * Count the jobs waiting in the delayed sorted set.
 *
 * @return int
 * @throws JobQueueError When the Redis call fails
 */
protected function doGetDelayedCount() {
	$conn = $this->getConnection();
	$setKey = $this->getQueueKey( 'z-delayed' );

	try {
		$count = $conn->zCard( $setKey );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $count;
}
188
/**
 * Count the jobs recorded in the abandoned sorted set.
 *
 * @return int
 * @throws JobQueueError When the Redis call fails
 */
protected function doGetAbandonedCount() {
	$conn = $this->getConnection();
	$setKey = $this->getQueueKey( 'z-abandoned' );

	try {
		$count = $conn->zCard( $setKey );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $count;
}
202
/**
 * Insert a list of jobs into the queue.
 *
 * Jobs are first converted to field maps and de-duplicated against each other
 * (by sha1 when the job has one, otherwise by its fresh UUID). They are then
 * pushed either as one batch (QOS_ATOMIC) or in chunks of MAX_PUSH_SIZE.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield; only self::QOS_ATOMIC is inspected here
 * @return void
 * @throws JobQueueError When Redis fails or any batch could not be inserted
 */
protected function doBatchPush( array $jobs, $flags ) {
	// Build the (job ID => job fields map) table; later duplicates overwrite earlier ones
	$items = [];
	foreach ( $jobs as $job ) {
		$fields = $this->getNewJobFields( $job );
		// A non-empty hash identifier means the job wants de-duplication
		$itemKey = ( $fields['sha1'] !== '' ) ? $fields['sha1'] : $fields['uuid'];
		$items[$itemKey] = $fields;
	}

	if ( !$items ) {
		return; // nothing to do
	}

	$conn = $this->getConnection();
	try {
		// Actually push the non-duplicate jobs into the queue...
		$batches = ( $flags & self::QOS_ATOMIC )
			? [ $items ] // all or nothing
			: array_chunk( $items, self::MAX_PUSH_SIZE );

		$pushed = 0;
		$failed = 0;
		foreach ( $batches as $batch ) {
			$result = $this->pushBlobs( $conn, $batch );
			if ( !is_int( $result ) ) {
				// The script did not report a count; treat the whole batch as lost
				$failed += count( $batch );
				continue;
			}
			$pushed += $result;
		}

		$total = count( $items );
		$this->incrStats( 'inserts', $this->type, $total );
		$this->incrStats( 'inserts_actual', $this->type, $pushed );
		$this->incrStats( 'dupe_inserts', $this->type, $total - $failed - $pushed );

		if ( $failed > 0 ) {
			$err = "Could not insert {$failed} {$this->type} job(s).";
			wfDebugLog( 'JobQueue', $err );
			// Raised as a RedisException so that the handler below processes it
			throw new RedisException( $err );
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
257
/**
 * Push a batch of job field maps into Redis with one Lua script call.
 *
 * For each item the script receives four string arguments (uuid, sha1,
 * rtimestamp and the serialized item). A job is skipped when its sha1 is
 * non-empty and already present in the h-idBySha1 hash. Otherwise its ID goes
 * to the delayed sorted set (rtimestamp > 0) or onto the unclaimed list, the
 * sha1 <-> id mappings are recorded when a sha1 is given, and the blob is
 * stored in h-data. The queue is then added to the global s-queuesWithJobs set.
 *
 * @param RedisConnRef $conn
 * @param array[] $items List of job field maps as built by getNewJobFields()
 * @return mixed Result of RedisConnRef::luaEval(); the script returns the number
 *   of jobs it inserted, and doBatchPush() treats any non-integer as a failure
 */
264 protected function pushBlobs( RedisConnRef $conn, array $items ) {
// ARGV[1] of the script is the encoded queue name
265 $args = [ $this->encodeQueueName() ];
266 // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
267 foreach ( $items as $item ) {
// Everything is cast to string before being handed to the script
268 $args[] = (string)$item['uuid'];
269 $args[] = (string)$item['sha1'];
270 $args[] = (string)$item['rtimestamp'];
271 $args[] = (string)$this->serialize( $item );
272 }
273 static $script =
275<<<LUA
276 local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
277 -- First argument is the queue ID
278 local queueId = ARGV[1]
279 -- Next arguments all come in 4s (one per job)
280 local variadicArgCount = #ARGV - 1
281 if variadicArgCount % 4 ~= 0 then
282 return redis.error_reply('Unmatched arguments')
283 end
284 -- Insert each job into this queue as needed
285 local pushed = 0
286 for i = 2,#ARGV,4 do
287 local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
288 if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
289 if 1*rtimestamp > 0 then
290 -- Insert into delayed queue (release time as score)
291 redis.call('zAdd',kDelayed,rtimestamp,id)
292 else
293 -- Insert into unclaimed queue
294 redis.call('lPush',kUnclaimed,id)
295 end
296 if sha1 ~= '' then
297 redis.call('hSet',kSha1ById,id,sha1)
298 redis.call('hSet',kIdBySha1,sha1,id)
299 end
300 redis.call('hSet',kData,id,blob)
301 pushed = pushed + 1
302 end
303 end
304 -- Mark this queue as having jobs
305 redis.call('sAdd',kQwJobs,queueId)
306 return pushed
307LUA;
// The six key names come first, followed by the script arguments built above
308 return $conn->luaEval( $script,
309 array_merge(
310 [
311 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
312 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
313 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
314 $this->getQueueKey( 'z-delayed' ), # KEYS[4]
315 $this->getQueueKey( 'h-data' ), # KEYS[5]
316 $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
317 ],
318 $args
319 ),
320 6 # number of first argument(s) that are keys
321 );
322 }
323
/**
 * Claim the next runnable job from the queue.
 *
 * Blobs are popped until one of them yields a usable job or the queue has
 * nothing left. Blobs that cannot be unserialized are logged and skipped.
 *
 * @return RunnableJob|false The claimed job, or false if none was available
 * @throws JobQueueError When the Redis call fails
 */
protected function doPop() {
	$job = false;

	$conn = $this->getConnection();
	try {
		// $job starts out false, so the body always runs at least once
		while ( !$job ) {
			$blob = $this->popAndAcquireBlob( $conn );
			if ( !is_string( $blob ) ) {
				break; // no jobs; nothing to do
			}

			$this->incrStats( 'pops', $this->type );

			$fields = $this->unserialize( $blob );
			if ( $fields !== false ) {
				// If the fields are invalid, the runner loop recycling will cleanup as needed
				$job = $this->getJobFromFields( $fields ); // may be false
			} else {
				wfDebugLog( 'JobQueue', "Could not unserialize {$this->type} job." );
			}
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $job;
}
356
/**
 * Pop one job ID off the unclaimed list and mark it as claimed, via a Lua script.
 *
 * The script right-pops an ID from the unclaimed list (returning false if the
 * list is empty), removes the sha1 <-> id mappings so that new duplicates of the
 * job can be queued, adds the ID to the claimed sorted set scored by the given
 * time, increments the job's attempt counter and returns the stored job blob.
 *
 * @param RedisConnRef $conn
 * @return mixed Result of RedisConnRef::luaEval(); doPop() treats any
 *   non-string value as "no job available"
 */
362 protected function popAndAcquireBlob( RedisConnRef $conn ) {
363 static $script =
365<<<LUA
366 local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
367 local rTime = unpack(ARGV)
368 -- Pop an item off the queue
369 local id = redis.call('rPop',kUnclaimed)
370 if not id then
371 return false
372 end
373 -- Allow new duplicates of this job
374 local sha1 = redis.call('hGet',kSha1ById,id)
375 if sha1 then redis.call('hDel',kIdBySha1,sha1) end
376 redis.call('hDel',kSha1ById,id)
377 -- Mark the jobs as claimed and return it
378 redis.call('zAdd',kClaimed,rTime,id)
379 redis.call('hIncrBy',kAttempts,id,1)
380 return redis.call('hGet',kData,id)
381LUA;
// Six key names followed by the single script argument (the current UNIX time)
382 return $conn->luaEval( $script,
383 [
384 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
385 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
386 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
387 $this->getQueueKey( 'z-claimed' ), # KEYS[4]
388 $this->getQueueKey( 'h-attempts' ), # KEYS[5]
389 $this->getQueueKey( 'h-data' ), # KEYS[6]
390 time(), # ARGV[1] (injected to be replication-safe)
391 ],
392 6 # number of first argument(s) that are keys
393 );
394 }
395
/**
 * Acknowledge a claimed job so that its data is removed from Redis.
 *
 * The Lua script removes the job ID from the claimed sorted set. If the ID was
 * not there it returns 0; otherwise it deletes the job's attempt counter and
 * returns the result of deleting the job data.
 *
 * @param RunnableJob $job Job carrying a 'uuid' metadata entry
 * @return bool True when acknowledged; false when the script result was falsy
 * @throws UnexpectedValueException When the job has no 'uuid' metadata
 * @throws JobQueueError When the Redis call fails
 */
403 protected function doAck( RunnableJob $job ) {
404 $uuid = $job->getMetadata( 'uuid' );
405 if ( $uuid === null ) {
406 throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
407 }
408
409 $conn = $this->getConnection();
410 try {
411 static $script =
413<<<LUA
414 local kClaimed, kAttempts, kData = unpack(KEYS)
415 local id = unpack(ARGV)
416 -- Unmark the job as claimed
417 local removed = redis.call('zRem',kClaimed,id)
418 -- Check if the job was recycled
419 if removed == 0 then
420 return 0
421 end
422 -- Delete the retry data
423 redis.call('hDel',kAttempts,id)
424 -- Delete the job data itself
425 return redis.call('hDel',kData,id)
426LUA;
// Three key names followed by the job ID as the only script argument
427 $res = $conn->luaEval( $script,
428 [
429 $this->getQueueKey( 'z-claimed' ), # KEYS[1]
430 $this->getQueueKey( 'h-attempts' ), # KEYS[2]
431 $this->getQueueKey( 'h-data' ), # KEYS[3]
432 $uuid # ARGV[1]
433 ],
434 3 # number of first argument(s) that are keys
435 );
436
// A falsy result covers both the script's 0 ("not claimed") and a failed call
437 if ( !$res ) {
438 wfDebugLog( 'JobQueue', "Could not acknowledge {$this->type} job $uuid." );
439
440 return false;
441 }
442
// Only successful acknowledgements are counted
443 $this->incrStats( 'acks', $this->type );
444 } catch ( RedisException $e ) {
445 throw $this->handleErrorAndMakeException( $conn, $e );
446 }
447
448 return true;
449 }
450
/**
 * Record the root job timestamp of a job so that older duplicates can be detected.
 *
 * If the stored timestamp for the same root job signature and job type is
 * already at least as new as this job's, nothing is written.
 *
 * NOTE(review): the signature line was absent from this listing and has been
 * restored from the class index; "protected" matches the other do*() methods
 * but should be confirmed against the parent class.
 *
 * @param IJobSpecification $job
 * @return bool True if a newer (or equal) root job is already registered;
 *   otherwise the result of the Redis SET call
 * @throws LogicException When the job has no root job parameters
 * @throws JobQueueError When the Redis call fails
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		throw new LogicException( "Cannot register root job; missing parameters." );
	}
	$params = $job->getRootJobParams();

	$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );

	$conn = $this->getConnection();
	try {
		$timestamp = $conn->get( $key ); // last known timestamp of such a root job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
478
/**
 * Check whether a job has been superseded by a newer root job.
 *
 * The job counts as an old duplicate when a timestamp is stored for its root
 * job signature and job type, and that timestamp is strictly greater than the
 * job's own root job timestamp.
 *
 * NOTE(review): the signature line was absent from this listing and has been
 * restored from the class index; "protected" matches the other do*() methods
 * but should be confirmed against the parent class.
 *
 * @param IJobSpecification $job
 * @return bool
 * @throws JobQueueError When the Redis call fails
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		return false; // job has no de-duplication info
	}
	$params = $job->getRootJobParams();

	$conn = $this->getConnection();
	try {
		// Get the last time this root job was enqueued
		$timestamp = $conn->get(
			$this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() )
		);
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	// Check if a new root job was started at the location after this one's...
	return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
502
/**
 * Delete every Redis structure belonging to this queue.
 *
 * All per-queue keys are removed in one DEL call, then the queue is taken out
 * of the global set of queues that have jobs.
 *
 * @return bool False only if the DEL call itself returned false
 * @throws JobQueueError When the Redis call fails
 */
protected function doDelete() {
	$props = [
		'l-unclaimed', 'z-claimed', 'z-abandoned', 'z-delayed',
		'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data',
	];

	$conn = $this->getConnection();
	try {
		$keys = array_map(
			function ( $prop ) {
				return $this->getQueueKey( $prop );
			},
			$props
		);

		$deleted = $conn->del( $keys );
		$conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );

		return $deleted !== false;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
527
/**
 * Get an iterator over the jobs currently on the unclaimed list.
 *
 * The job IDs are read up front; the jobs themselves are loaded as the
 * iterator is consumed.
 *
 * @return MappedIterator
 * @throws JobQueueError When the Redis call fails
 */
public function getAllQueuedJobs() {
	$conn = $this->getConnection();
	$listKey = $this->getQueueKey( 'l-unclaimed' );

	try {
		// 0..-1 selects the entire list
		$jobIds = $conn->lRange( $listKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $jobIds );
}
543
/**
 * Get an iterator over the jobs currently in the delayed sorted set.
 *
 * The job IDs are read up front; the jobs themselves are loaded as the
 * iterator is consumed.
 *
 * @return MappedIterator
 * @throws JobQueueError When the Redis call fails
 */
public function getAllDelayedJobs() {
	$conn = $this->getConnection();
	$setKey = $this->getQueueKey( 'z-delayed' );

	try {
		// 0..-1 selects every member
		$jobIds = $conn->zRange( $setKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $jobIds );
}
559
/**
 * Get an iterator over the jobs currently in the claimed sorted set.
 *
 * The job IDs are read up front; the jobs themselves are loaded as the
 * iterator is consumed.
 *
 * @return MappedIterator
 * @throws JobQueueError When the Redis call fails
 */
public function getAllAcquiredJobs() {
	$conn = $this->getConnection();
	$setKey = $this->getQueueKey( 'z-claimed' );

	try {
		// 0..-1 selects every member
		$jobIds = $conn->zRange( $setKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $jobIds );
}
575
/**
 * Get an iterator over the jobs currently in the abandoned sorted set.
 *
 * The job IDs are read up front; the jobs themselves are loaded as the
 * iterator is consumed.
 *
 * @return MappedIterator
 * @throws JobQueueError When the Redis call fails
 */
public function getAllAbandonedJobs() {
	$conn = $this->getConnection();
	$setKey = $this->getQueueKey( 'z-abandoned' );

	try {
		// 0..-1 selects every member
		$jobIds = $conn->zRange( $setKey, 0, -1 );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $this->getJobIterator( $conn, $jobIds );
}
591
/**
 * Wrap a list of job IDs in an iterator that loads each job on demand.
 *
 * IDs whose lookup does not produce an object are filtered out.
 *
 * @param RedisConnRef $conn
 * @param array $uids List of job UUIDs
 * @return MappedIterator
 */
protected function getJobIterator( RedisConnRef $conn, array $uids ) {
	$loadJob = function ( $uid ) use ( $conn ) {
		return $this->getJobFromUidInternal( $uid, $conn );
	};
	$isJobObject = static function ( $job ) {
		return is_object( $job );
	};

	return new MappedIterator( $uids, $loadJob, [ 'accept' => $isJobObject ] );
}
608
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string The server address prefixed with "RedisServer:"
 */
public function getCoalesceLocationInternal() {
	$prefix = "RedisServer:";

	return $prefix . $this->server;
}
612
/**
 * List the given job types whose unclaimed queue is not empty.
 *
 * @param string[] $types Job types to check
 * @return array Types whose reported size is truthy
 * @throws JobQueueError When the Redis call fails
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$typesWithJobs = [];
	foreach ( $this->doGetSiblingQueueSizes( $types ) as $type => $size ) {
		if ( $size ) {
			$typesWithJobs[] = $type;
		}
	}

	return $typesWithJobs;
}
616
/**
 * Get the unclaimed list length for each of the given job types.
 *
 * All lengths are requested in one pipelined round trip. If the pipeline
 * does not return an array, an empty map is returned.
 *
 * @param string[] $types Job types to check
 * @return array Map of (type => size)
 * @throws JobQueueError When the Redis call fails
 */
protected function doGetSiblingQueueSizes( array $types ) {
	// Reindex so that reply positions line up with type positions
	$typeList = array_values( $types );
	$sizes = []; // (type => size)

	$conn = $this->getConnection();
	try {
		$conn->multi( Redis::PIPELINE );
		foreach ( $typeList as $queueType ) {
			$conn->lLen( $this->getQueueKey( 'l-unclaimed', $queueType ) );
		}
		$replies = $conn->exec();

		if ( is_array( $replies ) ) {
			foreach ( $replies as $position => $length ) {
				$sizes[$typeList[$position]] = $length;
			}
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $sizes;
}
638
/**
 * This function should not be called outside JobQueueRedis.
 *
 * Loads the stored blob for a job ID, rebuilds the job object from it and
 * attaches the uuid, timestamp and attempt count as metadata.
 *
 * @param string $uid Job UUID
 * @param RedisConnRef $conn
 * @return RunnableJob|false The job, or false if no data is stored for the ID
 * @throws UnexpectedValueException When the stored blob cannot be unserialized
 * @throws JobQueueError When the Redis call fails
 */
public function getJobFromUidInternal( $uid, $conn ) {
	try {
		$blob = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
		if ( $blob === false ) {
			return false; // not found
		}

		$fields = $this->unserialize( $blob );
		if ( !is_array( $fields ) ) { // this shouldn't happen
			throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." );
		}

		// Explicit params win; namespace/title only fill in what is missing
		$jobParams = $fields['params'] + [
			'namespace' => $fields['namespace'],
			'title' => $fields['title'],
		];

		$job = $this->factoryJob( $fields['type'], $jobParams );
		$job->setMetadata( 'uuid', $fields['uuid'] );
		$job->setMetadata( 'timestamp', $fields['timestamp'] );

		// Add in attempt count for debugging at showJobs.php
		$attempts = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
		$job->setMetadata( 'attempts', $attempts );

		return $job;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
673
/**
 * List the queues on this server that are marked as having jobs.
 *
 * Reads the global s-queuesWithJobs set and decodes every member.
 *
 * @return array[] List of decoded queue names (see encodeQueueName())
 * @throws JobQueueError When the Redis call fails
 */
public function getServerQueuesWithJobs() {
	$conn = $this->getConnection();

	$decoded = [];
	try {
		$members = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
		foreach ( $members as $encodedName ) {
			$decoded[] = $this->decodeQueueName( $encodedName );
		}
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	return $decoded;
}
694
/**
 * Build the field map that is stored in Redis for a new job.
 *
 * @param IJobSpecification $job
 * @return array Map with the keys type, namespace, title, params, rtimestamp,
 *   uuid, sha1 and timestamp
 */
protected function getNewJobFields( IJobSpecification $job ) {
	$params = $job->getParams();

	// Jobs that do not ignore duplicates get an empty hash and are never de-duplicated
	$sha1 = '';
	if ( $job->ignoreDuplicates() ) {
		$sha1 = \Wikimedia\base_convert(
			sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31
		);
	}

	$fields = [];
	// Fields that describe the nature of the job
	$fields['type'] = $job->getType();
	$fields['namespace'] = $params['namespace'] ?? NS_SPECIAL;
	$fields['title'] = $params['title'] ?? '';
	$fields['params'] = $params;
	// Some jobs cannot run until a "release timestamp"
	$fields['rtimestamp'] = $job->getReleaseTimestamp() ?: 0;
	// Additional job metadata
	$fields['uuid'] = $this->idGenerator->newRawUUIDv4();
	$fields['sha1'] = $sha1;
	$fields['timestamp'] = time(); // UNIX timestamp

	return $fields;
}
716
/**
 * Rebuild a job object from a stored field map.
 *
 * @param array $fields Field map as produced by getNewJobFields()
 * @return RunnableJob|false Whatever factoryJob() yields; doPop() tolerates false
 */
protected function getJobFromFields( array $fields ) {
	// Explicit params win; namespace/title only fill in what is missing
	$jobParams = $fields['params'] + [
		'namespace' => $fields['namespace'],
		'title' => $fields['title'],
	];

	$job = $this->factoryJob( $fields['type'], $jobParams );
	foreach ( [ 'uuid', 'timestamp' ] as $metaKey ) {
		$job->setMetadata( $metaKey, $fields[$metaKey] );
	}

	return $job;
}
731
/**
 * Turn a job field map into the blob stored in Redis.
 *
 * With 'gzip' compression configured, blobs of at least 1024 bytes are
 * deflated and wrapped in an object carrying 'blob' and 'enc'. The wrapped
 * form is only used when it is actually shorter than the plain one.
 *
 * @param array $fields
 * @return string Serialized blob
 */
protected function serialize( array $fields ) {
	$plain = serialize( $fields );

	$shouldCompress = $this->compression === 'gzip'
		&& strlen( $plain ) >= 1024
		&& function_exists( 'gzdeflate' );
	if ( !$shouldCompress ) {
		return $plain;
	}

	$wrapper = (object)[ 'blob' => gzdeflate( $plain ), 'enc' => 'gzip' ];
	$packed = serialize( $wrapper );
	if ( strlen( $packed ) < strlen( $plain ) ) {
		return $packed;
	}

	return $plain;
}
750
/**
 * Turn a stored blob back into a job field map.
 *
 * A blob is either a serialized array, or a serialized wrapper object with
 * 'enc' => 'gzip' and a deflated 'blob' (see serialize()). Anything else,
 * including a wrapper that is malformed or cannot be inflated, yields false.
 *
 * NOTE(review): this runs PHP unserialize() on data read back from Redis, so
 * it is only safe while the Redis server contents are trusted.
 *
 * @param string $blob
 * @return array|false Field map, or false if the blob is not usable
 */
protected function unserialize( $blob ) {
	$fields = unserialize( $blob );

	if ( is_object( $fields ) ) {
		// Read the wrapper properties defensively: an unexpected object must
		// not trigger undefined-property warnings
		$encoding = $fields->enc ?? null;
		$packed = $fields->blob ?? null;
		if ( $encoding !== 'gzip' || !is_string( $packed ) || !function_exists( 'gzinflate' ) ) {
			return false;
		}

		$inflated = gzinflate( $packed );
		if ( $inflated === false ) {
			return false; // corrupt compressed payload
		}

		$fields = unserialize( $inflated );
	}

	return is_array( $fields ) ? $fields : false;
}
767
/**
 * Get a connection to the server that handles all sub-queues for this queue.
 *
 * @return RedisConnRef
 * @throws JobQueueConnectionError When the pool cannot provide a connection
 */
protected function getConnection() {
	$conn = $this->redisPool->getConnection( $this->server, $this->logger );
	if ( $conn ) {
		return $conn;
	}

	throw new JobQueueConnectionError(
		"Unable to connect to redis server {$this->server}."
	);
}
783
/**
 * Report a Redis failure to the connection pool and build the error to throw.
 *
 * The exception is returned rather than thrown, so callers write
 * "throw $this->handleErrorAndMakeException( ... )".
 *
 * @param RedisConnRef $conn Connection the failure occurred on
 * @param RedisException $e The failure
 * @return JobQueueError
 */
protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) {
	$this->redisPool->handleError( $conn, $e );

	$message = "Redis server error: {$e->getMessage()}\n";

	return new JobQueueError( $message );
}
793
/**
 * Encode this queue's identity as a JSON list of [ type, domain ].
 *
 * @return string|false JSON text as produced by json_encode()
 */
private function encodeQueueName() {
	$identity = [ $this->type, $this->domain ];

	return json_encode( $identity );
}
800
/**
 * Decode a queue name produced by encodeQueueName().
 *
 * @param string $name JSON text
 * @return mixed Result of json_decode(); a [ type, domain ] list for names
 *   written by encodeQueueName()
 */
private function decodeQueueName( $name ) {
	$identity = json_decode( $name );

	return $identity;
}
808
/**
 * Build the name of a Redis key that is shared by all queues.
 *
 * The key has the form "global:jobqueue:<name>". Every part must consist
 * solely of ASCII letters, digits, "_" and "-", so that a part can never
 * contain the ":" separator.
 *
 * @param string $name Key-specific suffix, e.g. 's-queuesWithJobs'
 * @return string
 * @throws InvalidArgumentException When a part contains a disallowed character
 */
private function getGlobalKey( $name ) {
	$parts = [ 'global', 'jobqueue', $name ];
	foreach ( $parts as $part ) {
		// The pattern is anchored at both ends so that the WHOLE part is checked;
		// the D modifier stops "$" from matching before a trailing newline
		if ( !preg_match( '/^[a-zA-Z0-9_-]+$/D', $part ) ) {
			throw new InvalidArgumentException( "Key part characters are out of range." );
		}
	}

	return implode( ':', $parts );
}
823
/**
 * Build the name of a Redis key that belongs to one queue.
 *
 * The key has the form "<wiki ID>:jobqueue:<type>:<prop>", with every part
 * URL-encoded so that a ":" inside a part cannot be mistaken for a separator.
 *
 * @param string $prop Name of the structure, e.g. 'l-unclaimed'
 * @param string|null $type Job type; any non-string value selects this queue's own type
 * @return string
 */
private function getQueueKey( $prop, $type = null ) {
	if ( !is_string( $type ) ) {
		$type = $this->type;
	}

	// Use wiki ID for b/c
	$keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain );

	// Parts are typically ASCII, but encode to escape ":"
	$encodedParts = array_map( 'rawurlencode', [ $keyspace, 'jobqueue', $type, $prop ] );

	return implode( ':', $encodedParts );
}
840}
841
// Also register the class under the un-namespaced name 'JobQueueRedis'
843class_alias( JobQueueRedis::class, 'JobQueueRedis' );
const NS_SPECIAL
Definition Defines.php:54
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Convenience class for generating iterators from iterators.
Redis-backed job queue storage.
handleErrorAndMakeException(RedisConnRef $conn, $e)
pushBlobs(RedisConnRef $conn, array $items)
string $server
Server address.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
popAndAcquireBlob(RedisConnRef $conn)
getJobIterator(RedisConnRef $conn, array $uids)
doIsRootJobOldDuplicate(IJobSpecification $job)
string $compression
Compression method to use.
getConnection()
Get a connection to the server that handles all sub-queues for this queue.
supportedOrders()
Get the allowed queue orders for configuration validation.
doDeduplicateRootJob(IJobSpecification $job)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getNewJobFields(IJobSpecification $job)
getJobFromUidInternal( $uid, $conn)
This function should not be called outside JobQueueRedis.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Base class for queueing and running background jobs from a storage backend.
Definition JobQueue.php:50
incrStats( $event, $type, $delta=1)
Call StatsFactory::incrementBy() for the queue overall and for the queue type.
Definition JobQueue.php:784
string $type
Job type.
Definition JobQueue.php:54
factoryJob( $command, $params)
Definition JobQueue.php:750
getRootJobCacheKey( $signature, $type)
Definition JobQueue.php:568
Create PSR-3 logger objects.
Tools for dealing with other locally-hosted wikis.
Definition WikiMap.php:33
Wrapper class for Redis connections that automatically reuses connections (via RAII pattern)
luaEval( $script, array $params, $numKeys)
Manage one or more Redis client connection.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job