23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
77 $this->domain = $params[
'domain'] ?? $params[
'wiki'];
78 $this->type = $params[
'type'];
79 $this->claimTTL = $params[
'claimTTL'] ?? 0;
80 $this->maxTries = $params[
'maxTries'] ?? 3;
81 if ( isset( $params[
'order'] ) && $params[
'order'] !==
'any' ) {
82 $this->order = $params[
'order'];
87 throw new JobQueueError( __CLASS__ .
" does not support '{$this->order}' order." );
89 $this->readOnlyReason = $params[
'readOnlyReason'] ??
false;
91 $this->wanCache = $params[
'wanCache'] ?? WANObjectCache::newEmpty();
92 $this->idGenerator = $params[
'idGenerator'];
125 final public static function factory( array $params ) {
126 $class = $params[
'class'];
127 if ( !class_exists( $class ) ) {
128 throw new JobQueueError(
"Invalid job queue class '$class'." );
131 if ( !isset( $params[
'idGenerator'] ) ) {
132 wfDeprecated( __METHOD__ .
' called without "idGenerator" set',
'1.35' );
140 $obj =
new $class( $params );
141 if ( !( $obj instanceof
self ) ) {
142 throw new JobQueueError(
"Class '$class' is not a " . __CLASS__ .
" class." );
160 return WikiMap::getWikiIdFromDbDomain( $this->domain );
342 final public function push( $jobs, $flags = 0 ) {
343 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
357 final public function batchPush( array $jobs, $flags = 0 ) {
360 if ( $jobs === [] ) {
364 foreach ( $jobs as
$job ) {
365 if (
$job->getType() !== $this->type ) {
367 "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
368 } elseif (
$job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
370 "Got delayed '{$job->getType()}' job; delays are not supported." );
376 foreach ( $jobs as
$job ) {
377 if (
$job->isRootJob() ) {
398 final public function pop() {
406 $this->
incrStats(
'dupe_pops', $this->type );
409 }
catch ( Exception $e ) {
420 abstract protected function doPop();
434 if ( $job->
getType() !== $this->type ) {
435 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
438 $this->
doAck( $job );
480 if ( $job->
getType() !== $this->type ) {
481 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
495 $params =
$job->hasRootJobParams() ?
$job->getRootJobParams() :
null;
497 throw new JobQueueError(
"Cannot register root job; missing parameters." );
505 $timestamp = $this->wanCache->get( $key );
506 if ( $timestamp !==
false && $timestamp >= $params[
'rootJobTimestamp'] ) {
511 return $this->wanCache->set( $key, $params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
522 if (
$job->getType() !== $this->type ) {
523 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
536 $params =
$job->hasRootJobParams() ?
$job->getRootJobParams() :
null;
543 $timestamp = $this->wanCache->get( $key );
544 if ( $timestamp ===
false || $params[
'rootJobTimestamp'] > $timestamp ) {
546 $this->wanCache->set( $key, $params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
550 return ( $timestamp && $timestamp > $params[
'rootJobTimestamp'] );
558 return $this->wanCache->makeGlobalKey(
574 final public function delete() {
586 throw new JobQueueError(
"This method is not implemented." );
646 return new ArrayIterator( [] );
661 return new ArrayIterator( [] );
673 return new ArrayIterator( [] );
748 if ( $this->readOnlyReason !==
false ) {
762 $this->stats->updateCount(
"jobqueue.{$key}.all", $delta );
763 $this->stats->updateCount(
"jobqueue.{$key}.{$type}", $delta );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that $function is deprecated.
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
A BagOStuff object with no objects in it.
Class to handle enqueueing and running of background jobs.
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
string $order
Job priority for pop()
__construct(array $params)
Stable to call.
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
push( $jobs, $flags=0)
Push one or more jobs into the queue.
doGetAbandonedCount()
Stable to override.
GlobalIdGenerator $idGenerator
pop()
Pop a job off of the queue.
int $claimTTL
Time to live in seconds.
doDeduplicateRootJob(IJobSpecification $job)
Stable to override.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doDelete()
Stable to override.
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
doGetSiblingQueueSizes(array $types)
Stable to override.
doFlushCaches()
Stable to override.
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
StatsdDataFactoryInterface $stats
int $maxTries
Maximum number of times to try a job.
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
ack(RunnableJob $job)
Acknowledge that a job was completed.
doBatchPush(array $jobs, $flags)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
string bool $readOnlyReason
Read only rationale (or false if r/w)
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
doGetSiblingQueuesWithJobs(array $types)
Stable to override.
factoryJob( $command, $params)
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
flushCaches()
Clear any process and persistent caches.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doWaitForBackups()
Stable to override.
string $domain
DB domain ID.
supportedOrders()
Get the allowed queue orders for configuration validation.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
getRootJobCacheKey( $signature)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
doGetDelayedCount()
Stable to override.
doIsRootJobOldDuplicate(IJobSpecification $job)
Stable to override.
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Multi-datacenter aware caching interface.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job