MediaWiki master
JobQueueFederated.php
Go to the documentation of this file.
1<?php
/** @var HashRing Weighted consistent hash ring over the partition names */
protected $partitionRing = null;
/** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = [];

/**
 * Build one child queue per configured partition plus a hash ring over them.
 *
 * Besides the base JobQueue keys, $params is read for:
 *  - sectionsByWiki      : (domain => section name); unmapped domains use 'default'
 *  - partitionsBySection : (section name => (partition name => weight))
 *  - configByPartition   : (partition name => JobQueue config for that partition)
 *  - maxPartitionsTry    : number of insertion rounds in doBatchPush() (default 2)
 *
 * @param array $params
 * @throws InvalidArgumentException If the section, or any of its partitions, has no config
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$section = $params['sectionsByWiki'][$this->domain] ?? 'default';
	if ( !isset( $params['partitionsBySection'][$section] ) ) {
		throw new InvalidArgumentException( "No configuration for section '$section'." );
	}
	$this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2;

	// (partition name => weight), heaviest partitions first
	$weightByPartition = $params['partitionsBySection'][$section];
	arsort( $weightByPartition, SORT_NUMERIC );

	// Strip the federation-only keys; whatever remains is shared by every partition queue
	$federationKeys = array_fill_keys( [
		'class', 'sectionsByWiki', 'maxPartitionsTry',
		'partitionsBySection', 'configByPartition',
	], true );
	$sharedConfig = array_diff_key( $params, $federationKeys );

	foreach ( array_keys( $weightByPartition ) as $partition ) {
		if ( !isset( $params['configByPartition'][$partition] ) ) {
			throw new InvalidArgumentException( "No configuration for partition '$partition'." );
		}
		// Array union: shared keys take precedence over the per-partition ones
		$this->partitionQueues[$partition] = JobQueue::factory(
			$sharedConfig + $params['configByPartition'][$partition]
		);
	}

	$this->partitionRing = new HashRing( $weightByPartition );
}
101
/**
 * Orders accepted by this queue. FIFO is not offered because jobs are spread over
 * partitions; a rough timestamp order is still possible.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'undefined', 'random', 'timestamp' ];

	return $orders;
}
106
/**
 * Leave the ordering decision to the partition queues.
 *
 * @return string
 */
protected function optimalOrder() {
	$order = 'undefined';

	return $order;
}
110
/**
 * Delayed jobs are supported only when every partition queue supports them.
 *
 * @return bool
 */
protected function supportsDelayedJobs() {
	$allSupport = true;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		if ( !$partitionQueue->supportsDelayedJobs() ) {
			$allSupport = false;
			break;
		}
	}

	return $allSupport;
}
120
/**
 * Check whether every reachable partition queue is empty.
 *
 * Stops at the first partition reporting jobs. A partition that throws JobQueueError is
 * logged and does not make the result false.
 *
 * @return bool
 * @throws JobQueueError If every partition failed
 */
protected function doIsEmpty() {
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			if ( !$partitionQueue->doIsEmpty() ) {
				return false;
			}
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return true;
}
136
/**
 * @return int Sum of doGetSize() over the reachable partition queues
 */
protected function doGetSize() {
	$total = $this->getCrossPartitionSum( 'size', 'doGetSize' );

	return $total;
}
140
/**
 * @return int Sum of doGetAcquiredCount() over the reachable partition queues
 */
protected function doGetAcquiredCount() {
	$total = $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );

	return $total;
}
144
/**
 * @return int Sum of doGetDelayedCount() over the reachable partition queues
 */
protected function doGetDelayedCount() {
	$total = $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );

	return $total;
}
148
/**
 * @return int Sum of doGetAbandonedCount() over the reachable partition queues
 */
protected function doGetAbandonedCount() {
	$total = $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );

	return $total;
}
152
/**
 * Add up a per-partition count across all partition queues.
 *
 * Partitions that throw JobQueueError are logged and contribute nothing.
 *
 * @param string $type Count name; not read by this implementation
 * @param string $method Name of the partition-queue method that returns the count
 * @return int
 * @throws JobQueueError If every partition failed
 */
