MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
25 
/** @var MapCacheLRU Process cache; built as `new MapCacheLRU( 10 )` and used for the 'queues-ready' list */
34  protected $cache;
35 
/** @var string Wiki domain ID */
37  protected $domain;
/** @var ConfiguredReadOnlyMode Read-only mode service; supplies the default 'readOnlyReason' in get() */
39  protected $readOnlyMode;
/** @var bool Whether the wiki is not recognized in configuration */
41  protected $invalidDomain = false;
/** @var array Job classes keyed by job type (pop() rejects string types with no entry here) */
43  private $jobClasses;
/** @var array Queue configuration keyed by job type; the 'default' entry is the fallback */
45  private $jobTypeConfiguration;
/** @var array Job types that are removed from the list of default queue types */
47  private $jobTypesExcludedFromDefaultQueue;
/** @var IBufferingStatsdDataFactory Passed to every queue as the 'stats' parameter */
49  private $statsdDataFactory;
/** @var WANObjectCache Passed to every queue as the 'wanCache' parameter */
51  private $wanCache;
/** @var GlobalIdGenerator Passed to every queue as the 'idGenerator' parameter */
53  private $globalIdGenerator;
54 
/** @var array|null Map of (bucket => (queue => JobQueue, types => list of types)); null until first built */
56  protected $coalescedQueues;
57 
58  public const TYPE_DEFAULT = 1; // integer; jobs popped by default
59  private const TYPE_ANY = 2; // integer; any job
60 
61  public const USE_CACHE = 1; // integer; use process or persistent cache
62 
63  private const PROC_CACHE_TTL = 15; // integer; seconds
64 
65  private const CACHE_VERSION = 1; // integer; cache version
66 
/**
 * Set up a job queue group for one wiki domain.
 *
 * The `$readOnlyMode` parameter is restored here: the body stores it in
 * `$this->readOnlyMode`, and get() later calls `getReason()` on it, so leaving it
 * out of the signature would store an undefined variable.
 *
 * @param string $domain Wiki domain ID
 * @param ConfiguredReadOnlyMode $readOnlyMode Read-only mode service
 * @param bool $invalidDomain Whether the wiki is not recognized in configuration
 * @param array $jobClasses Job classes keyed by job type
 * @param array $jobTypeConfiguration Queue configuration keyed by job type ('default' is the fallback)
 * @param array $jobTypesExcludedFromDefaultQueue Job types left out of the default queue types
 * @param IBufferingStatsdDataFactory $statsdDataFactory Handed to each queue as 'stats'
 * @param WANObjectCache $wanCache Handed to each queue as 'wanCache'
 * @param GlobalIdGenerator $globalIdGenerator Handed to each queue as 'idGenerator'
 */
public function __construct(
    $domain,
    ConfiguredReadOnlyMode $readOnlyMode,
    bool $invalidDomain,
    array $jobClasses,
    array $jobTypeConfiguration,
    array $jobTypesExcludedFromDefaultQueue,
    IBufferingStatsdDataFactory $statsdDataFactory,
    WANObjectCache $wanCache,
    GlobalIdGenerator $globalIdGenerator
) {
    $this->domain = $domain;
    $this->readOnlyMode = $readOnlyMode;
    // Small per-process cache, used for the 'queues-ready' list
    $this->cache = new MapCacheLRU( 10 );
    $this->invalidDomain = $invalidDomain;
    $this->jobClasses = $jobClasses;
    $this->jobTypeConfiguration = $jobTypeConfiguration;
    $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
    $this->statsdDataFactory = $statsdDataFactory;
    $this->wanCache = $wanCache;
    $this->globalIdGenerator = $globalIdGenerator;
}
102 
/**
 * Get the job queue object for a given queue type.
 *
 * The type-specific configuration is used when present, otherwise the 'default'
 * entry. A 'readOnlyReason' is filled in from the read-only mode service only
 * when the configuration does not already set one.
 *
 * @param string $type Queue type
 * @return JobQueue Whatever JobQueue::factory() builds for the assembled parameters
 */
