Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 193 |
|
0.00% |
0 / 28 |
CRAP | |
0.00% |
0 / 1 |
JobQueueFederated | |
0.00% |
0 / 192 |
|
0.00% |
0 / 28 |
6642 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
30 | |||
supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportsDelayedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
doIsEmpty | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
doGetSize | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetAcquiredCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetDelayedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetAbandonedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getCrossPartitionSum | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
doBatchPush | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
30 | |||
tryJobInsertions | |
0.00% |
0 / 34 |
|
0.00% |
0 / 1 |
156 | |||
doPop | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
30 | |||
doAck | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doIsRootJobOldDuplicate | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
doDeduplicateRootJob | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
doDelete | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
doWaitForBackups | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
doFlushCaches | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
getAllQueuedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getAllDelayedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getAllAcquiredJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getAllAbandonedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
30 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
logException | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
throwErrorIfAllPartitionsDown | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | namespace MediaWiki\JobQueue; |
22 | |
23 | use AppendIterator; |
24 | use Exception; |
25 | use InvalidArgumentException; |
26 | use MediaWiki\JobQueue\Exceptions\JobQueueError; |
27 | use UnexpectedValueException; |
28 | use Wikimedia\ArrayUtils\ArrayUtils; |
29 | use Wikimedia\HashRing\HashRing; |
30 | |
/**
 * Enqueue and run background jobs via a federated queue, for wiki farms.
 *
 * This class allows for queues to be partitioned into smaller queues.
 * A partition is defined by the configuration for a JobQueue instance.
 * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a
 * JobQueueFederated instance, which itself would consist of three JobQueueRedis
 * instances, each using their own redis server. This would allow for the jobs
 * to be split (evenly or based on weights) across multiple servers if a single
 * server becomes impractical or expensive. Different JobQueue classes can be mixed.
 *
 * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue
 * is inherited by the partition queues. Additional configuration defines what
 * section each wiki is in, what partition queues each section uses (and their weight),
 * and the JobQueue configuration for each partition. Some sections might only need a
 * single queue partition, like the sections for groups of small wikis.
 *
 * If used for performance, then $wgMainCacheType should be set to memcached/redis.
 * Note that "fifo" cannot be used for the ordering, since the data is distributed.
 * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
 * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
 *
 * Error handling convention used throughout this class: a JobQueueError raised by a
 * single partition is logged and counted, and an error is only propagated to the
 * caller when every partition failed (see throwErrorIfAllPartitionsDown()).
 *
 * @since 1.22
 * @ingroup JobQueue
 */
class JobQueueFederated extends JobQueue {
	/** @var HashRing Ring of all partitions of the section used by this queue */
	protected $partitionRing;
	/** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
	protected $partitionQueues = [];

	/** @var int Maximum number of partitions to try */
	protected $maxPartitionsTry;

