MediaWiki  1.28.1
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
/** @var JobQueueGroup[] Singleton instances, keyed by wiki ID */
protected static $instances = [];

/** @var ProcessCacheLRU Per-process cache (holds the 'queues-ready' list) */
protected $cache;

/** @var string Wiki ID */
protected $wiki;

/** @var string|bool Read-only rationale (or false if read/write) */
protected $readOnlyReason;

/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
protected $coalescedQueues;

/** @var array Jobs accepted by lazyPush() that have not been pushed yet */
protected $bufferedJobs = [];

/** Integer; queue selector meaning "jobs popped by default" */
const TYPE_DEFAULT = 1;

/** Integer; queue selector meaning "any job" */
const TYPE_ANY = 2;

/** Integer; flag asking to use the process or persistent cache */
const USE_CACHE = 1;

/** Integer; seconds the process-cached list of ready queues is trusted */
const PROC_CACHE_TTL = 15;

/** Integer; cache version */
const CACHE_VERSION = 1;
56 
/**
 * Instances are created through singleton(); the constructor is not public.
 *
 * @param string $wiki Wiki ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if read/write)
 */
protected function __construct( $wiki, $readOnlyReason ) {
	// Per-process cache, constructed with a size argument of 10
	$this->cache = new ProcessCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->wiki = $wiki;
}
66 
/**
 * Get the job queue group for a wiki, creating it on first use.
 *
 * @param bool|string $wiki Wiki ID, or false to use the value of wfWikiID()
 * @return JobQueueGroup
 */
public static function singleton( $wiki = false ) {
	if ( $wiki === false ) {
		$wiki = wfWikiID();
	}

	if ( isset( self::$instances[$wiki] ) ) {
		return self::$instances[$wiki];
	}

	// First request for this wiki: build the group and remember it
	$group = new self( $wiki, wfConfiguredReadOnlyReason() );
	self::$instances[$wiki] = $group;

	return $group;
}
79 
/**
 * Destroy the singleton instances.
 *
 * Only the static registry is emptied; objects still referenced elsewhere
 * are not otherwise touched by this method.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
88 
/**
 * Get the job queue object for a given queue type.
 *
 * The configuration comes from $wgJobTypeConf[$type] when that entry exists
 * and from $wgJobTypeConf['default'] otherwise. The 'wiki' and 'type' keys
 * set here take precedence over same-named keys in the configured array.
 *
 * @param string $type Job type
 * @return JobQueue
 */
