1 <?php
23 use Psr\Log\LoggerInterface;
36  protected $redisPool;
38  protected $logger;
40  protected $servers;
51  public function __construct( array $params ) {
52  parent::__construct( $params );
53  $this->servers = isset( $params['redisServers'] )
54  ? $params['redisServers']
55  : [ $params['redisServer'] ]; // b/c
56  $params['redisConfig']['serializer'] = 'none';
57  $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
58  $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
59  }
61  protected function doNotifyQueueEmpty( $wiki, $type ) {
62  return true; // managed by the service
63  }
65  protected function doNotifyQueueNonEmpty( $wiki, $type ) {
66  return true; // managed by the service
67  }
69  protected function doGetAllReadyWikiQueues() {
70  $conn = $this->getConnection();
71  if ( !$conn ) {
72  return [];
73  }
74  try {
75  $map = $conn->hGetAll( $this->getReadyQueueKey() );
77  if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
78  unset( $map['_epoch'] ); // ignore
79  $pendingDBs = []; // (type => list of wikis)
80  foreach ( $map as $key => $time ) {
81  list( $type, $wiki ) = $this->decodeQueueName( $key );
82  $pendingDBs[$type][] = $wiki;
83  }
84  } else {
85  throw new UnexpectedValueException(
86  "No queue listing found; make sure redisJobChronService is running."
87  );
88  }
90  return $pendingDBs;
91  } catch ( RedisException $e ) {
92  $this->redisPool->handleError( $conn, $e );
94  return [];
95  }
96  }
98  protected function doPurge() {
99  return true; // fully and only refreshed by the service
100  }
108  protected function getConnection() {
109  $conn = false;
110  foreach ( $this->servers as $server ) {
111  $conn = $this->redisPool->getConnection( $server, $this->logger );
112  if ( $conn ) {
113  break;
114  }
115  }
117  return $conn;
118  }
123  private function getReadyQueueKey() {
124  return "jobqueue:aggregator:h-ready-queues:v2"; // global
125  }
131  private function decodeQueueName( $name ) {
132  list( $type, $wiki ) = explode( '/', $name, 2 );
134  return [ rawurldecode( $type ), rawurldecode( $wiki ) ];
135  }
136 }
static singleton(array $options)
Definition: RedisConnectionPool.php:148
doNotifyQueueEmpty( $wiki, $type)
Definition: JobQueueAggregatorRedis.php:61
Definition: JobQueueAggregatorRedis.php:69
static getInstance( $channel)
Get a named logger instance from the currently configured logger factory.
Definition: LoggerFactory.php:93
Definition: JobQueueAggregatorRedis.php:98
Class to handle tracking information about all queues.
Definition: JobQueueAggregator.php:30
array $servers
List of Redis server addresses.
Definition: JobQueueAggregatorRedis.php:40
decodeQueueName( $name)
Definition: JobQueueAggregatorRedis.php:131
LoggerInterface $logger
Definition: JobQueueAggregatorRedis.php:38
Class to handle tracking information about all queues using PhpRedis.
Definition: JobQueueAggregatorRedis.php:34
Helper class to manage Redis connections.
Definition: RedisConnectionPool.php:41
RedisConnectionPool $redisPool
Definition: JobQueueAggregatorRedis.php:36
__construct(array $params)
Definition: JobQueueAggregatorRedis.php:51
Get a connection to the server that handles all sub-queues for this queue.
Definition: JobQueueAggregatorRedis.php:108
doNotifyQueueNonEmpty( $wiki, $type)
Definition: JobQueueAggregatorRedis.php:65
Definition: JobQueueAggregatorRedis.php:123