78 throw new MWException(
"No configuration for section '$section'." );
80 $this->maxPartitionsTry =
$params[
'maxPartitionsTry'] ?? 2;
83 arsort( $partitionMap, SORT_NUMERIC );
86 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
87 'partitionsBySection',
'configByPartition', ] as $o
89 unset( $baseConfig[$o] );
92 unset( $baseConfig[
'aggregator'] );
94 foreach ( $partitionMap as $partition => $w ) {
95 if ( !isset(
$params[
'configByPartition'][$partition] ) ) {
96 throw new MWException(
"No configuration for partition '$partition'." );
99 $baseConfig +
$params[
'configByPartition'][$partition] );
102 $this->partitionRing =
new HashRing( $partitionMap );
107 return [
'undefined',
'random',
'timestamp' ];
115 foreach ( $this->partitionQueues as
$queue ) {
116 if ( !
$queue->supportsDelayedJobs() ) {
127 foreach ( $this->partitionQueues as
$queue ) {
129 $empty = $empty &&
$queue->doIsEmpty();
164 foreach ( $this->partitionQueues as
$queue ) {
166 $count +=
$queue->$method();
184 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
187 }
catch ( UnexpectedValueException
$e ) {
192 if ( count( $jobsLeft ) ) {
194 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
211 $uJobsByPartition = [];
213 foreach ( $jobs as $key =>
$job ) {
214 if (
$job->ignoreDuplicates() ) {
217 unset( $jobs[$key] );
221 if ( $flags & self::QOS_ATOMIC ) {
222 $nuJobBatches = [ $jobs ];
227 $nuJobBatches = array_chunk( $jobs, 300 );
231 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
233 $queue = $this->partitionQueues[$partition];
236 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
243 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
245 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
250 foreach ( $nuJobBatches as $jobBatch ) {
252 $queue = $this->partitionQueues[$partition];
255 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
262 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
264 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
272 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
275 while ( count( $partitionsTry ) ) {
276 $partition = ArrayUtils::pickRandom( $partitionsTry );
277 if ( $partition ===
false ) {
282 $queue = $this->partitionQueues[$partition];
291 $job->metadata[
'QueuePartition'] = $partition;
295 unset( $partitionsTry[$partition] );
304 if ( !isset(
$job->metadata[
'QueuePartition'] ) ) {
305 throw new MWException(
"The given job has no defined partition name." );
308 $this->partitionQueues[
$job->metadata[
'QueuePartition']]->ack(
$job );
312 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
313 $partition = $this->partitionRing->getLiveLocation( $signature );
315 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
317 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
318 $partition = $this->partitionRing->getLiveLocation( $signature );
319 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
327 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
328 $partition = $this->partitionRing->getLiveLocation( $signature );
330 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
332 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
333 $partition = $this->partitionRing->getLiveLocation( $signature );
334 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
344 foreach ( $this->partitionQueues as
$queue ) {
359 foreach ( $this->partitionQueues as
$queue ) {
372 foreach ( $this->partitionQueues as
$queue ) {
378 $iterator =
new AppendIterator();
381 foreach ( $this->partitionQueues as
$queue ) {
382 $iterator->append(
$queue->getAllQueuedJobs() );
389 $iterator =
new AppendIterator();
392 foreach ( $this->partitionQueues as
$queue ) {
393 $iterator->append(
$queue->getAllDelayedJobs() );
400 $iterator =
new AppendIterator();
403 foreach ( $this->partitionQueues as
$queue ) {
404 $iterator->append(
$queue->getAllAcquiredJobs() );
411 $iterator =
new AppendIterator();
414 foreach ( $this->partitionQueues as
$queue ) {
415 $iterator->append(
$queue->getAllAbandonedJobs() );
422 return "JobQueueFederated:wiki:{$this->wiki}" .
423 sha1(
serialize( array_keys( $this->partitionQueues ) ) );
431 foreach ( $this->partitionQueues as
$queue ) {
433 $nonEmpty =
$queue->doGetSiblingQueuesWithJobs( $types );
434 if ( is_array( $nonEmpty ) ) {
435 $result = array_unique( array_merge( $result, $nonEmpty ) );
439 if ( count( $result ) == count( $types ) ) {
449 return array_values( $result );
456 foreach ( $this->partitionQueues as
$queue ) {
458 $sizes =
$queue->doGetSiblingQueueSizes( $types );
459 if ( is_array( $sizes ) ) {
460 foreach ( $sizes as
$type => $size ) {
461 $result[
$type] = isset( $result[
$type] ) ? $result[
$type] + $size : $size;
477 wfDebugLog(
'JobQueueFederated',
$e->getMessage() .
"\n" .
$e->getTraceAsString() );
488 if ( $down >= count( $this->partitionQueues ) ) {
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Convenience class for weighted consistent hash rings.
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
getLiveLocation( $item)
Get the location of an item on the "live" ring.
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
doIsRootJobOldDuplicate(Job $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
static factory(array $params)
Get a job queue object of the specified type.
Class to both describe a background job and handle jobs.
namespace being checked & $result
usually copyright or history_copyright This message must be in HTML not wikitext if the section is included from a template $section
returning false will NOT prevent logging $e
Job queue task description interface.
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
if(count( $args)< 1) $job