MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 
35 abstract class JobQueue {
37  protected $domain;
39  protected $type;
41  protected $order;
43  protected $claimTTL;
45  protected $maxTries;
47  protected $readOnlyReason;
49  protected $stats;
51  protected $idGenerator;
52 
54  protected $wanCache;
55 
56  protected const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
57 
58  protected const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
59 
76  protected function __construct( array $params ) {
77  $this->domain = $params['domain'] ?? $params['wiki']; // b/c
78  $this->type = $params['type'];
79  $this->claimTTL = $params['claimTTL'] ?? 0;
80  $this->maxTries = $params['maxTries'] ?? 3;
81  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
82  $this->order = $params['order'];
83  } else {
84  $this->order = $this->optimalOrder();
85  }
86  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
87  throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
88  }
89  $this->readOnlyReason = $params['readOnlyReason'] ?? false;
90  $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
91  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
92  $this->idGenerator = $params['idGenerator'];
93  }
94 
125  final public static function factory( array $params ) {
126  $class = $params['class'];
127  if ( !class_exists( $class ) ) {
128  throw new JobQueueError( "Invalid job queue class '$class'." );
129  }
130 
131  $obj = new $class( $params );
132  if ( !( $obj instanceof self ) ) {
133  throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
134  }
135 
136  return $obj;
137  }
138 
142  final public function getDomain() {
143  return $this->domain;
144  }
145 
150  final public function getWiki() {
151  wfDeprecated( __METHOD__, '1.33' );
152  return WikiMap::getWikiIdFromDbDomain( $this->domain );
153  }
154 
158  final public function getType() {
159  return $this->type;
160  }
161 
165  final public function getOrder() {
166  return $this->order;
167  }
168 
174  abstract protected function supportedOrders();
175 
181  abstract protected function optimalOrder();
182 
189  protected function supportsDelayedJobs() {
190  return false; // not implemented
191  }
192 
197  final public function delayedJobsEnabled() {
198  return $this->supportsDelayedJobs();
199  }
200 
205  public function getReadOnlyReason() {
206  return $this->readOnlyReason;
207  }
208 
221  final public function isEmpty() {
222  $res = $this->doIsEmpty();
223 
224  return $res;
225  }
226 
231  abstract protected function doIsEmpty();
232 
242  final public function getSize() {
243  $res = $this->doGetSize();
244 
245  return $res;
246  }
247 
252  abstract protected function doGetSize();
253 
263  final public function getAcquiredCount() {
264  $res = $this->doGetAcquiredCount();
265 
266  return $res;
267  }
268 
273  abstract protected function doGetAcquiredCount();
274 
285  final public function getDelayedCount() {
286  $res = $this->doGetDelayedCount();
287 
288  return $res;
289  }
290 
296  protected function doGetDelayedCount() {
297  return 0; // not implemented
298  }
299 
309  final public function getAbandonedCount() {
310  $res = $this->doGetAbandonedCount();
311 
312  return $res;
313  }
314 
320  protected function doGetAbandonedCount() {
321  return 0; // not implemented
322  }
323 
334  final public function push( $jobs, $flags = 0 ) {
335  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
336  $this->batchPush( $jobs, $flags );
337  }
338 
349  final public function batchPush( array $jobs, $flags = 0 ) {
350  $this->assertNotReadOnly();
351 
352  if ( $jobs === [] ) {
353  return; // nothing to do
354  }
355 
356  foreach ( $jobs as $job ) {
357  $this->assertMatchingJobType( $job );
358  if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
359  throw new JobQueueError(
360  "Got delayed '{$job->getType()}' job; delays are not supported." );
361  }
362  }
363 
364  $this->doBatchPush( $jobs, $flags );
365 
366  foreach ( $jobs as $job ) {
367  if ( $job->isRootJob() ) {
368  $this->deduplicateRootJob( $job );
369  }
370  }
371  }
372 
378  abstract protected function doBatchPush( array $jobs, $flags );
379 
388  final public function pop() {
389  $this->assertNotReadOnly();
390 
391  $job = $this->doPop();
392 
393  // Flag this job as an old duplicate based on its "root" job...
394  try {
395  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
396  $this->incrStats( 'dupe_pops', $this->type );
397  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
398  }
399  } catch ( Exception $e ) {
400  // don't lose jobs over this
401  }
402 
403  return $job;
404  }
405 
410  abstract protected function doPop();
411 
422  final public function ack( RunnableJob $job ) {
423  $this->assertNotReadOnly();
424  $this->assertMatchingJobType( $job );
425 
426  $this->doAck( $job );
427  }
428 
433  abstract protected function doAck( RunnableJob $job );
434 
466  final public function deduplicateRootJob( IJobSpecification $job ) {
467  $this->assertNotReadOnly();
468  $this->assertMatchingJobType( $job );
469 
470  return $this->doDeduplicateRootJob( $job );
471  }
472 
481  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
482  if ( !$params ) {
483  throw new JobQueueError( "Cannot register root job; missing parameters." );
484  }
485 
486  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
487  // Callers should call JobQueueGroup::push() before this method so that if the
488  // insert fails, the de-duplication registration will be aborted. Having only the
489  // de-duplication registration succeed would cause jobs to become no-ops without
490  // any actual jobs that made them redundant.
491  $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
492  if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
493  return true; // a newer version of this root job was enqueued
494  }
495 
496  // Update the timestamp of the last root job started at the location...
497  return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
498  }
499 
507  final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
508  $this->assertMatchingJobType( $job );
509 
510  return $this->doIsRootJobOldDuplicate( $job );
511  }
512 
520  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
521  if ( !$params ) {
522  return false; // job has no de-deplication info
523  }
524 
525  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
526  // Get the last time this root job was enqueued
527  $timestamp = $this->wanCache->get( $key );
528  if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
529  // Update the timestamp of the last known root job started at the location...
530  $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
531  }
532 
533  // Check if a new root job was started at the location after this one's...
534  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
535  }
536 
541  protected function getRootJobCacheKey( $signature ) {
542  return $this->wanCache->makeGlobalKey(
543  'jobqueue',
544  $this->domain,
545  $this->type,
546  'rootjob',
547  $signature
548  );
549  }
550 
558  final public function delete() {
559  $this->assertNotReadOnly();
560 
561  $this->doDelete();
562  }
563 
569  protected function doDelete() {
570  throw new JobQueueError( "This method is not implemented." );
571  }
572 
581  final public function waitForBackups() {
582  $this->doWaitForBackups();
583  }
584 
590  protected function doWaitForBackups() {
591  }
592 
598  final public function flushCaches() {
599  $this->doFlushCaches();
600  }
601 
607  protected function doFlushCaches() {
608  }
609 
618  abstract public function getAllQueuedJobs();
619 
629  public function getAllDelayedJobs() {
630  return new ArrayIterator( [] ); // not implemented
631  }
632 
644  public function getAllAcquiredJobs() {
645  return new ArrayIterator( [] ); // not implemented
646  }
647 
656  public function getAllAbandonedJobs() {
657  return new ArrayIterator( [] ); // not implemented
658  }
659 
667  public function getCoalesceLocationInternal() {
668  return null;
669  }
670 
680  final public function getSiblingQueuesWithJobs( array $types ) {
681  return $this->doGetSiblingQueuesWithJobs( $types );
682  }
683 
690  protected function doGetSiblingQueuesWithJobs( array $types ) {
691  return null; // not supported
692  }
693 
704  final public function getSiblingQueueSizes( array $types ) {
705  return $this->doGetSiblingQueueSizes( $types );
706  }
707 
714  protected function doGetSiblingQueueSizes( array $types ) {
715  return null; // not supported
716  }
717 
723  protected function factoryJob( $command, $params ) {
724  // @TODO: dependency inject this as a callback
725  return Job::factory( $command, $params );
726  }
727 
731  protected function assertNotReadOnly() {
732  if ( $this->readOnlyReason !== false ) {
733  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
734  }
735  }
736 
742  if ( $job->getType() !== $this->type ) {
743  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
744  }
745  }
746 
755  protected function incrStats( $key, $type, $delta = 1 ) {
756  $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
757  $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
758  }
759 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:221
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\doAck
doAck(RunnableJob $job)
JobQueue\$idGenerator
GlobalIdGenerator $idGenerator
Definition: JobQueue.php:51
JobQueue\assertMatchingJobType
assertMatchingJobType(IJobSpecification $job)
Definition: JobQueue.php:741
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:629
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:205
Wikimedia\UUID\GlobalIdGenerator
Class for getting statistically unique IDs without a central coordinator.
Definition: GlobalIdGenerator.php:34
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:43
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:349
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition: JobQueue.php:755
NullStatsdDataFactory
Definition: NullStatsdDataFactory.php:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:58
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:690
JobQueue\doGetSize
doGetSize()
JobQueue\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:47
$res
$res
Definition: testCompression.php:57
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:466
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:334
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:309
DuplicateJob\newFromJob
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:45
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:731
WANObjectCache\newEmpty
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
Definition: WANObjectCache.php:389
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:263
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:569
wfDeprecated
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
Definition: GlobalFunctions.php:997
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:714
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:197
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:39
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:607
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:296
JobQueue\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:422
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:285
JobQueue\doPop
doPop()
JobQueueError
@newable
Definition: JobQueueError.php:29
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:242
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\$stats
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:49
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:165
Job\factory
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:70
JobQueue\$wanCache
WANObjectCache $wanCache
Definition: JobQueue.php:54
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:581
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:128
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:320
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:590
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:125
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:189
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:41
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:644
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:541
$command
$command
Definition: mcc.php:125
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:49
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:598
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:45
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:388
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:35
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:56
JobQueue\factoryJob
factoryJob( $command, $params)
Definition: JobQueue.php:723
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:667
JobQueueReadOnlyError
@newable
Definition: JobQueueReadOnlyError.php:29
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:519
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:656
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:150
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:37
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:704
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:680
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:42
JobQueue\getType
getType()
Definition: JobQueue.php:158
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:480
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:76
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:507
JobQueue\getDomain
getDomain()
Definition: JobQueue.php:142