MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
23 
32  protected static $instances = [];
33 
35  protected $cache;
36 
38  protected $domain;
40  protected $readOnlyReason;
42  protected $invalidDomain = false;
43 
45  protected $coalescedQueues;
46 
47  const TYPE_DEFAULT = 1; // integer; jobs popped by default
48  const TYPE_ANY = 2; // integer; any job
49 
50  const USE_CACHE = 1; // integer; use process or persistent cache
51 
52  const PROC_CACHE_TTL = 15; // integer; seconds
53 
54  const CACHE_VERSION = 1; // integer; cache version
55 
60  protected function __construct( $domain, $readOnlyReason ) {
61  $this->domain = $domain;
62  $this->readOnlyReason = $readOnlyReason;
63  $this->cache = new MapCacheLRU( 10 );
64  }
65 
70  public static function singleton( $domain = false ) {
71  global $wgLocalDatabases;
72 
73  if ( $domain === false ) {
75  }
76 
77  if ( !isset( self::$instances[$domain] ) ) {
78  self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
79  // Make sure jobs are not getting pushed to bogus wikis. This can confuse
80  // the job runner system into spawning endless RPC requests that fail (T171371).
81  $wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
82  if (
83  !WikiMap::isCurrentWikiDbDomain( $domain ) &&
84  !in_array( $wikiId, $wgLocalDatabases )
85  ) {
86  self::$instances[$domain]->invalidDomain = true;
87  }
88  }
89 
90  return self::$instances[$domain];
91  }
92 
98  public static function destroySingletons() {
99  self::$instances = [];
100  }
101 
108  public function get( $type ) {
109  global $wgJobTypeConf;
110 
111  $conf = [ 'domain' => $this->domain, 'type' => $type ];
112  if ( isset( $wgJobTypeConf[$type] ) ) {
113  $conf = $conf + $wgJobTypeConf[$type];
114  } else {
115  $conf = $conf + $wgJobTypeConf['default'];
116  }
117  if ( !isset( $conf['readOnlyReason'] ) ) {
118  $conf['readOnlyReason'] = $this->readOnlyReason;
119  }
120 
121  $services = MediaWikiServices::getInstance();
122  $conf['stats'] = $services->getStatsdDataFactory();
123  $conf['wanCache'] = $services->getMainWANObjectCache();
124  $conf['stash'] = $services->getMainObjectStash();
125 
126  return JobQueue::factory( $conf );
127  }
128 
139  public function push( $jobs ) {
141 
142  if ( $this->invalidDomain ) {
143  // Do not enqueue job that cannot be run (T171371)
144  $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
146  return;
147  }
148 
149  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
150  if ( $jobs === [] ) {
151  return;
152  }
153 
154  $this->assertValidJobs( $jobs );
155 
156  $jobsByType = []; // (job type => list of jobs)
157  foreach ( $jobs as $job ) {
158  $jobsByType[$job->getType()][] = $job;
159  }
160 
161  foreach ( $jobsByType as $type => $jobs ) {
162  $this->get( $type )->push( $jobs );
163  }
164 
165  if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
166  $list = $this->cache->getField( 'queues-ready', 'list' );
167  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
168  $this->cache->clear( 'queues-ready' );
169  }
170  }
171 
173  $cache->set(
174  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
175  'true',
176  15
177  );
178  if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
179  $cache->set(
180  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
181  'true',
182  15
183  );
184  }
185  }
186 
194  public function lazyPush( $jobs ) {
195  if ( $this->invalidDomain ) {
196  // Do not enqueue job that cannot be run (T171371)
197  throw new LogicException( "Domain '{$this->domain}' is not recognized." );
198  }
199 
200  if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
201  $this->push( $jobs );
202  return;
203  }
204 
205  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
206 
207  // Throw errors now instead of on push(), when other jobs may be buffered
208  $this->assertValidJobs( $jobs );
209 
210  DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
211  }
212 
224  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
225  global $wgJobClasses;
226 
227  $job = false;
228 
229  if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
230  throw new JobQueueError(
231  "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
232  } elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) {
233  // Do not pop jobs if there is no class for the queue type
234  throw new JobQueueError( "Unrecognized job type '$qtype'." );
235  }
236 
237  if ( is_string( $qtype ) ) { // specific job type
238  if ( !in_array( $qtype, $blacklist ) ) {
239  $job = $this->get( $qtype )->pop();
240  }
241  } else { // any job in the "default" jobs types
242  if ( $flags & self::USE_CACHE ) {
243  if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
244  $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
245  }
246  $types = $this->cache->getField( 'queues-ready', 'list' );
247  } else {
248  $types = $this->getQueuesWithJobs();
249  }
250 
251  if ( $qtype == self::TYPE_DEFAULT ) {
252  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
253  }
254 
255  $types = array_diff( $types, $blacklist ); // avoid selected types
256  shuffle( $types ); // avoid starvation
257 
258  foreach ( $types as $type ) { // for each queue...
259  $job = $this->get( $type )->pop();
260  if ( $job ) { // found
261  break;
262  } else { // not found
263  $this->cache->clear( 'queues-ready' );
264  }
265  }
266  }
267 
268  return $job;
269  }
270 
277  public function ack( Job $job ) {
278  $this->get( $job->getType() )->ack( $job );
279  }
280 
288  public function deduplicateRootJob( Job $job ) {
289  return $this->get( $job->getType() )->deduplicateRootJob( $job );
290  }
291 
299  public function waitForBackups() {
300  global $wgJobTypeConf;
301 
302  // Try to avoid doing this more than once per queue storage medium
303  foreach ( $wgJobTypeConf as $type => $conf ) {
304  $this->get( $type )->waitForBackups();
305  }
306  }
307 
313  public function getQueueTypes() {
314  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
315  }
316 
322  public function getDefaultQueueTypes() {
324 
325  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
326  }
327 
335  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
337  $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
338 
339  $value = $cache->get( $key );
340  if ( $value === false ) {
341  $queues = $this->getQueuesWithJobs();
342  if ( $type == self::TYPE_DEFAULT ) {
343  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
344  }
345  $value = count( $queues ) ? 'true' : 'false';
346  $cache->add( $key, $value, 15 );
347  }
348 
349  return ( $value === 'true' );
350  }
351 
357  public function getQueuesWithJobs() {
358  $types = [];
359  foreach ( $this->getCoalescedQueues() as $info ) {
361  $queue = $info['queue'];
362  $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
363  if ( is_array( $nonEmpty ) ) { // batching features supported
364  $types = array_merge( $types, $nonEmpty );
365  } else { // we have to go through the queues in the bucket one-by-one
366  foreach ( $info['types'] as $type ) {
367  if ( !$this->get( $type )->isEmpty() ) {
368  $types[] = $type;
369  }
370  }
371  }
372  }
373 
374  return $types;
375  }
376 
382  public function getQueueSizes() {
383  $sizeMap = [];
384  foreach ( $this->getCoalescedQueues() as $info ) {
386  $queue = $info['queue'];
387  $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
388  if ( is_array( $sizes ) ) { // batching features supported
389  $sizeMap = $sizeMap + $sizes;
390  } else { // we have to go through the queues in the bucket one-by-one
391  foreach ( $info['types'] as $type ) {
392  $sizeMap[$type] = $this->get( $type )->getSize();
393  }
394  }
395  }
396 
397  return $sizeMap;
398  }
399 
403  protected function getCoalescedQueues() {
404  global $wgJobTypeConf;
405 
406  if ( $this->coalescedQueues === null ) {
407  $this->coalescedQueues = [];
408  foreach ( $wgJobTypeConf as $type => $conf ) {
410  [ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
411  $loc = $queue->getCoalesceLocationInternal();
412  if ( !isset( $this->coalescedQueues[$loc] ) ) {
413  $this->coalescedQueues[$loc]['queue'] = $queue;
414  $this->coalescedQueues[$loc]['types'] = [];
415  }
416  if ( $type === 'default' ) {
417  $this->coalescedQueues[$loc]['types'] = array_merge(
418  $this->coalescedQueues[$loc]['types'],
419  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
420  );
421  } else {
422  $this->coalescedQueues[$loc]['types'][] = $type;
423  }
424  }
425  }
426 
427  return $this->coalescedQueues;
428  }
429 
434  private function getCachedConfigVar( $name ) {
435  // @TODO: cleanup this whole method with a proper config system
436  if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
437  return $GLOBALS[$name]; // common case
438  } else {
439  $wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
440  $cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
442  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
443  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
444  function () use ( $wiki, $name ) {
445  global $wgConf;
446  // @TODO: use the full domain ID here
447  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
448  },
449  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
450  );
451 
452  return $value['v'];
453  }
454  }
455 
460  private function assertValidJobs( array $jobs ) {
461  foreach ( $jobs as $job ) { // sanity checks
462  if ( !( $job instanceof IJobSpecification ) ) {
463  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
464  }
465  }
466  }
467 }
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
getType()
Definition: Job.php:169
assertValidJobs(array $jobs)
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2159
get( $key, $maxAge=0.0)
Get the value for a key.
bool $invalidDomain
Whether the wiki is not recognized in configuration.
__construct( $domain, $readOnlyReason)
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
Class to both describe a background job and handle jobs.
Definition: Job.php:30
MapCacheLRU $cache
$value
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
static getLocalClusterInstance()
Get the main cluster-local cache object.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=0.0)
Get an item with the given key, producing and setting it if not found.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
ack(Job $job)
Acknowledge that a job was completed.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
getQueueTypes()
Get the list of queue types.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
static configuration should be added through ResourceLoaderGetConfigVars instead can be used to get the real title e g db for database replication lag or jobqueue for job queue size converted to pseudo seconds It is possible to add more fields and they will be returned to the user in the API response after the basic globals have been set but before ordinary actions take place or wrap services the preferred way to define a new service is the $wgServiceWiringFiles array $services
Definition: hooks.txt:2217
const PROC_CACHE_TTL
string bool $readOnlyReason
Read only rationale (or false if r/w)
static logException( $e, $catcher=self::CAUGHT_BY_OTHER)
Log an exception to the exception log (if enabled).
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:780
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
$GLOBALS['IP']
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getQueueSizes()
Get the size of the queus for a list of job types.
$wgConf
wgConf hold the site configuration.
getCachedConfigVar( $name)
static getCurrentWikiDbDomain()
Definition: WikiMap.php:302
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static JobQueueGroup [] $instances
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:106
$wgJobTypeConf
Map of job types to configuration arrays.
if(count( $args)< 1) $job
string $domain
Wiki domain ID.
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:313
push( $jobs)
Insert jobs into the respective queues of which they belong.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
static singleton( $domain=false)
Interface for serializable objects that describe a job queue task.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:271
string [] $wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
static destroySingletons()
Destroy the singleton instances.