MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
24 
/**
 * Class to handle enqueueing and running of background jobs.
 *
 * Concrete queue backends extend this class and implement the abstract do*() hooks.
 * The final public methods defined here perform the shared validation (read-only
 * state, job type matching) and then delegate to those hooks.
 */
abstract class JobQueue {
	/** @var string DB domain ID */
	protected $domain;
	/** @var string Job type */
	protected $type;
	/** @var string Job priority for pop() */
	protected $order;
	/** @var int Time to live in seconds */
	protected $claimTTL;
	/** @var int Maximum number of times to try a job */
	protected $maxTries;
	/** @var string|bool Read only rationale (or false if r/w) */
	protected $readOnlyReason;
	/** @var StatsdDataFactoryInterface */
	protected $stats;

	/** @var BagOStuff Stores the last enqueue timestamp of each root job signature */
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue settings:
	 *   - domain : DB domain ID ("wiki" is accepted as a backwards-compatible alias)
	 *   - type : job type handled by this queue
	 *   - claimTTL : defaults to 0
	 *   - maxTries : defaults to 3
	 *   - order : requested order; when omitted or "any", optimalOrder() is used
	 *   - readOnlyReason : defaults to false (queue is read/write)
	 *   - stats : stats collector; defaults to a NullStatsdDataFactory
	 *   - stash : cache used for root job de-duplication; defaults to an EmptyBagOStuff
	 * @throws JobQueueError If the resulting order is not one of supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->domain = $params['domain'] ?? $params['wiki']; // b/c
		$this->type = $params['type'];
		$this->claimTTL = $params['claimTTL'] ?? 0;
		$this->maxTries = $params['maxTries'] ?? 3;

