MediaWiki  1.32.0
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
/** @var JobQueueGroup[] Instances created by singleton(), keyed by wiki ID */
protected static $instances = [];

/** @var MapCacheLRU Small per-instance process cache (see the constructor) */
protected $cache;

/** @var string Wiki ID */
protected $wiki;

/** @var string|bool Read-only rationale (or false if read/write) */
protected $readOnlyReason;

/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidWiki = false;

/** @var array|null Map of (bucket => (queue => JobQueue, types => list of types)); built lazily */
protected $coalescedQueues;

/** Queue selector: only the job types that are popped by default */
const TYPE_DEFAULT = 1;

/** Queue selector: any job type */
const TYPE_ANY = 2;

/** Flag for pop(): use the process cache of non-empty queues */
const USE_CACHE = 1;

/** Lifetime, in seconds, of the process-cached list of non-empty queues */
const PROC_CACHE_TTL = 15;

/** Cache version number */
const CACHE_VERSION = 1;

54 
/**
 * Instances are obtained through singleton(); the constructor is not public.
 *
 * @param string $wiki Wiki ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if read/write)
 */
protected function __construct( $wiki, $readOnlyReason ) {
	// Process cache holding at most 10 entries
	$this->cache = new MapCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->wiki = $wiki;
}
64 
/**
 * Get the (per-wiki) shared JobQueueGroup instance, creating it on first use.
 *
 * @param bool|string $wiki Wiki ID, or false to use the current wiki (wfWikiID())
 * @return JobQueueGroup
 */
public static function singleton( $wiki = false ) {
	global $wgLocalDatabases;

	if ( $wiki === false ) {
		$wiki = wfWikiID();
	}

	// Reuse the instance already registered for this wiki, if any
	if ( isset( self::$instances[$wiki] ) ) {
		return self::$instances[$wiki];
	}

	$group = new self( $wiki, wfConfiguredReadOnlyReason() );
	self::$instances[$wiki] = $group;

	// Make sure jobs are not getting pushed to bogus wikis. This can confuse
	// the job runner system into spawning endless RPC requests that fail (T171371).
	$isKnownWiki = ( $wiki === wfWikiID() ) || in_array( $wiki, $wgLocalDatabases );
	if ( !$isKnownWiki ) {
		$group->invalidWiki = true;
	}

	return $group;
}
85 
/**
 * Destroy the singleton instances.
 *
 * Subsequent singleton() calls will construct fresh objects.
 *
 * @return void
 */
public static function destroySingletons() {
	// Drop every registered instance at once
	self::$instances = [];
}
94 
/**
 * Get the job queue object for a given queue type.
 *
 * @param string $type Job type
 * @return JobQueue Queue built by JobQueue::factory() from the merged configuration
 */
public function get( $type ) {
	global $wgJobTypeConf;

	// Use the type-specific settings when configured, else the 'default' entry
	$typeConf = isset( $wgJobTypeConf[$type] )
		? $wgJobTypeConf[$type]
		: $wgJobTypeConf['default'];

	// Array union: the 'wiki' and 'type' given here win over configured values
	$conf = [ 'wiki' => $this->wiki, 'type' => $type ] + $typeConf;
	$conf['aggregator'] = JobQueueAggregator::singleton();

	// Only fall back to the group-level reason if the config did not set one
	if ( !isset( $conf['readOnlyReason'] ) ) {
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	return JobQueue::factory( $conf );
}
117 
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * Jobs are grouped by type and each group is pushed to the queue returned by
 * get() for that type. Afterwards the process cache of non-empty queues and
 * the shared "hasjobs" cache keys are refreshed.
 *
 * If this group was created for an unrecognized wiki, nothing is enqueued;
 * the problem is logged and the method returns.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	global $wgJobTypesExcludedFromDefaultQueue;

	if ( $this->invalidWiki ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->wiki}' is not recognized." );
		// Log rather than throw, so the caller is not interrupted
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( !count( $jobs ) ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	// Separate loop variable so the $jobs parameter is not overwritten
	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	// Invalidate the process-cached list if it lacks any type just pushed to
	if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
		$list = $this->cache->getField( 'queues-ready', 'list' );
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Same cache and key scheme that queuesHaveJobs() reads from
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	// Flag the default queues only if at least one pushed type is not excluded
	if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
175 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * Outside of the 'cli' and 'phpdbg' SAPIs the jobs are validated immediately
 * and handed to a JobQueueEnqueueUpdate deferred update.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws LogicException If this group was created for an unrecognized wiki
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidWiki ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->wiki}' is not recognized." );
	}

	// Command-line SAPIs push right away instead of deferring
	if ( in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true ) ) {
		$this->push( $jobs );
		return;
	}

	$jobList = is_array( $jobs ) ? $jobs : [ $jobs ];

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobList );

	$update = new JobQueueEnqueueUpdate( $this->wiki, $jobList );
	DeferredUpdates::addUpdate( $update );
}
205 
/**
 * Formerly pushed all jobs buffered via lazyPush() into their respective queues.
 *
 * The body now only reports the deprecation; no jobs are pushed from here.
 *
 * @return void
 */