public function get( $type ) {
    $typeSettings = $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];

    // Array union: 'domain' and 'type' always win over the configured values
    $params = [ 'domain' => $this->domain, 'type' => $type ] + $typeSettings;

    if ( ( $params['readOnlyReason'] ?? null ) === null ) {
        $params['readOnlyReason'] = $this->readOnlyMode->getReason();
    }

    $params['stats'] = $this->statsdDataFactory;
    $params['wanCache'] = $this->wanCache;
    $params['idGenerator'] = $this->globalIdGenerator;

    return JobQueue::factory( $params );
}
122 
/**
 * Insert jobs into the respective queues to which they belong.
 *
 * A job whose type has its own configuration entry goes to that queue. Any other
 * job goes to the 'default' queue when the default configuration is flagged
 * 'typeAgnostic', and to a queue named after its own type otherwise.
 *
 * For an unrecognized domain nothing is enqueued; the problem is logged rather
 * than thrown, so the caller is not interrupted.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @return void
 * @throws InvalidArgumentException If any element is not an IJobSpecification
 */
public function push( $jobs ) {
    if ( $this->invalidDomain ) {
        // Do not enqueue job that cannot be run (T171371)
        $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
        // Report the problem instead of silently discarding the exception object
        MWExceptionHandler::logException( $e );
        return;
    }

    $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
    if ( $jobs === [] ) {
        return;
    }

    $this->assertValidJobs( $jobs );

    $typeAgnosticDefault = !empty( $this->jobTypeConfiguration['default']['typeAgnostic'] );

    $jobsByType = []; // (job type => list of jobs)
    foreach ( $jobs as $job ) {
        $type = $job->getType();
        if ( !isset( $this->jobTypeConfiguration[$type] ) && $typeAgnosticDefault ) {
            $jobsByType['default'][] = $job;
        } else {
            $jobsByType[$type][] = $job;
        }
    }

    // A distinct loop variable keeps the $jobs argument intact
    foreach ( $jobsByType as $type => $jobsOfType ) {
        $this->get( $type )->push( $jobsOfType );
    }

    $pushedTypes = array_keys( $jobsByType );

    // Drop the process-cached ready list if it does not mention a type just pushed to
    if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
        $list = $this->cache->getField( 'queues-ready', 'list' );
        if ( count( array_diff( $pushedTypes, $list ) ) ) {
            $this->cache->clear( 'queues-ready' );
        }
    }

    // Flag the domain as having jobs; queuesHaveJobs() reads the same keys
    $cache = ObjectCache::getLocalClusterInstance();
    $cache->set(
        $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
        'true',
        15
    );
    if ( array_diff( $pushedTypes, $this->jobTypesExcludedFromDefaultQueue ) ) {
        $cache->set(
            $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
            'true',
            15
        );
    }
}
190 
/**
 * Buffer jobs for insertion via push(), or call push() right away in CLI mode.
 *
 * Outside of the 'cli' and 'phpdbg' SAPIs the jobs are wrapped in a
 * JobQueueEnqueueUpdate and handed to DeferredUpdates.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @return void
 * @throws LogicException If the domain is not recognized
 * @throws InvalidArgumentException If any element is not an IJobSpecification
 */
public function lazyPush( $jobs ) {
    if ( $this->invalidDomain ) {
        // Do not enqueue job that cannot be run (T171371)
        throw new LogicException( "Domain '{$this->domain}' is not recognized." );
    }

    $isCommandLine = in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );
    if ( $isCommandLine ) {
        $this->push( $jobs );
        return;
    }

    $jobList = is_array( $jobs ) ? $jobs : [ $jobs ];

    // Throw errors now instead of on push(), when other jobs may be buffered
    $this->assertValidJobs( $jobList );

    $update = new JobQueueEnqueueUpdate( $this->domain, $jobList );
    DeferredUpdates::addUpdate( $update );
}
216 
/**
 * Pop a job off one of the job queues.
 *
 * With a string $qtype only that queue is tried. Otherwise the non-empty queues
 * are tried in random order, limited to the default queue types when $qtype is
 * TYPE_DEFAULT, until one yields a job.
 *
 * @param int|string $qtype JobQueueGroup::TYPE_* constant or a job type string
 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
 * @param array $ignored Job types that must not be popped
 * @return mixed The popped job, or false if none was found
 * @throws JobQueueError For a foreign wiki domain or an unrecognized job type
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
    if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
        throw new JobQueueError(
            "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
    }

    $isSpecificType = is_string( $qtype );
    if ( $isSpecificType && !isset( $this->jobClasses[$qtype] ) ) {
        // Do not pop jobs if there is no class for the queue type
        throw new JobQueueError( "Unrecognized job type '$qtype'." );
    }

    if ( $isSpecificType ) { // specific job type
        return in_array( $qtype, $ignored ) ? false : $this->get( $qtype )->pop();
    }

    // any job in the "default" jobs types
    if ( $flags & self::USE_CACHE ) {
        $haveFreshList = $this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL );
        if ( !$haveFreshList ) {
            $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
        }
        $candidates = $this->cache->getField( 'queues-ready', 'list' );
    } else {
        $candidates = $this->getQueuesWithJobs();
    }

    if ( $qtype == self::TYPE_DEFAULT ) {
        $candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
    }

    $candidates = array_diff( $candidates, $ignored ); // avoid selected types
    shuffle( $candidates ); // avoid starvation

    $popped = false;
    foreach ( $candidates as $candidate ) { // for each queue...
        $popped = $this->get( $candidate )->pop();
        if ( $popped ) { // found
            break;
        }
        // not found: the cached ready list is stale
        $this->cache->clear( 'queues-ready' );
    }

    return $popped;
}
272 
/**
 * Acknowledge that a job was completed.
 *
 * @param RunnableJob $job The finished job; its type selects the queue
 * @return void
 */
