MediaWiki master
JobQueueGroup.php
Go to the documentation of this file.
1<?php
25
38 protected $cache;
39
41 protected $domain;
43 protected $readOnlyMode;
45 private $localJobClasses;
47 private $jobTypeConfiguration;
49 private $jobTypesExcludedFromDefaultQueue;
51 private $statsdDataFactory;
53 private $wanCache;
55 private $globalIdGenerator;
56
59
60 public const TYPE_DEFAULT = 1; // integer; jobs popped by default
61 private const TYPE_ANY = 2; // integer; any job
62
63 public const USE_CACHE = 1; // integer; use process or persistent cache
64
65 private const PROC_CACHE_TTL = 15; // integer; seconds
66
79 public function __construct(
80 $domain,
81 ReadOnlyMode $readOnlyMode,
82 ?array $localJobClasses,
83 array $jobTypeConfiguration,
84 array $jobTypesExcludedFromDefaultQueue,
85 IBufferingStatsdDataFactory $statsdDataFactory,
86 WANObjectCache $wanCache,
87 GlobalIdGenerator $globalIdGenerator
88 ) {
89 $this->domain = $domain;
90 $this->readOnlyMode = $readOnlyMode;
91 $this->cache = new MapCacheLRU( 10 );
92 $this->localJobClasses = $localJobClasses;
93 $this->jobTypeConfiguration = $jobTypeConfiguration;
94 $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
95 $this->statsdDataFactory = $statsdDataFactory;
96 $this->wanCache = $wanCache;
97 $this->globalIdGenerator = $globalIdGenerator;
98 }
99
106 public function get( $type ) {
107 $conf = [ 'domain' => $this->domain, 'type' => $type ];
108 $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];
109 if ( !isset( $conf['readOnlyReason'] ) ) {
110 $conf['readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
111 }
112
113 $conf['stats'] = $this->statsdDataFactory;
114 $conf['wanCache'] = $this->wanCache;
115 $conf['idGenerator'] = $this->globalIdGenerator;
116
117 return JobQueue::factory( $conf );
118 }
119
129 public function push( $jobs ) {
130 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
131 if ( $jobs === [] ) {
132 return;
133 }
134
135 $this->assertValidJobs( $jobs );
136
137 $jobsByType = []; // (job type => list of jobs)
138 foreach ( $jobs as $job ) {
139 $type = $job->getType();
140 if ( isset( $this->jobTypeConfiguration[$type] ) ) {
141 $jobsByType[$type][] = $job;
142 } else {
143 if (
144 isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
145 $this->jobTypeConfiguration['default']['typeAgnostic']
146 ) {
147 $jobsByType['default'][] = $job;
148 } else {
149 $jobsByType[$type][] = $job;
150 }
151 }
152 }
153
154 foreach ( $jobsByType as $type => $jobs ) {
155 $this->get( $type )->push( $jobs );
156 }
157
158 if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
159 $list = $this->cache->getField( 'queues-ready', 'list' );
160 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
161 $this->cache->clear( 'queues-ready' );
162 }
163 }
164
165 $cache = ObjectCache::getLocalClusterInstance();
166 $cache->set(
167 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
168 'true',
169 15
170 );
171 if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
172 $cache->set(
173 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
174 'true',
175 15
176 );
177 }
178 }
179
187 public function lazyPush( $jobs ) {
188 if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
189 $this->push( $jobs );
190 return;
191 }
192
193 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
194
195 // Throw errors now instead of on push(), when other jobs may be buffered
196 $this->assertValidJobs( $jobs );
197
198 DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
199 }
200
215 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
216 $job = false;
217
218 if ( !$this->localJobClasses ) {
219 throw new JobQueueError(
220 "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
221 }
222 if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
223 // Do not pop jobs if there is no class for the queue type
224 throw new JobQueueError( "Unrecognized job type '$qtype'." );
225 }
226
227 if ( is_string( $qtype ) ) { // specific job type
228 if ( !in_array( $qtype, $ignored ) ) {
229 $job = $this->get( $qtype )->pop();
230 }
231 } else { // any job in the "default" jobs types
232 if ( $flags & self::USE_CACHE ) {
233 if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
234 $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
235 }
236 $types = $this->cache->getField( 'queues-ready', 'list' );
237 } else {
238 $types = $this->getQueuesWithJobs();
239 }
240
241 if ( $qtype == self::TYPE_DEFAULT ) {
242 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
243 }
244
245 $types = array_diff( $types, $ignored ); // avoid selected types
246 shuffle( $types ); // avoid starvation
247
248 foreach ( $types as $type ) { // for each queue...
249 $job = $this->get( $type )->pop();
250 if ( $job ) { // found
251 break;
252 } else { // not found
253 $this->cache->clear( 'queues-ready' );
254 }
255 }
256 }
257
258 return $job;
259 }
260
267 public function ack( RunnableJob $job ) {
268 $this->get( $job->getType() )->ack( $job );
269 }
270
280 wfDeprecated( __METHOD__, '1.40' );
281 return true;
282 }
283
293 public function waitForBackups() {
294 wfDeprecated( __METHOD__, '1.41' );
295 // Try to avoid doing this more than once per queue storage medium
296 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
297 $this->get( $type )->waitForBackups();
298 }
299 }
300
308 public function getQueueTypes() {
309 if ( !$this->localJobClasses ) {
310 throw new JobQueueError( 'Cannot inspect job queue from foreign wiki' );
311 }
312 return array_keys( $this->localJobClasses );
313 }
314
322 public function getDefaultQueueTypes() {
323 return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
324 }
325
335 public function queuesHaveJobs( $type = self::TYPE_ANY ) {
336 $cache = ObjectCache::getLocalClusterInstance();
337 $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
338
339 $value = $cache->get( $key );
340 if ( $value === false ) {
341 $queues = $this->getQueuesWithJobs();
342 if ( $type == self::TYPE_DEFAULT ) {
343 $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
344 }
345 $value = count( $queues ) ? 'true' : 'false';
346 $cache->add( $key, $value, 15 );
347 }
348
349 return ( $value === 'true' );
350 }
351
359 public function getQueuesWithJobs() {
360 $types = [];
361 foreach ( $this->getCoalescedQueues() as $info ) {
363 $queue = $info['queue'];
364 $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
365 if ( is_array( $nonEmpty ) ) { // batching features supported
366 $types = array_merge( $types, $nonEmpty );
367 } else { // we have to go through the queues in the bucket one-by-one
368 foreach ( $info['types'] as $type ) {
369 if ( !$this->get( $type )->isEmpty() ) {
370 $types[] = $type;
371 }
372 }
373 }
374 }
375
376 return $types;
377 }
378
386 public function getQueueSizes() {
387 $sizeMap = [];
388 foreach ( $this->getCoalescedQueues() as $info ) {
390 $queue = $info['queue'];
391 $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
392 if ( is_array( $sizes ) ) { // batching features supported
393 $sizeMap += $sizes;
394 } else { // we have to go through the queues in the bucket one-by-one
395 foreach ( $info['types'] as $type ) {
396 $sizeMap[$type] = $this->get( $type )->getSize();
397 }
398 }
399 }
400
401 return $sizeMap;
402 }
403
408 protected function getCoalescedQueues() {
409 if ( $this->coalescedQueues === null ) {
410 $this->coalescedQueues = [];
411 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
412 $conf['domain'] = $this->domain;
413 $conf['type'] = 'null';
414 $conf['stats'] = $this->statsdDataFactory;
415 $conf['wanCache'] = $this->wanCache;
416 $conf['idGenerator'] = $this->globalIdGenerator;
417
418 $queue = JobQueue::factory( $conf );
419 $loc = $queue->getCoalesceLocationInternal();
420 if ( !isset( $this->coalescedQueues[$loc] ) ) {
421 $this->coalescedQueues[$loc]['queue'] = $queue;
422 $this->coalescedQueues[$loc]['types'] = [];
423 }
424 if ( $type === 'default' ) {
425 $this->coalescedQueues[$loc]['types'] = array_merge(
426 $this->coalescedQueues[$loc]['types'],
427 array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
428 );
429 } else {
430 $this->coalescedQueues[$loc]['types'][] = $type;
431 }
432 }
433 }
434
435 return $this->coalescedQueues;
436 }
437
442 private function assertValidJobs( array $jobs ) {
443 foreach ( $jobs as $job ) {
444 if ( !( $job instanceof IJobSpecification ) ) {
445 $type = is_object( $job ) ? get_class( $job ) : gettype( $job );
446 throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
447 }
448 }
449 }
450}
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
Handle enqueueing of background jobs.
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop one job off a job queue.
ack(RunnableJob $job)
Acknowledge that a job was completed.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
ReadOnlyMode $readOnlyMode
Read only mode.
__construct( $domain, ReadOnlyMode $readOnlyMode, ?array $localJobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
MapCacheLRU $cache
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
getQueueTypes()
Get the list of queue types.
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:150
Store key-value entries in a size-limited in-memory LRU cache.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Defer callable updates to run later in the PHP process.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Multi-datacenter aware caching interface.
Determine whether a site is currently in read-only mode.
Class for getting statistically unique IDs without a central coordinator.
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job