MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
23 
32  protected static $instances = [];
33 
35  protected $cache;
36 
38  protected $domain;
40  protected $readOnlyReason;
42  protected $invalidDomain = false;
43 
45  protected $coalescedQueues;
46 
47  public const TYPE_DEFAULT = 1; // integer; jobs popped by default
48  private const TYPE_ANY = 2; // integer; any job
49 
50  public const USE_CACHE = 1; // integer; use process or persistent cache
51 
52  private const PROC_CACHE_TTL = 15; // integer; seconds
53 
54  private const CACHE_VERSION = 1; // integer; cache version
55 
60  protected function __construct( $domain, $readOnlyReason ) {
61  $this->domain = $domain;
62  $this->readOnlyReason = $readOnlyReason;
63  $this->cache = new MapCacheLRU( 10 );
64  }
65 
70  public static function singleton( $domain = false ) {
71  global $wgLocalDatabases;
72 
73  if ( $domain === false ) {
75  }
76 
77  if ( !isset( self::$instances[$domain] ) ) {
78  $reason = MediaWikiServices::getInstance()
79  ->getConfiguredReadOnlyMode()
80  ->getReason();
81  self::$instances[$domain] = new self( $domain, $reason );
82  // Make sure jobs are not getting pushed to bogus wikis. This can confuse
83  // the job runner system into spawning endless RPC requests that fail (T171371).
85  if (
87  !in_array( $wikiId, $wgLocalDatabases )
88  ) {
89  self::$instances[$domain]->invalidDomain = true;
90  }
91  }
92 
93  return self::$instances[$domain];
94  }
95 
101  public static function destroySingletons() {
102  self::$instances = [];
103  }
104 
111  public function get( $type ) {
112  global $wgJobTypeConf;
113 
114  $conf = [ 'domain' => $this->domain, 'type' => $type ];
115  if ( isset( $wgJobTypeConf[$type] ) ) {
116  $conf += $wgJobTypeConf[$type];
117  } else {
118  $conf += $wgJobTypeConf['default'];
119  }
120  if ( !isset( $conf['readOnlyReason'] ) ) {
121  $conf['readOnlyReason'] = $this->readOnlyReason;
122  }
123 
124  return $this->factoryJobQueue( $conf );
125  }
126 
132  private function factoryJobQueue( array $conf ) {
133  $services = MediaWikiServices::getInstance();
134  $conf['stats'] = $services->getStatsdDataFactory();
135  $conf['wanCache'] = $services->getMainWANObjectCache();
136  $conf['idGenerator'] = $services->getGlobalIdGenerator();
137 
138  return JobQueue::factory( $conf );
139  }
140 
151  public function push( $jobs ) {
153 
154  if ( $this->invalidDomain ) {
155  // Do not enqueue job that cannot be run (T171371)
156  $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
158  return;
159  }
160 
161  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
162  if ( $jobs === [] ) {
163  return;
164  }
165 
166  $this->assertValidJobs( $jobs );
167 
168  $jobsByType = []; // (job type => list of jobs)
169  foreach ( $jobs as $job ) {
170  $jobsByType[$job->getType()][] = $job;
171  }
172 
173  foreach ( $jobsByType as $type => $jobs ) {
174  $this->get( $type )->push( $jobs );
175  }
176 
177  if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
178  $list = $this->cache->getField( 'queues-ready', 'list' );
179  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
180  $this->cache->clear( 'queues-ready' );
181  }
182  }
183 
185  $cache->set(
186  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
187  'true',
188  15
189  );
190  if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
191  $cache->set(
192  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
193  'true',
194  15
195  );
196  }
197  }
198 
206  public function lazyPush( $jobs ) {
207  if ( $this->invalidDomain ) {
208  // Do not enqueue job that cannot be run (T171371)
209  throw new LogicException( "Domain '{$this->domain}' is not recognized." );
210  }
211 
212  if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
213  $this->push( $jobs );
214  return;
215  }
216 
217  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
218 
219  // Throw errors now instead of on push(), when other jobs may be buffered
220  $this->assertValidJobs( $jobs );
221 
222  DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
223  }
224 
236  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
237  global $wgJobClasses;
238 
239  $job = false;
240 
241  if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
242  throw new JobQueueError(
243  "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
244  } elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) {
245  // Do not pop jobs if there is no class for the queue type
246  throw new JobQueueError( "Unrecognized job type '$qtype'." );
247  }
248 
249  if ( is_string( $qtype ) ) { // specific job type
250  if ( !in_array( $qtype, $ignored ) ) {
251  $job = $this->get( $qtype )->pop();
252  }
253  } else { // any job in the "default" jobs types
254  if ( $flags & self::USE_CACHE ) {
255  if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
256  $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
257  }
258  $types = $this->cache->getField( 'queues-ready', 'list' );
259  } else {
260  $types = $this->getQueuesWithJobs();
261  }
262 
263  if ( $qtype == self::TYPE_DEFAULT ) {
264  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
265  }
266 
267  $types = array_diff( $types, $ignored ); // avoid selected types
268  shuffle( $types ); // avoid starvation
269 
270  foreach ( $types as $type ) { // for each queue...
271  $job = $this->get( $type )->pop();
272  if ( $job ) { // found
273  break;
274  } else { // not found
275  $this->cache->clear( 'queues-ready' );
276  }
277  }
278  }
279 
280  return $job;
281  }
282 
289  public function ack( RunnableJob $job ) {
290  $this->get( $job->getType() )->ack( $job );
291  }
292 
300  public function deduplicateRootJob( RunnableJob $job ) {
301  return $this->get( $job->getType() )->deduplicateRootJob( $job );
302  }
303 
311  public function waitForBackups() {
312  global $wgJobTypeConf;
313 
314  // Try to avoid doing this more than once per queue storage medium
315  foreach ( $wgJobTypeConf as $type => $conf ) {
316  $this->get( $type )->waitForBackups();
317  }
318  }
319 
325  public function getQueueTypes() {
326  return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
327  }
328 
334  public function getDefaultQueueTypes() {
336 
337  return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
338  }
339 
347  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
349  $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
350 
351  $value = $cache->get( $key );
352  if ( $value === false ) {
353  $queues = $this->getQueuesWithJobs();
354  if ( $type == self::TYPE_DEFAULT ) {
355  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
356  }
357  $value = count( $queues ) ? 'true' : 'false';
358  $cache->add( $key, $value, 15 );
359  }
360 
361  return ( $value === 'true' );
362  }
363 
369  public function getQueuesWithJobs() {
370  $types = [];
371  foreach ( $this->getCoalescedQueues() as $info ) {
373  $queue = $info['queue'];
374  $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
375  if ( is_array( $nonEmpty ) ) { // batching features supported
376  $types = array_merge( $types, $nonEmpty );
377  } else { // we have to go through the queues in the bucket one-by-one
378  foreach ( $info['types'] as $type ) {
379  if ( !$this->get( $type )->isEmpty() ) {
380  $types[] = $type;
381  }
382  }
383  }
384  }
385 
386  return $types;
387  }
388 
394  public function getQueueSizes() {
395  $sizeMap = [];
396  foreach ( $this->getCoalescedQueues() as $info ) {
398  $queue = $info['queue'];
399  $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
400  if ( is_array( $sizes ) ) { // batching features supported
401  $sizeMap += $sizes;
402  } else { // we have to go through the queues in the bucket one-by-one
403  foreach ( $info['types'] as $type ) {
404  $sizeMap[$type] = $this->get( $type )->getSize();
405  }
406  }
407  }
408 
409  return $sizeMap;
410  }
411 
416  protected function getCoalescedQueues() {
417  global $wgJobTypeConf;
418 
419  if ( $this->coalescedQueues === null ) {
420  $this->coalescedQueues = [];
421  foreach ( $wgJobTypeConf as $type => $conf ) {
422  $queue = $this->factoryJobQueue(
423  [ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
424  $loc = $queue->getCoalesceLocationInternal();
425  if ( !isset( $this->coalescedQueues[$loc] ) ) {
426  $this->coalescedQueues[$loc]['queue'] = $queue;
427  $this->coalescedQueues[$loc]['types'] = [];
428  }
429  if ( $type === 'default' ) {
430  $this->coalescedQueues[$loc]['types'] = array_merge(
431  $this->coalescedQueues[$loc]['types'],
432  array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
433  );
434  } else {
435  $this->coalescedQueues[$loc]['types'][] = $type;
436  }
437  }
438  }
439 
440  return $this->coalescedQueues;
441  }
442 
447  private function getCachedConfigVar( $name ) {
448  // @TODO: cleanup this whole method with a proper config system
449  if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
450  return $GLOBALS[$name]; // common case
451  } else {
452  $wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
453  $cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
454  $value = $cache->getWithSetCallback(
455  $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
456  $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
457  static function () use ( $wiki, $name ) {
458  global $wgConf;
459  // @TODO: use the full domain ID here
460  return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
461  },
462  [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
463  );
464 
465  return $value['v'];
466  }
467  }
468 
473  private function assertValidJobs( array $jobs ) {
474  foreach ( $jobs as $job ) { // sanity checks
475  if ( !( $job instanceof IJobSpecification ) ) {
476  throw new InvalidArgumentException( "Expected IJobSpecification objects" );
477  }
478  }
479  }
480 }
$wgConf
$wgConf
$wgConf hold the site configuration.
Definition: DefaultSettings.php:58
WikiMap\isCurrentWikiDbDomain
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:312
JobQueueGroup\USE_CACHE
const USE_CACHE
Definition: JobQueueGroup.php:50
JobQueueGroup\waitForBackups
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
Definition: JobQueueGroup.php:311
JobQueueGroup\deduplicateRootJob
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueueGroup.php:300
$wgLocalDatabases
string[] $wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
Definition: DefaultSettings.php:2326
WikiMap\getCurrentWikiDbDomain
static getCurrentWikiDbDomain()
Definition: WikiMap.php:293
ObjectCache\getLocalClusterInstance
static getLocalClusterInstance()
Get the main cluster-local cache object.
Definition: ObjectCache.php:272
JobQueueGroup\PROC_CACHE_TTL
const PROC_CACHE_TTL
Definition: JobQueueGroup.php:52
JobQueueGroup\CACHE_VERSION
const CACHE_VERSION
Definition: JobQueueGroup.php:54
$wgJobTypeConf
$wgJobTypeConf
Map of job types to configuration arrays.
Definition: DefaultSettings.php:8118
MediaWiki\MediaWikiServices
MediaWikiServices is the service locator for the application scope of MediaWiki.
Definition: MediaWikiServices.php:173
JobQueueGroup\$coalescedQueues
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
Definition: JobQueueGroup.php:45
JobQueueEnqueueUpdate
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Definition: JobQueueEnqueueUpdate.php:31
JobQueueGroup\TYPE_DEFAULT
const TYPE_DEFAULT
Definition: JobQueueGroup.php:47
JobQueueGroup\getCoalescedQueues
getCoalescedQueues()
Definition: JobQueueGroup.php:416
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Definition: DeferredUpdates.php:119
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
JobQueueGroup\$instances
static JobQueueGroup[] $instances
Definition: JobQueueGroup.php:32
JobQueueGroup\queuesHaveJobs
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
Definition: JobQueueGroup.php:347
JobQueueGroup\destroySingletons
static destroySingletons()
Destroy the singleton instances.
Definition: JobQueueGroup.php:101
MWExceptionHandler\logException
static logException(Throwable $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log a throwable to the exception log (if enabled).
Definition: MWExceptionHandler.php:666
IJobSpecification\getType
getType()
JobQueueGroup\$domain
string $domain
Wiki domain ID.
Definition: JobQueueGroup.php:38
JobQueueGroup\push
push( $jobs)
Insert jobs into the respective queues of which they belong.
Definition: JobQueueGroup.php:151
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
$queue
$queue
Definition: mergeMessageFileList.php:176
MapCacheLRU
Handles a simple LRU key/value map with a maximum number of entries.
Definition: MapCacheLRU.php:37
JobQueueGroup\getCachedConfigVar
getCachedConfigVar( $name)
Definition: JobQueueGroup.php:447
JobQueueError
@newable
Definition: JobQueueError.php:29
MapCacheLRU\getWithSetCallback
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=INF)
Get an item with the given key, producing and setting it if not found.
Definition: MapCacheLRU.php:268
JobQueueGroup\ack
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueueGroup.php:289
$wgJobTypesExcludedFromDefaultQueue
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e.
Definition: DefaultSettings.php:8084
JobQueueGroup\getDefaultQueueTypes
getDefaultQueueTypes()
Get the list of default queue types.
Definition: JobQueueGroup.php:334
JobQueueGroup\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueueGroup.php:40
MapCacheLRU\set
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
Definition: MapCacheLRU.php:110
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:125
$wgJobClasses
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
Definition: DefaultSettings.php:8045
JobQueueGroup\pop
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop a job off one of the job queues.
Definition: JobQueueGroup.php:236
JobQueueGroup\singleton
static singleton( $domain=false)
Definition: JobQueueGroup.php:70
JobQueueGroup\TYPE_ANY
const TYPE_ANY
Definition: JobQueueGroup.php:48
JobQueueGroup\assertValidJobs
assertValidJobs(array $jobs)
Definition: JobQueueGroup.php:473
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:50
JobQueueGroup\getQueueTypes
getQueueTypes()
Get the list of queue types.
Definition: JobQueueGroup.php:325
MapCacheLRU\get
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
Definition: MapCacheLRU.php:172
JobQueueGroup\getQueuesWithJobs
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
Definition: JobQueueGroup.php:369
JobQueueGroup\factoryJobQueue
factoryJobQueue(array $conf)
Definition: JobQueueGroup.php:132
JobQueueGroup\getQueueSizes
getQueueSizes()
Get the size of the queues for a list of job types.
Definition: JobQueueGroup.php:394
JobQueueGroup\$cache
MapCacheLRU $cache
Definition: JobQueueGroup.php:35
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:42
JobQueueGroup\$invalidDomain
bool $invalidDomain
Whether the wiki is not recognized in configuration.
Definition: JobQueueGroup.php:42
JobQueueGroup\lazyPush
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
Definition: JobQueueGroup.php:206
JobQueueGroup
Class to handle enqueueing of background jobs.
Definition: JobQueueGroup.php:30
JobQueueGroup\__construct
__construct( $domain, $readOnlyReason)
Definition: JobQueueGroup.php:60
$type
$type
Definition: testCompression.php:52