MediaWiki REL1_28
JobQueueFederated.php
Go to the documentation of this file.
1<?php
/** @var HashRing Weighted consistent-hash ring over the partition names; built in __construct() */
51 protected $partitionRing;
/** @var JobQueue[] (partition name => JobQueue), in descending partition-weight order */
53 protected $partitionQueues = [];
54
57
/**
 * Build one partition queue for each partition of this wiki's section.
 *
 * Keys of $params read here, in addition to what JobQueue itself handles:
 *   - sectionsByWiki      : (wiki ID => section name); unlisted wikis use 'default'
 *   - partitionsBySection : (section name => (partition name => weight))
 *   - configByPartition   : (partition name => JobQueue config for that partition)
 *   - maxPartitionsTry    : maximum number of partitions to try (default 2)
 * Every other key except 'class' and 'aggregator' is copied into each partition's
 * config and wins over the partition-specific value for the same key.
 *
 * @param array $params
 * @throws MWException If the section, or one of its partitions, has no configuration
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	if ( isset( $params['sectionsByWiki'][$this->wiki] ) ) {
		$section = $params['sectionsByWiki'][$this->wiki];
	} else {
		$section = 'default';
	}
	if ( !isset( $params['partitionsBySection'][$section] ) ) {
		throw new MWException( "No configuration for section '$section'." );
	}

	if ( isset( $params['maxPartitionsTry'] ) ) {
		$this->maxPartitionsTry = $params['maxPartitionsTry'];
	} else {
		$this->maxPartitionsTry = 2;
	}

	// (partition name => weight), heaviest partitions first
	$weightsByPartition = $params['partitionsBySection'][$section];
	arsort( $weightsByPartition, SORT_NUMERIC );

	// Settings shared by every partition queue. Federation-only keys mean nothing to a
	// single partition, and 'aggregator' is dropped because this class makes the
	// aggregator calls itself.
	$sharedConfig = array_diff_key( $params, array_flip( [
		'class', 'sectionsByWiki', 'maxPartitionsTry',
		'partitionsBySection', 'configByPartition', 'aggregator'
	] ) );

	foreach ( $weightsByPartition as $partition => $weight ) {
		if ( !isset( $params['configByPartition'][$partition] ) ) {
			throw new MWException( "No configuration for partition '$partition'." );
		}
		// Array union: shared settings take precedence over partition-specific ones
		$this->partitionQueues[$partition] = JobQueue::factory(
			$sharedConfig + $params['configByPartition'][$partition]
		);
	}

	// Weighted consistent-hash ring spanning every partition
	$this->partitionRing = new HashRing( $weightsByPartition );
}
109
/**
 * Queue orders this federated queue accepts.
 *
 * Strict FIFO is impossible once jobs are spread over partitions, so only a rough
 * timestamp order is offered alongside 'undefined' and 'random'.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'undefined', 'random', 'timestamp' ];

	return $orders;
}
114
/**
 * Default order for this queue.
 *
 * 'undefined' leaves the ordering decision to each partition queue.
 *
 * @return string
 */
protected function optimalOrder() {
	$order = 'undefined';

	return $order;
}
118
/**
 * Delayed jobs are supported only if every partition queue supports them.
 *
 * @return bool
 */
protected function supportsDelayedJobs() {
	$allSupport = true;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		if ( !$partitionQueue->supportsDelayedJobs() ) {
			$allSupport = false;
			break;
		}
	}

	return $allSupport;
}
128
/**
 * Check whether every reachable partition queue is empty.
 *
 * Partitions that throw JobQueueError are logged and treated as empty. If every
 * partition throws, a JobQueueError is raised instead.
 *
 * @return bool
 * @throws JobQueueError
 */
protected function doIsEmpty() {
	$downCount = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			if ( !$partitionQueue->doIsEmpty() ) {
				// One reachable, non-empty partition settles the answer; the rest
				// need not be asked, and the all-down check could not fire anyway.
				return false;
			}
		} catch ( JobQueueError $e ) {
			++$downCount;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $downCount );

	return true;
}
144
/**
 * Number of jobs summed over every reachable partition queue.
 *
 * @return int
 */
protected function doGetSize() {
	return $this->getCrossPartitionSum( 'size', __FUNCTION__ );
}

