MediaWiki REL1_34
JobQueue.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
24
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Public entry points are declared final and delegate to protected do*()
 * hooks, which concrete queue backends implement or override.
 */
abstract class JobQueue {
	/** @var string DB domain ID */
	protected $domain;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var StatsdDataFactoryInterface */
	protected $stats;

	/** @var WANObjectCache */
	protected $wanCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue configuration. Keys read here:
	 *   - domain         : DB domain ID (falls back to the 'wiki' key for b/c)
	 *   - type           : job type
	 *   - claimTTL       : optional; defaults to 0
	 *   - maxTries       : optional; defaults to 3
	 *   - order          : optional; when absent or 'any', optimalOrder() is used
	 *   - readOnlyReason : optional; defaults to false (read/write)
	 *   - stats          : optional; defaults to a NullStatsdDataFactory
	 *   - wanCache       : optional; defaults to WANObjectCache::newEmpty()
	 * @throws JobQueueError If the resulting order is not in supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->domain = $params['domain'] ?? $params['wiki']; // b/c
		$this->type = $params['type'];
		$this->claimTTL = $params['claimTTL'] ?? 0;
		$this->maxTries = $params['maxTries'] ?? 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison so that loosely-equal junk (e.g. integer 0) is rejected
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->readOnlyReason = $params['readOnlyReason'] ?? false;
		$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
		$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain a 'class' key naming the queue class;
	 *   the whole array is passed on to that class's constructor
	 * @return JobQueue
	 * @throws JobQueueError If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new JobQueueError( "Invalid job queue class '$class'." );
		}
		$obj = new $class( $params );
		if ( !( $obj instanceof self ) ) {
			throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return $obj;
	}

	/**
	 * @return string DB domain ID
	 */
	final public function getDomain() {
		return $this->domain;
	}

	/**
	 * @return string Wiki ID derived from the DB domain via
	 *   WikiMap::getWikiIdFromDbDomain()
	 */
	final public function getWiki() {
		return WikiMap::getWikiIdFromDbDomain( $this->domain );
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Job priority for pop()
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names accepted by this queue
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless a subclass overrides this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether delayed jobs are enabled (same as supportsDelayedJobs())
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read only rationale (or false if r/w)
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return bool Result of the backend's doIsEmpty()
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 * @return bool
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return int Result of the backend's doGetSize()
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 * @return int
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return int Result of the backend's doGetAcquiredCount()
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 * @return int
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int Result of doGetDelayedCount()
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int Always 0 unless a subclass overrides this
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int Result of doGetAbandonedCount()
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int Always 0 unless a subclass overrides this
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * A single job is wrapped into a one-element list and handed to batchPush().
	 *
	 * @param IJobSpecification|IJobSpecification[] $jobs
	 * @param int $flags Passed through to batchPush()
	 * @return void
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		$this->batchPush( $jobs, $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted; root jobs are
	 * registered for de-duplication only after the insertion call returned.
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Passed through to doBatchPush()
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError On a job of the wrong type, or a delayed job when
	 *   delays are not supported
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( $jobs === [] ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new JobQueueError(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new JobQueueError(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );

		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * If the popped job's root job turns out to be an old duplicate, the job is
	 * replaced by a no-op DuplicateJob.
	 *
	 * @return mixed Whatever doPop() returned, or a DuplicateJob replacement
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function pop() {
		$this->assertNotReadOnly();

		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				$this->incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// don't lose jobs over this
			// (deliberate best-effort: the popped job is still returned as-is)
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 * @return mixed A job, or a falsy value when nothing could be popped
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param RunnableJob $job
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final public function ack( RunnableJob $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		$this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param RunnableJob $job
	 */
	abstract protected function doAck( RunnableJob $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doDeduplicateRootJob()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * @see JobQueue::deduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return bool True if an equal or newer root job timestamp is already
	 *   cached; otherwise the result of storing this job's timestamp
	 * @throws JobQueueError If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			throw new JobQueueError( "Cannot register root job; missing parameters." );
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Having only the
		// de-duplication registration succeed would cause jobs to become no-ops without
		// any actual jobs that made them redundant.
		$timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
		if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doIsRootJobOldDuplicate()
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param IJobSpecification $job
	 * @return bool True only if a strictly newer root job timestamp is cached
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
		if ( !$params ) {
			return false; // job has no de-duplication info
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->wanCache->get( $key );
		if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
			// Update the timestamp of the last known root job started at the location...
			$this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which a root job's latest timestamp is stored.
	 *
	 * @param string $signature Value of the 'rootJobSignature' root job parameter
	 * @return string Key built from the domain, the job type and the signature
	 */
	protected function getRootJobCacheKey( $signature ) {
		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$this->type,
			'rootjob',
			$signature
		);
	}

	/**
	 * Delete the queue contents by delegating to doDelete().
	 *
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the backend does not implement deletion
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws JobQueueError Always, unless a subclass overrides this
	 */
	protected function doDelete() {
		throw new JobQueueError( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * @return void No-op unless a subclass overrides this
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * @return void No-op unless a subclass overrides this
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator Empty iterator unless a subclass overrides this
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator Empty iterator unless a subclass overrides this
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator Empty iterator unless a subclass overrides this
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return string|null Null unless a subclass overrides this
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues are empty.
	 *
	 * @param array $types List of queue types
	 * @return array|null Result of doGetSiblingQueuesWithJobs(); null if not supported
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return array|null Null unless a subclass overrides this
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types List of queue types
	 * @return array|null Result of doGetSiblingQueueSizes(); null if not supported
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return array|null Null unless a subclass overrides this
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Build a job object by delegating to Job::factory().
	 *
	 * @param string $command
	 * @param array $params
	 * @return Job
	 */
	protected function factoryJob( $command, $params ) {
		// @TODO: dependency inject this as a callback
		return Job::factory( $command, $params );
	}

	/**
	 * Refuse to continue when the queue was configured as read-only.
	 *
	 * @return void
	 * @throws JobQueueReadOnlyError If a read-only reason is set
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * Update the stats counters for the queue overall and for the queue type.
	 *
	 * @param string $key Event name, used as "jobqueue.<key>.all" and
	 *   "jobqueue.<key>.<type>"
	 * @param string $type Job type
	 * @param int $delta Amount to add to both counters
	 * @return void
	 */
	protected function incrStats( $key, $type, $delta = 1 ) {
		$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
		$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
$command
Definition cdb.php:65
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:207
incrStats( $key, $type, $delta=1)
Update the stats counters (via $this->stats->updateCount()) for the queue overall and for the queue type.
Definition JobQueue.php:726
string $order
Job priority for pop()
Definition JobQueue.php:37
__construct(array $params)
Definition JobQueue.php:66
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:294
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:570
const ROOTJOB_TTL
Definition JobQueue.php:52
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:318
doGetAbandonedCount()
Definition JobQueue.php:304
pop()
Pop a job off of the queue.
Definition JobQueue.php:374
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:39
doDeduplicateRootJob(IJobSpecification $job)
Definition JobQueue.php:469
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:650
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:333
doGetSiblingQueueSizes(array $types)
Definition JobQueue.php:695
string $type
Job type.
Definition JobQueue.php:35
doFlushCaches()
Definition JobQueue.php:594
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:175
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:686
StatsdDataFactoryInterface $stats
Definition JobQueue.php:45
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:41
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:114
doAck(RunnableJob $job)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:640
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition JobQueue.php:408
getReadOnlyReason()
Definition JobQueue.php:191
doBatchPush(array $jobs, $flags)
WANObjectCache $wanCache
Definition JobQueue.php:48
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:629
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:43
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:496
doGetSiblingQueuesWithJobs(array $types)
Definition JobQueue.php:672
doGetAcquiredCount()
factoryJob( $command, $params)
Definition JobQueue.php:704
const QOS_ATOMIC
Definition JobQueue.php:50
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:454
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:228
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:249
assertNotReadOnly()
Definition JobQueue.php:712
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:586
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:615
doWaitForBackups()
Definition JobQueue.php:578
string $domain
DB domain ID.
Definition JobQueue.php:33
supportedOrders()
Get the allowed queue orders for configuration validation.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition JobQueue.php:663
getRootJobCacheKey( $signature)
Definition JobQueue.php:531
delayedJobsEnabled()
Definition JobQueue.php:183
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:271
doGetDelayedCount()
Definition JobQueue.php:281
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition JobQueue.php:509
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition Job.php:63
Multi-datacenter aware caching interface.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job