MediaWiki REL1_31
KafkaHandler.php
Go to the documentation of this file.
<?php

use Kafka\MetaDataFromKafka;
use Kafka\Produce;
use Kafka\Protocol\Decoder;
use MediaWiki\Logger\LoggerFactory;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;

/**
 * Log handler that sends log events to a Kafka server.
 *
 * Constructor options (see self::$defaultOptions for defaults):
 *   - alias: map from monolog channel to kafka topic
 *   - swallowExceptions: whether to swallow exceptions while sending records
 *   - logExceptions: a PSR-3 logger (or, via factory(), a channel name) that
 *     is informed about errors
 *   - requireAck: passed through to Produce::setRequireAck()
 */
class KafkaHandler extends AbstractProcessingHandler {
	/**
	 * @var Produce Sends produce requests to kafka
	 */
	protected $produce;

	/**
	 * @var array Optional handler configuration, merged over self::$defaultOptions
	 */
	protected $options;

	/**
	 * @var array Map from topic name to the partition this request produces to.
	 *  A null value records that no partition is available for that topic, so
	 *  records for it are dropped rather than re-queried every time.
	 */
	protected $partitions = [];

	/**
	 * @var array Defaults for constructor options
	 */
	private static $defaultOptions = [
		'alias' => [], // map from monolog channel to kafka topic
		'swallowExceptions' => false, // swallow exceptions sending records
		'logExceptions' => null, // A PSR3 logger to inform about errors
		'requireAck' => 0,
	];

	/**
	 * @param Produce $produce Kafka instance to produce through
	 * @param array $options Handler options, merged over self::$defaultOptions
	 * @param int $level The minimum logging level at which this handler
	 *  will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up
	 *  the stack or not
	 */
	public function __construct(
		Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
	) {
		parent::__construct( $level, $bubble );
		$this->produce = $produce;
		$this->options = array_merge( self::$defaultOptions, $options );
	}

