75 parent::__construct( $params );
76 $section = $params[
'sectionsByWiki'][
$this->domain] ??
'default';
77 if ( !isset( $params[
'partitionsBySection'][$section] ) ) {
78 throw new MWException(
"No configuration for section '$section'." );
80 $this->maxPartitionsTry = $params[
'maxPartitionsTry'] ?? 2;
82 $partitionMap = $params[
'partitionsBySection'][$section];
83 arsort( $partitionMap, SORT_NUMERIC );
85 $baseConfig = $params;
86 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
87 'partitionsBySection',
'configByPartition', ] as $o
89 unset( $baseConfig[$o] );
92 foreach ( $partitionMap as $partition => $w ) {
93 if ( !isset( $params[
'configByPartition'][$partition] ) ) {
94 throw new MWException(
"No configuration for partition '$partition'." );
97 $baseConfig + $params[
'configByPartition'][$partition] );
100 $this->partitionRing =
new HashRing( $partitionMap );
105 return [
'undefined',
'random',
'timestamp' ];
113 foreach ( $this->partitionQueues as
$queue ) {
114 if ( !
$queue->supportsDelayedJobs() ) {
125 foreach ( $this->partitionQueues as
$queue ) {
127 $empty = $empty &&
$queue->doIsEmpty();
162 foreach ( $this->partitionQueues as
$queue ) {
164 $count +=
$queue->$method();
181 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
184 }
catch ( UnexpectedValueException $e ) {
189 if ( count( $jobsLeft ) ) {
191 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
208 $uJobsByPartition = [];
210 foreach ( $jobs as $key =>
$job ) {
211 if (
$job->ignoreDuplicates() ) {
214 unset( $jobs[$key] );
218 if ( $flags & self::QOS_ATOMIC ) {
219 $nuJobBatches = [ $jobs ];
224 $nuJobBatches = array_chunk( $jobs, 300 );
228 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
230 $queue = $this->partitionQueues[$partition];
233 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
240 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
242 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
247 foreach ( $nuJobBatches as $jobBatch ) {
249 $queue = $this->partitionQueues[$partition];
252 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
259 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
261 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
269 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
272 while ( count( $partitionsTry ) ) {
273 $partition = ArrayUtils::pickRandom( $partitionsTry );
274 if ( $partition ===
false ) {
279 $queue = $this->partitionQueues[$partition];
288 $job->setMetadata(
'QueuePartition', $partition );
292 unset( $partitionsTry[$partition] );
301 $partition =
$job->getMetadata(
'QueuePartition' );
302 if ( $partition ===
null ) {
303 throw new MWException(
"The given job has no defined partition name." );
306 $this->partitionQueues[$partition]->ack(
$job );
310 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
311 $partition = $this->partitionRing->getLiveLocation( $signature );
313 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
315 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
316 $partition = $this->partitionRing->getLiveLocation( $signature );
317 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
325 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
326 $partition = $this->partitionRing->getLiveLocation( $signature );
328 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
330 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
331 $partition = $this->partitionRing->getLiveLocation( $signature );
332 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
342 foreach ( $this->partitionQueues as
$queue ) {
357 foreach ( $this->partitionQueues as
$queue ) {
370 foreach ( $this->partitionQueues as
$queue ) {
376 $iterator =
new AppendIterator();
379 foreach ( $this->partitionQueues as
$queue ) {
380 $iterator->append(
$queue->getAllQueuedJobs() );
387 $iterator =
new AppendIterator();
390 foreach ( $this->partitionQueues as
$queue ) {
391 $iterator->append(
$queue->getAllDelayedJobs() );
398 $iterator =
new AppendIterator();
401 foreach ( $this->partitionQueues as
$queue ) {
402 $iterator->append(
$queue->getAllAcquiredJobs() );
409 $iterator =
new AppendIterator();
412 foreach ( $this->partitionQueues as
$queue ) {
413 $iterator->append(
$queue->getAllAbandonedJobs() );
420 return "JobQueueFederated:wiki:{$this->domain}" .
421 sha1(
serialize( array_keys( $this->partitionQueues ) ) );
429 foreach ( $this->partitionQueues as
$queue ) {
431 $nonEmpty =
$queue->doGetSiblingQueuesWithJobs( $types );
432 if ( is_array( $nonEmpty ) ) {
433 $result = array_unique( array_merge( $result, $nonEmpty ) );
437 if ( count( $result ) == count( $types ) ) {
447 return array_values( $result );
454 foreach ( $this->partitionQueues as
$queue ) {
456 $sizes =
$queue->doGetSiblingQueueSizes( $types );
457 if ( is_array( $sizes ) ) {
458 foreach ( $sizes as
$type => $size ) {
459 $result[
$type] = ( $result[
$type] ?? 0 ) + $size;
475 wfDebugLog(
'JobQueueFederated', $e->getMessage() .
"\n" . $e->getTraceAsString() );
486 if ( $down >= count( $this->partitionQueues ) ) {
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Convenience class for weighted consistent hash rings.
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
getLiveLocation( $item)
Get the location of an item on the "live" ring.
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
doIsRootJobOldDuplicate(IJobSpecification $job)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
static factory(array $params)
Get a job queue object of the specified type.
string $domain
DB domain ID.
Class to both describe a background job and handle jobs.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job