MediaWiki master
JobQueueGroup.php
Go to the documentation of this file.
1<?php
7namespace MediaWiki\JobQueue;
8
9use InvalidArgumentException;
10use LogicException;
20
33 protected $cache;
34
36 protected $domain;
38 protected $readOnlyMode;
40 private $localJobClasses;
42 private $jobTypeConfiguration;
44 private $jobTypesExcludedFromDefaultQueue;
46 private $statsFactory;
48 private $wanCache;
50 private $globalIdGenerator;
51
54
55 public const TYPE_DEFAULT = 1; // integer; jobs popped by default
56 private const TYPE_ANY = 2; // integer; any job
57
58 public const USE_CACHE = 1; // integer; use process or persistent cache
59
60 private const PROC_CACHE_TTL = 15; // integer; seconds
61
74 public function __construct(
75 $domain,
77 ?array $localJobClasses,
78 array $jobTypeConfiguration,
79 array $jobTypesExcludedFromDefaultQueue,
80 StatsFactory $statsFactory,
81 WANObjectCache $wanCache,
82 GlobalIdGenerator $globalIdGenerator
83 ) {
84 $this->domain = $domain;
85 $this->readOnlyMode = $readOnlyMode;
86 $this->cache = new MapCacheLRU( 10 );
87 $this->localJobClasses = $localJobClasses;
88 $this->jobTypeConfiguration = $jobTypeConfiguration;
89 $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
90 $this->statsFactory = $statsFactory;
91 $this->wanCache = $wanCache;
92 $this->globalIdGenerator = $globalIdGenerator;
93 }
94
101 public function get( $type ) {
102 $conf = [ 'domain' => $this->domain, 'type' => $type ];
103 $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];
104 if ( !isset( $conf['readOnlyReason'] ) ) {
105 $conf['readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
106 }
107
108 $conf['stats'] = $this->statsFactory;
109 $conf['wanCache'] = $this->wanCache;
110 $conf['idGenerator'] = $this->globalIdGenerator;
111
112 return JobQueue::factory( $conf );
113 }
114
124 public function push( $jobs ) {
125 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
126 if ( $jobs === [] ) {
127 return;
128 }
129
130 $this->assertValidJobs( $jobs );
131
132 $jobsByType = []; // (job type => list of jobs)
133 foreach ( $jobs as $job ) {
134 $type = $job->getType();
135 if ( isset( $this->jobTypeConfiguration[$type] ) ) {
136 $jobsByType[$type][] = $job;
137 } else {
138 if (
139 isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
140 $this->jobTypeConfiguration['default']['typeAgnostic']
141 ) {
142 $jobsByType['default'][] = $job;
143 } else {
144 $jobsByType[$type][] = $job;
145 }
146 }
147 }
148
149 foreach ( $jobsByType as $type => $jobs ) {
150 $this->get( $type )->push( $jobs );
151 }
152
153 if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
154 $list = $this->cache->getField( 'queues-ready', 'list' );
155 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
156 $this->cache->clear( 'queues-ready' );
157 }
158 }
159
160 $cache = MediaWikiServices::getInstance()->getObjectCacheFactory()->getLocalClusterInstance();
161 $cache->set(
162 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
163 'true',
164 15
165 );
166 if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
167 $cache->set(
168 $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
169 'true',
170 15
171 );
172 }
173 }
174
182 public function lazyPush( $jobs ) {
183 if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
184 $this->push( $jobs );
185 return;
186 }
187
188 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
189
190 // Throw errors now instead of on push(), when other jobs may be buffered
191 $this->assertValidJobs( $jobs );
192
193 DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
194 }
195
210 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
211 $job = false;
212
213 if ( !$this->localJobClasses ) {
214 throw new JobQueueError(
215 "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
216 }
217 if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
218 // Do not pop jobs if there is no class for the queue type
219 throw new JobQueueError( "Unrecognized job type '$qtype'." );
220 }
221
222 if ( is_string( $qtype ) ) { // specific job type
223 if ( !in_array( $qtype, $ignored ) ) {
224 $job = $this->get( $qtype )->pop();
225 }
226 } else { // any job in the "default" jobs types
227 if ( $flags & self::USE_CACHE ) {
228 if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
229 $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
230 }
231 $types = $this->cache->getField( 'queues-ready', 'list' );
232 } else {
233 $types = $this->getQueuesWithJobs();
234 }
235
236 if ( $qtype == self::TYPE_DEFAULT ) {
237 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
238 }
239
240 $types = array_diff( $types, $ignored ); // avoid selected types
241 shuffle( $types ); // avoid starvation
242
243 foreach ( $types as $type ) { // for each queue...
244 $job = $this->get( $type )->pop();
245 if ( $job ) { // found
246 break;
247 } else { // not found
248 $this->cache->clear( 'queues-ready' );
249 }
250 }
251 }
252
253 return $job;
254 }
255
262 public function ack( RunnableJob $job ) {
263 $this->get( $job->getType() )->ack( $job );
264 }
265
273 public function getQueueTypes() {
274 if ( !$this->localJobClasses ) {
275 throw new LogicException( 'Cannot inspect job queue from foreign wiki' );
276 }
277 return array_keys( $this->localJobClasses );
278 }
279
287 public function getDefaultQueueTypes() {
288 return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
289 }
290
300 public function queuesHaveJobs( $type = self::TYPE_ANY ) {
301 $cache = MediaWikiServices::getInstance()->getObjectCacheFactory()->getLocalClusterInstance();
302 $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
303
304 $value = $cache->get( $key );
305 if ( $value === false ) {
306 $queues = $this->getQueuesWithJobs();
307 if ( $type == self::TYPE_DEFAULT ) {
308 $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
309 }
310 $value = count( $queues ) ? 'true' : 'false';
311 $cache->add( $key, $value, 15 );
312 }
313
314 return ( $value === 'true' );
315 }
316
324 public function getQueuesWithJobs() {
325 $types = [];
326 foreach ( $this->getCoalescedQueues() as $info ) {
328 $queue = $info['queue'];
329 $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
330 if ( is_array( $nonEmpty ) ) { // batching features supported
331 $types = array_merge( $types, $nonEmpty );
332 } else { // we have to go through the queues in the bucket one-by-one
333 foreach ( $info['types'] as $type ) {
334 if ( !$this->get( $type )->isEmpty() ) {
335 $types[] = $type;
336 }
337 }
338 }
339 }
340
341 return $types;
342 }
343
351 public function getQueueSizes() {
352 $sizeMap = [];
353 foreach ( $this->getCoalescedQueues() as $info ) {
355 $queue = $info['queue'];
356 $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
357 if ( is_array( $sizes ) ) { // batching features supported
358 $sizeMap += $sizes;
359 } else { // we have to go through the queues in the bucket one-by-one
360 foreach ( $info['types'] as $type ) {
361 $sizeMap[$type] = $this->get( $type )->getSize();
362 }
363 }
364 }
365
366 return $sizeMap;
367 }
368
373 protected function getCoalescedQueues() {
374 if ( $this->coalescedQueues === null ) {
375 $this->coalescedQueues = [];
376 foreach ( $this->jobTypeConfiguration as $type => $conf ) {
377 $conf['domain'] = $this->domain;
378 $conf['type'] = 'null';
379 $conf['stats'] = $this->statsFactory;
380 $conf['wanCache'] = $this->wanCache;
381 $conf['idGenerator'] = $this->globalIdGenerator;
382
383 $queue = JobQueue::factory( $conf );
384 $loc = $queue->getCoalesceLocationInternal() ?? '';
385 if ( !isset( $this->coalescedQueues[$loc] ) ) {
386 $this->coalescedQueues[$loc]['queue'] = $queue;
387 $this->coalescedQueues[$loc]['types'] = [];
388 }
389 if ( $type === 'default' ) {
390 $this->coalescedQueues[$loc]['types'] = array_merge(
391 $this->coalescedQueues[$loc]['types'],
392 array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
393 );
394 } else {
395 $this->coalescedQueues[$loc]['types'][] = $type;
396 }
397 }
398 }
399
401 }
402
403 private function assertValidJobs( array $jobs ) {
404 foreach ( $jobs as $job ) {
405 if ( !( $job instanceof IJobSpecification ) ) {
406 $type = get_debug_type( $job );
407 throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
408 }
409 }
410 }
411}
412
414class_alias( JobQueueGroup::class, 'JobQueueGroup' );
Defer callable updates to run later in the PHP process.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Handle enqueueing of background jobs.
ReadOnlyMode $readOnlyMode
Read only mode.
getQueueSizes()
Get the size of the queues for a list of job types.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
__construct( $domain, ReadOnlyMode $readOnlyMode, ?array $localJobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, StatsFactory $statsFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
ack(RunnableJob $job)
Acknowledge that a job was completed.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop one job off a job queue.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
getQueueTypes()
Get the list of queue types.
getDefaultQueueTypes()
Get the list of default queue types.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:142
Service locator for MediaWiki core services.
static getInstance()
Returns the global default instance of the top level service locator.
Store key-value entries in a size-limited in-memory LRU cache.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Multi-datacenter aware caching interface.
Determine whether a site is currently in read-only mode.
This is the primary interface for validating metrics definitions, caching defined metrics,...
Class for getting statistically unique IDs without a central coordinator.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job