MediaWiki  1.23.5
JobQueue.php
Go to the documentation of this file.
1 <?php
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * The public entry points are final: they validate their arguments, open a
 * profiling section and then delegate to a protected do*() hook that the
 * concrete queue backend implements or overrides.
 */
abstract class JobQueue {
	/** @var string Wiki ID */
	protected $wiki;

	/** @var string Job type */
	protected $type;

	/** @var string Job priority for pop() */
	protected $order;

	/** @var int Time to live in seconds */
	protected $claimTTL;

	/** @var int Maximum number of times to try a job */
	protected $maxTries;

	/** @var bool Allow delayed jobs */
	protected $checkDelay;

	/** @var BagOStuff Cache used for root job de-duplication timestamps */
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Must contain 'wiki' and 'type'. May contain
	 *   'claimTTL' (default 0), 'maxTries' (default 3), 'order' (when absent
	 *   or 'any', optimalOrder() is used) and 'checkDelay' (default false).
	 * @throws MWException If the order or delayed jobs are not supported
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict matching so that a non-string value (e.g. 0) cannot loosely
		// compare equal to one of the supported order names.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			// Name the concrete backend; __CLASS__ would always read "JobQueue" here
			throw new MWException( get_class( $this ) . " does not support '{$this->order}' order." );
		}
		$this->checkDelay = !empty( $params['checkDelay'] );
		if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
			throw new MWException( get_class( $this ) . " does not support delayed jobs." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain 'class' (name of a JobQueue subclass);
	 *   the whole array is passed on to that class' constructor.
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue subclass
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		// Check the type *before* instantiating, so that the constructor of an
		// unrelated class is never run just to be rejected afterwards.
		if ( !is_subclass_of( $class, __CLASS__ ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string Wiki ID
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string The order chosen at construction time
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * @return bool Whether delayed jobs are enabled
	 */
	final public function delayedJobsEnabled() {
		return $this->checkDelay;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names accepted by the constructor
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless overridden
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return mixed Result of doIsEmpty()
	 */
	final public function isEmpty() {
		// Scope-bound profiling: the section also ends if doIsEmpty() throws
		$section = new ProfileSection( __METHOD__ );

		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return mixed Result of doGetSize()
	 */
	final public function getSize() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Result of doGetAcquiredCount()
	 */
	final public function getAcquiredCount() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Result of doGetDelayedCount(); 0 unless overridden
	 */
	final public function getDelayedCount() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return mixed Result of doGetAbandonedCount(); 0 unless overridden
	 */
	final public function getAbandonedCount() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * @param Job|array $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush()
	 * @return mixed Result of batchPush()
	 * @throws MWException
	 */
	final public function push( $jobs, $flags = 0 ) {
		return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * @param array $jobs Jobs whose type must match this queue's type
	 * @param int $flags Passed through to doBatchPush()
	 * @return mixed True for an empty batch, else the result of doBatchPush()
	 * @throws MWException On a job of the wrong type, or on a delayed job
	 *   while delayed jobs are not enabled
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		if ( !count( $jobs ) ) {
			return true; // nothing to do
		}

		// Validate the whole batch before anything is handed to the backend
		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$section = new ProfileSection( __METHOD__ );

		return $this->doBatchPush( $jobs, $flags );
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * A popped job whose "root" job was superseded by a newer one is replaced
	 * by a DuplicateJob built from it.
	 *
	 * @return mixed Result of doPop(), or a DuplicateJob in its place
	 * @throws MWException If this queue belongs to another wiki or there is
	 *   no entry for the queue type in $wgJobClasses
	 */
	final public function pop() {
		global $wgJobClasses;

		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		$section = new ProfileSection( __METHOD__ );
		$job = $this->doPop();
		// End the section here so that only doPop() is measured; if doPop()
		// throws, the section is closed when this frame unwinds.
		unset( $section );

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( MWException $e ) {
			// Deliberately ignored: don't lose jobs over a failed duplicate check
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param Job $job Job whose type must match this queue's type
	 * @return mixed Result of doAck()
	 * @throws MWException On a job of the wrong type
	 */
	final public function ack( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		$section = new ProfileSection( __METHOD__ );

		return $this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param Job $job Job whose type must match this queue's type
	 * @return mixed Result of doDeduplicateRootJob()
	 * @throws MWException On a job of the wrong type
	 */
	final public function deduplicateRootJob( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		$section = new ProfileSection( __METHOD__ );

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * @see JobQueue::deduplicateRootJob()
	 * @param Job $job
	 * @return mixed True if an equal or newer timestamp is already cached,
	 *   else the result of the cache write
	 * @throws MWException If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchInsert() and then this function so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		// The TTL constant lives on this class; do not reach into a subclass for it.
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param Job $job Job whose type must match this queue's type
	 * @return mixed Result of doIsRootJobOldDuplicate()
	 * @throws MWException On a job of the wrong type
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		$section = new ProfileSection( __METHOD__ );

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which a root job's last timestamp is stored.
	 *
	 * @param string $signature Root job signature
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Run the backend's doDelete() hook.
	 *
	 * @return mixed Result of doDelete()
	 * @throws MWException From the default doDelete() when it is not overridden
	 */
	final public function delete() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws MWException Always, unless overridden
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Wait for any slaves or backup servers to catch up.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		$section = new ProfileSection( __METHOD__ );
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Return a map of task names to task definition maps.
	 *
	 * Each definition returned by doGetPeriodicTasks() gets a 'name' entry
	 * holding its own key.
	 *
	 * @return array
	 */
	final public function getPeriodicTasks() {
		$tasks = $this->doGetPeriodicTasks();
		foreach ( $tasks as $name => &$def ) {
			$def['name'] = $name;
		}
		// Break the reference so the last element of the returned array is
		// not left bound to the loop variable.
		unset( $def );

		return $tasks;
	}

	/**
	 * @see JobQueue::getPeriodicTasks()
	 * @return array Empty unless overridden
	 */
	protected function doGetPeriodicTasks() {
		return array();
	}

	/**
	 * Clear any process and persistent caches.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		$section = new ProfileSection( __METHOD__ );
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * @return void
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator Empty unless overridden
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( array() ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return null Null unless overridden
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues are empty.
	 *
	 * @param array $types Queue types to check
	 * @return mixed Result of doGetSiblingQueuesWithJobs(); null unless overridden
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		// Kept in a local so the section lasts until this method returns
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types
	 * @return null
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types Queue types to check
	 * @return mixed Result of doGetSiblingQueueSizes(); null unless overridden
	 */
	final public function getSiblingQueueSizes( array $types ) {
		// Kept in a local so the section lasts until this method returns
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types
	 * @return null
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Call wfIncrStats() for the queue overall and for the queue type.
	 *
	 * @param string $key Base counter name
	 * @param string $type Job type, appended to the counter name
	 * @param int $delta Amount to add to each counter
	 * @param string|null $wiki If not null, a per-wiki counter is bumped too
	 * @return void
	 */
	public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
		wfIncrStats( $key, $delta );
		wfIncrStats( "{$key}-{$type}", $delta );
		if ( $wiki !== null ) {
			wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
		}
	}

	/**
	 * Namespace the queue with a key to isolate it for testing.
	 *
	 * @param string $key
	 * @throws MWException Always, unless overridden
	 */
	public function setTestingPrefix( $key ) {
		throw new MWException( "Queue namespacing not supported for this queue type." );
	}
}
740 
/**
 * Exception type that extends MWException without adding any members.
 * NOTE(review): presumably the error type for job queue failures (inferred
 * from the name only) - confirm against the code that throws it.
 */
745 class JobQueueError extends MWException {
746 }
747 
749 }
JobQueue\isEmpty
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:182
JobQueue\setTestingPrefix
setTestingPrefix( $key)
Namespace the queue with a key to isolate it for testing.
Definition: JobQueue.php:729
JobQueue\doGetPeriodicTasks
doGetPeriodicTasks()
Definition: JobQueue.php:602
JobQueue\deduplicateRootJob
deduplicateRootJob(Job $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:447
JobQueue\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
JobQueue\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:642
php
skin txt MediaWiki includes four core it has been set as the default in MediaWiki since the replacing Monobook it had been been the default skin since before being replaced by Vector largely rewritten in while keeping its appearance Several legacy skins were removed in the as the burden of supporting them became too heavy to bear Those in etc for skin dependent CSS etc for skin dependent JavaScript These can also be customised on a per user by etc This feature has led to a wide variety of user styles becoming that gallery is a good place to ending in php
Definition: skin.txt:62
JobQueue\incrStats
static incrStats( $key, $type, $delta=1, $wiki=null)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:714
JobQueue\$claimTTL
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:38
JobQueue\batchPush
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:317
$timestamp
if( $limit) $timestamp
Definition: importImages.php:104
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
wfProfileIn
wfProfileIn( $functionname)
Begin profiling of a function.
Definition: Profiler.php:33
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:48
$params
$params
Definition: styleTest.css.php:40
JobQueue\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:676
wfGetCache
wfGetCache( $inputType)
Get a cache object.
Definition: GlobalFunctions.php:3957
BagOStuff
interface is intended to be more or less compatible with the PHP memcached client.
Definition: BagOStuff.php:43
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:3629
JobQueue\doGetSize
doGetSize()
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2113
JobQueue\doBatchPush
doBatchPush(array $jobs, $flags)
JobQueue\$checkDelay
bool $checkDelay
Allow delayed jobs.
Definition: JobQueue.php:42
JobQueue\doGetAcquiredCount
doGetAcquiredCount()
JobQueue\push
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:303
JobQueue\getAbandonedCount
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:277
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
JobQueue\doDeduplicateRootJob
doDeduplicateRootJob(Job $job)
Definition: JobQueue.php:464
ProfileSection
Class for handling function-scope profiling.
Definition: Profiler.php:60
MWException
MediaWiki exception.
Definition: MWException.php:26
JobQueue\getAcquiredCount
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:228
JobQueue\doDelete
doDelete()
Definition: JobQueue.php:552
JobQueue\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:701
JobQueue\delayedJobsEnabled
delayedJobsEnabled()
Definition: JobQueue.php:143
wfIncrStats
wfIncrStats( $key, $count=1)
Increment a statistics counter.
Definition: GlobalFunctions.php:1304
JobQueue\ack
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:398
JobQueue\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(Job $job)
Definition: JobQueue.php:508
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:34
JobQueue\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue\doFlushCaches
doFlushCaches()
Definition: JobQueue.php:621
wfProfileOut
wfProfileOut( $functionname='missing')
Stop profiling of a function.
Definition: Profiler.php:46
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:44
JobQueue\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueue.php:264
JobQueue\getDelayedCount
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:252
array
the array() calling protocol came about after MediaWiki 1.4rc1.
List of Api Query prop modules.
JobQueue\doPop
doPop()
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
JobQueueError
Definition: JobQueue.php:738
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Get a cache key for a foreign DB.
Definition: GlobalFunctions.php:3597
DuplicateJob\newFromJob
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Definition: DuplicateJob.php:46
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
JobQueue\getSize
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:205
JobQueue\doAck
doAck(Job $job)
JobQueue\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
$ok
$ok
Definition: UtfNormalTest.php:71
$section
$section
Definition: Utf8Test.php:88
JobQueue\getOrder
getOrder()
Definition: JobQueue.php:135
wfWikiID
wfWikiID()
Get an ASCII string identifying this wiki. This is used as a prefix in memcached keys.
Definition: GlobalFunctions.php:3613
$name
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:336
JobQueue\waitForBackups
waitForBackups()
Wait for any slaves or backup servers to catch up.
Definition: JobQueue.php:564
JobQueue\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueue.php:289
JobQueue\doWaitForBackups
doWaitForBackups()
Definition: JobQueue.php:574
CACHE_ANYTHING
const CACHE_ANYTHING
Definition: Defines.php:111
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:105
JobQueue\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:166
JobQueueConnectionError
Definition: JobQueue.php:741
JobQueue\$order
string $order
Job priority for pop().
Definition: JobQueue.php:36
JobQueue\doIsEmpty
doIsEmpty()
JobQueue\$wiki
string $wiki
Wiki ID.
Definition: JobQueue.php:32
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:526
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:42
JobQueue\flushCaches
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:611
JobQueue\$maxTries
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:40
JobQueue\pop
pop()
Pop a job off of the queue.
Definition: JobQueue.php:355
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
JobQueue\QOS_ATOMIC
const QOS_ATOMIC
Definition: JobQueue.php:46
JobQueue\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:652
JobQueue\isRootJobOldDuplicate
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:492
$e
if( $useReadline) $e
Definition: eval.php:66
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
JobQueue\getWiki
getWiki()
Definition: JobQueue.php:121
$res
$res
Definition: database.txt:21
JobQueue\getSiblingQueueSizes
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:690
JobQueue\getSiblingQueuesWithJobs
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:665
JobQueue\getType
getType()
Definition: JobQueue.php:128
JobQueue\__construct
__construct(array $params)
Definition: JobQueue.php:54
JobQueue\getPeriodicTasks
getPeriodicTasks()
Return a map of task names to task definition maps.
Definition: JobQueue.php:589