MediaWiki REL1_31
JobQueue.php
Go to the documentation of this file.
1<?php
24
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Concrete queue backends extend this class and implement the abstract
 * do*() hooks. The public final methods perform the shared validation
 * (read-only state, job type checks) and then delegate to those hooks.
 */
abstract class JobQueue {
	/** @var string Wiki ID */
	protected $wiki;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;

	/** @var BagOStuff */
	protected $dupCache;
	/** @var JobQueueAggregator */
	protected $aggr;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue configuration. Keys read here:
	 *   - wiki           : wiki ID (required)
	 *   - type           : job type (required)
	 *   - claimTTL       : stored as-is; defaults to 0
	 *   - maxTries       : stored as-is; defaults to 3
	 *   - order          : queue order; when absent or 'any', optimalOrder() is used
	 *   - aggregator     : aggregator object; defaults to a JobQueueAggregatorNull
	 *   - readOnlyReason : read-only rationale; defaults to false (read/write)
	 * @throws MWException If the resulting order is not one of supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison so that non-string values (e.g. 0) never loosely match an order name
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
		$this->aggr = isset( $params['aggregator'] )
			? $params['aggregator']
			: new JobQueueAggregatorNull( [] );
		$this->readOnlyReason = isset( $params['readOnlyReason'] )
			? $params['readOnlyReason']
			: false;
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain 'class' (name of a JobQueue subclass); the whole
	 *   array is handed to that class's constructor
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue subclass
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		// Check the type before instantiating so constructors of unrelated classes never run
		if ( !is_subclass_of( $class, __CLASS__ ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string Wiki ID
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Queue order in effect for this queue
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names accepted by the constructor
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool This base implementation always returns false
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether delayed jobs are enabled, as reported by supportsDelayedJobs()
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read only rationale (or false if r/w)
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return bool Result of doIsEmpty()
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 * @return bool
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return int Result of doGetSize()
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 * @return int
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return int Result of doGetAcquiredCount()
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 * @return int
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int Result of doGetDelayedCount()
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int This base implementation always returns 0
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int Result of doGetAbandonedCount()
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int This base implementation always returns 0
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * A single job is wrapped in an array and handed to batchPush().
	 *
	 * @param mixed $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush()
	 * @return void
	 * @throws MWException
	 */
	final public function push( $jobs, $flags = 0 ) {
		$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
		$this->batchPush( $jobs, $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted, so one bad job rejects the
	 * whole batch. After insertion the aggregator is notified and each root job is
	 * registered for de-duplication.
	 *
	 * @param array $jobs List of jobs
	 * @param int $flags Passed through to doBatchPush()
	 * @return void
	 * @throws MWException If a job has the wrong type, or is delayed while delays are unsupported
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( !count( $jobs ) ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );
		$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );

		// Register root jobs only after the insertion call has returned
		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param array $jobs List of jobs, already validated by batchPush()
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * If the popped job's root job has been superseded by a newer one, the job is
	 * replaced by a no-op DuplicateJob. When nothing was popped, the aggregator is told
	 * that the queue is empty.
	 *
	 * @return mixed The popped job (possibly converted to a DuplicateJob), or the
	 *   falsy value returned by doPop() when no job was available
	 * @throws MWException If the queue belongs to a foreign wiki or the job type has no
	 *   entry in $wgJobClasses
	 */
	final public function pop() {
		global $wgJobClasses;

		$this->assertNotReadOnly();
		if ( !WikiMap::isCurrentWikiDbDomain( $this->wiki ) ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		$job = $this->doPop();

		if ( !$job ) {
			$this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
		}

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// Deliberately best-effort: the job has already been popped, so a failed
			// duplicate check must not cause it to be lost. Return it unchanged.
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 * @return mixed A job, or a falsy value if none is available
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param Job $job
	 * @return void
	 * @throws MWException If the job type does not match this queue
	 */
	final public function ack( Job $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		$this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doDeduplicateRootJob()
	 * @throws MWException If the job type does not match this queue
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * Record the job's root job timestamp in the duplicate cache unless an equal or
	 * newer timestamp is already stored under the same root job signature.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return bool True if an equal or newer root job is already registered; otherwise
	 *   the result of the cache write
	 * @throws MWException If the job carries no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should insert the jobs first (as batchPush() does) and only then call
		// this function, so that a failed insert aborts the de-duplication registration.
		// Having only the de-duplication registration succeed would cause jobs to become
		// no-ops without any actual jobs that made them redundant.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param Job $job
	 * @return bool Result of doIsRootJobOldDuplicate()
	 * @throws MWException If the job type does not match this queue
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool True only if the duplicate cache holds a timestamp strictly newer
	 *   than this job's root job timestamp
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the duplicate-cache key for a root job signature, scoped to this queue's
	 * wiki and job type.
	 *
	 * @param string $signature Root job signature
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Run the backend's doDelete() after checking that the queue is writable.
	 *
	 * @return void
	 * @throws MWException From the base doDelete() when the backend does not override it
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws MWException Always, in this base implementation
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * @return void This base implementation does nothing
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * @return void This base implementation does nothing
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator This base implementation returns an empty iterator
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator This base implementation returns an empty iterator
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator This base implementation returns an empty iterator
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return string|null This base implementation returns null
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues is empty.
	 *
	 * @param array $types List of queue types
	 * @return array|null Result of doGetSiblingQueuesWithJobs(); null means unsupported
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return array|null This base implementation returns null (not supported)
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types List of queue types
	 * @return array|null Result of doGetSiblingQueueSizes(); null means unsupported
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return array|null This base implementation returns null (not supported)
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Guard used by every mutating operation.
	 *
	 * @throws JobQueueReadOnlyError If a read-only reason is set on this queue
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * Increment two statsd counters: one for the queue overall
	 * ("jobqueue.<key>.all") and one for the queue type ("jobqueue.<key>.<type>").
	 *
	 * @param string $key Event type
	 * @param string $type Job type
	 * @param int $delta Amount to add to each counter
	 */
	public static function incrStats( $key, $type, $delta = 1 ) {
		// Resolve the stats factory once per process and reuse it on later calls
		static $stats;
		if ( !$stats ) {
			$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
		}
		$stats->updateCount( "jobqueue.{$key}.all", $delta );
		$stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
718
724}
725
728
730
731}
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
wfGetCache( $cacheType)
Get a specific cache object.
$wgJobClasses['replaceText']
The BagOStuff interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Class to handle tracking information about all queues.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:193
string $order
Job priority for pop()
Definition JobQueue.php:37
__construct(array $params)
Definition JobQueue.php:58
BagOStuff $dupCache
Definition JobQueue.php:46
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:280
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:563
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:496
static incrStats( $key, $type, $delta=1)
Increment the statsd counters for the queue overall and for the queue type.
Definition JobQueue.php:709
const ROOTJOB_TTL
Definition JobQueue.php:52
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:304
doGetAbandonedCount()
Definition JobQueue.php:290
pop()
Pop a job off of the queue.
Definition JobQueue.php:361
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:39
doDeduplicateRootJob(IJobSpecification $job)
Definition JobQueue.php:468
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:643
doAck(Job $job)
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:319
doGetSiblingQueueSizes(array $types)
Definition JobQueue.php:688
string $type
Job type.
Definition JobQueue.php:35
doFlushCaches()
Definition JobQueue.php:587
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:161
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:679
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:41
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:108
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:633
string $wiki
Wiki ID.
Definition JobQueue.php:33
getReadOnlyReason()
Definition JobQueue.php:177
doBatchPush(array $jobs, $flags)
ack(Job $job)
Acknowledge that a job was completed.
Definition JobQueue.php:407
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:622
string|bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:43
doGetSiblingQueuesWithJobs(array $types)
Definition JobQueue.php:665
doGetAcquiredCount()
const QOS_ATOMIC
Definition JobQueue.php:50
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:453
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:214
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:235
assertNotReadOnly()
Definition JobQueue.php:695
doIsRootJobOldDuplicate(Job $job)
Definition JobQueue.php:510
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:579
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:608
doWaitForBackups()
Definition JobQueue.php:571
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueueAggregator $aggr
Definition JobQueue.php:48
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition JobQueue.php:656
getRootJobCacheKey( $signature)
Definition JobQueue.php:528
delayedJobsEnabled()
Definition JobQueue.php:169
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:257
doGetDelayedCount()
Definition JobQueue.php:267
Class to both describe a background job and handle jobs.
Definition Job.php:31
getType()
Definition Job.php:146
MediaWiki exception.
MediaWikiServices is the service locator for the application scope of MediaWiki.
static isCurrentWikiDbDomain( $domain)
Definition WikiMap.php:267
$res
Definition database.txt:21
deferred.txt: A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user. For example, updating the view counts, updating the linked-to tables after a save, etc. PHP does not yet have any way to tell the server to actually return and disconnect while still running these updates, but it might have such a feature in the future. We handle these by creating a deferred-update object and putting those objects on a global list.
Definition deferred.txt:11
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition design.txt:19
returning false will NOT prevent logging $e
Definition hooks.txt:2176
const CACHE_ANYTHING
Definition Defines.php:111
Job queue task description interface.
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition postgres.txt:30
if(count( $args)< 1) $job
$params