80 throw new MWException(
"No configuration for section '$section'." );
82 $this->maxPartitionsTry = isset(
$params[
'maxPartitionsTry'] )
87 arsort( $partitionMap, SORT_NUMERIC );
90 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
91 'partitionsBySection',
'configByPartition', ] as $o
93 unset( $baseConfig[$o] );
96 unset( $baseConfig[
'aggregator'] );
98 foreach ( $partitionMap as $partition => $w ) {
99 if ( !isset(
$params[
'configByPartition'][$partition] ) ) {
100 throw new MWException(
"No configuration for partition '$partition'." );
103 $baseConfig +
$params[
'configByPartition'][$partition] );
106 $this->partitionRing =
new HashRing( $partitionMap );
111 return [
'undefined',
'random',
'timestamp' ];
119 foreach ( $this->partitionQueues as
$queue ) {
120 if ( !
$queue->supportsDelayedJobs() ) {
131 foreach ( $this->partitionQueues as
$queue ) {
133 $empty = $empty &&
$queue->doIsEmpty();
168 foreach ( $this->partitionQueues as
$queue ) {
170 $count +=
$queue->$method();
188 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
192 }
catch ( UnexpectedValueException
$e ) {
197 if ( count( $jobsLeft ) ) {
199 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
216 $uJobsByPartition = [];
218 foreach ( $jobs as $key =>
$job ) {
219 if (
$job->ignoreDuplicates() ) {
222 unset( $jobs[$key] );
226 if (
$flags & self::QOS_ATOMIC ) {
227 $nuJobBatches = [ $jobs ];
232 $nuJobBatches = array_chunk( $jobs, 300 );
236 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
238 $queue = $this->partitionQueues[$partition];
241 $queue->doBatchPush( $jobBatch,
$flags | self::QOS_ATOMIC );
248 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
250 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
255 foreach ( $nuJobBatches as $jobBatch ) {
257 $queue = $this->partitionQueues[$partition];
260 $queue->doBatchPush( $jobBatch,
$flags | self::QOS_ATOMIC );
267 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
269 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
277 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
280 while ( count( $partitionsTry ) ) {
281 $partition = ArrayUtils::pickRandom( $partitionsTry );
282 if ( $partition ===
false ) {
287 $queue = $this->partitionQueues[$partition];
296 $job->metadata[
'QueuePartition'] = $partition;
300 unset( $partitionsTry[$partition] );
309 if ( !isset(
$job->metadata[
'QueuePartition'] ) ) {
310 throw new MWException(
"The given job has no defined partition name." );
313 $this->partitionQueues[
$job->metadata[
'QueuePartition']]->ack(
$job );
317 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
318 $partition = $this->partitionRing->getLiveLocation( $signature );
320 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
322 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
323 $partition = $this->partitionRing->getLiveLocation( $signature );
324 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
332 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
333 $partition = $this->partitionRing->getLiveLocation( $signature );
335 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
337 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
338 $partition = $this->partitionRing->getLiveLocation( $signature );
339 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
349 foreach ( $this->partitionQueues as
$queue ) {
364 foreach ( $this->partitionQueues as
$queue ) {
377 foreach ( $this->partitionQueues as
$queue ) {
383 $iterator =
new AppendIterator();
386 foreach ( $this->partitionQueues as
$queue ) {
387 $iterator->append(
$queue->getAllQueuedJobs() );
394 $iterator =
new AppendIterator();
397 foreach ( $this->partitionQueues as
$queue ) {
398 $iterator->append(
$queue->getAllDelayedJobs() );
405 $iterator =
new AppendIterator();
408 foreach ( $this->partitionQueues as
$queue ) {
409 $iterator->append(
$queue->getAllAcquiredJobs() );
416 $iterator =
new AppendIterator();
419 foreach ( $this->partitionQueues as
$queue ) {
420 $iterator->append(
$queue->getAllAbandonedJobs() );
427 return "JobQueueFederated:wiki:{$this->wiki}" .
428 sha1(
serialize( array_keys( $this->partitionQueues ) ) );
436 foreach ( $this->partitionQueues as
$queue ) {
438 $nonEmpty =
$queue->doGetSiblingQueuesWithJobs( $types );
439 if ( is_array( $nonEmpty ) ) {
440 $result = array_unique( array_merge( $result, $nonEmpty ) );
444 if ( count( $result ) == count( $types ) ) {
454 return array_values( $result );
461 foreach ( $this->partitionQueues as
$queue ) {
463 $sizes =
$queue->doGetSiblingQueueSizes( $types );
464 if ( is_array( $sizes ) ) {
465 foreach ( $sizes as
$type => $size ) {
466 $result[
$type] = isset( $result[
$type] ) ? $result[
$type] + $size : $size;
482 wfDebugLog(
'JobQueueFederated',
$e->getMessage() .
"\n" .
$e->getTraceAsString() );
493 if ( $down >= count( $this->partitionQueues ) ) {
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Convenience class for weighted consistent hash rings.
getLiveLocationWeights()
Get the map of "live" locations to weight (ignores 0-weight items)
getLiveLocation( $item)
Get the location of an item on the "live" ring.
getLiveRing()
Get the "live" hash ring (which does not include ejected locations)
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
doIsRootJobOldDuplicate(Job $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
static factory(array $params)
Get a job queue object of the specified type.
Class to both describe a background job and handle jobs.
namespace being checked & $result
it s the revision text itself In either if gzip is the revision text is gzipped $flags
usually copyright or history_copyright This message must be in HTML not wikitext if the section is included from a template $section
returning false will NOT prevent logging $e
Job queue task description interface.
if(count( $args)< 1) $job