MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 
34 abstract class JobQueue {
36  protected $domain;
38  protected $type;
40  protected $order;
42  protected $claimTTL;
44  protected $maxTries;
46  protected $readOnlyReason;
48  protected $stats;
50  protected $idGenerator;
51 
53  protected $wanCache;
54 
55  const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
56 
57  const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
58 
72  protected function __construct( array $params ) {
73  $this->domain = $params['domain'] ?? $params['wiki']; // b/c
74  $this->type = $params['type'];
75  $this->claimTTL = $params['claimTTL'] ?? 0;
76  $this->maxTries = $params['maxTries'] ?? 3;
77  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
78  $this->order = $params['order'];
79  } else {
80  $this->order = $this->optimalOrder();
81  }
82  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
83  throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
84  }
85  $this->readOnlyReason = $params['readOnlyReason'] ?? false;
86  $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
87  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
88  $this->idGenerator = $params['idGenerator'];
89  }
90 
121  final public static function factory( array $params ) {
122  $class = $params['class'];
123  if ( !class_exists( $class ) ) {
124  throw new JobQueueError( "Invalid job queue class '$class'." );
125  }
126 
127  if ( !isset( $params['idGenerator'] ) ) {
128  wfDeprecated( __METHOD__ . ' called without "idGenerator" set', '1.35' );
129  $params['idGenerator'] = new GlobalIdGenerator(
130  sys_get_temp_dir(),
131  new EmptyBagOStuff(),
132  'shell_exec'
133  );
134  }
135 
136  $obj = new $class( $params );
137  if ( !( $obj instanceof self ) ) {
138  throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
139  }
140 
141  return $obj;
142  }
143 
147  final public function getDomain() {
148  return $this->domain;
149  }
150 
155  final public function getWiki() {
156  return WikiMap::getWikiIdFromDbDomain( $this->domain );
157  }
158 
162  final public function getType() {
163  return $this->type;
164  }
165 
169  final public function getOrder() {
170  return $this->order;
171  }
172 
178  abstract protected function supportedOrders();
179 
185  abstract protected function optimalOrder();
186 
192  protected function supportsDelayedJobs() {
193  return false; // not implemented
194  }
195 
200  final public function delayedJobsEnabled() {
201  return $this->supportsDelayedJobs();
202  }
203 
208  public function getReadOnlyReason() {
209  return $this->readOnlyReason;
210  }
211 
224  final public function isEmpty() {
225  $res = $this->doIsEmpty();
226 
227  return $res;
228  }
229 
234  abstract protected function doIsEmpty();
235 
245  final public function getSize() {
246  $res = $this->doGetSize();
247 
248  return $res;
249  }
250 
255  abstract protected function doGetSize();
256 
266  final public function getAcquiredCount() {
267  $res = $this->doGetAcquiredCount();
268 
269  return $res;
270  }
271 
276  abstract protected function doGetAcquiredCount();
277 
288  final public function getDelayedCount() {
289  $res = $this->doGetDelayedCount();
290 
291  return $res;
292  }
293 
298  protected function doGetDelayedCount() {
299  return 0; // not implemented
300  }
301 
311  final public function getAbandonedCount() {
312  $res = $this->doGetAbandonedCount();
313 
314  return $res;
315  }
316 
321  protected function doGetAbandonedCount() {
322  return 0; // not implemented
323  }
324 
335  final public function push( $jobs, $flags = 0 ) {
336  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
337  $this->batchPush( $jobs, $flags );
338  }
339 
350  final public function batchPush( array $jobs, $flags = 0 ) {
351  $this->assertNotReadOnly();
352 
353  if ( $jobs === [] ) {
354  return; // nothing to do
355  }
356 
357  foreach ( $jobs as $job ) {
358  if ( $job->getType() !== $this->type ) {
359  throw new JobQueueError(
360  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
361  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
362  throw new JobQueueError(
363  "Got delayed '{$job->getType()}' job; delays are not supported." );
364  }
365  }
366 
367  $this->doBatchPush( $jobs, $flags );
368 
369  foreach ( $jobs as $job ) {
370  if ( $job->isRootJob() ) {
371  $this->deduplicateRootJob( $job );
372  }
373  }
374  }
375 
381  abstract protected function doBatchPush( array $jobs, $flags );
382 
391  final public function pop() {
392  $this->assertNotReadOnly();
393 
394  $job = $this->doPop();
395 
396  // Flag this job as an old duplicate based on its "root" job...
397  try {
398  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
399  $this->incrStats( 'dupe_pops', $this->type );
400  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
401  }
402  } catch ( Exception $e ) {
403  // don't lose jobs over this
404  }
405 
406  return $job;
407  }
408 
413  abstract protected function doPop();
414 
425  final public function ack( RunnableJob $job ) {
426  $this->assertNotReadOnly();
427  if ( $job->getType() !== $this->type ) {
428  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
429  }
430 
431  $this->doAck( $job );
432  }
433 
438  abstract protected function doAck( RunnableJob $job );
439 
471  final public function deduplicateRootJob( IJobSpecification $job ) {
472  $this->assertNotReadOnly();
473  if ( $job->getType() !== $this->type ) {
474  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
475  }
476 
477  return $this->doDeduplicateRootJob( $job );
478  }
479 
487  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
488  if ( !$params ) {
489  throw new JobQueueError( "Cannot register root job; missing parameters." );
490  }
491 
492  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
493  // Callers should call JobQueueGroup::push() before this method so that if the
494  // insert fails, the de-duplication registration will be aborted. Having only the
495  // de-duplication registration succeed would cause jobs to become no-ops without
496  // any actual jobs that made them redundant.
497  $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
498  if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
499  return true; // a newer version of this root job was enqueued
500  }
501 
502  // Update the timestamp of the last root job started at the location...
503  return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
504  }
505 
513  final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
514  if ( $job->getType() !== $this->type ) {
515  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
516  }
517 
518  return $this->doIsRootJobOldDuplicate( $job );
519  }
520 
527  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
528  if ( !$params ) {
529  return false; // job has no de-deplication info
530  }
531 
532  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
533  // Get the last time this root job was enqueued
534  $timestamp = $this->wanCache->get( $key );
535  if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
536  // Update the timestamp of the last known root job started at the location...
537  $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
538  }
539 
540  // Check if a new root job was started at the location after this one's...
541  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
542  }
543 
548  protected function getRootJobCacheKey( $signature ) {
549  return $this->wanCache->makeGlobalKey(
550  'jobqueue',
551  $this->domain,
552  $this->type,
553  'rootjob',
554  $signature
555  );
556  }
557 
565  final public function delete() {
566  $this->assertNotReadOnly();
567 
568  $this->doDelete();
569  }
570 
575  protected function doDelete() {
576  throw new JobQueueError( "This method is not implemented." );
577  }
578 
587  final public function waitForBackups() {
588  $this->doWaitForBackups();
589  }
590 
595  protected function doWaitForBackups() {
596  }
597 
603  final public function flushCaches() {
604  $this->doFlushCaches();
605  }
606 
611  protected function doFlushCaches() {
612  }
613 
622  abstract public function getAllQueuedJobs();
623 
632  public function getAllDelayedJobs() {
633  return new ArrayIterator( [] ); // not implemented
634  }
635 
646  public function getAllAcquiredJobs() {
647  return new ArrayIterator( [] ); // not implemented
648  }
649 
657  public function getAllAbandonedJobs() {
658  return new ArrayIterator( [] ); // not implemented
659  }
660 
667  public function getCoalesceLocationInternal() {
668  return null;
669  }
670 
680  final public function getSiblingQueuesWithJobs( array $types ) {
681  return $this->doGetSiblingQueuesWithJobs( $types );
682  }
683 
689  protected function doGetSiblingQueuesWithJobs( array $types ) {
690  return null; // not supported
691  }
692 
703  final public function getSiblingQueueSizes( array $types ) {
704  return $this->doGetSiblingQueueSizes( $types );
705  }
706 
712  protected function doGetSiblingQueueSizes( array $types ) {
713  return null; // not supported
714  }
715 
721  protected function factoryJob( $command, $params ) {
722  // @TODO: dependency inject this as a callback
723  return Job::factory( $command, $params );
724  }
725 
729  protected function assertNotReadOnly() {
730  if ( $this->readOnlyReason !== false ) {
731  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
732  }
733  }
734 
743  protected function incrStats( $key, $type, $delta = 1 ) {
744  $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
745  $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
746  }
747 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:224
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\doAck
doAck(RunnableJob $job)
JobQueue\$idGenerator
GlobalIdGenerator $idGenerator
Definition: JobQueue.php:50
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:632
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:208
Wikimedia\UUID\GlobalIdGenerator
Class for getting statistically unique IDs without a central coordinator.
Definition: GlobalIdGenerator.php:36
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:42
EmptyBagOStuff
A BagOStuff object with no objects in it.
Definition: EmptyBagOStuff.php:29
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:350
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:743
NullStatsdDataFactory
Definition: NullStatsdDataFactory.php:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:57
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:35
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:689
JobQueue\doGetSize
doGetSize()
JobQueue\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:46
$res
$res
Definition: testCompression.php:54
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:471
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:335
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:311
DuplicateJob\newFromJob
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:45
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:729
WANObjectCache\newEmpty
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
Definition: WANObjectCache.php:312
IJobSpecification\getType
getType()
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:266
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:575
wfDeprecated
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that $function is deprecated.
Definition: GlobalFunctions.php:1045
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:712
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:200
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:38
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:611
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:298
JobQueue\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:425
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:288
JobQueue\doPop
doPop()
JobQueueError
Definition: JobQueueError.php:28
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:245
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\$stats
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:48
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:169
Job\factory
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:63
JobQueue\$wanCache
WANObjectCache $wanCache
Definition: JobQueue.php:53
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:587
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:116
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:321
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:595
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:121
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:192
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:40
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:646
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:548
$command
$command
Definition: mcc.php:125
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:50
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:603
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:44
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:391
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:34
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:55
JobQueue\factoryJob
factoryJob( $command, $params)
Definition: JobQueue.php:721
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:667
JobQueueReadOnlyError
Definition: JobQueueReadOnlyError.php:28
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:526
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:657
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:155
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:36
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:703
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:680
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:35
JobQueue\getType
getType()
Definition: JobQueue.php:162
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:486
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:72
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:513
JobQueue\getDomain
getDomain()
Definition: JobQueue.php:147