MediaWiki  1.32.0
JobQueue.php
Go to the documentation of this file.
1 <?php
24 
31 abstract class JobQueue {
/** integer; "all-or-nothing" job insertions */
const QOS_ATOMIC = 1;

/** integer; seconds to remember root jobs (28 days) */
const ROOTJOB_TTL = 2419200;

/** @var string Wiki ID */
protected $wiki;
/** @var string Job type */
protected $type;
/** @var string Job priority for pop() */
protected $order;
/** @var int Time to live in seconds */
protected $claimTTL;
/** @var int Maximum number of times to try a job */
protected $maxTries;
/** @var string|bool Read only rationale (or false if r/w) */
protected $readOnlyReason;

/** @var BagOStuff Cache holding the root job de-duplication timestamps */
protected $dupCache;
/** @var JobQueueAggregator Notified when the queue becomes empty or non-empty */
protected $aggr;
53 
/**
 * Set up the common queue state from the configuration parameters.
 *
 * @param array $params Queue settings read by this constructor:
 *   - wiki           : wiki ID of the queue (required)
 *   - type           : job type of the queue (required)
 *   - claimTTL       : time to live in seconds (default: 0)
 *   - maxTries       : maximum number of times to try a job (default: 3)
 *   - order          : queue order; when unset or 'any', optimalOrder() is used
 *   - aggregator     : JobQueueAggregator instance (default: JobQueueAggregatorNull)
 *   - readOnlyReason : read-only rationale, or false if r/w (default: false)
 * @throws MWException If the resulting order is not listed by supportedOrders()
 */
protected function __construct( array $params ) {
	$this->wiki = $params['wiki'];
	$this->type = $params['type'];
	$this->claimTTL = $params['claimTTL'] ?? 0;
	$this->maxTries = $params['maxTries'] ?? 3;
	if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
		$this->order = $params['order'];
	} else {
		$this->order = $this->optimalOrder();
	}
	// Strict comparison: with a loose in_array() a non-string value such as
	// boolean true equals every order name and would slip past validation.
	if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
		throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
	}
	$this->dupCache = wfGetCache( CACHE_ANYTHING );
	$this->aggr = $params['aggregator'] ?? new JobQueueAggregatorNull( [] );
	$this->readOnlyReason = $params['readOnlyReason'] ?? false;
}
75 
/**
 * Get a job queue object of the specified type.
 *
 * @param array $params Queue settings; 'class' names the queue class to build
 *   and the whole array is handed to that class's constructor
 * @return JobQueue
 * @throws MWException If the class does not exist or is not a JobQueue class
 */
final public static function factory( array $params ) {
	$class = $params['class'];
	if ( !class_exists( $class ) ) {
		throw new MWException( "Invalid job queue class '$class'." );
	}
	// Check the type before instantiating, so that the constructor of an
	// unrelated class is never run with the queue parameters.
	if ( !is_a( $class, self::class, true ) ) {
		throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
	}

	return new $class( $params );
}
116 
/**
 * @return string Wiki ID this queue was constructed for
 */
final public function getWiki() {
	return $this->wiki;
}
123 
/**
 * @return string Job type this queue was constructed for
 */
final public function getType() {
	return $this->type;
}
130 
/**
 * @return string Queue order selected by the constructor (job priority for pop())
 */
final public function getOrder() {
	return $this->order;
}
137 
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return array Order names; the constructor rejects any order not in this list
 */
abstract protected function supportedOrders();

/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string Order name; used when the 'order' parameter is unset or 'any'
 */
abstract protected function optimalOrder();
151 
/**
 * Find out if delayed jobs are supported for configuration validation.
 *
 * This default answers false; a subclass overrides it to opt in.
 *
 * @return bool
 */
protected function supportsDelayedJobs() {
	// not implemented
	return false;
}

/**
 * Public accessor for supportsDelayedJobs().
 *
 * @return bool
 */
final public function delayedJobsEnabled() {
	return $this->supportsDelayedJobs();
}
168 
/**
 * @return string|bool Read only rationale (or false if r/w)
 */
public function getReadOnlyReason() {
	return $this->readOnlyReason;
}
176 
/**
 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
 *
 * @return mixed The result of doIsEmpty(), passed through unchanged
 */
