MediaWiki REL1_32
JobQueueGroup.php
Go to the documentation of this file.
1<?php
/** @var JobQueueGroup[] Singleton instances, keyed by wiki DB domain ID */
protected static $instances = [];

/** @var MapCacheLRU Process cache; instantiated in the constructor */
protected $cache;

/** @var string Wiki DB domain ID */
protected $domain;
/** @var string|bool Read only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidWiki = false;

/**
 * @var array|null Map of (bucket => (queue => JobQueue, types => list of types));
 *  stays null until getCoalescedQueues() builds it
 */
protected $coalescedQueues;

const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job

const USE_CACHE = 1; // integer; use process or persistent cache

const PROC_CACHE_TTL = 15; // integer; seconds

const CACHE_VERSION = 1; // integer; cache version
54
/**
 * Instances are created through singleton(); the constructor only stores
 * its arguments and sets up a small per-instance process cache.
 *
 * @param string $domain Wiki DB domain ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if r/w)
 */
protected function __construct( $domain, $readOnlyReason ) {
	// Process cache limited to 10 entries
	$this->cache = new MapCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->domain = $domain;
}
64
/**
 * Get the job queue group for a wiki DB domain, creating it on first use.
 *
 * @param bool|string $domain Wiki DB domain ID; false selects the current wiki
 * @return JobQueueGroup
 */
public static function singleton( $domain = false ) {
	global $wgLocalDatabases;

	if ( $domain === false ) {
		// Resolve the current wiki instead of carrying the literal false around as
		// an instance key and as an argument to the WikiMap lookups below.
		// NOTE(review): assumes getCurrentWikiDbDomain() returns a domain object
		// exposing getId() - confirm against WikiMap.
		$domain = WikiMap::getCurrentWikiDbDomain()->getId();
	}

	if ( !isset( self::$instances[$domain] ) ) {
		self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
		// Make sure jobs are not getting pushed to bogus wikis. This can confuse
		// the job runner system into spawning endless RPC requests that fail (T171371).
		$wikiId = WikiMap::getWikiIdFromDomain( $domain );
		if (
			// The current wiki is always valid, even when it is not listed in
			// $wgLocalDatabases (e.g. the list is left empty on a single-wiki setup).
			!WikiMap::isCurrentWikiDbDomain( $domain ) &&
			!in_array( $wikiId, $wgLocalDatabases )
		) {
			self::$instances[$domain]->invalidWiki = true;
		}
	}

	return self::$instances[$domain];
}
91
/**
 * Drop every instance held by singleton(), so that the next call to
 * singleton() builds a fresh object.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
100
/**
 * Build the job queue object for one job type.
 *
 * The type-specific entry of $wgJobTypeConf is used when present, otherwise
 * the 'default' entry. The 'wiki' and 'type' keys always come from this group.
 *
 * @param string $type Job type
 * @return JobQueue
 */
public function get( $type ) {
	global $wgJobTypeConf;

	$typeConf = isset( $wgJobTypeConf[$type] )
		? $wgJobTypeConf[$type]
		: $wgJobTypeConf['default'];

	// Array union: keys on the left win over anything in the configured entry
	$conf = [ 'wiki' => $this->domain, 'type' => $type ] + $typeConf;
	$conf['aggregator'] = JobQueueAggregator::singleton();
	if ( !isset( $conf['readOnlyReason'] ) ) {
		// Fall back to the reason captured when this group was created
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	return JobQueue::factory( $conf );
}
123
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * Jobs for an unrecognized wiki are not enqueued; the problem is logged and
 * the method returns without pushing anything.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If an item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	// Needed for the TYPE_DEFAULT "hasjobs" flag below; without this declaration
	// the variable is undefined in function scope.
	global $wgJobTypesExcludedFromDefaultQueue;

	if ( $this->invalidWiki ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( !count( $jobs ) ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	// Distinct loop variable so the $jobs parameter is not overwritten
	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	// Invalidate the process-cached list of ready queues if it lacks a type just pushed
	if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
		$list = $this->cache->getField( 'queues-ready', 'list' );
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Flag that queues have jobs, using the same keys that queuesHaveJobs() reads
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
		// At least one pushed type is not excluded from the default queue
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
181
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * Outside of the 'cli' and 'phpdbg' SAPIs the jobs are validated immediately
 * and handed to a JobQueueEnqueueUpdate deferred update.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws LogicException If the domain of this group is not recognized
 * @throws InvalidArgumentException If an item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidWiki ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->domain}' is not recognized." );
	}

	$isCommandLine = ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	if ( $isCommandLine ) {
		$this->push( $jobs );
		return;
	}

	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobs );

	$update = new JobQueueEnqueueUpdate( $this->domain, $jobs );
	DeferredUpdates::addUpdate( $update );
}
211
/**
 * Retained for backward compatibility; the body only emits a deprecation
 * warning and pushes nothing.
 *
 * @deprecated Since 1.33
 * @return void
 */
public static function pushLazyJobs() {
	wfDeprecated( __METHOD__, '1.33' );
}
222
/**
 * Pop a job off one of the job queues.
 *
 * @param int|string $qtype A specific job type, or one of the
 *  JobQueueGroup::TYPE_* constants to draw from several queues
 * @param int $flags Bitfield; JobQueueGroup::USE_CACHE reuses the process-cached
 *  list of non-empty queues
 * @param array $blacklist Job types that must not be popped
 * @return mixed The popped job, or a falsy value when nothing was found
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	if ( is_string( $qtype ) ) { // specific job type
		return in_array( $qtype, $blacklist )
			? false
			: $this->get( $qtype )->pop();
	}

	// Any job among several queues: start from the queues known to be non-empty
	if ( $flags & self::USE_CACHE ) {
		$isFresh = $this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL );
		if ( !$isFresh ) {
			$this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->getField( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	$candidates = array_diff( $candidates, $blacklist ); // avoid selected types
	shuffle( $candidates ); // avoid starvation

	$job = false;
	foreach ( $candidates as $type ) { // for each queue...
		$job = $this->get( $type )->pop();
		if ( $job ) { // found
			break;
		}
		// Listed as ready but yielded nothing: the cached list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
270
/**
 * Acknowledge that a job was completed.
 *
 * The call is forwarded to the queue that handles the job's type.
 *
 * @param Job $job
 * @return void
 */
