Go to the documentation of this file.
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
67 $this->domain = $params[
'domain'] ?? $params[
'wiki'];
68 $this->type = $params[
'type'];
69 $this->claimTTL = $params[
'claimTTL'] ?? 0;
70 $this->maxTries = $params[
'maxTries'] ?? 3;
71 if ( isset( $params[
'order'] ) && $params[
'order'] !==
'any' ) {
72 $this->order = $params[
'order'];
77 throw new JobQueueError( __CLASS__ .
" does not support '{$this->order}' order." );
79 $this->readOnlyReason = $params[
'readOnlyReason'] ??
false;
114 final public static function factory( array $params ) {
115 $class = $params[
'class'];
116 if ( !class_exists( $class ) ) {
117 throw new JobQueueError(
"Invalid job queue class '$class'." );
119 $obj =
new $class( $params );
120 if ( !( $obj instanceof
self ) ) {
121 throw new JobQueueError(
"Class '$class' is not a " . __CLASS__ .
" class." );
318 final public function push( $jobs, $flags = 0 ) {
319 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
333 final public function batchPush( array $jobs, $flags = 0 ) {
336 if ( $jobs === [] ) {
340 foreach ( $jobs as
$job ) {
343 "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
346 "Got delayed '{$job->getType()}' job; delays are not supported." );
352 foreach ( $jobs as
$job ) {
353 if (
$job->isRootJob() ) {
364 abstract protected function doBatchPush( array $jobs, $flags );
374 final public function pop() {
382 $this->
incrStats(
'dupe_pops', $this->type );
385 }
catch ( Exception $e ) {
396 abstract protected function doPop();
411 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
414 $this->
doAck( $job );
457 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
470 $params =
$job->hasRootJobParams() ?
$job->getRootJobParams() :
null;
472 throw new JobQueueError(
"Cannot register root job; missing parameters." );
480 $timestamp = $this->wanCache->get( $key );
481 if ( $timestamp !==
false && $timestamp >= $params[
'rootJobTimestamp'] ) {
486 return $this->wanCache->set( $key, $params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
498 throw new JobQueueError(
"Got '{$job->getType()}' job; expected '{$this->type}'." );
510 $params =
$job->hasRootJobParams() ?
$job->getRootJobParams() :
null;
517 $timestamp = $this->wanCache->get( $key );
518 if ( $timestamp ===
false || $params[
'rootJobTimestamp'] > $timestamp ) {
520 $this->wanCache->set( $key, $params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
524 return ( $timestamp && $timestamp > $params[
'rootJobTimestamp'] );
532 return $this->wanCache->makeGlobalKey(
548 final public function delete() {
559 throw new JobQueueError(
"This method is not implemented." );
616 return new ArrayIterator( [] );
630 return new ArrayIterator( [] );
641 return new ArrayIterator( [] );
713 if ( $this->readOnlyReason !==
false ) {
727 $this->stats->updateCount(
"jobqueue.{$key}.all", $delta );
728 $this->stats->updateCount(
"jobqueue.{$key}.{$type}", $delta );
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
int $claimTTL
Time to live in seconds.
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
doGetSiblingQueuesWithJobs(array $types)
string bool $readOnlyReason
Read only rationale (or false if r/w)
doBatchPush(array $jobs, $flags)
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
push( $jobs, $flags=0)
Push one or more jobs into the queue.
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
doGetSiblingQueueSizes(array $types)
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
supportedOrders()
Get the allowed queue orders for configuration validation.
ack(RunnableJob $job)
Acknowledge that a job was completed.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
StatsdDataFactoryInterface $stats
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Multi-datacenter aware caching interface.
static factory(array $params)
Get a job queue object of the specified type.
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
string $order
Job priority for pop()
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
getRootJobCacheKey( $signature)
if(count( $args)< 1) $job
flushCaches()
Clear any process and persistent caches.
int $maxTries
Maximum number of times to try a job.
pop()
Pop a job off of the queue.
Class to handle enqueueing and running of background jobs.
factoryJob( $command, $params)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doIsRootJobOldDuplicate(IJobSpecification $job)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
string $domain
DB domain ID.
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Interface for serializable objects that describe a job queue task.
doDeduplicateRootJob(IJobSpecification $job)
__construct(array $params)
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.