/**
 * Number of acquired jobs summed over every reachable partition queue.
 *
 * @return int
 */
protected function doGetAcquiredCount() {
	return $this->getCrossPartitionSum( 'acquiredcount', __FUNCTION__ );
}

/**
 * Number of delayed jobs summed over every reachable partition queue.
 *
 * @return int
 */
protected function doGetDelayedCount() {
	return $this->getCrossPartitionSum( 'delayedcount', __FUNCTION__ );
}

/**
 * Number of abandoned jobs summed over every reachable partition queue.
 *
 * @return int
 */
protected function doGetAbandonedCount() {
	return $this->getCrossPartitionSum( 'abandonedcount', __FUNCTION__ );
}
160
/**
 * Add up a count reported by each partition queue.
 *
 * Partitions whose call throws JobQueueError are logged and left out of the total.
 * If every partition throws, a JobQueueError is raised instead.
 *
 * @param string $type Name of the count (not used by this implementation)
 * @param string $method Partition queue method that returns the count for that partition
 * @return int Sum of the per-partition results
 * @throws JobQueueError
 */
protected function getCrossPartitionSum( $type, $method ) {
	$total = 0;
	$unreachable = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$total += $partitionQueue->$method();
		} catch ( JobQueueError $e ) {
			$unreachable++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $unreachable );

	return $total;
}
181
/**
 * Push jobs across the partitions, retrying the leftovers after partition failures.
 *
 * Runs up to $this->maxPartitionsTry insertion rounds through tryJobInsertions().
 * Each round only re-sends the jobs that the previous round could not insert.
 *
 * NOTE(review): this listing lacks original lines 184 and 192. As shown,
 * $partitionRing is never assigned and the try block is empty. From the surrounding
 * comments, they presumably take a local copy of $this->partitionRing and probe its
 * live ring (throwing UnexpectedValueException when nothing is live). Confirm against
 * the real file.
 *
 * @param array $jobs Jobs to insert
 * @param int $flags Bitfield tested against self::QOS_ATOMIC in tryJobInsertions()
 * @throws JobQueueError If any jobs remain uninserted after the retry rounds
 */
182 protected function doBatchPush( array $jobs, $flags ) {
183 // Local ring variable that may be changed to point to a new ring on failure
185 // Try to insert the jobs; partitions that fail are ejected from the live ring.
186 // Retry to insert any remaining jobs again, ignoring the bad partitions.
187 $jobsLeft = $jobs;
188 // @codingStandardsIgnoreStart Generic.CodeAnalysis.ForLoopWithTestFunctionCall.NotAllowed
189 for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
190 // @codingStandardsIgnoreEnd
191 try {
// NOTE(review): original line 192 (the body of this try) is missing from the listing.
193 } catch ( UnexpectedValueException $e ) {
194 break; // all servers down; nothing to insert to
195 }
196 $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
197 }
198 if ( count( $jobsLeft ) ) {
// NOTE(review): this message reports maxPartitionsTry even when the loop stopped
// early through the break above.
199 throw new JobQueueError(
200 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
201 }
202 }
203
/**
 * Make one attempt at inserting the given jobs into the partition queues.
 *
 * Jobs whose ignoreDuplicates() is true are grouped by the live-ring location of the
 * SHA-1 of their serialized de-duplication info. Jobs with the same de-duplication
 * info therefore go to the same partition. The other jobs are sent as a single batch
 * when $flags contains QOS_ATOMIC, and otherwise in chunks of up to 300. Every
 * per-partition push is made with QOS_ATOMIC.
 *
 * When a push throws JobQueueError:
 *   - the error is logged;
 *   - the partition is ejected from the live ring (TTL 5; see
 *     HashRing::ejectFromLiveRing() for units);
 *   - the batch is returned for retry.
 *
 * NOTE(review): original lines 218, 238 and 257 are absent from this listing.
 *
 * @param array $jobs Jobs to insert
 * @param HashRing &$partitionRing Ring used to choose partitions; failing partitions are
 *   ejected from its live ring
 * @param int $flags Bitfield tested against self::QOS_ATOMIC
 * @return array Jobs that could not be inserted in this attempt
 * @throws JobQueueError If ejectFromLiveRing() returns false for a failing partition
 */
