MediaWiki  REL1_28
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
32  protected static $instances = [];
33 
35  protected $cache;
36 
38  protected $wiki;
40  protected $readOnlyReason;
41 
43  protected $coalescedQueues;
44 
46  protected $bufferedJobs = [];
47 
48  const TYPE_DEFAULT = 1; // integer; jobs popped by default
49  const TYPE_ANY = 2; // integer; any job
50 
51  const USE_CACHE = 1; // integer; use process or persistent cache
52 
53  const PROC_CACHE_TTL = 15; // integer; seconds
54 
55  const CACHE_VERSION = 1; // integer; cache version
56 
61  protected function __construct( $wiki, $readOnlyReason ) {
62  $this->wiki = $wiki;
63  $this->readOnlyReason = $readOnlyReason;
64  $this->cache = new ProcessCacheLRU( 10 );
65  }
66 
71  public static function singleton( $wiki = false ) {
72  $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
73  if ( !isset( self::$instances[$wiki] ) ) {
74  self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
75  }
76 
77  return self::$instances[$wiki];
78  }
79 
85  public static function destroySingletons() {
86  self::$instances = [];
87  }
88 
95  public function get( $type ) {
96  global $wgJobTypeConf;
97 
98  $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
99  if ( isset( $wgJobTypeConf[$type] ) ) {
100  $conf = $conf + $wgJobTypeConf[$type];
101  } else {
102  $conf = $conf + $wgJobTypeConf['default'];
103  }
104  $conf['aggregator'] = JobQueueAggregator::singleton();
105  if ( $this->readOnlyReason !== false ) {
106  $conf['readOnlyReason'] = $this->readOnlyReason;
107  }
108 
109  return JobQueue::factory( $conf );
110  }
111 
122  public function push( $jobs ) {
123  global $wgJobTypesExcludedFromDefaultQueue;
124 
125  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
126  if ( !count( $jobs ) ) {
127  return;
128  }
129 
130  $this->assertValidJobs( $jobs );
131 
132  $jobsByType = []; // (job type => list of jobs)
133  foreach ( $jobs as $job ) {
134  $jobsByType[$job->getType()][] = $job;
135  }
136 
137  foreach ( $jobsByType as $type => $jobs ) {
138  $this->get( $type )->push( $jobs );
139  }
140 
141  if ( $this->cache->has( 'queues-ready', 'list' ) ) {
142  $list = $this->cache->get( 'queues-ready', 'list' );
143  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
144  $this->cache->clear( 'queues-ready' );
145  }
146  }
147 
149  $cache->set(
150  $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
151  'true',
152  15
153  );
154  if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
155  $cache->set(
156  $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
157  'true',
158  15
159  );
160  }
161  }
162 
174  public function lazyPush( $jobs ) {
175  if ( PHP_SAPI === 'cli' ) {
176  $this->push( $jobs );
177  return;
178  }
179 
180  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
181 
182  // Throw errors now instead of on push(), when other jobs may be buffered
183  $this->assertValidJobs( $jobs );
184 
185  $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
186  }
187 
194  public static function pushLazyJobs() {
195  foreach ( self::$instances as $group ) {
196  try {
197  $group->push( $group->bufferedJobs );
198  $group->bufferedJobs = [];
199  } catch ( Exception $e ) {
200  // Get in as many jobs as possible and let other post-send updates happen
202  }
203  }
204  }
205 
217  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
218  $job = false;
219 
220  if ( is_string( $qtype ) ) { // specific job type
221  if ( !in_array( $qtype, $blacklist ) ) {
222  $job = $this->get( $qtype )->pop();
223  }
224  } else { // any job in the "default" jobs types
225  if ( $flags & self::USE_CACHE ) {
226  if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
227  $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
228  }
229  $types = $this->cache->get( 'queues-ready', 'list' );
230  } else {
231  $types = $this->getQueuesWithJobs();
232  }
233 
234  if ( $qtype == self::TYPE_DEFAULT ) {
235  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
236  }
237 
238  $types = array_diff( $types, $blacklist ); // avoid selected types
239  shuffle( $types ); // avoid starvation
240 
241  foreach ( $types as $type ) { // for each queue...
242  $job = $this->get( $type )->pop();
243  if ( $job ) { // found
244  break;
245  } else { // not found
246  $this->cache->clear( 'queues-ready' );
247  }
248  }
249  }
250 
251  return $job;
252  }
253 
260  public function ack( Job $job ) {
261  $this->get( $job->getType() )->ack( $job );
262  }
263 
271  public function deduplicateRootJob( Job $job ) {
272  return $this->get( $job->getType() )->deduplicateRootJob( $job );
273  }
274 
282  public function waitForBackups() {
283  global $wgJobTypeConf;
284 
285  // Try to avoid doing this more than once per queue storage medium
286  foreach ( $wgJobTypeConf as $type => $conf ) {
287  $this->get( $type )->waitForBackups();
288  }
289  }
290 
296  public function getQueueTypes() {
297  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
298  }
299 
305  public function getDefaultQueueTypes() {
306  global $wgJobTypesExcludedFromDefaultQueue;
307 
308  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
309  }
310 
318  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
320  $key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );
321 
322  $value = $cache->get( $key );
323  if ( $value === false ) {
324  $queues = $this->getQueuesWithJobs();
325  if ( $type == self::TYPE_DEFAULT ) {
326  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
327  }
328  $value = count( $queues ) ? 'true' : 'false';
329  $cache->add( $key, $value, 15 );
330  }
331 
332  return ( $value === 'true' );
333  }
334 
340  public function getQueuesWithJobs() {
341  $types = [];
342  foreach ( $this->getCoalescedQueues() as $info ) {
343  $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
344  if ( is_array( $nonEmpty ) ) { // batching features supported
345  $types = array_merge( $types, $nonEmpty );
346  } else { // we have to go through the queues in the bucket one-by-one
347  foreach ( $info['types'] as $type ) {
348  if ( !$this->get( $type )->isEmpty() ) {
349  $types[] = $type;
350  }
351  }
352  }
353  }
354 
355  return $types;
356  }
357 
363  public function getQueueSizes() {
364  $sizeMap = [];
365  foreach ( $this->getCoalescedQueues() as $info ) {
366  $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
367  if ( is_array( $sizes ) ) { // batching features supported
368  $sizeMap = $sizeMap + $sizes;
369  } else { // we have to go through the queues in the bucket one-by-one
370  foreach ( $info['types'] as $type ) {
371  $sizeMap[$type] = $this->get( $type )->getSize();
372  }
373  }
374  }
375 
376  return $sizeMap;
377  }
378 
382  protected function getCoalescedQueues() {
383  global $wgJobTypeConf;
384 
385  if ( $this->coalescedQueues === null ) {
386  $this->coalescedQueues = [];
387  foreach ( $wgJobTypeConf as $type => $conf ) {
389  [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
390  $loc = $queue->getCoalesceLocationInternal();
391  if ( !isset( $this->coalescedQueues[$loc] ) ) {
392  $this->coalescedQueues[$loc]['queue'] = $queue;
393  $this->coalescedQueues[$loc]['types'] = [];
394  }
395  if ( $type === 'default' ) {
396  $this->coalescedQueues[$loc]['types'] = array_merge(
397  $this->coalescedQueues[$loc]['types'],
398  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
399  );
400  } else {
401  $this->coalescedQueues[$loc]['types'][] = $type;
402  }
403  }
404  }
405 
406  return $this->coalescedQueues;
407  }
408 
413  private function getCachedConfigVar( $name ) {
414  // @TODO: cleanup this whole method with a proper config system
415  if ( $this->wiki === wfWikiID() ) {
416  return $GLOBALS[$name]; // common case
417  } else {
418  $wiki = $this->wiki;
420  $value = $cache->getWithSetCallback(
421  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
422  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
423  function () use ( $wiki, $name ) {
424  global $wgConf;
425 
426  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
427  },
428  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
429  );
430 
431  return $value['v'];
432  }
433  }
434 
439  private function assertValidJobs( array $jobs ) {
440  foreach ( $jobs as $job ) { // sanity checks
441  if ( !( $job instanceof IJobSpecification ) ) {
442  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
443  }
444  }
445  }
446 
447  function __destruct() {
448  $n = count( $this->bufferedJobs );
449  if ( $n > 0 ) {
450  $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
451  trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
452  }
453  }
454 }
$wgConf
$wgConf
wgConf hold the site configuration.
Definition: DefaultSettings.php:62
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:217
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:51
JobQueueGroup\__construct
__construct( $wiki, $readOnlyReason)
Definition: JobQueueGroup.php:61
Job\getType
getType()
Definition: Job.php:121
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:282
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:357
use
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
Definition: APACHE-LICENSE-2.0.txt:3
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:53
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:55
array
the array() calling protocol came about after MediaWiki 1.4rc1.
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Definition: JobQueueGroup.php:43
JobQueueGroup\__destruct
__destruct()
Definition: JobQueueGroup.php:447
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:48
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
wfConfiguredReadOnlyReason
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Definition: GlobalFunctions.php:1307
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:382
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:32
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:318
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:85
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2706
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueGroup\$wiki
string $wiki
Wiki ID.
Definition: JobQueueGroup.php:38
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
ProcessCacheLRU\set
set( $key, $prop, $value)
Set a property field for a cache entry.
Definition: ProcessCacheLRU.php:56
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:122
JobQueueGroup\$cache
ProcessCacheLRU $cache
Definition: JobQueueGroup.php:35
$queue
$queue
Definition: mergeMessageFileList.php:171
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:413
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
MWExceptionHandler\logException
static logException( $e)
Log an exception to the exception log (if enabled).
Definition: MWExceptionHandler.php:585
JobQueueGroup\pushLazyJobs
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
Definition: JobQueueGroup.php:194
$type
namespace are movable Hooks may change this value to override the return value of MWNamespace::isMovable(). 'NewDifferenceEngine' do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2259
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:305
JobQueueGroup\$bufferedJobs
Job[] $bufferedJobs
Definition: JobQueueGroup.php:46
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:3024
$value
$value
Definition: styleTest.css.php:45
JobQueueGroup\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueueGroup.php:40
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
ProcessCacheLRU\get
get( $key, $prop)
Get a property field for a cache entry.
Definition: ProcessCacheLRU.php:96
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:271
JobQueueGroup\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:260
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:49
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:370
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:439
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:296
JobQueueGroup\singleton
static singleton( $wiki=false)
Definition: JobQueueGroup.php:71
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:340
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:304
ProcessCacheLRU
Handles per process caching of items.
Definition: ProcessCacheLRU.php:29
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queus for a list of job types.
Definition: JobQueueGroup.php:363
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
JobQueueAggregator\singleton
static singleton()
Definition: JobQueueAggregator.php:44
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$GLOBALS
$GLOBALS['IP']
Definition: ComposerHookHandler.php:6
IExpiringStore\TTL_PROC_LONG
const TTL_PROC_LONG
Definition: IExpiringStore.php:42
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:174
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2105
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:30