MediaWiki REL1_33
JobQueueGroup.php
Go to the documentation of this file.
1<?php
23
/** @var JobQueueGroup[] Instances created by singleton(), keyed by wiki domain ID */
protected static $instances = [];

/** @var MapCacheLRU In-process cache; holds the 'queues-ready' list used by push() and pop() */
protected $cache;

/** @var string Wiki domain ID */
protected $domain;
/** @var string|bool Read-only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidDomain = false;

/**
 * @var array|null Map of (bucket => (queue => JobQueue, types => list of types));
 *  null until getCoalescedQueues() builds it on first use
 */
protected $coalescedQueues;

const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job

const USE_CACHE = 1; // integer; use process or persistent cache

const PROC_CACHE_TTL = 15; // integer; seconds

const CACHE_VERSION = 1; // integer; cache version
55
/**
 * Set up a queue group for one wiki domain. Protected: callers obtain
 * instances through singleton().
 *
 * @param string $domain Wiki domain ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if r/w)
 */
protected function __construct( $domain, $readOnlyReason ) {
	// Per-instance LRU map, created with a capacity of 10
	$this->cache = new MapCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->domain = $domain;
}
65
/**
 * Get the shared JobQueueGroup instance for a wiki domain, creating it on first use.
 *
 * @param bool|string $domain Wiki domain ID; false selects the current wiki's domain
 * @return JobQueueGroup
 */
public static function singleton( $domain = false ) {
	global $wgLocalDatabases;

	if ( $domain === false ) {
		$domain = WikiMap::getCurrentWikiDbDomain()->getId();
	}

	// Fast path: an instance for this domain was already built
	if ( isset( self::$instances[$domain] ) ) {
		return self::$instances[$domain];
	}

	$group = new self( $domain, wfConfiguredReadOnlyReason() );
	self::$instances[$domain] = $group;

	// Make sure jobs are not getting pushed to bogus wikis. This can confuse
	// the job runner system into spawning endless RPC requests that fail (T171371).
	$wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
	$recognized = WikiMap::isCurrentWikiDbDomain( $domain )
		|| in_array( $wikiId, $wgLocalDatabases );
	if ( !$recognized ) {
		$group->invalidDomain = true;
	}

	return $group;
}
92
/**
 * Destroy the singleton instances.
 *
 * Empties the static instance map, so the next singleton() call for any
 * domain constructs a fresh object.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
101
/**
 * Get the job queue object for a given queue type.
 *
 * The configuration comes from $wgJobTypeConf[$type], falling back to
 * $wgJobTypeConf['default']; this group's domain and the requested type
 * always take precedence over the configured values.
 *
 * @param string $type Job queue type
 * @return JobQueue Result of JobQueue::factory()
 */
