MediaWiki master
JobQueueFederated.php
Go to the documentation of this file.
1<?php
21namespace MediaWiki\JobQueue;
22
23use AppendIterator;
24use Exception;
25use InvalidArgumentException;
27use UnexpectedValueException;
30
/** @var HashRing Ring of all partitions; created in the constructor from the partition map */
protected $partitionRing;
/** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = [];

/**
 * @var int Maximum number of partitions to try.
 * Declared explicitly because the constructor assigns it and doBatchPush() reads it;
 * relying on a dynamic property is deprecated as of PHP 8.2.
 */
protected $maxPartitionsTry;

/**
 * Set up the federated queue for this domain.
 *
 * Resolves the section of the current domain, creates one partition queue per
 * entry of that section's partition map, and builds a hash ring over the map.
 *
 * @param array $params Keys read here:
 *  - sectionsByWiki: optional map (domain => section); missing domains use 'default'
 *  - partitionsBySection: map (section => partition map)
 *  - configByPartition: map (partition => config merged into that partition's queue config)
 *  - maxPartitionsTry: optional, defaults to 2
 *  All remaining keys are passed along to every partition queue.
 * @throws InvalidArgumentException If the section, or one of its partitions, has no configuration
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$section = $params['sectionsByWiki'][$this->domain] ?? 'default';
	if ( !isset( $params['partitionsBySection'][$section] ) ) {
		throw new InvalidArgumentException( "No configuration for section '$section'." );
	}

	$this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2;

	// Full partition map of the section, sorted by descending numeric value
	$weightByPartition = $params['partitionsBySection'][$section];
	arsort( $weightByPartition, SORT_NUMERIC );

	// Config shared by all partition queues: the incoming params without the
	// federation-level keys, which a partition queue doesn't care about
	$sharedConfig = array_diff_key( $params, [
		'class' => true,
		'sectionsByWiki' => true,
		'maxPartitionsTry' => true,
		'partitionsBySection' => true,
		'configByPartition' => true,
	] );

	// One queue object per partition, in map order
	foreach ( array_keys( $weightByPartition ) as $partition ) {
		if ( !isset( $params['configByPartition'][$partition] ) ) {
			throw new InvalidArgumentException( "No configuration for partition '$partition'." );
		}
		// With "+", keys already present in the shared config win over the partition's own
		$queueConfig = $sharedConfig + $params['configByPartition'][$partition];
		$this->partitionQueues[$partition] = JobQueue::factory( $queueConfig );
	}

	// Ring of all partitions
	$this->partitionRing = new HashRing( $weightByPartition );
}
109
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[]
 */
protected function supportedOrders() {
	// Partitioning rules out FIFO, though "rough timestamp order" is supported
	$orders = [ 'undefined', 'random', 'timestamp' ];

	return $orders;
}
114
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string
 */
protected function optimalOrder() {
	// No preference at this level; defer to the partitions
	$order = 'undefined';

	return $order;
}
118
/**
 * Find out if delayed jobs are supported for configuration validation.
 *
 * @return bool True only if every partition queue reports support
 */
protected function supportsDelayedJobs() {
	foreach ( $this->partitionQueues as $partitionQueue ) {
		if ( $partitionQueue->supportsDelayedJobs() ) {
			continue;
		}

		// One partition without support is enough to rule it out
		return false;
	}

	return true;
}
128
/**
 * Check whether all partition queues are empty.
 *
 * Partitions are only queried while no non-empty partition has been seen yet.
 * A partition that raises JobQueueError is logged and counted as failed.
 *
 * @return bool
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function doIsEmpty() {
	$unreachable = 0;
	$allEmpty = true;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			if ( $allEmpty && !$partitionQueue->doIsEmpty() ) {
				$allEmpty = false;
			}
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return $allEmpty;
}
144
/**
 * Sum of doGetSize() over the partition queues.
 *
 * @return int
 */
protected function doGetSize() {
	$total = $this->getCrossPartitionSum( 'size', 'doGetSize' );

	return $total;
}
148
/**
 * Sum of doGetAcquiredCount() over the partition queues.
 *
 * @return int
 */
protected function doGetAcquiredCount() {
	$total = $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );

	return $total;
}
152
/**
 * Sum of doGetDelayedCount() over the partition queues.
 *
 * @return int
 */
protected function doGetDelayedCount() {
	$total = $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );

	return $total;
}
156
/**
 * Sum of doGetAbandonedCount() over the partition queues.
 *
 * @return int
 */
