MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
23 
/** @var JobQueueGroup[] Singleton instances, keyed by the domain ID given to singleton() */
32  protected static $instances = [];
33 
/** @var MapCacheLRU Small in-process cache; holds the 'queues-ready' list used by pop() and push() */
35  protected $cache;
36 
/** @var string Wiki domain ID */
38  protected $domain;
/** @var string|bool Read-only rationale (or false if read/write) */
40  protected $readOnlyReason;
/** @var bool Whether the wiki is not recognized in configuration; set by singleton() */
42  protected $invalidDomain = false;
43 
/** @var array|null Map of (bucket => (queue => JobQueue, types => list of types)); built lazily */
45  protected $coalescedQueues;
46 
// Queue selectors accepted by pop() and queuesHaveJobs()
47  const TYPE_DEFAULT = 1; // integer; jobs popped by default
48  const TYPE_ANY = 2; // integer; any job
49 
// Bit flag for the $flags argument of pop()
50  const USE_CACHE = 1; // integer; use process or persistent cache
51 
// Maximum age accepted for the cached 'queues-ready' list in pop()
52  const PROC_CACHE_TTL = 15; // integer; seconds
53 
54  const CACHE_VERSION = 1; // integer; cache version
55 
/**
 * Instances are created through singleton(); the constructor is not public.
 *
 * @param string $domain Wiki domain ID
 * @param string|bool $readOnlyReason Read-only rationale (or false if read/write)
 */
protected function __construct( $domain, $readOnlyReason ) {
	// Per-instance process cache, capped at 10 entries
	$this->cache = new MapCacheLRU( 10 );
	$this->readOnlyReason = $readOnlyReason;
	$this->domain = $domain;
}
65 
/**
 * Get the job queue group for a wiki domain, creating and caching it on first use.
 *
 * @param bool|string $domain Wiki domain ID; false (the default) selects the current wiki
 * @return JobQueueGroup
 */
public static function singleton( $domain = false ) {
	global $wgLocalDatabases;

	if ( $domain === false ) {
		// Resolve the default to a real domain ID. Without this, the literal
		// false would be used both as the instance key and as the queue domain.
		$domain = WikiMap::getCurrentWikiDbDomain();
	}

	if ( !isset( self::$instances[$domain] ) ) {
		self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
		// Make sure jobs are not getting pushed to bogus wikis. This can confuse
		// the job runner system into spawning endless RPC requests that fail (T171371).
		$wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
		if (
			!WikiMap::isCurrentWikiDbDomain( $domain ) &&
			!in_array( $wikiId, $wgLocalDatabases )
		) {
			// push() and lazyPush() refuse to enqueue for a group flagged this way
			self::$instances[$domain]->invalidDomain = true;
		}
	}

	return self::$instances[$domain];
}
92 
/**
 * Destroy the singleton instances.
 *
 * The next singleton() call for any domain builds a fresh group.
 *
 * @return void
 */
public static function destroySingletons() {
	self::$instances = [];
}
101 
/**
 * Get the job queue object for a given queue type.
 *
 * @param string $type Job type
 * @return JobQueue
 */
