MediaWiki  1.28.3
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
32  protected static $instances = [];
33 
35  protected $cache;
36 
38  protected $wiki;
40  protected $readOnlyReason;
41 
43  protected $coalescedQueues;
44 
46  protected $bufferedJobs = [];
47 
48  const TYPE_DEFAULT = 1; // integer; jobs popped by default
49  const TYPE_ANY = 2; // integer; any job
50 
51  const USE_CACHE = 1; // integer; use process or persistent cache
52 
53  const PROC_CACHE_TTL = 15; // integer; seconds
54 
55  const CACHE_VERSION = 1; // integer; cache version
56 
61  protected function __construct( $wiki, $readOnlyReason ) {
62  $this->wiki = $wiki;
63  $this->readOnlyReason = $readOnlyReason;
64  $this->cache = new ProcessCacheLRU( 10 );
65  }
66 
71  public static function singleton( $wiki = false ) {
72  $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
73  if ( !isset( self::$instances[$wiki] ) ) {
74  self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
75  }
76 
77  return self::$instances[$wiki];
78  }
79 
85  public static function destroySingletons() {
86  self::$instances = [];
87  }
88 
95  public function get( $type ) {
96  global $wgJobTypeConf;
97 
98  $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
99  if ( isset( $wgJobTypeConf[$type] ) ) {
100  $conf = $conf + $wgJobTypeConf[$type];
101  } else {
102  $conf = $conf + $wgJobTypeConf['default'];
103  }
104  $conf['aggregator'] = JobQueueAggregator::singleton();
105  if ( $this->readOnlyReason !== false ) {
106  $conf['readOnlyReason'] = $this->readOnlyReason;
107  }
108 
109  return JobQueue::factory( $conf );
110  }
111 
122  public function push( $jobs ) {
123  global $wgJobTypesExcludedFromDefaultQueue;
124 
125  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
126  if ( !count( $jobs ) ) {
127  return;
128  }
129 
130  $this->assertValidJobs( $jobs );
131 
132  $jobsByType = []; // (job type => list of jobs)
133  foreach ( $jobs as $job ) {
134  $jobsByType[$job->getType()][] = $job;
135  }
136 
137  foreach ( $jobsByType as $type => $jobs ) {
138  $this->get( $type )->push( $jobs );
139  }
140 
141  if ( $this->cache->has( 'queues-ready', 'list' ) ) {
142  $list = $this->cache->get( 'queues-ready', 'list' );
143  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
144  $this->cache->clear( 'queues-ready' );
145  }
146  }
147 
149  $cache->set(
150  $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
151  'true',
152  15
153  );
154  if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
155  $cache->set(
156  $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
157  'true',
158  15
159  );
160  }
161  }
162 
174  public function lazyPush( $jobs ) {
175  if ( PHP_SAPI === 'cli' ) {
176  $this->push( $jobs );
177  return;
178  }
179 
180  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
181 
182  // Throw errors now instead of on push(), when other jobs may be buffered
183  $this->assertValidJobs( $jobs );
184 
185  $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
186  }
187 
194  public static function pushLazyJobs() {
195  foreach ( self::$instances as $group ) {
196  try {
197  $group->push( $group->bufferedJobs );
198  $group->bufferedJobs = [];
199  } catch ( Exception $e ) {
200  // Get in as many jobs as possible and let other post-send updates happen
202  }
203  }
204  }
205 
217  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
218  $job = false;
219 
220  if ( is_string( $qtype ) ) { // specific job type
221  if ( !in_array( $qtype, $blacklist ) ) {
222  $job = $this->get( $qtype )->pop();
223  }
224  } else { // any job in the "default" jobs types
225  if ( $flags & self::USE_CACHE ) {
226  if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
227  $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
228  }
229  $types = $this->cache->get( 'queues-ready', 'list' );
230  } else {
231  $types = $this->getQueuesWithJobs();
232  }
233 
234  if ( $qtype == self::TYPE_DEFAULT ) {
235  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
236  }
237 
238  $types = array_diff( $types, $blacklist ); // avoid selected types
239  shuffle( $types ); // avoid starvation
240 
241  foreach ( $types as $type ) { // for each queue...
242  $job = $this->get( $type )->pop();
243  if ( $job ) { // found
244  break;
245  } else { // not found
246  $this->cache->clear( 'queues-ready' );
247  }
248  }
249  }
250 
251  return $job;
252  }
253 
260  public function ack( Job $job ) {
261  $this->get( $job->getType() )->ack( $job );
262  }
263 
271  public function deduplicateRootJob( Job $job ) {
272  return $this->get( $job->getType() )->deduplicateRootJob( $job );
273  }
274 
282  public function waitForBackups() {
283  global $wgJobTypeConf;
284 
285  // Try to avoid doing this more than once per queue storage medium
286  foreach ( $wgJobTypeConf as $type => $conf ) {
287  $this->get( $type )->waitForBackups();
288  }
289  }
290 
296  public function getQueueTypes() {
297  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
298  }
299 
305  public function getDefaultQueueTypes() {
306  global $wgJobTypesExcludedFromDefaultQueue;
307 
308  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
309  }
310 
318  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
320  $key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );
321 
322  $value = $cache->get( $key );
323  if ( $value === false ) {
324  $queues = $this->getQueuesWithJobs();
325  if ( $type == self::TYPE_DEFAULT ) {
326  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
327  }
328  $value = count( $queues ) ? 'true' : 'false';
329  $cache->add( $key, $value, 15 );
330  }
331 
332  return ( $value === 'true' );
333  }
334 
340  public function getQueuesWithJobs() {
341  $types = [];
342  foreach ( $this->getCoalescedQueues() as $info ) {
343  $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
344  if ( is_array( $nonEmpty ) ) { // batching features supported
345  $types = array_merge( $types, $nonEmpty );
346  } else { // we have to go through the queues in the bucket one-by-one
347  foreach ( $info['types'] as $type ) {
348  if ( !$this->get( $type )->isEmpty() ) {
349  $types[] = $type;
350  }
351  }
352  }
353  }
354 
355  return $types;
356  }
357 
363  public function getQueueSizes() {
364  $sizeMap = [];
365  foreach ( $this->getCoalescedQueues() as $info ) {
366  $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
367  if ( is_array( $sizes ) ) { // batching features supported
368  $sizeMap = $sizeMap + $sizes;
369  } else { // we have to go through the queues in the bucket one-by-one
370  foreach ( $info['types'] as $type ) {
371  $sizeMap[$type] = $this->get( $type )->getSize();
372  }
373  }
374  }
375 
376  return $sizeMap;
377  }
378 
382  protected function getCoalescedQueues() {
383  global $wgJobTypeConf;
384 
385  if ( $this->coalescedQueues === null ) {
386  $this->coalescedQueues = [];
387  foreach ( $wgJobTypeConf as $type => $conf ) {
389  [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
390  $loc = $queue->getCoalesceLocationInternal();
391  if ( !isset( $this->coalescedQueues[$loc] ) ) {
392  $this->coalescedQueues[$loc]['queue'] = $queue;
393  $this->coalescedQueues[$loc]['types'] = [];
394  }
395  if ( $type === 'default' ) {
396  $this->coalescedQueues[$loc]['types'] = array_merge(
397  $this->coalescedQueues[$loc]['types'],
398  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
399  );
400  } else {
401  $this->coalescedQueues[$loc]['types'][] = $type;
402  }
403  }
404  }
405 
406  return $this->coalescedQueues;
407  }
408 
413  private function getCachedConfigVar( $name ) {
414  // @TODO: cleanup this whole method with a proper config system
415  if ( $this->wiki === wfWikiID() ) {
416  return $GLOBALS[$name]; // common case
417  } else {
418  $wiki = $this->wiki;
420  $value = $cache->getWithSetCallback(
421  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
422  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
423  function () use ( $wiki, $name ) {
424  global $wgConf;
425 
426  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
427  },
428  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
429  );
430 
431  return $value['v'];
432  }
433  }
434 
439  private function assertValidJobs( array $jobs ) {
440  foreach ( $jobs as $job ) { // sanity checks
441  if ( !( $job instanceof IJobSpecification ) ) {
442  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
443  }
444  }
445  }
446 
447  function __destruct() {
448  $n = count( $this->bufferedJobs );
449  if ( $n > 0 ) {
450  $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
451  trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
452  }
453  }
454 }
ProcessCacheLRU $cache
static getMainWANInstance()
Get the main WAN cache object.
getType()
Definition: Job.php:121
the array() calling protocol came about after MediaWiki 1.4rc1.
assertValidJobs(array $jobs)
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:2106
getCachedConfigVar($name)
queuesHaveJobs($type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Class to both describe a background job and handle jobs.
Definition: Job.php:31
$value
static getLocalClusterInstance()
Get the main cluster-local cache object.
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2707
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
when a variable name is used in a it is silently declared as a new local masking the global
Definition: design.txt:93
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
__construct($wiki, $readOnlyReason)
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
pop($qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
string $wiki
Wiki ID.
ack(Job $job)
Acknowledge that a job was completed.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
getQueueTypes()
Get the list of queue types.
$GLOBALS['IP']
const PROC_CACHE_TTL
string bool $readOnlyReason
Read only rationale (or false if r/w)
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
push($jobs)
Insert jobs into the respective queues of which they belong.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
Class to handle enqueueing of background jobs.
static singleton($wiki=false)
getQueueSizes()
Get the size of the queus for a list of job types.
$wgConf
wgConf hold the site configuration.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static JobQueueGroup[] $instances
get($key, $prop)
Get a property field for a cache entry.
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
if(count($args)< 1) $job
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
set($key, $prop, $value)
Set a property field for a cache entry.
Job queue task description interface.
static logException($e)
Log an exception to the exception log (if enabled).
lazyPush($jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Handles per process caching of items.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2495
static destroySingletons()
Destroy the singleton instances.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:304