23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
67 $this->domain = $params[
'domain'] ?? $params[
'wiki'];
68 $this->type = $params[
'type'];
69 $this->claimTTL = $params[
'claimTTL'] ?? 0;
70 $this->maxTries = $params[
'maxTries'] ?? 3;
71 if ( isset( $params[
'order'] ) && $params[
'order'] !==
'any' ) {
72 $this->order = $params[
'order'];
77 throw new JobQueueError( __CLASS__ .
" does not support '{$this->order}' order." );
79 $this->readOnlyReason = $params[
'readOnlyReason'] ??
false;
81 $this->wanCache = $params[
'wanCache'] ?? WANObjectCache::newEmpty();
114 final public static function factory( array $params ) {
115 $class = $params[
'class'];
116 if ( !class_exists( $class ) ) {
117 throw new JobQueueError(
"Invalid job queue class '$class'." );
119 $obj =
new $class( $params );
120 if ( !( $obj instanceof
self ) ) {
121 throw new JobQueueError(
"Class '$class' is not a " . __CLASS__ .
" class." );
139 return WikiMap::getWikiIdFromDbDomain( $this->domain );
318 final public function push( $jobs, $flags = 0 ) {
319 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
333 final public function batchPush( array $jobs, $flags = 0 ) {
336 if ( $jobs === [] ) {
340 foreach ( $jobs as
$job ) {
341 if (
$job->getType() !== $this->type ) {
343 "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
344 } elseif (
$job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
346 "Got delayed '{$job->getType()}' job; delays are not supported." );
352 foreach ( $jobs as
$job ) {
353 if (
$job->isRootJob() ) {
374 final public function pop() {
382 $this->
incrStats(
'dupe_pops', $this->type );
385 }
catch ( Exception $e ) {
396 abstract protected function doPop();
410 if ( $job->
getType() !== $this->type ) {
411 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
414 $this->
doAck( $job );
456 if ( $job->
getType() !== $this->type ) {
457 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
470 $params =
$job->hasRootJobParams() ?
$job->getRootJobParams() :
null;
472 throw new JobQueueError(
"Cannot register root job; missing parameters." );
480 $timestamp = $this->wanCache->get( $key );
481 if ( $timestamp !==
false && $timestamp >= $params[
'rootJobTimestamp'] ) {
486 return $this->wanCache->set( $key, $params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
497 if (
$job->getType() !== $this->type ) {
498 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
510 $params =
$job->hasRootJobParams() ?
$job->getRootJobParams() :
null;
517 $timestamp = $this->wanCache->get( $key );
518 if ( $timestamp ===
false || $params[
'rootJobTimestamp'] > $timestamp ) {
520 $this->wanCache->set( $key, $params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
524 return ( $timestamp && $timestamp > $params[
'rootJobTimestamp'] );
532 return $this->wanCache->makeGlobalKey(
548 final public function delete() {
559 throw new JobQueueError(
"This method is not implemented." );
616 return new ArrayIterator( [] );
630 return new ArrayIterator( [] );
641 return new ArrayIterator( [] );
713 if ( $this->readOnlyReason !==
false ) {
727 $this->stats->updateCount(
"jobqueue.{$key}.all", $delta );
728 $this->stats->updateCount(
"jobqueue.{$key}.{$type}", $delta );
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
Class to handle enqueueing and running of background jobs.
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
incrStats( $key, $type, $delta=1)
Update the stats counters (via $this->stats->updateCount()) for the queue overall and for the queue type.
string $order
Job priority for pop()
__construct(array $params)
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
push( $jobs, $flags=0)
Push one or more jobs into the queue.
pop()
Pop a job off of the queue.
int $claimTTL
Time to live in seconds.
doDeduplicateRootJob(IJobSpecification $job)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
doGetSiblingQueueSizes(array $types)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
StatsdDataFactoryInterface $stats
int $maxTries
Maximum number of times to try a job.
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
ack(RunnableJob $job)
Acknowledge that a job was completed.
doBatchPush(array $jobs, $flags)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
string|bool $readOnlyReason
Read only rationale (or false if r/w)
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
doGetSiblingQueuesWithJobs(array $types)
factoryJob( $command, $params)
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
flushCaches()
Clear any process and persistent caches.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
string $domain
DB domain ID.
supportedOrders()
Get the allowed queue orders for configuration validation.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
getRootJobCacheKey( $signature)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
doIsRootJobOldDuplicate(IJobSpecification $job)
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Multi-datacenter aware caching interface.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job