211 protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
212 $jobsLeft = [];
213
214 // Because jobs are spread across partitions, per-job de-duplication needs
215 // to use a consistent hash to avoid allowing duplicate jobs per partition.
216 // De-duplicated jobs are split by partition, so QOS_ATOMIC cannot cover the batch as a whole; only each per-partition push is atomic.
217 $uJobsByPartition = []; // (partition name => job list)
219 foreach ( $jobs as $key => $job ) {
220 if ( $job->ignoreDuplicates() ) {
221 $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
222 $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
223 unset( $jobs[$key] );
224 }
225 }
226 // Get the batches of jobs that are not de-duplicated
227 if ( $flags & self::QOS_ATOMIC ) {
// NOTE(review): if every job was de-duplicated above, $jobs is empty here and this still
// yields one empty batch, which the loop below pushes to a partition.
228 $nuJobBatches = [ $jobs ]; // all or nothing
229 } else {
230 // Split the jobs into batches and spread them out over servers if there
231 // are many jobs. This helps keep the partitions even. Otherwise, send all
232 // the jobs to a single partition queue to avoid the extra connections.
233 $nuJobBatches = array_chunk( $jobs, 300 );
234 }
235
236 // Insert the de-duplicated jobs into the queues...
237 foreach ( $uJobsByPartition as $partition => $jobBatch ) {
239 $queue = $this->partitionQueues[$partition];
240 try {
241 $ok = true;
242 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
243 } catch ( JobQueueError $e ) {
244 $ok = false;
245 $this->logException( $e );
246 }
247 if ( !$ok ) {
248 if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
249 throw new JobQueueError( "Could not insert job(s), no partitions available." );
250 }
251 $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
252 }
253 }
254
255 // Insert the jobs that are not de-duplicated into the queues...
256 foreach ( $nuJobBatches as $jobBatch ) {
// NOTE(review): original line 257 is missing from the listing. As shown, $partition is never
// assigned in this loop: it would reuse the last key of the loop above, or be undefined.
// It presumably picked a live partition from $partitionRing; confirm against the real file.
258 $queue = $this->partitionQueues[$partition];
259 try {
260 $ok = true;
261 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
262 } catch ( JobQueueError $e ) {
263 $ok = false;
264 $this->logException( $e );
265 }
266 if ( !$ok ) {
267 if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
268 throw new JobQueueError( "Could not insert job(s), no partitions available." );
269 }
270 $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
271 }
272 }
273
274 return $jobsLeft;
275 }
276
/**
 * Pop a job from a partition picked at random, weighted by partition weight.
 *
 * A partition that yields no job (it is empty or throws JobQueueError) is removed
 * from the candidate set, and another one is picked. This continues until a job is
 * found or no candidates remain. The returned job records its source partition in
 * metadata['QueuePartition'] so that doAck() can route the ack back to it.
 *
 * @return Job|bool The popped job, or false if none was obtained
 * @throws JobQueueError Via throwErrorIfAllPartitionsDown() when the number of
 *   partitions whose pop() threw reaches the total partition count
 */
protected function doPop() {
	// (partition => weight) for partitions currently on the live ring
	$candidates = $this->partitionRing->getLiveLocationWeights();
	$errors = 0;

	while ( count( $candidates ) ) {
		$partition = ArrayUtils::pickRandom( $candidates );
		if ( $partition === false ) {
			break; // every remaining candidate has zero weight
		}

		try {
			$job = $this->partitionQueues[$partition]->pop();
		} catch ( JobQueueError $e ) {
			++$errors;
			$this->logException( $e );
			$job = false;
		}

		if ( !$job ) {
			// Nothing came from this partition; exclude it from further picks
			unset( $candidates[$partition] );
			continue;
		}

		$job->metadata['QueuePartition'] = $partition;

		return $job;
	}
	$this->throwErrorIfAllPartitionsDown( $errors );

	return false;
}
308
/**
 * Acknowledge a job on the partition queue it was popped from.
 *
 * doPop() stores the source partition in $job->metadata['QueuePartition']. This
 * method routes the acknowledgement back to that partition queue.
 *
 * @param Job $job
 * @throws MWException If the job has no partition recorded, or records a partition
 *   this queue does not have
 */