public function get( $type ) {
	global $wgJobTypeConf;

	$confKey = isset( $wgJobTypeConf[$type] ) ? $type : 'default';

	// Array union: keys on the left-hand side win
	$conf = [ 'wiki' => $this->wiki, 'type' => $type ] + $wgJobTypeConf[$confKey];
	$conf['aggregator'] = JobQueueAggregator::singleton();

	// Only forward a read-only reason when one is actually set
	if ( $this->readOnlyReason !== false ) {
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	return JobQueue::factory( $conf );
}
111 
/**
 * Insert jobs into the respective queues to which they belong.
 *
 * Jobs are grouped by getType() and each group is pushed to the queue for
 * that type. Afterwards the process-cached list of ready queues is cleared if
 * any pushed type is missing from it, and "hasjobs" flags are written to the
 * cluster-local cache using the same key layout that queuesHaveJobs() reads.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	global $wgJobTypesExcludedFromDefaultQueue;

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( !count( $jobs ) ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	// Drop the process-cached ready list if it does not mention a type just pushed
	if ( $this->cache->has( 'queues-ready', 'list' ) ) {
		$list = $this->cache->get( 'queues-ready', 'list' );
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// The cache handle must be obtained before use; without this assignment
	// the set()/makeGlobalKey() calls below would be made on an undefined variable.
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	// Only flag the "default" set when at least one pushed type is not excluded from it
	if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
162 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * Buffered jobs stay in memory until pushLazyJobs() hands them to push().
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( PHP_SAPI !== 'cli' ) {
		if ( !is_array( $jobs ) ) {
			$jobs = [ $jobs ];
		}

		// Validate up front: failing later inside push() could affect
		// other jobs that have been buffered in the meantime
		$this->assertValidJobs( $jobs );

		$this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
	} else {
		// Command-line mode: insert immediately, nothing is buffered
		$this->push( $jobs );
	}
}
185 
/**
 * Push all jobs buffered via lazyPush() into their respective queues.
 *
 * Every known instance is flushed independently. A failure in one group is
 * logged and does not stop the remaining groups from being processed; that
 * group's buffer is left intact because it is only emptied after a
 * successful push().
 *
 * @return void
 */
public static function pushLazyJobs() {
	foreach ( self::$instances as $group ) {
		try {
			$group->push( $group->bufferedJobs );
			$group->bufferedJobs = [];
		} catch ( Exception $e ) {
			// Get in as many jobs as possible and let other post-send updates happen,
			// but record the failure instead of discarding it silently
			MWExceptionHandler::logException( $e );
		}
	}
}
203 
/**
 * Pop a job off one of the job queues.
 *
 * When $qtype is a string, only that queue is tried. Otherwise the queues
 * reported as non-empty are tried in random order until one yields a job.
 *
 * @param int|string $qtype Job type name, or one of the TYPE_* constants
 * @param int $flags Bitfield; USE_CACHE allows reuse of the process-cached list of ready queues
 * @param array $blacklist Job types to skip
 * @return Job|bool The popped job; false (or whatever falsy value the last queue returned) if none
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	if ( is_string( $qtype ) ) {
		// A specific job type was requested
		return in_array( $qtype, $blacklist )
			? false
			: $this->get( $qtype )->pop();
	}

	// No specific type: work out which queues are worth trying
	if ( $flags & self::USE_CACHE ) {
		$listIsFresh = $this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL );
		if ( !$listIsFresh ) {
			$this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->get( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		// Restrict to the types that belong to the default set
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	$candidates = array_diff( $candidates, $blacklist ); // avoid selected types
	shuffle( $candidates ); // avoid starvation

	$job = false;
	foreach ( $candidates as $candidate ) {
		$job = $this->get( $candidate )->pop();
		if ( $job ) {
			return $job;
		}
		// This queue was listed as ready but gave nothing: the cached list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
251 
/**
 * Acknowledge that a job was completed.
 *
 * The call is forwarded to the queue that handles the job's type.
 *
 * @param Job $job
 * @return void
 */
public function ack( Job $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
261 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param Job $job
 * @return mixed Whatever the underlying queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( Job $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
272 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// NOTE: ideally this would happen only once per queue storage medium;
	// as written, every configured type is visited in turn.
	foreach ( $wgJobTypeConf as $type => $unusedConf ) {
		$queue = $this->get( $type );
		$queue->waitForBackups();
	}
}
288 
/**
 * Get the list of queue types.
 *
 * These are the keys of the 'wgJobClasses' configuration value for this wiki.
 *
 * @return array List of job type names
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
297 
/**
 * Get the list of default queue types.
 *
 * That is every queue type except those named in
 * $wgJobTypesExcludedFromDefaultQueue. Keys of the getQueueTypes() result are
 * preserved, so the returned array may have gaps in its indexes.
 *
 * @return array List of job type names
 */
public function getDefaultQueueTypes() {
	global $wgJobTypesExcludedFromDefaultQueue;

	$allTypes = $this->getQueueTypes();

	return array_diff( $allTypes, $wgJobTypesExcludedFromDefaultQueue );
}
308 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false' so that a cached
 * negative answer can be told apart from a cache miss, which is compared
 * against boolean false below.
 *
 * @param int $type One of the TYPE_* constants
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	// The cache handle must be obtained before use; without this assignment
	// every call below would be made on an undefined variable.
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );

	$value = $cache->get( $key );
	if ( $value === false ) {
		// Cache miss: compute the answer from the queues themselves
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		$value = count( $queues ) ? 'true' : 'false';
		// add() rather than set(): presumably so a value written meanwhile
		// (e.g. by push()) is not overwritten - TODO confirm against BagOStuff
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
332 
/**
 * Get the list of job types that have non-empty queues.
 *
 * Each coalesced bucket is first asked for all of its non-empty sibling
 * queues in one call; when the bucket does not return an array, its types
 * are checked one at a time instead.
 *
 * @return array List of job type names
 */
public function getQueuesWithJobs() {
	$readyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$found = $bucket['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );

		if ( is_array( $found ) ) {
			// Batching features supported
			$readyTypes = array_merge( $readyTypes, $found );
			continue;
		}

		// No batching: go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $jobType ) {
			if ( $this->get( $jobType )->isEmpty() ) {
				continue;
			}
			$readyTypes[] = $jobType;
		}
	}

	return $readyTypes;
}
355 
/**
 * Get the size of the queues for a list of job types.
 *
 * Each coalesced bucket is first asked for all sibling queue sizes in one
 * call; when the bucket does not return an array, each of its types is
 * sized individually.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$batch = $bucket['queue']->getSiblingQueueSizes( $this->getQueueTypes() );

		if ( is_array( $batch ) ) {
			// Batching features supported; entries already collected are kept
			$sizesByType += $batch;
			continue;
		}

		// No batching: go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $jobType ) {
			$sizesByType[$jobType] = $this->get( $jobType )->getSize();
		}
	}

	return $sizesByType;
}
376 
/**
 * Group the configured job types by the location their queues coalesce to.
 *
 * The result is computed once per instance and kept in $this->coalescedQueues.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Build a queue from this configuration entry purely to learn where
			// it coalesces; the 'type' passed here is the literal string 'null'.
			$queue = JobQueue::factory(
				[ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// The 'default' entry stands for every queue type that has
				// no configuration entry of its own
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
406 
/**
 * Get a configuration value for this group's wiki.
 *
 * For the current wiki the value is read straight from $GLOBALS. For any
 * other wiki it is looked up through $wgConf and cached under a global key.
 *
 * @param string $name Name of the configuration variable (no leading "$")
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( $this->wiki === wfWikiID() ) {
		return $GLOBALS[$name]; // common case
	} else {
		$wiki = $this->wiki;
		// The cache handle must be obtained before use; without this assignment
		// every call below would be made on an undefined variable.
		$cache = ObjectCache::getMainWANInstance();
		$value = $cache->getWithSetCallback(
			$cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
			// Between one and two days, randomized per call
			$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
			function () use ( $wiki, $name ) {
				global $wgConf;

				// Wrapped in an array, presumably so that falsy configuration
				// values can still be cached - TODO confirm
				return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
			},
			[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
		);

		return $value['v'];
	}
}
432 
/**
 * Sanity-check that every item in the list is a job specification.
 *
 * @param array $jobs
 * @throws InvalidArgumentException On the first item that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) {
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}
		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
444 
/**
 * Raise an error if jobs buffered via lazyPush() were never pushed.
 *
 * The message lists the distinct class names of the leftover jobs.
 */
public function __destruct() {
	$n = count( $this->bufferedJobs );
	if ( $n <= 0 ) {
		return;
	}

	$classNames = array_unique( array_map( 'get_class', $this->bufferedJobs ) );
	$type = implode( ', ', $classNames );
	trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
}
452 }
ProcessCacheLRU $cache
static getMainWANInstance()
Get the main WAN cache object.
getType()
Definition: Job.php:121
the array() calling protocol came about after MediaWiki 1.4rc1.
assertValidJobs(array $jobs)
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:2102
getCachedConfigVar($name)
queuesHaveJobs($type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Class to both describe a background job and handle jobs.
Definition: Job.php:31
$value
static getLocalClusterInstance()
Get the main cluster-local cache object.
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2703
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
when a variable name is used in a it is silently declared as a new local masking the global
Definition: design.txt:93
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
__construct($wiki, $readOnlyReason)
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
pop($qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
string $wiki
Wiki ID.
ack(Job $job)
Acknowledge that a job was completed.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
getQueueTypes()
Get the list of queue types.
$GLOBALS['IP']
const PROC_CACHE_TTL
string bool $readOnlyReason
Read only rationale (or false if r/w)
wfWikiID()
Get an ASCII string identifying this wiki. This is used as a prefix in memcached keys.
push($jobs)
Insert jobs into the respective queues to which they belong.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
Class to handle enqueueing of background jobs.
static singleton($wiki=false)
getQueueSizes()
Get the size of the queues for a list of job types.
$wgConf
wgConf hold the site configuration.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static JobQueueGroup[] $instances
get($key, $prop)
Get a property field for a cache entry.
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
if(count($args)< 1) $job
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
set($key, $prop, $value)
Set a property field for a cache entry.
Job queue task description interface.
static logException($e)
Log an exception to the exception log (if enabled).
lazyPush($jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Handles per process caching of items.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2491
static destroySingletons()
Destroy the singleton instances.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:300