public function get( $type ) {
	global $wgJobTypeConf;

	// Type-specific settings if configured, otherwise the 'default' entry
	$typeConf = isset( $wgJobTypeConf[$type] )
		? $wgJobTypeConf[$type]
		: $wgJobTypeConf['default'];

	// Array union: 'domain' and 'type' always come from this group, never from config
	$conf = [ 'domain' => $this->domain, 'type' => $type ] + $typeConf;
	if ( !isset( $conf['readOnlyReason'] ) ) {
		$conf['readOnlyReason'] = $this->readOnlyReason;
	}

	$services = MediaWikiServices::getInstance();
	$conf['stats'] = $services->getStatsdDataFactory();
	$conf['wanCache'] = $services->getMainWANObjectCache();

	return JobQueue::factory( $conf );
}
127 
/**
 * Insert jobs into the respective queues of which they belong.
 *
 * Jobs are grouped by type and each group is pushed to the queue for that type.
 * Afterwards the in-process 'queues-ready' list is dropped if it lacks any of the
 * pushed types, and the cluster-local "hasjobs" flags are set.
 *
 * If singleton() flagged the domain as invalid, the problem is logged and nothing
 * is enqueued.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function push( $jobs ) {
	global $wgJobTypesExcludedFromDefaultQueue;

	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
		// Log instead of throwing, so the caller is not interrupted
		MWExceptionHandler::logException( $e );
		return;
	}

	$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
	if ( $jobs === [] ) {
		return;
	}

	$this->assertValidJobs( $jobs );

	$jobsByType = []; // (job type => list of jobs)
	foreach ( $jobs as $job ) {
		$jobsByType[$job->getType()][] = $job;
	}

	foreach ( $jobsByType as $type => $jobsOfType ) {
		$this->get( $type )->push( $jobsOfType );
	}

	// Invalidate the process-cached list of ready queues if it misses a pushed type
	if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
		$list = $this->cache->getField( 'queues-ready', 'list' );
		if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
			$this->cache->clear( 'queues-ready' );
		}
	}

	// Mark the "has jobs" flags that queuesHaveJobs() reads
	$cache = ObjectCache::getLocalClusterInstance();
	$cache->set(
		$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
		'true',
		15
	);
	// Only flag the default queue if at least one pushed type belongs to it
	if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
		$cache->set(
			$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
			'true',
			15
		);
	}
}
185 
/**
 * Buffer jobs for insertion via push() or call it now if in CLI mode.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @throws LogicException If the domain of this group is not recognized
 * @throws InvalidArgumentException If any item is not an IJobSpecification
 * @return void
 */
public function lazyPush( $jobs ) {
	if ( $this->invalidDomain ) {
		// Do not enqueue job that cannot be run (T171371)
		throw new LogicException( "Domain '{$this->domain}' is not recognized." );
	}

	// Command-line SAPIs push right away; push() does its own normalization and checks
	if ( in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true ) ) {
		$this->push( $jobs );
		return;
	}

	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	// Throw errors now instead of on push(), when other jobs may be buffered
	$this->assertValidJobs( $jobs );

	$update = new JobQueueEnqueueUpdate( $this->domain, $jobs );
	DeferredUpdates::addUpdate( $update );
}
211 
/**
 * Pop a job off one of the job queues.
 *
 * @param int|string $qtype A job type name, or one of the JobQueueGroup::TYPE_* constants
 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
 * @param array $blacklist List of job types to skip
 * @throws JobQueueError If this group is for a foreign wiki, or $qtype names an unknown job type
 * @return mixed The value returned by JobQueue::pop(), or false if nothing was popped
 */
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
	global $wgJobClasses;

	if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		throw new JobQueueError(
			"Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
	}

	if ( is_string( $qtype ) ) {
		// A specific job type was requested
		if ( !isset( $wgJobClasses[$qtype] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new JobQueueError( "Unrecognized job type '$qtype'." );
		}

		return in_array( $qtype, $blacklist ) ? false : $this->get( $qtype )->pop();
	}

	// Otherwise pick from the queues that currently report jobs
	if ( $flags & self::USE_CACHE ) {
		if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
			$this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
		}
		$candidates = $this->cache->getField( 'queues-ready', 'list' );
	} else {
		$candidates = $this->getQueuesWithJobs();
	}

	if ( $qtype == self::TYPE_DEFAULT ) {
		// Restrict to the types that are popped by default
		$candidates = array_intersect( $candidates, $this->getDefaultQueueTypes() );
	}

	$candidates = array_diff( $candidates, $blacklist ); // avoid selected types
	shuffle( $candidates ); // avoid starvation

	$job = false;
	foreach ( $candidates as $candidate ) {
		$job = $this->get( $candidate )->pop();
		if ( $job ) {
			break;
		}
		// This queue turned out to be empty, so the cached ready list is stale
		$this->cache->clear( 'queues-ready' );
	}

	return $job;
}
269 
/**
 * Acknowledge that a job was completed.
 *
 * @param RunnableJob $job
 * @return void
 */