protected function doAck( Job $job ) {
	if ( !isset( $job->metadata['QueuePartition'] ) ) {
		throw new MWException( "The given job has no defined partition name." );
	}

	$partition = $job->metadata['QueuePartition'];
	if ( !isset( $this->partitionQueues[$partition] ) ) {
		// Without this check an unknown name causes an undefined-index notice,
		// followed by a fatal error from calling ack() on null.
		throw new MWException( "Unrecognized partition '$partition' for the given job." );
	}

	$this->partitionQueues[$partition]->ack( $job );
}
316
/**
 * Ask the partition that the job's root-job signature hashes to whether this job
 * is an old duplicate of its root job.
 *
 * If that partition throws JobQueueError, the error is logged and the partition is
 * ejected from the live ring (TTL 5). The check is then retried once on the location
 * the signature maps to afterwards. If the ejection is refused, the job is reported
 * as not being an old duplicate.
 *
 * @param Job $job
 * @return bool
 * @throws JobQueueError If the retry on the fallback partition also fails
 */
protected function doIsRootJobOldDuplicate( Job $job ) {
	$signature = $job->getRootJobParams()['rootJobSignature'];
	$partition = $this->partitionRing->getLiveLocation( $signature );
	try {
		return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
	} catch ( JobQueueError $e ) {
		// Record the failure, as every other per-partition error path in this class
		// does, instead of discarding it.
		$this->logException( $e );
		if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			$partition = $this->partitionRing->getLiveLocation( $signature );
			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
		}
	}

	return false;
}
331
// NOTE(review): original line 332 (this method's header) is missing from the listing.
// The cross-reference index names it doDeduplicateRootJob(IJobSpecification $job).
// Routes root-job de-duplication to the partition that the root-job signature hashes to.
// If that partition throws JobQueueError, it is ejected from the live ring (TTL 5) and
// the call is retried once on the new location. Returns false when the ejection is refused.
// NOTE(review): unlike the other per-partition error paths, the caught exception is not
// passed to logException(); consider logging it.
333 $signature = $job->getRootJobParams()['rootJobSignature'];
334 $partition = $this->partitionRing->getLiveLocation( $signature );
335 try {
336 return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
337 } catch ( JobQueueError $e ) {
338 if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
339 $partition = $this->partitionRing->getLiveLocation( $signature );
340 return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
341 }
342 }
343
344 return false;
345 }
346
/**
 * Call doDelete() on every partition queue.
 *
 * Partition failures are logged and skipped. A JobQueueError is raised only when
 * the number of failures reaches the partition count.
 *
 * @return bool True whenever it returns normally
 * @throws JobQueueError
 */
protected function doDelete() {
	$errors = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->doDelete();
		} catch ( JobQueueError $e ) {
			$errors++;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $errors );

	return true;
}
361
/**
 * Call waitForBackups() on every partition queue.
 *
 * Partition failures are logged and skipped. A JobQueueError is raised only when
 * the number of failures reaches the partition count.
 *
 * @throws JobQueueError
 */
protected function doWaitForBackups() {
	$errors = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$partitionQueue->waitForBackups();
		} catch ( JobQueueError $e ) {
			$errors += 1;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $errors );
}
375
/**
 * Call doFlushCaches() on every partition queue.
 *
 * There is no per-partition error handling here, so an exception from any
 * partition propagates to the caller.
 */
protected function doFlushCaches() {
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$partitionQueue->doFlushCaches();
	}
}
382
/**
 * Iterate over the queued jobs of every partition queue, one partition after
 * another, in $this->partitionQueues order.
 *
 * @return AppendIterator
 */
public function getAllQueuedJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllQueuedJobs() );
	}

	return $allJobs;
}
393
/**
 * Iterate over the delayed jobs of every partition queue, one partition after
 * another, in $this->partitionQueues order.
 *
 * @return AppendIterator
 */
public function getAllDelayedJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllDelayedJobs() );
	}

	return $allJobs;
}
404
/**
 * Iterate over the acquired (claimed) jobs of every partition queue, one partition
 * after another, in $this->partitionQueues order.
 *
 * @return AppendIterator
 */
