MediaWiki  1.33.0
JobQueue.php
Go to the documentation of this file.
1 <?php
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
24 
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Concrete queues extend this class and implement the abstract do*() hooks.
 * The final public methods wrap those hooks with the shared checks visible
 * below: read-only state, job type validation and root job de-duplication.
 */
abstract class JobQueue {
	/** @var string DB domain ID */
	protected $domain;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var StatsdDataFactoryInterface */
	protected $stats;

	/** @var BagOStuff Store used to remember root job timestamps for de-duplication */
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue settings. Keys read here:
	 *   - domain         : DB domain ID ("wiki" is accepted as a b/c alias)
	 *   - type           : job type handled by this queue
	 *   - claimTTL       : time to live in seconds; defaults to 0
	 *   - maxTries       : maximum number of times to try a job; defaults to 3
	 *   - order          : queue order; when absent or 'any', optimalOrder() is used
	 *   - readOnlyReason : read-only rationale; defaults to false (read/write)
	 *   - stats          : stats factory; defaults to a NullStatsdDataFactory
	 *   - stash          : store for root job de-duplication; defaults to an EmptyBagOStuff
	 * @throws JobQueueError If the resolved order is not listed by supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->domain = $params['domain'] ?? $params['wiki']; // b/c
		$this->type = $params['type'];
		$this->claimTTL = $params['claimTTL'] ?? 0;
		$this->maxTries = $params['maxTries'] ?? 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: a non-string value such as 0 must not loosely match an order name
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->readOnlyReason = $params['readOnlyReason'] ?? false;
		$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
		$this->dupCache = $params['stash'] ?? new EmptyBagOStuff();
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain 'class' (name of the queue class to instantiate);
	 *   the whole array is passed on to that class's constructor
	 * @return JobQueue
	 * @throws JobQueueError If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new JobQueueError( "Invalid job queue class '$class'." );
		}
		$obj = new $class( $params );
		if ( !( $obj instanceof self ) ) {
			throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return $obj;
	}

	/**
	 * @return string DB domain ID
	 */
	final public function getDomain() {
		return $this->domain;
	}

	/**
	 * @return string Wiki ID derived from the DB domain via WikiMap::getWikiIdFromDbDomain()
	 */
	final public function getWiki() {
		return WikiMap::getWikiIdFromDbDomain( $this->domain );
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Queue order in effect (one of supportedOrders())
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array List of order names accepted by the constructor
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless a subclass overrides this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether delayed jobs are enabled; same value as supportsDelayedJobs()
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read only rationale (or false if r/w)
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return mixed Result of doIsEmpty() (presumably bool; confirm against implementations)
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * Backend hook for isEmpty().
	 *
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return mixed Result of doGetSize() (presumably int; confirm against implementations)
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * Backend hook for getSize().
	 *
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Result of doGetAcquiredCount() (presumably int; confirm against implementations)
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * Backend hook for getAcquiredCount().
	 *
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int Result of doGetDelayedCount(); 0 with the default implementation
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * Backend hook for getDelayedCount().
	 *
	 * @see JobQueue::getDelayedCount()
	 * @return int Always 0 here
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int Result of doGetAbandonedCount(); 0 with the default implementation
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * Backend hook for getAbandonedCount().
	 *
	 * @see JobQueue::getAbandonedCount()
	 * @return int Always 0 here
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * A single job is wrapped in an array and handed to batchPush().
	 *
	 * @param mixed $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush()
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		$this->batchPush( $jobs, $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted, so one bad job
	 * rejects the whole batch. After the insert, each root job is registered
	 * for de-duplication.
	 *
	 * @param array $jobs List of jobs; each must match this queue's type
	 * @param int $flags Passed through to doBatchPush() (see the QOS_* constants)
	 * @throws JobQueueError If the queue is read-only, a job has the wrong type,
	 *   or a job is delayed and delays are not supported
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( $jobs === [] ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new JobQueueError(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new JobQueueError(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );

		// Register root jobs only after the insert did not throw
		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * Backend hook for batchPush().
	 *
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * If the popped job's root job has been superseded by a newer one, the
	 * 'dupe_pops' stat is incremented and a no-op DuplicateJob is returned
	 * in its place.
	 *
	 * @return mixed Whatever doPop() returned, or a DuplicateJob replacing it
	 * @throws JobQueueError If the queue is read-only
	 */
	final public function pop() {
		$this->assertNotReadOnly();

		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				$this->incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// don't lose jobs over this: a failed duplicate check is deliberately
			// ignored so that the popped job is still returned to the caller
		}

		return $job;
	}

	/**
	 * Backend hook for pop().
	 *
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param Job $job
	 * @throws JobQueueError If the queue is read-only or the job has the wrong type
	 */
	final public function ack( Job $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		$this->doAck( $job );
	}

	/**
	 * Backend hook for ack().
	 *
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return mixed Result of doDeduplicateRootJob()
	 * @throws JobQueueError If the queue is read-only or the job has the wrong type
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * Backend hook for deduplicateRootJob().
	 *
	 * Stores the job's root job timestamp in the de-duplication cache unless
	 * an equal or newer timestamp is already recorded there.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return mixed True if an equal or newer root job is already registered,
	 *   otherwise the result of the cache write
	 * @throws JobQueueError If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new JobQueueError( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call JobQueueGroup::push() before this method so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		// NOTE(review): nothing in this method defers the write; confirm whether the
		// remark about deferral above is still accurate.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param Job $job
	 * @return bool
	 * @throws JobQueueError If the job has the wrong type
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * Backend hook for isRootJobOldDuplicate().
	 *
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool True only if the cache holds a strictly newer timestamp for the
	 *   job's root job signature
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the de-duplication cache key for a root job signature.
	 *
	 * The key is made from the DB domain, the job type and the signature.
	 *
	 * @param string $signature Value of the 'rootJobSignature' root job parameter
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		return $this->dupCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$this->type,
			'rootjob',
			$signature
		);
	}

	/**
	 * Delete the queue contents by delegating to doDelete().
	 *
	 * @throws JobQueueError If the queue is read-only, or if the backend does not
	 *   override doDelete()
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * Backend hook for delete().
	 *
	 * @see JobQueue::delete()
	 * @throws JobQueueError Always, unless overridden
	 */
	protected function doDelete() {
		throw new JobQueueError( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * Backend hook for waitForBackups(); does nothing unless overridden.
	 *
	 * @see JobQueue::waitForBackups()
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * Backend hook for flushCaches(); does nothing unless overridden.
	 *
	 * @see JobQueue::flushCaches()
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator Empty iterator unless overridden
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator Empty iterator unless overridden
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator Empty iterator unless overridden
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return string|null Null unless overridden
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues is empty.
	 *
	 * @param array $types List of queue types
	 * @return array|null Result of doGetSiblingQueuesWithJobs(); null means not supported
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * Backend hook for getSiblingQueuesWithJobs().
	 *
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return array|null Always null here
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types List of queue types
	 * @return array|null Result of doGetSiblingQueueSizes(); null means not supported
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * Backend hook for getSiblingQueueSizes().
	 *
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return array|null Always null here
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Refuse to continue when the queue has been configured as read-only.
	 *
	 * @throws JobQueueReadOnlyError If a read-only reason is set
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * Increment a stats counter for the queue overall and for the queue type.
	 *
	 * @param string $key Event name, used in the metric "jobqueue.<key>.<all|type>"
	 * @param string $type Job type
	 * @param int $delta Amount to add to both counters
	 */
	protected function incrStats( $key, $type, $delta = 1 ) {
		$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
		$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:199
Job\getType
getType()
Definition: Job.php:143
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:605
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:183
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
EmptyBagOStuff
A BagOStuff object with no objects in it.
Definition: EmptyBagOStuff.php:29
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:325
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Increment the stats counters for the queue overall and for the queue type.
Definition: JobQueue.php:706
NullStatsdDataFactory
Definition: NullStatsdDataFactory.php:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:52
$params
$params
Definition: styleTest.css.php:44
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:662
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:58
JobQueue\doGetSize
doGetSize()
$res
$res
Definition: database.txt:21
JobQueue\$readOnlyReason
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:446
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:310
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:286
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:692
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:30
IJobSpecification\getType
getType()
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:241
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:548
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:685
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:175
JobQueue\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:400
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(Job $job)
Definition: JobQueue.php:503
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
WikiMap\getWikiIdFromDbDomain
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:259
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:584
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:48
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:273
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:263
JobQueue\doPop
doPop()
JobQueueError
Definition: JobQueueError.php:28
array
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
DuplicateJob\newFromJob
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:46
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:220
JobQueue\doAck
doAck(Job $job)
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\$stats
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:45
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:144
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2162
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:560
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:296
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:568
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:106
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:167
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:37
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:619
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:521
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:49
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:576
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:366
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:50
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:640
JobQueueReadOnlyError
Definition: JobQueueReadOnlyError.php:28
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:489
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:630
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:130
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:33
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:676
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition: JobQueue.php:653
IJobSpecification
Job queue task description interface.
Definition: IJobSpecification.php:29
JobQueue\getType
getType()
Definition: JobQueue.php:137
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:461
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:58
JobQueue\getDomain
getDomain()
Definition: JobQueue.php:122