MediaWiki master
JobQueueGroup.php
Go to the documentation of this file.
1<?php
21namespace MediaWiki\JobQueue;
22
23use InvalidArgumentException;
33
46 protected $cache;
47
49 protected $domain;
51 protected $readOnlyMode;
53 private $localJobClasses;
55 private $jobTypeConfiguration;
57 private $jobTypesExcludedFromDefaultQueue;
59 private $statsFactory;
61 private $wanCache;
63 private $globalIdGenerator;
64
67
68 public const TYPE_DEFAULT = 1; // integer; jobs popped by default
69 private const TYPE_ANY = 2; // integer; any job
70
71 public const USE_CACHE = 1; // integer; use process or persistent cache
72
73 private const PROC_CACHE_TTL = 15; // integer; seconds
74
88 public function __construct(
89 $domain,
91 ?array $localJobClasses,
92 array $jobTypeConfiguration,
93 array $jobTypesExcludedFromDefaultQueue,
94 StatsFactory $statsFactory,
95 WANObjectCache $wanCache,
96 GlobalIdGenerator $globalIdGenerator
97 ) {
98 $this->domain = $domain;
99 $this->readOnlyMode = $readOnlyMode;
100 $this->cache = new MapCacheLRU( 10 );
101 $this->localJobClasses = $localJobClasses;
102 $this->jobTypeConfiguration = $jobTypeConfiguration;
103 $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
104 $this->statsFactory = $statsFactory;
105 $this->wanCache = $wanCache;
106 $this->globalIdGenerator = $globalIdGenerator;
107 }
108
115 public function get( $type ) {
116 $conf = [ 'domain' => $this->domain, 'type' => $type ];
117 $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];
118 if ( !isset( $conf['readOnlyReason'] ) ) {
119 $conf['readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
120 }
121
122 $conf['stats'] = $this->statsFactory;
123 $conf['wanCache'] = $this->wanCache;
124 $conf['idGenerator'] = $this->globalIdGenerator;
125
126 return JobQueue::factory( $conf );
127 }
128
138 public function push( $jobs ) {
139 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
140 if ( $jobs === [] ) {
141 return;
142 }
143
144 $this->assertValidJobs( $jobs );
145
146 $jobsByType = []; // (job type => list of jobs)
147 foreach ( $jobs as $job ) {
148 $type = $job->getType();
149 if ( isset( $this->jobTypeConfiguration[$type] ) ) {
150 $jobsByType[$type][] = $job;
151 } else {
152 if (
153 isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
154 $this->jobTypeConfiguration['default']['typeAgnostic']
155 ) {
156 $jobsByType['default'][] = $job;
157 } else {
158 $jobsByType[$type][] = $job;
159 }
160 }
161 }
162
163 foreach ( $jobsByType as $type => $jobs ) {
164 $this->get( $type )->push( $jobs );
165 }
166
167 if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
168 $list = $this->cache->getField( 'queues-ready', 'list' );
169 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
170 $this->cache->clear( 'queues-ready' );
171 }
172 }
173
174 $cache = MediaWikiServices::getInstance()->getObjectCacheFactory()->getLocalClusterInstance();
175 $cache->set(
176 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
177 'true',
178 15
179 );
180 if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
181 $cache->set(
182 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
183 'true',
184 15
185 );
186 }
187 }
188
196 public function lazyPush( $jobs ) {
197 if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
198 $this->push( $jobs );
199 return;
200 }
201
202 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
203
204 // Throw errors now instead of on push(), when other jobs may be buffered
205 $this->assertValidJobs( $jobs );
206
207 DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
208 }
209
224 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
225 $job = false;
226
227 if ( !$this->localJobClasses ) {
228 throw new JobQueueError(
229 "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
230 }
231 if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
232 // Do not pop jobs if there is no class for the queue type
233 throw new JobQueueError( "Unrecognized job type '$qtype'." );
234 }
235
236 if ( is_string( $qtype ) ) { // specific job type
237 if ( !in_array( $qtype, $ignored ) ) {
238 $job = $this->get( $qtype )->pop();
239 }
240 } else { // any job in the "default" jobs types
241 if ( $flags & self::USE_CACHE ) {
242 if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
243 $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
244 }
245 $types = $this->cache->getField( 'queues-ready', 'list' );
246 } else {
247 $types = $this->getQueuesWithJobs();
248 }
249
250 if ( $qtype == self::TYPE_DEFAULT ) {
251 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
252 }
253
254 $types = array_diff( $types, $ignored ); // avoid selected types
255 shuffle( $types ); // avoid starvation
256
257 foreach ( $types as $type ) { // for each queue...
258 $job = $this->get( $type )->pop();
259 if ( $job ) { // found
260 break;
261 } else { // not found
262 $this->cache->clear( 'queues-ready' );
263 }
264 }
265 }
266
267 return $job;
268 }
269
276 public function ack( RunnableJob $job ) {
277 $this->get( $job->getType() )->ack( $job );
278 }
279
289 wfDeprecated( __METHOD__, '1.40' );
290 return true;
291 }
292
302 public function waitForBackups() {
303 wfDeprecated( __METHOD__, '1.41' );
304 // Try to avoid doing this more than once per queue storage medium
305 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
306 $this->get( $type )->waitForBackups();
307 }
308 }
309
317 public function getQueueTypes() {
318 if ( !$this->localJobClasses ) {
319 throw new JobQueueError( 'Cannot inspect job queue from foreign wiki' );
320 }
321 return array_keys( $this->localJobClasses );
322 }
323
331 public function getDefaultQueueTypes() {
332 return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
333 }
334
344 public function queuesHaveJobs( $type = self::TYPE_ANY ) {
345 $cache = MediaWikiServices::getInstance()->getObjectCacheFactory()->getLocalClusterInstance();
346 $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
347
348 $value = $cache->get( $key );
349 if ( $value === false ) {
350 $queues = $this->getQueuesWithJobs();
351 if ( $type == self::TYPE_DEFAULT ) {
352 $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
353 }
354 $value = count( $queues ) ? 'true' : 'false';
355 $cache->add( $key, $value, 15 );
356 }
357
358 return ( $value === 'true' );
359 }
360
368 public function getQueuesWithJobs() {
369 $types = [];
370 foreach ( $this->getCoalescedQueues() as $info ) {
372 $queue = $info['queue'];
373 $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
374 if ( is_array( $nonEmpty ) ) { // batching features supported
375 $types = array_merge( $types, $nonEmpty );
376 } else { // we have to go through the queues in the bucket one-by-one
377 foreach ( $info['types'] as $type ) {
378 if ( !$this->get( $type )->isEmpty() ) {
379 $types[] = $type;
380 }
381 }
382 }
383 }
384
385 return $types;
386 }
387
395 public function getQueueSizes() {
396 $sizeMap = [];
397 foreach ( $this->getCoalescedQueues() as $info ) {
399 $queue = $info['queue'];
400 $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
401 if ( is_array( $sizes ) ) { // batching features supported
402 $sizeMap += $sizes;
403 } else { // we have to go through the queues in the bucket one-by-one
404 foreach ( $info['types'] as $type ) {
405 $sizeMap[$type] = $this->get( $type )->getSize();
406 }
407 }
408 }
409
410 return $sizeMap;
411 }
412
417 protected function getCoalescedQueues() {
418 if ( $this->coalescedQueues === null ) {
419 $this->coalescedQueues = [];
420 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
421 $conf['domain'] = $this->domain;
422 $conf['type'] = 'null';
423 $conf['stats'] = $this->statsFactory;
424 $conf['wanCache'] = $this->wanCache;
425 $conf['idGenerator'] = $this->globalIdGenerator;
426
427 $queue = JobQueue::factory( $conf );
428 $loc = $queue->getCoalesceLocationInternal();
429 if ( !isset( $this->coalescedQueues[$loc] ) ) {
430 $this->coalescedQueues[$loc]['queue'] = $queue;
431 $this->coalescedQueues[$loc]['types'] = [];
432 }
433 if ( $type === 'default' ) {
434 $this->coalescedQueues[$loc]['types'] = array_merge(
435 $this->coalescedQueues[$loc]['types'],
436 array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
437 );
438 } else {
439 $this->coalescedQueues[$loc]['types'][] = $type;
440 }
441 }
442 }
443
445 }
446
447 private function assertValidJobs( array $jobs ) {
448 foreach ( $jobs as $job ) {
449 if ( !( $job instanceof IJobSpecification ) ) {
450 $type = get_debug_type( $job );
451 throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
452 }
453 }
454 }
455}
456
458class_alias( JobQueueGroup::class, 'JobQueueGroup' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
Defer callable updates to run later in the PHP process.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Handle enqueueing of background jobs.
ReadOnlyMode $readOnlyMode
Read only mode.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
getQueueSizes()
Get the size of the queues for a list of job types.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
push( $jobs)
Insert jobs into the respective queues of which they belong.
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
string $domain
Wiki domain ID.
__construct( $domain, ReadOnlyMode $readOnlyMode, ?array $localJobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, StatsFactory $statsFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
ack(RunnableJob $job)
Acknowledge that a job was completed.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop one job off a job queue.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
getQueueTypes()
Get the list of queue types.
getDefaultQueueTypes()
Get the list of default queue types.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:157
Service locator for MediaWiki core services.
static getInstance()
Returns the global default instance of the top level service locator.
Store key-value entries in a size-limited in-memory LRU cache.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Multi-datacenter aware caching interface.
Determine whether a site is currently in read-only mode.
This is the primary interface for validating metrics definitions, caching defined metrics,...
Class for getting statistically unique IDs without a central coordinator.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job