MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
24 
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Concrete queue backends extend this class and implement the abstract do*()
 * hooks. The final public methods defined here apply the shared checks
 * (read-only mode, job type matching, root job de-duplication) and then
 * delegate to those hooks.
 */
abstract class JobQueue {
	/** @var string DB domain ID */
	protected $domain;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var StatsdDataFactoryInterface */
	protected $stats;

	/** @var WANObjectCache */
	protected $wanCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue configuration. Keys read here:
	 *   - domain         : DB domain ID ("wiki" is accepted as a b/c fallback)
	 *   - type           : job type handled by this queue
	 *   - claimTTL       : defaults to 0
	 *   - maxTries       : defaults to 3
	 *   - order          : queue order; when unset or "any", optimalOrder() is used
	 *   - readOnlyReason : read-only rationale; defaults to false (read/write)
	 *   - stats          : stats factory; defaults to a NullStatsdDataFactory
	 *   - wanCache       : WAN cache; defaults to WANObjectCache::newEmpty()
	 * @throws JobQueueError If the resulting order is not in supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->domain = $params['domain'] ?? $params['wiki']; // b/c
		$this->type = $params['type'];
		$this->claimTTL = $params['claimTTL'] ?? 0;
		$this->maxTries = $params['maxTries'] ?? 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: with loose matching a non-string value such as boolean
		// true compares equal to any non-empty order name and would slip through.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->readOnlyReason = $params['readOnlyReason'] ?? false;
		$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
		$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain "class" (name of a JobQueue subclass); the
	 *   whole array is passed on to that class's constructor
	 * @return JobQueue
	 * @throws JobQueueError If the class does not exist or is not a JobQueue class
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new JobQueueError( "Invalid job queue class '$class'." );
		}
		// Check the type before instantiating, so that the constructor of an
		// unrelated class is never run with the queue parameters.
		if ( !is_a( $class, self::class, true ) ) {
			throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string DB domain ID
	 */
	final public function getDomain() {
		return $this->domain;
	}

	/**
	 * @return string Wiki ID derived from the DB domain via WikiMap::getWikiIdFromDbDomain()
	 */
	final public function getWiki() {
		return WikiMap::getWikiIdFromDbDomain( $this->domain );
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Queue order in effect
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False here; subclasses that implement delays override this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether this queue supports delayed jobs
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read-only rationale, or false if the queue is read/write
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return mixed Whatever doIsEmpty() returns
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return mixed Whatever doGetSize() returns
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Whatever doGetAcquiredCount() returns
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Whatever doGetDelayedCount() returns (0 unless overridden)
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return mixed Whatever doGetAbandonedCount() returns (0 unless overridden)
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * @param mixed $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush()
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		$this->batchPush( $jobs, $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted. Root jobs are
	 * registered for de-duplication only after the insertion call has returned.
	 *
	 * @param array $jobs Jobs to insert
	 * @param int $flags Passed through to doBatchPush()
	 * @throws JobQueueError If a job has the wrong type, or is delayed while
	 *   delays are not supported
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( $jobs === [] ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new JobQueueError(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new JobQueueError(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );

		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * If the popped job's root job is found to be an old duplicate, the job is
	 * replaced by a no-op DuplicateJob and the "dupe_pops" counter is incremented.
	 *
	 * @return mixed The value from doPop(), or a DuplicateJob standing in for it
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function pop() {
		$this->assertNotReadOnly();

		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				$this->incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// don't lose jobs over this
			// (deliberately best-effort: the job that was popped is still returned)
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param RunnableJob $job
	 * @throws JobQueueError If the job type does not match this queue
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function ack( RunnableJob $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		$this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param RunnableJob $job
	 */
	abstract protected function doAck( RunnableJob $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return mixed Whatever doDeduplicateRootJob() returns
	 * @throws JobQueueError If the job type does not match this queue
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * @see JobQueue::deduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return mixed True if an equal or newer timestamp is already cached;
	 *   otherwise the result of the cache set() call
	 * @throws JobQueueError If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			throw new JobQueueError( "Cannot register root job; missing parameters." );
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Having only the
		// de-duplication registration succeed would cause jobs to become no-ops without
		// any actual jobs that made them redundant.
		$timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
		if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param IJobSpecification $job
	 * @return mixed Whatever doIsRootJobOldDuplicate() returns
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param IJobSpecification $job
	 * @return bool True if the cached root job timestamp is newer than this job's
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			return false; // job has no de-duplication info
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->wanCache->get( $key );
		if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
			// Update the timestamp of the last known root job started at the location...
			$this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which a root job's last timestamp is stored.
	 *
	 * @param string $signature Root job signature
	 * @return string Global cache key made from the domain, type and signature
	 */
	protected function getRootJobCacheKey( $signature ) {
		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$this->type,
			'rootjob',
			$signature
		);
	}

	/**
	 * Run the backend's doDelete() hook, after checking the queue is writable.
	 *
	 * @throws JobQueueError If the backend does not implement doDelete()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws JobQueueError Always, unless overridden
	 */
	protected function doDelete() {
		throw new JobQueueError( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * No-op unless overridden.
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * No-op unless overridden.
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return ArrayIterator Empty unless overridden
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return ArrayIterator Empty unless overridden
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return ArrayIterator Empty unless overridden
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return null Null unless overridden
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues is empty.
	 *
	 * @param array $types Job types
	 * @return mixed Whatever doGetSiblingQueuesWithJobs() returns (null if not supported)
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types Job types
	 * @return null Null unless overridden
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types Job types
	 * @return mixed Whatever doGetSiblingQueueSizes() returns (null if not supported)
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types Job types
	 * @return null Null unless overridden
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Create a job object through Job::factory().
	 *
	 * @param string $command
	 * @param array $params
	 * @return mixed Whatever Job::factory() returns
	 */
	protected function factoryJob( $command, $params ) {
		// @TODO: dependency inject this as a callback
		return Job::factory( $command, $params );
	}

	/**
	 * @throws JobQueueReadOnlyError If a read-only reason is set
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * Update the stats counter for the queue overall and for the queue type.
	 *
	 * @param string $key Counter name; used as "jobqueue.<key>.all" and
	 *   "jobqueue.<key>.<type>"
	 * @param string $type Job type
	 * @param int $delta Amount passed to updateCount() for both counters
	 */
	protected function incrStats( $key, $type, $delta = 1 ) {
		$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
		$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:45
factoryJob( $command, $params)
Definition: JobQueue.php:704
string $order
Job priority for pop()
Definition: JobQueue.php:37
const QOS_ATOMIC
Definition: JobQueue.php:50
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:333
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:408
doFlushCaches()
Definition: JobQueue.php:594
delayedJobsEnabled()
Definition: JobQueue.php:183
$command
Definition: cdb.php:65
assertNotReadOnly()
Definition: JobQueue.php:712
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:175
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:496
doDelete()
Definition: JobQueue.php:558
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
incrStats( $key, $type, $delta=1)
Update the stats counters (via the injected stats factory) for the queue overall and for the queue type.
Definition: JobQueue.php:726
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:672
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:294
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:650
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:271
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:509
WANObjectCache $wanCache
Definition: JobQueue.php:48
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:207
__construct(array $params)
Definition: JobQueue.php:66
doGetAbandonedCount()
Definition: JobQueue.php:304
string $type
Job type.
Definition: JobQueue.php:35
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
getOrder()
Definition: JobQueue.php:152
getRootJobCacheKey( $signature)
Definition: JobQueue.php:531
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:249
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:454
doBatchPush(array $jobs, $flags)
getReadOnlyReason()
Definition: JobQueue.php:191
doAck(RunnableJob $job)
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:63
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition: JobQueue.php:663
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:318
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:629
doWaitForBackups()
Definition: JobQueue.php:578
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:114
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:695
doGetAcquiredCount()
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count( $args)< 1) $job
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
getDomain()
Definition: JobQueue.php:130
const ROOTJOB_TTL
Definition: JobQueue.php:52
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:640
Interface for serializable objects that describe a job queue task.
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:686
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:615
doGetDelayedCount()
Definition: JobQueue.php:281
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:469
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:570
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:586
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:228
supportedOrders()
Get the allowed queue orders for configuration validation.
pop()
Pop a job off of the queue.
Definition: JobQueue.php:374
string $domain
DB domain ID.
Definition: JobQueue.php:33