MediaWiki master
JobQueueGroup.php
Go to the documentation of this file.
1<?php
26
39 protected $cache;
40
42 protected $domain;
44 protected $readOnlyMode;
46 private $localJobClasses;
48 private $jobTypeConfiguration;
50 private $jobTypesExcludedFromDefaultQueue;
52 private $statsdDataFactory;
54 private $wanCache;
56 private $globalIdGenerator;
57
60
61 public const TYPE_DEFAULT = 1; // integer; jobs popped by default
62 private const TYPE_ANY = 2; // integer; any job
63
64 public const USE_CACHE = 1; // integer; use process or persistent cache
65
66 private const PROC_CACHE_TTL = 15; // integer; seconds
67
81 public function __construct(
82 $domain,
83 ReadOnlyMode $readOnlyMode,
84 ?array $localJobClasses,
85 array $jobTypeConfiguration,
86 array $jobTypesExcludedFromDefaultQueue,
87 IBufferingStatsdDataFactory $statsdDataFactory,
88 WANObjectCache $wanCache,
89 GlobalIdGenerator $globalIdGenerator
90 ) {
91 $this->domain = $domain;
92 $this->readOnlyMode = $readOnlyMode;
93 $this->cache = new MapCacheLRU( 10 );
94 $this->localJobClasses = $localJobClasses;
95 $this->jobTypeConfiguration = $jobTypeConfiguration;
96 $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
97 $this->statsdDataFactory = $statsdDataFactory;
98 $this->wanCache = $wanCache;
99 $this->globalIdGenerator = $globalIdGenerator;
100 }
101
108 public function get( $type ) {
109 $conf = [ 'domain' => $this->domain, 'type' => $type ];
110 $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];
111 if ( !isset( $conf['readOnlyReason'] ) ) {
112 $conf['readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
113 }
114
115 $conf['stats'] = $this->statsdDataFactory;
116 $conf['wanCache'] = $this->wanCache;
117 $conf['idGenerator'] = $this->globalIdGenerator;
118
119 return JobQueue::factory( $conf );
120 }
121
131 public function push( $jobs ) {
132 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
133 if ( $jobs === [] ) {
134 return;
135 }
136
137 $this->assertValidJobs( $jobs );
138
139 $jobsByType = []; // (job type => list of jobs)
140 foreach ( $jobs as $job ) {
141 $type = $job->getType();
142 if ( isset( $this->jobTypeConfiguration[$type] ) ) {
143 $jobsByType[$type][] = $job;
144 } else {
145 if (
146 isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
147 $this->jobTypeConfiguration['default']['typeAgnostic']
148 ) {
149 $jobsByType['default'][] = $job;
150 } else {
151 $jobsByType[$type][] = $job;
152 }
153 }
154 }
155
156 foreach ( $jobsByType as $type => $jobs ) {
157 $this->get( $type )->push( $jobs );
158 }
159
160 if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
161 $list = $this->cache->getField( 'queues-ready', 'list' );
162 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
163 $this->cache->clear( 'queues-ready' );
164 }
165 }
166
167 $cache = MediaWikiServices::getInstance()->getObjectCacheFactory()->getLocalClusterInstance();
168 $cache->set(
169 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
170 'true',
171 15
172 );
173 if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
174 $cache->set(
175 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
176 'true',
177 15
178 );
179 }
180 }
181
189 public function lazyPush( $jobs ) {
190 if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
191 $this->push( $jobs );
192 return;
193 }
194
195 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
196
197 // Throw errors now instead of on push(), when other jobs may be buffered
198 $this->assertValidJobs( $jobs );
199
200 DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
201 }
202
217 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
218 $job = false;
219
220 if ( !$this->localJobClasses ) {
221 throw new JobQueueError(
222 "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
223 }
224 if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
225 // Do not pop jobs if there is no class for the queue type
226 throw new JobQueueError( "Unrecognized job type '$qtype'." );
227 }
228
229 if ( is_string( $qtype ) ) { // specific job type
230 if ( !in_array( $qtype, $ignored ) ) {
231 $job = $this->get( $qtype )->pop();
232 }
233 } else { // any job in the "default" jobs types
234 if ( $flags & self::USE_CACHE ) {
235 if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
236 $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
237 }
238 $types = $this->cache->getField( 'queues-ready', 'list' );
239 } else {
240 $types = $this->getQueuesWithJobs();
241 }
242
243 if ( $qtype == self::TYPE_DEFAULT ) {
244 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
245 }
246
247 $types = array_diff( $types, $ignored ); // avoid selected types
248 shuffle( $types ); // avoid starvation
249
250 foreach ( $types as $type ) { // for each queue...
251 $job = $this->get( $type )->pop();
252 if ( $job ) { // found
253 break;
254 } else { // not found
255 $this->cache->clear( 'queues-ready' );
256 }
257 }
258 }
259
260 return $job;
261 }
262
269 public function ack( RunnableJob $job ) {
270 $this->get( $job->getType() )->ack( $job );
271 }
272
282 wfDeprecated( __METHOD__, '1.40' );
283 return true;
284 }
285
295 public function waitForBackups() {
296 wfDeprecated( __METHOD__, '1.41' );
297 // Try to avoid doing this more than once per queue storage medium
298 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
299 $this->get( $type )->waitForBackups();
300 }
301 }
302
310 public function getQueueTypes() {
311 if ( !$this->localJobClasses ) {
312 throw new JobQueueError( 'Cannot inspect job queue from foreign wiki' );
313 }
314 return array_keys( $this->localJobClasses );
315 }
316
324 public function getDefaultQueueTypes() {
325 return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
326 }
327
337 public function queuesHaveJobs( $type = self::TYPE_ANY ) {
338 $cache = MediaWikiServices::getInstance()->getObjectCacheFactory()->getLocalClusterInstance();
339 $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
340
341 $value = $cache->get( $key );
342 if ( $value === false ) {
343 $queues = $this->getQueuesWithJobs();
344 if ( $type == self::TYPE_DEFAULT ) {
345 $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
346 }
347 $value = count( $queues ) ? 'true' : 'false';
348 $cache->add( $key, $value, 15 );
349 }
350
351 return ( $value === 'true' );
352 }
353
361 public function getQueuesWithJobs() {
362 $types = [];
363 foreach ( $this->getCoalescedQueues() as $info ) {
365 $queue = $info['queue'];
366 $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
367 if ( is_array( $nonEmpty ) ) { // batching features supported
368 $types = array_merge( $types, $nonEmpty );
369 } else { // we have to go through the queues in the bucket one-by-one
370 foreach ( $info['types'] as $type ) {
371 if ( !$this->get( $type )->isEmpty() ) {
372 $types[] = $type;
373 }
374 }
375 }
376 }
377
378 return $types;
379 }
380
388 public function getQueueSizes() {
389 $sizeMap = [];
390 foreach ( $this->getCoalescedQueues() as $info ) {
392 $queue = $info['queue'];
393 $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
394 if ( is_array( $sizes ) ) { // batching features supported
395 $sizeMap += $sizes;
396 } else { // we have to go through the queues in the bucket one-by-one
397 foreach ( $info['types'] as $type ) {
398 $sizeMap[$type] = $this->get( $type )->getSize();
399 }
400 }
401 }
402
403 return $sizeMap;
404 }
405
410 protected function getCoalescedQueues() {
411 if ( $this->coalescedQueues === null ) {
412 $this->coalescedQueues = [];
413 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
414 $conf['domain'] = $this->domain;
415 $conf['type'] = 'null';
416 $conf['stats'] = $this->statsdDataFactory;
417 $conf['wanCache'] = $this->wanCache;
418 $conf['idGenerator'] = $this->globalIdGenerator;
419
420 $queue = JobQueue::factory( $conf );
421 $loc = $queue->getCoalesceLocationInternal();
422 if ( !isset( $this->coalescedQueues[$loc] ) ) {
423 $this->coalescedQueues[$loc]['queue'] = $queue;
424 $this->coalescedQueues[$loc]['types'] = [];
425 }
426 if ( $type === 'default' ) {
427 $this->coalescedQueues[$loc]['types'] = array_merge(
428 $this->coalescedQueues[$loc]['types'],
429 array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
430 );
431 } else {
432 $this->coalescedQueues[$loc]['types'][] = $type;
433 }
434 }
435 }
436
437 return $this->coalescedQueues;
438 }
439
444 private function assertValidJobs( array $jobs ) {
445 foreach ( $jobs as $job ) {
446 if ( !( $job instanceof IJobSpecification ) ) {
447 $type = is_object( $job ) ? get_class( $job ) : gettype( $job );
448 throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
449 }
450 }
451 }
452}
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
Handle enqueueing of background jobs.
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop one job off a job queue.
ack(RunnableJob $job)
Acknowledge that a job was completed.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
ReadOnlyMode $readOnlyMode
Read only mode.
__construct( $domain, ReadOnlyMode $readOnlyMode, ?array $localJobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
MapCacheLRU $cache
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
getQueueTypes()
Get the list of queue types.
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:150
Store key-value entries in a size-limited in-memory LRU cache.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Defer callable updates to run later in the PHP process.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Service locator for MediaWiki core services.
Multi-datacenter aware caching interface.
Determine whether a site is currently in read-only mode.
Class for getting statistically unique IDs without a central coordinator.
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job