protected function doGetAbandonedCount() {
	$total = $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );

	return $total;
}
160
/**
 * Call one counting method on every partition queue and add up the results.
 *
 * A partition that raises JobQueueError is logged, counted as failed, and
 * contributes nothing to the sum.
 *
 * @param string $type Label for the kind of count; not used in this method body
 * @param string $method Name of the method to call on each partition queue
 * @return int Sum over the partitions that answered (assuming each call returns an integer)
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function getCrossPartitionSum( $type, $method ) {
	$unreachable = 0;
	$total = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$total += $partitionQueue->{$method}();
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return $total;
}
181
/**
 * Insert a batch of jobs, spreading them over the live partitions.
 *
 * Makes at most $this->maxPartitionsTry insertion passes. Each pass hands the jobs
 * that are still pending to tryJobInsertions(), which returns the ones it failed to insert.
 *
 * @param array $jobs Jobs to insert (presumably IJobSpecification objects; confirm against callers)
 * @param int $flags Bitfield forwarded to tryJobInsertions(), which checks self::QOS_ATOMIC
 * @throws JobQueueError If jobs remain uninserted after the passes are used up
 */
protected function doBatchPush( array $jobs, $flags ) {
	// Local ring variable that may be changed to point to a new ring on failure.
	// It must be initialised here: it is passed by reference to tryJobInsertions().
	$partitionRing = $this->partitionRing;
	// Try to insert the jobs; partitions that fail are ejected from the live ring.
	// Retry to insert any remaining jobs again, ignoring the bad partitions.
	$jobsLeft = $jobs;
	for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
		try {
			// Probe the live ring before each pass. The call is expected to raise
			// UnexpectedValueException once no live partition remains (handled below).
			$partitionRing->getLiveLocationWeights();
		} catch ( UnexpectedValueException ) {
			break; // all servers down; nothing to insert to
		}
		$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
	}
	if ( count( $jobsLeft ) ) {
		throw new JobQueueError(
			"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
	}
}
201
/**
 * Make one insertion pass over the given jobs and return those that were not inserted.
 *
 * Jobs that ask for de-duplication are grouped by the partition that the ring assigns
 * to the hash of their de-duplication info; the other jobs go to randomly picked live
 * partitions. A partition whose push raises JobQueueError is ejected from the live ring
 * and its batch is added to the returned list.
 *
 * @param array $jobs Jobs to insert
 * @param HashRing &$partitionRing Ring used to place the jobs
 * @param int $flags Bitfield; self::QOS_ATOMIC is checked here and is always set on
 *  the per-partition pushes
 * @return array Jobs that could not be inserted
 * @throws JobQueueError If a partition fails and ejecting it from the live ring returns false
 */
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
	$notInserted = [];

	// Because jobs are spread across partitions, per-job de-duplication needs
	// to use a consistent hash to avoid allowing duplicate jobs per partition.
	// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
	$dedupBatches = []; // (partition name => job list)
	$plainJobs = []; // jobs without de-duplication, kept under their original keys
	foreach ( $jobs as $key => $job ) {
		if ( !$job->ignoreDuplicates() ) {
			$plainJobs[$key] = $job;
			continue;
		}
		$hash = sha1( serialize( $job->getDeduplicationInfo() ) );
		$dedupBatches[$partitionRing->getLiveLocation( $hash )][] = $job;
	}

	// Get the batches of jobs that are not de-duplicated.
	// Atomic: a single all-or-nothing batch. Otherwise: split the jobs into batches
	// and spread them out over servers if there are many jobs. This helps keep the
	// partitions even. A small job list still ends up as one batch on a single
	// partition queue, which avoids the extra connections.
	$plainBatches = ( $flags & self::QOS_ATOMIC )
		? [ $plainJobs ]
		: array_chunk( $plainJobs, 300 );

	// Insert the de-duplicated jobs into the queues...
	foreach ( $dedupBatches as $partition => $batch ) {
		$queue = $this->partitionQueues[$partition];
		try {
			$queue->doBatchPush( $batch, $flags | self::QOS_ATOMIC );
			continue; // inserted
		} catch ( JobQueueError $error ) {
			$this->logException( $error );
		}
		// Only reached when the push failed
		if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			throw new JobQueueError( "Could not insert job(s), no partitions available." );
		}
		$notInserted = array_merge( $notInserted, $batch );
	}

	// Insert the jobs that are not de-duplicated into the queues...
	foreach ( $plainBatches as $batch ) {
		$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
		$queue = $this->partitionQueues[$partition];
		try {
			$queue->doBatchPush( $batch, $flags | self::QOS_ATOMIC );
			continue; // inserted
		} catch ( JobQueueError $error ) {
			$this->logException( $error );
		}
		// Only reached when the push failed
		if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			throw new JobQueueError( "Could not insert job(s), no partitions available." );
		}
		$notInserted = array_merge( $notInserted, $batch );
	}

	return $notInserted;
}
274
/**
 * Pop a job from a randomly picked live partition.
 *
 * Partitions that fail or yield no job are dropped from the candidate list and
 * another one is picked, until a job is found or no candidate is left.
 *
 * @return mixed The popped job, tagged with 'QueuePartition' metadata naming its
 *  partition, or false if no partition produced one
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function doPop() {
	$candidates = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
	$unreachable = 0;

	while ( count( $candidates ) ) {
		$partition = ArrayUtils::pickRandom( $candidates );
		if ( $partition === false ) {
			break; // all partitions at 0 weight
		}

		$queue = $this->partitionQueues[$partition];
		$job = false;
		try {
			$job = $queue->pop();
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}

		if ( !$job ) {
			// Nothing from this partition; do not pick it again
			unset( $candidates[$partition] );
			continue;
		}

		// Remember where the job came from so that doAck() can route it back
		$job->setMetadata( 'QueuePartition', $partition );

		return $job;
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return false;
}
306
/**
 * Acknowledge a job on the partition named in its 'QueuePartition' metadata.
 *
 * @param RunnableJob $job
 * @throws UnexpectedValueException If the job carries no 'QueuePartition' metadata
 */
