75 parent::__construct( $params );
76 $section = $params[
'sectionsByWiki'][
$this->domain] ??
'default';
77 if ( !isset( $params[
'partitionsBySection'][$section] ) ) {
78 throw new MWException(
"No configuration for section '$section'." );
80 $this->maxPartitionsTry = $params[
'maxPartitionsTry'] ?? 2;
82 $partitionMap = $params[
'partitionsBySection'][$section];
83 arsort( $partitionMap, SORT_NUMERIC );
85 $baseConfig = $params;
86 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
87 'partitionsBySection',
'configByPartition', ] as $o
89 unset( $baseConfig[$o] );
92 foreach ( $partitionMap as $partition => $w ) {
93 if ( !isset( $params[
'configByPartition'][$partition] ) ) {
94 throw new MWException(
"No configuration for partition '$partition'." );
97 $baseConfig + $params[
'configByPartition'][$partition] );
100 $this->partitionRing =
new HashRing( $partitionMap );
105 return [
'undefined',
'random',
'timestamp' ];
113 foreach ( $this->partitionQueues as
$queue ) {
114 if ( !
$queue->supportsDelayedJobs() ) {
125 foreach ( $this->partitionQueues as
$queue ) {
127 $empty = $empty &&
$queue->doIsEmpty();
162 foreach ( $this->partitionQueues as
$queue ) {
164 $count +=
$queue->$method();
181 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
184 }
catch ( UnexpectedValueException $e ) {
189 if ( count( $jobsLeft ) ) {
191 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
208 $uJobsByPartition = [];
210 foreach ( $jobs as $key =>
$job ) {
211 if (
$job->ignoreDuplicates() ) {
214 unset( $jobs[$key] );
218 if ( $flags & self::QOS_ATOMIC ) {
219 $nuJobBatches = [ $jobs ];
224 $nuJobBatches = array_chunk( $jobs, 300 );
228 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
230 $queue = $this->partitionQueues[$partition];
233 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
240 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
242 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
247 foreach ( $nuJobBatches as $jobBatch ) {
249 $queue = $this->partitionQueues[$partition];
252 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
259 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
261 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
269 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
272 while ( count( $partitionsTry ) ) {
274 if ( $partition ===
false ) {
279 $queue = $this->partitionQueues[$partition];
288 $job->setMetadata(
'QueuePartition', $partition );
292 unset( $partitionsTry[$partition] );
301 $partition =
$job->getMetadata(
'QueuePartition' );
302 if ( $partition ===
null ) {
303 throw new MWException(
"The given job has no defined partition name." );
306 $this->partitionQueues[$partition]->ack(
$job );
310 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
311 $partition = $this->partitionRing->getLiveLocation( $signature );
313 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
315 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
316 $partition = $this->partitionRing->getLiveLocation( $signature );
317 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
325 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
326 $partition = $this->partitionRing->getLiveLocation( $signature );
328 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
330 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
331 $partition = $this->partitionRing->getLiveLocation( $signature );
332 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
342 foreach ( $this->partitionQueues as
$queue ) {
357 foreach ( $this->partitionQueues as
$queue ) {
370 foreach ( $this->partitionQueues as
$queue ) {
376 $iterator =
new AppendIterator();
379 foreach ( $this->partitionQueues as
$queue ) {
380 $iterator->append(
$queue->getAllQueuedJobs() );
387 $iterator =
new AppendIterator();
390 foreach ( $this->partitionQueues as
$queue ) {
391 $iterator->append(
$queue->getAllDelayedJobs() );
398 $iterator =
new AppendIterator();
401 foreach ( $this->partitionQueues as
$queue ) {
402 $iterator->append(
$queue->getAllAcquiredJobs() );
409 $iterator =
new AppendIterator();
412 foreach ( $this->partitionQueues as
$queue ) {
413 $iterator->append(
$queue->getAllAbandonedJobs() );
420 return "JobQueueFederated:wiki:{$this->domain}" .
421 sha1(
serialize( array_keys( $this->partitionQueues ) ) );
429 foreach ( $this->partitionQueues as
$queue ) {
431 $nonEmpty =
$queue->doGetSiblingQueuesWithJobs( $types );
432 if ( is_array( $nonEmpty ) ) {
433 $result = array_unique( array_merge( $result, $nonEmpty ) );
437 if ( count( $result ) == count( $types ) ) {
447 return array_values( $result );
454 foreach ( $this->partitionQueues as
$queue ) {
456 $sizes =
$queue->doGetSiblingQueueSizes( $types );
457 if ( is_array( $sizes ) ) {
458 foreach ( $sizes as
$type => $size ) {
459 $result[
$type] = isset( $result[
$type] ) ? $result[
$type] + $size : $size;
475 wfDebugLog(
'JobQueueFederated', $e->getMessage() .
"\n" . $e->getTraceAsString() );
486 if ( $down >= count( $this->partitionQueues ) ) {