public function ack( RunnableJob $job ) {
	// Delegate to the queue that handles this job's type
	$queue = $this->get( $job->getType() );
	$queue->ack( $job );
}
279 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param RunnableJob $job
 * @return mixed Whatever JobQueue::deduplicateRootJob() returns for this job
 */
public function deduplicateRootJob( RunnableJob $job ) {
	// Delegate to the queue that handles this job's type
	$queue = $this->get( $job->getType() );

	return $queue->deduplicateRootJob( $job );
}
290 
/**
 * Wait for any replica DBs or backup queue servers to catch up.
 *
 * @return void
 */
public function waitForBackups() {
	global $wgJobTypeConf;

	// Try to avoid doing this more than once per queue storage medium:
	// only the explicitly configured types are visited, not every job type.
	foreach ( array_keys( $wgJobTypeConf ) as $configuredType ) {
		$this->get( $configuredType )->waitForBackups();
	}
}
306 
/**
 * Get the list of queue types.
 *
 * @return array The keys of the $wgJobClasses setting that applies to this group's wiki
 */
public function getQueueTypes() {
	$jobClasses = $this->getCachedConfigVar( 'wgJobClasses' );

	return array_keys( $jobClasses );
}
315 
/**
 * Get the list of default queue types.
 *
 * These are all queue types except the ones listed in
 * $wgJobTypesExcludedFromDefaultQueue.
 *
 * @return array List of job types (keys are preserved from getQueueTypes())
 */
public function getDefaultQueueTypes() {
	// Must be imported explicitly; globals are not visible in function scope
	global $wgJobTypesExcludedFromDefaultQueue;

	return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
}
326 
/**
 * Check if there are any queues with jobs (this is cached).
 *
 * The answer is stored as the string 'true' or 'false' so that a cached negative
 * result can be told apart from the false that signals a cache miss.
 *
 * @param int $type JobQueueGroup::TYPE_* constant
 * @return bool
 */
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
	// Same cache and key layout that push() writes its "hasjobs" flags to
	$cache = ObjectCache::getLocalClusterInstance();
	$key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );

	$value = $cache->get( $key );
	if ( $value === false ) {
		// Cache miss: ask the queues directly
		$queues = $this->getQueuesWithJobs();
		if ( $type == self::TYPE_DEFAULT ) {
			$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
		}
		$value = count( $queues ) ? 'true' : 'false';
		// add() rather than set(), so a flag written meanwhile by push() is kept
		$cache->add( $key, $value, 15 );
	}

	return ( $value === 'true' );
}
350 
/**
 * Get the list of job types that have non-empty queues.
 *
 * @return array List of job types that have non-empty queues
 */
public function getQueuesWithJobs() {
	$readyTypes = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		/** @var JobQueue $queue */
		$queue = $bucket['queue'];
		$nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );

		if ( is_array( $nonEmpty ) ) {
			// Batching is supported: one call covered the whole bucket
			$readyTypes = array_merge( $readyTypes, $nonEmpty );
			continue;
		}

		// No batching: check each queue in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			if ( $this->get( $type )->isEmpty() ) {
				continue;
			}
			$readyTypes[] = $type;
		}
	}

	return $readyTypes;
}
375 
/**
 * Get the size of the queues for a list of job types.
 *
 * @return array Map of (job type => size)
 */
public function getQueueSizes() {
	$sizesByType = [];

	foreach ( $this->getCoalescedQueues() as $bucket ) {
		/** @var JobQueue $queue */
		$queue = $bucket['queue'];
		$batchSizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );

		if ( is_array( $batchSizes ) ) {
			// Batching is supported; entries already collected take precedence
			$sizesByType += $batchSizes;
			continue;
		}

		// No batching: ask each queue in the bucket one-by-one
		foreach ( $bucket['types'] as $type ) {
			$sizesByType[$type] = $this->get( $type )->getSize();
		}
	}

	return $sizesByType;
}
398 
/**
 * Group the configured job types into buckets by coalesce location.
 *
 * The result is computed once per instance and then reused.
 *
 * @return array[] Map of (bucket => (queue => JobQueue, types => list of types))
 */
