MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
24 
31 abstract class JobQueue {
33  protected $domain;
35  protected $type;
37  protected $order;
39  protected $claimTTL;
41  protected $maxTries;
43  protected $readOnlyReason;
45  protected $stats;
46 
48  protected $wanCache;
49 
50  const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
51 
52  const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
53 
66  protected function __construct( array $params ) {
67  $this->domain = $params['domain'] ?? $params['wiki']; // b/c
68  $this->type = $params['type'];
69  $this->claimTTL = $params['claimTTL'] ?? 0;
70  $this->maxTries = $params['maxTries'] ?? 3;
71  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
72  $this->order = $params['order'];
73  } else {
74  $this->order = $this->optimalOrder();
75  }
76  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
77  throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
78  }
79  $this->readOnlyReason = $params['readOnlyReason'] ?? false;
80  $this->stats = $params['stats'] ?? new NullStatsdDataFactory();
81  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
82  }
83 
114  final public static function factory( array $params ) {
115  $class = $params['class'];
116  if ( !class_exists( $class ) ) {
117  throw new JobQueueError( "Invalid job queue class '$class'." );
118  }
119  $obj = new $class( $params );
120  if ( !( $obj instanceof self ) ) {
121  throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
122  }
123 
124  return $obj;
125  }
126 
130  final public function getDomain() {
131  return $this->domain;
132  }
133 
138  final public function getWiki() {
139  return WikiMap::getWikiIdFromDbDomain( $this->domain );
140  }
141 
145  final public function getType() {
146  return $this->type;
147  }
148 
152  final public function getOrder() {
153  return $this->order;
154  }
155 
161  abstract protected function supportedOrders();
162 
168  abstract protected function optimalOrder();
169 
175  protected function supportsDelayedJobs() {
176  return false; // not implemented
177  }
178 
183  final public function delayedJobsEnabled() {
184  return $this->supportsDelayedJobs();
185  }
186 
191  public function getReadOnlyReason() {
192  return $this->readOnlyReason;
193  }
194 
207  final public function isEmpty() {
208  $res = $this->doIsEmpty();
209 
210  return $res;
211  }
212 
217  abstract protected function doIsEmpty();
218 
228  final public function getSize() {
229  $res = $this->doGetSize();
230 
231  return $res;
232  }
233 
238  abstract protected function doGetSize();
239 
249  final public function getAcquiredCount() {
250  $res = $this->doGetAcquiredCount();
251 
252  return $res;
253  }
254 
259  abstract protected function doGetAcquiredCount();
260 
271  final public function getDelayedCount() {
272  $res = $this->doGetDelayedCount();
273 
274  return $res;
275  }
276 
281  protected function doGetDelayedCount() {
282  return 0; // not implemented
283  }
284 
294  final public function getAbandonedCount() {
295  $res = $this->doGetAbandonedCount();
296 
297  return $res;
298  }
299 
304  protected function doGetAbandonedCount() {
305  return 0; // not implemented
306  }
307 
318  final public function push( $jobs, $flags = 0 ) {
319  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
320  $this->batchPush( $jobs, $flags );
321  }
322 
333  final public function batchPush( array $jobs, $flags = 0 ) {
334  $this->assertNotReadOnly();
335 
336  if ( $jobs === [] ) {
337  return; // nothing to do
338  }
339 
340  foreach ( $jobs as $job ) {
341  if ( $job->getType() !== $this->type ) {
342  throw new JobQueueError(
343  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
344  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
345  throw new JobQueueError(
346  "Got delayed '{$job->getType()}' job; delays are not supported." );
347  }
348  }
349 
350  $this->doBatchPush( $jobs, $flags );
351 
352  foreach ( $jobs as $job ) {
353  if ( $job->isRootJob() ) {
354  $this->deduplicateRootJob( $job );
355  }
356  }
357  }
358 
364  abstract protected function doBatchPush( array $jobs, $flags );
365 
374  final public function pop() {
375  $this->assertNotReadOnly();
376 
377  $job = $this->doPop();
378 
379  // Flag this job as an old duplicate based on its "root" job...
380  try {
381  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
382  $this->incrStats( 'dupe_pops', $this->type );
383  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
384  }
385  } catch ( Exception $e ) {
386  // don't lose jobs over this
387  }
388 
389  return $job;
390  }
391 
396  abstract protected function doPop();
397 
408  final public function ack( RunnableJob $job ) {
409  $this->assertNotReadOnly();
410  if ( $job->getType() !== $this->type ) {
411  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
412  }
413 
414  $this->doAck( $job );
415  }
416 
421  abstract protected function doAck( RunnableJob $job );
422 
454  final public function deduplicateRootJob( IJobSpecification $job ) {
455  $this->assertNotReadOnly();
456  if ( $job->getType() !== $this->type ) {
457  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
458  }
459 
460  return $this->doDeduplicateRootJob( $job );
461  }
462 
469  protected function doDeduplicateRootJob( IJobSpecification $job ) {
470  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
471  if ( !$params ) {
472  throw new JobQueueError( "Cannot register root job; missing parameters." );
473  }
474 
475  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
476  // Callers should call JobQueueGroup::push() before this method so that if the
477  // insert fails, the de-duplication registration will be aborted. Having only the
478  // de-duplication registration succeed would cause jobs to become no-ops without
479  // any actual jobs that made them redundant.
480  $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
481  if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
482  return true; // a newer version of this root job was enqueued
483  }
484 
485  // Update the timestamp of the last root job started at the location...
486  return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
487  }
488 
496  final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
497  if ( $job->getType() !== $this->type ) {
498  throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
499  }
500 
501  return $this->doIsRootJobOldDuplicate( $job );
502  }
503 
509  protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
510  $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
511  if ( !$params ) {
512  return false; // job has no de-deplication info
513  }
514 
515  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
516  // Get the last time this root job was enqueued
517  $timestamp = $this->wanCache->get( $key );
518  if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
519  // Update the timestamp of the last known root job started at the location...
520  $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
521  }
522 
523  // Check if a new root job was started at the location after this one's...
524  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
525  }
526 
531  protected function getRootJobCacheKey( $signature ) {
532  return $this->wanCache->makeGlobalKey(
533  'jobqueue',
534  $this->domain,
535  $this->type,
536  'rootjob',
537  $signature
538  );
539  }
540 
548  final public function delete() {
549  $this->assertNotReadOnly();
550 
551  $this->doDelete();
552  }
553 
558  protected function doDelete() {
559  throw new JobQueueError( "This method is not implemented." );
560  }
561 
570  final public function waitForBackups() {
571  $this->doWaitForBackups();
572  }
573 
578  protected function doWaitForBackups() {
579  }
580 
586  final public function flushCaches() {
587  $this->doFlushCaches();
588  }
589 
594  protected function doFlushCaches() {
595  }
596 
605  abstract public function getAllQueuedJobs();
606 
615  public function getAllDelayedJobs() {
616  return new ArrayIterator( [] ); // not implemented
617  }
618 
629  public function getAllAcquiredJobs() {
630  return new ArrayIterator( [] ); // not implemented
631  }
632 
640  public function getAllAbandonedJobs() {
641  return new ArrayIterator( [] ); // not implemented
642  }
643 
650  public function getCoalesceLocationInternal() {
651  return null;
652  }
653 
663  final public function getSiblingQueuesWithJobs( array $types ) {
664  return $this->doGetSiblingQueuesWithJobs( $types );
665  }
666 
672  protected function doGetSiblingQueuesWithJobs( array $types ) {
673  return null; // not supported
674  }
675 
686  final public function getSiblingQueueSizes( array $types ) {
687  return $this->doGetSiblingQueueSizes( $types );
688  }
689 
695  protected function doGetSiblingQueueSizes( array $types ) {
696  return null; // not supported
697  }
698 
704  protected function factoryJob( $command, $params ) {
705  // @TODO: dependency inject this as a callback
706  return Job::factory( $command, $params );
707  }
708 
712  protected function assertNotReadOnly() {
713  if ( $this->readOnlyReason !== false ) {
714  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
715  }
716  }
717 
726  protected function incrStats( $key, $type, $delta = 1 ) {
727  $this->stats->updateCount( "jobqueue.{$key}.all", $delta );
728  $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
729  }
730 }
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
StatsdDataFactoryInterface $stats
Definition: JobQueue.php:45
factoryJob( $command, $params)
Definition: JobQueue.php:704
string $order
Job priority for pop()
Definition: JobQueue.php:37
const QOS_ATOMIC
Definition: JobQueue.php:50
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:333
ack(RunnableJob $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:408
doFlushCaches()
Definition: JobQueue.php:594
delayedJobsEnabled()
Definition: JobQueue.php:183
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2158
$command
Definition: cdb.php:65
assertNotReadOnly()
Definition: JobQueue.php:712
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:175
isRootJobOldDuplicate(IJobSpecification $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:496
doDelete()
Definition: JobQueue.php:558
static getWikiIdFromDbDomain( $domain)
Get the wiki ID of a database domain.
Definition: WikiMap.php:268
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:726
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:672
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:294
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:650
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:271
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueue.php:509
WANObjectCache $wanCache
Definition: JobQueue.php:48
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:207
__construct(array $params)
Definition: JobQueue.php:66
doGetAbandonedCount()
Definition: JobQueue.php:304
string $type
Job type.
Definition: JobQueue.php:35
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
getOrder()
Definition: JobQueue.php:152
getRootJobCacheKey( $signature)
Definition: JobQueue.php:531
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:249
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:454
doBatchPush(array $jobs, $flags)
$res
Definition: database.txt:21
getReadOnlyReason()
Definition: JobQueue.php:191
doAck(RunnableJob $job)
$params
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:773
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:63
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:663
push( $jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:318
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:629
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
doWaitForBackups()
Definition: JobQueue.php:578
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:114
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:695
doGetAcquiredCount()
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count( $args)< 1) $job
static newFromJob(RunnableJob $job)
Get a duplicate no-op version of a job.
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
getDomain()
Definition: JobQueue.php:130
const ROOTJOB_TTL
Definition: JobQueue.php:52
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:640
Interface for serializable objects that describe a job queue task.
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:686
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:615
doGetDelayedCount()
Definition: JobQueue.php:281
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:469
waitForBackups()
Wait for any replica DBs or backup servers to catch up.
Definition: JobQueue.php:570
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:586
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:228
supportedOrders()
Get the allowed queue orders for configuration validation.
pop()
Pop a job off of the queue.
Definition: JobQueue.php:374
string $domain
DB domain ID.
Definition: JobQueue.php:33