MediaWiki REL1_31
JobQueueGroup.php
<?php
/**
 * Class to handle enqueueing of background jobs.
 */
class JobQueueGroup {
	/** @var JobQueueGroup[] */
	protected static $instances = [];

	/** @var ProcessCacheLRU */
	protected $cache;

	/** @var string Wiki DB domain ID */
	protected $domain;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var bool Whether the wiki is not recognized in configuration */
	protected $invalidWiki = false;

	/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
	protected $coalescedQueues;

	/** @var Job[] */
	protected $bufferedJobs = [];

	const TYPE_DEFAULT = 1; // integer; jobs popped by default
	const TYPE_ANY = 2; // integer; any job

	const USE_CACHE = 1; // integer; use process or persistent cache

	const PROC_CACHE_TTL = 15; // integer; seconds

	const CACHE_VERSION = 1; // integer; cache version

	/**
	 * @param string $domain Wiki DB domain ID
	 * @param string|bool $readOnlyReason Read-only reason or false
	 */
	protected function __construct( $domain, $readOnlyReason ) {
		$this->domain = $domain;
		$this->readOnlyReason = $readOnlyReason;
		$this->cache = new ProcessCacheLRU( 10 );
	}

	/**
	 * @param bool|string $domain Wiki domain ID
	 * @return JobQueueGroup
	 */
	public static function singleton( $domain = false ) {
		global $wgLocalDatabases;

		if ( $domain === false ) {
			$domain = WikiMap::getCurrentWikiDbDomain()->getId();
		}

		if ( !isset( self::$instances[$domain] ) ) {
			self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
			// Make sure jobs are not getting pushed to bogus wikis. This can confuse
			// the job runner system into spawning endless RPC requests that fail (T171371).
			$wikiId = WikiMap::getWikiIdFromDomain( $domain );
			if (
				!WikiMap::isCurrentWikiDbDomain( $domain ) &&
				!in_array( $wikiId, $wgLocalDatabases )
			) {
				self::$instances[$domain]->invalidWiki = true;
			}
		}

		return self::$instances[$domain];
	}

	/**
	 * Destroy the singleton instances.
	 */
	public static function destroySingletons() {
		self::$instances = [];
	}

	/**
	 * Get the job queue object for a given queue type
	 *
	 * @param string $type
	 * @return JobQueue
	 */
	public function get( $type ) {
		global $wgJobTypeConf;

		$conf = [ 'wiki' => $this->domain, 'type' => $type ];
		if ( isset( $wgJobTypeConf[$type] ) ) {
			$conf = $conf + $wgJobTypeConf[$type];
		} else {
			$conf = $conf + $wgJobTypeConf['default'];
		}
		$conf['aggregator'] = JobQueueAggregator::singleton();
		if ( $this->readOnlyReason !== false ) {
			$conf['readOnlyReason'] = $this->readOnlyReason;
		}

		return JobQueue::factory( $conf );
	}

	/**
	 * Insert jobs into the respective queues to which they belong
	 *
	 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
	 * @throws InvalidArgumentException
	 * @return void
	 */
	public function push( $jobs ) {
		global $wgJobTypesExcludedFromDefaultQueue;

		if ( $this->invalidWiki ) {
			// Do not enqueue job that cannot be run (T171371)
			$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
			MWExceptionHandler::logException( $e );
			return;
		}

		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		if ( !count( $jobs ) ) {
			return;
		}

		$this->assertValidJobs( $jobs );

		$jobsByType = []; // (job type => list of jobs)
		foreach ( $jobs as $job ) {
			$jobsByType[$job->getType()][] = $job;
		}

		foreach ( $jobsByType as $type => $jobs ) {
			$this->get( $type )->push( $jobs );
		}

		if ( $this->cache->has( 'queues-ready', 'list' ) ) {
			$list = $this->cache->get( 'queues-ready', 'list' );
			if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
				$this->cache->clear( 'queues-ready' );
			}
		}

		$cache = ObjectCache::getLocalClusterInstance();
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
			'true',
			15
		);
		if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
			$cache->set(
				$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
				'true',
				15
			);
		}
	}

	/**
	 * Buffer jobs for insertion via push() or call it now if in CLI mode
	 *
	 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
	 * @return void
	 */
	public function lazyPush( $jobs ) {
		if ( $this->invalidWiki ) {
			// Do not enqueue job that cannot be run (T171371)
			throw new LogicException( "Domain '{$this->domain}' is not recognized." );
		}

		if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
			$this->push( $jobs );
			return;
		}

		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];

		// Throw errors now instead of on push(), when other jobs may be buffered
		$this->assertValidJobs( $jobs );

		$this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
	}

	/**
	 * Push all jobs buffered via lazyPush() into their respective queues
	 *
	 * @return void
	 */
	public static function pushLazyJobs() {
		foreach ( self::$instances as $group ) {
			try {
				$group->push( $group->bufferedJobs );
				$group->bufferedJobs = [];
			} catch ( Exception $e ) {
				// Get in as many jobs as possible and let other post-send updates happen
				MWExceptionHandler::logException( $e );
			}
		}
	}

	/**
	 * Pop a job off one of the job queues
	 *
	 * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
	 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
	 * @param array $blacklist List of job types to ignore
	 * @return Job|bool Returns false if there are no jobs
	 */
	public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
		$job = false;

		if ( is_string( $qtype ) ) { // specific job type
			if ( !in_array( $qtype, $blacklist ) ) {
				$job = $this->get( $qtype )->pop();
			}
		} else { // any job in the "default" jobs types
			if ( $flags & self::USE_CACHE ) {
				if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
					$this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
				}
				$types = $this->cache->get( 'queues-ready', 'list' );
			} else {
				$types = $this->getQueuesWithJobs();
			}

			if ( $qtype == self::TYPE_DEFAULT ) {
				$types = array_intersect( $types, $this->getDefaultQueueTypes() );
			}

			$types = array_diff( $types, $blacklist ); // avoid selected types
			shuffle( $types ); // avoid starvation

			foreach ( $types as $type ) { // for each queue...
				$job = $this->get( $type )->pop();
				if ( $job ) { // found
					break;
				} else { // not found
					$this->cache->clear( 'queues-ready' );
				}
			}
		}

		return $job;
	}

	/**
	 * Acknowledge that a job was completed
	 *
	 * @param Job $job
	 * @return void
	 */
	public function ack( Job $job ) {
		$this->get( $job->getType() )->ack( $job );
	}

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 * This should only be called right after the jobs have been inserted.
	 *
	 * @param Job $job
	 * @return bool
	 */
	public function deduplicateRootJob( Job $job ) {
		return $this->get( $job->getType() )->deduplicateRootJob( $job );
	}

	/**
	 * Wait for any replica DBs or backup queue servers to catch up.
	 *
	 * @return void
	 */
	public function waitForBackups() {
		global $wgJobTypeConf;

		// Try to avoid doing this more than once per queue storage medium
		foreach ( $wgJobTypeConf as $type => $conf ) {
			$this->get( $type )->waitForBackups();
		}
	}

	/**
	 * Get the list of queue types
	 *
	 * @return array List of strings
	 */
	public function getQueueTypes() {
		return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
	}

	/**
	 * Get the list of default queue types
	 *
	 * @return array List of strings
	 */
	public function getDefaultQueueTypes() {
		global $wgJobTypesExcludedFromDefaultQueue;

		return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
	}

	/**
	 * Check if there are any queues with jobs (this is cached)
	 *
	 * @param int $type JobQueueGroup::TYPE_* constant
	 * @return bool
	 */
	public function queuesHaveJobs( $type = self::TYPE_ANY ) {
		$cache = ObjectCache::getLocalClusterInstance();
		$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

		$value = $cache->get( $key );
		if ( $value === false ) {
			$queues = $this->getQueuesWithJobs();
			if ( $type == self::TYPE_DEFAULT ) {
				$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
			}
			$value = count( $queues ) ? 'true' : 'false';
			$cache->add( $key, $value, 15 );
		}

		return ( $value === 'true' );
	}

	/**
	 * Get the list of job types that have non-empty queues
	 *
	 * @return array List of job types
	 */
	public function getQueuesWithJobs() {
		$types = [];
		foreach ( $this->getCoalescedQueues() as $info ) {
			$nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
			if ( is_array( $nonEmpty ) ) { // batching features supported
				$types = array_merge( $types, $nonEmpty );
			} else { // we have to go through the queues in the bucket one-by-one
				foreach ( $info['types'] as $type ) {
					if ( !$this->get( $type )->isEmpty() ) {
						$types[] = $type;
					}
				}
			}
		}

		return $types;
	}

	/**
	 * Get the size of the queues for a list of job types
	 *
	 * @return array Map of (job type => size)
	 */
	public function getQueueSizes() {
		$sizeMap = [];
		foreach ( $this->getCoalescedQueues() as $info ) {
			$sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
			if ( is_array( $sizes ) ) { // batching features supported
				$sizeMap = $sizeMap + $sizes;
			} else { // we have to go through the queues in the bucket one-by-one
				foreach ( $info['types'] as $type ) {
					$sizeMap[$type] = $this->get( $type )->getSize();
				}
			}
		}

		return $sizeMap;
	}

	/**
	 * @return array
	 */
	protected function getCoalescedQueues() {
		global $wgJobTypeConf;

		if ( $this->coalescedQueues === null ) {
			$this->coalescedQueues = [];
			foreach ( $wgJobTypeConf as $type => $conf ) {
				$queue = JobQueue::factory(
					[ 'wiki' => $this->domain, 'type' => 'null' ] + $conf );
				$loc = $queue->getCoalesceLocationInternal();
				if ( !isset( $this->coalescedQueues[$loc] ) ) {
					$this->coalescedQueues[$loc]['queue'] = $queue;
					$this->coalescedQueues[$loc]['types'] = [];
				}
				if ( $type === 'default' ) {
					$this->coalescedQueues[$loc]['types'] = array_merge(
						$this->coalescedQueues[$loc]['types'],
						array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
					);
				} else {
					$this->coalescedQueues[$loc]['types'][] = $type;
				}
			}
		}

		return $this->coalescedQueues;
	}

	/**
	 * @param string $name
	 * @return mixed
	 */
	private function getCachedConfigVar( $name ) {
		// @TODO: cleanup this whole method with a proper config system
		if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
			return $GLOBALS[$name]; // common case
		} else {
			$wiki = WikiMap::getWikiIdFromDomain( $this->domain );
			$cache = ObjectCache::getMainWANInstance();
			$value = $cache->getWithSetCallback(
				$cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
				$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
				function () use ( $wiki, $name ) {
					global $wgConf;
					// @TODO: use the full domain ID here
					return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
				},
				[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
			);
			return $value['v'];
		}
	}

	/**
	 * @param array $jobs
	 * @throws InvalidArgumentException
	 */
	private function assertValidJobs( array $jobs ) {
		foreach ( $jobs as $job ) { // sanity checks
			if ( !( $job instanceof IJobSpecification ) ) {
				throw new InvalidArgumentException( "Expected IJobSpecification objects" );
			}
		}
	}

	function __destruct() {
		$n = count( $this->bufferedJobs );
		if ( $n > 0 ) {
			$type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
			trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
		}
	}
}
$wgJobTypeConf
Map of job types to configuration arrays.
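A rough sketch of the expected shape of this setting in LocalSettings.php; the per-type override and its option values are purely illustrative, not recommended defaults:
$wgJobTypeConf = [
	// fallback entry used for any job type not listed explicitly
	'default' => [ 'class' => 'JobQueueDB', 'order' => 'random', 'claimTTL' => 3600 ],
	// illustrative per-type override: pop this job type in FIFO order
	'enotifNotify' => [ 'class' => 'JobQueueDB', 'order' => 'fifo', 'claimTTL' => 3600 ],
];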
$wgLocalDatabases
Other wikis on this site, which can be administered from a single developer account.
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e. that are not popped as part of the default queue types.
$wgConf
$wgConf holds the site configuration.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Class to handle enqueueing of background jobs.
getCachedConfigVar( $name)
push( $jobs)
Insert jobs into the respective queues to which they belong.
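A minimal usage sketch; the page title and the 'refreshLinks' job type are only illustrative examples of a registered job class:
$title = Title::makeTitle( NS_MAIN, 'Example_page' );
$job = new JobSpecification( 'refreshLinks', [], [], $title );
JobQueueGroup::singleton()->push( $job ); // or push( [ $jobA, $jobB ] ) for a batch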
string $domain
Wiki DB domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
ack(Job $job)
Acknowledge that a job was completed.
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
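A sketch of the intended call pattern, assuming Job::newRootJobParams() is used to build the de-duplication parameters; the job type and signature key are illustrative:
$title = Title::makeTitle( NS_TEMPLATE, 'Navbox' );
$params = Job::newRootJobParams( 'refreshlinks:' . $title->getPrefixedDBkey() ); // illustrative key
$job = Job::factory( 'refreshLinks', $title, $params );
$group = JobQueueGroup::singleton();
$group->push( $job );
$group->deduplicateRootJob( $job ); // mark this root job as the newest of its kind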
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)).
static singleton( $domain=false)
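Typical access pattern; the foreign wiki ID is purely illustrative:
$group = JobQueueGroup::singleton(); // group for the local wiki
$foreign = JobQueueGroup::singleton( 'frwiki' ); // group for another wiki listed in $wgLocalDatabases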
bool $invalidWiki
Whether the wiki is not recognized in configuration.
ProcessCacheLRU $cache
string|bool $readOnlyReason
Read only rationale (or false if r/w)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
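A sketch of deferred enqueueing during a web request; $job stands for any IJobSpecification, and pushLazyJobs() is normally invoked by MediaWiki itself at the end of the request rather than by callers:
JobQueueGroup::singleton()->lazyPush( $job ); // buffered; nothing is written yet
// ... later, during post-request shutdown, the buffer is flushed:
JobQueueGroup::pushLazyJobs();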
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
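A minimal runner-style sketch; retry and error handling are omitted:
$group = JobQueueGroup::singleton();
$job = $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE );
if ( $job ) {
	$job->run();
	$group->ack( $job ); // remove the claimed job from its queue
}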
static destroySingletons()
Destroy the singleton instances.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
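An illustrative inspection snippet combining this with getQueueSizes(); the log group name is arbitrary:
$group = JobQueueGroup::singleton();
if ( $group->queuesHaveJobs( JobQueueGroup::TYPE_DEFAULT ) ) {
	foreach ( $group->getQueueSizes() as $type => $size ) {
		wfDebugLog( 'jobqueue', "$type: $size job(s)" ); // illustrative logging
	}
}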
__construct( $domain, $readOnlyReason)
static JobQueueGroup[] $instances
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
assertValidJobs(array $jobs)
getQueueTypes()
Get the list of queue types.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:108
Class to both describe a background job and handle jobs.
Definition Job.php:31
getType()
Definition Job.php:146
Class for process caching individual properties of expiring items.
get( $key, $prop)
Get a property field for a cache entry.
set( $key, $prop, $value)
Set a property field for a cache entry.
static getCurrentWikiDbDomain()
Definition WikiMap.php:289
static getWikiIdFromDomain( $domain)
Get the wiki ID of a database domain.
Definition WikiMap.php:252
static isCurrentWikiDbDomain( $domain)
Definition WikiMap.php:267
IJobSpecification
Job queue task description interface.