	/**
	 * Constructs the necessary support objects and returns a KafkaHandler
	 * instance.
	 *
	 * @param string|string[] $kafkaServers Servers to fetch kafka metadata from
	 * @param array $options Handler options; additionally accepts
	 *  'sendTimeout' and 'recvTimeout' (fractional seconds) which configure
	 *  the kafka client stream, and allows 'logExceptions' to be a logger
	 *  channel name which is resolved through LoggerFactory.
	 * @param int $level The minimum logging level at which this handler
	 *  will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up
	 *  the stack or not
	 * @return KafkaHandler
	 */
	public static function factory(
		$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
	) {
		$metadata = new MetaDataFromKafka( $kafkaServers );
		$produce = new Produce( $metadata );

		if ( isset( $options['sendTimeout'] ) ) {
			$timeOut = $options['sendTimeout'];
			// The whole (possibly fractional) timeout is expressed through the
			// USec option; the Sec option is explicitly zeroed.
			$produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
			$produce->getClient()->setStreamOption( 'SendTimeoutUSec',
				intval( $timeOut * 1000000 )
			);
		}
		if ( isset( $options['recvTimeout'] ) ) {
			$timeOut = $options['recvTimeout'];
			$produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
			$produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
				intval( $timeOut * 1000000 )
			);
		}
		// Accept a logger channel name and resolve it to a PSR-3 logger.
		if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
			$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
		}

		if ( isset( $options['requireAck'] ) ) {
			$produce->setRequireAck( $options['requireAck'] );
		}

		return new self( $produce, $options, $level, $bubble );
	}

	/**
	 * @inheritDoc
	 */
	protected function write( array $record ) {
		// A null formatted value means the formatter suppressed this record.
		if ( $record['formatted'] !== null ) {
			$this->addMessages( $record['channel'], [ $record['formatted'] ] );
			$this->send();
		}
	}

	/**
	 * @inheritDoc
	 */
	public function handleBatch( array $batch ) {
		// Group records by channel so each channel's messages are queued for
		// its (possibly aliased) kafka topic, then sent in one produce request.
		$channels = [];
		foreach ( $batch as $record ) {
			if ( $record['level'] < $this->level ) {
				continue;
			}
			$channels[$record['channel']][] = $this->processRecord( $record );
		}

		$formatter = $this->getFormatter();
		foreach ( $channels as $channel => $records ) {
			$messages = [];
			foreach ( $records as $record ) {
				$message = $formatter->format( $record );
				if ( $message !== null ) {
					$messages[] = $message;
				}
			}
			if ( $messages ) {
				$this->addMessages( $channel, $messages );
			}
		}

		$this->send();
	}

	/**
	 * Send any records in the kafka client internal queue.
	 *
	 * Errors are reported through warning(); depending on the
	 * 'swallowExceptions' option they are either swallowed or rethrown.
	 */
	protected function send() {
		try {
			$response = $this->produce->send();
		} catch ( \Kafka\Exception $e ) {
			$ignore = $this->warning(
				'Error sending records to kafka: {exception}',
				[ 'exception' => $e ] );
			if ( !$ignore ) {
				throw $e;
			} else {
				return;
			}
		}

		// A boolean response carries no per-partition error information.
		if ( is_bool( $response ) ) {
			return;
		}

		// Collect per-topic/partition failures reported by the brokers.
		$errors = [];
		foreach ( $response as $topicName => $partitionResponse ) {
			foreach ( $partitionResponse as $partition => $info ) {
				if ( $info['errCode'] === 0 ) {
					// no error
					continue;
				}
				$errors[] = sprintf(
					'Error producing to %s (errno %d): %s',
					$topicName,
					$info['errCode'],
					Decoder::getError( $info['errCode'] )
				);
			}
		}

		if ( $errors ) {
			$error = implode( "\n", $errors );
			if ( !$this->warning( $error ) ) {
				throw new \RuntimeException( $error );
			}
		}
	}

	/**
	 * Pick a random partition of the topic and cache the choice, so a single
	 * handler keeps producing to the same partition.
	 *
	 * @param string $topic Kafka topic to produce to
	 * @return int|null The partition to produce to, or null (also cached) if
	 *  metadata could not be fetched / no partitions are available and the
	 *  error was swallowed
	 */
	protected function getRandomPartition( $topic ) {
		if ( !array_key_exists( $topic, $this->partitions ) ) {
			try {
				$partitions = $this->produce->getAvailablePartitions( $topic );
			} catch ( \Kafka\Exception $e ) {
				$ignore = $this->warning(
					'Error getting metadata for kafka topic {topic}: {exception}',
					[ 'topic' => $topic, 'exception' => $e ] );
				if ( $ignore ) {
					return null;
				}
				throw $e;
			}
			if ( $partitions ) {
				$key = array_rand( $partitions );
				$this->partitions[$topic] = $partitions[$key];
			} else {
				$details = $this->produce->getClient()->getTopicDetail( $topic );
				$ignore = $this->warning(
					'No partitions available for kafka topic {topic}',
					[ 'topic' => $topic, 'kafka' => $details ]
				);
				if ( !$ignore ) {
					throw new \RuntimeException( "No partitions available for kafka topic $topic" );
				}
				$this->partitions[$topic] = null;
			}
		}
		return $this->partitions[$topic];
	}

	/**
	 * Adds records for a channel to the Kafka client internal queue.
	 *
	 * @param string $channel Name of the monolog channel the records belong to
	 * @param array $records List of formatted records to send
	 */
	protected function addMessages( $channel, array $records ) {
		// The 'alias' option maps a monolog channel to a kafka topic;
		// unaliased channels go to "monolog_<channel>".
		if ( isset( $this->options['alias'][$channel] ) ) {
			$topic = $this->options['alias'][$channel];
		} else {
			$topic = "monolog_$channel";
		}
		$partition = $this->getRandomPartition( $topic );
		if ( $partition !== null ) {
			$this->produce->setMessages( $topic, $partition, $records );
		}
	}

	/**
	 * Report a problem to the configured 'logExceptions' PSR-3 logger, if any.
	 *
	 * @param string $message PSR-3 message to log
	 * @param array $context PSR-3 context for the message
	 * @return bool True if the caller should ignore (swallow) the failure,
	 *  per the 'swallowExceptions' option
	 */
	protected function warning( $message, array $context = [] ) {
		if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
			$this->options['logExceptions']->warning( $message, $context );
		}
		return $this->options['swallowExceptions'];
	}
}
$messages
PSR-3 logger instance factory.
static getInstance( $channel)
Get a named logger instance from the currently configured logger factory.
Log handler sends log events to a kafka server.
addMessages( $channel, array $records)
Adds records for a channel to the Kafka client internal queue.
static array $defaultOptions
defaults for constructor options
array $partitions
Map from topic name to partition this request produces to.
send()
Send any records in the kafka client internal queue.
warning( $message, array $context=[])
Produce $produce
Sends requests to kafka.
handleBatch(array $batch)
@inheritDoc
write(array $record)
@inheritDoc
array $options
Optional handler configuration.
__construct(Produce $produce, array $options, $level=Logger::DEBUG, $bubble=true)
static factory( $kafkaServers, array $options=[], $level=Logger::DEBUG, $bubble=true)
Constructs the necessary support objects and returns a KafkaHandler instance.
We've cleaned up the code here by removing clumps of infrequently used code and moving them off somewhere else. It's much easier for someone working with this code to see what's _really_ going on, and to make changes or fix bugs. In this way we can take all the code that deals with the little-used title-reversing options (say) and put it in one place, instead of having little title-reversing if-blocks spread all over the codebase in showAnArticle.
do that in ParserLimitReportFormat instead. If you use this to modify the parameters of the image, all existing parser cache entries will be invalid. To avoid this you'll need to handle that somehow (e.g. with the RejectParserCacheValue hook) because MediaWiki won't do it for you. &$defaults is also a ContextSource. After deleting those rows, but within the same transaction, you'll probably need to make sure the header is varied on; they can depend only on the ResourceLoaderContext $context.
Definition hooks.txt:2811
this hook is for auditing only $response
Definition hooks.txt:783
returning false will NOT prevent logging $e
Definition hooks.txt:2176
$batch
Definition linkcache.txt:23