Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 192 |
|
0.00% |
0 / 28 |
CRAP | |
0.00% |
0 / 1 |
JobQueueFederated | |
0.00% |
0 / 192 |
|
0.00% |
0 / 28 |
6642 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
30 | |||
supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportsDelayedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
doIsEmpty | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
doGetSize | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetAcquiredCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetDelayedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetAbandonedCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getCrossPartitionSum | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
doBatchPush | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
30 | |||
tryJobInsertions | |
0.00% |
0 / 34 |
|
0.00% |
0 / 1 |
156 | |||
doPop | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
30 | |||
doAck | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doIsRootJobOldDuplicate | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
doDeduplicateRootJob | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
doDelete | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
doWaitForBackups | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
doFlushCaches | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
getAllQueuedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getAllDelayedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getAllAcquiredJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getAllAbandonedJobs | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
30 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
logException | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
throwErrorIfAllPartitionsDown | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | /** |
22 | * Enqueue and run background jobs via a federated queue, for wiki farms. |
23 | * |
24 | * This class allows for queues to be partitioned into smaller queues. |
25 | * A partition is defined by the configuration for a JobQueue instance. |
26 | * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a |
27 | * JobQueueFederated instance, which itself would consist of three JobQueueRedis |
28 | * instances, each using their own redis server. This would allow for the jobs |
29 | * to be split (evenly or based on weights) across multiple servers if a single |
30 | * server becomes impractical or expensive. Different JobQueue classes can be mixed. |
31 | * |
32 | * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue |
33 | * is inherited by the partition queues. Additional configuration defines what |
34 | * section each wiki is in, what partition queues each section uses (and their weight), |
35 | * and the JobQueue configuration for each partition. Some sections might only need a |
36 | * single queue partition, like the sections for groups of small wikis. |
37 | * |
38 | * If used for performance, then $wgMainCacheType should be set to memcached/redis. |
39 | * Note that "fifo" cannot be used for the ordering, since the data is distributed. |
40 | * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, |
41 | * queue classes used by this should ignore down servers (with TTL) to avoid slowness. |
42 | * |
43 | * @since 1.22 |
44 | * @ingroup JobQueue |
45 | */ |
46 | class JobQueueFederated extends JobQueue { |
47 | /** @var HashRing */ |
48 | protected $partitionRing; |
49 | /** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */ |
50 | protected $partitionQueues = []; |
51 | |
52 | /** @var int Maximum number of partitions to try */ |
53 | protected $maxPartitionsTry; |
54 | |
55 | /** |
56 | * @param array $params Possible keys: |
57 | * - sectionsByWiki : A map of wiki IDs to section names. |
58 | * Wikis will default to using the section "default". |
59 | * - partitionsBySection : Map of section names to maps of (partition name => weight). |
60 | * A section called 'default' must be defined if not all wikis |
61 | * have explicitly defined sections. |
62 | * - configByPartition : Map of queue partition names to configuration arrays. |
63 | * These configuration arrays are passed to JobQueue::factory(). |
64 | * The options set here are overridden by those passed to |
65 | * the federated queue itself (e.g. 'order' and 'claimTTL'). |
66 | * - maxPartitionsTry : Maximum number of times to attempt job insertion using |
67 | * different partition queues. This improves availability |
68 | * during failure, at the cost of added latency and somewhat |
69 | * less reliable job de-duplication mechanisms. |
70 | */ |
71 | protected function __construct( array $params ) { |
72 | parent::__construct( $params ); |
73 | $section = $params['sectionsByWiki'][$this->domain] ?? 'default'; |
74 | if ( !isset( $params['partitionsBySection'][$section] ) ) { |
75 | throw new InvalidArgumentException( "No configuration for section '$section'." ); |
76 | } |
77 | $this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2; |
78 | // Get the full partition map |
79 | $partitionMap = $params['partitionsBySection'][$section]; |
80 | arsort( $partitionMap, SORT_NUMERIC ); |
81 | // Get the config to pass to merge into each partition queue config |
82 | $baseConfig = $params; |
83 | foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry', |
84 | 'partitionsBySection', 'configByPartition', ] as $o |
85 | ) { |
86 | unset( $baseConfig[$o] ); // partition queue doesn't care about this |
87 | } |
88 | // Get the partition queue objects |
89 | foreach ( $partitionMap as $partition => $w ) { |
90 | if ( !isset( $params['configByPartition'][$partition] ) ) { |
91 | throw new InvalidArgumentException( "No configuration for partition '$partition'." ); |
92 | } |
93 | $this->partitionQueues[$partition] = JobQueue::factory( |
94 | $baseConfig + $params['configByPartition'][$partition] ); |
95 | } |
96 | // Ring of all partitions |
97 | $this->partitionRing = new HashRing( $partitionMap ); |
98 | } |
99 | |
100 | protected function supportedOrders() { |
101 | // No FIFO due to partitioning, though "rough timestamp order" is supported |
102 | return [ 'undefined', 'random', 'timestamp' ]; |
103 | } |
104 | |
105 | protected function optimalOrder() { |
106 | return 'undefined'; // defer to the partitions |
107 | } |
108 | |
109 | protected function supportsDelayedJobs() { |
110 | foreach ( $this->partitionQueues as $queue ) { |
111 | if ( !$queue->supportsDelayedJobs() ) { |
112 | return false; |
113 | } |
114 | } |
115 | |
116 | return true; |
117 | } |
118 | |
119 | protected function doIsEmpty() { |
120 | $empty = true; |
121 | $failed = 0; |
122 | foreach ( $this->partitionQueues as $queue ) { |
123 | try { |
124 | $empty = $empty && $queue->doIsEmpty(); |
125 | } catch ( JobQueueError $e ) { |
126 | ++$failed; |
127 | $this->logException( $e ); |
128 | } |
129 | } |
130 | $this->throwErrorIfAllPartitionsDown( $failed ); |
131 | |
132 | return $empty; |
133 | } |
134 | |
135 | protected function doGetSize() { |
136 | return $this->getCrossPartitionSum( 'size', 'doGetSize' ); |
137 | } |
138 | |
139 | protected function doGetAcquiredCount() { |
140 | return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); |
141 | } |
142 | |
143 | protected function doGetDelayedCount() { |
144 | return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); |
145 | } |
146 | |
147 | protected function doGetAbandonedCount() { |
148 | return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); |
149 | } |
150 | |
151 | /** |
152 | * @param string $type |
153 | * @param string $method |
154 | * @return int |
155 | */ |
156 | protected function getCrossPartitionSum( $type, $method ) { |
157 | $count = 0; |
158 | $failed = 0; |
159 | foreach ( $this->partitionQueues as $queue ) { |
160 | try { |
161 | $count += $queue->$method(); |
162 | } catch ( JobQueueError $e ) { |
163 | ++$failed; |
164 | $this->logException( $e ); |
165 | } |
166 | } |
167 | $this->throwErrorIfAllPartitionsDown( $failed ); |
168 | |
169 | return $count; |
170 | } |
171 | |
172 | protected function doBatchPush( array $jobs, $flags ) { |
173 | // Local ring variable that may be changed to point to a new ring on failure |
174 | $partitionRing = $this->partitionRing; |
175 | // Try to insert the jobs and update $partitionsTry on any failures. |
176 | // Retry to insert any remaining jobs again, ignoring the bad partitions. |
177 | $jobsLeft = $jobs; |
178 | for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) { |
179 | try { |
180 | $partitionRing->getLiveLocationWeights(); |
181 | } catch ( UnexpectedValueException $e ) { |
182 | break; // all servers down; nothing to insert to |
183 | } |
184 | $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ); |
185 | } |
186 | if ( count( $jobsLeft ) ) { |
187 | throw new JobQueueError( |
188 | "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." ); |
189 | } |
190 | } |
191 | |
192 | /** |
193 | * @param array $jobs |
194 | * @param HashRing &$partitionRing |
195 | * @param int $flags |
196 | * @throws JobQueueError |
197 | * @return IJobSpecification[] List of Job objects that could not be inserted |
198 | */ |
199 | protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { |
200 | $jobsLeft = []; |
201 | |
202 | // Because jobs are spread across partitions, per-job de-duplication needs |
203 | // to use a consistent hash to avoid allowing duplicate jobs per partition. |
204 | // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. |
205 | $uJobsByPartition = []; // (partition name => job list) |
206 | /** @var Job $job */ |
207 | foreach ( $jobs as $key => $job ) { |
208 | if ( $job->ignoreDuplicates() ) { |
209 | $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); |
210 | $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job; |
211 | unset( $jobs[$key] ); |
212 | } |
213 | } |
214 | // Get the batches of jobs that are not de-duplicated |
215 | if ( $flags & self::QOS_ATOMIC ) { |
216 | $nuJobBatches = [ $jobs ]; // all or nothing |
217 | } else { |
218 | // Split the jobs into batches and spread them out over servers if there |
219 | // are many jobs. This helps keep the partitions even. Otherwise, send all |
220 | // the jobs to a single partition queue to avoid the extra connections. |
221 | $nuJobBatches = array_chunk( $jobs, 300 ); |
222 | } |
223 | |
224 | // Insert the de-duplicated jobs into the queues... |
225 | foreach ( $uJobsByPartition as $partition => $jobBatch ) { |
226 | /** @var JobQueue $queue */ |
227 | $queue = $this->partitionQueues[$partition]; |
228 | try { |
229 | $ok = true; |
230 | $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); |
231 | } catch ( JobQueueError $e ) { |
232 | $ok = false; |
233 | $this->logException( $e ); |
234 | } |
235 | if ( !$ok ) { |
236 | if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
237 | throw new JobQueueError( "Could not insert job(s), no partitions available." ); |
238 | } |
239 | $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted |
240 | } |
241 | } |
242 | |
243 | // Insert the jobs that are not de-duplicated into the queues... |
244 | foreach ( $nuJobBatches as $jobBatch ) { |
245 | $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() ); |
246 | $queue = $this->partitionQueues[$partition]; |
247 | try { |
248 | $ok = true; |
249 | $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); |
250 | } catch ( JobQueueError $e ) { |
251 | $ok = false; |
252 | $this->logException( $e ); |
253 | } |
254 | if ( !$ok ) { |
255 | if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
256 | throw new JobQueueError( "Could not insert job(s), no partitions available." ); |
257 | } |
258 | $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted |
259 | } |
260 | } |
261 | |
262 | return $jobsLeft; |
263 | } |
264 | |
265 | protected function doPop() { |
266 | $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight) |
267 | |
268 | $failed = 0; |
269 | while ( count( $partitionsTry ) ) { |
270 | $partition = ArrayUtils::pickRandom( $partitionsTry ); |
271 | if ( $partition === false ) { |
272 | break; // all partitions at 0 weight |
273 | } |
274 | |
275 | /** @var JobQueue $queue */ |
276 | $queue = $this->partitionQueues[$partition]; |
277 | try { |
278 | $job = $queue->pop(); |
279 | } catch ( JobQueueError $e ) { |
280 | ++$failed; |
281 | $this->logException( $e ); |
282 | $job = false; |
283 | } |
284 | if ( $job ) { |
285 | $job->setMetadata( 'QueuePartition', $partition ); |
286 | |
287 | return $job; |
288 | } else { |
289 | unset( $partitionsTry[$partition] ); |
290 | } |
291 | } |
292 | $this->throwErrorIfAllPartitionsDown( $failed ); |
293 | |
294 | return false; |
295 | } |
296 | |
297 | protected function doAck( RunnableJob $job ) { |
298 | $partition = $job->getMetadata( 'QueuePartition' ); |
299 | if ( $partition === null ) { |
300 | throw new UnexpectedValueException( "The given job has no defined partition name." ); |
301 | } |
302 | |
303 | $this->partitionQueues[$partition]->ack( $job ); |
304 | } |
305 | |
306 | protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { |
307 | $signature = $job->getRootJobParams()['rootJobSignature']; |
308 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
309 | try { |
310 | return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); |
311 | } catch ( JobQueueError $e ) { |
312 | if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
313 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
314 | return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); |
315 | } |
316 | } |
317 | |
318 | return false; |
319 | } |
320 | |
321 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
322 | $signature = $job->getRootJobParams()['rootJobSignature']; |
323 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
324 | try { |
325 | return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); |
326 | } catch ( JobQueueError $e ) { |
327 | if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { |
328 | $partition = $this->partitionRing->getLiveLocation( $signature ); |
329 | return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); |
330 | } |
331 | } |
332 | |
333 | return false; |
334 | } |
335 | |
336 | protected function doDelete() { |
337 | $failed = 0; |
338 | /** @var JobQueue $queue */ |
339 | foreach ( $this->partitionQueues as $queue ) { |
340 | try { |
341 | $queue->doDelete(); |
342 | } catch ( JobQueueError $e ) { |
343 | ++$failed; |
344 | $this->logException( $e ); |
345 | } |
346 | } |
347 | $this->throwErrorIfAllPartitionsDown( $failed ); |
348 | return true; |
349 | } |
350 | |
351 | protected function doWaitForBackups() { |
352 | $failed = 0; |
353 | /** @var JobQueue $queue */ |
354 | foreach ( $this->partitionQueues as $queue ) { |
355 | try { |
356 | $queue->waitForBackups(); |
357 | } catch ( JobQueueError $e ) { |
358 | ++$failed; |
359 | $this->logException( $e ); |
360 | } |
361 | } |
362 | $this->throwErrorIfAllPartitionsDown( $failed ); |
363 | } |
364 | |
365 | protected function doFlushCaches() { |
366 | /** @var JobQueue $queue */ |
367 | foreach ( $this->partitionQueues as $queue ) { |
368 | $queue->doFlushCaches(); |
369 | } |
370 | } |
371 | |
372 | public function getAllQueuedJobs() { |
373 | $iterator = new AppendIterator(); |
374 | |
375 | /** @var JobQueue $queue */ |
376 | foreach ( $this->partitionQueues as $queue ) { |
377 | $iterator->append( $queue->getAllQueuedJobs() ); |
378 | } |
379 | |
380 | return $iterator; |
381 | } |
382 | |
383 | public function getAllDelayedJobs() { |
384 | $iterator = new AppendIterator(); |
385 | |
386 | /** @var JobQueue $queue */ |
387 | foreach ( $this->partitionQueues as $queue ) { |
388 | $iterator->append( $queue->getAllDelayedJobs() ); |
389 | } |
390 | |
391 | return $iterator; |
392 | } |
393 | |
394 | public function getAllAcquiredJobs() { |
395 | $iterator = new AppendIterator(); |
396 | |
397 | /** @var JobQueue $queue */ |
398 | foreach ( $this->partitionQueues as $queue ) { |
399 | $iterator->append( $queue->getAllAcquiredJobs() ); |
400 | } |
401 | |
402 | return $iterator; |
403 | } |
404 | |
405 | public function getAllAbandonedJobs() { |
406 | $iterator = new AppendIterator(); |
407 | |
408 | /** @var JobQueue $queue */ |
409 | foreach ( $this->partitionQueues as $queue ) { |
410 | $iterator->append( $queue->getAllAbandonedJobs() ); |
411 | } |
412 | |
413 | return $iterator; |
414 | } |
415 | |
416 | public function getCoalesceLocationInternal() { |
417 | return "JobQueueFederated:wiki:{$this->domain}" . |
418 | sha1( serialize( array_keys( $this->partitionQueues ) ) ); |
419 | } |
420 | |
421 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
422 | $result = []; |
423 | |
424 | $failed = 0; |
425 | /** @var JobQueue $queue */ |
426 | foreach ( $this->partitionQueues as $queue ) { |
427 | try { |
428 | $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); |
429 | if ( is_array( $nonEmpty ) ) { |
430 | $result = array_unique( array_merge( $result, $nonEmpty ) ); |
431 | } else { |
432 | return null; // not supported on all partitions; bail |
433 | } |
434 | if ( count( $result ) == count( $types ) ) { |
435 | break; // short-circuit |
436 | } |
437 | } catch ( JobQueueError $e ) { |
438 | ++$failed; |
439 | $this->logException( $e ); |
440 | } |
441 | } |
442 | $this->throwErrorIfAllPartitionsDown( $failed ); |
443 | |
444 | return array_values( $result ); |
445 | } |
446 | |
447 | protected function doGetSiblingQueueSizes( array $types ) { |
448 | $result = []; |
449 | $failed = 0; |
450 | /** @var JobQueue $queue */ |
451 | foreach ( $this->partitionQueues as $queue ) { |
452 | try { |
453 | $sizes = $queue->doGetSiblingQueueSizes( $types ); |
454 | if ( is_array( $sizes ) ) { |
455 | foreach ( $sizes as $type => $size ) { |
456 | $result[$type] = ( $result[$type] ?? 0 ) + $size; |
457 | } |
458 | } else { |
459 | return null; // not supported on all partitions; bail |
460 | } |
461 | } catch ( JobQueueError $e ) { |
462 | ++$failed; |
463 | $this->logException( $e ); |
464 | } |
465 | } |
466 | $this->throwErrorIfAllPartitionsDown( $failed ); |
467 | |
468 | return $result; |
469 | } |
470 | |
471 | protected function logException( Exception $e ) { |
472 | wfDebugLog( 'JobQueue', $e->getMessage() . "\n" . $e->getTraceAsString() ); |
473 | } |
474 | |
475 | /** |
476 | * Throw an error if no partitions available |
477 | * |
478 | * @param int $down The number of partitions down |
479 | * @return void |
480 | * @throws JobQueueError |
481 | */ |
482 | protected function throwErrorIfAllPartitionsDown( $down ) { |
483 | if ( $down >= count( $this->partitionQueues ) ) { |
484 | throw new JobQueueError( 'No queue partitions available.' ); |
485 | } |
486 | } |
487 | } |