MediaWiki REL1_35
JobQueue.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25
35abstract class JobQueue {
37 protected $domain;
39 protected $type;
41 protected $order;
43 protected $claimTTL;
45 protected $maxTries;
47 protected $readOnlyReason;
49 protected $stats;
51 protected $idGenerator;
52
54 protected $wanCache;
55
56 protected const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
57
58 protected const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
59
76 protected function __construct( array $params ) {
77 $this->domain = $params['domain'] ?? $params['wiki']; // b/c
78 $this->type = $params['type'];
79 $this->claimTTL = $params['claimTTL'] ?? 0;
80 $this->maxTries = $params['maxTries'] ?? 3;
81 if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
82 $this->order = $params['order'];
83 } else {
84 $this->order = $this->optimalOrder();
85 }
86 if ( !in_array( $this->order, $this->supportedOrders() ) ) {
87 throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
88 }
89 $this->readOnlyReason = $params['readOnlyReason'] ?? false;
90 $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
91 $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
92 $this->idGenerator = $params['idGenerator'];
93 }
94
125 final public static function factory( array $params ) {
126 $class = $params['class'];
127 if ( !class_exists( $class ) ) {
128 throw new JobQueueError( "Invalid job queue class '$class'." );
129 }
130
131 if ( !isset( $params['idGenerator'] ) ) {
132 wfDeprecated( __METHOD__ . ' called without "idGenerator" set', '1.35' );
133 $params['idGenerator'] = new GlobalIdGenerator(
134 sys_get_temp_dir(),
135 new EmptyBagOStuff(),
136 'shell_exec'
137 );
138 }
139
140 $obj = new $class( $params );
141 if ( !( $obj instanceof self ) ) {
142 throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
143 }
144
145 return $obj;
146 }
147
151 final public function getDomain() {
152 return $this->domain;
153 }
154
159 final public function getWiki() {
160 return WikiMap::getWikiIdFromDbDomain( $this->domain );
161 }
162
166 final public function getType() {
167 return $this->type;
168 }
169
173 final public function getOrder() {
174 return $this->order;
175 }
176
182 abstract protected function supportedOrders();
183
189 abstract protected function optimalOrder();
190
197 protected function supportsDelayedJobs() {
198 return false; // not implemented
199 }
200
205 final public function delayedJobsEnabled() {
206 return $this->supportsDelayedJobs();
207 }
208
213 public function getReadOnlyReason() {
215 }
216
229 final public function isEmpty() {
230 $res = $this->doIsEmpty();
231
232 return $res;
233 }
234
239 abstract protected function doIsEmpty();
240
250 final public function getSize() {
251 $res = $this->doGetSize();
252
253 return $res;
254 }
255
260 abstract protected function doGetSize();
261
271 final public function getAcquiredCount() {
272 $res = $this->doGetAcquiredCount();
273
274 return $res;
275 }
276
281 abstract protected function doGetAcquiredCount();
282
293 final public function getDelayedCount() {
294 $res = $this->doGetDelayedCount();
295
296 return $res;
297 }
298
304 protected function doGetDelayedCount() {
305 return 0; // not implemented
306 }
307
317 final public function getAbandonedCount() {
318 $res = $this->doGetAbandonedCount();
319
320 return $res;
321 }
322
328 protected function doGetAbandonedCount() {
329 return 0; // not implemented
330 }
331
342 final public function push( $jobs, $flags = 0 ) {
343 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
344 $this->batchPush( $jobs, $flags );
345 }
346
357 final public function batchPush( array $jobs, $flags = 0 ) {
358 $this->assertNotReadOnly();
359
360 if ( $jobs === [] ) {
361 return; // nothing to do
362 }
363
364 foreach ( $jobs as $job ) {
365 if ( $job->getType() !== $this->type ) {
366 throw new JobQueueError(
367 "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
368 } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
369 throw new JobQueueError(
370 "Got delayed '{$job->getType()}' job; delays are not supported." );
371 }
372 }
373
374 $this->doBatchPush( $jobs, $flags );
375
376 foreach ( $jobs as $job ) {
377 if ( $job->isRootJob() ) {
378 $this->deduplicateRootJob( $job );
379 }
380 }
381 }
382
388 abstract protected function doBatchPush( array $jobs, $flags );
389
398 final public function pop() {
399 $this->assertNotReadOnly();
400
401 $job = $this->doPop();
402
403 // Flag this job as an old duplicate based on its "root" job...
404 try {
405 if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
406 $this->incrStats( 'dupe_pops', $this->type );
407 $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
408 }
409 } catch ( Exception $e ) {
410 // don't lose jobs over this
411 }
412
413 return $job;
414 }
415
420 abstract protected function doPop();
421
432 final public function ack( RunnableJob $job ) {
433 $this->assertNotReadOnly();
434 if ( $job->getType() !== $this->type ) {
435 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
436 }
437
438 $this->doAck( $job );
439 }
440
445 abstract protected function doAck( RunnableJob $job );
446
478 final public function deduplicateRootJob( IJobSpecification $job ) {
479 $this->assertNotReadOnly();
480 if ( $job->getType() !== $this->type ) {
481 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
482 }
483
484 return $this->doDeduplicateRootJob( $job );
485 }
486
495 $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
496 if ( !$params ) {
497 throw new JobQueueError( "Cannot register root job; missing parameters." );
498 }
499
500 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
501 // Callers should call JobQueueGroup::push() before this method so that if the
502 // insert fails, the de-duplication registration will be aborted. Having only the
503 // de-duplication registration succeed would cause jobs to become no-ops without
504 // any actual jobs that made them redundant.
505 $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
506 if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
507 return true; // a newer version of this root job was enqueued
508 }
509
510 // Update the timestamp of the last root job started at the location...
511 return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
512 }
513
521 final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
522 if ( $job->getType() !== $this->type ) {
523 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
524 }
525
526 return $this->doIsRootJobOldDuplicate( $job );
527 }
528
536 $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
537 if ( !$params ) {
538 return false; // job has no de-deplication info
539 }
540
541 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
542 // Get the last time this root job was enqueued
543 $timestamp = $this->wanCache->get( $key );
544 if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
545 // Update the timestamp of the last known root job started at the location...
546 $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
547 }
548
549 // Check if a new root job was started at the location after this one's...
550 return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
551 }
552
557 protected function getRootJobCacheKey( $signature ) {
558 return $this->wanCache->makeGlobalKey(
559 'jobqueue',
560 $this->domain,
561 $this->type,
562 'rootjob',
563 $signature
564 );
565 }
566
574 final public function delete() {
575 $this->assertNotReadOnly();
576
577 $this->doDelete();
578 }
579
585 protected function doDelete() {
586 throw new JobQueueError( "This method is not implemented." );
587 }
588
597 final public function waitForBackups() {
598 $this->doWaitForBackups();
599 }
600
606 protected function doWaitForBackups() {
607 }
608
614 final public function flushCaches() {
615 $this->doFlushCaches();
616 }
617
623 protected function doFlushCaches() {
624 }
625
634 abstract public function getAllQueuedJobs();
635
645 public function getAllDelayedJobs() {
646 return new ArrayIterator( [] ); // not implemented
647 }
648
660 public function getAllAcquiredJobs() {
661 return new ArrayIterator( [] ); // not implemented
662 }
663
672 public function getAllAbandonedJobs() {
673 return new ArrayIterator( [] ); // not implemented
674 }
675
683 public function getCoalesceLocationInternal() {
684 return null;
685 }
686
696 final public function getSiblingQueuesWithJobs( array $types ) {
697 return $this->doGetSiblingQueuesWithJobs( $types );
698 }
699
706 protected function doGetSiblingQueuesWithJobs( array $types ) {
707 return null; // not supported
708 }
709
720 final public function getSiblingQueueSizes( array $types ) {
721 return $this->doGetSiblingQueueSizes( $types );
722 }
723
730 protected function doGetSiblingQueueSizes( array $types ) {
731 return null; // not supported
732 }
733
739 protected function factoryJob( $command, $params ) {
740 // @TODO: dependency inject this as a callback
741 return Job::factory( $command, $params );
742 }
743
747 protected function assertNotReadOnly() {
748 if ( $this->readOnlyReason !== false ) {
749 throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
750 }
751 }
752
761 protected function incrStats( $key, $type, $delta = 1 ) {
762 $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
763 $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
764 }
765}
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that $function is deprecated.
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
A BagOStuff object with no objects in it.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:35
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:229
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition JobQueue.php:761
string $order
Job priority for pop()
Definition JobQueue.php:41
__construct(array $params)
Stable to call.
Definition JobQueue.php:76
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:317
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:597
const ROOTJOB_TTL
Definition JobQueue.php:58
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:342
doGetAbandonedCount()
Stable to override.
Definition JobQueue.php:328
GlobalIdGenerator $idGenerator
Definition JobQueue.php:51
pop()
Pop a job off of the queue.
Definition JobQueue.php:398
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:43
doDeduplicateRootJob(IJobSpecification $job)
Stable to override.
Definition JobQueue.php:494
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:683
doDelete()
Stable to override.
Definition JobQueue.php:585
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:357
doGetSiblingQueueSizes(array $types)
Stable to override.
Definition JobQueue.php:730
string $type
Job type.
Definition JobQueue.php:39
doFlushCaches()
Stable to override.
Definition JobQueue.php:623
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:197
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:720
StatsdDataFactoryInterface $stats
Definition JobQueue.php:49
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:45
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:125
doAck(RunnableJob $job)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:672
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition JobQueue.php:432
getReadOnlyReason()
Definition JobQueue.php:213
doBatchPush(array $jobs, $flags)
WANObjectCache $wanCache
Definition JobQueue.php:54
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:660
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:47
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:521
doGetSiblingQueuesWithJobs(array $types)
Stable to override.
Definition JobQueue.php:706
doGetAcquiredCount()
factoryJob( $command, $params)
Definition JobQueue.php:739
const QOS_ATOMIC
Definition JobQueue.php:56
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:478
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:250
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:271
assertNotReadOnly()
Definition JobQueue.php:747
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:614
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:645
doWaitForBackups()
Stable to override.
Definition JobQueue.php:606
string $domain
DB domain ID.
Definition JobQueue.php:37
supportedOrders()
Get the allowed queue orders for configuration validation.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition JobQueue.php:696
getRootJobCacheKey( $signature)
Definition JobQueue.php:557
delayedJobsEnabled()
Definition JobQueue.php:205
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:293
doGetDelayedCount()
Stable to override.
Definition JobQueue.php:304
doIsRootJobOldDuplicate(IJobSpecification $job)
Stable to override.
Definition JobQueue.php:535
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition Job.php:65
Multi-datacenter aware caching interface.
Class for getting statistically unique IDs without a central coordinator.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
$command
Definition mcc.php:125
if(count( $args)< 1) $job