public function ack( RunnableJob $job ) {
    $queue = $this->get( $job->getType() );
    $queue->ack( $job );
}
282 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param RunnableJob $job Job whose type selects the queue
 * @return mixed Whatever the queue's deduplicateRootJob() returns
 */
public function deduplicateRootJob( RunnableJob $job ) {
    $queue = $this->get( $job->getType() );

    return $queue->deduplicateRootJob( $job );
}
293 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * Only the explicitly configured queue types are visited.
 *
 * @return void
 */
public function waitForBackups() {
    // Try to avoid doing this more than once per queue storage medium
    $configuredTypes = array_keys( $this->jobTypeConfiguration );
    foreach ( $configuredTypes as $configuredType ) {
        $this->get( $configuredType )->waitForBackups();
    }
}
307 
/**
 * Get the list of queue types.
 *
 * @return array Keys of the 'wgJobClasses' configuration value for this domain
 */
public function getQueueTypes() {
    $jobClassMap = $this->getCachedConfigVar( 'wgJobClasses' );

    return array_keys( $jobClassMap );
}
316 
/**
 * Get the list of default queue types.
 *
 * @return array All queue types minus those excluded from the default queue
 */
public function getDefaultQueueTypes() {
    $allTypes = $this->getQueueTypes();

    return array_diff( $allTypes, $this->jobTypesExcludedFromDefaultQueue );
}
325 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false' under the same
 * 'hasjobs' key that push() writes, so that a cached "no" can be told apart
 * from a cache miss (which get() reports as false).
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool Whether any matching queue has jobs
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
    // Same cache that push() uses for the 'hasjobs' flags
    $cache = ObjectCache::getLocalClusterInstance();
    $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

    $value = $cache->get( $key );
    if ( $value === false ) {
        $queues = $this->getQueuesWithJobs();
        if ( $type == self::TYPE_DEFAULT ) {
            $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
        }
        $value = count( $queues ) ? 'true' : 'false';
        $cache->add( $key, $value, 15 );
    }

    return ( $value === 'true' );
}
349 
/**
 * Get the list of job types that have non-empty queues.
 *
 * Each coalesced bucket is first asked for its non-empty sibling queues in one
 * call; if it does not answer with an array, every type in the bucket is
 * checked individually.
 *
 * @return array List of job types
 */
