MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
24 
33 abstract class JobQueue {
35  protected $domain;
37  protected $type;
39  protected $order;
41  protected $claimTTL;
43  protected $maxTries;
45  protected $readOnlyReason;
47  protected $stats;
48 
50  protected $wanCache;
51 
52  const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
53 
54  const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
55 
68  protected function __construct( array $params ) {
69  $this->domain = $params['domain'] ?? $params['wiki']; // b/c
70  $this->type = $params['type'];
71  $this->claimTTL = $params['claimTTL'] ?? 0;
72  $this->maxTries = $params['maxTries'] ?? 3;
73  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
74  $this->order = $params['order'];
75  } else {
76  $this->order = $this->optimalOrder();
77  }
78  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
79  throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
80  }
81  $this->readOnlyReason = $params['readOnlyReason'] ?? false;
82  $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
83  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
84  }
85 
116  final public static function factory( array $params ) {
117  $class = $params['class'];
118  if ( !class_exists( $class ) ) {
119  throw new JobQueueError( "Invalid job queue class '$class'." );
120  }
121  $obj = new $class( $params );
122  if ( !( $obj instanceof self ) ) {
123  throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
124  }
125 
126  return $obj;
127  }
128 
132  final public function getDomain() {
133  return $this->domain;
134  }
135 
140  final public function getWiki() {
141  return WikiMap::getWikiIdFromDbDomain( $this->domain );
142  }
143 
147  final public function getType() {
148  return $this->type;
149  }
150 
154  final public function getOrder() {
155  return $this->order;
156  }
157 
163  abstract protected function supportedOrders();
164 
170  abstract protected function optimalOrder();
171 
177  protected function supportsDelayedJobs() {
178  return false; // not implemented
179  }
180 
185  final public function delayedJobsEnabled() {
186  return $this->supportsDelayedJobs();
187  }
188 
193  public function getReadOnlyReason() {
194  return $this->readOnlyReason;
195  }
196 
209  final public function isEmpty() {
210  $res = $this->doIsEmpty();
211 
212  return $res;
213  }
214 
219  abstract protected function doIsEmpty();
220 
230  final public function getSize() {
231  $res = $this->doGetSize();
232 
233  return $res;
234  }
235 
240  abstract protected function doGetSize();
241 
251  final public function getAcquiredCount() {
252  $res = $this->doGetAcquiredCount();
253 
254  return $res;
255  }
256 
261  abstract protected function doGetAcquiredCount();
262 
273  final public function getDelayedCount() {
274  $res = $this->doGetDelayedCount();
275 
276  return $res;
277  }
278 
283  protected function doGetDelayedCount() {
284  return 0; // not implemented
285  }
286 
296  final public function getAbandonedCount() {
297  $res = $this->doGetAbandonedCount();
298 
299  return $res;
300  }
301 
306  protected function doGetAbandonedCount() {
307  return 0; // not implemented
308  }
309 
320  final public function push( $jobs, $flags = 0 ) {
321  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
322  $this->batchPush( $jobs, $flags );
323  }
324 
335  final public function batchPush( array $jobs, $flags = 0 ) {
336  $this->assertNotReadOnly();
337 
338  if ( $jobs === [] ) {
339  return; // nothing to do
340  }
341 
342  foreach ( $jobs as $job ) {
343  if ( $job->getType() !== $this->type ) {
344  throw new JobQueueError(
345  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
346  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
347  throw new JobQueueError(
348  "Got delayed '{$job->getType()}' job; delays are not supported." );
349  }
350  }
351 
352  $this->doBatchPush( $jobs, $flags );
353 
354  foreach ( $jobs as $job ) {
355  if ( $job->isRootJob() ) {
356  $this->deduplicateRootJob( $job );
357  }
358  }
359  }
360 
366  abstract protected function doBatchPush( array $jobs, $flags );
367 
376  final public function pop() {
377  $this->assertNotReadOnly();
378 
379  $job = $this->doPop();
380 
381  // Flag this job as an old duplicate based on its "root" job...
382  try {
383  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
384  $this->incrStats( 'dupe_pops', $this->type );
385  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
386  }
387  } catch ( Exception $e ) {
388  // don't lose jobs over this
389  }
390 
391  return $job;
392  }
393 
398  abstract protected function doPop();
399 
410  final public function ack( RunnableJob $job ) {
411  $this->assertNotReadOnly();
412  if ( $job->getType() !== $this->type ) {
413  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
414  }
415 
416  $this->doAck( $job );
417  }
418 
423  abstract protected function doAck( RunnableJob $job );
424 
456  final public function deduplicateRootJob( IJobSpecification $job ) {
457  $this->assertNotReadOnly();
458  if ( $job->getType() !== $this->type ) {
459  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
460  }
461 
462  return $this->doDeduplicateRootJob( $job );
463  }
464 
472  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
473  if ( !$params ) {
474  throw new JobQueueError( "Cannot register root job; missing parameters." );
475  }
476 
477  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
478  // Callers should call JobQueueGroup::push() before this method so that if the
479  // insert fails, the de-duplication registration will be aborted. Having only the
480  // de-duplication registration succeed would cause jobs to become no-ops without
481  // any actual jobs that made them redundant.
482  $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
483  if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
484  return true; // a newer version of this root job was enqueued
485  }
486 
487  // Update the timestamp of the last root job started at the location...
488  return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
489  }
490 
498  final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
499  if ( $job->getType() !== $this->type ) {
500  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
501  }
502 
503  return $this->doIsRootJobOldDuplicate( $job );
504  }
505 
512  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
513  if ( !$params ) {
514  return false; // job has no de-deplication info
515  }
516 
517  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
518  // Get the last time this root job was enqueued
519  $timestamp = $this->wanCache->get( $key );
520  if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
521  // Update the timestamp of the last known root job started at the location...
522  $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
523  }
524 
525  // Check if a new root job was started at the location after this one's...
526  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
527  }
528 
533  protected function getRootJobCacheKey( $signature ) {
534  return $this->wanCache->makeGlobalKey(
535  'jobqueue',
536  $this->domain,
537  $this->type,
538  'rootjob',
539  $signature
540  );
541  }
542 
550  final public function delete() {
551  $this->assertNotReadOnly();
552 
553  $this->doDelete();
554  }
555 
560  protected function doDelete() {
561  throw new JobQueueError( "This method is not implemented." );
562  }
563 
572  final public function waitForBackups() {
573  $this->doWaitForBackups();
574  }
575 
580  protected function doWaitForBackups() {
581  }
582 
588  final public function flushCaches() {
589  $this->doFlushCaches();
590  }
591 
596  protected function doFlushCaches() {
597  }
598 
607  abstract public function getAllQueuedJobs();
608 
617  public function getAllDelayedJobs() {
618  return new ArrayIterator( [] ); // not implemented
619  }
620 
631  public function getAllAcquiredJobs() {
632  return new ArrayIterator( [] ); // not implemented
633  }
634 
642  public function getAllAbandonedJobs() {
643  return new ArrayIterator( [] ); // not implemented
644  }
645 
652  public function getCoalesceLocationInternal() {
653  return null;
654  }
655 
665  final public function getSiblingQueuesWithJobs( array $types ) {
666  return $this->doGetSiblingQueuesWithJobs( $types );
667  }
668 
674  protected function doGetSiblingQueuesWithJobs( array $types ) {
675  return null; // not supported
676  }
677 
688  final public function getSiblingQueueSizes( array $types ) {
689  return $this->doGetSiblingQueueSizes( $types );
690  }
691 
697  protected function doGetSiblingQueueSizes( array $types ) {
698  return null; // not supported
699  }
700 
706  protected function factoryJob( $command, $params ) {
707  // @TODO: dependency inject this as a callback
708  return Job::factory( $command, $params );
709  }
710 
714  protected function assertNotReadOnly() {
715  if ( $this->readOnlyReason !== false ) {
716  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
717  }
718  }
719 
728  protected function incrStats( $key, $type, $delta = 1 ) {
729  $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
730  $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
731  }
732 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:209
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\doAck
doAck(RunnableJob $job)
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:617
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:193
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:41
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:335
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:728
NullStatsdDataFactory
Definition: NullStatsdDataFactory.php:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:54
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:35
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:674
JobQueue\doGetSize
doGetSize()
JobQueue\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:45
$res
$res
Definition: testCompression.php:54
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:456
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:320
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:296
DuplicateJob\newFromJob
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:45
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:714
WANObjectCache\newEmpty
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
Definition: WANObjectCache.php:300
IJobSpecification\getType
getType()
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:251
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:560
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:697
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:185
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:37
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:596
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:283
JobQueue\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:410
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:273
JobQueue\doPop
doPop()
JobQueueError
Definition: JobQueueError.php:28
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:230
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\$stats
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:47
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:154
Job\factory
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:63
JobQueue\$wanCache
WANObjectCache $wanCache
Definition: JobQueue.php:50
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:572
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:116
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:306
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:580
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:116
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:177
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:39
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:631
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:533
$command
$command
Definition: mcc.php:125
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:50
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:588
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:43
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:376
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:33
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:52
JobQueue\factoryJob
factoryJob( $command, $params)
Definition: JobQueue.php:706
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:652
JobQueueReadOnlyError
Definition: JobQueueReadOnlyError.php:28
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:511
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:642
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:140
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:35
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:688
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:665
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:35
JobQueue\getType
getType()
Definition: JobQueue.php:147
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:471
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:68
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:498
JobQueue\getDomain
getDomain()
Definition: JobQueue.php:132