MediaWiki  master
JobQueueRedis.php
Go to the documentation of this file.
1 <?php
25 
/**
 * Class to handle job queues stored in Redis.
 *
 * Per-queue keys (see getQueueKey()) used by this class:
 *   - l-unclaimed : list of job IDs ready to run (pushed with LPUSH, popped with RPOP)
 *   - z-claimed   : sorted set of claimed job IDs, scored by claim time
 *   - z-abandoned : sorted set of abandoned job IDs
 *   - z-delayed   : sorted set of delayed job IDs, scored by release timestamp
 *   - h-idBySha1 / h-sha1ById : hashes mapping de-duplication SHA-1 <=> job ID
 *   - h-attempts  : hash of job ID => number of claim attempts
 *   - h-data      : hash of job ID => serialized job field map
 * A global set (global:jobqueue:s-queuesWithJobs) tracks which queues have had jobs pushed.
 */
68 class JobQueueRedis extends JobQueue {
/** @var RedisConnectionPool Pool obtained via RedisConnectionPool::singleton() */
70  protected $redisPool;
/** @var LoggerInterface Logger for the 'redis' channel */
72  protected $logger;
73 
/** @var string Server address (from the 'redisServer' parameter) */
75  protected $server;
/** @var string Compression method to use ('gzip' or 'none') */
77  protected $compression;
78 
// Max number of jobs sent per pushBlobs() call when QOS_ATOMIC is not requested
79  const MAX_PUSH_SIZE = 25; // avoid tying up the server
80 
/**
 * @param array $params Possible keys:
 *   - redisConfig : options for RedisConnectionPool::singleton(); 'serializer' is forced to 'none'
 *   - redisServer : address of the redis server holding all sub-queues
 *   - compression : 'gzip' to compress large job blobs (default 'none')
 *   - daemonized  : must be truthy; non-daemonized mode is rejected
 * @throws InvalidArgumentException When 'daemonized' is empty
 */
public function __construct( array $params ) {
	parent::__construct( $params );

	// The Lua scripts read stored values as raw strings, so disable client-side serialization
	$params['redisConfig']['serializer'] = 'none';

	$this->server = $params['redisServer'];
	$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
	$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );

	$isDaemonized = !empty( $params['daemonized'] );
	if ( !$isDaemonized ) {
		throw new InvalidArgumentException(
			"Non-daemonized mode is no longer supported. Please install the " .
			"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
	}

	$this->logger = LoggerFactory::getInstance( 'redis' );
}
107 
/**
 * @return string[] Queue orderings this backend can provide
 */
protected function supportedOrders() {
	$orders = [ 'timestamp', 'fifo' ];

	return $orders;
}
111 
/**
 * @return string The ordering this backend handles best
 */
protected function optimalOrder() {
	$preferred = 'fifo';

	return $preferred;
}
115 
/**
 * Delayed jobs are stored in the z-delayed sorted set (see pushBlobs()).
 *
 * @return bool Always true
 */
protected function supportsDelayedJobs() {
	$supported = true;

	return $supported;
}
119 
/**
 * @see JobQueue::doIsEmpty()
 * @return bool Whether the unclaimed job list is empty
 * @throws JobQueueError
 */
protected function doIsEmpty() {
	$unclaimedCount = $this->doGetSize();

	return $unclaimedCount == 0;
}
128 
/**
 * @see JobQueue::doGetSize()
 * @return int Length of the l-unclaimed list
 * @throws JobQueueError
 */
protected function doGetSize() {
	$redis = $this->getConnection();
	try {
		$unclaimedKey = $this->getQueueKey( 'l-unclaimed' );

		return $redis->lLen( $unclaimedKey );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
142 
/**
 * @see JobQueue::doGetAcquiredCount()
 * @return int Combined size of the z-claimed and z-abandoned sorted sets
 * @throws JobQueueError
 */
protected function doGetAcquiredCount() {
	$redis = $this->getConnection();
	try {
		// Batch both ZCARD-style calls into one round trip
		$redis->multi( Redis::PIPELINE );
		foreach ( [ 'z-claimed', 'z-abandoned' ] as $setProp ) {
			$redis->zSize( $this->getQueueKey( $setProp ) );
		}
		$setSizes = $redis->exec();

		return array_sum( $setSizes );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
160 
/**
 * @see JobQueue::doGetDelayedCount()
 * @return int Size of the z-delayed sorted set
 * @throws JobQueueError
 */
protected function doGetDelayedCount() {
	$redis = $this->getConnection();
	try {
		$delayedKey = $this->getQueueKey( 'z-delayed' );

		return $redis->zSize( $delayedKey );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
174 
/**
 * @see JobQueue::doGetAbandonedCount()
 * @return int Size of the z-abandoned sorted set
 * @throws JobQueueError
 */
protected function doGetAbandonedCount() {
	$redis = $this->getConnection();
	try {
		$abandonedKey = $this->getQueueKey( 'z-abandoned' );

		return $redis->zSize( $abandonedKey );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
188 
/**
 * Push a batch of jobs, de-duplicating them against each other and against the queue.
 *
 * With QOS_ATOMIC all jobs go in a single pushBlobs() call; otherwise they are sent
 * in chunks of at most MAX_PUSH_SIZE.
 *
 * @see JobQueue::doBatchPush()
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield of JobQueue::QOS_* constants
 * @return void
 * @throws JobQueueError When any chunk failed to insert or redis errored
 */
protected function doBatchPush( array $jobs, $flags ) {
	// Key each job by its SHA-1 when it has one (collapsing duplicates), else by its UUID
	$fieldMaps = [];
	foreach ( $jobs as $job ) {
		$fields = $this->getNewJobFields( $job );
		$dedupKey = strlen( $fields['sha1'] ) ? $fields['sha1'] : $fields['uuid'];
		$fieldMaps[$dedupKey] = $fields;
	}

	if ( !$fieldMaps ) {
		return; // nothing to do
	}

	$redis = $this->getConnection();
	try {
		$chunks = ( $flags & self::QOS_ATOMIC )
			? [ $fieldMaps ] // all or nothing
			: array_chunk( $fieldMaps, self::MAX_PUSH_SIZE );

		$pushedCount = 0;
		$failedCount = 0;
		foreach ( $chunks as $chunk ) {
			$inserted = $this->pushBlobs( $redis, $chunk );
			if ( !is_int( $inserted ) ) {
				// A non-integer script reply means the whole chunk failed
				$failedCount += count( $chunk );
				continue;
			}
			$pushedCount += $inserted;
		}

		$uniqueCount = count( $fieldMaps );
		$this->incrStats( 'inserts', $this->type, $uniqueCount );
		$this->incrStats( 'inserts_actual', $this->type, $pushedCount );
		// Whatever was neither pushed nor failed was already in the queue
		$this->incrStats( 'dupe_inserts', $this->type,
			$uniqueCount - $failedCount - $pushedCount );

		if ( $failedCount > 0 ) {
			$err = "Could not insert {$failedCount} {$this->type} job(s).";
			wfDebugLog( 'JobQueueRedis', $err );
			throw new RedisException( $err );
		}
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
243 
/**
 * Add a batch of jobs to this queue with a single server-side Lua script call.
 *
 * For each job: if it has no SHA-1, or its SHA-1 is not yet in h-idBySha1, then it is
 * added to z-delayed (score = release timestamp) when rtimestamp > 0, otherwise
 * LPUSHed onto l-unclaimed. Its SHA-1 mapping (if any) and serialized data are
 * stored. The queue name is added to the global s-queuesWithJobs set regardless of
 * how many jobs were inserted.
 *
 * @param RedisConnRef $conn
 * @param array $items List of job field maps as built by getNewJobFields()
 * @return mixed The script reply: the number of jobs actually inserted. doBatchPush()
 *   treats any non-int reply as a failure of the whole batch.
 */
250  protected function pushBlobs( RedisConnRef $conn, array $items ) {
251  $args = [ $this->encodeQueueName() ];
252  // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
253  foreach ( $items as $item ) {
254  $args[] = (string)$item['uuid'];
255  $args[] = (string)$item['sha1'];
256  $args[] = (string)$item['rtimestamp'];
257  $args[] = (string)$this->serialize( $item );
258  }
// Lua script: KEYS[1..6] are the keys listed below; ARGV[1] is the encoded queue name
// and ARGV[2..] are the 4-tuples built above. Returns the count of inserted jobs.
259  static $script =
261 <<<LUA
262  local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
263  -- First argument is the queue ID
264  local queueId = ARGV[1]
265  -- Next arguments all come in 4s (one per job)
266  local variadicArgCount = #ARGV - 1
267  if variadicArgCount % 4 ~= 0 then
268  return redis.error_reply('Unmatched arguments')
269  end
270  -- Insert each job into this queue as needed
271  local pushed = 0
272  for i = 2,#ARGV,4 do
273  local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
274  if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
275  if 1*rtimestamp > 0 then
276  -- Insert into delayed queue (release time as score)
277  redis.call('zAdd',kDelayed,rtimestamp,id)
278  else
279  -- Insert into unclaimed queue
280  redis.call('lPush',kUnclaimed,id)
281  end
282  if sha1 ~= '' then
283  redis.call('hSet',kSha1ById,id,sha1)
284  redis.call('hSet',kIdBySha1,sha1,id)
285  end
286  redis.call('hSet',kData,id,blob)
287  pushed = pushed + 1
288  end
289  end
290  -- Mark this queue as having jobs
291  redis.call('sAdd',kQwJobs,queueId)
292  return pushed
293 LUA;
294  return $conn->luaEval( $script,
295  array_merge(
296  [
297  $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
298  $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
299  $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
300  $this->getQueueKey( 'z-delayed' ), # KEYS[4]
301  $this->getQueueKey( 'h-data' ), # KEYS[5]
302  $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
303  ],
304  $args
305  ),
306  6 # number of first argument(s) that are keys
307  );
308  }
309 
/**
 * Claim and return the next runnable job from the queue.
 *
 * Blobs that cannot be unserialized, or whose fields do not produce a job, are
 * skipped. They stay claimed, and the runner's recycling loop cleans them up as needed.
 *
 * @see JobQueue::doPop()
 * @return RunnableJob|bool The job, or false if the queue has no more jobs
 * @throws JobQueueError
 */
protected function doPop() {
	$redis = $this->getConnection();
	try {
		while ( true ) {
			$blob = $this->popAndAcquireBlob( $redis );
			if ( !is_string( $blob ) ) {
				return false; // no jobs; nothing to do
			}

			$this->incrStats( 'pops', $this->type );

			$fields = $this->unserialize( $blob );
			if ( $fields === false ) {
				wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
				continue;
			}

			$job = $this->getJobFromFields( $fields );
			if ( $job ) {
				return $job;
			}
			// An invalid job was popped; try the next one
		}
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
342 
/**
 * Pop the oldest unclaimed job ID and mark it as claimed, via one Lua script call.
 *
 * The script RPOPs l-unclaimed (pushBlobs() LPUSHes, giving FIFO order). It then
 * clears the job's SHA-1 mappings so new duplicates may be enqueued, adds the ID to
 * z-claimed scored by the current time, and increments its h-attempts counter.
 *
 * @param RedisConnRef $conn
 * @return mixed The job's serialized blob from h-data. When the list is empty the script
 *   returns Lua false; presumably this arrives as a non-string, and doPop() checks
 *   is_string(). TODO confirm the exact PHP value.
 */
348  protected function popAndAcquireBlob( RedisConnRef $conn ) {
349  static $script =
351 <<<LUA
352  local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
353  local rTime = unpack(ARGV)
354  -- Pop an item off the queue
355  local id = redis.call('rPop',kUnclaimed)
356  if not id then
357  return false
358  end
359  -- Allow new duplicates of this job
360  local sha1 = redis.call('hGet',kSha1ById,id)
361  if sha1 then redis.call('hDel',kIdBySha1,sha1) end
362  redis.call('hDel',kSha1ById,id)
363  -- Mark the jobs as claimed and return it
364  redis.call('zAdd',kClaimed,rTime,id)
365  redis.call('hIncrBy',kAttempts,id,1)
366  return redis.call('hGet',kData,id)
367 LUA;
// The claim time is computed here in PHP and passed as ARGV[1], not inside the script
368  return $conn->luaEval( $script,
369  [
370  $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
371  $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
372  $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
373  $this->getQueueKey( 'z-claimed' ), # KEYS[4]
374  $this->getQueueKey( 'h-attempts' ), # KEYS[5]
375  $this->getQueueKey( 'h-data' ), # KEYS[6]
376  time(), # ARGV[1] (injected to be replication-safe)
377  ],
378  6 # number of first argument(s) that are keys
379  );
380  }
381 
/**
 * Acknowledge a job as finished: remove its claim, attempt counter and stored data.
 *
 * @see JobQueue::doAck()
 * @param RunnableJob $job A job previously returned by pop(); must carry 'uuid' metadata
 * @return bool False (after logging) if the script reported nothing removed. That happens
 *   when the ID was no longer in z-claimed (e.g. it was recycled) or its h-data entry
 *   was already gone. True otherwise.
 * @throws UnexpectedValueException If the job has no 'uuid' metadata
 * @throws JobQueueError On redis errors
 */
389  protected function doAck( RunnableJob $job ) {
390  $uuid = $job->getMetadata( 'uuid' );
391  if ( $uuid === null ) {
392  throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
393  }
394 
395  $conn = $this->getConnection();
396  try {
// Lua script: KEYS = z-claimed, h-attempts, h-data; ARGV[1] = job ID.
// Returns 0 if the ID was not claimed, else the HDEL result for the job data.
397  static $script =
399 <<<LUA
400  local kClaimed, kAttempts, kData = unpack(KEYS)
401  local id = unpack(ARGV)
402  -- Unmark the job as claimed
403  local removed = redis.call('zRem',kClaimed,id)
404  -- Check if the job was recycled
405  if removed == 0 then
406  return 0
407  end
408  -- Delete the retry data
409  redis.call('hDel',kAttempts,id)
410  -- Delete the job data itself
411  return redis.call('hDel',kData,id)
412 LUA;
413  $res = $conn->luaEval( $script,
414  [
415  $this->getQueueKey( 'z-claimed' ), # KEYS[1]
416  $this->getQueueKey( 'h-attempts' ), # KEYS[2]
417  $this->getQueueKey( 'h-data' ), # KEYS[3]
418  $uuid # ARGV[1]
419  ],
420  3 # number of first argument(s) that are keys
421  );
422 
423  if ( !$res ) {
424  wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );
425 
426  return false;
427  }
428 
429  $this->incrStats( 'acks', $this->type );
430  } catch ( RedisException $e ) {
431  throw $this->handleErrorAndMakeException( $conn, $e );
432  }
433 
434  return true;
435  }
436 
/**
 * Record the timestamp of a root job so that older duplicates can be detected.
 *
 * The method header was missing here, which left an orphaned body that could not parse.
 * It is restored with the signature used by JobQueue subclasses.
 *
 * @see JobQueue::doDeduplicateRootJob()
 * @param IJobSpecification $job Job carrying root job parameters
 * @return bool True if an equal or newer root job timestamp is already stored,
 *   otherwise the result of storing this job's timestamp
 * @throws LogicException If the job has no root job parameters
 * @throws JobQueueError On redis errors
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		throw new LogicException( "Cannot register root job; missing parameters." );
	}
	$params = $job->getRootJobParams();

	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

	$conn = $this->getConnection();
	try {
		$timestamp = $conn->get( $key ); // last known timestamp of such a root job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
465 
/**
 * Check whether a root job newer than this one has since been enqueued.
 *
 * The method header was missing here, which left an orphaned body that could not parse.
 * It is restored with the signature used by JobQueue subclasses.
 *
 * @see JobQueue::doIsRootJobOldDuplicate()
 * @param IJobSpecification $job
 * @return bool True if the stored root job timestamp is strictly newer than this job's
 * @throws JobQueueError On redis errors
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		return false; // job has no de-duplication info
	}
	$params = $job->getRootJobParams();

	$conn = $this->getConnection();
	try {
		// Get the last time this root job was enqueued
		$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}

	// Check if a new root job was started at the location after this one's...
	return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
489 
/**
 * Delete every per-queue key and drop this queue from the global queues-with-jobs set.
 *
 * @see JobQueue::doDelete()
 * @return bool Whether the DEL command did not return false
 * @throws JobQueueError
 */
protected function doDelete() {
	$queueProps = [ 'l-unclaimed', 'z-claimed', 'z-abandoned',
		'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ];

	$redis = $this->getConnection();
	try {
		$queueKeys = array_map(
			function ( $prop ) {
				return $this->getQueueKey( $prop );
			},
			$queueProps
		);

		$delResult = $redis->del( $queueKeys );
		$redis->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );

		return $delResult !== false;
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}
}
514 
/**
 * @see JobQueue::getAllQueuedJobs()
 * @return Iterator Jobs whose IDs are in the l-unclaimed list
 * @throws JobQueueError
 */
public function getAllQueuedJobs() {
	$redis = $this->getConnection();
	try {
		$jobIds = $redis->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}

	return $this->getJobIterator( $redis, $jobIds );
}
530 
/**
 * @see JobQueue::getAllDelayedJobs()
 * @return Iterator Jobs whose IDs are in the z-delayed sorted set
 * @throws JobQueueError
 */
public function getAllDelayedJobs() {
	$redis = $this->getConnection();
	try {
		$jobIds = $redis->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}

	return $this->getJobIterator( $redis, $jobIds );
}
546 
/**
 * @see JobQueue::getAllAcquiredJobs()
 * @return Iterator Jobs whose IDs are in the z-claimed sorted set
 * @throws JobQueueError
 */
public function getAllAcquiredJobs() {
	$redis = $this->getConnection();
	try {
		$jobIds = $redis->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}

	return $this->getJobIterator( $redis, $jobIds );
}
562 
/**
 * @see JobQueue::getAllAbandonedJobs()
 * @return Iterator Jobs whose IDs are in the z-abandoned sorted set
 * @throws JobQueueError
 */
public function getAllAbandonedJobs() {
	$redis = $this->getConnection();
	try {
		$jobIds = $redis->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}

	return $this->getJobIterator( $redis, $jobIds );
}
578 
/**
 * Lazily map job IDs to job objects, skipping IDs that do not resolve to an object.
 *
 * @param RedisConnRef $conn Connection used for each per-ID lookup
 * @param array $uids Job IDs
 * @return MappedIterator
 */
protected function getJobIterator( RedisConnRef $conn, array $uids ) {
	$idToJob = function ( $uid ) use ( $conn ) {
		return $this->getJobFromUidInternal( $uid, $conn );
	};
	$isJobObject = function ( $job ) {
		return is_object( $job );
	};

	return new MappedIterator( $uids, $idToJob, [ 'accept' => $isJobObject ] );
}
595 
/**
 * @return string Identifier shared by all queues stored on the same redis server
 */
public function getCoalesceLocationInternal() {
	$location = "RedisServer:{$this->server}";

	return $location;
}
599 
/**
 * @param string[] $types Job types to check
 * @return string[] Those types whose unclaimed list has a truthy (non-zero) size
 * @throws JobQueueError
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$sizeByType = $this->doGetSiblingQueueSizes( $types );
	$nonEmpty = array_filter( $sizeByType );

	return array_keys( $nonEmpty );
}
603 
/**
 * Get the unclaimed-list length of each given job type in one pipelined round trip.
 *
 * @param string[] $types Job types
 * @return array Map of job type => unclaimed list length (empty if the pipeline reply
 *   was not an array)
 * @throws JobQueueError
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$orderedTypes = array_values( $types ); // reindex so reply positions map back to types
	$sizeByType = [];

	$redis = $this->getConnection();
	try {
		$redis->multi( Redis::PIPELINE );
		foreach ( $orderedTypes as $jobType ) {
			$redis->lLen( $this->getQueueKey( 'l-unclaimed', $jobType ) );
		}
		$replies = $redis->exec();

		if ( !is_array( $replies ) ) {
			return $sizeByType;
		}
		foreach ( $replies as $pos => $length ) {
			$sizeByType[$orderedTypes[$pos]] = $length;
		}
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}

	return $sizeByType;
}
625 
/**
 * Build a job object from its stored data, given its ID.
 *
 * This function should not be called outside JobQueueRedis.
 *
 * Job construction is delegated to getJobFromFields(), the same path doPop() uses,
 * instead of duplicating that logic here. The only extra step is the 'attempts'
 * metadata.
 *
 * @param string $uid Job ID
 * @param RedisConnRef $conn
 * @return RunnableJob|bool The job, or false if no data is stored for the ID
 * @throws UnexpectedValueException If the stored data cannot be unserialized
 * @throws JobQueueError On redis errors
 */
public function getJobFromUidInternal( $uid, $conn ) {
	try {
		$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
		if ( $data === false ) {
			return false; // not found
		}
		$item = $this->unserialize( $data );
		if ( !is_array( $item ) ) { // this shouldn't happen
			throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." );
		}

		$job = $this->getJobFromFields( $item );
		// Add in attempt count for debugging at showJobs.php
		$job->setMetadata( 'attempts',
			$conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ) );

		return $job;
	} catch ( RedisException $e ) {
		throw $this->handleErrorAndMakeException( $conn, $e );
	}
}
660 
/**
 * List the queues recorded in the server's global queues-with-jobs set.
 *
 * @return array List of decoded queue names, each a [ type, domain ] pair
 * @throws JobQueueError
 */
public function getServerQueuesWithJobs() {
	$redis = $this->getConnection();
	try {
		$members = $redis->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
	} catch ( RedisException $ex ) {
		throw $this->handleErrorAndMakeException( $redis, $ex );
	}

	$decoded = [];
	foreach ( $members as $encodedName ) {
		$decoded[] = $this->decodeQueueName( $encodedName );
	}

	return $decoded;
}
681 
/**
 * Build the field map stored for a newly pushed job.
 *
 * The 'uuid' field was missing. doBatchPush(), pushBlobs() and getJobFromFields() all
 * read $item['uuid'] as the job ID, so it is restored here.
 *
 * @param IJobSpecification $job
 * @return array Fields: type, namespace, title, params, rtimestamp, uuid, sha1, timestamp
 */
protected function getNewJobFields( IJobSpecification $job ) {
	$params = $job->getParams();

	return [
		// Fields that describe the nature of the job
		'type' => $job->getType(),
		'namespace' => $params['namespace'] ?? NS_SPECIAL,
		'title' => $params['title'] ?? '',
		'params' => $params,
		// Some jobs cannot run until a "release timestamp"
		'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
		// Additional job metadata
		'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
		// Non-empty only for jobs that ignore duplicates; used as the de-duplication key
		'sha1' => $job->ignoreDuplicates()
			? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
			: '',
		'timestamp' => time() // UNIX timestamp
	];
}
703 
/**
 * Instantiate a job from a stored field map.
 *
 * 'namespace' and 'title' are added to the params only when the params lack them.
 *
 * @param array $fields Field map as produced by getNewJobFields()
 * @return RunnableJob|bool Whatever factoryJob() returns, with uuid/timestamp metadata set
 */
protected function getJobFromFields( array $fields ) {
	$jobParams = $fields['params'] + [
		'namespace' => $fields['namespace'],
		'title' => $fields['title']
	];

	$job = $this->factoryJob( $fields['type'], $jobParams );
	foreach ( [ 'uuid', 'timestamp' ] as $metaField ) {
		$job->setMetadata( $metaField, $fields[$metaField] );
	}

	return $job;
}
718 
/**
 * Serialize a job field map, gzip-wrapping it when that is enabled and smaller.
 *
 * Compression is only attempted when $this->compression is 'gzip', the plain blob is
 * at least 1024 bytes and gzdeflate() exists. The compressed form is a serialized
 * object { blob: <deflated>, enc: 'gzip' }.
 *
 * @param array $fields
 * @return string
 */
protected function serialize( array $fields ) {
	$plain = serialize( $fields );

	$shouldCompress = $this->compression === 'gzip'
		&& strlen( $plain ) >= 1024
		&& function_exists( 'gzdeflate' );
	if ( !$shouldCompress ) {
		return $plain;
	}

	$wrapped = serialize( (object)[ 'blob' => gzdeflate( $plain ), 'enc' => 'gzip' ] );

	// Keep the compressed form only if it actually saves space
	return strlen( $wrapped ) < strlen( $plain ) ? $wrapped : $plain;
}
737 
/**
 * Reverse serialize(): decode a stored blob back into a job field map.
 *
 * A serialized object is treated as a compression wrapper { blob, enc }. It is only
 * expanded when enc is 'gzip', a blob is present, gzinflate() exists and inflation
 * succeeds. Otherwise the result is false. Missing wrapper properties no longer trigger
 * "undefined property" warnings, and a failed gzinflate() is no longer passed on to
 * unserialize().
 *
 * @param string $blob
 * @return array|bool The field map, or false if the blob is not a valid job
 */
protected function unserialize( $blob ) {
	$fields = unserialize( $blob );
	if ( is_object( $fields ) ) {
		$encoding = $fields->enc ?? null;
		if ( $encoding === 'gzip' && isset( $fields->blob ) && function_exists( 'gzinflate' ) ) {
			$inflated = gzinflate( $fields->blob );
			$fields = is_string( $inflated ) ? unserialize( $inflated ) : false;
		} else {
			$fields = false;
		}
	}

	return is_array( $fields ) ? $fields : false;
}
754 
/**
 * Get a connection to the server that handles all sub-queues for this queue.
 *
 * @return RedisConnRef
 * @throws JobQueueConnectionError If the pool cannot provide a connection
 */
protected function getConnection() {
	$redis = $this->redisPool->getConnection( $this->server, $this->logger );
	if ( $redis ) {
		return $redis;
	}

	throw new JobQueueConnectionError(
		"Unable to connect to redis server {$this->server}." );
}
770 
/**
 * Report a redis error to the connection pool and wrap it for JobQueue callers.
 *
 * @param RedisConnRef $conn Connection on which the error occurred
 * @param RedisException $e
 * @return JobQueueError Exception for the caller to throw
 */
protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) {
	$this->redisPool->handleError( $conn, $e );
	$message = "Redis server error: {$e->getMessage()}\n";

	return new JobQueueError( $message );
}
780 
/**
 * @return string JSON array [ type, domain ] identifying this queue
 */
private function encodeQueueName() {
	$nameParts = [ $this->type, $this->domain ];

	return json_encode( $nameParts );
}
787 
/**
 * @param string $name Value produced by encodeQueueName()
 * @return mixed The decoded JSON; for encodeQueueName() output this is a [ type, domain ] list
 */
private function decodeQueueName( $name ) {
	$decoded = json_decode( $name, false );

	return $decoded;
}
795 
/**
 * Build a server-wide (non per-queue) key of the form "global:jobqueue:<name>".
 *
 * Each part must consist entirely of [a-zA-Z0-9_-]. The previous pattern was not
 * anchored, so any part containing at least one allowed character (e.g. "a:b") passed
 * and the check never rejected anything realistic. \z is used instead of $ so a trailing
 * newline is rejected too.
 *
 * @param string $name
 * @return string
 * @throws InvalidArgumentException If a key part contains disallowed characters
 */
private function getGlobalKey( $name ) {
	$parts = [ 'global', 'jobqueue', $name ];
	foreach ( $parts as $part ) {
		if ( !preg_match( '/^[a-zA-Z0-9_-]+\z/', $part ) ) {
			throw new InvalidArgumentException( "Key part characters are out of range." );
		}
	}

	return implode( ':', $parts );
}
810 
/**
 * Build a per-queue key "<wikiId>:jobqueue:<type>:<prop>", with each part URL-encoded.
 *
 * @param string $prop Property suffix (e.g. 'l-unclaimed', 'h-data')
 * @param string|null $type Job type; this queue's type when not a string
 * @return string
 */
private function getQueueKey( $prop, $type = null ) {
	if ( !is_string( $type ) ) {
		$type = $this->type;
	}

	// Use wiki ID for b/c
	$wikiId = WikiMap::getWikiIdFromDbDomain( $this->domain );

	// Parts are typically ASCII, but encode for sanity to escape ":"
	$encodedParts = array_map( 'rawurlencode', [ $wikiId, 'jobqueue', $type, $prop ] );

	return implode( ':', $encodedParts );
}
827 }
Class to handle job queues stored in Redis.
pushBlobs(RedisConnRef $conn, array $items)
factoryJob( $command, $params)
Definition: JobQueue.php:706
getQueueKey( $prop, $type=null)
getJobIterator(RedisConnRef $conn, array $uids)
string $compression
Compression method to use.
RedisConnectionPool $redisPool
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:728
const NS_SPECIAL
Definition: Defines.php:49
decodeQueueName( $name)
luaEval( $script, array $params, $numKeys)
doGetSiblingQueueSizes(array $types)
serialize(array $fields)
static singleton(array $options)
string $type
Job type.
Definition: JobQueue.php:37
getNewJobFields(IJobSpecification $job)
if( $line===false) $args
Definition: mcc.php:124
getGlobalKey( $name)
getMetadata( $field=null)
getRootJobCacheKey( $signature)
Definition: JobQueue.php:533
unserialize( $blob)
getJobFromUidInternal( $uid, $conn)
This function should not be called outside JobQueueRedis.
doDeduplicateRootJob(IJobSpecification $job)
getDeduplicationInfo()
Subclasses may need to override this to make duplication detection work.
string $server
Server address.
popAndAcquireBlob(RedisConnRef $conn)
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
static newRawUUIDv4( $flags=0)
Return an RFC4122 compliant v4 UUID.
Convenience class for generating iterators from iterators.
__construct(array $params)
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:33
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not...
if(count( $args)< 1) $job
doGetSiblingQueuesWithJobs(array $types)
getJobFromFields(array $fields)
Helper class to handle automatically marking connections as reusable (via RAII pattern) ...
Interface for serializable objects that describe a job queue task.
getConnection()
Get a connection to the server that handles all sub-queues for this queue.
doAck(RunnableJob $job)
doIsRootJobOldDuplicate(IJobSpecification $job)
const QUICK_RAND
doBatchPush(array $jobs, $flags)
handleErrorAndMakeException(RedisConnRef $conn, $e)
LoggerInterface $logger