public function getQueuesWithJobs() {
    $readyTypes = [];

    foreach ( $this->getCoalescedQueues() as $bucket ) {
        $bucketQueue = $bucket['queue'];
        $batchResult = $bucketQueue->getSiblingQueuesWithJobs( $this->getQueueTypes() );

        if ( is_array( $batchResult ) ) { // batching features supported
            $readyTypes = array_merge( $readyTypes, $batchResult );
            continue;
        }

        // we have to go through the queues in the bucket one-by-one
        foreach ( $bucket['types'] as $bucketType ) {
            if ( $this->get( $bucketType )->isEmpty() ) {
                continue;
            }
            $readyTypes[] = $bucketType;
        }
    }

    return $readyTypes;
}
374 
/**
 * Get the size of the queues for a list of job types.
 *
 * Each coalesced bucket is first asked for all sibling queue sizes in one call;
 * if it does not answer with an array, every type in the bucket is sized
 * individually.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
    $sizesByType = [];

    foreach ( $this->getCoalescedQueues() as $bucket ) {
        $bucketQueue = $bucket['queue'];
        $batchSizes = $bucketQueue->getSiblingQueueSizes( $this->getQueueTypes() );

        if ( is_array( $batchSizes ) ) { // batching features supported
            // Array union: sizes already recorded for a type are kept
            $sizesByType += $batchSizes;
            continue;
        }

        // we have to go through the queues in the bucket one-by-one
        foreach ( $bucket['types'] as $bucketType ) {
            $sizesByType[$bucketType] = $this->get( $bucketType )->getSize();
        }
    }

    return $sizesByType;
}
397 
/**
 * Group the configured queue types into buckets by coalesce location.
 *
 * The result is built once and memoized in $this->coalescedQueues. The
 * 'default' configuration entry stands for every queue type that has no
 * configuration entry of its own.
 *
 * @return array Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
    if ( $this->coalescedQueues !== null ) {
        return $this->coalescedQueues;
    }

    $this->coalescedQueues = [];
    foreach ( $this->jobTypeConfiguration as $configuredType => $typeConf ) {
        $params = $typeConf;
        $params['domain'] = $this->domain;
        $params['type'] = 'null';
        $params['stats'] = $this->statsdDataFactory;
        $params['wanCache'] = $this->wanCache;
        $params['idGenerator'] = $this->globalIdGenerator;

        $probeQueue = JobQueue::factory( $params );
        $bucketKey = $probeQueue->getCoalesceLocationInternal();

        // The first queue seen for a location represents the whole bucket
        if ( !isset( $this->coalescedQueues[$bucketKey] ) ) {
            $this->coalescedQueues[$bucketKey]['queue'] = $probeQueue;
            $this->coalescedQueues[$bucketKey]['types'] = [];
        }

        if ( $configuredType === 'default' ) {
            $typesToAdd = array_diff(
                $this->getQueueTypes(),
                array_keys( $this->jobTypeConfiguration )
            );
        } else {
            $typesToAdd = [ $configuredType ];
        }

        $this->coalescedQueues[$bucketKey]['types'] = array_merge(
            $this->coalescedQueues[$bucketKey]['types'],
            $typesToAdd
        );
    }

    return $this->coalescedQueues;
}
431 
/**
 * Read a configuration variable for this group's wiki domain.
 *
 * For the current wiki the value comes straight from $GLOBALS. For any other
 * wiki it is looked up through $wgConf and cached in the main WAN object cache.
 *
 * @param string $name Name of the global variable, without the leading '$'
 * @return mixed The configuration value
 */
private function getCachedConfigVar( $name ) {
    // @TODO: cleanup this whole method with a proper config system
    if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
        return $GLOBALS[$name]; // common case
    }

    $wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
    $cache = MediaWikiServices::getInstance()->getMainWANObjectCache();

    $cacheKey = $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name );
    // Randomized TTL between one and two TTL_DAY units
    $ttl = $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY );

    $wrapped = $cache->getWithSetCallback(
        $cacheKey,
        $ttl,
        static function () use ( $wiki, $name ) {
            global $wgConf;
            // @TODO: use the full domain ID here
            return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
        },
        [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
    );

    // The value is wrapped under 'v' by the callback above
    return $wrapped['v'];
}
457 
/**
 * Ensure that every element of a list is a job specification.
 *
 * @param array $jobs Candidate jobs
 * @return void
 * @throws InvalidArgumentException On the first element that is not an IJobSpecification
 */
private function assertValidJobs( array $jobs ) {
    foreach ( $jobs as $candidate ) {
        if ( $candidate instanceof IJobSpecification ) {
            continue;
        }

        // Name the offending class or primitive type in the error
        $actual = is_object( $candidate ) ? get_class( $candidate ) : gettype( $candidate );
        throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $actual );
    }
}
470 }
$wgConf
$wgConf holds the site configuration.
Definition: Setup.php:139
A read-only mode service which does not depend on LoadBalancer.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Class to handle enqueueing of background jobs.
push( $jobs)
Insert jobs into the respective queues to which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
__construct( $domain, ConfiguredReadOnlyMode $readOnlyMode, bool $invalidDomain, array $jobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop a job off one of the job queues.
ack(RunnableJob $job)
Acknowledge that a job was completed.
ConfiguredReadOnlyMode $readOnlyMode
Read only mode.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
bool $invalidDomain
Whether the wiki is not recognized in configuration.
MapCacheLRU $cache
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
getQueueTypes()
Get the list of queue types.
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:137
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Handles a simple LRU key/value map with a maximum number of entries.
Definition: MapCacheLRU.php:36
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=INF)
Get an item with the given key, producing and setting it if not found.
Service locator for MediaWiki core services.
static getLocalClusterInstance()
Get the main cluster-local cache object.
Multi-datacenter aware caching interface.
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:312
Class for getting statistically unique IDs without a central coordinator.
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
if(count( $args)< 1) $job