MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
23 
/** @var JobQueueGroup[] Singleton instances, keyed by domain ID (see singleton()) */
protected static $instances = [];

/** @var MapCacheLRU Process cache; holds the 'queues-ready' list used by push() and pop() */
protected $cache;

/** @var string Wiki domain ID */
protected $domain;
/** @var string|bool Read only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidDomain = false;

/** @var array|null Map of (bucket => (queue => JobQueue, types => list of types)); null until built */
protected $coalescedQueues;

const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job

const USE_CACHE = 1; // integer; use process or persistent cache

const PROC_CACHE_TTL = 15; // integer; seconds

const CACHE_VERSION = 1; // integer; cache version
55 
/**
 * Protected so that instances are only created through singleton().
 *
 * @param string $domain Wiki domain ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if r/w)
 */
protected function __construct( $domain, $readOnlyReason ) {
	// Small per-instance process cache
	$this->cache = new MapCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->domain = $domain;
}
65 
/**
 * Get the JobQueueGroup instance for a wiki domain, creating it on first use.
 *
 * An instance whose domain is neither the current wiki nor listed in
 * $wgLocalDatabases is flagged as invalid, so push() and lazyPush() refuse
 * to enqueue jobs for it.
 *
 * @param bool|string $domain Wiki domain ID; false means the current wiki's domain
 * @return JobQueueGroup
 */
public static function singleton( $domain = false ) {
	global $wgLocalDatabases;

	if ( $domain === false ) {
		// Resolve the default explicitly; otherwise `false` itself would be used
		// as the instance key and handed to the WikiMap checks below.
		$domain = WikiMap::getCurrentWikiDbDomain();
	}

	if ( !isset( self::$instances[$domain] ) ) {
		$group = new self( $domain, wfConfiguredReadOnlyReason() );
		// Make sure jobs are not getting pushed to bogus wikis. This can confuse
		// the job runner system into spawning endless RPC requests that fail (T171371).
		$wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
		if (
			!WikiMap::isCurrentWikiDbDomain( $domain ) &&
			!in_array( $wikiId, $wgLocalDatabases )
		) {
			$group->invalidDomain = true;
		}
		// Register only once the validity flag is settled
		self::$instances[$domain] = $group;
	}

	return self::$instances[$domain];
}
92 
/**
 * Destroy the singleton instances.
 *
 * The next singleton() call for any domain builds a fresh instance.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
101 
/**
 * Get the job queue object for a given job type on this group's domain.
 *
 * @param string $type Job type
 * @return JobQueue Whatever JobQueue::factory() builds for the resolved configuration
 */