public static function pushLazyJobs() {
	// Nothing left to do besides flagging the call as deprecated
	wfDeprecated( __METHOD__, '1.33' );
}
216 
/**
 * Pop a job off one of the job queues.
 *
 * @param int|string $qtype A job type name, or one of the TYPE_* constants
 * @param int $flags Bitfield of flags; only self::USE_CACHE is checked here
 * @param array $blacklist List of job types to skip
 * @return Job|bool The value returned by the queue's pop(), or false if no job was found
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	if ( is_string( $qtype ) ) {
		// A specific job type was requested; it yields nothing if blacklisted
		return in_array( $qtype, $blacklist ) ? false : $this->get( $qtype )->pop();
	}

	// Otherwise look at the queues that currently report having jobs
	if ( $flags & self::USE_CACHE ) {
		if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
			$this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->getField( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		// Restrict to the types that are part of the "default" set
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	$candidates = array_diff( $candidates, $blacklist ); // avoid selected types
	shuffle( $candidates ); // avoid starvation

	$job = false;
	foreach ( $candidates as $candidate ) {
		$job = $this->get( $candidate )->pop();
		if ( $job ) {
			break; // found
		}
		// The queue turned out to be empty, so the cached list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
264 
/**
 * Acknowledge that a job was completed.
 *
 * The call is forwarded to the queue that handles the job's type.
 *
 * @param Job $job
 * @return void
 */
