MediaWiki master
JobQueueFederated.php
Go to the documentation of this file.
1<?php
7namespace MediaWiki\JobQueue;
8
9use AppendIterator;
10use Exception;
11use InvalidArgumentException;
13use UnexpectedValueException;
16
/** @var HashRing Ring over all partitions, weighted by their configured weight */
protected $partitionRing;
/** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = [];
47
50
/**
 * Build a federated queue that spreads jobs over weighted partition queues.
 *
 * The section for this queue's domain is looked up in 'sectionsByWiki' (falling back
 * to 'default'). The section's partition weights come from 'partitionsBySection', and
 * each partition's own queue settings come from 'configByPartition'. Each partition
 * queue is created through JobQueue::factory() with the remaining generic parameters
 * merged over its partition-specific configuration. Generic parameters win on key
 * collisions.
 *
 * @param array $params Queue parameters, including:
 *   - sectionsByWiki: (domain => section name)
 *   - partitionsBySection: (section name => (partition name => weight))
 *   - configByPartition: (partition name => JobQueue::factory() parameters)
 *   - maxPartitionsTry: how many partitions to try when pushing (default 2)
 * @throws InvalidArgumentException If the section or one of its partitions has no configuration
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$section = $params['sectionsByWiki'][$this->domain] ?? 'default';
	if ( !isset( $params['partitionsBySection'][$section] ) ) {
		throw new InvalidArgumentException( "No configuration for section '$section'." );
	}
	$this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2;

	// (partition name => weight), heaviest partition first
	$weightByPartition = $params['partitionsBySection'][$section];
	arsort( $weightByPartition, SORT_NUMERIC );

	// Settings that only the federated wrapper understands are not handed down
	$federationOnlyKeys = [
		'class',
		'sectionsByWiki',
		'maxPartitionsTry',
		'partitionsBySection',
		'configByPartition',
	];
	$sharedConfig = array_diff_key( $params, array_flip( $federationOnlyKeys ) );

	foreach ( array_keys( $weightByPartition ) as $partition ) {
		$partitionConfig = $params['configByPartition'][$partition] ?? null;
		if ( $partitionConfig === null ) {
			throw new InvalidArgumentException( "No configuration for partition '$partition'." );
		}
		// Array union: the shared settings take precedence over partition-specific ones
		$this->partitionQueues[$partition] = JobQueue::factory( $sharedConfig + $partitionConfig );
	}

	// Weighted ring used to place (and locate) jobs across the partitions
	$this->partitionRing = new HashRing( $weightByPartition );
}
95
/**
 * Queue orders that may be configured for a federated queue.
 *
 * FIFO cannot be offered because jobs are spread over several partitions; only a
 * rough timestamp order can be honoured.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'undefined', 'random', 'timestamp' ];

	return $orders;
}
101
/**
 * Default order when none is configured: leave ordering to the partition queues.
 *
 * @return string
 */
protected function optimalOrder() {
	$deferToPartitions = 'undefined';

	return $deferToPartitions;
}
106
/**
 * Delayed jobs are supported only if every partition queue supports them.
 *
 * @return bool
 */
protected function supportsDelayedJobs() {
	$allPartitionsSupport = true;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		if ( !$partitionQueue->supportsDelayedJobs() ) {
			$allPartitionsSupport = false;
			break; // one unsupported partition settles it
		}
	}

	return $allPartitionsSupport;
}
117
/**
 * Whether every partition queue is empty.
 *
 * Partitions that throw JobQueueError are logged and treated as empty. Querying
 * stops at the first non-empty partition.
 *
 * @return bool
 * @throws JobQueueError If no partition could be queried
 */
protected function doIsEmpty() {
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			if ( !$partitionQueue->doIsEmpty() ) {
				// A single non-empty partition is enough to answer
				return false;
			}
		} catch ( JobQueueError $e ) {
			++$failures;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return true;
}
134
/**
 * Number of queued jobs, summed over all partitions.
 *
 * @return int
 */