		$requestedOrder = $params['order'] ?? 'any';
		$this->order = ( $requestedOrder !== 'any' )
			? $requestedOrder
			: $this->optimalOrder();
		// Strict comparison: a loosely-equal value (e.g. boolean true) must not be
		// accepted as if it were one of the supported order names.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
		}

		$this->readOnlyReason = $params['readOnlyReason'] ?? false;
		$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
		$this->dupCache = $params['stash'] ?? new EmptyBagOStuff();
	}

	/**
	 * Get a job queue object of the specified type.
	 *
	 * @param array $params Must contain "class" (name of a JobQueue subclass); the whole
	 *   array is passed on to that class's constructor
	 * @return JobQueue
	 * @throws JobQueueError If "class" is missing, unknown, or not a JobQueue subclass
	 */
	final public static function factory( array $params ) {
		$class = $params['class'] ?? '';
		if ( !class_exists( $class ) ) {
			throw new JobQueueError( "Invalid job queue class '$class'." );
		}
		// Check the hierarchy *before* instantiating, so that the constructor of an
		// unrelated class is never run with the queue parameters.
		if ( !is_subclass_of( $class, self::class ) ) {
			throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string DB domain ID
	 */
	final public function getDomain() {
		return $this->domain;
	}

	/**
	 * @return string Wiki ID that WikiMap derives from the DB domain of this queue
	 */
	final public function getWiki() {
		return WikiMap::getWikiIdFromDbDomain( $this->domain );
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Job priority for pop()
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return array Order names; the constructor requires the configured order to be
	 *   strictly equal to one of these values
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string One of the values of supportedOrders()
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool False unless a subclass overrides this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * @return bool Whether delayed jobs are supported, as reported by supportsDelayedJobs()
	 */
	final public function delayedJobsEnabled() {
		return $this->supportsDelayedJobs();
	}

	/**
	 * @return string|bool Read only rationale (or false if r/w)
	 */
	public function getReadOnlyReason() {
		return $this->readOnlyReason;
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 *
	 * @return bool
	 */
	final public function isEmpty() {
		return $this->doIsEmpty();
	}

	/**
	 * Backend implementation of isEmpty().
	 *
	 * @return bool
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 *
	 * @return int
	 */
	final public function getSize() {
		return $this->doGetSize();
	}

	/**
	 * Backend implementation of getSize().
	 *
	 * @return int
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 *
	 * @return int
	 */
	final public function getAcquiredCount() {
		return $this->doGetAcquiredCount();
	}

	/**
	 * Backend implementation of getAcquiredCount().
	 *
	 * @return int
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 *
	 * @return int
	 */
	final public function getDelayedCount() {
		return $this->doGetDelayedCount();
	}

	/**
	 * Backend implementation of getDelayedCount().
	 *
	 * @return int Always 0 unless a subclass overrides this
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 *
	 * @return int
	 */
	final public function getAbandonedCount() {
		return $this->doGetAbandonedCount();
	}

	/**
	 * Backend implementation of getAbandonedCount().
	 *
	 * @return int Always 0 unless a subclass overrides this
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 *
	 * @param mixed $jobs A single job or an array of jobs; a single job is wrapped in
	 *   an array and handed to batchPush()
	 * @param int $flags Passed through to batchPush()
	 * @return void
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$this->batchPush( is_array( $jobs ) ? $jobs : [ $jobs ], $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job is validated before anything is inserted. After the insertion, each
	 * root job in the batch is registered for de-duplication.
	 *
	 * @param array $jobs Jobs whose type matches the type of this queue
	 * @param int $flags Passed through to doBatchPush() (see the QOS_* constants)
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If a job has the wrong type, or is delayed while this
	 *   queue does not support delayed jobs
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		$this->assertNotReadOnly();

		if ( $jobs === [] ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new JobQueueError(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
				throw new JobQueueError(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$this->doBatchPush( $jobs, $flags );

		// Only register root jobs once the insertion itself did not throw
		foreach ( $jobs as $job ) {
			if ( $job->isRootJob() ) {
				$this->deduplicateRootJob( $job );
			}
		}
	}

	/**
	 * Backend implementation of batchPush(); the jobs were already validated.
	 *
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 *
	 * A job whose root job was superseded by a newer one is converted into a no-op
	 * DuplicateJob before being returned.
	 *
	 * @return mixed Whatever doPop() returned (a job, or a falsy value when it yielded
	 *   none), or a DuplicateJob standing in for a superseded job
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 */
	final public function pop() {
		$this->assertNotReadOnly();

		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				$this->incrStats( 'dupe_pops', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( Exception $e ) {
			// Deliberately best-effort: a failing duplicate check must not lose the
			// job that was already removed from the queue, so return it as-is.
		}

		return $job;
	}

	/**
	 * Backend implementation of pop().
	 *
	 * @return mixed A job, or a falsy value if none could be popped
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * @param RunnableJob $job
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final public function ack( RunnableJob $job ) {
		$this->assertNotReadOnly();
		$this->assertMatchingJobType( $job->getType() );

		$this->doAck( $job );
	}

	/**
	 * Backend implementation of ack().
	 *
	 * @param RunnableJob $job
	 */
	abstract protected function doAck( RunnableJob $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 *
	 * @param IJobSpecification $job
	 * @return bool Result of doDeduplicateRootJob()
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final public function deduplicateRootJob( IJobSpecification $job ) {
		$this->assertNotReadOnly();
		$this->assertMatchingJobType( $job->getType() );

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * Backend implementation of deduplicateRootJob().
	 *
	 * @param IJobSpecification $job
	 * @return bool True if an equal or newer timestamp was already registered,
	 *   otherwise the result of storing this job's root timestamp
	 * @throws JobQueueError If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new JobQueueError( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call JobQueueGroup::push() before this method so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one.
	 *
	 * @param IJobSpecification $job
	 * @return bool
	 * @throws JobQueueError If the job type does not match this queue
	 */
	final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
		$this->assertMatchingJobType( $job->getType() );

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * Backend implementation of isRootJobOldDuplicate().
	 *
	 * @param IJobSpecification $job
	 * @return bool True only if a strictly newer root job timestamp is registered
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which the timestamp of a root job is stored.
	 *
	 * @param string $signature Root job signature
	 * @return string Global key made of the domain, the job type and the signature
	 */
	protected function getRootJobCacheKey( $signature ) {
		return $this->dupCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$this->type,
			'rootjob',
			$signature
		);
	}

	/**
	 * Run doDelete() after checking that the queue is not read-only.
	 *
	 * @return void
	 * @throws JobQueueReadOnlyError If the queue is read-only
	 * @throws JobQueueError If the backend does not implement deletion
	 */
	final public function delete() {
		$this->assertNotReadOnly();

		$this->doDelete();
	}

	/**
	 * Backend implementation of delete().
	 *
	 * @throws JobQueueError Always, unless a subclass overrides this
	 */
	protected function doDelete() {
		throw new JobQueueError( "This method is not implemented." );
	}

	/**
	 * Wait for any replica DBs or backup servers to catch up.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		$this->doWaitForBackups();
	}

	/**
	 * Backend implementation of waitForBackups(); a no-op unless overridden.
	 *
	 * @return void
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Clear any process and persistent caches.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		$this->doFlushCaches();
	}

	/**
	 * Backend implementation of flushCaches(); a no-op unless overridden.
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return Iterator An empty iterator unless a subclass overrides this
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return Iterator An empty iterator unless a subclass overrides this
	 */
	public function getAllAcquiredJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return Iterator An empty iterator unless a subclass overrides this
	 */
	public function getAllAbandonedJobs() {
		return new ArrayIterator( [] ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return mixed Null unless a subclass overrides this
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues is empty.
	 *
	 * @param array $types Job types
	 * @return mixed Result of doGetSiblingQueuesWithJobs(); null means "not supported"
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * Backend implementation of getSiblingQueuesWithJobs().
	 *
	 * @param array $types Job types
	 * @return mixed Null unless a subclass overrides this
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 *
	 * @param array $types Job types
	 * @return mixed Result of doGetSiblingQueueSizes(); null means "not supported"
	 */
	final public function getSiblingQueueSizes( array $types ) {
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * Backend implementation of getSiblingQueueSizes().
	 *
	 * @param array $types Job types
	 * @return mixed Null unless a subclass overrides this
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Create the job object for a command and its parameters via Job::factory().
	 *
	 * @param string $command
	 * @param array $params
	 * @return Job
	 */
	protected function factoryJob( $command, $params ) {
		// @TODO: dependency inject this as a callback
		return Job::factory( $command, $params );
	}

	/**
	 * @return void
	 * @throws JobQueueReadOnlyError If a read-only reason is set for this queue
	 */
	protected function assertNotReadOnly() {
		if ( $this->readOnlyReason !== false ) {
			throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
		}
	}

	/**
	 * Assert that a job type is the one handled by this queue.
	 *
	 * @param string $jobType Value returned by the getType() method of the job
	 * @return void
	 * @throws JobQueueError If the types differ
	 */
	private function assertMatchingJobType( $jobType ) {
		if ( $jobType !== $this->type ) {
			throw new JobQueueError( "Got '{$jobType}' job; expected '{$this->type}'." );
		}
	}

	/**
	 * Update the stats counters for the queue overall and for the queue type.
	 *
	 * @param string $key Event type
	 * @param string $type Job type
	 * @param int $delta Amount to add to both counters
	 * @return void
	 */
	protected function incrStats( $key, $type, $delta = 1 ) {
		$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
		$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
	}
}
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:45
factoryJob( $command, $params)
Definition: JobQueue.php:694
string $order
Job priority for pop()
Definition: JobQueue.php:37
const QOS_ATOMIC
Definition: JobQueue.php:50
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:325
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:400
doFlushCaches()
Definition: JobQueue.php:584
delayedJobsEnabled()
Definition: JobQueue.php:175
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2159
$command
Definition: cdb.php:65
assertNotReadOnly()
Definition: JobQueue.php:702
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:167
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:489
doDelete()
Definition: JobQueue.php:548
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:259
incrStats( $key, $type, $delta=1)
Update the stats counters for the queue overall and for the queue type.
Definition: JobQueue.php:716
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:662
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:286
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:640
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:263
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:503
BagOStuff $dupCache
Definition: JobQueue.php:48
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:199
__construct(array $params)
Definition: JobQueue.php:58
doGetAbandonedCount()
Definition: JobQueue.php:296
string $type
Job type.
Definition: JobQueue.php:35
getOrder()
Definition: JobQueue.php:144
getRootJobCacheKey( $signature)
Definition: JobQueue.php:521
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:241
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:446
doBatchPush(array $jobs, $flags)
$res
Definition: database.txt:21
getReadOnlyReason()
Definition: JobQueue.php:183
doAck(RunnableJob $job)
$params
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:780
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:66
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues is empty.
Definition: JobQueue.php:653
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:310
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:619
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
doWaitForBackups()
Definition: JobQueue.php:568
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:106
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:685
doGetAcquiredCount()
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count( $args)< 1) $job
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
getDomain()
Definition: JobQueue.php:122
const ROOTJOB_TTL
Definition: JobQueue.php:52
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:630
Interface for serializable objects that describe a job queue task.
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:676
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:605
doGetDelayedCount()
Definition: JobQueue.php:273
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:461
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:560
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:576
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:220
supportedOrders()
Get the allowed queue orders for configuration validation.
pop()
Pop a job off of the queue.
Definition: JobQueue.php:366
string $domain
DB domain ID.
Definition: JobQueue.php:33