MediaWiki REL1_32
JobQueue.php
Go to the documentation of this file.
1<?php
24
31abstract class JobQueue {
33 protected $wiki;
35 protected $type;
37 protected $order;
39 protected $claimTTL;
41 protected $maxTries;
43 protected $readOnlyReason;
44
46 protected $dupCache;
48 protected $aggr;
49
50 const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
51
52 const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
53
58 protected function __construct( array $params ) {
59 $this->wiki = $params['wiki'];
60 $this->type = $params['type'];
61 $this->claimTTL = $params['claimTTL'] ?? 0;
62 $this->maxTries = $params['maxTries'] ?? 3;
63 if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
64 $this->order = $params['order'];
65 } else {
66 $this->order = $this->optimalOrder();
67 }
68 if ( !in_array( $this->order, $this->supportedOrders() ) ) {
69 throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
70 }
71 $this->dupCache = wfGetCache( CACHE_ANYTHING );
72 $this->aggr = $params['aggregator'] ?? new JobQueueAggregatorNull( [] );
73 $this->readOnlyReason = $params['readOnlyReason'] ?? false;
74 }
75
104 final public static function factory( array $params ) {
105 $class = $params['class'];
106 if ( !class_exists( $class ) ) {
107 throw new MWException( "Invalid job queue class '$class'." );
108 }
109 $obj = new $class( $params );
110 if ( !( $obj instanceof self ) ) {
111 throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
112 }
113
114 return $obj;
115 }
116
120 final public function getWiki() {
121 return $this->wiki;
122 }
123
127 final public function getType() {
128 return $this->type;
129 }
130
134 final public function getOrder() {
135 return $this->order;
136 }
137
143 abstract protected function supportedOrders();
144
150 abstract protected function optimalOrder();
151
157 protected function supportsDelayedJobs() {
158 return false; // not implemented
159 }
160
165 final public function delayedJobsEnabled() {
166 return $this->supportsDelayedJobs();
167 }
168
173 public function getReadOnlyReason() {
175 }
176
189 final public function isEmpty() {
190 $res = $this->doIsEmpty();
191
192 return $res;
193 }
194
199 abstract protected function doIsEmpty();
200
210 final public function getSize() {
211 $res = $this->doGetSize();
212
213 return $res;
214 }
215
220 abstract protected function doGetSize();
221
231 final public function getAcquiredCount() {
232 $res = $this->doGetAcquiredCount();
233
234 return $res;
235 }
236
241 abstract protected function doGetAcquiredCount();
242
253 final public function getDelayedCount() {
254 $res = $this->doGetDelayedCount();
255
256 return $res;
257 }
258
263 protected function doGetDelayedCount() {
264 return 0; // not implemented
265 }
266
276 final public function getAbandonedCount() {
277 $res = $this->doGetAbandonedCount();
278
279 return $res;
280 }
281
286 protected function doGetAbandonedCount() {
287 return 0; // not implemented
288 }
289
300 final public function push( $jobs, $flags = 0 ) {
301 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
302 $this->batchPush( $jobs, $flags );
303 }
304
315 final public function batchPush( array $jobs, $flags = 0 ) {
316 $this->assertNotReadOnly();
317
318 if ( !count( $jobs ) ) {
319 return; // nothing to do
320 }
321
322 foreach ( $jobs as $job ) {
323 if ( $job->getType() !== $this->type ) {
324 throw new MWException(
325 "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
326 } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
327 throw new MWException(
328 "Got delayed '{$job->getType()}' job; delays are not supported." );
329 }
330 }
331
332 $this->doBatchPush( $jobs, $flags );
333 $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
334
335 foreach ( $jobs as $job ) {
336 if ( $job->isRootJob() ) {
337 $this->deduplicateRootJob( $job );
338 }
339 }
340 }
341
347 abstract protected function doBatchPush( array $jobs, $flags );
348
357 final public function pop() {
358 global $wgJobClasses;
359
360 $this->assertNotReadOnly();
361 if ( !WikiMap::isCurrentWikiDbDomain( $this->wiki ) ) {
362 throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
363 } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
364 // Do not pop jobs if there is no class for the queue type
365 throw new MWException( "Unrecognized job type '{$this->type}'." );
366 }
367
368 $job = $this->doPop();
369
370 if ( !$job ) {
371 $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
372 }
373
374 // Flag this job as an old duplicate based on its "root" job...
375 try {
376 if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
377 self::incrStats( 'dupe_pops', $this->type );
378 $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
379 }
380 } catch ( Exception $e ) {
381 // don't lose jobs over this
382 }
383
384 return $job;
385 }
386
391 abstract protected function doPop();
392
403 final public function ack( Job $job ) {
404 $this->assertNotReadOnly();
405 if ( $job->getType() !== $this->type ) {
406 throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
407 }
408
409 $this->doAck( $job );
410 }
411
416 abstract protected function doAck( Job $job );
417
449 final public function deduplicateRootJob( IJobSpecification $job ) {
450 $this->assertNotReadOnly();
451 if ( $job->getType() !== $this->type ) {
452 throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
453 }
454
455 return $this->doDeduplicateRootJob( $job );
456 }
457
465 if ( !$job->hasRootJobParams() ) {
466 throw new MWException( "Cannot register root job; missing parameters." );
467 }
468 $params = $job->getRootJobParams();
469
470 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
471 // Callers should call JobQueueGroup::push() before this method so that if the insert
472 // fails, the de-duplication registration will be aborted. Since the insert is
473 // deferred till "transaction idle", do the same here, so that the ordering is
474 // maintained. Having only the de-duplication registration succeed would cause
475 // jobs to become no-ops without any actual jobs that made them redundant.
476 $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
477 if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
478 return true; // a newer version of this root job was enqueued
479 }
480
481 // Update the timestamp of the last root job started at the location...
482 return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
483 }
484
492 final protected function isRootJobOldDuplicate( Job $job ) {
493 if ( $job->getType() !== $this->type ) {
494 throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
495 }
496 $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
497
498 return $isDuplicate;
499 }
500
506 protected function doIsRootJobOldDuplicate( Job $job ) {
507 if ( !$job->hasRootJobParams() ) {
508 return false; // job has no de-deplication info
509 }
510 $params = $job->getRootJobParams();
511
512 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
513 // Get the last time this root job was enqueued
514 $timestamp = $this->dupCache->get( $key );
515
516 // Check if a new root job was started at the location after this one's...
517 return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
518 }
519
524 protected function getRootJobCacheKey( $signature ) {
525 list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
526
527 return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
528 }
529
537 final public function delete() {
538 $this->assertNotReadOnly();
539
540 $this->doDelete();
541 }
542
547 protected function doDelete() {
548 throw new MWException( "This method is not implemented." );
549 }
550
559 final public function waitForBackups() {
560 $this->doWaitForBackups();
561 }
562
567 protected function doWaitForBackups() {
568 }
569
575 final public function flushCaches() {
576 $this->doFlushCaches();
577 }
578
583 protected function doFlushCaches() {
584 }
585
594 abstract public function getAllQueuedJobs();
595
604 public function getAllDelayedJobs() {
605 return new ArrayIterator( [] ); // not implemented
606 }
607
618 public function getAllAcquiredJobs() {
619 return new ArrayIterator( [] ); // not implemented
620 }
621
629 public function getAllAbandonedJobs() {
630 return new ArrayIterator( [] ); // not implemented
631 }
632
639 public function getCoalesceLocationInternal() {
640 return null;
641 }
642
652 final public function getSiblingQueuesWithJobs( array $types ) {
653 return $this->doGetSiblingQueuesWithJobs( $types );
654 }
655
661 protected function doGetSiblingQueuesWithJobs( array $types ) {
662 return null; // not supported
663 }
664
675 final public function getSiblingQueueSizes( array $types ) {
676 return $this->doGetSiblingQueueSizes( $types );
677 }
678
684 protected function doGetSiblingQueueSizes( array $types ) {
685 return null; // not supported
686 }
687
691 protected function assertNotReadOnly() {
692 if ( $this->readOnlyReason !== false ) {
693 throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
694 }
695 }
696
705 public static function incrStats( $key, $type, $delta = 1 ) {
706 static $stats;
707 if ( !$stats ) {
708 $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
709 }
710 $stats->updateCount( "jobqueue.{$key}.all", $delta );
711 $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
712 }
713}
714
720}
721
724
726
727}
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
$wgJobClasses
Maps jobs to their handlers; extensions can add to this to provide custom jobs.
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
wfGetCache( $cacheType)
Get a specific cache object.
wfForeignMemcKey( $db, $prefix,... $args)
Make a cache key for a foreign DB.
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:58
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
Class to handle tracking information about all queues.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:189
string $order
Job priority for pop()
Definition JobQueue.php:37
__construct(array $params)
Definition JobQueue.php:58
BagOStuff $dupCache
Definition JobQueue.php:46
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:276
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:559
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:492
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition JobQueue.php:705
const ROOTJOB_TTL
Definition JobQueue.php:52
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:300
doGetAbandonedCount()
Definition JobQueue.php:286
pop()
Pop a job off of the queue.
Definition JobQueue.php:357
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:39
doDeduplicateRootJob(IJobSpecification $job)
Definition JobQueue.php:464
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:639
doAck(Job $job)
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:315
doGetSiblingQueueSizes(array $types)
Definition JobQueue.php:684
string $type
Job type.
Definition JobQueue.php:35
doFlushCaches()
Definition JobQueue.php:583
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:157
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:675
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:41
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:104
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:629
string $wiki
Wiki ID.
Definition JobQueue.php:33
getReadOnlyReason()
Definition JobQueue.php:173
doBatchPush(array $jobs, $flags)
ack(Job $job)
Acknowledge that a job was completed.
Definition JobQueue.php:403
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:618
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:43
doGetSiblingQueuesWithJobs(array $types)
Definition JobQueue.php:661
doGetAcquiredCount()
const QOS_ATOMIC
Definition JobQueue.php:50
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:449
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:210
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:231
assertNotReadOnly()
Definition JobQueue.php:691
doIsRootJobOldDuplicate(Job $job)
Definition JobQueue.php:506
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:575
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:604
doWaitForBackups()
Definition JobQueue.php:567
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueueAggregator $aggr
Definition JobQueue.php:48
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition JobQueue.php:652
getRootJobCacheKey( $signature)
Definition JobQueue.php:524
delayedJobsEnabled()
Definition JobQueue.php:165
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:253
doGetDelayedCount()
Definition JobQueue.php:263
Class to both describe a background job and handle jobs.
Definition Job.php:30
getType()
Definition Job.php:128
MediaWiki exception.
MediaWikiServices is the service locator for the application scope of MediaWiki.
static isCurrentWikiDbDomain( $domain)
Definition WikiMap.php:267
$res
Definition database.txt:21
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition deferred.txt:11
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
const CACHE_ANYTHING
Definition Defines.php:101
returning false will NOT prevent logging $e
Definition hooks.txt:2226
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
Job queue task description interface.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition postgres.txt:36
if(count( $args)< 1) $job
$params