MediaWiki  1.33.0
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
23 
32  protected static $instances = [];
33 
35  protected $cache;
36 
38  protected $domain;
40  protected $readOnlyReason;
42  protected $invalidDomain = false;
43 
45  protected $coalescedQueues;
46 
47  const TYPE_DEFAULT = 1; // integer; jobs popped by default
48  const TYPE_ANY = 2; // integer; any job
49 
50  const USE_CACHE = 1; // integer; use process or persistent cache
51 
52  const PROC_CACHE_TTL = 15; // integer; seconds
53 
54  const CACHE_VERSION = 1; // integer; cache version
55 
60  protected function __construct( $domain, $readOnlyReason ) {
61  $this->domain = $domain;
62  $this->readOnlyReason = $readOnlyReason;
63  $this->cache = new MapCacheLRU( 10 );
64  }
65 
70  public static function singleton( $domain = false ) {
71  global $wgLocalDatabases;
72 
73  if ( $domain === false ) {
75  }
76 
77  if ( !isset( self::$instances[$domain] ) ) {
78  self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
79  // Make sure jobs are not getting pushed to bogus wikis. This can confuse
80  // the job runner system into spawning endless RPC requests that fail (T171371).
82  if (
84  !in_array( $wikiId, $wgLocalDatabases )
85  ) {
86  self::$instances[$domain]->invalidDomain = true;
87  }
88  }
89 
90  return self::$instances[$domain];
91  }
92 
98  public static function destroySingletons() {
99  self::$instances = [];
100  }
101 
108  public function get( $type ) {
109  global $wgJobTypeConf;
110 
111  $conf = [ 'domain' => $this->domain, 'type' => $type ];
112  if ( isset( $wgJobTypeConf[$type] ) ) {
113  $conf = $conf + $wgJobTypeConf[$type];
114  } else {
115  $conf = $conf + $wgJobTypeConf['default'];
116  }
117  if ( !isset( $conf['readOnlyReason'] ) ) {
118  $conf['readOnlyReason'] = $this->readOnlyReason;
119  }
120 
121  $services = MediaWikiServices::getInstance();
122  $conf['stats'] = $services->getStatsdDataFactory();
123  $conf['wanCache'] = $services->getMainWANObjectCache();
124  $conf['stash'] = $services->getMainObjectStash();
125 
126  return JobQueue::factory( $conf );
127  }
128 
139  public function push( $jobs ) {
141 
142  if ( $this->invalidDomain ) {
143  // Do not enqueue job that cannot be run (T171371)
144  $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
146  return;
147  }
148 
149  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
150  if ( $jobs === [] ) {
151  return;
152  }
153 
154  $this->assertValidJobs( $jobs );
155 
156  $jobsByType = []; // (job type => list of jobs)
157  foreach ( $jobs as $job ) {
158  $jobsByType[$job->getType()][] = $job;
159  }
160 
161  foreach ( $jobsByType as $type => $jobs ) {
162  $this->get( $type )->push( $jobs );
163  }
164 
165  if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
166  $list = $this->cache->getField( 'queues-ready', 'list' );
167  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
168  $this->cache->clear( 'queues-ready' );
169  }
170  }
171 
173  $cache->set(
174  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
175  'true',
176  15
177  );
178  if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
179  $cache->set(
180  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
181  'true',
182  15
183  );
184  }
185  }
186 
198  public function lazyPush( $jobs ) {
199  if ( $this->invalidDomain ) {
200  // Do not enqueue job that cannot be run (T171371)
201  throw new LogicException( "Domain '{$this->domain}' is not recognized." );
202  }
203 
204  if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
205  $this->push( $jobs );
206  return;
207  }
208 
209  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
210 
211  // Throw errors now instead of on push(), when other jobs may be buffered
212  $this->assertValidJobs( $jobs );
213 
214  DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
215  }
216 
224  public static function pushLazyJobs() {
225  wfDeprecated( __METHOD__, '1.33' );
226  }
227 
239  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
240  global $wgJobClasses;
241 
242  $job = false;
243 
244  if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
245  throw new JobQueueError(
246  "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
247  } elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) {
248  // Do not pop jobs if there is no class for the queue type
249  throw new JobQueueError( "Unrecognized job type '$qtype'." );
250  }
251 
252  if ( is_string( $qtype ) ) { // specific job type
253  if ( !in_array( $qtype, $blacklist ) ) {
254  $job = $this->get( $qtype )->pop();
255  }
256  } else { // any job in the "default" jobs types
257  if ( $flags & self::USE_CACHE ) {
258  if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
259  $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
260  }
261  $types = $this->cache->getField( 'queues-ready', 'list' );
262  } else {
263  $types = $this->getQueuesWithJobs();
264  }
265 
266  if ( $qtype == self::TYPE_DEFAULT ) {
267  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
268  }
269 
270  $types = array_diff( $types, $blacklist ); // avoid selected types
271  shuffle( $types ); // avoid starvation
272 
273  foreach ( $types as $type ) { // for each queue...
274  $job = $this->get( $type )->pop();
275  if ( $job ) { // found
276  break;
277  } else { // not found
278  $this->cache->clear( 'queues-ready' );
279  }
280  }
281  }
282 
283  return $job;
284  }
285 
292  public function ack( Job $job ) {
293  $this->get( $job->getType() )->ack( $job );
294  }
295 
303  public function deduplicateRootJob( Job $job ) {
304  return $this->get( $job->getType() )->deduplicateRootJob( $job );
305  }
306 
314  public function waitForBackups() {
315  global $wgJobTypeConf;
316 
317  // Try to avoid doing this more than once per queue storage medium
318  foreach ( $wgJobTypeConf as $type => $conf ) {
319  $this->get( $type )->waitForBackups();
320  }
321  }
322 
328  public function getQueueTypes() {
329  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
330  }
331 
337  public function getDefaultQueueTypes() {
339 
340  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
341  }
342 
350  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
352  $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
353 
354  $value = $cache->get( $key );
355  if ( $value === false ) {
356  $queues = $this->getQueuesWithJobs();
357  if ( $type == self::TYPE_DEFAULT ) {
358  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
359  }
360  $value = count( $queues ) ? 'true' : 'false';
361  $cache->add( $key, $value, 15 );
362  }
363 
364  return ( $value === 'true' );
365  }
366 
372  public function getQueuesWithJobs() {
373  $types = [];
374  foreach ( $this->getCoalescedQueues() as $info ) {
376  $queue = $info['queue'];
377  $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
378  if ( is_array( $nonEmpty ) ) { // batching features supported
379  $types = array_merge( $types, $nonEmpty );
380  } else { // we have to go through the queues in the bucket one-by-one
381  foreach ( $info['types'] as $type ) {
382  if ( !$this->get( $type )->isEmpty() ) {
383  $types[] = $type;
384  }
385  }
386  }
387  }
388 
389  return $types;
390  }
391 
397  public function getQueueSizes() {
398  $sizeMap = [];
399  foreach ( $this->getCoalescedQueues() as $info ) {
401  $queue = $info['queue'];
402  $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
403  if ( is_array( $sizes ) ) { // batching features supported
404  $sizeMap = $sizeMap + $sizes;
405  } else { // we have to go through the queues in the bucket one-by-one
406  foreach ( $info['types'] as $type ) {
407  $sizeMap[$type] = $this->get( $type )->getSize();
408  }
409  }
410  }
411 
412  return $sizeMap;
413  }
414 
418  protected function getCoalescedQueues() {
419  global $wgJobTypeConf;
420 
421  if ( $this->coalescedQueues === null ) {
422  $this->coalescedQueues = [];
423  foreach ( $wgJobTypeConf as $type => $conf ) {
425  [ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
426  $loc = $queue->getCoalesceLocationInternal();
427  if ( !isset( $this->coalescedQueues[$loc] ) ) {
428  $this->coalescedQueues[$loc]['queue'] = $queue;
429  $this->coalescedQueues[$loc]['types'] = [];
430  }
431  if ( $type === 'default' ) {
432  $this->coalescedQueues[$loc]['types'] = array_merge(
433  $this->coalescedQueues[$loc]['types'],
434  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
435  );
436  } else {
437  $this->coalescedQueues[$loc]['types'][] = $type;
438  }
439  }
440  }
441 
442  return $this->coalescedQueues;
443  }
444 
449  private function getCachedConfigVar( $name ) {
450  // @TODO: cleanup this whole method with a proper config system
451  if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
452  return $GLOBALS[$name]; // common case
453  } else {
454  $wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
455  $cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
456  $value = $cache->getWithSetCallback(
457  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
458  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
459  function () use ( $wiki, $name ) {
460  global $wgConf;
461  // @TODO: use the full domain ID here
462  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
463  },
464  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
465  );
466 
467  return $value['v'];
468  }
469  }
470 
475  private function assertValidJobs( array $jobs ) {
476  foreach ( $jobs as $job ) { // sanity checks
477  if ( !( $job instanceof IJobSpecification ) ) {
478  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
479  }
480  }
481  }
482 }
$wgConf
$wgConf
wgConf hold the site configuration.
Definition: DefaultSettings.php:59
WikiMap\isCurrentWikiDbDomain
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:303
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:239
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:50
Job\getType
getType()
Definition: Job.php:143
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:314
$wgLocalDatabases
string[] $wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
Definition: DefaultSettings.php:2159
WikiMap\getCurrentWikiDbDomain
static getCurrentWikiDbDomain()
Definition: WikiMap.php:292
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:356
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:52
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:54
$wgJobTypeConf
$wgJobTypeConf
Map of job types to configuration arrays.
Definition: DefaultSettings.php:7574
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Definition: JobQueueGroup.php:45
JobQueueEnqueueUpdate
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Definition: JobQueueEnqueueUpdate.php:31
captcha-old.count
count
Definition: captcha-old.py:249
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:47
wfConfiguredReadOnlyReason
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Definition: GlobalFunctions.php:1221
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:418
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
Definition: DeferredUpdates.php:79
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:32
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:350
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:98
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
$wgJobClasses
$wgJobClasses['replaceText']
Definition: ReplaceText.php:52
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:30
ProcessCacheLRU\set
set( $key, $prop, $value)
Set a property field for a cache entry.
Definition: ProcessCacheLRU.php:54
wfDeprecated
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
Definition: GlobalFunctions.php:1078
JobQueueGroup\$domain
string $domain
Wiki domain ID.
Definition: JobQueueGroup.php:38
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:139
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:259
JobQueueGroup\$cache
ProcessCacheLRU $cache
Definition: JobQueueGroup.php:35
$queue
$queue
Definition: mergeMessageFileList.php:159
MapCacheLRU
Handles a simple LRU key/value map with a maximum number of entries.
Definition: MapCacheLRU.php:37
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:449
JobQueueError
Definition: JobQueueError.php:28
array
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
JobQueueGroup\pushLazyJobs
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
Definition: JobQueueGroup.php:224
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:271
$wgJobTypesExcludedFromDefaultQueue
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Definition: DefaultSettings.php:7540
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:337
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2162
$value
$value
Definition: styleTest.css.php:49
JobQueueGroup\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueueGroup.php:40
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:106
ProcessCacheLRU\get
get( $key, $prop)
Get a property field for a cache entry.
Definition: ProcessCacheLRU.php:79
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:303
JobQueueGroup\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:292
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:48
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:475
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:49
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:328
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:372
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
$services
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title e g db for database replication lag or jobqueue for job queue size converted to pseudo seconds It is possible to add more fields and they will be returned to the user in the API response after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition: hooks.txt:2220
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
ProcessCacheLRU
Class for process caching individual properties of expiring items.
Definition: ProcessCacheLRU.php:32
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queus for a list of job types.
Definition: JobQueueGroup.php:397
IJobSpecification
Job queue task description interface.
Definition: IJobSpecification.php:29
$GLOBALS
$GLOBALS['IP']
Definition: ComposerHookHandler.php:6
IExpiringStore\TTL_PROC_LONG
const TTL_PROC_LONG
Definition: IExpiringStore.php:43
JobQueueGroup\$invalidDomain
bool $invalidDomain
Whether the wiki is not recognized in configuration.
Definition: JobQueueGroup.php:42
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:198
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:30
MWExceptionHandler\logException
static logException( $e, $catcher=self::CAUGHT_BY_OTHER)
Log an exception to the exception log (if enabled).
Definition: MWExceptionHandler.php:667
JobQueueGroup\__construct
__construct( $domain, $readOnlyReason)
Definition: JobQueueGroup.php:60
$type
$type
Definition: testCompression.php:48