76 if ( !isset(
$params[
'partitionsBySection'][$section] ) ) {
77 throw new InvalidArgumentException(
"No configuration for section '$section'." );
79 $this->maxPartitionsTry =
$params[
'maxPartitionsTry'] ?? 2;
81 $partitionMap =
$params[
'partitionsBySection'][$section];
82 arsort( $partitionMap, SORT_NUMERIC );
85 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
86 'partitionsBySection',
'configByPartition', ] as $o
88 unset( $baseConfig[$o] );
91 foreach ( $partitionMap as $partition => $w ) {
92 if ( !isset(
$params[
'configByPartition'][$partition] ) ) {
93 throw new InvalidArgumentException(
"No configuration for partition '$partition'." );
96 $baseConfig +
$params[
'configByPartition'][$partition] );
99 $this->partitionRing =
new HashRing( $partitionMap );
104 return [
'undefined',
'random',
'timestamp' ];
112 foreach ( $this->partitionQueues as $queue ) {
113 if ( !$queue->supportsDelayedJobs() ) {
124 foreach ( $this->partitionQueues as $queue ) {
126 $empty = $empty && $queue->doIsEmpty();
161 foreach ( $this->partitionQueues as $queue ) {
163 $count += $queue->$method();
180 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
183 }
catch ( UnexpectedValueException $e ) {
188 if ( count( $jobsLeft ) ) {
190 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
207 $uJobsByPartition = [];
209 foreach ( $jobs as $key =>
$job ) {
210 if (
$job->ignoreDuplicates() ) {
211 $sha1 = sha1( serialize(
$job->getDeduplicationInfo() ) );
213 unset( $jobs[$key] );
217 if ( $flags & self::QOS_ATOMIC ) {
218 $nuJobBatches = [ $jobs ];
223 $nuJobBatches = array_chunk( $jobs, 300 );
227 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
229 $queue = $this->partitionQueues[$partition];
232 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
239 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
241 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
246 foreach ( $nuJobBatches as $jobBatch ) {
248 $queue = $this->partitionQueues[$partition];
251 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
258 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
260 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
268 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
271 while ( count( $partitionsTry ) ) {
272 $partition = ArrayUtils::pickRandom( $partitionsTry );
273 if ( $partition ===
false ) {
278 $queue = $this->partitionQueues[$partition];
280 $job = $queue->pop();
287 $job->setMetadata(
'QueuePartition', $partition );
291 unset( $partitionsTry[$partition] );
300 $partition =
$job->getMetadata(
'QueuePartition' );
301 if ( $partition ===
null ) {
302 throw new UnexpectedValueException(
"The given job has no defined partition name." );
305 $this->partitionQueues[$partition]->ack(
$job );
309 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
310 $partition = $this->partitionRing->getLiveLocation( $signature );
312 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
314 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
315 $partition = $this->partitionRing->getLiveLocation( $signature );
316 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
324 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
325 $partition = $this->partitionRing->getLiveLocation( $signature );
327 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
329 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
330 $partition = $this->partitionRing->getLiveLocation( $signature );
331 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
341 foreach ( $this->partitionQueues as $queue ) {
356 foreach ( $this->partitionQueues as $queue ) {
358 $queue->waitForBackups();
369 foreach ( $this->partitionQueues as $queue ) {
370 $queue->doFlushCaches();
375 $iterator =
new AppendIterator();
378 foreach ( $this->partitionQueues as $queue ) {
379 $iterator->append( $queue->getAllQueuedJobs() );
386 $iterator =
new AppendIterator();
389 foreach ( $this->partitionQueues as $queue ) {
390 $iterator->append( $queue->getAllDelayedJobs() );
397 $iterator =
new AppendIterator();
400 foreach ( $this->partitionQueues as $queue ) {
401 $iterator->append( $queue->getAllAcquiredJobs() );
408 $iterator =
new AppendIterator();
411 foreach ( $this->partitionQueues as $queue ) {
412 $iterator->append( $queue->getAllAbandonedJobs() );
419 return "JobQueueFederated:wiki:{$this->domain}" .
420 sha1( serialize( array_keys( $this->partitionQueues ) ) );
428 foreach ( $this->partitionQueues as $queue ) {
430 $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
431 if ( is_array( $nonEmpty ) ) {
432 $result = array_unique( array_merge( $result, $nonEmpty ) );
436 if ( count( $result ) == count( $types ) ) {
446 return array_values( $result );
453 foreach ( $this->partitionQueues as $queue ) {
455 $sizes = $queue->doGetSiblingQueueSizes( $types );
456 if ( is_array( $sizes ) ) {
457 foreach ( $sizes as
$type => $size ) {
458 $result[
$type] = ( $result[
$type] ?? 0 ) + $size;
474 wfDebugLog(
'JobQueueFederated', $e->getMessage() .
"\n" . $e->getTraceAsString() );
485 if ( $down >= count( $this->partitionQueues ) ) {
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
array $params
The job parameters.
Convenience class for weighted consistent hash rings.
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
getLiveLocation( $item)
Get the location of an item on the "live" ring.
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
doIsRootJobOldDuplicate(IJobSpecification $job)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
static factory(array $params)
Get a job queue object of the specified type.
string $domain
DB domain ID.
Class to both describe a background job and handle jobs.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job