MediaWiki  1.27.2
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
/** @var JobQueueGroup[] Instances, keyed by wiki ID (see singleton()) */
protected static $instances = [];

/** @var ProcessCacheLRU Per-process cache (holds the 'queues-ready' list) */
protected $cache;

/** @var string Wiki ID */
protected $wiki;
/** @var string|bool Read-only rationale (or false if r/w) */
protected $readOnlyReason;

/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
protected $coalescedQueues;

/** @var IJobSpecification[] Jobs buffered by lazyPush() that have not been pushed yet */
protected $bufferedJobs = [];

/** Integer; selects the jobs popped by default */
const TYPE_DEFAULT = 1;
/** Integer; selects any job */
const TYPE_ANY = 2;

/** Integer; use process or persistent cache */
const USE_CACHE = 1;

/** Integer; seconds */
const PROC_CACHE_TTL = 15;

/** Integer; cache version */
const CACHE_VERSION = 1;
56 
/**
 * Build a group for one wiki; callers obtain instances through singleton().
 *
 * @param string $wiki Wiki ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if r/w)
 */
protected function __construct( $wiki, $readOnlyReason ) {
	// Small per-process LRU cache (10 entries)
	$this->cache = new ProcessCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->wiki = $wiki;
}
66 
/**
 * Get the job queue group for a wiki, creating it on first use.
 *
 * @param bool|string $wiki Wiki ID; false selects the current wiki (wfWikiID())
 * @return JobQueueGroup
 */
public static function singleton( $wiki = false ) {
	if ( $wiki === false ) {
		$wiki = wfWikiID();
	}

	if ( isset( self::$instances[$wiki] ) ) {
		return self::$instances[$wiki];
	}

	$group = new self( $wiki, wfConfiguredReadOnlyReason() );
	self::$instances[$wiki] = $group;

	return $group;
}
79 
/**
 * Destroy the singleton instances.
 *
 * Only the static registry is reset; existing references held elsewhere stay valid.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
88 
/**
 * Get the job queue object for a given queue type.
 *
 * The configuration is taken from $wgJobTypeConf[$type], falling back to
 * $wgJobTypeConf['default']; the 'wiki' and 'type' keys set here take precedence.
 *
 * @param string $type Job type
 * @return JobQueue
 */