protected function doGetSize() {
	$total = $this->getCrossPartitionSum( 'size', 'doGetSize' );

	return $total;
}
139
/**
 * Number of acquired (claimed) jobs, summed over all partitions.
 *
 * @return int
 */
protected function doGetAcquiredCount() {
	$total = $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );

	return $total;
}
144
/**
 * Number of delayed jobs, summed over all partitions.
 *
 * @return int
 */
protected function doGetDelayedCount() {
	$total = $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );

	return $total;
}
149
/**
 * Number of abandoned jobs, summed over all partitions.
 *
 * @return int
 */
protected function doGetAbandonedCount() {
	$total = $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );

	return $total;
}
154
/**
 * Sum a per-partition count over every partition queue.
 *
 * Partitions that throw JobQueueError are logged and contribute nothing to the sum.
 *
 * @param string $type Name of the count being summed (not used in the computation)
 * @param string $method Name of the method to call on each partition queue
 * @return int
 * @throws JobQueueError If every partition failed
 */
protected function getCrossPartitionSum( $type, $method ) {
	$failures = 0;
	$sum = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$sum += $partitionQueue->$method();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return $sum;
}
175
/**
 * Push jobs into the federated queue, retrying leftover jobs in further rounds.
 *
 * Up to maxPartitionsTry rounds of tryJobInsertions() are made. Each round only
 * retries the jobs the previous round returned as not inserted. The loop stops early
 * once no jobs remain, or when the statement in the try block throws
 * UnexpectedValueException (commented below as "all servers down").
 *
 * NOTE(review): this listing appears to be missing source lines 179 and 185: the
 * initialisation of the local $partitionRing (presumably from $this->partitionRing,
 * per the "Local ring variable" comment) and the statement inside the try block.
 * Confirm against the real file before relying on this text.
 * NOTE(review): the comment below mentions $partitionsTry, which this block does
 * not use; it looks stale.
 *
 * @param array $jobs
 * @param int $flags Bitfield of JobQueue::QOS_* constants
 * @throws JobQueueError If some jobs are still uninserted after all rounds
 */
177 protected function doBatchPush( array $jobs, $flags ) {
178 // Local ring variable that may be changed to point to a new ring on failure
180 // Try to insert the jobs and update $partitionsTry on any failures.
181 // Retry to insert any remaining jobs again, ignoring the bad partitions.
182 $jobsLeft = $jobs;
// Post-decrement: runs at most maxPartitionsTry rounds while jobs remain
183 for ( $i = $this->maxPartitionsTry; $i-- && $jobsLeft; ) {
184 try {
186 } catch ( UnexpectedValueException ) {
187 break; // all servers down; nothing to insert to
188 }
// Jobs that failed in this round are retried in the next one
189 $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
190 }
191 if ( $jobsLeft ) {
192 throw new JobQueueError(
193 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
194 }
195 }
196
/**
 * Push jobs into the partition queues, spreading them over live partitions.
 *
 * Jobs that ignore duplicates are routed by a consistent hash of their de-duplication
 * info, so duplicates always land in the same partition. The other jobs are pushed
 * either as one batch (QOS_ATOMIC) or in chunks of up to 300 jobs. Each chunk goes to
 * a partition picked at random by live weight. A partition whose push fails is ejected
 * from the live ring, so later batches avoid it.
 *
 * @param array $jobs List of jobs to insert
 * @param HashRing &$partitionRing Ring used to choose partitions; failing partitions are ejected from it
 * @param int $flags Bitfield of JobQueue::QOS_* constants
 * @return array Jobs that could not be inserted
 * @throws JobQueueError If a push failed and no partition is left to use
 */
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
	$jobsLeft = [];

	// Because jobs are spread across partitions, per-job de-duplication needs
	// to use a consistent hash to avoid allowing duplicate jobs per partition.
	// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
	$uJobsByPartition = []; // (partition name => job list)
	foreach ( $jobs as $key => $job ) {
		if ( $job->ignoreDuplicates() ) {
			$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
			$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
			unset( $jobs[$key] );
		}
	}

	// Get the batches of jobs that are not de-duplicated
	if ( !$jobs ) {
		// Every job was de-duplicated; don't push an empty batch to some partition
		$nuJobBatches = [];
	} elseif ( $flags & self::QOS_ATOMIC ) {
		$nuJobBatches = [ $jobs ]; // all or nothing
	} else {
		// Split the jobs into batches and spread them out over servers if there
		// are many jobs. This helps keep the partitions even. Otherwise, send all
		// the jobs to a single partition queue to avoid the extra connections.
		$nuJobBatches = array_chunk( $jobs, 300 );
	}

	// Insert the de-duplicated jobs into the queues...
	foreach ( $uJobsByPartition as $partition => $jobBatch ) {
		if ( !$this->pushBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
			$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
		}
	}

	// Insert the jobs that are not de-duplicated into the queues...
	foreach ( $nuJobBatches as $jobBatch ) {
		$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
		if ( $partition === false ) {
			// No live partition with a non-zero weight is left to pick from
			throw new JobQueueError( "Could not insert job(s), no partitions available." );
		}
		if ( !$this->pushBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
			$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
		}
	}

	return $jobsLeft;
}

