MediaWiki  REL1_31
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
/** @var JobQueueGroup[] Instances created by singleton(), keyed by wiki DB domain ID */
protected static $instances = [];

/** @var ProcessCacheLRU */
protected $cache;

/** @var string Wiki DB domain ID */
protected $domain;
/** @var string|bool Read only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidWiki = false;

/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
protected $coalescedQueues;

/** @var IJobSpecification[] Jobs buffered by lazyPush() until pushLazyJobs() inserts them */
protected $bufferedJobs = [];

const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job

const USE_CACHE = 1; // integer; use process or persistent cache

const PROC_CACHE_TTL = 15; // integer; seconds

const CACHE_VERSION = 1; // integer; cache version
57 
/**
 * Instances are created through singleton(); the constructor is not public.
 *
 * @param string $domain Wiki DB domain ID
 * @param string|bool $readOnlyReason Read-only reason, or false if read/write
 */
protected function __construct( $domain, $readOnlyReason ) {
	// Per-instance process cache; used for the 'queues-ready' list
	$this->cache = new ProcessCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->domain = $domain;
}
67 
/**
 * Get the job queue group for a wiki, creating and caching it on first use.
 *
 * @param bool|string $domain Wiki DB domain ID, or false for the current wiki
 * @return JobQueueGroup
 */
public static function singleton( $domain = false ) {
	global $wgLocalDatabases;

	if ( $domain === false ) {
		// Default to the current wiki; instances are keyed by the domain ID string
		$domain = WikiMap::getCurrentWikiDbDomain()->getId();
	}

	if ( !isset( self::$instances[$domain] ) ) {
		self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
		// Make sure jobs are not getting pushed to bogus wikis. This can confuse
		// the job runner system into spawning endless RPC requests that fail (T171371).
		$wikiId = WikiMap::getWikiIdFromDomain( $domain );
		if (
			!WikiMap::isCurrentWikiDbDomain( $domain ) &&
			!in_array( $wikiId, $wgLocalDatabases )
		) {
			self::$instances[$domain]->invalidWiki = true;
		}
	}

	return self::$instances[$domain];
}
94 
/**
 * Destroy the singleton instances.
 *
 * Drops every cached group, so the next singleton() call builds a fresh one.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
103 
/**
 * Get the job queue object for a given queue type.
 *
 * The queue configuration comes from $wgJobTypeConf[$type], falling back to
 * $wgJobTypeConf['default'] when the type has no entry of its own.
 *
 * @param string $type Job type
 * @return JobQueue
 */
public function get( $type ) {
	global $wgJobTypeConf;

	// Values set here take precedence: "+" keeps the left-hand keys
	$conf = [ 'wiki' => $this->domain, 'type' => $type ];
	if ( isset( $wgJobTypeConf[$type] ) ) {
		$conf = $conf + $wgJobTypeConf[$type];
	} else {
		$conf = $conf + $wgJobTypeConf['default'];
	}
	$conf['aggregator'] = JobQueueAggregator::singleton();
	if ( $this->readOnlyReason !== false ) {
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	return JobQueue::factory( $conf );
}
126 
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * If the wiki domain is not recognized, the problem is logged and nothing is
 * enqueued; no exception is thrown in that case.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	global $wgJobTypesExcludedFromDefaultQueue;

	if ( $this->invalidWiki ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( !count( $jobs ) ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	// Separate loop variable so the $jobs parameter is not overwritten
	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	// Drop the process-cached "ready" list if it lacks any type that was just pushed
	if ( $this->cache->has( 'queues-ready', 'list' ) ) {
		$list = $this->cache->get( 'queues-ready', 'list' );
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Mark the "has jobs" flags that queuesHaveJobs() reads from the same keys
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
		// At least one pushed type belongs to the default queue
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
184 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * Buffered jobs are inserted later by pushLazyJobs().
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws LogicException If the wiki domain is not recognized
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidWiki ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->domain}' is not recognized." );
	}

	// Under these SAPIs the jobs are pushed immediately instead of being buffered
	if ( in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true ) ) {
		$this->push( $jobs );
		return;
	}

	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobs );

	$this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
}
214 
/**
 * Push all jobs buffered via lazyPush() into their respective queues.
 *
 * A failure in one group is logged and does not stop the remaining groups.
 * The failing group's buffer is left intact, because it is only cleared
 * after a successful push().
 *
 * @return void
 */