final public function isEmpty() {
	return $this->doIsEmpty();
}

/**
 * Subclass hook that performs the check for isEmpty().
 *
 * @see JobQueue::isEmpty()
 */
abstract protected function doIsEmpty();
200 
/**
 * Get the number of available (unacquired, non-delayed) jobs in the queue.
 *
 * @return mixed The result of doGetSize(), passed through unchanged
 */
final public function getSize() {
	return $this->doGetSize();
}

/**
 * Subclass hook that performs the count for getSize().
 *
 * @see JobQueue::getSize()
 */
abstract protected function doGetSize();
221 
/**
 * Get the number of acquired jobs (these are temporarily out of the queue).
 *
 * @return mixed The result of doGetAcquiredCount(), passed through unchanged
 */
final public function getAcquiredCount() {
	return $this->doGetAcquiredCount();
}

/**
 * Subclass hook that performs the count for getAcquiredCount().
 *
 * @see JobQueue::getAcquiredCount()
 */
abstract protected function doGetAcquiredCount();
242 
/**
 * Get the number of delayed jobs (these are temporarily out of the queue).
 *
 * @return int The result of doGetDelayedCount(); 0 unless a subclass overrides it
 */
final public function getDelayedCount() {
	return $this->doGetDelayedCount();
}

/**
 * Overridable hook behind getDelayedCount(); this default reports no delayed jobs.
 *
 * @see JobQueue::getDelayedCount()
 * @return int
 */
protected function doGetDelayedCount() {
	// not implemented
	return 0;
}
266 
/**
 * Get the number of acquired jobs that can no longer be attempted.
 *
 * @return int The result of doGetAbandonedCount(); 0 unless a subclass overrides it
 */
final public function getAbandonedCount() {
	return $this->doGetAbandonedCount();
}

/**
 * Overridable hook behind getAbandonedCount(); this default reports no abandoned jobs.
 *
 * @see JobQueue::getAbandonedCount()
 * @return int
 */
protected function doGetAbandonedCount() {
	// not implemented
	return 0;
}
289 
/**
 * Push one or more jobs into the queue.
 *
 * A single job is wrapped into a one-element array and the work is handed to
 * batchPush().
 *
 * @param mixed $jobs A single job or an array of jobs
 * @param int $flags Flags forwarded to batchPush()
 * @return void
 */
final public function push( $jobs, $flags = 0 ) {
	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	$this->batchPush( $jobs, $flags );
}
304 
/**
 * Push a batch of jobs into the queue.
 *
 * Every job is validated before anything is inserted. After the insert the
 * aggregator is told the queue is non-empty and each root job is registered
 * for de-duplication.
 *
 * @param array $jobs Jobs to insert; each must have this queue's type
 * @param int $flags Flags forwarded to doBatchPush()
 * @return void
 * @throws MWException If a job has the wrong type, or is delayed while delays
 *   are not supported
 * @throws JobQueueReadOnlyError If the queue is read-only
 */
final public function batchPush( array $jobs, $flags = 0 ) {
	$this->assertNotReadOnly();

	if ( $jobs === [] ) {
		// nothing to do
		return;
	}

	// Validate the whole batch first so a bad job aborts before any insert
	foreach ( $jobs as $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException(
				"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
		}
		if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
			throw new MWException(
				"Got delayed '{$job->getType()}' job; delays are not supported." );
		}
	}

	$this->doBatchPush( $jobs, $flags );
	$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );

	// Registration happens only after the insert call has returned
	foreach ( $jobs as $job ) {
		if ( !$job->isRootJob() ) {
			continue;
		}
		$this->deduplicateRootJob( $job );
	}
}

/**
 * Subclass hook that performs the insertion for batchPush().
 *
 * @see JobQueue::batchPush()
 * @param array $jobs Already validated by batchPush()
 * @param int $flags
 */
abstract protected function doBatchPush( array $jobs, $flags );
348 
/**
 * Pop a job off of the queue.
 *
 * When doPop() yields nothing, the aggregator is told that the queue is empty.
 * A popped job whose root job has been superseded by a newer one is replaced
 * by a no-op DuplicateJob.
 *
 * @return mixed The value from doPop(), or the DuplicateJob replacing it
 * @throws MWException If the queue belongs to another wiki, or no job class is
 *   registered for the queue type
 * @throws JobQueueReadOnlyError If the queue is read-only
 */
