MediaWiki REL1_37
JobQueue.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Concrete queues implement the abstract do*() hooks; the final public
 * methods on this class wrap those hooks with the shared read-only and
 * job-type checks.
 */
abstract class JobQueue {
	/** @var string DB domain ID */
	protected $domain;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var StatsdDataFactoryInterface */
	protected $stats;
	/** @var GlobalIdGenerator */
	protected $idGenerator;

	/** @var WANObjectCache */
	protected $wanCache;

	protected const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	protected const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue settings read by this constructor:
	 *   - domain      : DB domain ID ('wiki' is accepted as a b/c alias)
	 *   - type        : job type handled by this queue (required)
	 *   - claimTTL    : time to live in seconds (default 0)
	 *   - maxTries    : maximum number of times to try a job (default 3)
	 *   - order       : pop() order; when absent or 'any', optimalOrder() is used
	 *   - readOnlyReason : read-only rationale, or false if r/w (default false)
	 *   - stats       : stats factory (default: a NullStatsdDataFactory)
	 *   - wanCache    : WAN cache (default: WANObjectCache::newEmpty())
	 *   - idGenerator : ID generator (required)
	 * @throws JobQueueError If the resulting order is not in supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->domain = $params['domain'] ?? $params['wiki']; // b/c
		$this->type = $params['type'];
		$this->claimTTL = $params['claimTTL'] ?? 0;
		$this->maxTries = $params['maxTries'] ?? 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: a non-string entry (e.g. 0) in supportedOrders() must
		// not loosely match an arbitrary order string.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			// static::class names the concrete queue class rather than this abstract base
			throw new JobQueueError( static::class . " does not support '{$this->order}' order." );
		}
		$this->readOnlyReason = $params['readOnlyReason'] ?? false;
		$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
		$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
		$this->idGenerator = $params['idGenerator'];
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain 'class' (name of a JobQueue subclass);
	 *   the whole array is handed to that class's constructor.
	 * @return JobQueue
	 * @throws JobQueueError If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new JobQueueError( "Invalid job queue class '$class'." );
		}

		// Validate the class name before instantiating, so that constructors of
		// unrelated classes are never run with the queue parameters.
		if ( !is_a( $class, self::class, true ) ) {
			throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string DB domain ID
	 */
	final public function getDomain() {
		return $this->domain;
	}

	/**
	 * @return string Wiki ID derived from the DB domain ID
	 * @deprecated since 1.33 (emits a deprecation warning on every call)
	 */
	final public function getWiki() {
		wfDeprecated( __METHOD__, '1.33' );
		return WikiMap::getWikiIdFromDbDomain( $this->domain );
	}

	/**
	 * @return string Job type
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Job priority for pop()
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names accepted by the constructor
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless a subclass overrides this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether delayed jobs are supported (see supportsDelayedJobs())
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read only rationale (or false if r/w)
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return mixed Result of doIsEmpty()
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * Subclass hook backing isEmpty().
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return mixed Result of doGetSize()
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * Subclass hook backing getSize().
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Result of doGetAcquiredCount()
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * Subclass hook backing getAcquiredCount().
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int Result of doGetDelayedCount()
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * Subclass hook backing getDelayedCount().
	 *
	 * @return int 0 unless a subclass overrides this
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int Result of doGetAbandonedCount()
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * Subclass hook backing getAbandonedCount().
	 *
	 * @return int 0 unless a subclass overrides this
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * A single job is wrapped in an array and forwarded to batchPush().
	 *
	 * @param IJobSpecification|IJobSpecification[] $jobs
	 * @param int $flags Passed through to doBatchPush() (presumably a bitfield
	 *   of JobQueue::QOS_* constants; confirm against the subclasses)
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		$this->batchPush( $jobs, $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted; after insertion,
	 * each root job is registered for de-duplication.
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Passed through to doBatchPush()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError On a job of the wrong type, or a delayed job when
	 *   delays are not supported
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( $jobs === [] ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			$this->assertMatchingJobType( $job );
			if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new JobQueueError(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );

		// Register root jobs only after the insert went through
		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * Subclass hook backing batchPush().
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * A popped job whose root job has been superseded is replaced by a
	 * no-op DuplicateJob and counted under the 'dupe_pops' stat.
	 *
	 * @return mixed The job from doPop() (possibly converted to a DuplicateJob),
	 *   or the falsy value doPop() returned when there was none
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function pop() {
		$this->assertNotReadOnly();

		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				$this->incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// don't lose jobs over this
		}

		return $job;
	}

	/**
	 * Subclass hook backing pop().
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param RunnableJob $job
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final public function ack( RunnableJob $job ) {
		$this->assertNotReadOnly();
		$this->assertMatchingJobType( $job );

		$this->doAck( $job );
	}

	/**
	 * Subclass hook backing ack().
	 *
	 * @param RunnableJob $job
	 */
	abstract protected function doAck( RunnableJob $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doDeduplicateRootJob()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		$this->assertMatchingJobType( $job );

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * Record the root job timestamp of $job in the WAN cache, unless an equal
	 * or newer timestamp is already recorded for the same root job signature.
	 *
	 * @param IJobSpecification $job
	 * @return bool True if an equal-or-newer root job is already known;
	 *   otherwise the result of the cache write
	 * @throws JobQueueError If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			throw new JobQueueError( "Cannot register root job; missing parameters." );
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Having only the
		// de-duplication registration succeed would cause jobs to become no-ops without
		// any actual jobs that made them redundant.
		$timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
		if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doIsRootJobOldDuplicate()
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
		$this->assertMatchingJobType( $job );

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * Compare the root job timestamp of $job with the one recorded in the WAN
	 * cache, refreshing the cache entry when it is missing or older.
	 *
	 * @param IJobSpecification $job
	 * @return bool True only if a strictly newer root job timestamp is recorded
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			return false; // job has no de-duplication info
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->wanCache->get( $key );
		if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
			// Update the timestamp of the last known root job started at the location...
			$this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the global cache key under which a root job timestamp is stored.
	 *
	 * @param string $signature Root job signature
	 * @return string Key scoped by this queue's domain and job type
	 */
	protected function getRootJobCacheKey( $signature ) {
		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$this->type,
			'rootjob',
			$signature
		);
	}

	/**
	 * Run the subclass's doDelete() after checking that the queue is writable.
	 *
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the subclass does not implement doDelete()
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * Subclass hook backing delete().
	 *
	 * @throws JobQueueError Always, unless a subclass overrides this
	 */
	protected function doDelete() {
		throw new JobQueueError( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * Subclass hook backing waitForBackups(); a no-op by default.
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * Subclass hook backing flushCaches(); a no-op by default.
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator Empty unless a subclass overrides this
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator Empty unless a subclass overrides this
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator Empty unless a subclass overrides this
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return null Always null in this base class
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues are empty.
	 *
	 * @param array $types Job types
	 * @return mixed Result of doGetSiblingQueuesWithJobs(); null if not supported
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * Subclass hook backing getSiblingQueuesWithJobs().
	 *
	 * @param array $types Job types
	 * @return null Unless a subclass overrides this
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types Job types
	 * @return mixed Result of doGetSiblingQueueSizes(); null if not supported
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * Subclass hook backing getSiblingQueueSizes().
	 *
	 * @param array $types Job types
	 * @return null Unless a subclass overrides this
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Create a job object via Job::factory().
	 *
	 * @param string $command
	 * @param array $params
	 * @return mixed Whatever Job::factory() returns
	 */
	protected function factoryJob( $command, $params ) {
		// @TODO: dependency inject this as a callback
		return Job::factory( $command, $params );
	}

	/**
	 * @throws JobQueueReadOnlyError If a read-only reason is set
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * @param IJobSpecification $job
	 * @throws JobQueueError If the job's type differs from this queue's type
	 */
	protected function assertMatchingJobType( IJobSpecification $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
	}

	/**
	 * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
	 *
	 * @param string $key Event type
	 * @param string $type Job type
	 * @param int $delta
	 */
	protected function incrStats( $key, $type, $delta = 1 ) {
		$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
		$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:35
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:221
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition JobQueue.php:755
string $order
Job priority for pop()
Definition JobQueue.php:41
__construct(array $params)
Definition JobQueue.php:76
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:309
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:581
const ROOTJOB_TTL
Definition JobQueue.php:58
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:334
doGetAbandonedCount()
Definition JobQueue.php:320
GlobalIdGenerator $idGenerator
Definition JobQueue.php:51
pop()
Pop a job off of the queue.
Definition JobQueue.php:388
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:43
doDeduplicateRootJob(IJobSpecification $job)
Definition JobQueue.php:480
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:667
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:349
doGetSiblingQueueSizes(array $types)
Definition JobQueue.php:714
string $type
Job type.
Definition JobQueue.php:39
doFlushCaches()
Definition JobQueue.php:607
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:189
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:704
StatsdDataFactoryInterface $stats
Definition JobQueue.php:49
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:45
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:125
doAck(RunnableJob $job)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:656
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition JobQueue.php:422
getReadOnlyReason()
Definition JobQueue.php:205
doBatchPush(array $jobs, $flags)
WANObjectCache $wanCache
Definition JobQueue.php:54
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:644
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:47
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:507
doGetSiblingQueuesWithJobs(array $types)
Definition JobQueue.php:690
doGetAcquiredCount()
factoryJob( $command, $params)
Definition JobQueue.php:723
const QOS_ATOMIC
Definition JobQueue.php:56
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:466
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:242
assertMatchingJobType(IJobSpecification $job)
Definition JobQueue.php:741
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:263
assertNotReadOnly()
Definition JobQueue.php:731
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:598
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:629
doWaitForBackups()
Definition JobQueue.php:590
string $domain
DB domain ID.
Definition JobQueue.php:37
supportedOrders()
Get the allowed queue orders for configuration validation.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition JobQueue.php:680
getRootJobCacheKey( $signature)
Definition JobQueue.php:541
delayedJobsEnabled()
Definition JobQueue.php:197
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:285
doGetDelayedCount()
Definition JobQueue.php:296
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition JobQueue.php:519
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition Job.php:70
Multi-datacenter aware caching interface.
Class for getting statistically unique IDs without a central coordinator.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
$command
Definition mcc.php:125
if(count( $args)< 1) $job