MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
25 
/** @var JobQueueGroup[] */
protected static $instances = [];

/** @var MapCacheLRU Process cache; holds the 'queues-ready' list used by pop() */
protected $cache;

/** @var string Wiki domain ID */
protected $domain;
/** @var ConfiguredReadOnlyMode Read only mode */
protected $readOnlyMode;
/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidDomain = false;
/** @var array Map keyed by job type; pop() refuses string types that have no entry here */
private $jobClasses;
/** @var array Map of (job type => queue configuration); the 'default' entry is the fallback */
private $jobTypeConfiguration;
/** @var array List of job types left out of the default queue types */
private $jobTypesExcludedFromDefaultQueue;
/** @var IBufferingStatsdDataFactory */
private $statsdDataFactory;
/** @var WANObjectCache */
private $wanCache;
/** @var GlobalIdGenerator */
private $globalIdGenerator;

/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
protected $coalescedQueues;

/** Integer; jobs popped by default */
public const TYPE_DEFAULT = 1;
/** Integer; any job */
private const TYPE_ANY = 2;

/** Integer; use process or persistent cache */
public const USE_CACHE = 1;

/** Integer; seconds */
private const PROC_CACHE_TTL = 15;

/** Integer; cache version */
private const CACHE_VERSION = 1;
72 
/**
 * Store the injected configuration and services and create the process cache.
 *
 * @param string $domain Wiki domain ID
 * @param ConfiguredReadOnlyMode $readOnlyMode Read only mode service
 * @param bool $invalidDomain Whether the wiki is not recognized in configuration
 * @param array $jobClasses Map keyed by job type
 * @param array $jobTypeConfiguration Map of (job type => queue configuration),
 *  with a 'default' entry used as the fallback
 * @param array $jobTypesExcludedFromDefaultQueue Job types left out of the default queue types
 * @param IBufferingStatsdDataFactory $statsdDataFactory
 * @param WANObjectCache $wanCache
 * @param GlobalIdGenerator $globalIdGenerator
 */
public function __construct(
	$domain,
	ConfiguredReadOnlyMode $readOnlyMode,
	bool $invalidDomain,
	array $jobClasses,
	array $jobTypeConfiguration,
	array $jobTypesExcludedFromDefaultQueue,
	IBufferingStatsdDataFactory $statsdDataFactory,
	WANObjectCache $wanCache,
	GlobalIdGenerator $globalIdGenerator
) {
	$this->domain = $domain;
	$this->readOnlyMode = $readOnlyMode;
	// Small per-instance LRU map; push() and pop() keep the 'queues-ready' list in it
	$this->cache = new MapCacheLRU( 10 );
	$this->invalidDomain = $invalidDomain;
	$this->jobClasses = $jobClasses;
	$this->jobTypeConfiguration = $jobTypeConfiguration;
	$this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
	$this->statsdDataFactory = $statsdDataFactory;
	$this->wanCache = $wanCache;
	$this->globalIdGenerator = $globalIdGenerator;
}
108 
/**
 * Obtain a JobQueueGroup for a domain through the JobQueueGroupFactory service.
 *
 * @param string|bool $domain Passed unchanged to makeJobQueueGroup(); defaults to false
 * @return mixed Whatever JobQueueGroupFactory::makeJobQueueGroup() returns
 */
public static function singleton( $domain = false ) {
	$factory = MediaWikiServices::getInstance()->getJobQueueGroupFactory();

	return $factory->makeJobQueueGroup( $domain );
}
117 
/**
 * Destroy the singleton instances.
 *
 * The body contains no statements, so calling this currently has no effect.
 */
public static function destroySingletons() {
	// No-op.
}
126 
/**
 * Get the job queue object for a given queue type.
 *
 * The configuration is the type-specific entry of the job type configuration,
 * or the 'default' entry when the type has none of its own. The 'domain' and
 * 'type' keys always come from this group.
 *
 * @param string $type Job type
 * @return JobQueue Object built by JobQueue::factory()
 */