final public function pop() {
	global $wgJobClasses;

	$this->assertNotReadOnly();
	if ( $this->wiki !== wfWikiID() ) {
		throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
	}
	if ( !isset( $wgJobClasses[$this->type] ) ) {
		// Do not pop jobs if there is no class for the queue type
		throw new MWException( "Unrecognized job type '{$this->type}'." );
	}

	$job = $this->doPop();
	if ( !$job ) {
		$this->aggr->notifyQueueEmpty( $this->wiki, $this->type );

		return $job;
	}

	// Flag this job as an old duplicate based on its "root" job...
	try {
		if ( $this->isRootJobOldDuplicate( $job ) ) {
			self::incrStats( 'dupe_pops', $this->type );
			// convert to a no-op
			$job = DuplicateJob::newFromJob( $job );
		}
	} catch ( Exception $e ) {
		// don't lose jobs over this
	}

	return $job;
}

/**
 * Subclass hook that fetches the next job for pop().
 *
 * @see JobQueue::pop()
 */
abstract protected function doPop();
392 
/**
 * Acknowledge that a job was completed.
 *
 * @param Job $job Job of this queue's type
 * @return void
 * @throws MWException If the job type does not match the queue type
 * @throws JobQueueReadOnlyError If the queue is read-only
 */
final public function ack( Job $job ) {
	$this->assertNotReadOnly();

	$jobType = $job->getType();
	if ( $jobType !== $this->type ) {
		throw new MWException( "Got '{$jobType}' job; expected '{$this->type}'." );
	}

	$this->doAck( $job );
}

/**
 * Subclass hook that performs the acknowledgement for ack().
 *
 * @see JobQueue::ack()
 * @param Job $job
 */
abstract protected function doAck( Job $job );
417 
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 *
 * @param IJobSpecification $job Job of this queue's type
 * @return mixed The result of doDeduplicateRootJob()
 * @throws MWException If the job type does not match the queue type
 * @throws JobQueueReadOnlyError If the queue is read-only
 */
final public function deduplicateRootJob( IJobSpecification $job ) {
	$this->assertNotReadOnly();

	$jobType = $job->getType();
	if ( $jobType !== $this->type ) {
		throw new MWException( "Got '{$jobType}' job; expected '{$this->type}'." );
	}

	return $this->doDeduplicateRootJob( $job );
}
457 
/**
 * Record the root job timestamp of a job in the de-duplication cache.
 *
 * The cache entry is left alone when it already holds a timestamp that is the
 * same as or newer than this job's root job timestamp.
 *
 * @see JobQueue::deduplicateRootJob()
 * @param IJobSpecification $job
 * @return bool True if an equal or newer root job is already registered;
 *   otherwise the result of the cache write
 * @throws MWException If the job carries no root job parameters
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	if ( !$job->hasRootJobParams() ) {
		throw new MWException( "Cannot register root job; missing parameters." );
	}
	$params = $job->getRootJobParams();

	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
	// Callers should call JobQueueGroup::push() before this method so that if the insert
	// fails, the de-duplication registration will be aborted. Since the insert is
	// deferred till "transaction idle", do the same here, so that the ordering is
	// maintained. Having only the de-duplication registration succeed would cause
	// jobs to become no-ops without any actual jobs that made them redundant.
	$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
	if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
		return true; // a newer version of this root job was enqueued
	}

	// Update the timestamp of the last root job started at the location...
	// The TTL constant is declared on this class, so reference it through self
	// rather than through a subclass.
	return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
}
484 
/**
 * Check if the "root" job of a given job has been superseded by a newer one.
 *
 * @param Job $job Job of this queue's type
 * @return bool The result of doIsRootJobOldDuplicate()
 * @throws MWException If the job type does not match the queue type
 */
final protected function isRootJobOldDuplicate( Job $job ) {
	$jobType = $job->getType();
	if ( $jobType !== $this->type ) {
		throw new MWException( "Got '{$jobType}' job; expected '{$this->type}'." );
	}

	return $this->doIsRootJobOldDuplicate( $job );
}
500 
/**
 * Compare a job's root job timestamp with the one stored in the
 * de-duplication cache.
 *
 * @see JobQueue::isRootJobOldDuplicate()
 * @param Job $job
 * @return bool True only if the cache holds a strictly newer root job timestamp
 */