public static function pushLazyJobs() {
	foreach ( self::$instances as $group ) {
		try {
			$group->push( $group->bufferedJobs );
			$group->bufferedJobs = [];
		} catch ( Exception $e ) {
			// Get in as many jobs as possible and let other post-send updates happen
			MWExceptionHandler::logException( $e );
		}
	}
}
232 
/**
 * Pop a job off one of the job queues.
 *
 * @param int|string $qtype JobQueueGroup::TYPE_* constant or a specific job type string
 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
 * @param array $blacklist List of job types to ignore
 * @return Job|bool The popped job, or a false-like value if none was found
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	if ( is_string( $qtype ) ) {
		// A specific job type was requested
		if ( in_array( $qtype, $blacklist ) ) {
			return false;
		}

		return $this->get( $qtype )->pop();
	}

	// Otherwise look through every candidate queue type
	if ( $flags & self::USE_CACHE ) {
		$isFresh = $this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL );
		if ( !$isFresh ) {
			$this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->get( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		// Restrict to the "default" job types
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	// Avoid the selected types, then randomize the order to avoid starvation
	$candidates = array_diff( $candidates, $blacklist );
	shuffle( $candidates );

	$job = false;
	foreach ( $candidates as $candidate ) {
		$job = $this->get( $candidate )->pop();
		if ( $job ) {
			return $job;
		}
		// This queue was listed as ready but yielded nothing: the list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
280 
/**
 * Acknowledge that a job was completed.
 *
 * The call is forwarded to the queue that handles the job's type.
 *
 * @param Job $job
 * @return void
 */
public function ack( Job $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
290 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param Job $job
 * @return mixed Whatever the queue's deduplicateRootJob() returns
 *  (presumably a success flag; confirm against JobQueue)
 */
public function deduplicateRootJob( Job $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
301 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium:
	// only the types that have a $wgJobTypeConf entry are visited
	foreach ( array_keys( $wgJobTypeConf ) as $type ) {
		$this->get( $type )->waitForBackups();
	}
}
317 
/**
 * Get the list of queue types.
 *
 * @return array List of job type names: the keys of the wiki's 'wgJobClasses' setting
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
326 
/**
 * Get the list of default queue types.
 *
 * These are all queue types except those in $wgJobTypesExcludedFromDefaultQueue.
 *
 * @return array List of job type names; array_diff() preserves the original keys
 */
public function getDefaultQueueTypes() {
	global $wgJobTypesExcludedFromDefaultQueue;

	return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
}
337 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

	// The flag is stored as the string 'true' or 'false', so a boolean false
	// from get() is treated as "no cached value"
	$value = $cache->get( $key );
	if ( $value === false ) {
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		$value = count( $queues ) ? 'true' : 'false';
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
361 
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job type names
 */
public function getQueuesWithJobs() {
	$nonEmptyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$found = $bucket['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
		if ( is_array( $found ) ) {
			// Batching features supported: one call covered the whole bucket
			$nonEmptyTypes = array_merge( $nonEmptyTypes, $found );
			continue;
		}

		// Otherwise go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			if ( $this->get( $type )->isEmpty() ) {
				continue;
			}
			$nonEmptyTypes[] = $type;
		}
	}

	return $nonEmptyTypes;
}
384 
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$batchSizes = $bucket['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
		if ( is_array( $batchSizes ) ) {
			// Batching features supported; entries already collected win on key clashes
			$sizesByType += $batchSizes;
			continue;
		}

		// Otherwise go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizesByType[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizesByType;
}
405 
/**
 * Group the configured queue types into buckets, keyed by the value of the
 * queue's getCoalesceLocationInternal(). The result is computed once per
 * instance and then reused.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Placeholder queue (type 'null') built from this entry's config;
			// it is used for its location and for bucket-wide calls
			$queue = JobQueue::factory(
				[ 'wiki' => $this->domain, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// The 'default' entry stands for every type without its own entry
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
435 
/**
 * Read a configuration variable for this group's wiki.
 *
 * For the current wiki the value comes straight from $GLOBALS. For any other
 * wiki it is looked up via $wgConf and cached.
 *
 * @param string $name Variable name without the "$" (e.g. 'wgJobClasses')
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		return $GLOBALS[$name]; // common case
	} else {
		$wiki = WikiMap::getWikiIdFromDomain( $this->domain );
		$cache = ObjectCache::getMainWANInstance();
		$value = $cache->getWithSetCallback(
			$cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
			// Randomized TTL between one and two days
			$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
			function () use ( $wiki, $name ) {
				global $wgConf;
				// @TODO: use the full domain ID here
				// The value is wrapped in an array and unwrapped via ['v'] below
				return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
			},
			[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
		);
		return $value['v'];
	}
}
460 
/**
 * Sanity check: make sure every item is a job specification.
 *
 * @param array $jobs Items to check
 * @throws InvalidArgumentException If an item is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) {
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}

		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
472 
/**
 * Raise a PHP notice if jobs buffered via lazyPush() were never inserted.
 */