/**
 * Push one batch of jobs to a single partition queue, atomically within that partition.
 *
 * On failure the error is logged and the partition is ejected from the live ring
 * (TTL 5), so later placement decisions avoid it.
 *
 * @param string|int $partition Partition name (key of $this->partitionQueues)
 * @param array $jobBatch Jobs to push
 * @param HashRing $partitionRing Ring to eject the partition from on failure
 * @param int $flags Bitfield of JobQueue::QOS_* constants
 * @return bool Whether the batch was inserted
 * @throws JobQueueError If the push failed and the ring refused the ejection
 *  (treated as no partitions being available)
 */
private function pushBatchToPartition( $partition, array $jobBatch, HashRing $partitionRing, $flags ) {
	try {
		$this->partitionQueues[$partition]->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );

		return true;
	} catch ( JobQueueError $e ) {
		$this->logException( $e );
	}

	if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
		throw new JobQueueError( "Could not insert job(s), no partitions available." );
	}

	return false;
}
269
/**
 * Pop a job from a randomly chosen live partition, weighted by partition weight.
 *
 * Partitions that yield no job, or that throw JobQueueError, are dropped from
 * consideration. This continues until a job is found or no candidates remain. The
 * returned job is tagged with 'QueuePartition' metadata naming its source partition.
 *
 * @return RunnableJob|false
 * @throws JobQueueError If every partition failed
 */
protected function doPop() {
	// (partition => weight) of the partitions still worth asking
	$candidates = $this->partitionRing->getLiveLocationWeights();
	$failures = 0;

	while ( $candidates ) {
		$partition = ArrayUtils::pickRandom( $candidates );
		if ( $partition === false ) {
			break; // all partitions at 0 weight
		}

		try {
			$job = $this->partitionQueues[$partition]->pop();
		} catch ( JobQueueError $e ) {
			++$failures;
			$this->logException( $e );
			$job = false;
		}

		if ( !$job ) {
			unset( $candidates[$partition] );
			continue;
		}

		$job->setMetadata( 'QueuePartition', $partition );

		return $job;
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return false;
}
302
/**
 * Acknowledge a job on the partition queue recorded in its metadata.
 *
 * @param RunnableJob $job Job whose 'QueuePartition' metadata names its partition
 * @throws UnexpectedValueException If the job has no partition metadata, or names a
 *  partition that this queue does not have
 */
protected function doAck( RunnableJob $job ) {
	$partition = $job->getMetadata( 'QueuePartition' );
	if ( $partition === null ) {
		throw new UnexpectedValueException( "The given job has no defined partition name." );
	}
	if ( !isset( $this->partitionQueues[$partition] ) ) {
		// Presumably stale metadata (e.g. partition config changed); fail clearly
		// instead of calling ack() on null
		throw new UnexpectedValueException( "The given job has an unknown partition name '$partition'." );
	}

	$this->partitionQueues[$partition]->ack( $job );
}
312
315 $signature = $job->getRootJobParams()['rootJobSignature'];
316 $partition = $this->partitionRing->getLiveLocation( $signature );
317 try {
318 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
319 } catch ( JobQueueError ) {
320 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
321 $partition = $this->partitionRing->getLiveLocation( $signature );
322 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
323 }
324 }
325
326 return false;
327 }
328
331 $signature = $job->getRootJobParams()['rootJobSignature'];
332 $partition = $this->partitionRing->getLiveLocation( $signature );
333 try {
334 return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
335 } catch ( JobQueueError ) {
336 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
337 $partition = $this->partitionRing->getLiveLocation( $signature );
338 return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
339 }
340 }
341
342 return false;
343 }
344
/**
 * Delete the jobs of every partition queue.
 *
 * Failures on individual partitions are logged; an error is raised only if every
 * partition failed.
 *
 * @return bool True when it returns
 * @throws JobQueueError If every partition failed
 */