public function ack( Job $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
280
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param Job $job
 * @return mixed Whatever the underlying queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( Job $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
291
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * Only the job types that have an entry in $wgJobTypeConf are visited.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium
	foreach ( array_keys( $wgJobTypeConf ) as $type ) {
		$this->get( $type )->waitForBackups();
	}
}
307
/**
 * Get the list of queue types.
 *
 * These are the keys of the 'wgJobClasses' configuration value for the
 * wiki of this group.
 *
 * @return array List of job type names
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
316
/**
 * Get the list of default queue types.
 *
 * This is every queue type except those listed in
 * $wgJobTypesExcludedFromDefaultQueue.
 *
 * @return array List of job type names
 */
public function getDefaultQueueTypes() {
	// Without this declaration the variable is undefined in function scope
	global $wgJobTypesExcludedFromDefaultQueue;

	return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
}
327
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

	$cached = $cache->get( $key );
	if ( $cached !== false ) {
		// The answer is stored as the string 'true' or 'false'
		return ( $cached === 'true' );
	}

	$nonEmpty = $this->getQueuesWithJobs();
	if ( $type == self::TYPE_DEFAULT ) {
		$nonEmpty = array_intersect( $nonEmpty, $this->getDefaultQueueTypes() );
	}

	$hasJobs = count( $nonEmpty ) > 0;
	$cache->add( $key, $hasJobs ? 'true' : 'false', 15 );

	return $hasJobs;
}
351
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job type names
 */
public function getQueuesWithJobs() {
	$nonEmptyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$batched = $bucket['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
		if ( is_array( $batched ) ) { // batching features supported
			$nonEmptyTypes = array_merge( $nonEmptyTypes, $batched );
			continue;
		}

		// we have to go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			if ( $this->get( $type )->isEmpty() ) {
				continue;
			}
			$nonEmptyTypes[] = $type;
		}
	}

	return $nonEmptyTypes;
}
374
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$batched = $bucket['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
		if ( is_array( $batched ) ) { // batching features supported
			// Array union: sizes already recorded for a type are kept
			$sizesByType += $batched;
			continue;
		}

		// we have to go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizesByType[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizesByType;
}
395
/**
 * Group the configured job types by the coalesce location of their queue.
 *
 * The map is built on the first call and reused afterwards.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Probe queue built from this configuration entry, used to learn its
			// coalesce location and as the representative queue of the bucket
			$queue = JobQueue::factory(
				[ 'wiki' => $this->domain, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// 'default' stands for every queue type without its own entry
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
425
/**
 * Read a configuration variable for the wiki of this group.
 *
 * For the current wiki the value comes straight from $GLOBALS; for any other
 * wiki it is fetched through $wgConf and kept in the main WAN cache.
 *
 * @param string $name Name of the configuration variable (e.g. 'wgJobClasses')
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		return $GLOBALS[$name]; // common case
	}

	$wiki = WikiMap::getWikiIdFromDomain( $this->domain );
	$cache = ObjectCache::getMainWANInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name );
	// Between one and two days, randomized
	$ttl = $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY );

	$wrapped = $cache->getWithSetCallback(
		$key,
		$ttl,
		function () use ( $wiki, $name ) {
			global $wgConf;
			// @TODO: use the full domain ID here
			return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
	);

	return $wrapped['v'];
}
451
/**
 * Verify that every item of the list is a job specification.
 *
 * @param array $jobs
 * @throws InvalidArgumentException On the first item that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) { // sanity checks
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}
		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
463}
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
$GLOBALS['IP']
$wgJobTypeConf
Map of job types to configuration arrays.
$wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
$wgConf
wgConf hold the site configuration.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Class to handle enqueueing of background jobs.
getCachedConfigVar( $name)
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki DB domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
ack(Job $job)
Acknowledge that a job was completed.
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
static singleton( $domain=false)
bool $invalidWiki
Whether the wiki is not recognized in configuration.
ProcessCacheLRU $cache
string bool $readOnlyReason
Read only rationale (or false if r/w)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
static destroySingletons()
Destroy the singleton instances.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
__construct( $domain, $readOnlyReason)
static JobQueueGroup[] $instances
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
assertValidJobs(array $jobs)
getQueueTypes()
Get the list of queue types.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:104
Class to both describe a background job and handle jobs.
Definition Job.php:30
getType()
Definition Job.php:128
Handles a simple LRU key/value map with a maximum number of entries.
Class for process caching individual properties of expiring items.
static getCurrentWikiDbDomain()
Definition WikiMap.php:289
static getWikiIdFromDomain( $domain)
Get the wiki ID of a database domain.
Definition WikiMap.php:252
static isCurrentWikiDbDomain( $domain)
Definition WikiMap.php:267
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Allows to change the fields on the form that will be generated $name
Definition hooks.txt:302
returning false will NOT prevent logging $e
Definition hooks.txt:2226
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
Job queue task description interface.
you have access to all of the normal MediaWiki so you can get a DB use the cache
$cache
Definition mcc.php:33
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
if(count( $args)< 1) $job