public function getAllAcquiredJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllAcquiredJobs() );
	}

	return $allJobs;
}
415
/**
 * Iterate over the abandoned jobs of every partition queue, one partition after
 * another, in $this->partitionQueues order.
 *
 * @return AppendIterator
 */
public function getAllAbandonedJobs() {
	$allJobs = new AppendIterator();
	foreach ( $this->partitionQueues as $partitionQueue ) {
		$allJobs->append( $partitionQueue->getAllAbandonedJobs() );
	}

	return $allJobs;
}
426
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * The result depends only on the wiki ID and the ordered list of partition names.
 *
 * @return string
 */
public function getCoalesceLocationInternal() {
	$partitionsHash = sha1( serialize( array_keys( $this->partitionQueues ) ) );

	return "JobQueueFederated:wiki:{$this->wiki}" . $partitionsHash;
}
431
/**
 * Find which of the given job types have jobs on at least one partition.
 *
 * Partitions are queried in order, and querying stops once every requested type has
 * been found. If a partition returns a non-array, meaning it does not support the
 * check, null is returned at once. Partitions that throw JobQueueError are logged and
 * skipped; a JobQueueError is raised if that happens for every partition.
 *
 * @param array $types Job types to check
 * @return array|null List of the types that have jobs, or null if unsupported
 * @throws JobQueueError
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$typesWithJobs = [];
	$errors = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$found = $partitionQueue->doGetSiblingQueuesWithJobs( $types );
			if ( !is_array( $found ) ) {
				return null; // not supported on all partitions; bail
			}
			$typesWithJobs = array_unique( array_merge( $typesWithJobs, $found ) );
			if ( count( $typesWithJobs ) == count( $types ) ) {
				break; // every requested type is already accounted for
			}
		} catch ( JobQueueError $e ) {
			++$errors;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $errors );

	return array_values( $typesWithJobs );
}
457
/**
 * Sum the per-type queue sizes reported by every reachable partition.
 *
 * If a partition returns a non-array, meaning it does not support the check, null is
 * returned at once. Partitions that throw JobQueueError are logged and skipped; a
 * JobQueueError is raised if that happens for every partition.
 *
 * @param array $types Job types to size
 * @return array|null (type => summed size), or null if unsupported
 * @throws JobQueueError
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$totals = [];
	$errors = 0;
	foreach ( $this->partitionQueues as $partitionQueue ) {
		try {
			$sizes = $partitionQueue->doGetSiblingQueueSizes( $types );
			if ( !is_array( $sizes ) ) {
				return null; // not supported on all partitions; bail
			}
			foreach ( $sizes as $type => $size ) {
				if ( isset( $totals[$type] ) ) {
					$totals[$type] += $size;
				} else {
					$totals[$type] = $size;
				}
			}
		} catch ( JobQueueError $e ) {
			++$errors;
			$this->logException( $e );
		}
	}
	$this->throwErrorIfAllPartitionsDown( $errors );

	return $totals;
}
481
/**
 * Write an exception's message and stack trace to the 'JobQueueFederated' debug log group.
 *
 * @param Exception $e
 */
protected function logException( Exception $e ) {
	$entry = $e->getMessage() . "\n" . $e->getTraceAsString();
	wfDebugLog( 'JobQueueFederated', $entry );
}
485
/**
 * Throw an error if no partitions are available.
 *
 * @param int $down Number of partitions that failed
 * @throws JobQueueError If $down is at least the total number of partitions; this
 *   includes the case where there are no partitions at all
 */