	/**
	 * @param array $params Possible keys:
	 *  - sectionsByWiki      : A map of wiki IDs to section names.
	 *                          Wikis will default to using the section "default".
	 *  - partitionsBySection : Map of section names to maps of (partition name => weight).
	 *                          A section called 'default' must be defined if not all wikis
	 *                          have explicitly defined sections.
	 *  - configByPartition   : Map of queue partition names to configuration arrays.
	 *                          These configuration arrays are passed to JobQueue::factory().
	 *                          The options set here are overridden by those passed to
	 *                          the federated queue itself (e.g. 'order' and 'claimTTL').
	 *  - maxPartitionsTry    : Maximum number of times to attempt job insertion using
	 *                          different partition queues. This improves availability
	 *                          during failure, at the cost of added latency and somewhat
	 *                          less reliable job de-duplication mechanisms.
	 *                          Defaults to 2.
	 * @throws InvalidArgumentException If the section of this wiki, or one of the
	 *  partitions of that section, has no configuration
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$section = $params['sectionsByWiki'][$this->domain] ?? 'default';
		if ( !isset( $params['partitionsBySection'][$section] ) ) {
			throw new InvalidArgumentException( "No configuration for section '$section'." );
		}
		$this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2;
		// Get the full partition map, heaviest partition first
		$partitionMap = $params['partitionsBySection'][$section];
		arsort( $partitionMap, SORT_NUMERIC );
		// Get the config to merge into each partition queue config
		$baseConfig = $params;
		foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry',
			'partitionsBySection', 'configByPartition', ] as $o
		) {
			unset( $baseConfig[$o] ); // partition queue doesn't care about this
		}
		// Get the partition queue objects
		foreach ( $partitionMap as $partition => $w ) {
			if ( !isset( $params['configByPartition'][$partition] ) ) {
				throw new InvalidArgumentException( "No configuration for partition '$partition'." );
			}
			// Array union: keys of $baseConfig win over the per-partition config
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $params['configByPartition'][$partition] );
		}
		// Ring of all partitions
		$this->partitionRing = new HashRing( $partitionMap );
	}

	/**
	 * @return string[] Orders accepted by this queue
	 */
	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return [ 'undefined', 'random', 'timestamp' ];
	}

	/**
	 * @return string
	 */
	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	/**
	 * Delayed jobs are only supported if every partition queue supports them.
	 *
	 * @return bool
	 */
	protected function supportsDelayedJobs() {
		foreach ( $this->partitionQueues as $queue ) {
			if ( !$queue->supportsDelayedJobs() ) {
				return false;
			}
		}

		return true;
	}

	/**
	 * Check whether all reachable partitions are empty.
	 *
	 * @return bool False as soon as one partition reports that it is not empty
	 * @throws JobQueueError If every partition failed
	 */
	protected function doIsEmpty() {
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				if ( !$queue->doIsEmpty() ) {
					// At least one partition answered, so "all partitions down"
					// cannot apply; no need to ask the remaining partitions.
					return false;
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return true;
	}

	/**
	 * @return int Sum of doGetSize() over the reachable partitions
	 */
	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	/**
	 * @return int Sum of doGetAcquiredCount() over the reachable partitions
	 */
	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	/**
	 * @return int Sum of doGetDelayedCount() over the reachable partitions
	 */
	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	/**
	 * @return int Sum of doGetAbandonedCount() over the reachable partitions
	 */
	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Add up the result of a counting method over all partition queues.
	 *
	 * Partitions that throw JobQueueError are logged and left out of the sum.
	 *
	 * @param string $type Label of the counter; not used by this implementation,
	 *  kept so that the signature stays stable
	 * @param string $method Name of the JobQueue method to call on each partition
	 * @return int
	 * @throws JobQueueError If every partition failed
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$count = 0;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$count += $queue->$method();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $count;
	}

	/**
	 * Insert jobs, retrying on other partitions when a partition fails.
	 *
	 * @param array $jobs
	 * @param int $flags Bitfield, may contain self::QOS_ATOMIC
	 * @throws JobQueueError If some jobs could not be inserted
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Local ring variable; tryJobInsertions() ejects failing partitions from it
		$partitionRing = $this->partitionRing;
		// Try to insert the jobs, then retry any jobs that are left over,
		// at most $this->maxPartitionsTry rounds in total.
		$jobsLeft = $jobs;
		for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
			try {
				// Only called to find out whether any partition is still live
				$partitionRing->getLiveLocationWeights();
			} catch ( UnexpectedValueException ) {
				break; // all servers down; nothing to insert to
			}
			$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
		}
		if ( count( $jobsLeft ) ) {
			throw new JobQueueError(
				"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
		}
	}

	/**
	 * Make one attempt at inserting the given jobs into the live partitions.
	 *
	 * @param array $jobs
	 * @param HashRing &$partitionRing Failing partitions are ejected from this ring
	 * @param int $flags
	 * @throws JobQueueError If a partition failed and no live partition remains
	 * @return IJobSpecification[] List of Job objects that could not be inserted
	 */
	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
		$jobsLeft = [];

		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
		$uJobsByPartition = []; // (partition name => job list)
		/** @var Job $job */
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( !$jobs ) {
			// Nothing left; do not bother (or eject) a partition with an empty batch
			$nuJobBatches = [];
		} elseif ( $flags & self::QOS_ATOMIC ) {
			$nuJobBatches = [ $jobs ]; // all or nothing
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into the queues...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			if ( !$this->pushBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		// Insert the jobs that are not de-duplicated into the queues...
		foreach ( $nuJobBatches as $jobBatch ) {
			$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
			if ( !$this->pushBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		return $jobsLeft;
	}

	/**
	 * Push one batch to one partition queue; on failure, log the error and
	 * eject the partition from the given ring.
	 *
	 * @param string|int $partition Partition name (key of $this->partitionQueues)
	 * @param array $jobBatch
	 * @param HashRing $partitionRing
	 * @param int $flags QOS_ATOMIC is always added for the per-partition push
	 * @return bool True if the batch was inserted, false if the push failed
	 * @throws JobQueueError If the push failed and the ejection left no live partition
	 */
	private function pushBatchToPartition(
		$partition, array $jobBatch, HashRing $partitionRing, $flags
	) {
		/** @var JobQueue $queue */
		$queue = $this->partitionQueues[$partition];
		try {
			$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );

			return true;
		} catch ( JobQueueError $e ) {
			$this->logException( $e );
		}
		// NOTE(review): the second argument is presumably a TTL in seconds — confirm
		// against HashRing::ejectFromLiveRing().
		if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			throw new JobQueueError( "Could not insert job(s), no partitions available." );
		}

		return false;
	}

	/**
	 * Pop a job from a randomly picked live partition, moving on to the
	 * remaining partitions while the picked one is empty or failing.
	 *
	 * @return RunnableJob|false The job, tagged with 'QueuePartition' metadata, or false
	 * @throws JobQueueError If every partition failed
	 */
	protected function doPop() {
		$partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)

		$failed = 0;
		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}

			/** @var JobQueue $queue */
			$queue = $this->partitionQueues[$partition];
			try {
				$job = $queue->pop();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
				$job = false;
			}
			if ( $job ) {
				// Remember the origin so that doAck() can find the right partition
				$job->setMetadata( 'QueuePartition', $partition );

				return $job;
			}
			unset( $partitionsTry[$partition] );
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return false;
	}

	/**
	 * Acknowledge a job on the partition it was popped from.
	 *
	 * @param RunnableJob $job
	 * @throws UnexpectedValueException If the job carries no, or an unknown, partition name
	 */
	protected function doAck( RunnableJob $job ) {
		$partition = $job->getMetadata( 'QueuePartition' );
		if ( $partition === null ) {
			throw new UnexpectedValueException( "The given job has no defined partition name." );
		}
		if ( !isset( $this->partitionQueues[$partition] ) ) {
			// Fail with a catchable, descriptive error instead of a call on null
			throw new UnexpectedValueException( "The given job has an unknown partition name." );
		}

		$this->partitionQueues[$partition]->ack( $job );
	}

	/**
	 * Ask the partition responsible for the root job signature whether the job
	 * is an old duplicate; on failure, eject that partition and ask the next one.
	 *
	 * @param IJobSpecification $job
	 * @return bool False if the first partition failed and no live partition remains
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$signature = $job->getRootJobParams()['rootJobSignature'];
		$partition = $this->partitionRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
		} catch ( JobQueueError $e ) {
			// Log like every other partition failure in this class
			$this->logException( $e );
			if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionRing->getLiveLocation( $signature );
				return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
			}
		}

		return false;
	}

	/**
	 * Register the root job on the partition responsible for its signature;
	 * on failure, eject that partition and use the next one.
	 *
	 * @param IJobSpecification $job
	 * @return bool False if the first partition failed and no live partition remains
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$signature = $job->getRootJobParams()['rootJobSignature'];
		$partition = $this->partitionRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
		} catch ( JobQueueError $e ) {
			// Log like every other partition failure in this class
			$this->logException( $e );
			if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionRing->getLiveLocation( $signature );
				return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
			}
		}

		return false;
	}

	/**
	 * Delete the jobs of every partition queue.
	 *
	 * @return bool Always true when no exception is thrown
	 * @throws JobQueueError If every partition failed
	 */
	protected function doDelete() {
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->doDelete();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
		return true;
	}

	/**
	 * Call waitForBackups() on every partition queue.
	 *
	 * @throws JobQueueError If every partition failed
	 */
	protected function doWaitForBackups() {
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->waitForBackups();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
	}

	/**
	 * Call doFlushCaches() on every partition queue.
	 */
	protected function doFlushCaches() {
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	/**
	 * @return AppendIterator Concatenation of getAllQueuedJobs() of all partitions
	 */
	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Concatenation of getAllDelayedJobs() of all partitions
	 */
	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Concatenation of getAllAcquiredJobs() of all partitions
	 */
	public function getAllAcquiredJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllAcquiredJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Concatenation of getAllAbandonedJobs() of all partitions
	 */
	public function getAllAbandonedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllAbandonedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return string Built from the domain and a hash of the partition names
	 */
	public function getCoalesceLocationInternal() {
		return "JobQueueFederated:wiki:{$this->domain}" .
			sha1( serialize( array_keys( $this->partitionQueues ) ) );
	}

	/**
	 * Merge the doGetSiblingQueuesWithJobs() results of the partition queues.
	 *
	 * @param array $types
	 * @return array|null List of values reported by the partitions, without
	 *  duplicates; null if a partition does not return an array
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$result = [];

		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
				if ( !is_array( $nonEmpty ) ) {
					return null; // not supported on all partitions; bail
				}
				$result = array_unique( array_merge( $result, $nonEmpty ) );
				if ( count( $result ) == count( $types ) ) {
					break; // short-circuit
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return array_values( $result );
	}

	/**
	 * Add up the doGetSiblingQueueSizes() results of the partition queues.
	 *
	 * @param array $types
	 * @return array|null Map of (key reported by the partitions => summed size);
	 *  null if a partition does not return an array
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$result = [];
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$sizes = $queue->doGetSiblingQueueSizes( $types );
				if ( !is_array( $sizes ) ) {
					return null; // not supported on all partitions; bail
				}
				foreach ( $sizes as $type => $size ) {
					$result[$type] = ( $result[$type] ?? 0 ) + $size;
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $result;
	}

	/**
	 * Write the message and stack trace of an exception to the 'JobQueue' debug log.
	 *
	 * @param Exception $e
	 */
	protected function logException( Exception $e ) {
		wfDebugLog( 'JobQueue', $e->getMessage() . "\n" . $e->getTraceAsString() );
	}

	/**
	 * Throw an error if no partitions are available
	 *
	 * @param int $down The number of partitions that are down
	 * @return void
	 * @throws JobQueueError If $down is at least the number of partition queues
	 */
	protected function throwErrorIfAllPartitionsDown( $down ) {
		if ( $down >= count( $this->partitionQueues ) ) {
			throw new JobQueueError( 'No queue partitions available.' );
		}
	}
}
498 | |
/**
 * Keep the un-namespaced class name usable as an alias of the class above.
 *
 * @deprecated class alias since 1.44
 */
\class_alias( JobQueueFederated::class, 'JobQueueFederated' );