function __destruct() {
	$n = count( $this->bufferedJobs );
	if ( $n <= 0 ) {
		return;
	}

	// Report the distinct classes of the jobs left in the buffer
	$classes = array_map( 'get_class', $this->bufferedJobs );
	$type = implode( ', ', array_unique( $classes ) );
	trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
}
480 }
$wgConf
$wgConf
$wgConf holds the site configuration.
Definition: DefaultSettings.php:58
WikiMap\isCurrentWikiDbDomain
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:267
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:244
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:52
Job\getType
getType()
Definition: Job.php:146
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:309
WikiMap\getCurrentWikiDbDomain
static getCurrentWikiDbDomain()
Definition: WikiMap.php:289
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:367
use
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
Definition: APACHE-LICENSE-2.0.txt:10
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:54
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:56
$wgJobTypeConf
$wgJobTypeConf
Map of job types to configuration arrays.
Definition: DefaultSettings.php:7545
array
the array() calling protocol came about after MediaWiki 1.4rc1.
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
Definition: JobQueueGroup.php:44
JobQueueGroup\__destruct
__destruct()
Definition: JobQueueGroup.php:473
JobQueueGroup\$invalidWiki
bool $invalidWiki
Whether the wiki is not recognized in configuration.
Definition: JobQueueGroup.php:41
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:49
wfConfiguredReadOnlyReason
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Definition: GlobalFunctions.php:1286
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:409
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:31
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:345
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:100
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:55
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:37
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
ProcessCacheLRU\set
set( $key, $prop, $value)
Set a property field for a cache entry.
Definition: ProcessCacheLRU.php:59
JobQueueGroup\$domain
string $domain
Wiki DB domain ID.
Definition: JobQueueGroup.php:37
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:137
JobQueueGroup\$cache
ProcessCacheLRU $cache
Definition: JobQueueGroup.php:34
$queue
$queue
Definition: mergeMessageFileList.php:160
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:440
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:95
JobQueueGroup\pushLazyJobs
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
Definition: JobQueueGroup.php:221
WikiMap\getWikiIdFromDomain
static getWikiIdFromDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:252
$wgLocalDatabases
$wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
Definition: DefaultSettings.php:2094
$wgJobTypesExcludedFromDefaultQueue
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Definition: DefaultSettings.php:7511
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:332
JobQueueGroup\$bufferedJobs
Job[] $bufferedJobs
Definition: JobQueueGroup.php:47
$value
$value
Definition: styleTest.css.php:45
JobQueueGroup\$readOnlyReason
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueueGroup.php:39
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
ProcessCacheLRU\get
get( $key, $prop)
Get a property field for a cache entry.
Definition: ProcessCacheLRU.php:99
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:298
JobQueueGroup\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:287
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:302
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:72
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:50
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:380
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:465
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:323
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:367
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:22
ProcessCacheLRU
Class for process caching individual properties of expiring items.
Definition: ProcessCacheLRU.php:32
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queues for a list of job types.
Definition: JobQueueGroup.php:390
JobQueueAggregator\singleton
static singleton()
Definition: JobQueueAggregator.php:43
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:29
$GLOBALS
$GLOBALS['IP']
Definition: ComposerHookHandler.php:6
IExpiringStore\TTL_PROC_LONG
const TTL_PROC_LONG
Definition: IExpiringStore.php:43
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:196
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2171
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:29
MWExceptionHandler\logException
static logException( $e, $catcher=self::CAUGHT_BY_OTHER)
Log an exception to the exception log (if enabled).
Definition: MWExceptionHandler.php:645
JobQueueGroup\__construct
__construct( $domain, $readOnlyReason)
Definition: JobQueueGroup.php:62
$type
$type
Definition: testCompression.php:48