protected function throwErrorIfAllPartitionsDown( $down ) {
	$total = count( $this->partitionQueues );
	if ( $down < $total ) {
		return;
	}

	throw new JobQueueError( 'No queue partitions available.' );
}
498}
serialize()
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the appropriate key.
Convenience class for weighted consistent hash rings.
Definition HashRing.php:29
getLiveLocationWeights()
Get the map of "live" locations to weight (ignores 0-weight items)
Definition HashRing.php:236
getLiveLocation( $item)
Get the location of an item on the "live" ring.
Definition HashRing.php:214
getLiveRing()
Get the "live" hash ring (which does not include ejected locations)
Definition HashRing.php:177
ejectFromLiveRing( $location, $ttl)
Remove a location from the "live" hash ring.
Definition HashRing.php:159
Class to handle enqueueing and running of background jobs for federated queues.
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
doGetSiblingQueuesWithJobs(array $types)
tryJobInsertions(array $jobs, HashRing &$partitionRing, $flags)
doGetSiblingQueueSizes(array $types)
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doDeduplicateRootJob(IJobSpecification $job)
int $maxPartitionsTry
Maximum number of partitions to try.
doBatchPush(array $jobs, $flags)
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
JobQueue[] $partitionQueues
(partition name => JobQueue) reverse sorted by weight
throwErrorIfAllPartitionsDown( $down)
Throw an error if no partitions available.
getCrossPartitionSum( $type, $method)
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
logException(Exception $e)
doIsRootJobOldDuplicate(Job $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
string $type
Job type.
Definition JobQueue.php:35
static factory(array $params)
Get a job queue object of the specified type.
Definition JobQueue.php:108
string $wiki
Wiki ID.
Definition JobQueue.php:33
Class to both describe a background job and handle jobs.
Definition Job.php:31
MediaWiki exception.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
the array() calling protocol came about after MediaWiki 1.4rc1.
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message. Please note the header message cannot receive/use parameters. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item. Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page. Return false to stop further processing of the tag $reader:XMLReader object & $pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision. Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag. Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload. Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports. & $fullInterwikiPrefix:Interwiki prefix, may contain colons. & $pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable. Can be used to lazy-load the import sources list. & $importSources:The value of $wgImportSources. Modify as necessary. See the comment in DefaultSettings.php for the detail of how to structure this array. 'InfoAction':When building information to display on the action=info page. $context:IContextSource object & $pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect. 
& $title:Title object for the current page & $request:WebRequest & $ignoreRedirect:boolean to skip redirect check & $target:Title/string of redirect target & $article:Article object 'InternalParseBeforeLinks':during Parser 's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InternalParseBeforeSanitize':during Parser 's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings. Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not. Return true without providing an interwiki to continue interwiki search. $prefix:interwiki prefix we are looking for. & $iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 'InvalidateEmailComplete':Called after a user 's email has been invalidated successfully. $user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification. Callee may modify $url and $query, URL will be constructed as $url . 
$query & $url:URL to index.php & $query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) & $article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() & $ip:IP being check & $result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from & $allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn 't match your organization. $addr:The e-mail address entered by the user & $result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user & $result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we 're looking for a messages file for & $file:The messages file path, you can override this to change the location. 'LanguageGetMagic':DEPRECATED! Use $magicWords in a file listed in $wgExtensionMessagesFiles instead. Use this to define synonyms of magic words depending of the language & $magicExtensions:associative array of magic words synonyms $lang:language code(string) 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces. Do not use this hook to add namespaces. Use CanonicalNamespaces for that. & $namespaces:Array of namespaces indexed by their numbers 'LanguageGetSpecialPageAliases':DEPRECATED! Use $specialPageAliases in a file listed in $wgExtensionMessagesFiles instead. 
Use to define aliases of special pages names depending of the language & $specialPageAliases:associative array of magic words synonyms $lang:language code(string) 'LanguageGetTranslatedLanguageNames':Provide translated language names. & $names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page 's language links. This is called in various places to allow extensions to define the effective language links for a page. $title:The page 's Title. & $links:Associative array mapping language codes to prefixed links of the form "language:title". & $linkFlags:Associative array mapping prefixed links to arrays of flags. Currently unused, but planned to provide support for marking individual language links in the UI, e.g. for featured articles. 'LanguageSelector':Hook to change the language selector available on a page. $out:The output page. $cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED! Use HtmlPageLinkRendererBegin instead. Used when generating internal and interwiki links in Linker::link(), before processing starts. Return false to skip default processing and return $ret. See documentation for Linker::link() for details on the expected meanings of parameters. $skin:the Skin object $target:the Title that the link is pointing to & $html:the contents that the< a > tag should have(raw HTML) $result
Definition hooks.txt:1937
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition hooks.txt:2710
usually copyright or history_copyright This message must be in HTML not wikitext if the section is included from a template $section
Definition hooks.txt:2901
returning false will NOT prevent logging $e
Definition hooks.txt:2110
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
Job queue task description interface.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
if(count( $args)< 1) $job
$params