public function get( $type ) {
	global $wgJobTypeConf;

	$typeConf = $wgJobTypeConf[$type] ?? $wgJobTypeConf['default'];
	// Array union: keys on the left win over the configured ones
	$conf = [ 'domain' => $this->domain, 'type' => $type ] + $typeConf;

	// Only fall back to the group-level reason when the config did not set one
	$conf['readOnlyReason'] = $conf['readOnlyReason'] ?? $this->readOnlyReason;

	$services = MediaWikiServices::getInstance();
	$conf['stats'] = $services->getStatsdDataFactory();
	$conf['wanCache'] = $services->getMainWANObjectCache();
	$conf['stash'] = $services->getMainObjectStash();

	return JobQueue::factory( $conf );
}
128
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * If this group's domain is not recognized, nothing is enqueued; a
 * LogicException is logged (not thrown) and the method returns.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or an array of jobs
 * @throws InvalidArgumentException If an item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	// Needed for the TYPE_DEFAULT "hasjobs" flag below
	global $wgJobTypesExcludedFromDefaultQueue;

	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( $jobs === [] ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	// Distinct loop variable so the $jobs parameter is not overwritten
	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	$pushedTypes = array_keys( $jobsByType );

	// Drop the in-process "ready queues" list if it lacks any type just pushed to
	if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
		$list = $this->cache->getField( 'queues-ready', 'list' );
		if ( count( array_diff( $pushedTypes, $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Flag the "has jobs" keys that queuesHaveJobs() reads
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	// Only flag the default key if at least one pushed type is not excluded from it
	if ( array_diff( $pushedTypes, $wgJobTypesExcludedFromDefaultQueue ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
186
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * Outside of CLI mode the jobs are validated immediately and then handed
 * to a JobQueueEnqueueUpdate registered with DeferredUpdates.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or an array of jobs
 * @throws LogicException If this group's domain is not recognized
 * @throws InvalidArgumentException If an item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->domain}' is not recognized." );
	}

	$runsFromCommandLine = in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );
	if ( $runsFromCommandLine ) {
		// Push right away instead of deferring
		$this->push( $jobs );
		return;
	}

	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobs );

	$update = new JobQueueEnqueueUpdate( $this->domain, $jobs );
	DeferredUpdates::addUpdate( $update );
}
216
/**
 * Push all jobs buffered via lazyPush() into their respective queues.
 *
 * This method no longer pushes anything itself; it only reports its own
 * deprecation through wfDeprecated().
 *
 * @deprecated since 1.33
 * @return void
 */
public static function pushLazyJobs() {
	wfDeprecated( __METHOD__, '1.33' );
}
227
/**
 * Pop a job off one of the job queues.
 *
 * A string $qtype selects that single queue. Otherwise the non-empty
 * queues are tried in random order, restricted to the default queue types
 * when $qtype is TYPE_DEFAULT.
 *
 * @param int|string $qtype JobQueueGroup::TYPE_* constant or a job type name
 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
 * @param array $blacklist List of job types to skip
 * @throws JobQueueError If this group is for a foreign wiki, or $qtype names
 *  a type with no entry in $wgJobClasses
 * @return mixed Whatever the underlying queue's pop() returned, or false if
 *  no queue was tried
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	global $wgJobClasses;

	if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		throw new JobQueueError(
			"Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
	}

	if ( is_string( $qtype ) ) { // specific job type
		if ( !isset( $wgJobClasses[$qtype] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new JobQueueError( "Unrecognized job type '$qtype'." );
		}

		return in_array( $qtype, $blacklist ) ? false : $this->get( $qtype )->pop();
	}

	// Any job in the selected group of job types
	if ( $flags & self::USE_CACHE ) {
		// Refresh the process-cached list when hasField() reports it missing
		// for the given PROC_CACHE_TTL
		if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
			$this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->getField( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	$candidates = array_diff( $candidates, $blacklist ); // avoid selected types
	shuffle( $candidates ); // avoid starvation

	$job = false;
	foreach ( $candidates as $type ) { // for each queue...
		$job = $this->get( $type )->pop();
		if ( $job ) { // found
			break;
		}
		// Not found: the cached list of ready queues is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
285
/**
 * Acknowledge that a job was completed.
 *
 * Delegates to the queue that handles the job's type.
 *
 * @param Job $job
 * @return void
 */
public function ack( Job $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
295
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * Delegates to the queue that handles the job's type.
 *
 * @param Job $job
 * @return mixed The return value of that queue's deduplicateRootJob()
 */
public function deduplicateRootJob( Job $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
306
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium:
	// only the types with an entry in $wgJobTypeConf are visited.
	foreach ( array_keys( $wgJobTypeConf ) as $configuredType ) {
		$this->get( $configuredType )->waitForBackups();
	}
}
322
/**
 * Get the list of queue types.
 *
 * @return array The keys of the 'wgJobClasses' configuration value for this group's domain
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
331
/**
 * Get the list of default queue types.
 *
 * These are all queue types except the ones listed in
 * $wgJobTypesExcludedFromDefaultQueue.
 *
 * @return array Result of array_diff(), so keys from getQueueTypes() are preserved
 */
public function getDefaultQueueTypes() {
	// Without this declaration the variable below would be an undefined local
	global $wgJobTypesExcludedFromDefaultQueue;

	return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
}
342
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false' under a per-domain,
 * per-type key with a TTL of 15.
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

	$cached = $cache->get( $key );
	if ( $cached !== false ) {
		// Cache hit: the stored string decides the answer
		return ( $cached === 'true' );
	}

	$nonEmpty = $this->getQueuesWithJobs();
	if ( $type == self::TYPE_DEFAULT ) {
		$nonEmpty = array_intersect( $nonEmpty, $this->getDefaultQueueTypes() );
	}

	if ( count( $nonEmpty ) ) {
		$answer = 'true';
	} else {
		$answer = 'false';
	}
	$cache->add( $key, $answer, 15 );

	return ( $answer === 'true' );
}
366
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job types that have non-empty queues
 */
public function getQueuesWithJobs() {
	$readyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		/** @var JobQueue $queue */
		$queue = $bucket['queue'];
		$found = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );

		if ( !is_array( $found ) ) {
			// No batching support: we have to go through the queues in
			// the bucket one-by-one
			$found = [];
			foreach ( $bucket['types'] as $type ) {
				if ( $this->get( $type )->isEmpty() ) {
					continue;
				}
				$found[] = $type;
			}
		}

		$readyTypes = array_merge( $readyTypes, $found );
	}

	return $readyTypes;
}
391
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		/** @var JobQueue $queue */
		$queue = $bucket['queue'];
		$batchSizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );

		if ( is_array( $batchSizes ) ) { // batching features supported
			// Array union: entries already present are kept as they are
			$sizesByType += $batchSizes;
			continue;
		}

		// We have to go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizesByType[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizesByType;
}
414
/**
 * Group the configured queues by their coalesce location.
 *
 * The map is built once per instance from $wgJobTypeConf and then reused.
 * The 'default' entry stands for every queue type that has no entry of
 * its own in $wgJobTypeConf.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Placeholder queue (type is the literal string 'null') used to
			// obtain the coalesce location for this configuration entry
			$queue = JobQueue::factory(
				[ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				// First entry for this location: keep its queue as the representative
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// Expand 'default' into all types without explicit configuration
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
444
/**
 * Read a configuration variable for this group's wiki.
 *
 * For the current wiki the value comes straight from $GLOBALS. For any
 * other wiki it is looked up through $wgConf and kept in the main WAN
 * cache, wrapped in an array under the key 'v'.
 *
 * @param string $name Name of the global variable, without the leading '$'
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		return $GLOBALS[$name]; // common case
	}

	$wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
	$cache = MediaWikiServices::getInstance()->getMainWANObjectCache();

	$key = $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name );
	// Randomized TTL: TTL_DAY plus a random extra of up to TTL_DAY
	$ttl = $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY );
	$loadFromConf = function () use ( $wiki, $name ) {
		global $wgConf;
		// @TODO: use the full domain ID here
		return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
	};

	$wrapped = $cache->getWithSetCallback(
		$key,
		$ttl,
		$loadFromConf,
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
	);

	return $wrapped['v'];
}
470
/**
 * Check that every item in the list is a job specification.
 *
 * @param array $jobs
 * @throws InvalidArgumentException On the first item that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) { // sanity checks
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}

		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
482}
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
$GLOBALS['IP']
$wgJobTypeConf
Map of job types to configuration arrays.
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
$wgConf
wgConf hold the site configuration.
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
string[] $wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Class to handle enqueueing of background jobs.
getCachedConfigVar( $name)
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
ack(Job $job)
Acknowledge that a job was completed.
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
static singleton( $domain=false)
ProcessCacheLRU $cache
string bool $readOnlyReason
Read only rationale (or false if r/w)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
static destroySingletons()
Destroy the singleton instances.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
bool $invalidDomain
Whether the wiki is not recognized in configuration.
__construct( $domain, $readOnlyReason)
static JobQueueGroup[] $instances
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
assertValidJobs(array $jobs)
getQueueTypes()
Get the list of queue types.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:106
Class to both describe a background job and handle jobs.
Definition Job.php:30
getType()
Definition Job.php:143
Handles a simple LRU key/value map with a maximum number of entries.
MediaWikiServices is the service locator for the application scope of MediaWiki.
Class for process caching individual properties of expiring items.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title e g db for database replication lag or jobqueue for job queue size converted to pseudo seconds It is possible to add more fields and they will be returned to the user in the API response after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition hooks.txt:2290
Allows to change the fields on the form that will be generated $name
Definition hooks.txt:271
returning false will NOT prevent logging $e
Definition hooks.txt:2175
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
Job queue task description interface.
you have access to all of the normal MediaWiki so you can get a DB use the cache
$cache
Definition mcc.php:33
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
if(count( $args)< 1) $job