MediaWiki REL1_29
JobQueue.php
Go to the documentation of this file.
1<?php
25
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Concrete queues extend this class and implement the abstract do*() hooks;
 * the final public methods perform the shared validation (read-only state,
 * job type, delay support) and then delegate to those hooks.
 */
abstract class JobQueue {
	/** @var string Wiki ID */
	protected $wiki;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;

	/** @var BagOStuff */
	protected $dupCache;
	/** @var JobQueueAggregator */
	protected $aggr;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * Initialize the queue from a configuration array.
	 *
	 * Keys read from $params:
	 *   - wiki           : wiki ID (required)
	 *   - type           : job type (required)
	 *   - claimTTL       : optional; defaults to 0
	 *   - maxTries       : optional; defaults to 3
	 *   - order          : optional; when absent or 'any', optimalOrder() is used
	 *   - aggregator     : optional; defaults to a JobQueueAggregatorNull instance
	 *   - readOnlyReason : optional; defaults to false (queue is writable)
	 *
	 * @param array $params
	 * @throws MWException If the resulting order is not in supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison so that a non-string value cannot loosely match an order name
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
		$this->aggr = isset( $params['aggregator'] )
			? $params['aggregator']
			: new JobQueueAggregatorNull( [] );
		$this->readOnlyReason = isset( $params['readOnlyReason'] )
			? $params['readOnlyReason']
			: false;
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * $params must contain a 'class' key naming the queue class; the whole
	 * array is passed on to that class's constructor.
	 *
	 * @param array $params
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		$obj = new $class( $params );
		if ( !( $obj instanceof self ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return $obj;
	}

	/**
	 * @return string Wiki ID
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string The order in effect for this queue
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names accepted by the constructor
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless a subclass overrides this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether this queue supports delayed jobs (see supportsDelayedJobs())
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read only rationale (or false if r/w)
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return mixed Whatever doIsEmpty() returns
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return mixed Whatever doGetSize() returns
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return mixed Whatever doGetAcquiredCount() returns
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int 0 unless a subclass overrides doGetDelayedCount()
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int 0 unless a subclass overrides doGetAbandonedCount()
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * A single job is wrapped in an array and handed to batchPush().
	 *
	 * @param mixed $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush()
	 * @return void
	 * @throws MWException See batchPush()
	 */
	final public function push( $jobs, $flags = 0 ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		$this->batchPush( $jobs, $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted. After the insert the
	 * aggregator is told the queue is non-empty, and each job reporting
	 * isRootJob() is registered through deduplicateRootJob().
	 *
	 * @param array $jobs List of jobs
	 * @param int $flags Passed through to doBatchPush()
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws MWException If a job has the wrong type, or is delayed while
	 *   delays are not supported
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( !count( $jobs ) ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );
		$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );

		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * When doPop() yields nothing, the aggregator is told the queue is empty.
	 * A job whose root job has been superseded is replaced by a no-op
	 * DuplicateJob built from it.
	 *
	 * @return mixed The popped job, or the falsy value returned by doPop()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws MWException If the queue belongs to another wiki, or no class is
	 *   registered in $wgJobClasses for this queue's type
	 */
	final public function pop() {
		// Must be declared global: without this, isset() below sees an undefined
		// local and every pop fails with "Unrecognized job type".
		global $wgJobClasses;

		$this->assertNotReadOnly();
		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		$job = $this->doPop();

		if ( !$job ) {
			$this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
		}

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// don't lose jobs over this
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param Job $job
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws MWException If the job's type differs from this queue's type
	 */
	final public function ack( Job $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		$this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return mixed Result of doDeduplicateRootJob()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws MWException If the job's type differs from this queue's type
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * Record the job's root job timestamp in the duplicate cache, unless an
	 * equal or newer timestamp is already stored under the same signature.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return mixed True if an equal or newer root job is already registered;
	 *   otherwise the result of the cache set() call
	 * @throws MWException If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchInsert() and then this function so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		// (self:: rather than a subclass name; the constant is declared on this class)
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param Job $job
	 * @return mixed Result of doIsRootJobOldDuplicate()
	 * @throws MWException If the job's type differs from this queue's type
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool True only when a strictly newer root job timestamp is cached
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the duplicate-cache key for a root job signature, scoped to this
	 * queue's wiki and job type.
	 *
	 * @param string $signature Root job signature
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Assert that the queue is writable, then delegate to doDelete().
	 *
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws MWException If the subclass does not implement doDelete()
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws MWException Always, unless overridden by a subclass
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * @return void Does nothing unless overridden
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * @return void Does nothing unless overridden
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator Empty unless overridden
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator Empty unless overridden
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator Empty unless overridden
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return null Null unless overridden
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues are empty.
	 *
	 * @param array $types List of queue types
	 * @return mixed Result of doGetSiblingQueuesWithJobs(); null when not supported
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return null Null unless overridden
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types List of queue types
	 * @return mixed Result of doGetSiblingQueueSizes(); null when not supported
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return null Null unless overridden
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Guard for write operations.
	 *
	 * @return void
	 * @throws JobQueueReadOnlyError If a read-only reason is set
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * Increment the "jobqueue.{key}.all" and "jobqueue.{key}.{type}" counters.
	 *
	 * The statsd data factory is fetched once and kept in a static local.
	 *
	 * @param string $key Event name
	 * @param string $type Job type
	 * @param int $delta Amount to add to each counter
	 * @return void
	 */
	public static function incrStats( $key, $type, $delta = 1 ) {
		static $stats;
		if ( !$stats ) {
			$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
		}
		$stats->updateCount( "jobqueue.{$key}.all", $delta );
		$stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
719
725}
726
729
731
732}
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
$wgJobClasses
Maps jobs to their handling classes; extensions can add to this to provide custom jobs.
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
wfGetCache( $cacheType)
Get a specific cache object.
wfWikiID()
Get an ASCII string identifying this wiki. This is used as a prefix in memcached keys.
interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Class to handle tracking information about all queues.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:32
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:194
string $order
Job priority for pop()
Definition JobQueue.php:38
__construct(array $params)
Definition JobQueue.php:59
BagOStuff $dupCache
Definition JobQueue.php:47
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:281
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:564
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:497
static incrStats( $key, $type, $delta=1)
Increment the statsd counters for the queue overall and for the queue type.
Definition JobQueue.php:710
const ROOTJOB_TTL
Definition JobQueue.php:53
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:305
doGetAbandonedCount()
Definition JobQueue.php:291
pop()
Pop a job off of the queue.
Definition JobQueue.php:362
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:40
doDeduplicateRootJob(IJobSpecification $job)
Definition JobQueue.php:469
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:644
doAck(Job $job)
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:320
doGetSiblingQueueSizes(array $types)
Definition JobQueue.php:689
string $type
Job type.
Definition JobQueue.php:36
doFlushCaches()
Definition JobQueue.php:588
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:162
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:680
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:42
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:109
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:634
string $wiki
Wiki ID.
Definition JobQueue.php:34
getReadOnlyReason()
Definition JobQueue.php:178
doBatchPush(array $jobs, $flags)
ack(Job $job)
Acknowledge that a job was completed.
Definition JobQueue.php:408
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:623
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:44
doGetSiblingQueuesWithJobs(array $types)
Definition JobQueue.php:666
doGetAcquiredCount()
const QOS_ATOMIC
Definition JobQueue.php:51
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:454
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:215
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:236
assertNotReadOnly()
Definition JobQueue.php:696
doIsRootJobOldDuplicate(Job $job)
Definition JobQueue.php:511
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:580
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:609
doWaitForBackups()
Definition JobQueue.php:572
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueueAggregator $aggr
Definition JobQueue.php:49
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition JobQueue.php:657
getRootJobCacheKey( $signature)
Definition JobQueue.php:529
delayedJobsEnabled()
Definition JobQueue.php:170
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:258
doGetDelayedCount()
Definition JobQueue.php:268
Class to both describe a background job and handle jobs.
Definition Job.php:31
getType()
Definition Job.php:121
MediaWiki exception.
MediaWikiServices is the service locator for the application scope of MediaWiki.
$res
Definition database.txt:21
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition deferred.txt:11
when a variable name is used in a it is silently declared as a new local masking the global
Definition design.txt:95
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition design.txt:19
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
const CACHE_ANYTHING
Definition Defines.php:99
the array() calling protocol came about after MediaWiki 1.4rc1.
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition hooks.txt:2753
returning false will NOT prevent logging $e
Definition hooks.txt:2127
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
Job queue task description interface.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition postgres.txt:36
if(count( $args)< 1) $job
$params