MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Public entry points are final wrappers that perform common checks
 * (read-only state, job type) and then delegate to the do*() hooks that
 * concrete queue backends implement or override.
 */
abstract class JobQueue {
	/** @var string DB domain ID */
	protected $domain;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var StatsdDataFactoryInterface */
	protected $stats;
	/** @var \Wikimedia\UUID\GlobalIdGenerator */
	protected $idGenerator;

	/** @var WANObjectCache */
	protected $wanCache;

	/** @var bool */
	protected $typeAgnostic;

	protected const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	protected const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Recognized keys, as read below:
	 *   - domain        : DB domain ID ('wiki' is accepted as a b/c alias)
	 *   - type          : job type handled by this queue
	 *   - claimTTL      : defaults to 0
	 *   - maxTries      : defaults to 3
	 *   - order         : queue order; missing or 'any' selects optimalOrder()
	 *   - readOnlyReason: read-only rationale; defaults to false (read/write)
	 *   - stats         : stats factory; defaults to a NullStatsdDataFactory
	 *   - wanCache      : WAN cache; defaults to WANObjectCache::newEmpty()
	 *   - idGenerator   : ID generator instance
	 *   - typeAgnostic  : whether the queue accepts jobs of any type
	 * @throws JobQueueError If the order or type-agnostic mode is unsupported
	 */
	protected function __construct( array $params ) {
		$this->domain = $params['domain'] ?? $params['wiki']; // b/c
		$this->type = $params['type'];
		$this->claimTTL = $params['claimTTL'] ?? 0;
		$this->maxTries = $params['maxTries'] ?? 3;

		$requestedOrder = $params['order'] ?? 'any';
		$this->order = ( $requestedOrder !== 'any' )
			? $requestedOrder
			: $this->optimalOrder();
		// Strict comparison: a non-string order (e.g. true) must not loosely
		// match one of the supported order names.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			// static::class names the concrete backend rather than this base class
			throw new JobQueueError( static::class . " does not support '{$this->order}' order." );
		}

		$this->readOnlyReason = $params['readOnlyReason'] ?? false;
		$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
		$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
		$this->idGenerator = $params['idGenerator'];