protected function getCrossPartitionSum( $type, $method ) {
	$failures = 0;
	$total = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$total = $total + $partitionQueue->$method();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return $total;
}
173
/**
 * Insert jobs across the partitions, retrying leftovers in further rounds.
 *
 * Each round passes the remaining jobs to tryJobInsertions(). That method ejects any
 * failing partition from the live ring, so later rounds avoid it. At most
 * $this->maxPartitionsTry rounds are made.
 *
 * @param array $jobs Jobs to insert
 * @param int $flags Bitfield of JobQueue::QOS_* flags
 * @throws JobQueueError If some jobs could still not be inserted
 */
protected function doBatchPush( array $jobs, $flags ) {
	// Local ring variable that may be changed to point to a new ring on failure
	$partitionRing = $this->partitionRing;
	// Try to insert the jobs, ejecting failed partitions from the live ring, then
	// retry any remaining jobs on the partitions that are still live.
	$jobsLeft = $jobs;
	for ( $i = $this->maxPartitionsTry; $i > 0 && $jobsLeft; --$i ) {
		try {
			// Probe for live partitions. Assumed to throw UnexpectedValueException once
			// every partition has been ejected (that is what the catch below handles).
			$partitionRing->getLiveLocationWeights();
		} catch ( UnexpectedValueException $e ) {
			break; // all servers down; nothing to insert to
		}
		$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
	}
	if ( $jobsLeft ) {
		throw new JobQueueError(
			"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
	}
}
193
/**
 * Make one round of job insertions over the live partitions.
 *
 * De-duplicated jobs go to the partition their de-duplication hash maps to on the live
 * ring. All other jobs go to randomly picked live partitions, either as a single batch
 * (QOS_ATOMIC) or in chunks of 300. A partition whose push fails is ejected from the
 * live ring (TTL 5), and its jobs are returned for a retry.
 *
 * @param array $jobs Jobs to insert
 * @param HashRing &$partitionRing Ring of partitions; failed partitions are ejected from it
 * @param int $flags Bitfield of JobQueue::QOS_* flags
 * @return array Jobs that were not inserted
 * @throws JobQueueError If a failed partition was the last live one
 */
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
	$jobsLeft = [];

	// Because jobs are spread across partitions, per-job de-duplication needs
	// to use a consistent hash to avoid allowing duplicate jobs per partition.
	// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
	$uJobsByPartition = []; // (partition name => job list)
	foreach ( $jobs as $key => $job ) {
		if ( $job->ignoreDuplicates() ) {
			$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
			$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
			unset( $jobs[$key] );
		}
	}
	// Get the batches of jobs that are not de-duplicated
	if ( $flags & self::QOS_ATOMIC ) {
		// All or nothing. If every job was de-duplicated there is nothing left, so do
		// not push (and possibly fail on, and eject a partition for) an empty batch.
		$nuJobBatches = $jobs ? [ $jobs ] : [];
	} else {
		// Split the jobs into batches and spread them out over servers if there
		// are many jobs. This helps keep the partitions even. Otherwise, send all
		// the jobs to a single partition queue to avoid the extra connections.
		$nuJobBatches = array_chunk( $jobs, 300 );
	}

	// Insert the de-duplicated jobs into the queues...
	foreach ( $uJobsByPartition as $partition => $jobBatch ) {
		if ( !$this->pushBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
			$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
		}
	}

	// Insert the jobs that are not de-duplicated into the queues...
	foreach ( $nuJobBatches as $jobBatch ) {
		$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
		if ( !$this->pushBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
			$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
		}
	}

	return $jobsLeft;
}

/**
 * Push one batch atomically into one partition, ejecting that partition on failure.
 *
 * @param string|int $partition Partition name
 * @param array $jobBatch Jobs to push
 * @param HashRing $partitionRing Ring to eject the partition from if the push fails
 * @param int $flags Bitfield of JobQueue::QOS_* flags (QOS_ATOMIC is always added)
 * @return bool Whether the batch was inserted
 * @throws JobQueueError If the push failed and no other partition is live
 */
