MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
25 
37  protected static $instances = [];
38 
40  protected $cache;
41 
43  protected $domain;
45  protected $readOnlyMode;
47  protected $invalidDomain = false;
49  private $jobClasses;
57  private $wanCache;
60 
62  protected $coalescedQueues;
63 
64  public const TYPE_DEFAULT = 1; // integer; jobs popped by default
65  private const TYPE_ANY = 2; // integer; any job
66 
67  public const USE_CACHE = 1; // integer; use process or persistent cache
68 
69  private const PROC_CACHE_TTL = 15; // integer; seconds
70 
71  private const CACHE_VERSION = 1; // integer; cache version
72 
86  public function __construct(
87  $domain,
89  bool $invalidDomain,
90  array $jobClasses,
96  ) {
97  $this->domain = $domain;
98  $this->readOnlyMode = $readOnlyMode;
99  $this->cache = new MapCacheLRU( 10 );
100  $this->invalidDomain = $invalidDomain;
101  $this->jobClasses = $jobClasses;
102  $this->jobTypeConfiguration = $jobTypeConfiguration;
103  $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
104  $this->statsdDataFactory = $statsdDataFactory;
105  $this->wanCache = $wanCache;
106  $this->globalIdGenerator = $globalIdGenerator;
107  }
108 
114  public static function singleton( $domain = false ) {
115  return MediaWikiServices::getInstance()->getJobQueueGroupFactory()->makeJobQueueGroup( $domain );
116  }
117 
124  public static function destroySingletons() {
125  }
126 
133  public function get( $type ) {
134  $conf = [ 'domain' => $this->domain, 'type' => $type ];
135  if ( isset( $this->jobTypeConfiguration[$type] ) ) {
136  $conf += $this->jobTypeConfiguration[$type];
137  } else {
138  $conf += $this->jobTypeConfiguration['default'];
139  }
140  if ( !isset( $conf['readOnlyReason'] ) ) {
141  $conf['readOnlyReason'] = $this->readOnlyMode->getReason();
142  }
143 
144  $conf['stats'] = $this->statsdDataFactory;
145  $conf['wanCache'] = $this->wanCache;
146  $conf['idGenerator'] = $this->globalIdGenerator;
147 
148  return JobQueue::factory( $conf );
149  }
150 
161  public function push( $jobs ) {
162  if ( $this->invalidDomain ) {
163  // Do not enqueue job that cannot be run (T171371)
164  $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
166  return;
167  }
168 
169  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
170  if ( $jobs === [] ) {
171  return;
172  }
173 
174  $this->assertValidJobs( $jobs );
175 
176  $jobsByType = []; // (job type => list of jobs)
177  foreach ( $jobs as $job ) {
178  $type = $job->getType();
179  if ( isset( $this->jobTypeConfiguration[$type] ) ) {
180  $jobsByType[$type][] = $job;
181  } else {
182  if (
183  isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
184  $this->jobTypeConfiguration['default']['typeAgnostic']
185  ) {
186  $jobsByType['default'][] = $job;
187  } else {
188  $jobsByType[$type][] = $job;
189  }
190  }
191  }
192 
193  foreach ( $jobsByType as $type => $jobs ) {
194  $this->get( $type )->push( $jobs );
195  }
196 
197  if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
198  $list = $this->cache->getField( 'queues-ready', 'list' );
199  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
200  $this->cache->clear( 'queues-ready' );
201  }
202  }
203 
205  $cache->set(
206  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
207  'true',
208  15
209  );
210  if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
211  $cache->set(
212  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
213  'true',
214  15
215  );
216  }
217  }
218 
226  public function lazyPush( $jobs ) {
227  if ( $this->invalidDomain ) {
228  // Do not enqueue job that cannot be run (T171371)
229  throw new LogicException( "Domain '{$this->domain}' is not recognized." );
230  }
231 
232  if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
233  $this->push( $jobs );
234  return;
235  }
236 
237  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
238 
239  // Throw errors now instead of on push(), when other jobs may be buffered
240  $this->assertValidJobs( $jobs );
241 
242  DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
243  }
244 
256  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
257  $job = false;
258 
259  if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
260  throw new JobQueueError(
261  "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
262  } elseif ( is_string( $qtype ) && !isset( $this->jobClasses[$qtype] ) ) {
263  // Do not pop jobs if there is no class for the queue type
264  throw new JobQueueError( "Unrecognized job type '$qtype'." );
265  }
266 
267  if ( is_string( $qtype ) ) { // specific job type
268  if ( !in_array( $qtype, $ignored ) ) {
269  $job = $this->get( $qtype )->pop();
270  }
271  } else { // any job in the "default" jobs types
272  if ( $flags & self::USE_CACHE ) {
273  if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
274  $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
275  }
276  $types = $this->cache->getField( 'queues-ready', 'list' );
277  } else {
278  $types = $this->getQueuesWithJobs();
279  }
280 
281  if ( $qtype == self::TYPE_DEFAULT ) {
282  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
283  }
284 
285  $types = array_diff( $types, $ignored ); // avoid selected types
286  shuffle( $types ); // avoid starvation
287 
288  foreach ( $types as $type ) { // for each queue...
289  $job = $this->get( $type )->pop();
290  if ( $job ) { // found
291  break;
292  } else { // not found
293  $this->cache->clear( 'queues-ready' );
294  }
295  }
296  }
297 
298  return $job;
299  }
300 
307  public function ack( RunnableJob $job ) {
308  $this->get( $job->getType() )->ack( $job );
309  }
310 
318  public function deduplicateRootJob( RunnableJob $job ) {
319  return $this->get( $job->getType() )->deduplicateRootJob( $job );
320  }
321 
329  public function waitForBackups() {
330  // Try to avoid doing this more than once per queue storage medium
331  foreach ( $this->jobTypeConfiguration as $type => $conf ) {
332  $this->get( $type )->waitForBackups();
333  }
334  }
335 
341  public function getQueueTypes() {
342  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
343  }
344 
350  public function getDefaultQueueTypes() {
351  return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
352  }
353 
361  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
363  $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
364 
365  $value = $cache->get( $key );
366  if ( $value === false ) {
367  $queues = $this->getQueuesWithJobs();
368  if ( $type == self::TYPE_DEFAULT ) {
369  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
370  }
371  $value = count( $queues ) ? 'true' : 'false';
372  $cache->add( $key, $value, 15 );
373  }
374 
375  return ( $value === 'true' );
376  }
377 
383  public function getQueuesWithJobs() {
384  $types = [];
385  foreach ( $this->getCoalescedQueues() as $info ) {
387  $queue = $info['queue'];
388  $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
389  if ( is_array( $nonEmpty ) ) { // batching features supported
390  $types = array_merge( $types, $nonEmpty );
391  } else { // we have to go through the queues in the bucket one-by-one
392  foreach ( $info['types'] as $type ) {
393  if ( !$this->get( $type )->isEmpty() ) {
394  $types[] = $type;
395  }
396  }
397  }
398  }
399 
400  return $types;
401  }
402 
408  public function getQueueSizes() {
409  $sizeMap = [];
410  foreach ( $this->getCoalescedQueues() as $info ) {
412  $queue = $info['queue'];
413  $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
414  if ( is_array( $sizes ) ) { // batching features supported
415  $sizeMap += $sizes;
416  } else { // we have to go through the queues in the bucket one-by-one
417  foreach ( $info['types'] as $type ) {
418  $sizeMap[$type] = $this->get( $type )->getSize();
419  }
420  }
421  }
422 
423  return $sizeMap;
424  }
425 
430  protected function getCoalescedQueues() {
431  if ( $this->coalescedQueues === null ) {
432  $this->coalescedQueues = [];
433  foreach ( $this->jobTypeConfiguration as $type => $conf ) {
434  $conf['domain'] = $this->domain;
435  $conf['type'] = 'null';
436  $conf['stats'] = $this->statsdDataFactory;
437  $conf['wanCache'] = $this->wanCache;
438  $conf['idGenerator'] = $this->globalIdGenerator;
439 
440  $queue = JobQueue::factory( $conf );
441  $loc = $queue->getCoalesceLocationInternal();
442  if ( !isset( $this->coalescedQueues[$loc] ) ) {
443  $this->coalescedQueues[$loc]['queue'] = $queue;
444  $this->coalescedQueues[$loc]['types'] = [];
445  }
446  if ( $type === 'default' ) {
447  $this->coalescedQueues[$loc]['types'] = array_merge(
448  $this->coalescedQueues[$loc]['types'],
449  array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
450  );
451  } else {
452  $this->coalescedQueues[$loc]['types'][] = $type;
453  }
454  }
455  }
456 
457  return $this->coalescedQueues;
458  }
459 
464  private function getCachedConfigVar( $name ) {
465  // @TODO: cleanup this whole method with a proper config system
466  if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
467  return $GLOBALS[$name]; // common case
468  } else {
469  $wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
470  $cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
471  $value = $cache->getWithSetCallback(
472  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
473  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
474  static function () use ( $wiki, $name ) {
475  global $wgConf;
476  // @TODO: use the full domain ID here
477  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
478  },
479  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
480  );
481 
482  return $value['v'];
483  }
484  }
485 
490  private function assertValidJobs( array $jobs ) {
491  foreach ( $jobs as $job ) {
492  if ( !( $job instanceof IJobSpecification ) ) {
493  $type = is_object( $job ) ? get_class( $job ) : gettype( $job );
494  throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
495  }
496  }
497  }
498 }
$wgConf
$wgConf
$wgConf hold the site configuration.
Definition: DefaultSettings.php:58
WikiMap\isCurrentWikiDbDomain
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:312
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:67
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:329
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:318
Wikimedia\UUID\GlobalIdGenerator
Class for getting statistically unique IDs without a central coordinator.
Definition: GlobalIdGenerator.php:34
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:273
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:69
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:71
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:200
JobQueueGroup\$jobTypeConfiguration
array $jobTypeConfiguration
Definition: JobQueueGroup.php:51
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Definition: JobQueueGroup.php:62
JobQueueEnqueueUpdate
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Definition: JobQueueEnqueueUpdate.php:33
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:64
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:430
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Definition: DeferredUpdates.php:119
ConfiguredReadOnlyMode
A read-only mode service which does not depend on LoadBalancer.
Definition: ConfiguredReadOnlyMode.php:9
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:37
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:361
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:124
JobQueueGroup\__construct
__construct( $domain, ConfiguredReadOnlyMode $readOnlyMode, bool $invalidDomain, array $jobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
Definition: JobQueueGroup.php:86
JobQueueGroup\$statsdDataFactory
IBufferingStatsdDataFactory $statsdDataFactory
Definition: JobQueueGroup.php:55
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:700
IJobSpecification\getType
getType()
JobQueueGroup\$wanCache
WANObjectCache $wanCache
Definition: JobQueueGroup.php:57
JobQueueGroup\$domain
string $domain
Wiki domain ID.
Definition: JobQueueGroup.php:43
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:161
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
JobQueueGroup\$jobTypesExcludedFromDefaultQueue
array $jobTypesExcludedFromDefaultQueue
Definition: JobQueueGroup.php:53
JobQueueGroup\$globalIdGenerator
GlobalIdGenerator $globalIdGenerator
Definition: JobQueueGroup.php:59
$queue
$queue
Definition: mergeMessageFileList.php:176
MapCacheLRU
Handles a simple LRU key/value map with a maximum number of entries.
Definition: MapCacheLRU.php:36
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:464
JobQueueGroup\$jobClasses
array $jobClasses
Definition: JobQueueGroup.php:49
JobQueueError
@newable
Definition: JobQueueError.php:29
MapCacheLRU\getWithSetCallback
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=INF)
Get an item with the given key, producing and setting it if not found.
Definition: MapCacheLRU.php:262
JobQueueGroup\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:307
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:350
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:131
MapCacheLRU\set
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
Definition: MapCacheLRU.php:109
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:136
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:256
IBufferingStatsdDataFactory
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Definition: IBufferingStatsdDataFactory.php:13
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:114
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:65
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:490
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:49
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:341
MapCacheLRU\get
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Definition: MapCacheLRU.php:169
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:383
JobQueueGroup\$readOnlyMode
ConfiguredReadOnlyMode $readOnlyMode
Read only mode.
Definition: JobQueueGroup.php:45
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queues for a list of job types.
Definition: JobQueueGroup.php:408
JobQueueGroup\$cache
MapCacheLRU $cache
Definition: JobQueueGroup.php:40
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:42
JobQueueGroup\$invalidDomain
bool $invalidDomain
Whether the wiki is not recognized in configuration.
Definition: JobQueueGroup.php:47
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:226
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:32
$type
$type
Definition: testCompression.php:52