		$typeAgnostic = $params['typeAgnostic'] ?? false;
		if ( $typeAgnostic && !$this->supportsTypeAgnostic() ) {
			throw new JobQueueError( static::class . " does not support type agnostic queues." );
		}
		$this->typeAgnostic = $typeAgnostic;
		if ( $this->typeAgnostic ) {
			// Type-agnostic queues are all filed under one fixed type name
			$this->type = 'default';
		}
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must include 'class' (the queue class name); the
	 *   whole array is passed on to that class's constructor
	 * @return JobQueue
	 * @throws JobQueueError If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new JobQueueError( "Invalid job queue class '$class'." );
		}
		// Validate the type before instantiating so that the constructor of an
		// unrelated class is never run just to be rejected afterwards.
		if ( !is_a( $class, self::class, true ) ) {
			throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string DB domain ID
	 */
	final public function getDomain() {
		return $this->domain;
	}

	/**
	 * @return string Wiki ID derived from the DB domain
	 * @deprecated since 1.33; emits a deprecation warning when called
	 */
	final public function getWiki() {
		wfDeprecated( __METHOD__, '1.33' );
		return WikiMap::getWikiIdFromDbDomain( $this->domain );
	}

	/**
	 * @return string Job type
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Job priority for pop()
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names accepted by the constructor
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string One of the values of supportedOrders()
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless a subclass overrides this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether delayed jobs are supported by this queue
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read-only rationale, or false if the queue is read/write
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return bool
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 * @return bool
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return int
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 * @return int
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return int
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 * @return int
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int Always 0 unless a subclass overrides this
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int Always 0 unless a subclass overrides this
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * @param IJobSpecification|IJobSpecification[] $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush(); presumably a bitfield
	 *   of the QOS_* constants (TODO confirm against the backends)
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$this->batchPush( is_array( $jobs ) ? $jobs : [ $jobs ], $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted; after insertion the
	 * root job of each root-job-carrying job is registered for de-duplication.
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Passed through to doBatchPush()
	 * @throws JobQueueError If the queue is read-only, a job has the wrong
	 *   type, or a delayed job is given to a queue without delay support
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( $jobs === [] ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			$this->assertMatchingJobType( $job );
			if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new JobQueueError(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );

		// Register root jobs only after the insert, so a failed insert never
		// leaves a de-duplication record without the jobs that justify it.
		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * A popped job whose root job has been superseded by a newer one is
	 * converted into a no-op DuplicateJob.
	 *
	 * @return RunnableJob|bool Whatever doPop() yields; a falsy value is
	 *   returned unchanged (presumably false when the queue is empty)
	 * @throws JobQueueError If the queue is read-only
	 */
	final public function pop() {
		$this->assertNotReadOnly();

		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				$this->incrStats( 'dupe_pops', $job->getType() );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// Deliberately best-effort: a failed duplicate check must not
			// cause the already-popped job to be lost.
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 * @return RunnableJob|bool
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param RunnableJob $job
	 * @throws JobQueueError If the queue is read-only or the job type does not match
	 */
	final public function ack( RunnableJob $job ) {
		$this->assertNotReadOnly();
		$this->assertMatchingJobType( $job );

		$this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param RunnableJob $job
	 */
	abstract protected function doAck( RunnableJob $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doDeduplicateRootJob()
	 * @throws JobQueueError If the queue is read-only or the job type does not match
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		$this->assertMatchingJobType( $job );

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * NOTE(review): the signature line was missing from the listing this file
	 * was recovered from; the name and parameter come from its symbol index,
	 * while "protected" is assumed from the other do*() hooks — confirm.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return bool True if an equal or newer root job is already registered,
	 *   otherwise the result of the cache write
	 * @throws JobQueueError If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			throw new JobQueueError( "Cannot register root job; missing parameters." );
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Having only the
		// de-duplication registration succeed would cause jobs to become no-ops without
		// any actual jobs that made them redundant.
		$timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
		if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param IJobSpecification $job
	 * @return bool
	 * @throws JobQueueError If the job type does not match
	 */
	final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
		$this->assertMatchingJobType( $job );

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * NOTE(review): the signature line was missing from the listing this file
	 * was recovered from; the name and parameter come from its symbol index,
	 * while "protected" is assumed from the other do*() hooks — confirm.
	 *
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param IJobSpecification $job
	 * @return bool True if a root job with a newer timestamp is on record
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			return false; // job has no de-duplication info
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
		// Get the last time this root job was enqueued
		$timestamp = $this->wanCache->get( $key );
		if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
			// Update the timestamp of the last known root job started at the location...
			$this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which a root job's timestamp is remembered.
	 *
	 * @param string $signature Root job signature
	 * @param string $type Job type
	 * @return string Global cache key scoped by domain, type and signature
	 */
	protected function getRootJobCacheKey( $signature, $type ) {
		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$type,
			'rootjob',
			$signature
		);
	}

	/**
	 * Delete the queue via the backend's doDelete().
	 *
	 * @throws JobQueueError If the queue is read-only or deletion is not implemented
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws JobQueueError Always, unless a subclass overrides this
	 */
	protected function doDelete() {
		throw new JobQueueError( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * No-op unless a subclass overrides this.
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * No-op unless a subclass overrides this.
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator Empty unless a subclass overrides this
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator Empty unless a subclass overrides this
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator Empty unless a subclass overrides this
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return string|null Null unless a subclass overrides this
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues is empty.
	 *
	 * @param array $types List of queue types
	 * @return array|null Null when the backend does not support this
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return array|null Null unless a subclass overrides this
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types List of queue types
	 * @return array|null Null when the backend does not support this
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return array|null Null unless a subclass overrides this
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Create the job object for a command via Job::factory().
	 *
	 * @param string $command
	 * @param array $params
	 * @return Job
	 */
	protected function factoryJob( $command, $params ) {
		// @TODO: dependency inject this as a callback
		return Job::factory( $command, $params );
	}

	/**
	 * @throws JobQueueReadOnlyError If a read-only reason is set
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * NOTE(review): the signature line was missing from the listing this file
	 * was recovered from; the name and parameter come from its symbol index,
	 * while "protected" is assumed to match assertNotReadOnly() — confirm.
	 *
	 * @param IJobSpecification $job
	 * @throws JobQueueError If the job's type differs from this queue's type
	 */
	protected function assertMatchingJobType( IJobSpecification $job ) {
		if ( $this->typeAgnostic ) {
			return; // any job type is acceptable
		}
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
	}

	/**
	 * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
	 *
	 * @param string $key Event type
	 * @param string $type Job type
	 * @param int $delta
	 */
	protected function incrStats( $key, $type, $delta = 1 ) {
		$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
		$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}

	/**
	 * Subclasses should set this to true if they support type agnostic queues.
	 *
	 * @return bool
	 */
	protected function supportsTypeAgnostic(): bool {
		return false;
	}
}
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:232
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\doAck
doAck(RunnableJob $job)
JobQueue\$idGenerator
GlobalIdGenerator $idGenerator
Definition: JobQueue.php:51
JobQueue\assertMatchingJobType
assertMatchingJobType(IJobSpecification $job)
Definition: JobQueue.php:753
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:641
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:216
Wikimedia\UUID\GlobalIdGenerator
Class for getting statistically unique IDs without a central coordinator.
Definition: GlobalIdGenerator.php:34
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:43
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:360
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition: JobQueue.php:770
NullStatsdDataFactory
Definition: NullStatsdDataFactory.php:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:61
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:702
JobQueue\doGetSize
doGetSize()
JobQueue\$readOnlyReason
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:47
$res
$res
Definition: testCompression.php:57
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:477
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:345
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:320
DuplicateJob\newFromJob
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:45
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:743
WANObjectCache\newEmpty
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
Definition: WANObjectCache.php:388
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:274
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:581
wfDeprecated
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
Definition: GlobalFunctions.php:997
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:726
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:208
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:39
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:619
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:307
JobQueue\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:433
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:296
JobQueue\doPop
doPop()
JobQueueError
Definition: JobQueueError.php:29
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:253
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\$stats
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:49
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:176
Job\factory
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:71
JobQueue\$wanCache
WANObjectCache $wanCache
Definition: JobQueue.php:54
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:593
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:131
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:331
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:602
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:136
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:200
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:41
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:656
JobQueue\doIsEmpty
doIsEmpty()
$command
$command
Definition: mcc.php:125
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:49
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:610
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:45
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:399
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:35
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:59
JobQueue\factoryJob
factoryJob( $command, $params)
Definition: JobQueue.php:735
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature, $type)
Definition: JobQueue.php:553
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:679
JobQueueReadOnlyError
Definition: JobQueueReadOnlyError.php:29
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:530
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:668
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:161
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:37
JobQueue\$typeAgnostic
bool $typeAgnostic
Definition: JobQueue.php:57
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:716
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition: JobQueue.php:692
JobQueue\supportsTypeAgnostic
supportsTypeAgnostic()
Subclasses should set this to true if they support type agnostic queues.
Definition: JobQueue.php:781
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:42
JobQueue\getType
getType()
Definition: JobQueue.php:169
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:491
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:80
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:518
JobQueue\getDomain
getDomain()
Definition: JobQueue.php:153