private function pushBatchToPartition( $partition, array $jobBatch, HashRing $partitionRing, $flags ) {
	$queue = $this->partitionQueues[$partition];
	try {
		$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );

		return true;
	} catch ( JobQueueError $e ) {
		$this->logException( $e );
	}
	if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
		throw new JobQueueError( "Could not insert job(s), no partitions available." );
	}

	return false;
}
266
/**
 * Pop a job from one of the live partitions.
 *
 * Candidates come from the ring's live location weights, and one is chosen with
 * ArrayUtils::pickRandom(). A partition that yields no job, or that throws
 * JobQueueError, is removed from the candidates and another is tried. The returned job
 * carries its partition name under the 'QueuePartition' metadata key, which doAck() reads.
 *
 * @return RunnableJob|false
 * @throws JobQueueError If every partition failed
 */
protected function doPop() {
	// (partition => weight) of the partitions still worth trying
	$candidates = $this->partitionRing->getLiveLocationWeights();
	$failures = 0;

	while ( $candidates ) {
		$partition = ArrayUtils::pickRandom( $candidates );
		if ( $partition === false ) {
			break; // all partitions at 0 weight
		}

		try {
			$job = $this->partitionQueues[$partition]->pop();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
			$job = false;
		}

		if ( !$job ) {
			unset( $candidates[$partition] );
			continue;
		}

		$job->setMetadata( 'QueuePartition', $partition );

		return $job;
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return false;
}
298
/**
 * Acknowledge a job on the partition queue it was popped from.
 *
 * The partition is read from the job's 'QueuePartition' metadata, which doPop() sets.
 *
 * @param RunnableJob $job
 * @throws UnexpectedValueException If the job has no partition name, or names a
 *  partition this queue does not have
 */
protected function doAck( RunnableJob $job ) {
	$partition = $job->getMetadata( 'QueuePartition' );
	if ( $partition === null ) {
		throw new UnexpectedValueException( "The given job has no defined partition name." );
	}
	// Guard against a fatal "call to a member function on null" for unknown partitions
	if ( !isset( $this->partitionQueues[$partition] ) ) {
		throw new UnexpectedValueException( "The given job has an unknown partition '$partition'." );
	}

	$this->partitionQueues[$partition]->ack( $job );
}
307
/**
 * Ask the partition that the root job signature hashes to whether the job is an old duplicate.
 *
 * If that partition throws JobQueueError, the error is logged and the partition is
 * ejected from the live ring (TTL 5). The check is then retried once on the partition
 * the signature maps to afterwards. If no partition remains live, false is returned.
 *
 * @param IJobSpecification $job
 * @return bool
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	$signature = $job->getRootJobParams()['rootJobSignature'];
	$partition = $this->partitionRing->getLiveLocation( $signature );
	try {
		return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
	} catch ( JobQueueError $e ) {
		// Log like every other partition failure in this class instead of swallowing it
		$this->logException( $e );
		if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			$partition = $this->partitionRing->getLiveLocation( $signature );
			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
		}
	}

	return false;
}
322
/**
 * Record the root job on the partition that the root job signature hashes to.
 *
 * If that partition throws JobQueueError, the error is logged and the partition is
 * ejected from the live ring (TTL 5). The call is then retried once on the partition
 * the signature maps to afterwards. If no partition remains live, false is returned.
 *
 * @param IJobSpecification $job
 * @return bool
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$signature = $job->getRootJobParams()['rootJobSignature'];
	$partition = $this->partitionRing->getLiveLocation( $signature );
	try {
		return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
	} catch ( JobQueueError $e ) {
		// Log like every other partition failure in this class instead of swallowing it
		$this->logException( $e );
		if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			$partition = $this->partitionRing->getLiveLocation( $signature );
			return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
		}
	}

	return false;
}
337
/**
 * Call doDelete() on every partition queue.
 *
 * @return bool Always true; individual partition failures are only logged
 * @throws JobQueueError If every partition failed
 */
protected function doDelete() {
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->doDelete();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return true;
}
352
/**
 * Call waitForBackups() on every partition queue.
 *
 * @throws JobQueueError If every partition failed
 */
protected function doWaitForBackups() {
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->waitForBackups();
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );
}
366
/**
 * Call doFlushCaches() on every partition queue.
 */
protected function doFlushCaches() {
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionQueue->doFlushCaches();
	}
}
373
/**
 * Iterate over the queued jobs of all partitions, one partition after another.
 *
 * @return Iterator
 */
public function getAllQueuedJobs() {
	$jobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$jobs->append( $partitionQueue->getAllQueuedJobs() );
	}

	return $jobs;
}
384
/**
 * Iterate over the delayed jobs of all partitions, one partition after another.
 *
 * @return Iterator
 */
