45 private $localJobClasses;
47 private $jobTypeConfiguration;
49 private $jobTypesExcludedFromDefaultQueue;
51 private $statsdDataFactory;
55 private $globalIdGenerator;
61 private const TYPE_ANY = 2;
65 private const PROC_CACHE_TTL = 15;
82 ?array $localJobClasses,
83 array $jobTypeConfiguration,
84 array $jobTypesExcludedFromDefaultQueue,
89 $this->domain = $domain;
90 $this->readOnlyMode = $readOnlyMode;
92 $this->localJobClasses = $localJobClasses;
93 $this->jobTypeConfiguration = $jobTypeConfiguration;
94 $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
95 $this->statsdDataFactory = $statsdDataFactory;
96 $this->wanCache = $wanCache;
97 $this->globalIdGenerator = $globalIdGenerator;
106 public function get( $type ) {
107 $conf = [
'domain' => $this->domain,
'type' => $type ];
108 $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration[
'default'];
109 if ( !isset( $conf[
'readOnlyReason'] ) ) {
110 $conf[
'readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
113 $conf[
'stats'] = $this->statsdDataFactory;
114 $conf[
'wanCache'] = $this->wanCache;
115 $conf[
'idGenerator'] = $this->globalIdGenerator;
129 public function push( $jobs ) {
130 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
131 if ( $jobs === [] ) {
135 $this->assertValidJobs( $jobs );
138 foreach ( $jobs as
$job ) {
139 $type =
$job->getType();
140 if ( isset( $this->jobTypeConfiguration[$type] ) ) {
141 $jobsByType[$type][] =
$job;
144 isset( $this->jobTypeConfiguration[
'default'][
'typeAgnostic'] ) &&
145 $this->jobTypeConfiguration[
'default'][
'typeAgnostic']
147 $jobsByType[
'default'][] =
$job;
149 $jobsByType[$type][] =
$job;
154 foreach ( $jobsByType as $type => $jobs ) {
155 $this->
get( $type )->
push( $jobs );
158 if ( $this->cache->hasField(
'queues-ready',
'list' ) ) {
159 $list = $this->cache->getField(
'queues-ready',
'list' );
160 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
161 $this->cache->clear(
'queues-ready' );
165 $cache = ObjectCache::getLocalClusterInstance();
167 $cache->makeGlobalKey(
'jobqueue', $this->domain,
'hasjobs', self::TYPE_ANY ),
171 if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
173 $cache->makeGlobalKey(
'jobqueue', $this->domain,
'hasjobs', self::TYPE_DEFAULT ),
188 if ( PHP_SAPI ===
'cli' || PHP_SAPI ===
'phpdbg' ) {
189 $this->
push( $jobs );
193 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
196 $this->assertValidJobs( $jobs );
215 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
218 if ( !$this->localJobClasses ) {
220 "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
222 if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
224 throw new JobQueueError(
"Unrecognized job type '$qtype'." );
227 if ( is_string( $qtype ) ) {
228 if ( !in_array( $qtype, $ignored ) ) {
229 $job = $this->
get( $qtype )->
pop();
232 if ( $flags & self::USE_CACHE ) {
233 if ( !$this->cache->hasField(
'queues-ready',
'list', self::PROC_CACHE_TTL ) ) {
236 $types = $this->cache->getField(
'queues-ready',
'list' );
241 if ( $qtype == self::TYPE_DEFAULT ) {
245 $types = array_diff( $types, $ignored );
248 foreach ( $types as $type ) {
249 $job = $this->
get( $type )->
pop();
253 $this->cache->clear(
'queues-ready' );
296 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
309 if ( !$this->localJobClasses ) {
310 throw new JobQueueError(
'Cannot inspect job queue from foreign wiki' );
312 return array_keys( $this->localJobClasses );
323 return array_diff( $this->
getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
336 $cache = ObjectCache::getLocalClusterInstance();
337 $key = $cache->makeGlobalKey(
'jobqueue', $this->domain,
'hasjobs', $type );
339 $value = $cache->
get( $key );
340 if ( $value ===
false ) {
342 if ( $type == self::TYPE_DEFAULT ) {
345 $value = count( $queues ) ?
'true' :
'false';
346 $cache->add( $key, $value, 15 );
349 return ( $value ===
'true' );
363 $queue = $info[
'queue'];
364 $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->
getQueueTypes() );
365 if ( is_array( $nonEmpty ) ) {
366 $types = array_merge( $types, $nonEmpty );
368 foreach ( $info[
'types'] as $type ) {
369 if ( !$this->
get( $type )->isEmpty() ) {
390 $queue = $info[
'queue'];
391 $sizes = $queue->getSiblingQueueSizes( $this->
getQueueTypes() );
392 if ( is_array( $sizes ) ) {
395 foreach ( $info[
'types'] as $type ) {
396 $sizeMap[$type] = $this->
get( $type )->getSize();
409 if ( $this->coalescedQueues ===
null ) {
410 $this->coalescedQueues = [];
411 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
412 $conf[
'domain'] = $this->domain;
413 $conf[
'type'] =
'null';
414 $conf[
'stats'] = $this->statsdDataFactory;
415 $conf[
'wanCache'] = $this->wanCache;
416 $conf[
'idGenerator'] = $this->globalIdGenerator;
419 $loc = $queue->getCoalesceLocationInternal();
420 if ( !isset( $this->coalescedQueues[$loc] ) ) {
421 $this->coalescedQueues[$loc][
'queue'] = $queue;
422 $this->coalescedQueues[$loc][
'types'] = [];
424 if ( $type ===
'default' ) {
425 $this->coalescedQueues[$loc][
'types'] = array_merge(
426 $this->coalescedQueues[$loc][
'types'],
427 array_diff( $this->
getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
430 $this->coalescedQueues[$loc][
'types'][] = $type;
435 return $this->coalescedQueues;
442 private function assertValidJobs( array $jobs ) {
443 foreach ( $jobs as
$job ) {
445 $type = is_object(
$job ) ? get_class(
$job ) : gettype(
$job );
446 throw new InvalidArgumentException(
"Expected IJobSpecification objects, got " . $type );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
Handle enqueueing of background jobs.
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop one job off a job queue.
ack(RunnableJob $job)
Acknowledge that a job was completed.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
ReadOnlyMode $readOnlyMode
Read only mode.
__construct( $domain, ReadOnlyMode $readOnlyMode, ?array $localJobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
getQueueTypes()
Get the list of queue types.
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
static factory(array $params)
Get a job queue object of the specified type.
Store key-value entries in a size-limited in-memory LRU cache.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Multi-datacenter aware caching interface.
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job