MediaWiki  1.29.1
JobQueue.php
Go to the documentation of this file.
1 <?php
25 
32 abstract class JobQueue {
34  protected $wiki;
36  protected $type;
38  protected $order;
40  protected $claimTTL;
42  protected $maxTries;
44  protected $readOnlyReason;
45 
47  protected $dupCache;
49  protected $aggr;
50 
51  const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
52 
53  const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
54 
59  protected function __construct( array $params ) {
60  $this->wiki = $params['wiki'];
61  $this->type = $params['type'];
62  $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
63  $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
64  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
65  $this->order = $params['order'];
66  } else {
67  $this->order = $this->optimalOrder();
68  }
69  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
70  throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
71  }
72  $this->dupCache = wfGetCache( CACHE_ANYTHING );
73  $this->aggr = isset( $params['aggregator'] )
74  ? $params['aggregator']
75  : new JobQueueAggregatorNull( [] );
76  $this->readOnlyReason = isset( $params['readOnlyReason'] )
77  ? $params['readOnlyReason']
78  : false;
79  }
80 
109  final public static function factory( array $params ) {
110  $class = $params['class'];
111  if ( !class_exists( $class ) ) {
112  throw new MWException( "Invalid job queue class '$class'." );
113  }
114  $obj = new $class( $params );
115  if ( !( $obj instanceof self ) ) {
116  throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
117  }
118 
119  return $obj;
120  }
121 
125  final public function getWiki() {
126  return $this->wiki;
127  }
128 
132  final public function getType() {
133  return $this->type;
134  }
135 
139  final public function getOrder() {
140  return $this->order;
141  }
142 
148  abstract protected function supportedOrders();
149 
155  abstract protected function optimalOrder();
156 
162  protected function supportsDelayedJobs() {
163  return false; // not implemented
164  }
165 
170  final public function delayedJobsEnabled() {
171  return $this->supportsDelayedJobs();
172  }
173 
178  public function getReadOnlyReason() {
179  return $this->readOnlyReason;
180  }
181 
194  final public function isEmpty() {
195  $res = $this->doIsEmpty();
196 
197  return $res;
198  }
199 
204  abstract protected function doIsEmpty();
205 
215  final public function getSize() {
216  $res = $this->doGetSize();
217 
218  return $res;
219  }
220 
225  abstract protected function doGetSize();
226 
236  final public function getAcquiredCount() {
237  $res = $this->doGetAcquiredCount();
238 
239  return $res;
240  }
241 
246  abstract protected function doGetAcquiredCount();
247 
258  final public function getDelayedCount() {
259  $res = $this->doGetDelayedCount();
260 
261  return $res;
262  }
263 
268  protected function doGetDelayedCount() {
269  return 0; // not implemented
270  }
271 
281  final public function getAbandonedCount() {
282  $res = $this->doGetAbandonedCount();
283 
284  return $res;
285  }
286 
291  protected function doGetAbandonedCount() {
292  return 0; // not implemented
293  }
294 
305  final public function push( $jobs, $flags = 0 ) {
306  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
307  $this->batchPush( $jobs, $flags );
308  }
309 
320  final public function batchPush( array $jobs, $flags = 0 ) {
321  $this->assertNotReadOnly();
322 
323  if ( !count( $jobs ) ) {
324  return; // nothing to do
325  }
326 
327  foreach ( $jobs as $job ) {
328  if ( $job->getType() !== $this->type ) {
329  throw new MWException(
330  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
331  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
332  throw new MWException(
333  "Got delayed '{$job->getType()}' job; delays are not supported." );
334  }
335  }
336 
337  $this->doBatchPush( $jobs, $flags );
338  $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
339 
340  foreach ( $jobs as $job ) {
341  if ( $job->isRootJob() ) {
342  $this->deduplicateRootJob( $job );
343  }
344  }
345  }
346 
352  abstract protected function doBatchPush( array $jobs, $flags );
353 
362  final public function pop() {
363  global $wgJobClasses;
364 
365  $this->assertNotReadOnly();
366  if ( $this->wiki !== wfWikiID() ) {
367  throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
368  } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
369  // Do not pop jobs if there is no class for the queue type
370  throw new MWException( "Unrecognized job type '{$this->type}'." );
371  }
372 
373  $job = $this->doPop();
374 
375  if ( !$job ) {
376  $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
377  }
378 
379  // Flag this job as an old duplicate based on its "root" job...
380  try {
381  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
382  JobQueue::incrStats( 'dupe_pops', $this->type );
383  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
384  }
385  } catch ( Exception $e ) {
386  // don't lose jobs over this
387  }
388 
389  return $job;
390  }
391 
396  abstract protected function doPop();
397 
408  final public function ack( Job $job ) {
409  $this->assertNotReadOnly();
410  if ( $job->getType() !== $this->type ) {
411  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
412  }
413 
414  $this->doAck( $job );
415  }
416 
421  abstract protected function doAck( Job $job );
422 
454  final public function deduplicateRootJob( IJobSpecification $job ) {
455  $this->assertNotReadOnly();
456  if ( $job->getType() !== $this->type ) {
457  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
458  }
459 
460  return $this->doDeduplicateRootJob( $job );
461  }
462 
470  if ( !$job->hasRootJobParams() ) {
471  throw new MWException( "Cannot register root job; missing parameters." );
472  }
473  $params = $job->getRootJobParams();
474 
475  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
476  // Callers should call batchInsert() and then this function so that if the insert
477  // fails, the de-duplication registration will be aborted. Since the insert is
478  // deferred till "transaction idle", do the same here, so that the ordering is
479  // maintained. Having only the de-duplication registration succeed would cause
480  // jobs to become no-ops without any actual jobs that made them redundant.
481  $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
482  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
483  return true; // a newer version of this root job was enqueued
484  }
485 
486  // Update the timestamp of the last root job started at the location...
487  return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
488  }
489 
497  final protected function isRootJobOldDuplicate( Job $job ) {
498  if ( $job->getType() !== $this->type ) {
499  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
500  }
501  $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
502 
503  return $isDuplicate;
504  }
505 
511  protected function doIsRootJobOldDuplicate( Job $job ) {
512  if ( !$job->hasRootJobParams() ) {
513  return false; // job has no de-deplication info
514  }
515  $params = $job->getRootJobParams();
516 
517  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
518  // Get the last time this root job was enqueued
519  $timestamp = $this->dupCache->get( $key );
520 
521  // Check if a new root job was started at the location after this one's...
522  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
523  }
524 
529  protected function getRootJobCacheKey( $signature ) {
530  list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
531 
532  return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
533  }
534 
542  final public function delete() {
543  $this->assertNotReadOnly();
544 
545  $this->doDelete();
546  }
547 
552  protected function doDelete() {
553  throw new MWException( "This method is not implemented." );
554  }
555 
564  final public function waitForBackups() {
565  $this->doWaitForBackups();
566  }
567 
572  protected function doWaitForBackups() {
573  }
574 
580  final public function flushCaches() {
581  $this->doFlushCaches();
582  }
583 
588  protected function doFlushCaches() {
589  }
590 
599  abstract public function getAllQueuedJobs();
600 
609  public function getAllDelayedJobs() {
610  return new ArrayIterator( [] ); // not implemented
611  }
612 
623  public function getAllAcquiredJobs() {
624  return new ArrayIterator( [] ); // not implemented
625  }
626 
634  public function getAllAbandonedJobs() {
635  return new ArrayIterator( [] ); // not implemented
636  }
637 
644  public function getCoalesceLocationInternal() {
645  return null;
646  }
647 
657  final public function getSiblingQueuesWithJobs( array $types ) {
658  return $this->doGetSiblingQueuesWithJobs( $types );
659  }
660 
666  protected function doGetSiblingQueuesWithJobs( array $types ) {
667  return null; // not supported
668  }
669 
680  final public function getSiblingQueueSizes( array $types ) {
681  return $this->doGetSiblingQueueSizes( $types );
682  }
683 
689  protected function doGetSiblingQueueSizes( array $types ) {
690  return null; // not supported
691  }
692 
696  protected function assertNotReadOnly() {
697  if ( $this->readOnlyReason !== false ) {
698  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
699  }
700  }
701 
710  public static function incrStats( $key, $type, $delta = 1 ) {
711  static $stats;
712  if ( !$stats ) {
713  $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
714  }
715  $stats->updateCount( "jobqueue.{$key}.all", $delta );
716  $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
717  }
718 }
719 
724 class JobQueueError extends MWException {
725 }
726 
728 }
729 
731 
732 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:194
Job\getType
getType()
Definition: Job.php:121
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:609
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:178
JobQueue\incrStats
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:710
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:40
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:320
captcha-old.count
count
Definition: captcha-old.py:225
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:53
$params
$params
Definition: styleTest.css.php:40
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:666
BagOStuff
interface is intended to be more or less compatible with the PHP memcached client.
Definition: BagOStuff.php:47
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:3027
JobQueue\doGetSize
doGetSize()
$res
$res
Definition: database.txt:21
JobQueue\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:44
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:454
JobQueueAggregatorNull
Definition: JobQueueAggregator.php:156
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:305
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
JobQueueAggregator
Class to handle tracking information about all queues.
Definition: JobQueueAggregator.php:30
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:281
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:696
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
IJobSpecification\getType
getType()
MWException
MediaWiki exception.
Definition: MWException.php:26
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:236
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:552
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:689
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:170
JobQueue\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:408
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(Job $job)
Definition: JobQueue.php:511
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:36
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:588
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:47
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:268
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:258
wfGetCache
wfGetCache( $cacheType)
Get a specific cache object.
Definition: GlobalFunctions.php:3398
JobQueue\doPop
doPop()
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
JobQueueError
Definition: JobQueue.php:724
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
Definition: GlobalFunctions.php:2978
DuplicateJob\newFromJob
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:46
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:215
JobQueue\doAck
doAck(Job $job)
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:139
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:3011
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2122
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:564
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:291
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:572
CACHE_ANYTHING
const CACHE_ANYTHING
Definition: Defines.php:99
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:109
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:162
JobQueueConnectionError
Definition: JobQueue.php:727
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:38
JobQueue\$aggr
JobQueueAggregator $aggr
Definition: JobQueue.php:49
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:623
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\$wiki
string $wiki
Wiki ID.
Definition: JobQueue.php:34
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:529
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:580
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:42
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:362
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:32
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:51
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:644
JobQueueReadOnlyError
Definition: JobQueue.php:730
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:497
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:634
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:125
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:680
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:657
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2749
JobQueue\getType
getType()
Definition: JobQueue.php:132
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:469
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:59
array
the array() calling protocol came about after MediaWiki 1.4rc1.