MediaWiki  1.34.0
JobQueueFederated.php
Go to the documentation of this file.
1 <?php
/**
 * Class to handle enqueueing and running of background jobs for federated queues.
 *
 * The queue is split into several named partition queues, each of which is a
 * regular JobQueue built via JobQueue::factory(). Read-style operations
 * (sizes, emptiness, iterators) aggregate over every partition; pushes pick a
 * partition through a weighted HashRing and fall back to other partitions when
 * one fails; pops pick a random live partition by weight.
 *
 * A partition failure (JobQueueError) is logged and tolerated as long as at
 * least one partition answered; see throwErrorIfAllPartitionsDown().
 */
class JobQueueFederated extends JobQueue {
	/** @var HashRing Ring of all partitions, built from the (partition => weight) map */
	protected $partitionRing;
	/** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
	protected $partitionQueues = [];

	/** @var int Maximum number of partitions to try */
	protected $maxPartitionsTry;

	/**
	 * @param array $params Keys read here (all others are passed through to the
	 *   parent constructor and to every partition queue):
	 *  - sectionsByWiki      : Map of (domain ID => section name). A domain without
	 *                          an entry uses the section named "default".
	 *  - partitionsBySection : Map of (section name => map of (partition name => weight)).
	 *                          Must contain the section resolved for this domain.
	 *  - configByPartition   : Map of (partition name => configuration array) that is
	 *                          merged with the shared settings and handed to
	 *                          JobQueue::factory(). Shared settings take precedence
	 *                          over per-partition ones.
	 *  - maxPartitionsTry    : Maximum number of insertion rounds in doBatchPush().
	 *                          Defaults to 2.
	 * @throws MWException If the section or any of its partitions lacks configuration
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$section = $params['sectionsByWiki'][$this->domain] ?? 'default';
		if ( !isset( $params['partitionsBySection'][$section] ) ) {
			throw new MWException( "No configuration for section '$section'." );
		}
		$this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2;
		// Get the full partition map, heaviest partitions first
		$partitionMap = $params['partitionsBySection'][$section];
		arsort( $partitionMap, SORT_NUMERIC );
		// Get the config to merge into each partition queue config
		$baseConfig = $params;
		foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry',
			'partitionsBySection', 'configByPartition', ] as $o
		) {
			unset( $baseConfig[$o] ); // partition queue doesn't care about this
		}
		// Get the partition queue objects
		foreach ( $partitionMap as $partition => $w ) {
			if ( !isset( $params['configByPartition'][$partition] ) ) {
				throw new MWException( "No configuration for partition '$partition'." );
			}
			// Array union: keys already in $baseConfig win over the partition's own
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $params['configByPartition'][$partition] );
		}
		// Ring of all partitions
		$this->partitionRing = new HashRing( $partitionMap );
	}

	/**
	 * Get the allowed queue orders for configuration validation.
	 *
	 * @return string[]
	 */
	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return [ 'undefined', 'random', 'timestamp' ];
	}

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 *
	 * @return string
	 */
	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	/**
	 * Find out if delayed jobs are supported for configuration validation.
	 *
	 * @return bool True only if every partition queue supports delayed jobs
	 */
	protected function supportsDelayedJobs() {
		foreach ( $this->partitionQueues as $queue ) {
			if ( !$queue->supportsDelayedJobs() ) {
				return false;
			}
		}

		return true;
	}

	/**
	 * Check whether every reachable partition is empty.
	 *
	 * @return bool
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doIsEmpty() {
		$empty = true;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				// Short-circuits: once one partition is non-empty the rest are not queried
				$empty = $empty && $queue->doIsEmpty();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $empty;
	}

	/**
	 * @return int Sum of doGetSize() over the reachable partitions
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	/**
	 * @return int Sum of doGetAcquiredCount() over the reachable partitions
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	/**
	 * @return int Sum of doGetDelayedCount() over the reachable partitions
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	/**
	 * @return int Sum of doGetAbandonedCount() over the reachable partitions
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Call a counting method on every partition queue and add up the results.
	 *
	 * Partitions that throw JobQueueError are logged and contribute nothing.
	 *
	 * @param string $type Label of the statistic; not used by this implementation
	 * @param string $method Name of the JobQueue method to invoke on each partition
	 * @return int
	 * @throws JobQueueError If all partitions failed
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$count = 0;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$count += $queue->$method();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $count;
	}

	/**
	 * Insert a batch of jobs, retrying leftover jobs on other partitions.
	 *
	 * @param array $jobs List of jobs to insert
	 * @param int $flags Bitfield of JobQueue::QOS_* constants
	 * @throws JobQueueError If jobs remain after maxPartitionsTry rounds
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Local ring variable that may be changed to point to a new ring on failure
		// NOTE(review): this assignment was missing from the listing and has been
		// reconstructed from the comment above and the by-reference use below.
		$partitionRing = $this->partitionRing;
		// Try to insert the jobs and update $partitionsTry on any failures.
		// Retry to insert any remaining jobs again, ignoring the bad partitions.
		$jobsLeft = $jobs;
		for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
			try {
				// NOTE(review): reconstructed probe; presumably throws
				// UnexpectedValueException when no live partition is left.
				$partitionRing->getLiveLocationWeights();
			} catch ( UnexpectedValueException $e ) {
				break; // all servers down; nothing to insert to
			}
			$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
		}
		if ( count( $jobsLeft ) ) {
			throw new JobQueueError(
				"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
		}
	}

	/**
	 * Make one attempt at inserting the given jobs into the partition queues.
	 *
	 * A partition whose push fails is ejected from the live ring for 5 seconds
	 * and its jobs are returned so that the caller can retry them.
	 *
	 * @param array $jobs List of jobs to insert
	 * @param HashRing &$partitionRing Ring used to pick partitions
	 * @param int $flags Bitfield of JobQueue::QOS_* constants
	 * @throws JobQueueError If a failed partition cannot be ejected from the ring
	 * @return array List of jobs that could not be inserted
	 */
	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
		$jobsLeft = [];

		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
		$uJobsByPartition = []; // (partition name => job list)
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( $flags & self::QOS_ATOMIC ) {
			$nuJobBatches = [ $jobs ]; // all or nothing
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into the queues...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			$queue = $this->partitionQueues[$partition];
			try {
				$ok = true;
				$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
			} catch ( JobQueueError $e ) {
				$ok = false;
				$this->logException( $e );
			}
			if ( !$ok ) {
				if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
					throw new JobQueueError( "Could not insert job(s), no partitions available." );
				}
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		// Insert the jobs that are not de-duplicated into the queues...
		foreach ( $nuJobBatches as $jobBatch ) {
			// NOTE(review): the partition selection was missing from the listing, which
			// left $partition undefined or stale from the loop above. Reconstructed as
			// a weighted random pick among live partitions, mirroring doPop().
			$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
			$queue = $this->partitionQueues[$partition];
			try {
				$ok = true;
				$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
			} catch ( JobQueueError $e ) {
				$ok = false;
				$this->logException( $e );
			}
			if ( !$ok ) {
				if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
					throw new JobQueueError( "Could not insert job(s), no partitions available." );
				}
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		return $jobsLeft;
	}

	/**
	 * Pop a job from a randomly chosen live partition.
	 *
	 * Partitions that fail or yield no job are dropped from the candidate set
	 * and another one is tried. The popped job is tagged with the
	 * 'QueuePartition' metadata so that doAck() can find its partition.
	 *
	 * @return RunnableJob|bool The job, or false if no partition produced one
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doPop() {
		$partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)

		$failed = 0;
		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}

			$queue = $this->partitionQueues[$partition];
			try {
				$job = $queue->pop();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
				$job = false;
			}
			if ( $job ) {
				$job->setMetadata( 'QueuePartition', $partition );

				return $job;
			} else {
				unset( $partitionsTry[$partition] ); // blacklist partition
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return false;
	}

	/**
	 * Acknowledge a job on the partition it was popped from.
	 *
	 * @param RunnableJob $job Job carrying the 'QueuePartition' metadata set by doPop()
	 * @throws MWException If the job has no partition metadata
	 */
	protected function doAck( RunnableJob $job ) {
		$partition = $job->getMetadata( 'QueuePartition' );
		if ( $partition === null ) {
			throw new MWException( "The given job has no defined partition name." );
		}

		$this->partitionQueues[$partition]->ack( $job );
	}

	/**
	 * Ask the partition owning the job's root signature whether the job is an
	 * old duplicate. If that partition fails, it is ejected from the live ring
	 * for 5 seconds and the next live partition is asked once.
	 *
	 * NOTE(review): the method header was missing from the listing; the
	 * visibility is assumed to match the sibling do*() methods. Confirm against JobQueue.
	 *
	 * @param IJobSpecification $job
	 * @return bool False if no partition could be consulted
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$signature = $job->getRootJobParams()['rootJobSignature'];
		$partition = $this->partitionRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
		} catch ( JobQueueError $e ) {
			if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionRing->getLiveLocation( $signature );
				return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
			}
		}

		return false;
	}

	/**
	 * Register the job's root signature on the partition that owns it. If that
	 * partition fails, it is ejected from the live ring for 5 seconds and the
	 * next live partition is tried once.
	 *
	 * NOTE(review): the method header was missing from the listing; the
	 * visibility is assumed to match the sibling do*() methods. Confirm against JobQueue.
	 *
	 * @param IJobSpecification $job
	 * @return bool False if no partition could be used
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$signature = $job->getRootJobParams()['rootJobSignature'];
		$partition = $this->partitionRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
		} catch ( JobQueueError $e ) {
			if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionRing->getLiveLocation( $signature );
				return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
			}
		}

		return false;
	}

	/**
	 * Delete the jobs of every partition queue.
	 *
	 * @return bool Always true when at least one partition was reachable
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doDelete() {
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->doDelete();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
		return true;
	}

	/**
	 * Call waitForBackups() on every partition queue.
	 *
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doWaitForBackups() {
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->waitForBackups();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
	}

	/**
	 * Flush the caches of every partition queue. Errors are not caught here.
	 */
	protected function doFlushCaches() {
		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 *
	 * @return AppendIterator Concatenation of the partitions' iterators
	 */
	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();

		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}

		return $iterator;
	}

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 *
	 * @return AppendIterator Concatenation of the partitions' iterators
	 */
	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();

		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}

		return $iterator;
	}

	/**
	 * Get an iterator to traverse over all claimed jobs in this queue.
	 *
	 * @return AppendIterator Concatenation of the partitions' iterators
	 */
	public function getAllAcquiredJobs() {
		$iterator = new AppendIterator();

		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllAcquiredJobs() );
		}

		return $iterator;
	}

	/**
	 * Get an iterator to traverse over all abandoned jobs in this queue.
	 *
	 * @return AppendIterator Concatenation of the partitions' iterators
	 */
	public function getAllAbandonedJobs() {
		$iterator = new AppendIterator();

		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllAbandonedJobs() );
		}

		return $iterator;
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 *
	 * @return string Identifier derived from the domain and the partition names
	 */
	public function getCoalesceLocationInternal() {
		return "JobQueueFederated:wiki:{$this->domain}" .
			sha1( serialize( array_keys( $this->partitionQueues ) ) );
	}

	/**
	 * Collect, across partitions, which of the given job types have jobs.
	 *
	 * Stops early once every requested type has been seen.
	 *
	 * @param array $types List of job types
	 * @return array|null List of types with jobs, or null as soon as a partition
	 *   returns a non-array (i.e. does not support the operation)
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$result = [];

		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
				if ( is_array( $nonEmpty ) ) {
					$result = array_unique( array_merge( $result, $nonEmpty ) );
				} else {
					return null; // not supported on all partitions; bail
				}
				if ( count( $result ) == count( $types ) ) {
					break; // short-circuit
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return array_values( $result );
	}

	/**
	 * Sum, across partitions, the queue sizes of the given job types.
	 *
	 * @param array $types List of job types
	 * @return array|null Map of (job type => size), or null as soon as a partition
	 *   returns a non-array (i.e. does not support the operation)
	 * @throws JobQueueError If all partitions failed
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$result = [];
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$sizes = $queue->doGetSiblingQueueSizes( $types );
				if ( is_array( $sizes ) ) {
					foreach ( $sizes as $type => $size ) {
						$result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
					}
				} else {
					return null; // not supported on all partitions; bail
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $result;
	}

	/**
	 * Write an exception's message and stack trace to the 'JobQueueFederated' debug log.
	 *
	 * @param Exception $e
	 */
	protected function logException( Exception $e ) {
		wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() );
	}

	/**
	 * Throw an error if no partitions available.
	 *
	 * @param int $down The number of partitions that failed
	 * @throws JobQueueError If $down is at least the number of partition queues
	 */
	protected function throwErrorIfAllPartitionsDown( $down ) {
		if ( $down >= count( $this->partitionQueues ) ) {
			throw new JobQueueError( 'No queue partitions available.' );
		}
	}
}
JobQueueFederated\throwErrorIfAllPartitionsDown
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
Definition: JobQueueFederated.php:485
JobQueueFederated\doFlushCaches
doFlushCaches()
Definition: JobQueueFederated.php:368
JobQueueFederated\doPop
doPop()
Definition: JobQueueFederated.php:268
JobQueueFederated\doIsEmpty
doIsEmpty()
Definition: JobQueueFederated.php:122
JobQueueFederated\getAllAbandonedJobs
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueueFederated.php:408
HashRing\getLiveLocationWeights
getLiveLocationWeights()
Get the map of "live" locations to weight (does not include zero weight items)
Definition: HashRing.php:264
JobQueueFederated\doAck
doAck(RunnableJob $job)
Definition: JobQueueFederated.php:300
JobQueueFederated\logException
logException(Exception $e)
Definition: JobQueueFederated.php:474
HashRing\ejectFromLiveRing
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Definition: HashRing.php:222
RunnableJob
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:35
JobQueueFederated\doGetDelayedCount
doGetDelayedCount()
Definition: JobQueueFederated.php:146
JobQueueFederated\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueFederated.php:419
JobQueueFederated\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueFederated.php:103
serialize
serialize()
Definition: ApiMessageTrait.php:138
wfDebugLog
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Definition: GlobalFunctions.php:1007
JobQueueFederated\getAllQueuedJobs
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Definition: JobQueueFederated.php:375
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:30
JobQueueFederated\doDelete
doDelete()
Definition: JobQueueFederated.php:339
MWException
MediaWiki exception.
Definition: MWException.php:26
JobQueueFederated\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueFederated.php:424
JobQueueFederated\tryJobInsertions
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
Definition: JobQueueFederated.php:202
JobQueueFederated\doGetSize
doGetSize()
Definition: JobQueueFederated.php:138
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
JobQueueFederated\getAllDelayedJobs
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueueFederated.php:386
$queue
$queue
Definition: mergeMessageFileList.php:157
JobQueueFederated\doBatchPush
doBatchPush(array $jobs, $flags)
Definition: JobQueueFederated.php:175
JobQueueFederated\$partitionQueues
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
Definition: JobQueueFederated.php:52
JobQueueError
Definition: JobQueueError.php:28
JobQueueFederated\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueFederated.php:324
JobQueueFederated\getCrossPartitionSum
getCrossPartitionSum( $type, $method)
Definition: JobQueueFederated.php:159
JobQueueFederated\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueueFederated.php:450
JobQueue\factory
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:114
ArrayUtils\pickRandom
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the appropriate key.
Definition: ArrayUtils.php:66
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:50
JobQueueFederated\$maxPartitionsTry
int $maxPartitionsTry
Maximum number of partitions to try.
Definition: JobQueueFederated.php:55
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
JobQueueFederated\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueueFederated.php:150
JobQueueFederated\__construct
__construct(array $params)
Definition: JobQueueFederated.php:74
JobQueueFederated\doWaitForBackups
doWaitForBackups()
Definition: JobQueueFederated.php:354
JobQueueFederated\$partitionRing
HashRing $partitionRing
Definition: JobQueueFederated.php:50
JobQueueFederated\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueFederated.php:108
HashRing
Convenience class for weighted consistent hash rings.
Definition: HashRing.php:39
JobQueue\$domain
string $domain
DB domain ID.
Definition: JobQueue.php:33
JobQueueFederated\doGetAcquiredCount
doGetAcquiredCount()
Definition: JobQueueFederated.php:142
JobQueueFederated\doIsRootJobOldDuplicate
doIsRootJobOldDuplicate(IJobSpecification $job)
Definition: JobQueueFederated.php:309
HashRing\getLiveLocation
getLiveLocation( $item)
Get the location of an item on the "live" ring.
Definition: HashRing.php:242
IJobSpecification
Interface for serializable objects that describe a job queue task.
Definition: IJobSpecification.php:35
JobQueueFederated
Class to handle enqueueing and running of background jobs for federated queues.
Definition: JobQueueFederated.php:48
JobQueueFederated\supportsDelayedJobs
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueueFederated.php:112
JobQueueFederated\getAllAcquiredJobs
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueueFederated.php:397