22use Psr\Log\LoggerInterface;
94 $params[
'redisConfig'][
'serializer'] =
'none';
95 $this->server =
$params[
'redisServer'];
96 $this->compression = isset(
$params[
'compression'] ) ?
$params[
'compression'] :
'none';
98 if ( empty(
$params[
'daemonized'] ) ) {
99 throw new InvalidArgumentException(
100 "Non-daemonized mode is no longer supported. Please install the " .
101 "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
103 $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance(
'redis' );
107 return [
'timestamp',
'fifo' ];
135 return $conn->lSize( $this->
getQueueKey(
'l-unclaimed' ) );
136 }
catch ( RedisException
$e ) {
149 $conn->multi( Redis::PIPELINE );
151 $conn->zSize( $this->
getQueueKey(
'z-abandoned' ) );
153 return array_sum( $conn->exec() );
154 }
catch ( RedisException
$e ) {
167 return $conn->zSize( $this->
getQueueKey(
'z-delayed' ) );
168 }
catch ( RedisException
$e ) {
181 return $conn->zSize( $this->
getQueueKey(
'z-abandoned' ) );
182 }
catch ( RedisException
$e ) {
197 foreach ( $jobs as
$job ) {
199 if ( strlen( $item[
'sha1'] ) ) {
200 $items[$item[
'sha1']] = $item;
202 $items[$item[
'uuid']] = $item;
206 if ( !count( $items ) ) {
213 if (
$flags & self::QOS_ATOMIC ) {
214 $batches = [ $items ];
216 $batches = array_chunk( $items, self::MAX_PUSH_SIZE );
220 foreach ( $batches as $itemBatch ) {
221 $added = $this->
pushBlobs( $conn, $itemBatch );
222 if ( is_int( $added ) ) {
225 $failed += count( $itemBatch );
231 count( $items ) - $failed - $pushed );
233 $err =
"Could not insert {$failed} {$this->type} job(s).";
235 throw new RedisException( $err );
237 }
catch ( RedisException
$e ) {
251 foreach ( $items as $item ) {
260 local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
261 --
First argument is the queue ID
262 local queueId = ARGV[1]
263 -- Next arguments all come in 4s (one per job)
264 local variadicArgCount = #ARGV - 1
265 if variadicArgCount % 4 ~= 0
then
266 return redis.error_reply(
'Unmatched arguments')
268 -- Insert each job into
this queue as
needed
271 local
id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
272 if sha1 ==
'' or redis.call(
'hExists',kIdBySha1,sha1) == 0
then
273 if 1*rtimestamp > 0
then
274 -- Insert into delayed queue (
release time as score)
275 redis.call(
'zAdd',kDelayed,rtimestamp,
id)
277 -- Insert into unclaimed queue
278 redis.call(
'lPush',kUnclaimed,
id)
281 redis.call(
'hSet',kSha1ById,
id,sha1)
282 redis.call(
'hSet',kIdBySha1,sha1,
id)
284 redis.call(
'hSet',kData,
id,blob)
288 -- Mark
this queue as having jobs
289 redis.call(
'sAdd',kQwJobs,queueId)
292 return $conn->
luaEval( $script,
304 6 # number of first argument(s) that are keys
320 if ( !is_string(
$blob ) ) {
326 if ( $item ===
false ) {
327 wfDebugLog(
'JobQueueRedis',
"Could not unserialize {$this->type} job." );
334 }
catch ( RedisException
$e ) {
350 local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
351 local rTime = unpack(ARGV)
352 -- Pop an item off the queue
353 local
id = redis.call(
'rPop',kUnclaimed)
357 -- Allow
new duplicates of
this job
358 local sha1 = redis.call(
'hGet',kSha1ById,
id)
359 if sha1
then redis.call(
'hDel',kIdBySha1,sha1) end
360 redis.call(
'hDel',kSha1ById,
id)
361 -- Mark the jobs as claimed and
return it
362 redis.call(
'zAdd',kClaimed,rTime,
id)
363 redis.call(
'hIncrBy',kAttempts,
id,1)
364 return redis.call(
'hGet',kData,
id)
366 return $conn->
luaEval( $script,
374 time(), # ARGV[1] (injected to be replication-safe)
376 6 # number of first argument(s) that are keys
388 if ( !isset(
$job->metadata[
'uuid'] ) ) {
389 throw new UnexpectedValueException(
"Job of type '{$job->getType()}' has no UUID." );
392 $uuid =
$job->metadata[
'uuid'];
398 local kClaimed, kAttempts, kData = unpack(KEYS)
399 local
id = unpack(ARGV)
400 -- Unmark the job as claimed
401 local removed = redis.call(
'zRem',kClaimed,
id)
402 -- Check
if the job was recycled
406 -- Delete the retry data
407 redis.call(
'hDel',kAttempts,
id)
408 -- Delete the job data itself
409 return redis.call(
'hDel',kData,
id)
411 $res = $conn->luaEval( $script,
418 3 # number of first argument(s) that are keys
422 wfDebugLog(
'JobQueueRedis',
"Could not acknowledge {$this->type} job $uuid." );
428 }
catch ( RedisException
$e ) {
443 if ( !
$job->hasRootJobParams() ) {
444 throw new LogicException(
"Cannot register root job; missing parameters." );
452 $timestamp = $conn->get( $key );
453 if ( $timestamp && $timestamp >=
$params[
'rootJobTimestamp'] ) {
458 return $conn->set( $key,
$params[
'rootJobTimestamp'], self::ROOTJOB_TTL );
459 }
catch ( RedisException
$e ) {
471 if ( !
$job->hasRootJobParams() ) {
480 }
catch ( RedisException
$e ) {
486 return ( $timestamp && $timestamp >
$params[
'rootJobTimestamp'] );
495 static $props = [
'l-unclaimed',
'z-claimed',
'z-abandoned',
496 'z-delayed',
'h-idBySha1',
'h-sha1ById',
'h-attempts',
'h-data' ];
501 foreach ( $props as $prop ) {
509 }
catch ( RedisException
$e ) {
522 $uids = $conn->lRange( $this->
getQueueKey(
'l-unclaimed' ), 0, -1 );
523 }
catch ( RedisException
$e ) {
538 $uids = $conn->zRange( $this->
getQueueKey(
'z-delayed' ), 0, -1 );
539 }
catch ( RedisException
$e ) {
554 $uids = $conn->zRange( $this->
getQueueKey(
'z-claimed' ), 0, -1 );
555 }
catch ( RedisException
$e ) {
570 $uids = $conn->zRange( $this->
getQueueKey(
'z-abandoned' ), 0, -1 );
571 }
catch ( RedisException
$e ) {
586 function ( $uid ) use ( $conn ) {
589 [
'accept' =>
function (
$job ) {
590 return is_object(
$job );
605 $types = array_values( $types );
608 $conn->multi( Redis::PIPELINE );
609 foreach ( $types as
$type ) {
610 $conn->lSize( $this->
getQueueKey(
'l-unclaimed', $type ) );
612 $res = $conn->exec();
613 if ( is_array(
$res ) ) {
614 foreach (
$res as $i => $size ) {
615 $sizes[$types[$i]] = $size;
618 }
catch ( RedisException
$e ) {
636 $data = $conn->hGet( $this->
getQueueKey(
'h-data' ), $uid );
637 if ( $data ===
false ) {
641 if ( !is_array( $item ) ) {
642 throw new UnexpectedValueException(
"Could not find job with ID '$uid'." );
644 $title = Title::makeTitle( $item[
'namespace'], $item[
'title'] );
646 $job->metadata[
'uuid'] = $item[
'uuid'];
647 $job->metadata[
'timestamp'] = $item[
'timestamp'];
649 $job->metadata[
'attempts'] = $conn->hGet( $this->
getQueueKey(
'h-attempts' ), $uid );
652 }
catch ( RedisException
$e ) {
667 $set = $conn->sMembers( $this->
getGlobalKey(
's-queuesWithJobs' ) );
668 foreach ( $set as
$queue ) {
671 }
catch ( RedisException
$e ) {
685 'type' =>
$job->getType(),
686 'namespace' =>
$job->getTitle()->getNamespace(),
687 'title' =>
$job->getTitle()->getDBkey(),
688 'params' =>
$job->getParams(),
690 'rtimestamp' =>
$job->getReleaseTimestamp() ?: 0,
693 'sha1' =>
$job->ignoreDuplicates()
694 ? Wikimedia\base_convert( sha1(
serialize(
$job->getDeduplicationInfo() ) ), 16, 36, 31 )
696 'timestamp' => time()
705 $title = Title::makeTitle( $fields[
'namespace'], $fields[
'title'] );
707 $job->metadata[
'uuid'] = $fields[
'uuid'];
708 $job->metadata[
'timestamp'] = $fields[
'timestamp'];
719 if ( $this->compression ===
'gzip'
720 && strlen(
$blob ) >= 1024
721 && function_exists(
'gzdeflate' )
723 $object = (
object)[
'blob' => gzdeflate(
$blob ),
'enc' =>
'gzip' ];
726 return ( strlen( $blobz ) < strlen(
$blob ) ) ? $blobz :
$blob;
738 if ( is_object( $fields ) ) {
739 if ( $fields->enc ===
'gzip' && function_exists(
'gzinflate' ) ) {
740 $fields =
unserialize( gzinflate( $fields->blob ) );
746 return is_array( $fields ) ? $fields :
false;
756 $conn = $this->redisPool->getConnection( $this->server, $this->logger );
759 "Unable to connect to redis server {$this->server}." );
771 $this->redisPool->handleError( $conn,
$e );
772 throw new JobQueueError(
"Redis server error: {$e->getMessage()}\n" );
779 return json_encode( [ $this->
type, $this->wiki ] );
787 return json_decode( $name );
795 $parts = [
'global',
'jobqueue',
$name ];
796 foreach ( $parts as $part ) {
797 if ( !preg_match(
'/[a-zA-Z0-9_-]+/', $part ) ) {
798 throw new InvalidArgumentException(
"Key part characters are out of range." );
802 return implode(
':', $parts );
813 $keyspace = $prefix ?
"$db-$prefix" : $db;
815 $parts = [ $keyspace,
'jobqueue',
$type, $prop ];
818 return implode(
':', array_map(
'rawurlencode', $parts ) );
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
wfDebugLog( $logGroup, $text, $dest='all', array $context=[])
Send a line to a supplementary debug log file, if configured, or main debug log if not.
Class to handle job queues stored in Redis.
doIsRootJobOldDuplicate(Job $job)
doDeduplicateRootJob(IJobSpecification $job)
throwRedisException(RedisConnRef $conn, $e)
__construct(array $params)
popAndAcquireBlob(RedisConnRef $conn)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
getQueueKey( $prop, $type=null)
doGetSiblingQueuesWithJobs(array $types)
doGetSiblingQueueSizes(array $types)
getServerQueuesWithJobs()
getJobFromUidInternal( $uid, RedisConnRef $conn)
This function should not be called outside JobQueueRedis.
RedisConnectionPool $redisPool
pushBlobs(RedisConnRef $conn, array $items)
string $server
Server address.
supportedOrders()
Get the allowed queue orders for configuration validation.
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
string $compression
Compression method to use.
doBatchPush(array $jobs, $flags)
getJobIterator(RedisConnRef $conn, array $uids)
getConnection()
Get a connection to the server that handles all sub-queues for this queue.
getJobFromFields(array $fields)
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getNewJobFields(IJobSpecification $job)
Class to handle enqueueing and running of background jobs.
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
getRootJobCacheKey( $signature)
Class to both describe a background job and handle jobs.
static factory( $command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Convenience class for generating iterators from iterators.
Helper class to handle automatically marking connectons as reusable (via RAII pattern)
luaEval( $script, array $params, $numKeys)
Helper class to manage Redis connections.
static singleton(array $options)
static newRawUUIDv4( $flags=0)
Return an RFC4122 compliant v4 UUID.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
this class mediates it Skin Encapsulates a look and feel for the wiki All of the functions that render HTML and make choices about how to render it are here and are called from various other places when needed(most notably, OutputPage::addWikiText()). The StandardSkin object is a complete implementation
This code would result in ircNotify being run twice when an article is and once for brion Hooks can return three possible true was required This is the default since MediaWiki *some string
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Allows to change the fields on the form that will be generated $name
processing should stop and the error should be shown to the user * false
returning false will NOT prevent logging $e
globals will be eliminated from MediaWiki replaced by an application object which would be passed to constructors Whether that would be an convenient solution remains to be but certainly PHP makes such object oriented programming models easier than they were in previous versions For the time being MediaWiki programmers will have to work in an environment with some global context At the time of globals were initialised on startup by MediaWiki of these were configuration which are documented in DefaultSettings php There is no comprehensive documentation for the remaining however some of the most important ones are listed below They are typically initialised either in index php or in Setup php For a description of the see design txt $wgTitle Title object created from the request URL $wgOut OutputPage object for HTTP response $wgUser User object for the user associated with the current request $wgLang Language object selected by user preferences $wgContLang Language object associated with the wiki being viewed $wgParser Parser object Parser extensions register their hooks here $wgRequest WebRequest object
Job queue task description interface.
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
if(count( $args)< 1) $job
MediaWiki s SiteStore can be cached and stored in a flat in a json format If the SiteStore is frequently the file cache may provide a performance benefit over a database even with memcached in front of it sites are listed in a key value with the key being the site s global id(e.g. "enwiki") and a key-value map as the value. The site list is wrapped with in a "sites" key. Example
skin txt MediaWiki includes four core it has been set as the default in MediaWiki since the replacing Monobook it had been the default skin since then
skin txt MediaWiki includes four core it has been set as the default in MediaWiki since the replacing Monobook it had been the default skin since before being replaced by Vector largely rewritten in while keeping its appearance Several legacy skins were removed in the release