78 throw new MWException(
"No configuration for section '$section'." );
80 $this->maxPartitionsTry =
$params[
'maxPartitionsTry'] ?? 2;
83 arsort( $partitionMap, SORT_NUMERIC );
86 foreach ( [
'class',
'sectionsByWiki',
'maxPartitionsTry',
87 'partitionsBySection',
'configByPartition', ] as $o
89 unset( $baseConfig[$o] );
92 unset( $baseConfig[
'aggregator'] );
94 foreach ( $partitionMap as $partition => $w ) {
95 if ( !
isset(
$params[
'configByPartition'][$partition] ) ) {
96 throw new MWException(
"No configuration for partition '$partition'." );
99 $baseConfig +
$params[
'configByPartition'][$partition] );
102 $this->partitionRing =
new HashRing( $partitionMap );
107 return [
'undefined',
'random',
'timestamp' ];
115 foreach ( $this->partitionQueues as
$queue ) {
116 if ( !
$queue->supportsDelayedJobs() ) {
127 foreach ( $this->partitionQueues as
$queue ) {
129 $empty = $empty &&
$queue->doIsEmpty();
164 foreach ( $this->partitionQueues as
$queue ) {
166 $count +=
$queue->$method();
183 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --
$i ) {
186 }
catch ( UnexpectedValueException
$e ) {
191 if ( count( $jobsLeft ) ) {
193 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
210 $uJobsByPartition = [];
212 foreach ( $jobs as $key =>
$job ) {
213 if (
$job->ignoreDuplicates() ) {
216 unset( $jobs[$key] );
220 if ( $flags & self::QOS_ATOMIC ) {
221 $nuJobBatches = [
$jobs ];
230 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
235 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
242 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
249 foreach ( $nuJobBatches as $jobBatch ) {
254 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
261 throw new JobQueueError(
"Could not insert job(s), no partitions available." );
271 $partitionsTry = $this->partitionRing->getLiveLocationWeights();
274 while ( count( $partitionsTry ) ) {
275 $partition = ArrayUtils::pickRandom( $partitionsTry );
276 if ( $partition ===
false ) {
290 $job->setMetadata(
'QueuePartition', $partition );
294 unset( $partitionsTry[$partition] );
303 $partition =
$job->getMetadata(
'QueuePartition' );
304 if ( $partition ===
null ) {
305 throw new MWException(
"The given job has no defined partition name." );
312 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
313 $partition = $this->partitionRing->getLiveLocation( $signature );
315 return $this->partitionQueues[
$partition]->doIsRootJobOldDuplicate(
$job );
317 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
318 $partition = $this->partitionRing->getLiveLocation( $signature );
319 return $this->partitionQueues[
$partition]->doIsRootJobOldDuplicate(
$job );
327 $signature =
$job->getRootJobParams()[
'rootJobSignature'];
328 $partition = $this->partitionRing->getLiveLocation( $signature );
330 return $this->partitionQueues[
$partition]->doDeduplicateRootJob(
$job );
332 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
333 $partition = $this->partitionRing->getLiveLocation( $signature );
334 return $this->partitionQueues[
$partition]->doDeduplicateRootJob(
$job );
344 foreach ( $this->partitionQueues as
$queue ) {
359 foreach ( $this->partitionQueues as
$queue ) {
372 foreach ( $this->partitionQueues as
$queue ) {
381 foreach ( $this->partitionQueues as
$queue ) {
382 $iterator->append(
$queue->getAllQueuedJobs() );
392 foreach ( $this->partitionQueues as
$queue ) {
393 $iterator->append(
$queue->getAllDelayedJobs() );
403 foreach ( $this->partitionQueues as
$queue ) {
404 $iterator->append(
$queue->getAllAcquiredJobs() );
414 foreach ( $this->partitionQueues as
$queue ) {
415 $iterator->append(
$queue->getAllAbandonedJobs() );
422 return "JobQueueFederated:wiki:{$this->domain}" .
431 foreach ( $this->partitionQueues as
$queue ) {
433 $nonEmpty =
$queue->doGetSiblingQueuesWithJobs( $types );
439 if ( count( $result ) == count( $types ) ) {
456 foreach ( $this->partitionQueues as
$queue ) {
458 $sizes =
$queue->doGetSiblingQueueSizes( $types );
460 foreach ( $sizes as
$type => $size ) {
477 wfDebugLog(
'JobQueueFederated',
$e->getMessage() .
"\n" .
$e->getTraceAsString() );
488 if ( $down >= count( $this->partitionQueues ) ) {
and that you know you can do these things To protect your we need to make restrictions that forbid anyone to deny you these rights or to ask you to surrender the rights These restrictions translate to certain responsibilities for you if you distribute copies of the or if you modify it For if you distribute copies of such a whether gratis or for a you must give the recipients all the rights that you have You must make sure that receive or can get the source code And you must show them these terms so they know their rights We protect your rights with two and(2) offer you this license which gives you legal permission to copy
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Convenience class for weighted consistent hash rings.
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
getLiveLocation( $item)
Get the location of an item on the "live" ring.
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
doIsRootJobOldDuplicate(Job $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
static factory(array $params)
Get a job queue object of the specified type.
string $domain
DB domain ID.
Class to both describe a background job and handle jobs.
namespace being checked & $result
usually copyright or history_copyright This message must be in HTML not wikitext if the section is included from a template $section
returning false will NOT prevent logging $e
Job queue task description interface.
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
if(count( $args)< 1) $job