MediaWiki  1.27.1
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
32  protected static $instances = [];
33 
35  protected $cache;
36 
38  protected $wiki;
40  protected $readOnlyReason;
41 
43  protected $coalescedQueues;
44 
46  protected $bufferedJobs = [];
47 
48  const TYPE_DEFAULT = 1; // integer; jobs popped by default
49  const TYPE_ANY = 2; // integer; any job
50 
51  const USE_CACHE = 1; // integer; use process or persistent cache
52 
53  const PROC_CACHE_TTL = 15; // integer; seconds
54 
55  const CACHE_VERSION = 1; // integer; cache version
56 
61  protected function __construct( $wiki, $readOnlyReason ) {
62  $this->wiki = $wiki;
63  $this->readOnlyReason = $readOnlyReason;
64  $this->cache = new ProcessCacheLRU( 10 );
65  }
66 
71  public static function singleton( $wiki = false ) {
72  $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
73  if ( !isset( self::$instances[$wiki] ) ) {
74  self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
75  }
76 
77  return self::$instances[$wiki];
78  }
79 
85  public static function destroySingletons() {
86  self::$instances = [];
87  }
88 
95  public function get( $type ) {
96  global $wgJobTypeConf;
97 
98  $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
99  if ( isset( $wgJobTypeConf[$type] ) ) {
100  $conf = $conf + $wgJobTypeConf[$type];
101  } else {
102  $conf = $conf + $wgJobTypeConf['default'];
103  }
104  $conf['aggregator'] = JobQueueAggregator::singleton();
105  if ( $this->readOnlyReason !== false ) {
106  $conf['readOnlyReason'] = $this->readOnlyReason;
107  }
108 
109  return JobQueue::factory( $conf );
110  }
111 
122  public function push( $jobs ) {
123  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
124  if ( !count( $jobs ) ) {
125  return;
126  }
127 
128  $this->assertValidJobs( $jobs );
129 
130  $jobsByType = []; // (job type => list of jobs)
131  foreach ( $jobs as $job ) {
132  $jobsByType[$job->getType()][] = $job;
133  }
134 
135  foreach ( $jobsByType as $type => $jobs ) {
136  $this->get( $type )->push( $jobs );
137  }
138 
139  if ( $this->cache->has( 'queues-ready', 'list' ) ) {
140  $list = $this->cache->get( 'queues-ready', 'list' );
141  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
142  $this->cache->clear( 'queues-ready' );
143  }
144  }
145  }
146 
156  public function lazyPush( $jobs ) {
157  if ( PHP_SAPI === 'cli' ) {
158  $this->push( $jobs );
159  return;
160  }
161 
162  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
163 
164  // Throw errors now instead of on push(), when other jobs may be buffered
165  $this->assertValidJobs( $jobs );
166 
167  $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
168  }
169 
176  public static function pushLazyJobs() {
177  foreach ( self::$instances as $group ) {
178  try {
179  $group->push( $group->bufferedJobs );
180  $group->bufferedJobs = [];
181  } catch ( Exception $e ) {
182  // Get in as many jobs as possible and let other post-send updates happen
184  }
185  }
186  }
187 
199  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
200  $job = false;
201 
202  if ( is_string( $qtype ) ) { // specific job type
203  if ( !in_array( $qtype, $blacklist ) ) {
204  $job = $this->get( $qtype )->pop();
205  }
206  } else { // any job in the "default" jobs types
207  if ( $flags & self::USE_CACHE ) {
208  if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
209  $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
210  }
211  $types = $this->cache->get( 'queues-ready', 'list' );
212  } else {
213  $types = $this->getQueuesWithJobs();
214  }
215 
216  if ( $qtype == self::TYPE_DEFAULT ) {
217  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
218  }
219 
220  $types = array_diff( $types, $blacklist ); // avoid selected types
221  shuffle( $types ); // avoid starvation
222 
223  foreach ( $types as $type ) { // for each queue...
224  $job = $this->get( $type )->pop();
225  if ( $job ) { // found
226  break;
227  } else { // not found
228  $this->cache->clear( 'queues-ready' );
229  }
230  }
231  }
232 
233  return $job;
234  }
235 
242  public function ack( Job $job ) {
243  $this->get( $job->getType() )->ack( $job );
244  }
245 
253  public function deduplicateRootJob( Job $job ) {
254  return $this->get( $job->getType() )->deduplicateRootJob( $job );
255  }
256 
264  public function waitForBackups() {
265  global $wgJobTypeConf;
266 
267  // Try to avoid doing this more than once per queue storage medium
268  foreach ( $wgJobTypeConf as $type => $conf ) {
269  $this->get( $type )->waitForBackups();
270  }
271  }
272 
278  public function getQueueTypes() {
279  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
280  }
281 
287  public function getDefaultQueueTypes() {
288  global $wgJobTypesExcludedFromDefaultQueue;
289 
290  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
291  }
292 
300  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
301  $key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type );
303 
304  $value = $cache->get( $key );
305  if ( $value === false ) {
306  $queues = $this->getQueuesWithJobs();
307  if ( $type == self::TYPE_DEFAULT ) {
308  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
309  }
310  $value = count( $queues ) ? 'true' : 'false';
311  $cache->add( $key, $value, 15 );
312  }
313 
314  return ( $value === 'true' );
315  }
316 
322  public function getQueuesWithJobs() {
323  $types = [];
324  foreach ( $this->getCoalescedQueues() as $info ) {
325  $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
326  if ( is_array( $nonEmpty ) ) { // batching features supported
327  $types = array_merge( $types, $nonEmpty );
328  } else { // we have to go through the queues in the bucket one-by-one
329  foreach ( $info['types'] as $type ) {
330  if ( !$this->get( $type )->isEmpty() ) {
331  $types[] = $type;
332  }
333  }
334  }
335  }
336 
337  return $types;
338  }
339 
345  public function getQueueSizes() {
346  $sizeMap = [];
347  foreach ( $this->getCoalescedQueues() as $info ) {
348  $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
349  if ( is_array( $sizes ) ) { // batching features supported
350  $sizeMap = $sizeMap + $sizes;
351  } else { // we have to go through the queues in the bucket one-by-one
352  foreach ( $info['types'] as $type ) {
353  $sizeMap[$type] = $this->get( $type )->getSize();
354  }
355  }
356  }
357 
358  return $sizeMap;
359  }
360 
364  protected function getCoalescedQueues() {
365  global $wgJobTypeConf;
366 
367  if ( $this->coalescedQueues === null ) {
368  $this->coalescedQueues = [];
369  foreach ( $wgJobTypeConf as $type => $conf ) {
371  [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
372  $loc = $queue->getCoalesceLocationInternal();
373  if ( !isset( $this->coalescedQueues[$loc] ) ) {
374  $this->coalescedQueues[$loc]['queue'] = $queue;
375  $this->coalescedQueues[$loc]['types'] = [];
376  }
377  if ( $type === 'default' ) {
378  $this->coalescedQueues[$loc]['types'] = array_merge(
379  $this->coalescedQueues[$loc]['types'],
380  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
381  );
382  } else {
383  $this->coalescedQueues[$loc]['types'][] = $type;
384  }
385  }
386  }
387 
388  return $this->coalescedQueues;
389  }
390 
395  private function getCachedConfigVar( $name ) {
396  // @TODO: cleanup this whole method with a proper config system
397  if ( $this->wiki === wfWikiID() ) {
398  return $GLOBALS[$name]; // common case
399  } else {
400  $wiki = $this->wiki;
402  $value = $cache->getWithSetCallback(
403  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
404  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
405  function () use ( $wiki, $name ) {
406  global $wgConf;
407 
408  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
409  },
410  [ 'pcTTL' => 30 ]
411  );
412 
413  return $value['v'];
414  }
415  }
416 
421  private function assertValidJobs( array $jobs ) {
422  foreach ( $jobs as $job ) { // sanity checks
423  if ( !( $job instanceof IJobSpecification ) ) {
424  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
425  }
426  }
427  }
428 
429  function __destruct() {
430  $n = count( $this->bufferedJobs );
431  if ( $n > 0 ) {
432  $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
433  trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
434  }
435  }
436 }
ProcessCacheLRU $cache
static getMainWANInstance()
Get the main WAN cache object.
getType()
Definition: Job.php:121
the array() calling protocol came about after MediaWiki 1.4rc1.
assertValidJobs(array $jobs)
magic word the default is to use $key to get the and $key value or $key value text $key value html to format the value $key
Definition: hooks.txt:2321
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:1932
getCachedConfigVar($name)
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
queuesHaveJobs($type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Class to both describe a background job and handle jobs.
Definition: Job.php:31
$value
static getLocalClusterInstance()
Get the main cluster-local cache object.
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2548
waitForBackups()
Wait for any slaves or backup queue servers to catch up.
when a variable name is used in a it is silently declared as a new local masking the global
Definition: design.txt:93
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
__construct($wiki, $readOnlyReason)
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
pop($qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
string $wiki
Wiki ID.
ack(Job $job)
Acknowledge that a job was completed.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
getQueueTypes()
Get the list of queue types.
$GLOBALS['IP']
const PROC_CACHE_TTL
string bool $readOnlyReason
Read only rationale (or false if r/w)
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
push($jobs)
Insert jobs into the respective queues of which they belong.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
Class to handle enqueueing of background jobs.
static singleton($wiki=false)
getQueueSizes()
Get the size of the queus for a list of job types.
$wgConf
wgConf hold the site configuration.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static JobQueueGroup[] $instances
get($key, $prop)
Get a property field for a cache entry.
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
if(count($args)< 1) $job
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Job queue task description interface.
wfMemcKey()
Make a cache key for the local wiki.
static logException($e)
Log an exception to the exception log (if enabled).
lazyPush($jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Handles per process caching of items.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2338
static destroySingletons()
Destroy the singleton instances.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:310