public function ack( Job $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
274 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param Job $job
 * @return mixed Whatever the underlying queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( Job $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
285 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * Calls waitForBackups() on the queue of every type listed in $wgJobTypeConf.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium
	foreach ( array_keys( $wgJobTypeConf ) as $type ) {
		$this->get( $type )->waitForBackups();
	}
}
301 
/**
 * Get the list of queue types.
 *
 * @return array Keys of this wiki's $wgJobClasses setting
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
310 
/**
 * Get the list of default queue types.
 *
 * These are all queue types except those listed in
 * $wgJobTypesExcludedFromDefaultQueue.
 *
 * @return array Job types (keys are preserved from getQueueTypes(), as with array_diff())
 */
public function getDefaultQueueTypes() {
	// Must be imported explicitly; otherwise array_diff() receives an undefined variable
	global $wgJobTypesExcludedFromDefaultQueue;

	return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
}
321 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false' under the same global
 * key that push() sets, with a 15 second TTL.
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	// Same cache that push() writes the 'hasjobs' keys to
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );

	$value = $cache->get( $key );
	if ( $value === false ) {
		// Cache miss: compute the answer from the queues themselves
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		// Stored as a string so a cached negative differs from a miss (false)
		$value = count( $queues ) ? 'true' : 'false';
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
345 
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job types that have non-empty queues
 */
public function getQueuesWithJobs() {
	$nonEmptyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$ready = $bucket['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
		if ( is_array( $ready ) ) {
			// Batching features supported: one call covers the whole bucket
			$nonEmptyTypes = array_merge( $nonEmptyTypes, $ready );
			continue;
		}

		// Otherwise go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			if ( $this->get( $type )->isEmpty() ) {
				continue;
			}
			$nonEmptyTypes[] = $type;
		}
	}

	return $nonEmptyTypes;
}
368 
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$batch = $bucket['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
		if ( is_array( $batch ) ) {
			// Batching features supported; array union keeps sizes already recorded
			$sizesByType += $batch;
			continue;
		}

		// Otherwise go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizesByType[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizesByType;
}
389 
/**
 * Group the configured job types by the coalesce location of their queue.
 *
 * The result is computed once per instance and then reused.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Placeholder queue, used here to find the bucket and kept as the bucket's 'queue'
			$queue = JobQueue::factory(
				[ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// 'default' stands for every job type without its own configuration
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
419 
/**
 * Get the value of a configuration variable for this group's wiki.
 *
 * For the current wiki the value is read straight from $GLOBALS. For any
 * other wiki it is looked up through $wgConf and cached.
 *
 * @param string $name Variable name without the leading '$' (e.g. 'wgJobClasses')
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( $this->wiki === wfWikiID() ) {
		return $GLOBALS[$name]; // common case
	} else {
		$wiki = $this->wiki;
		$cache = ObjectCache::getMainWANInstance();
		$value = $cache->getWithSetCallback(
			$cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
			// Between one and two days, randomized per call
			$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
			function () use ( $wiki, $name ) {
				global $wgConf;

				// The value is wrapped under 'v' and unwrapped again on return
				return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
			},
			[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
		);

		return $value['v'];
	}
}
445 
/**
 * Verify that every item in the list is a job specification.
 *
 * @param array $jobs Items to check
 * @throws InvalidArgumentException On the first item that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) { // sanity checks
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}
		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
457 }
$wgConf
$wgConf
$wgConf holds the site configuration.
Definition: DefaultSettings.php:58
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:228
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:49
JobQueueGroup\__construct
__construct( $wiki, $readOnlyReason)
Definition: JobQueueGroup.php:59
Job\getType
getType()
Definition: Job.php:128
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:293
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:187
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:365
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:51
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:53
$wgJobTypeConf
$wgJobTypeConf
Map of job types to configuration arrays.
Definition: DefaultSettings.php:7600
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Definition: JobQueueGroup.php:44
JobQueueEnqueueUpdate
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Definition: JobQueueEnqueueUpdate.php:31
captcha-old.count
count
Definition: captcha-old.py:249
JobQueueGroup\$invalidWiki
bool $invalidWiki
Whether the wiki is not recognized in configuration.
Definition: JobQueueGroup.php:41
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:46
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
wfConfiguredReadOnlyReason
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Definition: GlobalFunctions.php:1261
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:393
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
Definition: DeferredUpdates.php:79
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:31
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:329
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:91
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueGroup\$wiki
string $wiki
Wiki ID.
Definition: JobQueueGroup.php:37
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:30
ProcessCacheLRU\set
set( $key, $prop, $value)
Set a property field for a cache entry.
Definition: ProcessCacheLRU.php:54
wfDeprecated
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
Definition: GlobalFunctions.php:1118
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:128
JobQueueGroup\$cache
ProcessCacheLRU $cache
Definition: JobQueueGroup.php:34
$queue
$queue
Definition: mergeMessageFileList.php:160
MapCacheLRU
Handles a simple LRU key/value map with a maximum number of entries.
Definition: MapCacheLRU.php:37
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:424
array
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
JobQueueGroup\pushLazyJobs
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
Definition: JobQueueGroup.php:213
$wgLocalDatabases
$wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
Definition: DefaultSettings.php:2159
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:302
$wgJobTypesExcludedFromDefaultQueue
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Definition: DefaultSettings.php:7566
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:316
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:2644
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2213
$value
$value
Definition: styleTest.css.php:49
JobQueueGroup\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueueGroup.php:39
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:104
ProcessCacheLRU\get
get( $key, $prop)
Get a property field for a cache entry.
Definition: ProcessCacheLRU.php:79
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:282
JobQueueGroup\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:271
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:47
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:378
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:450
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:48
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:307
JobQueueGroup\singleton
static singleton( $wiki=false)
Definition: JobQueueGroup.php:69
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:351
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
ProcessCacheLRU
Class for process caching individual properties of expiring items.
Definition: ProcessCacheLRU.php:32
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queues for a list of job types.
Definition: JobQueueGroup.php:374
JobQueueAggregator\singleton
static singleton()
Definition: JobQueueAggregator.php:43
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:29
$GLOBALS
$GLOBALS['IP']
Definition: ComposerHookHandler.php:6
IExpiringStore\TTL_PROC_LONG
const TTL_PROC_LONG
Definition: IExpiringStore.php:43
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:187
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:29
MWExceptionHandler\logException
static logException( $e, $catcher=self::CAUGHT_BY_OTHER)
Log an exception to the exception log (if enabled).
Definition: MWExceptionHandler.php:683
$type
$type
Definition: testCompression.php:48