MediaWiki  master
JobQueueGroup.php
Go to the documentation of this file.
1 <?php
23 
36  protected $cache;
37 
39  protected $domain;
41  protected $readOnlyMode;
43  private $localJobClasses;
45  private $jobTypeConfiguration;
47  private $jobTypesExcludedFromDefaultQueue;
49  private $statsdDataFactory;
51  private $wanCache;
53  private $globalIdGenerator;
54 
56  protected $coalescedQueues;
57 
58  public const TYPE_DEFAULT = 1; // integer; jobs popped by default
59  private const TYPE_ANY = 2; // integer; any job
60 
61  public const USE_CACHE = 1; // integer; use process or persistent cache
62 
63  private const PROC_CACHE_TTL = 15; // integer; seconds
64 
77  public function __construct(
78  $domain,
80  ?array $localJobClasses,
81  array $jobTypeConfiguration,
82  array $jobTypesExcludedFromDefaultQueue,
83  IBufferingStatsdDataFactory $statsdDataFactory,
84  WANObjectCache $wanCache,
85  GlobalIdGenerator $globalIdGenerator
86  ) {
87  $this->domain = $domain;
88  $this->readOnlyMode = $readOnlyMode;
89  $this->cache = new MapCacheLRU( 10 );
90  $this->localJobClasses = $localJobClasses;
91  $this->jobTypeConfiguration = $jobTypeConfiguration;
92  $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
93  $this->statsdDataFactory = $statsdDataFactory;
94  $this->wanCache = $wanCache;
95  $this->globalIdGenerator = $globalIdGenerator;
96  }
97 
104  public function get( $type ) {
105  $conf = [ 'domain' => $this->domain, 'type' => $type ];
106  $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];
107  if ( !isset( $conf['readOnlyReason'] ) ) {
108  $conf['readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
109  }
110 
111  $conf['stats'] = $this->statsdDataFactory;
112  $conf['wanCache'] = $this->wanCache;
113  $conf['idGenerator'] = $this->globalIdGenerator;
114 
115  return JobQueue::factory( $conf );
116  }
117 
127  public function push( $jobs ) {
128  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
129  if ( $jobs === [] ) {
130  return;
131  }
132 
133  $this->assertValidJobs( $jobs );
134 
135  $jobsByType = []; // (job type => list of jobs)
136  foreach ( $jobs as $job ) {
137  $type = $job->getType();
138  if ( isset( $this->jobTypeConfiguration[$type] ) ) {
139  $jobsByType[$type][] = $job;
140  } else {
141  if (
142  isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
143  $this->jobTypeConfiguration['default']['typeAgnostic']
144  ) {
145  $jobsByType['default'][] = $job;
146  } else {
147  $jobsByType[$type][] = $job;
148  }
149  }
150  }
151 
152  foreach ( $jobsByType as $type => $jobs ) {
153  $this->get( $type )->push( $jobs );
154  }
155 
156  if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
157  $list = $this->cache->getField( 'queues-ready', 'list' );
158  if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
159  $this->cache->clear( 'queues-ready' );
160  }
161  }
162 
164  $cache->set(
165  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
166  'true',
167  15
168  );
169  if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
170  $cache->set(
171  $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
172  'true',
173  15
174  );
175  }
176  }
177 
185  public function lazyPush( $jobs ) {
186  if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
187  $this->push( $jobs );
188  return;
189  }
190 
191  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
192 
193  // Throw errors now instead of on push(), when other jobs may be buffered
194  $this->assertValidJobs( $jobs );
195 
196  DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
197  }
198 
213  public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
214  $job = false;
215 
216  if ( !$this->localJobClasses ) {
217  throw new JobQueueError(
218  "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
219  }
220  if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
221  // Do not pop jobs if there is no class for the queue type
222  throw new JobQueueError( "Unrecognized job type '$qtype'." );
223  }
224 
225  if ( is_string( $qtype ) ) { // specific job type
226  if ( !in_array( $qtype, $ignored ) ) {
227  $job = $this->get( $qtype )->pop();
228  }
229  } else { // any job in the "default" jobs types
230  if ( $flags & self::USE_CACHE ) {
231  if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
232  $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
233  }
234  $types = $this->cache->getField( 'queues-ready', 'list' );
235  } else {
236  $types = $this->getQueuesWithJobs();
237  }
238 
239  if ( $qtype == self::TYPE_DEFAULT ) {
240  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
241  }
242 
243  $types = array_diff( $types, $ignored ); // avoid selected types
244  shuffle( $types ); // avoid starvation
245 
246  foreach ( $types as $type ) { // for each queue...
247  $job = $this->get( $type )->pop();
248  if ( $job ) { // found
249  break;
250  } else { // not found
251  $this->cache->clear( 'queues-ready' );
252  }
253  }
254  }
255 
256  return $job;
257  }
258 
265  public function ack( RunnableJob $job ) {
266  $this->get( $job->getType() )->ack( $job );
267  }
268 
277  public function deduplicateRootJob( RunnableJob $job ) {
278  wfDeprecated( __METHOD__, '1.40' );
279  return true;
280  }
281 
291  public function waitForBackups() {
292  wfDeprecated( __METHOD__, '1.41' );
293  // Try to avoid doing this more than once per queue storage medium
294  foreach ( $this->jobTypeConfiguration as $type => $conf ) {
295  $this->get( $type )->waitForBackups();
296  }
297  }
298 
306  public function getQueueTypes() {
307  if ( !$this->localJobClasses ) {
308  throw new JobQueueError( 'Cannot inspect job queue from foreign wiki' );
309  }
310  return array_keys( $this->localJobClasses );
311  }
312 
320  public function getDefaultQueueTypes() {
321  return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
322  }
323 
333  public function queuesHaveJobs( $type = self::TYPE_ANY ) {
335  $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
336 
337  $value = $cache->get( $key );
338  if ( $value === false ) {
339  $queues = $this->getQueuesWithJobs();
340  if ( $type == self::TYPE_DEFAULT ) {
341  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
342  }
343  $value = count( $queues ) ? 'true' : 'false';
344  $cache->add( $key, $value, 15 );
345  }
346 
347  return ( $value === 'true' );
348  }
349 
357  public function getQueuesWithJobs() {
358  $types = [];
359  foreach ( $this->getCoalescedQueues() as $info ) {
361  $queue = $info['queue'];
362  $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
363  if ( is_array( $nonEmpty ) ) { // batching features supported
364  $types = array_merge( $types, $nonEmpty );
365  } else { // we have to go through the queues in the bucket one-by-one
366  foreach ( $info['types'] as $type ) {
367  if ( !$this->get( $type )->isEmpty() ) {
368  $types[] = $type;
369  }
370  }
371  }
372  }
373 
374  return $types;
375  }
376 
384  public function getQueueSizes() {
385  $sizeMap = [];
386  foreach ( $this->getCoalescedQueues() as $info ) {
388  $queue = $info['queue'];
389  $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
390  if ( is_array( $sizes ) ) { // batching features supported
391  $sizeMap += $sizes;
392  } else { // we have to go through the queues in the bucket one-by-one
393  foreach ( $info['types'] as $type ) {
394  $sizeMap[$type] = $this->get( $type )->getSize();
395  }
396  }
397  }
398 
399  return $sizeMap;
400  }
401 
406  protected function getCoalescedQueues() {
407  if ( $this->coalescedQueues === null ) {
408  $this->coalescedQueues = [];
409  foreach ( $this->jobTypeConfiguration as $type => $conf ) {
410  $conf['domain'] = $this->domain;
411  $conf['type'] = 'null';
412  $conf['stats'] = $this->statsdDataFactory;
413  $conf['wanCache'] = $this->wanCache;
414  $conf['idGenerator'] = $this->globalIdGenerator;
415 
416  $queue = JobQueue::factory( $conf );
417  $loc = $queue->getCoalesceLocationInternal();
418  if ( !isset( $this->coalescedQueues[$loc] ) ) {
419  $this->coalescedQueues[$loc]['queue'] = $queue;
420  $this->coalescedQueues[$loc]['types'] = [];
421  }
422  if ( $type === 'default' ) {
423  $this->coalescedQueues[$loc]['types'] = array_merge(
424  $this->coalescedQueues[$loc]['types'],
425  array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
426  );
427  } else {
428  $this->coalescedQueues[$loc]['types'][] = $type;
429  }
430  }
431  }
432 
433  return $this->coalescedQueues;
434  }
435 
440  private function assertValidJobs( array $jobs ) {
441  foreach ( $jobs as $job ) {
442  if ( !( $job instanceof IJobSpecification ) ) {
443  $type = is_object( $job ) ? get_class( $job ) : gettype( $job );
444  throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
445  }
446  }
447  }
448 }
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that a deprecated feature was used.
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the pending update queue for execution at the appropriate time.
Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup.
Handle enqueueing of background jobs.
push( $jobs)
Insert jobs into the respective queues of which they belong.
string $domain
Wiki domain ID.
getQueueSizes()
Get the size of the queues for a list of job types.
waitForBackups()
Wait for any replica DBs or backup queue servers to catch up.
array $coalescedQueues
Map of (bucket => (queue => JobQueue, types => list of types)
getDefaultQueueTypes()
Get the list of default queue types.
lazyPush( $jobs)
Buffer jobs for insertion via push() or call it now if in CLI mode.
pop( $qtype=self::TYPE_DEFAULT, $flags=0, array $ignored=[])
Pop one job off a job queue.
ack(RunnableJob $job)
Acknowledge that a job was completed.
queuesHaveJobs( $type=self::TYPE_ANY)
Check if there are any queues with jobs (this is cached)
ReadOnlyMode $readOnlyMode
Read only mode.
__construct( $domain, ReadOnlyMode $readOnlyMode, ?array $localJobClasses, array $jobTypeConfiguration, array $jobTypesExcludedFromDefaultQueue, IBufferingStatsdDataFactory $statsdDataFactory, WANObjectCache $wanCache, GlobalIdGenerator $globalIdGenerator)
MapCacheLRU $cache
getQueuesWithJobs()
Get the list of job types that have non-empty queues.
getQueueTypes()
Get the list of queue types.
deduplicateRootJob(RunnableJob $job)
Register the "root job" of a given job into the queue for de-duplication.
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:143
Store key-value entries in a size-limited in-memory LRU cache.
Definition: MapCacheLRU.php:34
set( $key, $value, $rank=self::RANK_TOP)
Set a key/value pair.
get( $key, $maxAge=INF, $default=null)
Get the value for a key.
static getLocalClusterInstance()
Get the main cluster-local cache object.
Multi-datacenter aware caching interface.
Determine whether a site is currently in read-only mode.
Class for getting statistically unique IDs without a central coordinator.
MediaWiki adaptation of StatsdDataFactory that provides buffering functionality.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
if(count( $args)< 1) $job