MediaWiki REL1_33
JobQueue.php
Go to the documentation of this file.
1<?php
23use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
24
31abstract class JobQueue {
33 protected $domain;
35 protected $type;
37 protected $order;
39 protected $claimTTL;
41 protected $maxTries;
43 protected $readOnlyReason;
45 protected $stats;
46
48 protected $dupCache;
49
50 const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
51
52 const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
53
58 protected function __construct( array $params ) {
59 $this->domain = $params['domain'] ?? $params['wiki']; // b/c
60 $this->type = $params['type'];
61 $this->claimTTL = $params['claimTTL'] ?? 0;
62 $this->maxTries = $params['maxTries'] ?? 3;
63 if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
64 $this->order = $params['order'];
65 } else {
66 $this->order = $this->optimalOrder();
67 }
68 if ( !in_array( $this->order, $this->supportedOrders() ) ) {
69 throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
70 }
71 $this->readOnlyReason = $params['readOnlyReason'] ?? false;
72 $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
73 $this->dupCache = $params['stash'] ?? new EmptyBagOStuff();
74 }
75
106 final public static function factory( array $params ) {
107 $class = $params['class'];
108 if ( !class_exists( $class ) ) {
109 throw new JobQueueError( "Invalid job queue class '$class'." );
110 }
111 $obj = new $class( $params );
112 if ( !( $obj instanceof self ) ) {
113 throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
114 }
115
116 return $obj;
117 }
118
122 final public function getDomain() {
123 return $this->domain;
124 }
125
130 final public function getWiki() {
131 return WikiMap::getWikiIdFromDbDomain( $this->domain );
132 }
133
137 final public function getType() {
138 return $this->type;
139 }
140
144 final public function getOrder() {
145 return $this->order;
146 }
147
153 abstract protected function supportedOrders();
154
160 abstract protected function optimalOrder();
161
167 protected function supportsDelayedJobs() {
168 return false; // not implemented
169 }
170
175 final public function delayedJobsEnabled() {
176 return $this->supportsDelayedJobs();
177 }
178
183 public function getReadOnlyReason() {
185 }
186
199 final public function isEmpty() {
200 $res = $this->doIsEmpty();
201
202 return $res;
203 }
204
209 abstract protected function doIsEmpty();
210
220 final public function getSize() {
221 $res = $this->doGetSize();
222
223 return $res;
224 }
225
230 abstract protected function doGetSize();
231
241 final public function getAcquiredCount() {
242 $res = $this->doGetAcquiredCount();
243
244 return $res;
245 }
246
251 abstract protected function doGetAcquiredCount();
252
263 final public function getDelayedCount() {
264 $res = $this->doGetDelayedCount();
265
266 return $res;
267 }
268
273 protected function doGetDelayedCount() {
274 return 0; // not implemented
275 }
276
286 final public function getAbandonedCount() {
287 $res = $this->doGetAbandonedCount();
288
289 return $res;
290 }
291
296 protected function doGetAbandonedCount() {
297 return 0; // not implemented
298 }
299
310 final public function push( $jobs, $flags = 0 ) {
311 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
312 $this->batchPush( $jobs, $flags );
313 }
314
325 final public function batchPush( array $jobs, $flags = 0 ) {
326 $this->assertNotReadOnly();
327
328 if ( $jobs === [] ) {
329 return; // nothing to do
330 }
331
332 foreach ( $jobs as $job ) {
333 if ( $job->getType() !== $this->type ) {
334 throw new JobQueueError(
335 "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
336 } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
337 throw new JobQueueError(
338 "Got delayed '{$job->getType()}' job; delays are not supported." );
339 }
340 }
341
342 $this->doBatchPush( $jobs, $flags );
343
344 foreach ( $jobs as $job ) {
345 if ( $job->isRootJob() ) {
346 $this->deduplicateRootJob( $job );
347 }
348 }
349 }
350
356 abstract protected function doBatchPush( array $jobs, $flags );
357
366 final public function pop() {
367 $this->assertNotReadOnly();
368
369 $job = $this->doPop();
370
371 // Flag this job as an old duplicate based on its "root" job...
372 try {
373 if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
374 $this->incrStats( 'dupe_pops', $this->type );
375 $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
376 }
377 } catch ( Exception $e ) {
378 // don't lose jobs over this
379 }
380
381 return $job;
382 }
383
388 abstract protected function doPop();
389
400 final public function ack( Job $job ) {
401 $this->assertNotReadOnly();
402 if ( $job->getType() !== $this->type ) {
403 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
404 }
405
406 $this->doAck( $job );
407 }
408
413 abstract protected function doAck( Job $job );
414
446 final public function deduplicateRootJob( IJobSpecification $job ) {
447 $this->assertNotReadOnly();
448 if ( $job->getType() !== $this->type ) {
449 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
450 }
451
452 return $this->doDeduplicateRootJob( $job );
453 }
454
462 if ( !$job->hasRootJobParams() ) {
463 throw new JobQueueError( "Cannot register root job; missing parameters." );
464 }
465 $params = $job->getRootJobParams();
466
467 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
468 // Callers should call JobQueueGroup::push() before this method so that if the insert
469 // fails, the de-duplication registration will be aborted. Since the insert is
470 // deferred till "transaction idle", do the same here, so that the ordering is
471 // maintained. Having only the de-duplication registration succeed would cause
472 // jobs to become no-ops without any actual jobs that made them redundant.
473 $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
474 if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
475 return true; // a newer version of this root job was enqueued
476 }
477
478 // Update the timestamp of the last root job started at the location...
479 return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
480 }
481
489 final protected function isRootJobOldDuplicate( Job $job ) {
490 if ( $job->getType() !== $this->type ) {
491 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
492 }
493 $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
494
495 return $isDuplicate;
496 }
497
503 protected function doIsRootJobOldDuplicate( Job $job ) {
504 if ( !$job->hasRootJobParams() ) {
505 return false; // job has no de-deplication info
506 }
507 $params = $job->getRootJobParams();
508
509 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
510 // Get the last time this root job was enqueued
511 $timestamp = $this->dupCache->get( $key );
512
513 // Check if a new root job was started at the location after this one's...
514 return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
515 }
516
521 protected function getRootJobCacheKey( $signature ) {
522 return $this->dupCache->makeGlobalKey(
523 'jobqueue',
524 $this->domain,
525 $this->type,
526 'rootjob',
527 $signature
528 );
529 }
530
538 final public function delete() {
539 $this->assertNotReadOnly();
540
541 $this->doDelete();
542 }
543
548 protected function doDelete() {
549 throw new JobQueueError( "This method is not implemented." );
550 }
551
560 final public function waitForBackups() {
561 $this->doWaitForBackups();
562 }
563
568 protected function doWaitForBackups() {
569 }
570
576 final public function flushCaches() {
577 $this->doFlushCaches();
578 }
579
584 protected function doFlushCaches() {
585 }
586
595 abstract public function getAllQueuedJobs();
596
605 public function getAllDelayedJobs() {
606 return new ArrayIterator( [] ); // not implemented
607 }
608
619 public function getAllAcquiredJobs() {
620 return new ArrayIterator( [] ); // not implemented
621 }
622
630 public function getAllAbandonedJobs() {
631 return new ArrayIterator( [] ); // not implemented
632 }
633
640 public function getCoalesceLocationInternal() {
641 return null;
642 }
643
653 final public function getSiblingQueuesWithJobs( array $types ) {
654 return $this->doGetSiblingQueuesWithJobs( $types );
655 }
656
662 protected function doGetSiblingQueuesWithJobs( array $types ) {
663 return null; // not supported
664 }
665
676 final public function getSiblingQueueSizes( array $types ) {
677 return $this->doGetSiblingQueueSizes( $types );
678 }
679
685 protected function doGetSiblingQueueSizes( array $types ) {
686 return null; // not supported
687 }
688
692 protected function assertNotReadOnly() {
693 if ( $this->readOnlyReason !== false ) {
694 throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
695 }
696 }
697
706 protected function incrStats( $key, $type, $delta = 1 ) {
707 $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
708 $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
709 }
710}
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:58
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
A BagOStuff object with no objects in it.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition JobQueue.php:199
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition JobQueue.php:706
string $order
Job priority for pop()
Definition JobQueue.php:37
__construct(array $params)
Definition JobQueue.php:58
BagOStuff $dupCache
Definition JobQueue.php:48
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition JobQueue.php:286
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition JobQueue.php:560
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition JobQueue.php:489
const ROOTJOB_TTL
Definition JobQueue.php:52
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition JobQueue.php:310
doGetAbandonedCount()
Definition JobQueue.php:296
pop()
Pop a job off of the queue.
Definition JobQueue.php:366
int $claimTTL
Time to live in seconds.
Definition JobQueue.php:39
doDeduplicateRootJob(IJobSpecification $job)
Definition JobQueue.php:461
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition JobQueue.php:640
doAck(Job $job)
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition JobQueue.php:325
doGetSiblingQueueSizes(array $types)
Definition JobQueue.php:685
string $type
Job type.
Definition JobQueue.php:35
doFlushCaches()
Definition JobQueue.php:584
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition JobQueue.php:167
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition JobQueue.php:676
StatsdDataFactoryInterface $stats
Definition JobQueue.php:45
int $maxTries
Maximum number of times to try a job.
Definition JobQueue.php:41
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:106
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition JobQueue.php:630
getReadOnlyReason()
Definition JobQueue.php:183
doBatchPush(array $jobs, $flags)
ack(Job $job)
Acknowledge that a job was completed.
Definition JobQueue.php:400
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition JobQueue.php:619
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition JobQueue.php:43
doGetSiblingQueuesWithJobs(array $types)
Definition JobQueue.php:662
doGetAcquiredCount()
const QOS_ATOMIC
Definition JobQueue.php:50
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition JobQueue.php:446
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition JobQueue.php:220
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition JobQueue.php:241
assertNotReadOnly()
Definition JobQueue.php:692
doIsRootJobOldDuplicate(Job $job)
Definition JobQueue.php:503
flushCaches()
Clear any process and persistent caches.
Definition JobQueue.php:576
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition JobQueue.php:605
doWaitForBackups()
Definition JobQueue.php:568
string $domain
DB domain ID.
Definition JobQueue.php:33
supportedOrders()
Get the allowed queue orders for configuration validation.
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition JobQueue.php:653
getRootJobCacheKey( $signature)
Definition JobQueue.php:521
delayedJobsEnabled()
Definition JobQueue.php:175
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition JobQueue.php:263
doGetDelayedCount()
Definition JobQueue.php:273
Class to both describe a background job and handle jobs.
Definition Job.php:30
getType()
Definition Job.php:143
$res
Definition database.txt:21
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
returning false will NOT prevent logging $e
Definition hooks.txt:2175
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
Job queue task description interface.
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition postgres.txt:30
if(count( $args)< 1) $job
$params