protected function doAck( RunnableJob $job ) {
	$partition = $job->getMetadata( 'QueuePartition' )
		?? throw new UnexpectedValueException( "The given job has no defined partition name." );

	$this->partitionQueues[$partition]->ack( $job );
}
315
/**
 * Ask the partition that the ring assigns to the job's root job signature whether
 * the job is an old duplicate.
 *
 * If that partition raises JobQueueError, it is ejected from the live ring and the
 * check is repeated once on the partition the ring assigns afterwards. An error
 * from that second attempt is not caught here.
 *
 * @param IJobSpecification $job
 * @return mixed The partition queue's answer; false if the failing partition could
 *  not be ejected from the live ring
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	$signature = $job->getRootJobParams()['rootJobSignature'];
	$partition = $this->partitionRing->getLiveLocation( $signature );
	try {
		return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
	} catch ( JobQueueError $e ) {
		// Log the failure, as every other partition loop in this class does
		$this->logException( $e );
		if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			$partition = $this->partitionRing->getLiveLocation( $signature );
			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
		}
	}

	return false;
}
330
/**
 * Register root job de-duplication on the partition that the ring assigns to the
 * job's root job signature.
 *
 * If that partition raises JobQueueError, it is ejected from the live ring and the
 * call is repeated once on the partition the ring assigns afterwards. An error
 * from that second attempt is not caught here.
 *
 * @param IJobSpecification $job
 * @return mixed The partition queue's result; false if the failing partition could
 *  not be ejected from the live ring
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$signature = $job->getRootJobParams()['rootJobSignature'];
	$partition = $this->partitionRing->getLiveLocation( $signature );
	try {
		return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
	} catch ( JobQueueError $e ) {
		// Log the failure, as every other partition loop in this class does
		$this->logException( $e );
		if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			$partition = $this->partitionRing->getLiveLocation( $signature );
			return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
		}
	}

	return false;
}
345
/**
 * Call doDelete() on every partition queue.
 *
 * A partition that raises JobQueueError is logged and counted as failed.
 *
 * @return bool Always true when no exception is thrown
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function doDelete() {
	$unreachable = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->doDelete();
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return true;
}
360
/**
 * Call waitForBackups() on every partition queue.
 *
 * A partition that raises JobQueueError is logged and counted as failed.
 *
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function doWaitForBackups() {
	$unreachable = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->waitForBackups();
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );
}
374
/**
 * Call doFlushCaches() on every partition queue. Errors are not caught here.
 */
protected function doFlushCaches() {
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionQueue->doFlushCaches();
	}
}
381
/**
 * Get an iterator to traverse over all available jobs in this queue.
 *
 * @return AppendIterator Chains getAllQueuedJobs() of each partition queue, in partition order
 */
public function getAllQueuedJobs() {
	$combined = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionJobs = $partitionQueue->getAllQueuedJobs();
		$combined->append( $partitionJobs );
	}

	return $combined;
}
392
/**
 * Get an iterator to traverse over all delayed jobs in this queue.
 *
 * @return AppendIterator Chains getAllDelayedJobs() of each partition queue, in partition order
 */