public function get( $type ) {
	$typeConf = $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];

	// Left operand wins on key collisions, so 'domain' and 'type' cannot be overridden
	$conf = [ 'domain' => $this->domain, 'type' => $type ] + $typeConf;

	// Only ask the read-only service when the configuration does not give a reason
	$conf['readOnlyReason'] = $conf['readOnlyReason'] ?? $this->readOnlyMode->getReason();

	// Injected services always replace whatever the configuration carried
	$conf = array_replace( $conf, [
		'stats' => $this->statsdDataFactory,
		'wanCache' => $this->wanCache,
		'idGenerator' => $this->globalIdGenerator,
	] );

	return JobQueue::factory( $conf );
}
150 
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * If the domain is flagged as invalid, nothing is enqueued: the problem is
 * logged and the method returns.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any element is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
		// Log instead of throwing so that the caller is not interrupted
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( $jobs === [] ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	// A type without its own configuration goes to the 'default' queue only
	// when that queue is flagged as type-agnostic
	$defaultIsTypeAgnostic = !empty( $this->jobTypeConfiguration['default']['typeAgnostic'] );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$type = $job->getType();
		if ( !isset( $this->jobTypeConfiguration[$type] ) && $defaultIsTypeAgnostic ) {
			$jobsByType['default'][] = $job;
		} else {
			$jobsByType[$type][] = $job;
		}
	}

	foreach ( $jobsByType as $type => $typeJobs ) {
		$this->get( $type )->push( $typeJobs );
	}

	// Drop the process-cached list of ready queues if it lacks any type just pushed to
	if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
		$list = $this->cache->getField( 'queues-ready', 'list' );
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Record the "has jobs" flags that queuesHaveJobs() reads
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
218 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws LogicException If the domain is flagged as invalid
 * @throws InvalidArgumentException If any element is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->domain}' is not recognized." );
	}

	// Command-line SAPIs push immediately instead of deferring
	if ( in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true ) ) {
		$this->push( $jobs );
		return;
	}

	$jobList = is_array( $jobs ) ? $jobs : [ $jobs ];

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobList );

	$update = new JobQueueEnqueueUpdate( $this->domain, $jobList );
	DeferredUpdates::addUpdate( $update );
}
244 
/**
 * Pop a job off one of the job queues.
 *
 * A string $qtype selects that specific queue. Otherwise a non-empty queue is
 * picked in random order, limited to the default queue types when $qtype
 * equals TYPE_DEFAULT.
 *
 * @param int|string $qtype A job type name, or one of the TYPE_* constants
 * @param int $flags Bitfield; USE_CACHE allows the process-cached list of ready queues
 * @param array $ignored Job types that must not be popped
 * @throws JobQueueError If the domain is not the current wiki, or a string
 *  $qtype has no entry in the job classes map
 * @return mixed The value returned by the queue's pop(), or false if no queue was tried
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
	if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		throw new JobQueueError(
			"Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
	}

	$isSpecificType = is_string( $qtype );
	if ( $isSpecificType && !isset( $this->jobClasses[$qtype] ) ) {
		// Do not pop jobs if there is no class for the queue type
		throw new JobQueueError( "Unrecognized job type '$qtype'." );
	}

	if ( $isSpecificType ) {
		// Specific job type: a single queue, unless the caller excluded it
		return in_array( $qtype, $ignored ) ? false : $this->get( $qtype )->pop();
	}

	// Any job type: start from the queues believed to be non-empty
	if ( $flags & self::USE_CACHE ) {
		$haveFreshList = $this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL );
		if ( !$haveFreshList ) {
			$this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->getField( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	// Avoid selected types, then randomize the order to avoid starvation
	$candidates = array_diff( $candidates, $ignored );
	shuffle( $candidates );

	$popped = false;
	foreach ( $candidates as $candidate ) {
		$popped = $this->get( $candidate )->pop();
		if ( $popped ) {
			break;
		}
		// This queue yielded nothing, so the cached ready list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $popped;
}
300 
/**
 * Acknowledge that a job was completed.
 *
 * @param RunnableJob $job Job to acknowledge on the queue of its own type
 * @return void
 */
public function ack( RunnableJob $job ) {
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
310 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param RunnableJob $job Job whose type selects the queue
 * @return mixed Whatever the queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( RunnableJob $job ) {
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
321 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	// Try to avoid doing this more than once per queue storage medium:
	// only the explicitly configured types are visited
	foreach ( array_keys( $this->jobTypeConfiguration ) as $configuredType ) {
		$this->get( $configuredType )->waitForBackups();
	}
}
335 
/**
 * Get the list of queue types.
 *
 * @return array Keys of the 'wgJobClasses' configuration value for this domain
 */
public function getQueueTypes() {
	$jobClassMap = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClassMap );
}
344 
/**
 * Get the list of default queue types.
 *
 * @return array Queue types minus those excluded from the default queue;
 *  keys of the full type list are preserved
 */
