MediaWiki  1.30.2
JobQueue.php
Go to the documentation of this file.
1 <?php
24 
31 abstract class JobQueue {
33  protected $wiki;
35  protected $type;
37  protected $order;
39  protected $claimTTL;
41  protected $maxTries;
43  protected $readOnlyReason;
44 
46  protected $dupCache;
48  protected $aggr;
49 
50  const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
51 
52  const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
53 
58  protected function __construct( array $params ) {
59  $this->wiki = $params['wiki'];
60  $this->type = $params['type'];
61  $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
62  $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
63  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
64  $this->order = $params['order'];
65  } else {
66  $this->order = $this->optimalOrder();
67  }
68  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
69  throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
70  }
71  $this->dupCache = wfGetCache( CACHE_ANYTHING );
72  $this->aggr = isset( $params['aggregator'] )
73  ? $params['aggregator']
74  : new JobQueueAggregatorNull( [] );
75  $this->readOnlyReason = isset( $params['readOnlyReason'] )
76  ? $params['readOnlyReason']
77  : false;
78  }
79 
108  final public static function factory( array $params ) {
109  $class = $params['class'];
110  if ( !class_exists( $class ) ) {
111  throw new MWException( "Invalid job queue class '$class'." );
112  }
113  $obj = new $class( $params );
114  if ( !( $obj instanceof self ) ) {
115  throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
116  }
117 
118  return $obj;
119  }
120 
124  final public function getWiki() {
125  return $this->wiki;
126  }
127 
131  final public function getType() {
132  return $this->type;
133  }
134 
138  final public function getOrder() {
139  return $this->order;
140  }
141 
147  abstract protected function supportedOrders();
148 
154  abstract protected function optimalOrder();
155 
161  protected function supportsDelayedJobs() {
162  return false; // not implemented
163  }
164 
169  final public function delayedJobsEnabled() {
170  return $this->supportsDelayedJobs();
171  }
172 
177  public function getReadOnlyReason() {
178  return $this->readOnlyReason;
179  }
180 
193  final public function isEmpty() {
194  $res = $this->doIsEmpty();
195 
196  return $res;
197  }
198 
203  abstract protected function doIsEmpty();
204 
214  final public function getSize() {
215  $res = $this->doGetSize();
216 
217  return $res;
218  }
219 
224  abstract protected function doGetSize();
225 
235  final public function getAcquiredCount() {
236  $res = $this->doGetAcquiredCount();
237 
238  return $res;
239  }
240 
245  abstract protected function doGetAcquiredCount();
246 
257  final public function getDelayedCount() {
258  $res = $this->doGetDelayedCount();
259 
260  return $res;
261  }
262 
267  protected function doGetDelayedCount() {
268  return 0; // not implemented
269  }
270 
280  final public function getAbandonedCount() {
281  $res = $this->doGetAbandonedCount();
282 
283  return $res;
284  }
285 
290  protected function doGetAbandonedCount() {
291  return 0; // not implemented
292  }
293 
304  final public function push( $jobs, $flags = 0 ) {
305  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
306  $this->batchPush( $jobs, $flags );
307  }
308 
319  final public function batchPush( array $jobs, $flags = 0 ) {
320  $this->assertNotReadOnly();
321 
322  if ( !count( $jobs ) ) {
323  return; // nothing to do
324  }
325 
326  foreach ( $jobs as $job ) {
327  if ( $job->getType() !== $this->type ) {
328  throw new MWException(
329  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
330  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
331  throw new MWException(
332  "Got delayed '{$job->getType()}' job; delays are not supported." );
333  }
334  }
335 
336  $this->doBatchPush( $jobs, $flags );
337  $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
338 
339  foreach ( $jobs as $job ) {
340  if ( $job->isRootJob() ) {
341  $this->deduplicateRootJob( $job );
342  }
343  }
344  }
345 
351  abstract protected function doBatchPush( array $jobs, $flags );
352 
361  final public function pop() {
363 
364  $this->assertNotReadOnly();
365  if ( $this->wiki !== wfWikiID() ) {
366  throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
367  } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
368  // Do not pop jobs if there is no class for the queue type
369  throw new MWException( "Unrecognized job type '{$this->type}'." );
370  }
371 
372  $job = $this->doPop();
373 
374  if ( !$job ) {
375  $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
376  }
377 
378  // Flag this job as an old duplicate based on its "root" job...
379  try {
380  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
381  self::incrStats( 'dupe_pops', $this->type );
382  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
383  }
384  } catch ( Exception $e ) {
385  // don't lose jobs over this
386  }
387 
388  return $job;
389  }
390 
395  abstract protected function doPop();
396 
407  final public function ack( Job $job ) {
408  $this->assertNotReadOnly();
409  if ( $job->getType() !== $this->type ) {
410  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
411  }
412 
413  $this->doAck( $job );
414  }
415 
420  abstract protected function doAck( Job $job );
421 
453  final public function deduplicateRootJob( IJobSpecification $job ) {
454  $this->assertNotReadOnly();
455  if ( $job->getType() !== $this->type ) {
456  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
457  }
458 
459  return $this->doDeduplicateRootJob( $job );
460  }
461 
469  if ( !$job->hasRootJobParams() ) {
470  throw new MWException( "Cannot register root job; missing parameters." );
471  }
472  $params = $job->getRootJobParams();
473 
474  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
475  // Callers should call batchInsert() and then this function so that if the insert
476  // fails, the de-duplication registration will be aborted. Since the insert is
477  // deferred till "transaction idle", do the same here, so that the ordering is
478  // maintained. Having only the de-duplication registration succeed would cause
479  // jobs to become no-ops without any actual jobs that made them redundant.
480  $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
481  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
482  return true; // a newer version of this root job was enqueued
483  }
484 
485  // Update the timestamp of the last root job started at the location...
486  return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
487  }
488 
496  final protected function isRootJobOldDuplicate( Job $job ) {
497  if ( $job->getType() !== $this->type ) {
498  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
499  }
500  $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
501 
502  return $isDuplicate;
503  }
504 
510  protected function doIsRootJobOldDuplicate( Job $job ) {
511  if ( !$job->hasRootJobParams() ) {
512  return false; // job has no de-deplication info
513  }
514  $params = $job->getRootJobParams();
515 
516  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
517  // Get the last time this root job was enqueued
518  $timestamp = $this->dupCache->get( $key );
519 
520  // Check if a new root job was started at the location after this one's...
521  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
522  }
523 
528  protected function getRootJobCacheKey( $signature ) {
529  list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
530 
531  return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
532  }
533 
541  final public function delete() {
542  $this->assertNotReadOnly();
543 
544  $this->doDelete();
545  }
546 
551  protected function doDelete() {
552  throw new MWException( "This method is not implemented." );
553  }
554 
563  final public function waitForBackups() {
564  $this->doWaitForBackups();
565  }
566 
571  protected function doWaitForBackups() {
572  }
573 
579  final public function flushCaches() {
580  $this->doFlushCaches();
581  }
582 
587  protected function doFlushCaches() {
588  }
589 
598  abstract public function getAllQueuedJobs();
599 
608  public function getAllDelayedJobs() {
609  return new ArrayIterator( [] ); // not implemented
610  }
611 
622  public function getAllAcquiredJobs() {
623  return new ArrayIterator( [] ); // not implemented
624  }
625 
633  public function getAllAbandonedJobs() {
634  return new ArrayIterator( [] ); // not implemented
635  }
636 
643  public function getCoalesceLocationInternal() {
644  return null;
645  }
646 
656  final public function getSiblingQueuesWithJobs( array $types ) {
657  return $this->doGetSiblingQueuesWithJobs( $types );
658  }
659 
665  protected function doGetSiblingQueuesWithJobs( array $types ) {
666  return null; // not supported
667  }
668 
679  final public function getSiblingQueueSizes( array $types ) {
680  return $this->doGetSiblingQueueSizes( $types );
681  }
682 
688  protected function doGetSiblingQueueSizes( array $types ) {
689  return null; // not supported
690  }
691 
695  protected function assertNotReadOnly() {
696  if ( $this->readOnlyReason !== false ) {
697  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
698  }
699  }
700 
709  public static function incrStats( $key, $type, $delta = 1 ) {
710  static $stats;
711  if ( !$stats ) {
712  $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
713  }
714  $stats->updateCount( "jobqueue.{$key}.all", $delta );
715  $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
716  }
717 }
718 
723 class JobQueueError extends MWException {
724 }
725 
727 }
728 
730 
731 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:193
Job\getType
getType()
Definition: Job.php:131
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:608
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
JobQueue\getReadOnlyReason
getReadOnlyReason()
Definition: JobQueue.php:177
JobQueue\incrStats
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:709
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:319
captcha-old.count
count
Definition: captcha-old.py:249
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:52
$params
$params
Definition: styleTest.css.php:40
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:665
BagOStuff
interface is intended to be more or less compatible with the PHP memcached client.
Definition: BagOStuff.php:47
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:2823
JobQueue\doGetSize
doGetSize()
$res
$res
Definition: database.txt:21
JobQueue\$readOnlyReason
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\deduplicateRootJob
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:453
JobQueueAggregatorNull
Definition: JobQueueAggregator.php:161
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:304
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
JobQueueAggregator
Class to handle tracking information about all queues.
Definition: JobQueueAggregator.php:29
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:280
JobQueue\assertNotReadOnly
assertNotReadOnly()
Definition: JobQueue.php:695
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
IJobSpecification\getType
getType()
MWException
MediaWiki exception.
Definition: MWException.php:26
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:235
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:551
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:688
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:169
JobQueue\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:407
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(Job $job)
Definition: JobQueue.php:510
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:587
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:46
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:267
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:257
wfGetCache
wfGetCache( $cacheType)
Get a specific cache object.
Definition: GlobalFunctions.php:3195
JobQueue\doPop
doPop()
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
JobQueueError
Definition: JobQueue.php:723
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
Definition: GlobalFunctions.php:2773
DuplicateJob\newFromJob
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:46
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:214
JobQueue\doAck
doAck(Job $job)
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:138
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:2807
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2141
JobQueue\waitForBackups
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:563
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:290
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:571
CACHE_ANYTHING
const CACHE_ANYTHING
Definition: Defines.php:102
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:161
JobQueueConnectionError
Definition: JobQueue.php:726
JobQueue\$order
string $order
Job priority for pop()
Definition: JobQueue.php:37
JobQueue\$aggr
JobQueueAggregator $aggr
Definition: JobQueue.php:48
$wgJobClasses
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
Definition: DefaultSettings.php:7401
JobQueue\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:622
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\$wiki
string $wiki
Wiki ID.
Definition: JobQueue.php:33
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:528
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:579
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:361
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:50
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:643
JobQueueReadOnlyError
Definition: JobQueue.php:729
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:496
JobQueue\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:633
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:124
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:679
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:656
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2801
JobQueue\getType
getType()
Definition: JobQueue.php:131
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:468
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:58
array
the array() calling protocol came about after MediaWiki 1.4rc1.