protected function doDelete() {
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->doDelete();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return true;
}
360
/**
 * Call waitForBackups() on every partition queue.
 *
 * Failures on individual partitions are logged; an error is raised only if every
 * partition failed.
 *
 * @throws JobQueueError If every partition failed
 */
protected function doWaitForBackups() {
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->waitForBackups();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );
}
375
/**
 * Flush the caches of every partition queue (errors are not caught here).
 */
protected function doFlushCaches() {
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionQueue->doFlushCaches();
	}
}
383
/**
 * Iterate over the queued jobs of all partitions, one partition after another.
 *
 * @return AppendIterator
 */
public function getAllQueuedJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllQueuedJobs() );
	}

	return $allJobs;
}
395
/**
 * Iterate over the delayed jobs of all partitions, one partition after another.
 *
 * @return AppendIterator
 */
public function getAllDelayedJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllDelayedJobs() );
	}

	return $allJobs;
}
407
/**
 * Iterate over the acquired jobs of all partitions, one partition after another.
 *
 * @return AppendIterator
 */
public function getAllAcquiredJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllAcquiredJobs() );
	}

	return $allJobs;
}
419
/**
 * Iterate over the abandoned jobs of all partitions, one partition after another.
 *
 * @return AppendIterator
 */
public function getAllAbandonedJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllAbandonedJobs() );
	}

	return $allJobs;
}
431
/**
 * Coalescing key for this queue: the DB domain followed by a SHA-1 of the partition names.
 *
 * @return string
 */
public function getCoalesceLocationInternal() {
	$partitionNamesHash = sha1( serialize( array_keys( $this->partitionQueues ) ) );

	return "JobQueueFederated:wiki:{$this->domain}" . $partitionNamesHash;
}
437
/**
 * Which of the given sibling queue types have jobs in any partition.
 *
 * @param array $types List of queue types
 * @return array|null List of queue types with jobs, or null if any partition lacks support
 * @throws JobQueueError If every partition failed
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$typesWithJobs = [];
	$failures = 0;

	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionTypes = $partitionQueue->doGetSiblingQueuesWithJobs( $types );
			if ( !is_array( $partitionTypes ) ) {
				return null; // not supported on all partitions; bail
			}
			$typesWithJobs = array_unique( array_merge( $typesWithJobs, $partitionTypes ) );
			if ( count( $typesWithJobs ) == count( $types ) ) {
				break; // every requested type is already known to have jobs
			}
		} catch ( JobQueueError $e ) {
			++$failures;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return array_values( $typesWithJobs );
}
464
/**
 * Sizes of the given sibling queue types, summed over all partitions.
 *
 * @param array $types List of queue types
 * @return array|null Map of (queue type => size), or null if any partition lacks support
 * @throws JobQueueError If every partition failed
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$sizeByType = [];
	$failures = 0;

	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionSizes = $partitionQueue->doGetSiblingQueueSizes( $types );
			if ( !is_array( $partitionSizes ) ) {
				return null; // not supported on all partitions; bail
			}
			foreach ( $partitionSizes as $type => $size ) {
				$sizeByType[$type] ??= 0;
				$sizeByType[$type] += $size;
			}
		} catch ( JobQueueError $e ) {
			++$failures;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return $sizeByType;
}
489
/**
 * Write an exception's message and stack trace to the 'JobQueue' debug log group.
 *
 * @param Exception $e
 */