public function getDefaultQueueTypes() {
	$allTypes = $this->getQueueTypes();

	return array_diff( $allTypes, $this->jobTypesExcludedFromDefaultQueue );
}
353 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * @param int $type One of the TYPE_* constants
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

	$value = $cache->get( $key );
	if ( $value === false ) {
		// Cache miss: inspect the queues and remember the answer briefly.
		// The answer is stored as a string so a cached negative differs from a miss.
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		$value = count( $queues ) ? 'true' : 'false';
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
377 
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job types
 */
public function getQueuesWithJobs() {
	$nonEmptyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$siblings = $bucket['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );

		if ( is_array( $siblings ) ) {
			// Batching features supported: one call covered the whole bucket
			$nonEmptyTypes = array_merge( $nonEmptyTypes, $siblings );
			continue;
		}

		// We have to go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $bucketType ) {
			if ( $this->get( $bucketType )->isEmpty() ) {
				continue;
			}
			$nonEmptyTypes[] = $bucketType;
		}
	}

	return $nonEmptyTypes;
}
402 
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		$batchSizes = $bucket['queue']->getSiblingQueueSizes( $this->getQueueTypes() );

		if ( is_array( $batchSizes ) ) {
			// Batching features supported; sizes already collected take precedence
			$sizesByType += $batchSizes;
			continue;
		}

		// We have to go through the queues in the bucket one-by-one
		foreach ( $bucket['types'] as $bucketType ) {
			$sizesByType[$bucketType] = $this->get( $bucketType )->getSize();
		}
	}

	return $sizesByType;
}
425 
/**
 * Build, once per instance, the map of queue buckets and return it.
 *
 * Configured job types are grouped by the coalesce location reported by a
 * queue built from their configuration. The 'default' entry stands for every
 * queue type that has no configuration of its own.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	if ( $this->coalescedQueues !== null ) {
		return $this->coalescedQueues;
	}

	$this->coalescedQueues = [];
	foreach ( $this->jobTypeConfiguration as $configuredType => $typeConf ) {
		// The queue is built with the literal type string 'null', not the configured type
		$queue = JobQueue::factory( array_replace( $typeConf, [
			'domain' => $this->domain,
			'type' => 'null',
			'stats' => $this->statsdDataFactory,
			'wanCache' => $this->wanCache,
			'idGenerator' => $this->globalIdGenerator,
		] ) );

		$bucketKey = $queue->getCoalesceLocationInternal();
		if ( !isset( $this->coalescedQueues[$bucketKey] ) ) {
			// First type seen for this location: its queue represents the bucket
			$this->coalescedQueues[$bucketKey] = [ 'queue' => $queue, 'types' => [] ];
		}

		$typesToAdd = ( $configuredType === 'default' )
			? array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
			: [ $configuredType ];

		$this->coalescedQueues[$bucketKey]['types'] = array_merge(
			$this->coalescedQueues[$bucketKey]['types'],
			$typesToAdd
		);
	}

	return $this->coalescedQueues;
}
459 
/**
 * Read a configuration variable for this group's domain.
 *
 * For the current wiki the value comes straight from $GLOBALS. For any other
 * domain it is read through $wgConf and cached in the main WAN object cache.
 *
 * @param string $name Name of the global variable, e.g. 'wgJobClasses'
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		return $GLOBALS[$name]; // common case
	}

	$wikiId = WikiMap::getWikiIdFromDbDomain( $this->domain );
	$wanCache = MediaWikiServices::getInstance()->getMainWANObjectCache();

	$cacheKey = $wanCache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name );
	// Between one and two days, chosen at random per call
	$ttl = $wanCache::TTL_DAY + mt_rand( 0, $wanCache::TTL_DAY );

	$wrapped = $wanCache->getWithSetCallback(
		$cacheKey,
		$ttl,
		static function () use ( $wikiId, $name ) {
			global $wgConf;
			// @TODO: use the full domain ID here
			// The value is wrapped in an array under the key 'v'
			return [ 'v' => $wgConf->getConfig( $wikiId, $name ) ];
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
	);

	return $wrapped['v'];
}
485 
/**
 * Verify that every element of a list is a job specification.
 *
 * @param array $jobs Values to check
 * @throws InvalidArgumentException On the first element that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) {
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}

		// Report the class name for objects and the PHP type name for anything else
		$actual = is_object( $candidate ) ? get_class( $candidate ) : gettype( $candidate );
		throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $actual );
	}
}
498 }
$wgConf
$wgConf holds the site configuration.
Definition: Setup.php:138
A read-only mode service which does not depend on LoadBalancer.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Class to handle enqueueing of background jobs.
getCachedConfigVar( $name)
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
__construct( $domain, ConfiguredReadOnlyMode $readOnlyMode, bool $invalidDomain, array $jobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
const PROC_CACHE_TTL
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
static singleton( $domain=false)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop a job off one of the job queues.
ack(RunnableJob $job)
Acknowledge that a job was completed.
static destroySingletons()
Destroy the singleton instances.
WANObjectCache $wanCache
ConfiguredReadOnlyMode $readOnlyMode
Read only mode.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
bool $invalidDomain
Whether the wiki is not recognized in configuration.
GlobalIdGenerator $globalIdGenerator
static JobQueueGroup[] $instances
array $jobTypesExcludedFromDefaultQueue
MapCacheLRU $cache
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
assertValidJobs(array $jobs)
getQueueTypes()
Get the list of queue types.
IBufferingStatsdDataFactory $statsdDataFactory
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
array $jobTypeConfiguration
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:137
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Handles a simple LRU key/value map with a maximum number of entries.
Definition: MapCacheLRU.php:36
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=INF)
Get an item with the given key, producing and setting it if not found.
MediaWikiServices is the service locator for the application scope of MediaWiki.
static getLocalClusterInstance()
Get the main cluster-local cache object.
Multi-datacenter aware caching interface.
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:312
Class for getting statistically unique IDs without a central coordinator.
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
if(count( $args)< 1) $job