protected function doIsRootJobOldDuplicate( Job $job ) {
	if ( !$job->hasRootJobParams() ) {
		// job has no de-duplication info
		return false;
	}
	$rootParams = $job->getRootJobParams();

	// Get the last time this root job was enqueued
	$lastEnqueued = $this->dupCache->get(
		$this->getRootJobCacheKey( $rootParams['rootJobSignature'] )
	);
	if ( !$lastEnqueued ) {
		return false;
	}

	// Check if a new root job was started at the location after this one's...
	return $lastEnqueued > $rootParams['rootJobTimestamp'];
}
519 
/**
 * Build the cache key under which a root job's last timestamp is stored.
 *
 * The key is scoped to this queue's wiki (DB name and table prefix) and job type.
 *
 * @param string $signature Root job signature
 * @return string Cache key from wfForeignMemcKey()
 */
protected function getRootJobCacheKey( $signature ) {
	list( $dbName, $tablePrefix ) = wfSplitWikiID( $this->wiki );

	return wfForeignMemcKey(
		$dbName,
		$tablePrefix,
		'jobqueue',
		$this->type,
		'rootjob',
		$signature
	);
}
529 
/**
 * Run the doDelete() hook after checking that the queue is writable.
 *
 * @return void
 * @throws JobQueueReadOnlyError If the queue is read-only
 * @throws MWException From the default doDelete() when a subclass does not override it
 */
final public function delete() {
	$this->assertNotReadOnly();
	$this->doDelete();
}

/**
 * Overridable hook behind delete(); this default always throws.
 *
 * @see JobQueue::delete()
 * @throws MWException
 */
protected function doDelete() {
	throw new MWException( "This method is not implemented." );
}
550 
/**
 * Wait for any replica DBs or backup servers to catch up.
 *
 * @return void
 */
final public function waitForBackups() {
	$this->doWaitForBackups();
}

/**
 * Overridable hook behind waitForBackups(); this default does nothing.
 *
 * @see JobQueue::waitForBackups()
 * @return void
 */
protected function doWaitForBackups() {
}
569 
/**
 * Clear any process and persistent caches.
 *
 * @return void
 */
final public function flushCaches() {
	$this->doFlushCaches();
}

/**
 * Overridable hook behind flushCaches(); this default does nothing.
 *
 * @see JobQueue::flushCaches()
 * @return void
 */
protected function doFlushCaches() {
}
585 
/**
 * Get an iterator to traverse over all available jobs in this queue.
 *
 * Every subclass must implement this.
 */
abstract public function getAllQueuedJobs();
595 
/**
 * Get an iterator to traverse over all delayed jobs in this queue.
 *
 * This default yields nothing; a subclass may override it.
 *
 * @return ArrayIterator Empty iterator
 */
public function getAllDelayedJobs() {
	// not implemented
	return new ArrayIterator( [] );
}
607 
/**
 * Get an iterator to traverse over all claimed jobs in this queue.
 *
 * This default yields nothing; a subclass may override it.
 *
 * @return ArrayIterator Empty iterator
 */
public function getAllAcquiredJobs() {
	// not implemented
	return new ArrayIterator( [] );
}
621 
/**
 * Get an iterator to traverse over all abandoned jobs in this queue.
 *
 * This default yields nothing; a subclass may override it.
 *
 * @return ArrayIterator Empty iterator
 */
public function getAllAbandonedJobs() {
	// not implemented
	return new ArrayIterator( [] );
}
632 
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return null This default reports no location; a subclass may override it
 */
public function getCoalesceLocationInternal() {
	return null;
}
642 
/**
 * Check whether each of the given queues is empty.
 *
 * @param array $types Job types to check
 * @return mixed The result of doGetSiblingQueuesWithJobs(); null when not supported
 */
final public function getSiblingQueuesWithJobs( array $types ) {
	return $this->doGetSiblingQueuesWithJobs( $types );
}

