MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 
35 abstract class JobQueue {
37  protected $domain;
39  protected $type;
41  protected $order;
43  protected $claimTTL;
45  protected $maxTries;
47  protected $readOnlyReason;
49  protected $stats;
51  protected $idGenerator;
52 
54  protected $wanCache;
55 
56  protected const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
57 
58  protected const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
59 
76  protected function __construct( array $params ) {
77  $this->domain = $params['domain'] ?? $params['wiki']; // b/c
78  $this->type = $params['type'];
79  $this->claimTTL = $params['claimTTL'] ?? 0;
80  $this->maxTries = $params['maxTries'] ?? 3;
81  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
82  $this->order = $params['order'];
83  } else {
84  $this->order = $this->optimalOrder();
85  }
86  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
87  throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
88  }
89  $this->readOnlyReason = $params['readOnlyReason'] ?? false;
90  $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
91  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
92  $this->idGenerator = $params['idGenerator'];
93  }
94 
125  final public static function factory( array $params ) {
126  $class = $params['class'];
127  if ( !class_exists( $class ) ) {
128  throw new JobQueueError( "Invalid job queue class '$class'." );
129  }
130 
131  if ( !isset( $params['idGenerator'] ) ) {
132  wfDeprecated( __METHOD__ . ' called without "idGenerator" set', '1.35' );
133  $params['idGenerator'] = new GlobalIdGenerator(
134  sys_get_temp_dir(),
135  new EmptyBagOStuff(),
136  'shell_exec'
137  );
138  }
139 
140  $obj = new $class( $params );
141  if ( !( $obj instanceof self ) ) {
142  throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
143  }
144 
145  return $obj;
146  }
147 
151  final public function getDomain() {
152  return $this->domain;
153  }
154 
159  final public function getWiki() {
160  return WikiMap::getWikiIdFromDbDomain( $this->domain );
161  }
162 
166  final public function getType() {
167  return $this->type;
168  }
169 
173  final public function getOrder() {
174  return $this->order;
175  }
176 
182  abstract protected function supportedOrders();
183 
189  abstract protected function optimalOrder();
190 
197  protected function supportsDelayedJobs() {
198  return false; // not implemented
199  }
200 
205  final public function delayedJobsEnabled() {
206  return $this->supportsDelayedJobs();
207  }
208 
213  public function getReadOnlyReason() {
214  return $this->readOnlyReason;
215  }
216 
229  final public function isEmpty() {
230  $res = $this->doIsEmpty();
231 
232  return $res;
233  }
234 
239  abstract protected function doIsEmpty();
240 
250  final public function getSize() {
251  $res = $this->doGetSize();
252 
253  return $res;
254  }
255 
260  abstract protected function doGetSize();
261 
271  final public function getAcquiredCount() {
272  $res = $this->doGetAcquiredCount();
273 
274  return $res;
275  }
276 
281  abstract protected function doGetAcquiredCount();
282 
293  final public function getDelayedCount() {
294  $res = $this->doGetDelayedCount();
295 
296  return $res;
297  }
298 
304  protected function doGetDelayedCount() {
305  return 0; // not implemented
306  }
307 
317  final public function getAbandonedCount() {
318  $res = $this->doGetAbandonedCount();
319 
320  return $res;
321  }
322 
328  protected function doGetAbandonedCount() {
329  return 0; // not implemented
330  }
331 
342  final public function push( $jobs, $flags = 0 ) {
343  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
344  $this->batchPush( $jobs, $flags );
345  }
346 
357  final public function batchPush( array $jobs, $flags = 0 ) {
358  $this->assertNotReadOnly();
359 
360  if ( $jobs === [] ) {
361  return; // nothing to do
362  }
363 
364  foreach ( $jobs as $job ) {
365  if ( $job->getType() !== $this->type ) {
366  throw new JobQueueError(
367  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
368  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
369  throw new JobQueueError(
370  "Got delayed '{$job->getType()}' job; delays are not supported." );
371  }
372  }
373 
374  $this->doBatchPush( $jobs, $flags );
375 
376  foreach ( $jobs as $job ) {
377  if ( $job->isRootJob() ) {
378  $this->deduplicateRootJob( $job );
379  }
380  }
381  }
382 
388  abstract protected function doBatchPush( array $jobs, $flags );
389 
398  final public function pop() {
399  $this->assertNotReadOnly();
400 
401  $job = $this->doPop();
402 
403  // Flag this job as an old duplicate based on its "root" job...
404  try {
405  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
406  $this->incrStats( 'dupe_pops', $this->type );
407  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
408  }
409  } catch ( Exception $e ) {
410  // don't lose jobs over this
411  }
412 
413  return $job;
414  }
415 
420  abstract protected function doPop();
421 
432  final public function ack( RunnableJob $job ) {
433  $this->assertNotReadOnly();
434  if ( $job->getType() !== $this->type ) {
435  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
436  }
437 
438  $this->doAck( $job );
439  }
440 
445  abstract protected function doAck( RunnableJob $job );
446 
478  final public function deduplicateRootJob( IJobSpecification $job ) {
479  $this->assertNotReadOnly();
480  if ( $job->getType() !== $this->type ) {
481  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
482  }
483 
484  return $this->doDeduplicateRootJob( $job );
485  }
486 
495  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
496  if ( !$params ) {
497  throw new JobQueueError( "Cannot register root job; missing parameters." );
498  }
499 
500  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
501  // Callers should call JobQueueGroup::push() before this method so that if the
502  // insert fails, the de-duplication registration will be aborted. Having only the
503  // de-duplication registration succeed would cause jobs to become no-ops without
504  // any actual jobs that made them redundant.
505  $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
506  if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
507  return true; // a newer version of this root job was enqueued
508  }
509 
510  // Update the timestamp of the last root job started at the location...
511  return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
512  }
513 
521  final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
522  if ( $job->getType() !== $this->type ) {
523  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
524  }
525 
526  return $this->doIsRootJobOldDuplicate( $job );
527  }
528 
536  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
537  if ( !$params ) {
538  return false; // job has no de-deplication info
539  }
540 
541  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
542  // Get the last time this root job was enqueued
543  $timestamp = $this->wanCache->get( $key );
544  if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
545  // Update the timestamp of the last known root job started at the location...
546  $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
547  }
548 
549  // Check if a new root job was started at the location after this one's...
550  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
551  }
552 
557  protected function getRootJobCacheKey( $signature ) {
558  return $this->wanCache->makeGlobalKey(
559  'jobqueue',
560  $this->domain,
561  $this->type,
562  'rootjob',
563  $signature
564  );
565  }
566 
574  final public function delete() {
575  $this->assertNotReadOnly();
576 
577  $this->doDelete();
578  }
579 
585  protected function doDelete() {
586  throw new JobQueueError( "This method is not implemented." );
587  }
588 
597  final public function waitForBackups() {
598  $this->doWaitForBackups();
599  }
600 
606  protected function doWaitForBackups() {
607  }
608 
614  final public function flushCaches() {
615  $this->doFlushCaches();
616  }
617 
623  protected function doFlushCaches() {
624  }
625 
634  abstract public function getAllQueuedJobs();
635 
645  public function getAllDelayedJobs() {
646  return new ArrayIterator( [] ); // not implemented
647  }
648 
660  public function getAllAcquiredJobs() {
661  return new ArrayIterator( [] ); // not implemented
662  }
663 
672  public function getAllAbandonedJobs() {
673  return new ArrayIterator( [] ); // not implemented
674  }
675 
683  public function getCoalesceLocationInternal() {
684  return null;
685  }
686 
696  final public function getSiblingQueuesWithJobs( array $types ) {
697  return $this->doGetSiblingQueuesWithJobs( $types );
698  }
699 
706  protected function doGetSiblingQueuesWithJobs( array $types ) {
707  return null; // not supported
708  }
709 
720  final public function getSiblingQueueSizes( array $types ) {
721  return $this->doGetSiblingQueueSizes( $types );
722  }
723 
730  protected function doGetSiblingQueueSizes( array $types ) {
731  return null; // not supported
732  }
733 
739  protected function factoryJob( $command, $params ) {
740  // @TODO: dependency inject this as a callback
741  return Job::factory( $command, $params );
742  }
743 
747  protected function assertNotReadOnly() {
748  if ( $this->readOnlyReason !== false ) {
749  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
750  }
751  }
752 
761  protected function incrStats( $key, $type, $delta = 1 ) {
762  $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
763  $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
764  }
765 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:229
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\doAck
doAck(RunnableJob $job)
JobQueue\$idGenerator
GlobalIdGenerator $idGenerator
Definition: JobQueue.php:51
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:645
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:213
Wikimedia\UUID\GlobalIdGenerator
Class for getting statistically unique IDs without a central coordinator.
Definition: GlobalIdGenerator.php:36
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:43
EmptyBagOStuff
A BagOStuff object with no objects in it.
Definition: EmptyBagOStuff.php:29
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:357
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition: JobQueue.php:761
NullStatsdDataFactory
Definition: NullStatsdDataFactory.php:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:58
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Stable to override.
Definition: JobQueue.php:706
JobQueue\doGetSize
doGetSize()
JobQueue\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:47
$res
$res
Definition: testCompression.php:57
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:478
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:342
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:317
DuplicateJob\newFromJob
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:45
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:747
WANObjectCache\newEmpty
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
Definition: WANObjectCache.php:348
IJobSpecification\getType
getType()
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:271
JobQueue\doDelete
doDelete()
Stable to override.
Definition: JobQueue.php:585
wfDeprecated
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that $function is deprecated.
Definition: GlobalFunctions.php:1026
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Stable to override.
Definition: JobQueue.php:730
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:205
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:39
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Stable to override.
Definition: JobQueue.php:623
JobQueue\doGetDelayedCount
doGetDelayedCount()
Stable to override.
Definition: JobQueue.php:304
JobQueue\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:432
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:293
JobQueue\doPop
doPop()
JobQueueError
@newable
Definition: JobQueueError.php:29
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:250
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\$stats
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:49
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:173
Job\factory
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:65
JobQueue\$wanCache
WANObjectCache $wanCache
Definition: JobQueue.php:54
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:597
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:125
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Stable to override.
Definition: JobQueue.php:328
JobQueue\doWaitForBackups
doWaitForBackups()
Stable to override.
Definition: JobQueue.php:606
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:125
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:197
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:41
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:660
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:557
$command
$command
Definition: mcc.php:125
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:50
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:614
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:45
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:398
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:35
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:56
JobQueue\factoryJob
factoryJob( $command, $params)
Definition: JobQueue.php:739
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:683
JobQueueReadOnlyError
@newable
Definition: JobQueueReadOnlyError.php:29
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(IJobSpecification $job)
Stable to override.
Definition: JobQueue.php:535
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:672
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:159
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:37
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:720
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:696
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:35
JobQueue\getType
getType()
Definition: JobQueue.php:166
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Stable to override.
Definition: JobQueue.php:494
JobQueue\__construct
__construct(array $params)
Stable to call.
Definition: JobQueue.php:76
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:521
JobQueue\getDomain
getDomain()
Definition: JobQueue.php:151