public function getAllDelayedJobs() {
	$jobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$jobs->append( $partitionQueue->getAllDelayedJobs() );
	}

	return $jobs;
}
395
/**
 * Iterate over the acquired (claimed) jobs of all partitions, one partition after another.
 *
 * @return Iterator
 */
public function getAllAcquiredJobs() {
	$jobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$jobs->append( $partitionQueue->getAllAcquiredJobs() );
	}

	return $jobs;
}
406
/**
 * Iterate over the abandoned jobs of all partitions, one partition after another.
 *
 * @return Iterator
 */
public function getAllAbandonedJobs() {
	$jobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$jobs->append( $partitionQueue->getAllAbandonedJobs() );
	}

	return $jobs;
}
417
/**
 * Build the coalesce location key from the domain and a SHA-1 of the partition names.
 *
 * NOTE(review): there is no separator between the domain and the hash. It is kept as-is
 * because changing it would change every existing key.
 *
 * @return string
 */
public function getCoalesceLocationInternal() {
	$partitionsHash = sha1( serialize( array_keys( $this->partitionQueues ) ) );

	return "JobQueueFederated:wiki:{$this->domain}" . $partitionsHash;
}
422
/**
 * Collect the distinct job types that any partition reports as having jobs.
 *
 * Stops early once every requested type has been reported. Returns null as soon as a
 * partition returns a non-array, meaning that partition does not support the lookup.
 *
 * @param string[] $types
 * @return string[]|null
 * @throws JobQueueError If every partition failed
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$typesWithJobs = [];
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionTypes = $partitionQueue->doGetSiblingQueuesWithJobs( $types );
			if ( !is_array( $partitionTypes ) ) {
				return null; // not supported on all partitions; bail
			}
			$typesWithJobs = array_unique( array_merge( $typesWithJobs, $partitionTypes ) );
			if ( count( $typesWithJobs ) == count( $types ) ) {
				break; // every requested type already found
			}
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return array_values( $typesWithJobs );
}
448
/**
 * Sum the per-type sizes reported by each partition.
 *
 * Returns null as soon as a partition returns a non-array, meaning that partition does
 * not support the lookup.
 *
 * @param string[] $types
 * @return array|null (type => summed size)
 * @throws JobQueueError If every partition failed
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$sizeByType = [];
	$failures = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionSizes = $partitionQueue->doGetSiblingQueueSizes( $types );
			if ( !is_array( $partitionSizes ) ) {
				return null; // not supported on all partitions; bail
			}
			foreach ( $partitionSizes as $type => $size ) {
				if ( !isset( $sizeByType[$type] ) ) {
					$sizeByType[$type] = 0;
				}
				$sizeByType[$type] += $size;
			}
		} catch ( JobQueueError $e ) {
			$failures++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $failures );

	return $sizeByType;
}
472
/**
 * Write an exception's message and stack trace to the 'JobQueueFederated' debug log group.
 *
 * @param Exception $e
 */
protected function logException( Exception $e ) {
	$text = $e->getMessage() . "\n" . $e->getTraceAsString();
	wfDebugLog( 'JobQueueFederated', $text );
}
476
/**
 * Throw when the failure count reaches the number of configured partitions.
 *
 * With no partitions configured, this throws even for zero failures.
 *
 * @param int $down Number of partitions that failed
 * @throws JobQueueError
 */
protected function throwErrorIfAllPartitionsDown( $down ) {
	$allDown = $down >= count( $this->partitionQueues );
	if ( $allDown ) {
		throw new JobQueueError( 'No queue partitions available.' );
	}
}
489}
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
array $params
The job parameters.
Convenience class for weighted consistent hash rings.
Definition HashRing.php:44
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
Definition HashRing.php:268
getLiveLocation( $item)
Get the location of an item on the "live" ring.
Definition HashRing.php:246
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Definition HashRing.php:226
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
doAck(RunnableJob $job)
doIsRootJobOldDuplicate(IJobSpecification $job)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:38
string $type
Job type.
Definition JobQueue.php:42
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:145
string $domain
DB domain ID.
Definition JobQueue.php:40
Class to both describe a background job and handle jobs.
Definition Job.php:40
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
if(count( $args)< 1) $job