MediaWiki  1.23.0
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
32  protected static $instances = array();
33 
35  protected $cache;
36 
38  protected $wiki;
39 
41  protected $coalescedQueues;
42 
43  const TYPE_DEFAULT = 1; // integer; jobs popped by default
44  const TYPE_ANY = 2; // integer; any job
45 
46  const USE_CACHE = 1; // integer; use process or persistent cache
47 
48  const PROC_CACHE_TTL = 15; // integer; seconds
49 
50  const CACHE_VERSION = 1; // integer; cache version
51 
55  protected function __construct( $wiki ) {
56  $this->wiki = $wiki;
57  $this->cache = new ProcessCacheLRU( 10 );
58  }
59 
64  public static function singleton( $wiki = false ) {
65  $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
66  if ( !isset( self::$instances[$wiki] ) ) {
67  self::$instances[$wiki] = new self( $wiki );
68  }
69 
70  return self::$instances[$wiki];
71  }
72 
78  public static function destroySingletons() {
79  self::$instances = array();
80  }
81 
88  public function get( $type ) {
89  global $wgJobTypeConf;
90 
91  $conf = array( 'wiki' => $this->wiki, 'type' => $type );
92  if ( isset( $wgJobTypeConf[$type] ) ) {
93  $conf = $conf + $wgJobTypeConf[$type];
94  } else {
95  $conf = $conf + $wgJobTypeConf['default'];
96  }
97 
98  return JobQueue::factory( $conf );
99  }
100 
111  public function push( $jobs ) {
112  $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
113  if ( !count( $jobs ) ) {
114  return true;
115  }
116 
117  $jobsByType = array(); // (job type => list of jobs)
118  foreach ( $jobs as $job ) {
119  if ( $job instanceof IJobSpecification ) {
120  $jobsByType[$job->getType()][] = $job;
121  } else {
122  throw new MWException( "Attempted to push a non-Job object into a queue." );
123  }
124  }
125 
126  $ok = true;
127  foreach ( $jobsByType as $type => $jobs ) {
128  if ( $this->get( $type )->push( $jobs ) ) {
129  JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
130  } else {
131  $ok = false;
132  }
133  }
134 
135  if ( $this->cache->has( 'queues-ready', 'list' ) ) {
136  $list = $this->cache->get( 'queues-ready', 'list' );
137  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
138  $this->cache->clear( 'queues-ready' );
139  }
140  }
141 
142  return $ok;
143  }
144 
156  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) {
157  $job = false;
158 
159  if ( is_string( $qtype ) ) { // specific job type
160  if ( !in_array( $qtype, $blacklist ) ) {
161  $job = $this->get( $qtype )->pop();
162  if ( !$job ) {
163  JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
164  }
165  }
166  } else { // any job in the "default" jobs types
167  if ( $flags & self::USE_CACHE ) {
168  if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
169  $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
170  }
171  $types = $this->cache->get( 'queues-ready', 'list' );
172  } else {
173  $types = $this->getQueuesWithJobs();
174  }
175 
176  if ( $qtype == self::TYPE_DEFAULT ) {
177  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
178  }
179 
180  $types = array_diff( $types, $blacklist ); // avoid selected types
181  shuffle( $types ); // avoid starvation
182 
183  foreach ( $types as $type ) { // for each queue...
184  $job = $this->get( $type )->pop();
185  if ( $job ) { // found
186  break;
187  } else { // not found
188  JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
189  $this->cache->clear( 'queues-ready' );
190  }
191  }
192  }
193 
194  return $job;
195  }
196 
203  public function ack( Job $job ) {
204  return $this->get( $job->getType() )->ack( $job );
205  }
206 
214  public function deduplicateRootJob( Job $job ) {
215  return $this->get( $job->getType() )->deduplicateRootJob( $job );
216  }
217 
226  public function waitForBackups() {
227  global $wgJobTypeConf;
228 
229  wfProfileIn( __METHOD__ );
230  // Try to avoid doing this more than once per queue storage medium
231  foreach ( $wgJobTypeConf as $type => $conf ) {
232  $this->get( $type )->waitForBackups();
233  }
234  wfProfileOut( __METHOD__ );
235  }
236 
242  public function getQueueTypes() {
243  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
244  }
245 
251  public function getDefaultQueueTypes() {
252  global $wgJobTypesExcludedFromDefaultQueue;
253 
254  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
255  }
256 
264  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
265  global $wgMemc;
266 
267  $key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type );
268 
269  $value = $wgMemc->get( $key );
270  if ( $value === false ) {
271  $queues = $this->getQueuesWithJobs();
272  if ( $type == self::TYPE_DEFAULT ) {
273  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
274  }
275  $value = count( $queues ) ? 'true' : 'false';
276  $wgMemc->add( $key, $value, 15 );
277  }
278 
279  return ( $value === 'true' );
280  }
281 
287  public function getQueuesWithJobs() {
288  $types = array();
289  foreach ( $this->getCoalescedQueues() as $info ) {
290  $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
291  if ( is_array( $nonEmpty ) ) { // batching features supported
292  $types = array_merge( $types, $nonEmpty );
293  } else { // we have to go through the queues in the bucket one-by-one
294  foreach ( $info['types'] as $type ) {
295  if ( !$this->get( $type )->isEmpty() ) {
296  $types[] = $type;
297  }
298  }
299  }
300  }
301 
302  return $types;
303  }
304 
310  public function getQueueSizes() {
311  $sizeMap = array();
312  foreach ( $this->getCoalescedQueues() as $info ) {
313  $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
314  if ( is_array( $sizes ) ) { // batching features supported
315  $sizeMap = $sizeMap + $sizes;
316  } else { // we have to go through the queues in the bucket one-by-one
317  foreach ( $info['types'] as $type ) {
318  $sizeMap[$type] = $this->get( $type )->getSize();
319  }
320  }
321  }
322 
323  return $sizeMap;
324  }
325 
329  protected function getCoalescedQueues() {
330  global $wgJobTypeConf;
331 
332  if ( $this->coalescedQueues === null ) {
333  $this->coalescedQueues = array();
334  foreach ( $wgJobTypeConf as $type => $conf ) {
335  $queue = JobQueue::factory(
336  array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
337  $loc = $queue->getCoalesceLocationInternal();
338  if ( !isset( $this->coalescedQueues[$loc] ) ) {
339  $this->coalescedQueues[$loc]['queue'] = $queue;
340  $this->coalescedQueues[$loc]['types'] = array();
341  }
342  if ( $type === 'default' ) {
343  $this->coalescedQueues[$loc]['types'] = array_merge(
344  $this->coalescedQueues[$loc]['types'],
345  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
346  );
347  } else {
348  $this->coalescedQueues[$loc]['types'][] = $type;
349  }
350  }
351  }
352 
353  return $this->coalescedQueues;
354  }
355 
365  public function executeReadyPeriodicTasks() {
366  global $wgMemc;
367 
368  list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
369  $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
370  $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
371 
372  $count = 0;
373  $tasksRun = array(); // (queue => task => UNIX timestamp)
374  foreach ( $this->getQueueTypes() as $type ) {
375  $queue = $this->get( $type );
376  foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
377  if ( $definition['period'] <= 0 ) {
378  continue; // disabled
379  } elseif ( !isset( $lastRuns[$type][$task] )
380  || $lastRuns[$type][$task] < ( time() - $definition['period'] )
381  ) {
382  try {
383  if ( call_user_func( $definition['callback'] ) !== null ) {
384  $tasksRun[$type][$task] = time();
385  ++$count;
386  }
387  } catch ( JobQueueError $e ) {
389  }
390  }
391  }
392  // The tasks may have recycled jobs or release delayed jobs into the queue
393  if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
394  JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
395  }
396  }
397 
398  $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
399  if ( is_array( $lastRuns ) ) {
400  foreach ( $tasksRun as $type => $tasks ) {
401  foreach ( $tasks as $task => $timestamp ) {
402  if ( !isset( $lastRuns[$type][$task] )
403  || $timestamp > $lastRuns[$type][$task]
404  ) {
405  $lastRuns[$type][$task] = $timestamp;
406  }
407  }
408  }
409  } else {
410  $lastRuns = $tasksRun;
411  }
412 
413  return $lastRuns;
414  } );
415 
416  return $count;
417  }
418 
423  private function getCachedConfigVar( $name ) {
424  global $wgConf, $wgMemc;
425 
426  if ( $this->wiki === wfWikiID() ) {
427  return $GLOBALS[$name]; // common case
428  } else {
429  list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
430  $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
431  $value = $wgMemc->get( $key ); // ('v' => ...) or false
432  if ( is_array( $value ) ) {
433  return $value['v'];
434  } else {
435  $value = $wgConf->getConfig( $this->wiki, $name );
436  $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
437 
438  return $value;
439  }
440  }
441  }
442 }
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:43
Job\getType
getType()
Definition: Job.php:152
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any slaves or backup queue servers to catch up.
Definition: JobQueueGroup.php:223
JobQueueGroup\executeReadyPeriodicTasks
executeReadyPeriodicTasks()
Execute any due periodic queue maintenance tasks for all queues.
Definition: JobQueueGroup.php:362
php
skin txt MediaWiki includes four core it has been set as the default in MediaWiki since the replacing Monobook it had been been the default skin since before being replaced by Vector largely rewritten in while keeping its appearance Several legacy skins were removed in the as the burden of supporting them became too heavy to bear Those in etc for skin dependent CSS etc for skin dependent JavaScript These can also be customised on a per user by etc This feature has led to a wide variety of user styles becoming that gallery is a good place to ending in php
Definition: skin.txt:62
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:45
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:47
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types) *.
Definition: JobQueueGroup.php:38
$wgMemc
globals will be eliminated from MediaWiki replaced by an application object which would be passed to constructors Whether that would be an convenient solution remains to be but certainly PHP makes such object oriented programming models easier than they were in previous versions For the time being MediaWiki programmers will have to work in an environment with some global context At the time of globals were initialised on startup by MediaWiki of these were configuration which are documented in DefaultSettings php There is no comprehensive documentation for the remaining however some of the most important ones are listed below They are typically initialised either in index php or in Setup php For a description of the see design txt $wgTitle Title object created from the request URL $wgOut OutputPage object for HTTP response $wgUser User object for the user associated with the current request $wgLang Language object selected by user preferences $wgContLang Language object associated with the wiki being viewed $wgParser Parser object Parser extensions register their hooks here $wgRequest WebRequest to get request data $wgMemc
Definition: globals.txt:25
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:40
$timestamp
if( $limit) $timestamp
Definition: importImages.php:104
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
wfProfileIn
wfProfileIn( $functionname)
Begin profiling of a function.
Definition: Profiler.php:33
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:326
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:261
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:3620
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:75
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2113
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueGroup\$wiki
string $wiki
Wiki ID *.
Definition: JobQueueGroup.php:36
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
MWException
MediaWiki exception.
Definition: MWException.php:26
wfMemcKey
wfMemcKey()
Get a cache key.
Definition: GlobalFunctions.php:3571
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of with the belong.
Definition: JobQueueGroup.php:108
wfProfileOut
wfProfileOut( $functionname='missing')
Stop profiling of a function.
Definition: Profiler.php:46
JobQueueGroup\$cache
ProcessCacheLRU $cache
Definition: JobQueueGroup.php:34
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:420
array
the array() calling protocol came about after MediaWiki 1.4rc1.
List of Api Query prop modules.
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
JobQueueError
Definition: JobQueue.php:738
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Get a cache key for a foreign DB.
Definition: GlobalFunctions.php:3588
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:188
JobQueueGroup\$instances
static $instances
Definition: JobQueueGroup.php:32
$ok
$ok
Definition: UtfNormalTest.php:71
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:248
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:3604
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:336
$value
$value
Definition: styleTest.css.php:45
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:105
JobQueueGroup\__construct
__construct( $wiki)
Definition: JobQueueGroup.php:52
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=array())
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:153
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:211
JobQueueGroup\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:200
$count
$count
Definition: UtfNormalTest2.php:96
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:41
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:42
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:239
JobQueueGroup\singleton
static singleton( $wiki=false)
Definition: JobQueueGroup.php:61
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:284
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
MWExceptionHandler\logException
static logException(Exception $e)
Log an exception to the exception log (if enabled).
Definition: MWExceptionHandler.php:351
ProcessCacheLRU
Handles per process caching of items.
Definition: ProcessCacheLRU.php:28
$e
if( $useReadline) $e
Definition: eval.php:66
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queus for a list of job types.
Definition: JobQueueGroup.php:307
JobQueueAggregator\singleton
static singleton()
Definition: JobQueueAggregator.php:44
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$GLOBALS
$GLOBALS['IP']
Definition: ComposerHookHandler.php:6
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:30
$type
$type
Definition: testCompression.php:46