public function get( $type ) {
	global $wgJobTypeConf;

	$typeConf = isset( $wgJobTypeConf[$type] )
		? $wgJobTypeConf[$type]
		: $wgJobTypeConf['default'];

	// Array union: keys on the left-hand side win over the configured ones
	$conf = [ 'wiki' => $this->wiki, 'type' => $type ] + $typeConf;
	$conf['aggregator'] = JobQueueAggregator::singleton();

	if ( $this->readOnlyReason !== false ) {
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	return JobQueue::factory( $conf );
}
111 
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * Besides pushing, this invalidates the process-cached 'queues-ready' list when a
 * pushed type is not in it, and flags the wiki as having jobs in the cluster cache.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If an element is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( !count( $jobs ) ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	// Use a distinct loop variable so the $jobs parameter is not clobbered
	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	if ( $this->cache->has( 'queues-ready', 'list' ) ) {
		$list = $this->cache->get( 'queues-ready', 'list' );
		// A type that was not known to be ready just received jobs: the list is stale
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Same cache and keys that queuesHaveJobs() reads
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	if ( array_intersect( array_keys( $jobsByType ), $this->getDefaultQueueTypes() ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
160 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If an element is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( PHP_SAPI !== 'cli' ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];

		// Validate here rather than in push(), by which time other jobs may be buffered
		$this->assertValidJobs( $jobs );

		$this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );

		return;
	}

	// CLI mode: push immediately instead of buffering
	$this->push( $jobs );
}
183 
/**
 * Push all jobs buffered via lazyPush() into their respective queues.
 *
 * A failure in one group is logged and does not stop the remaining groups.
 * The buffer of a failing group is left untouched.
 *
 * @return void
 */
public static function pushLazyJobs() {
	foreach ( self::$instances as $group ) {
		try {
			$group->push( $group->bufferedJobs );
			$group->bufferedJobs = [];
		} catch ( Exception $e ) {
			// Get in as many jobs as possible and let other post-send updates happen,
			// but record the failure instead of discarding it silently
			MWExceptionHandler::logException( $e );
		}
	}
}
201 
/**
 * Pop a job off one of the job queues.
 *
 * A string $qtype selects that specific queue. Otherwise the queues that have jobs
 * are tried in random order, restricted to the default types for TYPE_DEFAULT.
 *
 * @param int|string $qtype Job type, or one of the JobQueueGroup::TYPE_* constants
 * @param int $flags Bitfield; JobQueueGroup::USE_CACHE reuses the process-cached ready list
 * @param array $blacklist Job types that must not be popped
 * @return Job|bool Result of the queue's pop(), or false when no job was found
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	if ( is_string( $qtype ) ) {
		// Specific job type
		return in_array( $qtype, $blacklist )
			? false
			: $this->get( $qtype )->pop();
	}

	// Any job among the queues that currently have jobs
	if ( $flags & self::USE_CACHE ) {
		if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
			$this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->get( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	// Avoid the selected types
	$candidates = array_diff( $candidates, $blacklist );
	// Avoid starvation
	shuffle( $candidates );

	$job = false;
	foreach ( $candidates as $candidate ) {
		$job = $this->get( $candidate )->pop();
		if ( $job ) {
			return $job;
		}
		// The queue turned out to be empty, so the cached ready list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
249 
/**
 * Acknowledge that a job was completed.
 *
 * @param Job $job
 * @return void
 */
public function ack( Job $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
259 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param Job $job
 * @return mixed Whatever the queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( Job $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
270 
/**
 * Wait for any slaves or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium:
	// only the configured types are visited, not every known job type
	foreach ( array_keys( $wgJobTypeConf ) as $type ) {
		$queue = $this->get( $type );
		$queue->waitForBackups();
	}
}
286 
/**
 * Get the list of queue types.
 *
 * @return array Keys of the wgJobClasses configuration for this wiki
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
295 
/**
 * Get the list of default queue types.
 *
 * @return array Queue types minus those in $wgJobTypesExcludedFromDefaultQueue
 */
public function getDefaultQueueTypes() {
	global $wgJobTypesExcludedFromDefaultQueue;

	$allTypes = $this->getQueueTypes();

	return array_diff( $allTypes, $wgJobTypesExcludedFromDefaultQueue );
}
306 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false', because a boolean false
 * from the cache is treated as a miss.
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	// Same cache and key layout that push() writes to
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );

	$value = $cache->get( $key );
	if ( $value === false ) {
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		$value = count( $queues ) ? 'true' : 'false';
		// add() rather than set(): do not overwrite a value stored in the meantime
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
330 
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job types
 */
public function getQueuesWithJobs() {
	$readyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$siblings = $bucket['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );

		if ( is_array( $siblings ) ) {
			// Batching features supported
			$readyTypes = array_merge( $readyTypes, $siblings );
			continue;
		}

		// No batching: check the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			if ( $this->get( $type )->isEmpty() ) {
				continue;
			}
			$readyTypes[] = $type;
		}
	}

	return $readyTypes;
}
353 
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$siblingSizes = $bucket['queue']->getSiblingQueueSizes( $this->getQueueTypes() );

		if ( is_array( $siblingSizes ) ) {
			// Batching features supported; array union keeps entries already collected
			$sizesByType = $sizesByType + $siblingSizes;
			continue;
		}

		// No batching: ask the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizesByType[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizesByType;
}
374 
/**
 * Group the configured queue types by coalesce location, building the map on first use.
 *
 * @return array[] Map of (bucket => [ 'queue' => JobQueue, 'types' => list of types ])
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// The queue is built with the literal type string 'null'.
			// NOTE(review): presumably only its storage location matters here — confirm
			$queue = JobQueue::factory(
				[ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// 'default' stands for every known type without its own config entry
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
404 
/**
 * Get a configuration variable for this group's wiki.
 *
 * For the current wiki the global is read directly. For any other wiki the value
 * comes from $wgConf and is cached in the main WAN cache.
 *
 * @param string $name Name of the global variable, without the leading "$"
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( $this->wiki === wfWikiID() ) {
		return $GLOBALS[$name]; // common case
	} else {
		$wiki = $this->wiki;
		$cache = ObjectCache::getMainWANInstance();
		$value = $cache->getWithSetCallback(
			$cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
			// One day plus a random amount of up to one more day
			$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
			function () use ( $wiki, $name ) {
				global $wgConf;

				// Wrapped in an array; unwrapped again below via $value['v']
				return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
			},
			[ 'pcTTL' => 30 ]
		);

		return $value['v'];
	}
}
430 
/**
 * Sanity-check that every element of a list is a job specification.
 *
 * @param array $jobs
 * @throws InvalidArgumentException If an element is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) {
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}
		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
442 
/**
 * Raise an error if jobs buffered via lazyPush() were never pushed.
 *
 * The message lists the distinct classes of the buffered job objects.
 */
function __destruct() {
	$n = count( $this->bufferedJobs );
	if ( $n <= 0 ) {
		return;
	}

	$classes = array_unique( array_map( 'get_class', $this->bufferedJobs ) );
	$type = implode( ', ', $classes );
	trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
}
450 }
ProcessCacheLRU $cache
static getMainWANInstance()
Get the main WAN cache object.
getType()
Definition: Job.php:121
the array() calling protocol came about after MediaWiki 1.4rc1.
assertValidJobs(array $jobs)
magic word the default is to use $key to get the and $key value or $key value text $key value html to format the value $key
Definition: hooks.txt:2321
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:1932
getCachedConfigVar($name)
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
queuesHaveJobs($type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Class to both describe a background job and handle jobs.
Definition: Job.php:31
$value
static getLocalClusterInstance()
Get the main cluster-local cache object.
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2548
waitForBackups()
Wait for any slaves or backup queue servers to catch up.
when a variable name is used in a it is silently declared as a new local masking the global
Definition: design.txt:93
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
__construct($wiki, $readOnlyReason)
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
pop($qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
string $wiki
Wiki ID.
ack(Job $job)
Acknowledge that a job was completed.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
getQueueTypes()
Get the list of queue types.
$GLOBALS['IP']
const PROC_CACHE_TTL
string bool $readOnlyReason
Read only rationale (or false if r/w)
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
push($jobs)
Insert jobs into the respective queues of which they belong.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
Class to handle enqueueing of background jobs.
static singleton($wiki=false)
getQueueSizes()
Get the size of the queues for a list of job types.
$wgConf
wgConf hold the site configuration.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static JobQueueGroup[] $instances
get($key, $prop)
Get a property field for a cache entry.
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
if(count($args)< 1) $job
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
set($key, $prop, $value)
Set a property field for a cache entry.
Job queue task description interface.
static logException($e)
Log an exception to the exception log (if enabled).
lazyPush($jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Handles per process caching of items.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2338
static destroySingletons()
Destroy the singleton instances.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:310