11use InvalidArgumentException;
13use UnexpectedValueException;
68 parent::__construct( $params );
69 $section = $params[
'sectionsByWiki'][
$this->domain] ??
'default';
70 if ( !isset( $params[
'partitionsBySection'][$section] ) ) {
71 throw new InvalidArgumentException(
"No configuration for section '$section'." );
73 $this->maxPartitionsTry = $params[
'maxPartitionsTry'] ?? 2;
75 $partitionMap = $params[
'partitionsBySection'][$section];
76 arsort( $partitionMap, SORT_NUMERIC );
78 $baseConfig = $params;
79 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
80 'partitionsBySection',
'configByPartition', ] as $o
82 unset( $baseConfig[$o] );
85 foreach ( $partitionMap as $partition => $w ) {
86 if ( !isset( $params[
'configByPartition'][$partition] ) ) {
87 throw new InvalidArgumentException(
"No configuration for partition '$partition'." );
90 $baseConfig + $params[
'configByPartition'][$partition] );
93 $this->partitionRing =
new HashRing( $partitionMap );
99 return [
'undefined',
'random',
'timestamp' ];
109 foreach ( $this->partitionQueues as $queue ) {
110 if ( !$queue->supportsDelayedJobs() ) {
122 foreach ( $this->partitionQueues as $queue ) {
124 $empty = $empty && $queue->doIsEmpty();
163 foreach ( $this->partitionQueues as $queue ) {
165 $count += $queue->$method();
183 for ( $i = $this->maxPartitionsTry; $i-- && $jobsLeft; ) {
186 }
catch ( UnexpectedValueException ) {
193 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
210 $uJobsByPartition = [];
212 foreach ( $jobs as $key =>
$job ) {
213 if (
$job->ignoreDuplicates() ) {
214 $sha1 = sha1( serialize(
$job->getDeduplicationInfo() ) );
216 unset( $jobs[$key] );
220 if ( $flags & self::QOS_ATOMIC ) {
221 $nuJobBatches = [ $jobs ];
226 $nuJobBatches = array_chunk( $jobs, 300 );
230 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
232 $queue = $this->partitionQueues[$partition];
235 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
242 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
244 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
249 foreach ( $nuJobBatches as $jobBatch ) {
251 $queue = $this->partitionQueues[$partition];
254 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
261 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
263 $jobsLeft = array_merge( $jobsLeft, $jobBatch );
272 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
275 while ( count( $partitionsTry ) ) {
276 $partition = ArrayUtils::pickRandom( $partitionsTry );
277 if ( $partition ===
false ) {
282 $queue = $this->partitionQueues[$partition];
284 $job = $queue->pop();
291 $job->setMetadata(
'QueuePartition', $partition );
295 unset( $partitionsTry[$partition] );
305 $partition =
$job->getMetadata(
'QueuePartition' );
306 if ( $partition ===
null ) {
307 throw new UnexpectedValueException(
"The given job has no defined partition name." );
310 $this->partitionQueues[$partition]->ack(
$job );
315 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
316 $partition = $this->partitionRing->getLiveLocation( $signature );
318 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
320 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
321 $partition = $this->partitionRing->getLiveLocation( $signature );
322 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate(
$job );
331 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
332 $partition = $this->partitionRing->getLiveLocation( $signature );
334 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
336 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
337 $partition = $this->partitionRing->getLiveLocation( $signature );
338 return $this->partitionQueues[$partition]->doDeduplicateRootJob(
$job );
349 foreach ( $this->partitionQueues as $queue ) {
365 foreach ( $this->partitionQueues as $queue ) {
367 $queue->waitForBackups();
379 foreach ( $this->partitionQueues as $queue ) {
380 $queue->doFlushCaches();
386 $iterator =
new AppendIterator();
389 foreach ( $this->partitionQueues as $queue ) {
390 $iterator->append( $queue->getAllQueuedJobs() );
398 $iterator =
new AppendIterator();
401 foreach ( $this->partitionQueues as $queue ) {
402 $iterator->append( $queue->getAllDelayedJobs() );
410 $iterator =
new AppendIterator();
413 foreach ( $this->partitionQueues as $queue ) {
414 $iterator->append( $queue->getAllAcquiredJobs() );
422 $iterator =
new AppendIterator();
425 foreach ( $this->partitionQueues as $queue ) {
426 $iterator->append( $queue->getAllAbandonedJobs() );
434 return "JobQueueFederated:wiki:{$this->domain}" .
435 sha1( serialize( array_keys( $this->partitionQueues ) ) );
444 foreach ( $this->partitionQueues as $queue ) {
446 $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
447 if ( is_array( $nonEmpty ) ) {
448 $result = array_unique( array_merge( $result, $nonEmpty ) );
452 if ( count( $result ) == count( $types ) ) {
462 return array_values( $result );
470 foreach ( $this->partitionQueues as $queue ) {
472 $sizes = $queue->doGetSiblingQueueSizes( $types );
473 if ( is_array( $sizes ) ) {
474 foreach ( $sizes as
$type => $size ) {
475 $result[
$type] = ( $result[
$type] ?? 0 ) + $size;
491 wfDebugLog(
'JobQueue', $e->getMessage() .
"\n" . $e->getTraceAsString() );
502 if ( $down >= count( $this->partitionQueues ) ) {
// Register the string name 'JobQueueFederated' as an alias of JobQueueFederated::class.
// NOTE(review): presumably kept so code using the un-namespaced name keeps working — confirm.
509class_alias( JobQueueFederated::class,
'JobQueueFederated' );
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file if one is configured, or to the main debug log if not.
if(count( $args)< 1) $job