/**
 * Overridable hook behind getSiblingQueuesWithJobs(); this default reports
 * that the operation is unsupported.
 *
 * @see JobQueue::getSiblingQueuesWithJobs()
 * @param array $types Job types to check
 * @return null
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	// not supported
	return null;
}
664 
/**
 * Check the size of each of the given queues.
 *
 * @param array $types Job types to check
 * @return mixed The result of doGetSiblingQueueSizes(); null when not supported
 */
final public function getSiblingQueueSizes( array $types ) {
	return $this->doGetSiblingQueueSizes( $types );
}

/**
 * Overridable hook behind getSiblingQueueSizes(); this default reports that
 * the operation is unsupported.
 *
 * @see JobQueue::getSiblingQueueSizes()
 * @param array $types Job types to check
 * @return null
 */
protected function doGetSiblingQueueSizes( array $types ) {
	// not supported
	return null;
}
687 
/**
 * Guard for write operations: refuse to continue while the queue is read-only.
 *
 * @return void
 * @throws JobQueueReadOnlyError If a read-only reason is set (anything but false)
 */
protected function assertNotReadOnly() {
	if ( $this->readOnlyReason === false ) {
		return;
	}

	throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
}
696 
/**
 * Update the statsd counters for the queue overall and for the queue type.
 *
 * The counters touched are "jobqueue.<key>.all" and "jobqueue.<key>.<type>".
 *
 * @param string $key Event name
 * @param string $type Job type
 * @param int $delta Amount passed to updateCount() for both counters
 * @return void
 */
public static function incrStats( $key, $type, $delta = 1 ) {
	// Fetched once and reused by later calls
	static $stats;
	if ( !$stats ) {
		$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
	}

	foreach ( [ 'all', $type ] as $suffix ) {
		$stats->updateCount( "jobqueue.{$key}.{$suffix}", $delta );
	}
}
713 }
714 
/**
 * Exception type for job queue errors; adds nothing to MWException.
 */
class JobQueueError extends MWException {
}
721 
723 }
724 
726 
727 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:189
Job\getType
getType()
Definition: Job.php:128
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:604
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:173
JobQueue\incrStats
static incrStats( $key, $type, $delta=1)
Update the statsd counters for the queue overall and for the queue type.
Definition: JobQueue.php:705
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:315
captcha-old.count
count
Definition: captcha-old.py:249
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:52
$params
$params
Definition: styleTest.css.php:44
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:661
BagOStuff
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:58
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:2660
JobQueue\doGetSize
doGetSize()
$res
$res
Definition: database.txt:21
JobQueue\$readOnlyReason
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:449
JobQueueAggregatorNull
Definition: JobQueueAggregator.php:164
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
$wgJobClasses
$wgJobClasses['replaceText']
Definition: ReplaceText.php:52
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:300
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
JobQueueAggregator
Class to handle tracking information about all queues.
Definition: JobQueueAggregator.php:29
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:276
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:691
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:30
IJobSpecification\getType
getType()
MWException
MediaWiki exception.
Definition: MWException.php:26
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:231
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:547
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:684
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:165
JobQueue\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:403
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(Job $job)
Definition: JobQueue.php:506
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:583
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:46
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:263
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:253
wfGetCache
wfGetCache( $cacheType)
Get a specific cache object.
Definition: GlobalFunctions.php:3019
JobQueue\doPop
doPop()
JobQueueError
Definition: JobQueue.php:719
array
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
DuplicateJob\newFromJob
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:46
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:210
JobQueue\doAck
doAck(Job $job)
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix,... $args)
Make a cache key for a foreign DB.
Definition: GlobalFunctions.php:2617
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:134
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki. This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:2644
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2213
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:559
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:286
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:567
CACHE_ANYTHING
const CACHE_ANYTHING
Definition: Defines.php:101
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:104
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:157
JobQueueConnectionError
Definition: JobQueue.php:722
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:37
JobQueue\$aggr
JobQueueAggregator $aggr
Definition: JobQueue.php:48
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:618
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\$wiki
string $wiki
Wiki ID.
Definition: JobQueue.php:33
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:524
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:48
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:575
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:357
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:50
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:639
JobQueueReadOnlyError
Definition: JobQueue.php:725
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:492
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:629
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:120
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:675
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition: JobQueue.php:652
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:29
JobQueue\getType
getType()
Definition: JobQueue.php:127
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:464
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:58