public function getAllDelayedJobs() {
	$combined = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionJobs = $partitionQueue->getAllDelayedJobs();
		$combined->append( $partitionJobs );
	}

	return $combined;
}
403
/**
 * Get an iterator to traverse over all claimed jobs in this queue.
 *
 * @return AppendIterator Chains getAllAcquiredJobs() of each partition queue, in partition order
 */
public function getAllAcquiredJobs() {
	$combined = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionJobs = $partitionQueue->getAllAcquiredJobs();
		$combined->append( $partitionJobs );
	}

	return $combined;
}
414
/**
 * Get an iterator to traverse over all abandoned jobs in this queue.
 *
 * @return AppendIterator Chains getAllAbandonedJobs() of each partition queue, in partition order
 */
public function getAllAbandonedJobs() {
	$combined = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionJobs = $partitionQueue->getAllAbandonedJobs();
		$combined->append( $partitionJobs );
	}

	return $combined;
}
425
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string A fixed prefix with the domain, followed by the SHA-1 of the
 *  serialized list of partition names
 */
public function getCoalesceLocationInternal() {
	$partitionNames = array_keys( $this->partitionQueues );
	$fingerprint = sha1( serialize( $partitionNames ) );

	return "JobQueueFederated:wiki:{$this->domain}" . $fingerprint;
}
430
/**
 * Collect, across partitions, the job types reported by doGetSiblingQueuesWithJobs().
 *
 * Stops asking further partitions once as many distinct types were found as were
 * requested. A partition that raises JobQueueError is logged and counted as failed.
 *
 * @param array $types Job types to check
 * @return array|null List of distinct types reported by the partitions; null as soon
 *  as one partition returns a non-array (not supported there)
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$typesFound = [];
	$unreachable = 0;
	$wanted = count( $types );

	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$nonEmpty = $partitionQueue->doGetSiblingQueuesWithJobs( $types );
			if ( !is_array( $nonEmpty ) ) {
				return null; // not supported on all partitions; bail
			}
			$typesFound = array_unique( array_merge( $typesFound, $nonEmpty ) );
			if ( count( $typesFound ) == $wanted ) {
				break; // short-circuit
			}
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return array_values( $typesFound );
}
456
/**
 * Add up, per job type, the sizes reported by doGetSiblingQueueSizes() on each partition.
 *
 * A partition that raises JobQueueError is logged and counted as failed.
 *
 * @param array $types Job types to check
 * @return array|null Map (type => summed size); null as soon as one partition
 *  returns a non-array (not supported there)
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when every partition failed
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$sizeByType = [];
	$unreachable = 0;

	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionSizes = $partitionQueue->doGetSiblingQueueSizes( $types );
			if ( !is_array( $partitionSizes ) ) {
				return null; // not supported on all partitions; bail
			}
			foreach ( $partitionSizes as $type => $size ) {
				$sizeByType[$type] ??= 0;
				$sizeByType[$type] += $size;
			}
		} catch ( JobQueueError $error ) {
			$unreachable++;
			$this->logException( $error );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return $sizeByType;
}
480
/**
 * Write an exception's message and stack trace to the 'JobQueue' debug log group.
 *
 * @param Exception $e
 */
protected function logException( Exception $e ) {
	$text = $e->getMessage() . "\n" . $e->getTraceAsString();
	wfDebugLog( 'JobQueue', $text );
}
484
/**
 * Throw an error if no partitions available.
 *
 * @param int $down Number of partitions that failed
 * @throws JobQueueError If $down is at least the number of partition queues
 */
protected function throwErrorIfAllPartitionsDown( $down ) {
	$allDown = count( $this->partitionQueues ) <= $down;
	if ( $allDown ) {
		throw new JobQueueError( 'No queue partitions available.' );
	}
}
497}
498
// Register 'JobQueueFederated' as an alias of the namespaced class
class_alias( JobQueueFederated::class, 'JobQueueFederated' );
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
JobQueueFederated: Enqueue and run background jobs via a federated queue, for wiki farms.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doIsRootJobOldDuplicate(IJobSpecification $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
JobQueue: Base class for queueing and running background jobs from a storage backend.
Definition JobQueue.php:50
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:157
string $domain
DB domain ID.
Definition JobQueue.php:52
string $type
Job type.
Definition JobQueue.php:54
Job: Describe and execute a background job.
Definition Job.php:41
A collection of static methods to play with arrays.
HashRing: Convenience class for weighted consistent hash rings.
Definition HashRing.php:50
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Definition HashRing.php:231
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
Definition HashRing.php:273
getLiveLocation( $item)
Get the location of an item on the "live" ring.
Definition HashRing.php:251
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job