protected function getCoalescedQueues() {
	global $wgJobTypeConf;

	if ( $this->coalescedQueues === null ) {
		$this->coalescedQueues = [];
		foreach ( $wgJobTypeConf as $type => $conf ) {
			// Placeholder queue of type 'null', built from this configuration
			// entry so that its coalesce location can be read
			$queue = JobQueue::factory(
				[ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
			$loc = $queue->getCoalesceLocationInternal();
			if ( !isset( $this->coalescedQueues[$loc] ) ) {
				$this->coalescedQueues[$loc]['queue'] = $queue;
				$this->coalescedQueues[$loc]['types'] = [];
			}
			if ( $type === 'default' ) {
				// 'default' stands for every job type without its own configuration entry
				$this->coalescedQueues[$loc]['types'] = array_merge(
					$this->coalescedQueues[$loc]['types'],
					array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
				);
			} else {
				$this->coalescedQueues[$loc]['types'][] = $type;
			}
		}
	}

	return $this->coalescedQueues;
}
429 
/**
 * Read a configuration variable for the wiki this group belongs to.
 *
 * @param string $name Name of the global variable, without the leading "$"
 * @return mixed
 */
private function getCachedConfigVar( $name ) {
	// @TODO: cleanup this whole method with a proper config system
	if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
		return $GLOBALS[$name]; // common case
	}

	// Foreign wiki: look the value up through $wgConf and cache the result
	$wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
	$cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
	$key = $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name );
	// Between one and two days, chosen at random
	$ttl = $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY );

	$wrapped = $cache->getWithSetCallback(
		$key,
		$ttl,
		function () use ( $wiki, $name ) {
			global $wgConf;
			// @TODO: use the full domain ID here
			// The value is wrapped in an array under the key 'v'
			return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
	);

	return $wrapped['v'];
}
455 
/**
 * Check that every item in the list is a job specification.
 *
 * @param array $jobs
 * @throws InvalidArgumentException On the first item that is not an IJobSpecification
 * @return void
 */
private function assertValidJobs( array $jobs ) {
	foreach ( $jobs as $candidate ) {
		if ( $candidate instanceof IJobSpecification ) {
			continue;
		}
		throw new InvalidArgumentException( "Expected IJobSpecification objects" );
	}
}
467 }
assertValidJobs(array $jobs)
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
$wgJobTypesExcludedFromDefaultQueue
Jobs that must be explicitly requested, i.e. job types that are excluded from the default queue and are not popped by default.
bool $invalidDomain
Whether the wiki is not recognized in configuration.
__construct( $domain, $readOnlyReason)
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:269
MapCacheLRU $cache
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
static getLocalClusterInstance()
Get the main cluster-local cache object.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $blacklist=[])
Pop a job off one of the job queues.
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
getDefaultQueueTypes()
Get the list of default queue types.
wfConfiguredReadOnlyReason()
Get the value of $wgReadOnly or the contents of $wgReadOnlyFile.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
getQueueTypes()
Get the list of queue types.
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
getWithSetCallback( $key, callable $callback, $rank=self::RANK_TOP, $maxAge=INF)
Get an item with the given key, producing and setting it if not found.
const PROC_CACHE_TTL
string|bool $readOnlyReason
Read only rationale (or false if r/w)
static logException( $e, $catcher=self::CAUGHT_BY_OTHER, $extraData=[])
Log an exception to the exception log (if enabled).
ack(RunnableJob $job)
Acknowledge that a job was completed.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
$GLOBALS['IP']
getQueueSizes()
Get the size of the queues for a list of job types.
$wgConf
$wgConf holds the site configuration.
getCachedConfigVar( $name)
static getCurrentWikiDbDomain()
Definition: WikiMap.php:293
static JobQueueGroup[] $instances
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:114
$wgJobTypeConf
Map of job types to configuration arrays.
if(count( $args)< 1) $job
string $domain
Wiki domain ID.
static isCurrentWikiDbDomain( $domain)
Definition: WikiMap.php:304
push( $jobs)
Insert jobs into the respective queues of which they belong.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types))
static singleton( $domain=false)
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
Interface for serializable objects that describe a job queue task.
string[] $wgLocalDatabases
Other wikis on this site, can be administered from a single developer account.
static destroySingletons()
Destroy the singleton instances.