Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
0.00% |
0 / 193 |
|
0.00% |
0 / 28 |
CRAP | |
0.00% |
0 / 1 |
| JobQueueFederated | |
0.00% |
0 / 192 |
|
0.00% |
0 / 28 |
6642 | |
0.00% |
0 / 1 |
| __construct | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
30 | |||
| supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| supportsDelayedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
| doIsEmpty | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
| doGetSize | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| doGetAcquiredCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| doGetDelayedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| doGetAbandonedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| getCrossPartitionSum | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
| doBatchPush | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
30 | |||
| tryJobInsertions | |
0.00% |
0 / 34 |
|
0.00% |
0 / 1 |
156 | |||
| doPop | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
30 | |||
| doAck | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| doIsRootJobOldDuplicate | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
| doDeduplicateRootJob | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
| doDelete | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
| doWaitForBackups | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
| doFlushCaches | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| getAllQueuedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| getAllDelayedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| getAllAcquiredJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| getAllAbandonedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| getCoalesceLocationInternal | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| doGetSiblingQueuesWithJobs | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
30 | |||
| doGetSiblingQueueSizes | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
| logException | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| throwErrorIfAllPartitionsDown | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| 1 | <?php |
| 2 | /** |
| 3 | * @license GPL-2.0-or-later |
| 4 | * @file |
| 5 | */ |
| 6 | |
| 7 | namespace MediaWiki\JobQueue; |
| 8 | |
| 9 | use AppendIterator; |
| 10 | use Exception; |
| 11 | use InvalidArgumentException; |
| 12 | use MediaWiki\JobQueue\Exceptions\JobQueueError; |
| 13 | use UnexpectedValueException; |
| 14 | use Wikimedia\ArrayUtils\ArrayUtils; |
| 15 | use Wikimedia\HashRing\HashRing; |
| 16 | |
| 17 | /** |
| 18 | * Enqueue and run background jobs via a federated queue, for wiki farms. |
| 19 | * |
| 20 | * This class allows for queues to be partitioned into smaller queues. |
| 21 | * A partition is defined by the configuration for a JobQueue instance. |
| 22 | * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a |
| 23 | * JobQueueFederated instance, which itself would consist of three JobQueueRedis |
| 24 | * instances, each using their own redis server. This would allow for the jobs |
| 25 | * to be split (evenly or based on weights) across multiple servers if a single |
| 26 | * server becomes impractical or expensive. Different JobQueue classes can be mixed. |
| 27 | * |
| 28 | * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue |
| 29 | * is inherited by the partition queues. Additional configuration defines what |
| 30 | * section each wiki is in, what partition queues each section uses (and their weight), |
| 31 | * and the JobQueue configuration for each partition. Some sections might only need a |
| 32 | * single queue partition, like the sections for groups of small wikis. |
| 33 | * |
| 34 | * If used for performance, then $wgMainCacheType should be set to memcached/redis. |
| 35 | * Note that "fifo" cannot be used for the ordering, since the data is distributed. |
| 36 | * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, |
| 37 | * queue classes used by this should ignore down servers (with TTL) to avoid slowness. |
| 38 | * |
| 39 | * @since 1.22 |
| 40 | * @ingroup JobQueue |
| 41 | */ |
| 42 | class JobQueueFederated extends JobQueue { |
| 43 | /** @var HashRing */ |
| 44 | protected $partitionRing; |
| 45 | /** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */ |
| 46 | protected $partitionQueues = []; |
| 47 | |
| 48 | /** @var int Maximum number of partitions to try */ |
| 49 | protected $maxPartitionsTry; |
| 50 | |
| 51 | /** |
| 52 | * @param array $params Possible keys: |
| 53 | * - sectionsByWiki : A map of wiki IDs to section names. |
| 54 | * Wikis will default to using the section "default". |
| 55 | * - partitionsBySection : Map of section names to maps of (partition name => weight). |
| 56 | * A section called 'default' must be defined if not all wikis |
| 57 | * have explicitly defined sections. |
| 58 | * - configByPartition : Map of queue partition names to configuration arrays. |
| 59 | * These configuration arrays are passed to JobQueue::factory(). |
| 60 | * The options set here are overridden by those passed to
| 61 | * the federated queue itself (e.g. 'order' and 'claimTTL'). |
| 62 | * - maxPartitionsTry : Maximum number of times to attempt job insertion using |
| 63 | * different partition queues. This improves availability |
| 64 | * during failure, at the cost of added latency and somewhat |
| 65 | * less reliable job de-duplication mechanisms. |
| 66 | */ |
| 67 | protected function __construct( array $params ) { |
| 68 | parent::__construct( $params ); |
| 69 | $section = $params['sectionsByWiki'][$this->domain] ?? 'default'; |
| 70 | if ( !isset( $params['partitionsBySection'][$section] ) ) { |
| 71 | throw new InvalidArgumentException( "No configuration for section '$section'." ); |
| 72 | } |
| 73 | $this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2; |
| 74 | // Get the full partition map |
| 75 | $partitionMap = $params['partitionsBySection'][$section]; |
| 76 | arsort( $partitionMap, SORT_NUMERIC ); |
| 77 | // Get the base config to merge into each partition queue config
| 78 | $baseConfig = $params; |
| 79 | foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry', |
| 80 | 'partitionsBySection', 'configByPartition', ] as $o |
| 81 | ) { |
| 82 | unset( $baseConfig[$o] ); // partition queue doesn't care about this |
| 83 | } |
| 84 | // Get the partition queue objects |
| 85 | foreach ( $partitionMap as $partition => $w ) { |
| 86 | if ( !isset( $params['configByPartition'][$partition] ) ) { |
| 87 | throw new InvalidArgumentException( "No configuration for partition '$partition'." ); |
| 88 | } |
| 89 | $this->partitionQueues[$partition] = JobQueue::factory( |
| 90 | $baseConfig + $params['configByPartition'][$partition] ); |
| 91 | } |
| 92 | // Ring of all partitions |
| 93 | $this->partitionRing = new HashRing( $partitionMap ); |
| 94 | } |
| 95 | |
| 96 | /** @inheritDoc */ |
| 97 | protected function supportedOrders() { |
| 98 | // No FIFO due to partitioning, though "rough timestamp order" is supported |
| 99 | return [ 'undefined', 'random', 'timestamp' ]; |
| 100 | } |
| 101 | |
| 102 | /** @inheritDoc */ |
| 103 | protected function optimalOrder() { |
| 104 | return 'undefined'; // defer to the partitions |
| 105 | } |
| 106 | |
| 107 | /** @inheritDoc */ |
| 108 | protected function supportsDelayedJobs() { |
| 109 | foreach ( $this->partitionQueues as $queue ) { |
| 110 | if ( !$queue->supportsDelayedJobs() ) { |
| 111 | return false; |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | return true; |
| 116 | } |
| 117 | |
| 118 | /** @inheritDoc */ |
| 119 | protected function doIsEmpty() { |
| 120 | $empty = true; |
| 121 | $failed = 0; |
| 122 | foreach ( $this->partitionQueues as $queue ) { |
| 123 | try { |
| 124 | $empty = $empty && $queue->doIsEmpty(); |
| 125 | } catch ( JobQueueError $e ) { |
| 126 | ++$failed; |
| 127 | $this->logException( $e ); |
| 128 | } |
| 129 | } |
| 130 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 131 | |
| 132 | return $empty; |
| 133 | } |
| 134 | |
| 135 | /** @inheritDoc */ |
| 136 | protected function doGetSize() { |
| 137 | return $this->getCrossPartitionSum( 'size', 'doGetSize' ); |
| 138 | } |
| 139 | |
| 140 | /** @inheritDoc */ |
| 141 | protected function doGetAcquiredCount() { |
| 142 | return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); |
| 143 | } |
| 144 | |
| 145 | /** @inheritDoc */ |
| 146 | protected function doGetDelayedCount() { |
| 147 | return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); |
| 148 | } |
| 149 | |
| 150 | /** @inheritDoc */ |
| 151 | protected function doGetAbandonedCount() { |
| 152 | return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); |
| 153 | } |
| 154 | |
| 155 | /** |
| 156 | * @param string $type |
| 157 | * @param string $method |
| 158 | * @return int |
| 159 | */ |
| 160 | protected function getCrossPartitionSum( $type, $method ) { |
| 161 | $count = 0; |
| 162 | $failed = 0; |
| 163 | foreach ( $this->partitionQueues as $queue ) { |
| 164 | try { |
| 165 | $count += $queue->$method(); |
| 166 | } catch ( JobQueueError $e ) { |
| 167 | ++$failed; |
| 168 | $this->logException( $e ); |
| 169 | } |
| 170 | } |
| 171 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 172 | |
| 173 | return $count; |
| 174 | } |
| 175 | |
| 176 | /** @inheritDoc */ |
| 177 | protected function doBatchPush( array $jobs, $flags ) { |
| 178 | // Local ring variable that may be changed to point to a new ring on failure |
| 179 | $partitionRing = $this->partitionRing; |
| 180 | // Try to insert the jobs and update $partitionRing on any failures.
| 181 | // Retry to insert any remaining jobs again, ignoring the bad partitions. |
| 182 | $jobsLeft = $jobs; |
| 183 | for ( $i = $this->maxPartitionsTry; $i-- && $jobsLeft; ) { |
| 184 | try { |
| 185 | $partitionRing->getLiveLocationWeights(); |
| 186 | } catch ( UnexpectedValueException ) { |
| 187 | break; // all servers down; nothing to insert to |
| 188 | } |
| 189 | $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ); |
| 190 | } |
| 191 | if ( $jobsLeft ) { |
| 192 | throw new JobQueueError( |
| 193 | "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." ); |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | /** |
| 198 | * @param array $jobs |
| 199 | * @param HashRing &$partitionRing |
| 200 | * @param int $flags |
| 201 | * @throws JobQueueError |
| 202 | * @return IJobSpecification[] List of job objects that could not be inserted
| 203 | */ |
| 204 | protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { |
| 205 | $jobsLeft = []; |
| 206 | |
| 207 | // Because jobs are spread across partitions, per-job de-duplication needs |
| 208 | // to use a consistent hash to avoid allowing duplicate jobs per partition. |
| 209 | // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. |
| 210 | $uJobsByPartition = []; // (partition name => job list) |
| 211 | /** @var Job $job */ |
| 212 | foreach ( $jobs as $key => $job ) { |
| 213 | if ( $job->ignoreDuplicates() ) { |
| 214 | $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); |
| 215 | $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job; |
| 216 | unset( $jobs[$key] ); |
| 217 | } |
| 218 | } |
| 219 | // Get the batches of jobs that are not de-duplicated |
| 220 | if ( $flags & self::QOS_ATOMIC ) { |
| 221 | $nuJobBatches = [ $jobs ]; // all or nothing |
| 222 | } else { |
| 223 | // Split the jobs into batches and spread them out over servers if there |
| 224 | // are many jobs. This helps keep the partitions even. Otherwise, send all |
| 225 | // the jobs to a single partition queue to avoid the extra connections.
| 226 | $nuJobBatches = array_chunk( $jobs, 300 ); |
| 227 | } |
| 228 | |
| 229 | // Insert the de-duplicated jobs into the queues... |
| 230 | foreach ( $uJobsByPartition as $partition => $jobBatch ) { |
| 231 | /** @var JobQueue $queue */ |
| 232 | $queue = $this->partitionQueues[$partition]; |
| 233 | try { |
| 234 | $ok = true; |
| 235 | $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); |
| 236 | } catch ( JobQueueError $e ) { |
| 237 | $ok = false; |
| 238 | $this->logException( $e ); |
| 239 | } |
| 240 | if ( !$ok ) { |
| 241 | if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
| 242 | throw new JobQueueError( "Could not insert job(s), no partitions available." ); |
| 243 | } |
| 244 | $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted |
| 245 | } |
| 246 | } |
| 247 | |
| 248 | // Insert the jobs that are not de-duplicated into the queues... |
| 249 | foreach ( $nuJobBatches as $jobBatch ) { |
| 250 | $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() ); |
| 251 | $queue = $this->partitionQueues[$partition]; |
| 252 | try { |
| 253 | $ok = true; |
| 254 | $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); |
| 255 | } catch ( JobQueueError $e ) { |
| 256 | $ok = false; |
| 257 | $this->logException( $e ); |
| 258 | } |
| 259 | if ( !$ok ) { |
| 260 | if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
| 261 | throw new JobQueueError( "Could not insert job(s), no partitions available." ); |
| 262 | } |
| 263 | $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted |
| 264 | } |
| 265 | } |
| 266 | |
| 267 | return $jobsLeft; |
| 268 | } |
| 269 | |
| 270 | /** @inheritDoc */ |
| 271 | protected function doPop() { |
| 272 | $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight) |
| 273 | |
| 274 | $failed = 0; |
| 275 | while ( count( $partitionsTry ) ) { |
| 276 | $partition = ArrayUtils::pickRandom( $partitionsTry ); |
| 277 | if ( $partition === false ) { |
| 278 | break; // all partitions at 0 weight |
| 279 | } |
| 280 | |
| 281 | /** @var JobQueue $queue */ |
| 282 | $queue = $this->partitionQueues[$partition]; |
| 283 | try { |
| 284 | $job = $queue->pop(); |
| 285 | } catch ( JobQueueError $e ) { |
| 286 | ++$failed; |
| 287 | $this->logException( $e ); |
| 288 | $job = false; |
| 289 | } |
| 290 | if ( $job ) { |
| 291 | $job->setMetadata( 'QueuePartition', $partition ); |
| 292 | |
| 293 | return $job; |
| 294 | } else { |
| 295 | unset( $partitionsTry[$partition] ); |
| 296 | } |
| 297 | } |
| 298 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 299 | |
| 300 | return false; |
| 301 | } |
| 302 | |
| 303 | /** @inheritDoc */ |
| 304 | protected function doAck( RunnableJob $job ) { |
| 305 | $partition = $job->getMetadata( 'QueuePartition' ); |
| 306 | if ( $partition === null ) { |
| 307 | throw new UnexpectedValueException( "The given job has no defined partition name." ); |
| 308 | } |
| 309 | |
| 310 | $this->partitionQueues[$partition]->ack( $job ); |
| 311 | } |
| 312 | |
| 313 | /** @inheritDoc */ |
| 314 | protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { |
| 315 | $signature = $job->getRootJobParams()['rootJobSignature']; |
| 316 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
| 317 | try { |
| 318 | return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); |
| 319 | } catch ( JobQueueError ) { |
| 320 | if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
| 321 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
| 322 | return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); |
| 323 | } |
| 324 | } |
| 325 | |
| 326 | return false; |
| 327 | } |
| 328 | |
| 329 | /** @inheritDoc */ |
| 330 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
| 331 | $signature = $job->getRootJobParams()['rootJobSignature']; |
| 332 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
| 333 | try { |
| 334 | return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); |
| 335 | } catch ( JobQueueError ) { |
| 336 | if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
| 337 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
| 338 | return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); |
| 339 | } |
| 340 | } |
| 341 | |
| 342 | return false; |
| 343 | } |
| 344 | |
| 345 | /** @inheritDoc */ |
| 346 | protected function doDelete() { |
| 347 | $failed = 0; |
| 348 | /** @var JobQueue $queue */ |
| 349 | foreach ( $this->partitionQueues as $queue ) { |
| 350 | try { |
| 351 | $queue->doDelete(); |
| 352 | } catch ( JobQueueError $e ) { |
| 353 | ++$failed; |
| 354 | $this->logException( $e ); |
| 355 | } |
| 356 | } |
| 357 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 358 | return true; |
| 359 | } |
| 360 | |
| 361 | /** @inheritDoc */ |
| 362 | protected function doWaitForBackups() { |
| 363 | $failed = 0; |
| 364 | /** @var JobQueue $queue */ |
| 365 | foreach ( $this->partitionQueues as $queue ) { |
| 366 | try { |
| 367 | $queue->waitForBackups(); |
| 368 | } catch ( JobQueueError $e ) { |
| 369 | ++$failed; |
| 370 | $this->logException( $e ); |
| 371 | } |
| 372 | } |
| 373 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 374 | } |
| 375 | |
| 376 | /** @inheritDoc */ |
| 377 | protected function doFlushCaches() { |
| 378 | /** @var JobQueue $queue */ |
| 379 | foreach ( $this->partitionQueues as $queue ) { |
| 380 | $queue->doFlushCaches(); |
| 381 | } |
| 382 | } |
| 383 | |
| 384 | /** @inheritDoc */ |
| 385 | public function getAllQueuedJobs() { |
| 386 | $iterator = new AppendIterator(); |
| 387 | |
| 388 | /** @var JobQueue $queue */ |
| 389 | foreach ( $this->partitionQueues as $queue ) { |
| 390 | $iterator->append( $queue->getAllQueuedJobs() ); |
| 391 | } |
| 392 | |
| 393 | return $iterator; |
| 394 | } |
| 395 | |
| 396 | /** @inheritDoc */ |
| 397 | public function getAllDelayedJobs() { |
| 398 | $iterator = new AppendIterator(); |
| 399 | |
| 400 | /** @var JobQueue $queue */ |
| 401 | foreach ( $this->partitionQueues as $queue ) { |
| 402 | $iterator->append( $queue->getAllDelayedJobs() ); |
| 403 | } |
| 404 | |
| 405 | return $iterator; |
| 406 | } |
| 407 | |
| 408 | /** @inheritDoc */ |
| 409 | public function getAllAcquiredJobs() { |
| 410 | $iterator = new AppendIterator(); |
| 411 | |
| 412 | /** @var JobQueue $queue */ |
| 413 | foreach ( $this->partitionQueues as $queue ) { |
| 414 | $iterator->append( $queue->getAllAcquiredJobs() ); |
| 415 | } |
| 416 | |
| 417 | return $iterator; |
| 418 | } |
| 419 | |
| 420 | /** @inheritDoc */ |
| 421 | public function getAllAbandonedJobs() { |
| 422 | $iterator = new AppendIterator(); |
| 423 | |
| 424 | /** @var JobQueue $queue */ |
| 425 | foreach ( $this->partitionQueues as $queue ) { |
| 426 | $iterator->append( $queue->getAllAbandonedJobs() ); |
| 427 | } |
| 428 | |
| 429 | return $iterator; |
| 430 | } |
| 431 | |
| 432 | /** @inheritDoc */ |
| 433 | public function getCoalesceLocationInternal() { |
| 434 | return "JobQueueFederated:wiki:{$this->domain}" . |
| 435 | sha1( serialize( array_keys( $this->partitionQueues ) ) ); |
| 436 | } |
| 437 | |
| 438 | /** @inheritDoc */ |
| 439 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
| 440 | $result = []; |
| 441 | |
| 442 | $failed = 0; |
| 443 | /** @var JobQueue $queue */ |
| 444 | foreach ( $this->partitionQueues as $queue ) { |
| 445 | try { |
| 446 | $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); |
| 447 | if ( is_array( $nonEmpty ) ) { |
| 448 | $result = array_unique( array_merge( $result, $nonEmpty ) ); |
| 449 | } else { |
| 450 | return null; // not supported on all partitions; bail |
| 451 | } |
| 452 | if ( count( $result ) == count( $types ) ) { |
| 453 | break; // short-circuit |
| 454 | } |
| 455 | } catch ( JobQueueError $e ) { |
| 456 | ++$failed; |
| 457 | $this->logException( $e ); |
| 458 | } |
| 459 | } |
| 460 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 461 | |
| 462 | return array_values( $result ); |
| 463 | } |
| 464 | |
| 465 | /** @inheritDoc */ |
| 466 | protected function doGetSiblingQueueSizes( array $types ) { |
| 467 | $result = []; |
| 468 | $failed = 0; |
| 469 | /** @var JobQueue $queue */ |
| 470 | foreach ( $this->partitionQueues as $queue ) { |
| 471 | try { |
| 472 | $sizes = $queue->doGetSiblingQueueSizes( $types ); |
| 473 | if ( is_array( $sizes ) ) { |
| 474 | foreach ( $sizes as $type => $size ) { |
| 475 | $result[$type] = ( $result[$type] ?? 0 ) + $size; |
| 476 | } |
| 477 | } else { |
| 478 | return null; // not supported on all partitions; bail |
| 479 | } |
| 480 | } catch ( JobQueueError $e ) { |
| 481 | ++$failed; |
| 482 | $this->logException( $e ); |
| 483 | } |
| 484 | } |
| 485 | $this->throwErrorIfAllPartitionsDown( $failed ); |
| 486 | |
| 487 | return $result; |
| 488 | } |
| 489 | |
| 490 | protected function logException( Exception $e ) { |
| 491 | wfDebugLog( 'JobQueue', $e->getMessage() . "\n" . $e->getTraceAsString() ); |
| 492 | } |
| 493 | |
| 494 | /** |
| 495 | * Throw an error if no partitions are available
| 496 | *
| 497 | * @param int $down The number of partitions that are down
| 498 | * @return void |
| 499 | * @throws JobQueueError |
| 500 | */ |
| 501 | protected function throwErrorIfAllPartitionsDown( $down ) { |
| 502 | if ( $down >= count( $this->partitionQueues ) ) { |
| 503 | throw new JobQueueError( 'No queue partitions available.' ); |
| 504 | } |
| 505 | } |
| 506 | } |
| 507 | |
| 508 | /** @deprecated class alias since 1.44 */ |
| 509 | class_alias( JobQueueFederated::class, 'JobQueueFederated' ); |