MediaWiki  1.30.0
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
31  protected static $instances = [];
32 
34  protected $cache;
35 
37  protected $wiki;
39  protected $readOnlyReason;
41  protected $invalidWiki = false;
42 
44  protected $coalescedQueues;
45 
47  protected $bufferedJobs = [];
48 
49  const TYPE_DEFAULT = 1; // integer; jobs popped by default
50  const TYPE_ANY = 2; // integer; any job
51 
52  const USE_CACHE = 1; // integer; use process or persistent cache
53 
54  const PROC_CACHE_TTL = 15; // integer; seconds
55 
56  const CACHE_VERSION = 1; // integer; cache version
57 
62  protected function __construct( $wiki, $readOnlyReason ) {
63  $this->wiki = $wiki;
64  $this->readOnlyReason = $readOnlyReason;
65  $this->cache = new ProcessCacheLRU( 10 );
66  }
67 
72  public static function singleton( $wiki = false ) {
74 
75  $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
76 
77  if ( !isset( self::$instances[$wiki] ) ) {
78  self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
79  // Make sure jobs are not getting pushed to bogus wikis. This can confuse
80  // the job runner system into spawning endless RPC requests that fail (T171371).
81  if ( $wiki !== wfWikiID() && !in_array( $wiki, $wgLocalDatabases ) ) {
82  self::$instances[$wiki]->invalidWiki = true;
83  }
84  }
85 
86  return self::$instances[$wiki];
87  }
88 
94  public static function destroySingletons() {
95  self::$instances = [];
96  }
97 
104  public function get( $type ) {
106 
107  $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
108  if ( isset( $wgJobTypeConf[$type] ) ) {
109  $conf = $conf + $wgJobTypeConf[$type];
110  } else {
111  $conf = $conf + $wgJobTypeConf['default'];
112  }
113  $conf['aggregator'] = JobQueueAggregator::singleton();
114  if ( $this->readOnlyReason !== false ) {
115  $conf['readOnlyReason'] = $this->readOnlyReason;
116  }
117 
118  return JobQueue::factory( $conf );
119  }
120 
131  public function push( $jobs ) {
133 
134  if ( $this->invalidWiki ) {
135  // Do not enqueue job that cannot be run (T171371)
136  $e = new LogicException( "Domain '{$this->wiki}' is not recognized." );
138  return;
139  }
140 
141  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
142  if ( !count( $jobs ) ) {
143  return;
144  }
145 
146  $this->assertValidJobs( $jobs );
147 
148  $jobsByType = []; // (job type => list of jobs)
149  foreach ( $jobs as $job ) {
150  $jobsByType[$job->getType()][] = $job;
151  }
152 
153  foreach ( $jobsByType as $type => $jobs ) {
154  $this->get( $type )->push( $jobs );
155  }
156 
157  if ( $this->cache->has( 'queues-ready', 'list' ) ) {
158  $list = $this->cache->get( 'queues-ready', 'list' );
159  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
160  $this->cache->clear( 'queues-ready' );
161  }
162  }
163 
165  $cache->set(
166  $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
167  'true',
168  15
169  );
170  if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
171  $cache->set(
172  $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
173  'true',
174  15
175  );
176  }
177  }
178 
190  public function lazyPush( $jobs ) {
191  if ( $this->invalidWiki ) {
192  // Do not enqueue job that cannot be run (T171371)
193  throw new LogicException( "Domain '{$this->wiki}' is not recognized." );
194  }
195 
196  if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
197  $this->push( $jobs );
198  return;
199  }
200 
201  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
202 
203  // Throw errors now instead of on push(), when other jobs may be buffered
204  $this->assertValidJobs( $jobs );
205 
206  $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
207  }
208 
215  public static function pushLazyJobs() {
216  foreach ( self::$instances as $group ) {
217  try {
218  $group->push( $group->bufferedJobs );
219  $group->bufferedJobs = [];
220  } catch ( Exception $e ) {
221  // Get in as many jobs as possible and let other post-send updates happen
223  }
224  }
225  }
226 
238  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
239  $job = false;
240 
241  if ( is_string( $qtype ) ) { // specific job type
242  if ( !in_array( $qtype, $blacklist ) ) {
243  $job = $this->get( $qtype )->pop();
244  }
245  } else { // any job in the "default" jobs types
246  if ( $flags & self::USE_CACHE ) {
247  if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
248  $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
249  }
250  $types = $this->cache->get( 'queues-ready', 'list' );
251  } else {
252  $types = $this->getQueuesWithJobs();
253  }
254 
255  if ( $qtype == self::TYPE_DEFAULT ) {
256  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
257  }
258 
259  $types = array_diff( $types, $blacklist ); // avoid selected types
260  shuffle( $types ); // avoid starvation
261 
262  foreach ( $types as $type ) { // for each queue...
263  $job = $this->get( $type )->pop();
264  if ( $job ) { // found
265  break;
266  } else { // not found
267  $this->cache->clear( 'queues-ready' );
268  }
269  }
270  }
271 
272  return $job;
273  }
274 
281  public function ack( Job $job ) {
282  $this->get( $job->getType() )->ack( $job );
283  }
284 
292  public function deduplicateRootJob( Job $job ) {
293  return $this->get( $job->getType() )->deduplicateRootJob( $job );
294  }
295 
303  public function waitForBackups() {
305 
306  // Try to avoid doing this more than once per queue storage medium
307  foreach ( $wgJobTypeConf as $type => $conf ) {
308  $this->get( $type )->waitForBackups();
309  }
310  }
311 
317  public function getQueueTypes() {
318  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
319  }
320 
326  public function getDefaultQueueTypes() {
328 
329  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
330  }
331 
339  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
341  $key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );
342 
343  $value = $cache->get( $key );
344  if ( $value === false ) {
345  $queues = $this->getQueuesWithJobs();
346  if ( $type == self::TYPE_DEFAULT ) {
347  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
348  }
349  $value = count( $queues ) ? 'true' : 'false';
350  $cache->add( $key, $value, 15 );
351  }
352 
353  return ( $value === 'true' );
354  }
355 
361  public function getQueuesWithJobs() {
362  $types = [];
363  foreach ( $this->getCoalescedQueues() as $info ) {
364  $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
365  if ( is_array( $nonEmpty ) ) { // batching features supported
366  $types = array_merge( $types, $nonEmpty );
367  } else { // we have to go through the queues in the bucket one-by-one
368  foreach ( $info['types'] as $type ) {
369  if ( !$this->get( $type )->isEmpty() ) {
370  $types[] = $type;
371  }
372  }
373  }
374  }
375 
376  return $types;
377  }
378 
384  public function getQueueSizes() {
385  $sizeMap = [];
386  foreach ( $this->getCoalescedQueues() as $info ) {
387  $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
388  if ( is_array( $sizes ) ) { // batching features supported
389  $sizeMap = $sizeMap + $sizes;
390  } else { // we have to go through the queues in the bucket one-by-one
391  foreach ( $info['types'] as $type ) {
392  $sizeMap[$type] = $this->get( $type )->getSize();
393  }
394  }
395  }
396 
397  return $sizeMap;
398  }
399 
403  protected function getCoalescedQueues() {
405 
406  if ( $this->coalescedQueues === null ) {
407  $this->coalescedQueues = [];
408  foreach ( $wgJobTypeConf as $type => $conf ) {
410  [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
411  $loc = $queue->getCoalesceLocationInternal();
412  if ( !isset( $this->coalescedQueues[$loc] ) ) {
413  $this->coalescedQueues[$loc]['queue'] = $queue;
414  $this->coalescedQueues[$loc]['types'] = [];
415  }
416  if ( $type === 'default' ) {
417  $this->coalescedQueues[$loc]['types'] = array_merge(
418  $this->coalescedQueues[$loc]['types'],
419  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
420  );
421  } else {
422  $this->coalescedQueues[$loc]['types'][] = $type;
423  }
424  }
425  }
426 
427  return $this->coalescedQueues;
428  }
429 
434  private function getCachedConfigVar( $name ) {
435  // @TODO: cleanup this whole method with a proper config system
436  if ( $this->wiki === wfWikiID() ) {
437  return $GLOBALS[$name]; // common case
438  } else {
439  $wiki = $this->wiki;
441  $value = $cache->getWithSetCallback(
442  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
443  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
444  function () use ( $wiki, $name ) {
445  global $wgConf;
446 
447  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
448  },
449  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
450  );
451 
452  return $value['v'];
453  }
454  }
455 
460  private function assertValidJobs( array $jobs ) {
461  foreach ( $jobs as $job ) { // sanity checks
462  if ( !( $job instanceof IJobSpecification ) ) {
463  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
464  }
465  }
466  }
467 
468  function __destruct() {
469  $n = count( $this->bufferedJobs );
470  if ( $n > 0 ) {
471  $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
472  trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
473  }
474  }
475 }
$wgConf
$wgConf
wgConf hold the site configuration.
Definition: DefaultSettings.php:62
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:238
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:52
JobQueueGroup\__construct
__construct( $wiki, $readOnlyReason)
Definition: JobQueueGroup.php:62
Job\getType
getType()
Definition: Job.php:131
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:303
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:187
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:357
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:54
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:56
$wgJobTypeConf
$wgJobTypeConf
Map of job types to configuration arrays.
Definition: DefaultSettings.php:7461
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Definition: JobQueueGroup.php:44
captcha-old.count
count
Definition: captcha-old.py:249
JobQueueGroup\__destruct
__destruct()
Definition: JobQueueGroup.php:468
JobQueueGroup\$invalidWiki
bool $invalidWiki
Whether the wiki is not recognized in configuration.
Definition: JobQueueGroup.php:41
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:49
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
wfConfiguredReadOnlyReason
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Definition: GlobalFunctions.php:1348
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:403
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:31
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:339
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:94
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:302
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueGroup\$wiki
string $wiki
Wiki ID.
Definition: JobQueueGroup.php:37
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
ProcessCacheLRU\set
set( $key, $prop, $value)
Set a property field for a cache entry.
Definition: ProcessCacheLRU.php:56
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:131
JobQueueGroup\$cache
ProcessCacheLRU $cache
Definition: JobQueueGroup.php:34
$queue
$queue
Definition: mergeMessageFileList.php:161
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:434
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
$GLOBALS
$GLOBALS['wgAutoloadClasses']['LocalisationUpdate']
Definition: Autoload.php:10
JobQueueGroup\pushLazyJobs
static pushLazyJobs()
Push all jobs buffered via lazyPush() into their respective queues.
Definition: JobQueueGroup.php:215
$wgLocalDatabases
$wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
Definition: DefaultSettings.php:2069
$wgJobTypesExcludedFromDefaultQueue
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Definition: DefaultSettings.php:7427
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:326
JobQueueGroup\$bufferedJobs
Job[] $bufferedJobs
Definition: JobQueueGroup.php:47
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:2807
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2141
$value
$value
Definition: styleTest.css.php:45
JobQueueGroup\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueueGroup.php:39
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
ProcessCacheLRU\get
get( $key, $prop)
Get a property field for a cache entry.
Definition: ProcessCacheLRU.php:96
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:292
JobQueueGroup\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:281
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:50
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:370
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:460
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:317
JobQueueGroup\singleton
static singleton( $wiki=false)
Definition: JobQueueGroup.php:72
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:361
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
ProcessCacheLRU
Handles per process caching of items.
Definition: ProcessCacheLRU.php:29
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queus for a list of job types.
Definition: JobQueueGroup.php:384
JobQueueAggregator\singleton
static singleton()
Definition: JobQueueAggregator.php:43
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2801
IExpiringStore\TTL_PROC_LONG
const TTL_PROC_LONG
Definition: IExpiringStore.php:42
array
the array() calling protocol came about after MediaWiki 1.4rc1.
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:190
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:29
MWExceptionHandler\logException
static logException( $e, $catcher=self::CAUGHT_BY_OTHER)
Log an exception to the exception log (if enabled).
Definition: MWExceptionHandler.php:613
$type
$type
Definition: testCompression.php:48