protected function logException( Exception $e ) {
	$details = "{$e->getMessage()}\n{$e->getTraceAsString()}";
	wfDebugLog( 'JobQueue', $details );
}
493
/**
 * Throw an error if no partitions are available.
 *
 * @param int $down Number of partitions that failed
 * @throws JobQueueError If $down is at least the number of partition queues
 *  (always the case when there are no partition queues)
 */
protected function throwErrorIfAllPartitionsDown( $down ) {
	$totalPartitions = count( $this->partitionQueues );
	if ( $down < $totalPartitions ) {
		return; // at least one partition answered
	}

	throw new JobQueueError( 'No queue partitions available.' );
}
506}
507
// Register the un-namespaced name 'JobQueueFederated' as an alias of
// MediaWiki\JobQueue\JobQueueFederated (presumably for older callers; verify).
509class_alias( JobQueueFederated::class, 'JobQueueFederated' );
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Enqueue and run background jobs via a federated queue, for wiki farms.
doGetAcquiredCount()
JobQueue::getAcquiredCount() int
doGetAbandonedCount()
to override JobQueue::getAbandonedCount() int
optimalOrder()
Get the default queue order to use if configuration does not specify one. Returns: string, one of (random,...
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.to override string|null 1....
doAck(RunnableJob $job)
JobQueue::ack()
doIsRootJobOldDuplicate(IJobSpecification $job)
to override JobQueue::isRootJobOldDuplicate() bool
supportedOrders()
Get the allowed queue orders for configuration validation. Returns: array, a subset of (random,...
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
doBatchPush(array $jobs, $flags)
JobQueue::batchPush()
doDeduplicateRootJob(IJobSpecification $job)
to override JobQueue::deduplicateRootJob() bool
int $maxPartitionsTry
Maximum number of partitions to try.
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation. To override; returns bool: whether delayed ...
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue. Callers should be quick to iterate over it...
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue. To override; returns \Iterator<RunnableJob> ...
doDelete()
to override JobQueue::delete()
doPop()
JobQueue::pop() RunnableJob|false
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue. This does not include jobs that are...
doFlushCaches()
to override JobQueue::flushCaches() void
doGetSiblingQueueSizes(array $types)
to override JobQueue::getSiblingQueueSizes() array|null (queue type => size) or null if unsupported
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue. Note: results may be stale if the que...
doGetDelayedCount()
to override JobQueue::getDelayedCount() int
doWaitForBackups()
to override JobQueue::waitForBackups() void
doGetSiblingQueuesWithJobs(array $types)
to override JobQueue::getSiblingQueuesWithJobs() array|null (list of queue types) or null if unsuppor...
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
Base class for queueing and running background jobs from a storage backend.
Definition JobQueue.php:36
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:142
string $domain
DB domain ID.
Definition JobQueue.php:38
string $type
Job type.
Definition JobQueue.php:40
Describe and execute a background job.
Definition Job.php:28
A collection of static methods to play with arrays.
Convenience class for weighted consistent hash rings.
Definition HashRing.php:36
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Definition HashRing.php:217
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
Definition HashRing.php:259
getLiveLocation( $item)
Get the location of an item on the "live" ring.
Definition HashRing.php:237
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
if(count( $args)< 1) $job