public function get( $type ) {
	global $wgJobTypeConf;

	// Type-specific settings, falling back to the 'default' entry
	$typeConf = isset( $wgJobTypeConf[$type] )
		? $wgJobTypeConf[$type]
		: $wgJobTypeConf['default'];

	// With "+", keys on the left win, so 'domain' and 'type' cannot be overridden by config
	$conf = [ 'domain' => $this->domain, 'type' => $type ] + $typeConf;
	if ( !isset( $conf['readOnlyReason'] ) ) {
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	$services = MediaWikiServices::getInstance();
	$conf['stats'] = $services->getStatsdDataFactory();
	$conf['wanCache'] = $services->getMainWANObjectCache();

	return JobQueue::factory( $conf );
}
127 
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * After pushing, the process cache of ready queues is cleared if it lacks any
 * of the pushed types, and the "hasjobs" flags read by queuesHaveJobs() are set
 * in the cluster-local cache.
 *
 * If this group's domain is not recognized, nothing is enqueued; the problem is
 * logged instead of thrown (contrast with lazyPush(), which throws).
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	global $wgJobTypesExcludedFromDefaultQueue;

	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( $jobs === [] ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	// Distinct loop variable so the $jobs parameter is not overwritten
	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}
	$pushedTypes = array_keys( $jobsByType );

	// Drop the process-cached list of ready queues if it lacks any pushed type
	if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
		$list = $this->cache->getField( 'queues-ready', 'list' );
		if ( count( array_diff( $pushedTypes, $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	// Only flag the default group if some pushed type is not excluded from it
	if ( array_diff( $pushedTypes, $wgJobTypesExcludedFromDefaultQueue ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
185 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws LogicException If this group's domain is not recognized
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->domain}' is not recognized." );
	}

	$isCommandLine = ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	if ( $isCommandLine ) {
		// Push immediately rather than deferring
		$this->push( $jobs );
		return;
	}

	$jobList = is_array( $jobs ) ? $jobs : [ $jobs ];

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobList );

	$update = new JobQueueEnqueueUpdate( $this->domain, $jobList );
	DeferredUpdates::addUpdate( $update );
}
211 
/**
 * Pop a job off one of the job queues.
 *
 * A string $qtype selects that single queue. Otherwise the candidates are the
 * non-empty queues (narrowed to the default types for TYPE_DEFAULT), minus the
 * blacklist, tried in random order.
 *
 * @param int|string $qtype JobQueueGroup::TYPE_* constant or a job type string
 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
 * @param array $blacklist List of job types to ignore
 * @throws JobQueueError If the group is for a foreign wiki or the job type has no class
 * @return mixed Result of JobQueue::pop() for the chosen queue; false if nothing was tried
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	global $wgJobClasses;

	if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		throw new JobQueueError(
			"Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
	}

	$job = false;

	if ( is_string( $qtype ) ) {
		// Specific job type
		if ( !isset( $wgJobClasses[$qtype] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new JobQueueError( "Unrecognized job type '$qtype'." );
		}
		if ( !in_array( $qtype, $blacklist ) ) {
			$job = $this->get( $qtype )->pop();
		}

		return $job;
	}

	// TYPE_* constant: start from the queues believed to be non-empty
	$useProcCache = (bool)( $flags & self::USE_CACHE );
	if ( $useProcCache ) {
		if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
			$this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->getField( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	$candidates = array_diff( $candidates, $blacklist ); // avoid selected types
	shuffle( $candidates ); // avoid starvation

	foreach ( $candidates as $type ) {
		$job = $this->get( $type )->pop();
		if ( $job ) {
			break;
		}
		// The ready list was stale for this queue; force a refresh on the next cached pop
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
269 
/**
 * Acknowledge that a job was completed.
 *
 * Delegates to the queue that handles the job's type.
 *
 * @param RunnableJob $job
 * @return void
 */
public function ack( RunnableJob $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
279 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param RunnableJob $job
 * @return mixed Whatever the underlying queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( RunnableJob $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
290 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium:
	// only the configured entries are visited, not every known job type
	foreach ( array_keys( $wgJobTypeConf ) as $type ) {
		$this->get( $type )->waitForBackups();
	}
}
306 
/**
 * Get the list of queue types.
 *
 * These are the keys of the wgJobClasses setting for this group's domain.
 *
 * @return array List of job type names
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
315 
/**
 * Get the list of default queue types.
 *
 * These are all queue types except those listed in
 * $wgJobTypesExcludedFromDefaultQueue.
 *
 * @return array List of job type names; keys are preserved from getQueueTypes()
 */
public function getDefaultQueueTypes() {
	// Without this declaration the name below would be an undefined local
	global $wgJobTypesExcludedFromDefaultQueue;

	return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
}
326 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false' under the same
 * "hasjobs" key that push() sets, using a TTL of 15.
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

	$value = $cache->get( $key );
	if ( $value === false ) {
		// Nothing cached: inspect the queues directly
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		$value = count( $queues ) ? 'true' : 'false';
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
350 
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job types that have non-empty queues
 */
public function getQueuesWithJobs() {
	$types = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		/** @var JobQueue $queue */
		$queue = $bucket['queue'];
		$ready = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );

		if ( is_array( $ready ) ) {
			// Batching features supported: one call covers the whole bucket
			$types = array_merge( $types, $ready );
			continue;
		}

		// Otherwise go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			if ( $this->get( $type )->isEmpty() ) {
				continue;
			}
			$types[] = $type;
		}
	}

	return $types;
}
375 
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizeMap = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		/** @var JobQueue $queue */
		$queue = $bucket['queue'];
		$sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );

		if ( is_array( $sizes ) ) {
			// Batching features supported; sizes already recorded for a type are kept
			$sizeMap += $sizes;
			continue;
		}

		// Otherwise go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizeMap[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizeMap;
}
398 
/**
 * Group the configured job types by the location their queue coalesces to.
 *
 * The result is built once per instance and then reused. The 'default'
 * entry of $wgJobTypeConf stands for every queue type that has no explicit
 * entry of its own.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Placeholder queue used to find the bucket, hence the literal type 'null'
			$queue = JobQueue::factory(
				[ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// Every known type without an explicit configuration entry
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
428 
/**
 * Get a configuration variable's value for this group's domain.
 *
 * For the current wiki the global is read directly. For any other wiki the
 * value comes from $wgConf->getConfig() and is held in the WAN cache for
 * between one and two days (TTL_DAY plus a random offset of up to TTL_DAY).
 *
 * @param string $name Name of the global variable, without the leading "$"
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		return $GLOBALS[$name]; // common case
	}

	$wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
	$cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
	$value = $cache->getWithSetCallback(
		$cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
		$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
		function () use ( $wiki, $name ) {
			global $wgConf;
			// @TODO: use the full domain ID here
			// NOTE(review): the 'v' wrapper presumably lets falsy values be cached — confirm
			return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
	);

	return $value['v'];
}
454 
/**
 * Verify that every item in the list is a job specification.
 *
 * @param array $jobs Items to check
 * @throws InvalidArgumentException On the first item that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) {
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}
		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
466 }
assertValidJobs(array $jobs)
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2147
bool $invalidDomain
Whether the wiki is not recognized in configuration.
__construct( $domain, $readOnlyReason)
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
MapCacheLRU $cache
$value
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
static getLocalClusterInstance()
Get the main cluster-local cache object.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
getQueueTypes()
Get the list of queue types.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=INF)
Get an item with the given key, producing and setting it if not found.
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title e g db for database replication lag or jobqueue for job queue size converted to pseudo seconds It is possible to add more fields and they will be returned to the user in the API response after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition: hooks.txt:2205
const PROC_CACHE_TTL
string bool $readOnlyReason
Read only rationale (or false if r/w)
static logException( $e, $catcher=self::CAUGHT_BY_OTHER)
Log an exception to the exception log (if enabled).
ack(RunnableJob $job)
Acknowledge that a job was completed.
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:767
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
$GLOBALS['IP']
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getQueueSizes()
Get the size of the queues for a list of job types.
$wgConf
wgConf hold the site configuration.
getCachedConfigVar( $name)
static getCurrentWikiDbDomain()
Definition: WikiMap.php:293
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static JobQueueGroup [] $instances
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:114
$wgJobTypeConf
Map of job types to configuration arrays.
if(count( $args)< 1) $job
string $domain
Wiki domain ID.
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:304
push( $jobs)
Insert jobs into the respective queues of which they belong.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
static singleton( $domain=false)
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
Interface for serializable objects that describe a job queue task.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:271
string [] $wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
static destroySingletons()
Destroy the singleton instances.