80 throw new MWException(
"No configuration for section '$section'." );
82 $this->maxPartitionsTry = isset(
$params[
'maxPartitionsTry'] )
87 arsort( $partitionMap, SORT_NUMERIC );
90 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
91 'partitionsBySection',
'configByPartition', ] as $o
93 unset( $baseConfig[$o] );
96 unset( $baseConfig[
'aggregator'] );
98 foreach ( $partitionMap as $partition => $w ) {
99 if ( !isset(
$params[
'configByPartition'][$partition] ) ) {
100 throw new MWException(
"No configuration for partition '$partition'." );
103 $baseConfig +
$params[
'configByPartition'][$partition] );
106 $this->partitionRing =
new HashRing( $partitionMap );
111 return [
'undefined',
'random',
'timestamp' ];
119 foreach ( $this->partitionQueues as
$queue ) {
120 if ( !
$queue->supportsDelayedJobs() ) {
131 foreach ( $this->partitionQueues as
$queue ) {
133 $empty = $empty &&
$queue->doIsEmpty();
168 foreach ( $this->partitionQueues as
$queue ) {
170 $count +=
$queue->$method();
188 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
191 }
catch ( UnexpectedValueException
$e ) {
196 if ( count( $jobsLeft ) ) {
198 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
215 $uJobsByPartition = [];
217 foreach ( $jobs as $key =>
$job ) {
218 if (
$job->ignoreDuplicates() ) {
221 unset( $jobs[$key] );
225 if ( $flags & self::QOS_ATOMIC ) {
226 $nuJobBatches = [ $jobs ];
231 $nuJobBatches = array_chunk( $jobs, 300 );
235 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
237 $queue = $this->partitionQueues[$partition];
240 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
247 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
249 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
254 foreach ( $nuJobBatches as $jobBatch ) {
256 $queue = $this->partitionQueues[$partition];
259 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
266 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
268 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
276 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
279 while ( count( $partitionsTry ) ) {
280 $partition = ArrayUtils::pickRandom( $partitionsTry );
281 if ( $partition ===
false ) {
286 $queue = $this->partitionQueues[$partition];
295 $job->metadata[
'QueuePartition'] = $partition;
299 unset( $partitionsTry[$partition] );
308 if ( !isset(
$job->metadata[
'QueuePartition'] ) ) {
309 throw new MWException(
"The given job has no defined partition name." );
312 $this->partitionQueues[
$job->metadata[
'QueuePartition']]->ack(
$job );
316 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
317 $partition = $this->partitionRing->getLiveLocation( $signature );
319 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
321 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
322 $partition = $this->partitionRing->getLiveLocation( $signature );
323 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
331 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
332 $partition = $this->partitionRing->getLiveLocation( $signature );
334 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
336 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
337 $partition = $this->partitionRing->getLiveLocation( $signature );
338 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
348 foreach ( $this->partitionQueues as
$queue ) {
363 foreach ( $this->partitionQueues as
$queue ) {
376 foreach ( $this->partitionQueues as
$queue ) {
382 $iterator =
new AppendIterator();
385 foreach ( $this->partitionQueues as
$queue ) {
386 $iterator->append(
$queue->getAllQueuedJobs() );
393 $iterator =
new AppendIterator();
396 foreach ( $this->partitionQueues as
$queue ) {
397 $iterator->append(
$queue->getAllDelayedJobs() );
404 $iterator =
new AppendIterator();
407 foreach ( $this->partitionQueues as
$queue ) {
408 $iterator->append(
$queue->getAllAcquiredJobs() );
415 $iterator =
new AppendIterator();
418 foreach ( $this->partitionQueues as
$queue ) {
419 $iterator->append(
$queue->getAllAbandonedJobs() );
426 return "JobQueueFederated:wiki:{$this->wiki}" .
427 sha1(
serialize( array_keys( $this->partitionQueues ) ) );
435 foreach ( $this->partitionQueues as
$queue ) {
437 $nonEmpty =
$queue->doGetSiblingQueuesWithJobs( $types );
438 if ( is_array( $nonEmpty ) ) {
439 $result = array_unique( array_merge( $result, $nonEmpty ) );
443 if ( count( $result ) == count( $types ) ) {
453 return array_values( $result );
460 foreach ( $this->partitionQueues as
$queue ) {
462 $sizes =
$queue->doGetSiblingQueueSizes( $types );
463 if ( is_array( $sizes ) ) {
464 foreach ( $sizes as
$type => $size ) {
465 $result[
$type] = isset( $result[
$type] ) ? $result[
$type] + $size : $size;
481 wfDebugLog(
'JobQueueFederated',
$e->getMessage() .
"\n" .
$e->getTraceAsString() );
492 if ( $down >= count( $this->partitionQueues ) ) {
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Convenience class for weighted consistent hash rings.
getLiveLocationWeights()
Get the map of "live" locations to weight (ignores 0-weight items)
getLiveLocation( $item)
Get the location of an item on the "live" ring.
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
doIsRootJobOldDuplicate(Job $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
static factory(array $params)
Get a job queue object of the specified type.
Class to both describe a background job and handle jobs.
namespace being checked & $result
usually copyright or history_copyright This message must be in HTML not wikitext if the section is included from a template $section
returning false will NOT prevent logging $e
Job queue task description interface.
if(count( $args)< 1) $job