MediaWiki REL1_35
KafkaHandler.php
Go to the documentation of this file.
1<?php
22
23use Kafka\MetaDataFromKafka;
24use Kafka\Produce;
25use Kafka\Protocol\Decoder;
27use Monolog\Handler\AbstractProcessingHandler;
28use Monolog\Logger;
29use Psr\Log\LoggerInterface;
30
49class KafkaHandler extends AbstractProcessingHandler {
53 protected $produce;
54
58 protected $options;
59
63 protected $partitions = [];
64
68 private const DEFAULT_OPTIONS = [
69 'alias' => [], // map from monolog channel to kafka topic
70 'swallowExceptions' => false, // swallow exceptions sending records
71 'logExceptions' => null, // A PSR3 logger to inform about errors
72 'requireAck' => 0,
73 ];
74
81 public function __construct(
82 Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
83 ) {
84 parent::__construct( $level, $bubble );
85 $this->produce = $produce;
86 $this->options = array_merge( self::DEFAULT_OPTIONS, $options );
87 }
88
99 public static function factory(
100 $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
101 ) {
102 $metadata = new MetaDataFromKafka( $kafkaServers );
103 $produce = new Produce( $metadata );
104
105 if ( isset( $options['sendTimeout'] ) ) {
106 $timeOut = $options['sendTimeout'];
107 $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
108 $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
109 intval( $timeOut * 1000000 )
110 );
111 }
112 if ( isset( $options['recvTimeout'] ) ) {
113 $timeOut = $options['recvTimeout'];
114 $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
115 $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
116 intval( $timeOut * 1000000 )
117 );
118 }
119 if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
120 $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
121 }
122
123 if ( isset( $options['requireAck'] ) ) {
124 $produce->setRequireAck( $options['requireAck'] );
125 }
126
127 return new self( $produce, $options, $level, $bubble );
128 }
129
133 protected function write( array $record ): void {
134 if ( $record['formatted'] !== null ) {
135 $this->addMessages( $record['channel'], [ $record['formatted'] ] );
136 $this->send();
137 }
138 }
139
144 public function handleBatch( array $batch ): void {
145 $channels = [];
146 foreach ( $batch as $record ) {
147 if ( $record['level'] < $this->level ) {
148 continue;
149 }
150 $channels[$record['channel']][] = $this->processRecord( $record );
151 }
152
153 $formatter = $this->getFormatter();
154 foreach ( $channels as $channel => $records ) {
155 $messages = [];
156 foreach ( $records as $idx => $record ) {
157 $message = $formatter->format( $record );
158 if ( $message !== null ) {
159 $messages[] = $message;
160 }
161 }
162 if ( $messages ) {
163 $this->addMessages( $channel, $messages );
164 }
165 }
166
167 $this->send();
168 }
169
173 protected function send() {
174 try {
175 $response = $this->produce->send();
176 } catch ( \Kafka\Exception $e ) {
177 $ignore = $this->warning(
178 'Error sending records to kafka: {exception}',
179 [ 'exception' => $e ] );
180 if ( !$ignore ) {
181 throw $e;
182 } else {
183 return;
184 }
185 }
186
187 if ( is_bool( $response ) ) {
188 return;
189 }
190
191 $errors = [];
192 foreach ( $response as $topicName => $partitionResponse ) {
193 foreach ( $partitionResponse as $partition => $info ) {
194 if ( $info['errCode'] === 0 ) {
195 // no error
196 continue;
197 }
198 $errors[] = sprintf(
199 'Error producing to %s (errno %d): %s',
200 $topicName,
201 $info['errCode'],
202 Decoder::getError( $info['errCode'] )
203 );
204 }
205 }
206
207 if ( $errors ) {
208 $error = implode( "\n", $errors );
209 if ( !$this->warning( $error ) ) {
210 throw new \RuntimeException( $error );
211 }
212 }
213 }
214
220 protected function getRandomPartition( $topic ) {
221 if ( !array_key_exists( $topic, $this->partitions ) ) {
222 try {
223 $partitions = $this->produce->getAvailablePartitions( $topic );
224 } catch ( \Kafka\Exception $e ) {
225 $ignore = $this->warning(
226 'Error getting metadata for kafka topic {topic}: {exception}',
227 [ 'topic' => $topic, 'exception' => $e ] );
228 if ( $ignore ) {
229 return null;
230 }
231 throw $e;
232 }
233 if ( $partitions ) {
234 $key = array_rand( $partitions );
235 $this->partitions[$topic] = $partitions[$key];
236 } else {
237 $details = $this->produce->getClient()->getTopicDetail( $topic );
238 $ignore = $this->warning(
239 'No partitions available for kafka topic {topic}',
240 [ 'topic' => $topic, 'kafka' => $details ]
241 );
242 if ( !$ignore ) {
243 throw new \RuntimeException( "No partitions available for kafka topic $topic" );
244 }
245 $this->partitions[$topic] = null;
246 }
247 }
248 return $this->partitions[$topic];
249 }
250
257 protected function addMessages( $channel, array $records ) {
258 $topic = $this->options['alias'][$channel] ?? "monolog_$channel";
259 $partition = $this->getRandomPartition( $topic );
260 if ( $partition !== null ) {
261 $this->produce->setMessages( $topic, $partition, $records );
262 }
263 }
264
270 protected function warning( $message, array $context = [] ) {
271 if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
272 $this->options['logExceptions']->warning( $message, $context );
273 }
274 return $this->options['swallowExceptions'];
275 }
276}
if(ini_get('mbstring.func_overload')) if(!defined('MW_ENTRY_POINT'))
Pre-config setup: Before loading LocalSettings.php.
Definition Setup.php:85
PSR-3 logger instance factory.
static getInstance( $channel)
Get a named logger instance from the currently configured logger factory.
Log handler sends log events to a kafka server.
addMessages( $channel, array $records)
Adds records for a channel to the Kafka client internal queue.
array $partitions
Map from topic name to partition this request produces to.
send()
Send any records in the kafka client internal queue.
warning( $message, array $context=[])
Produce $produce
Sends requests to kafka.
handleBatch(array $batch)
-param array[] $batch
array $options
Optional handler configuration.
__construct(Produce $produce, array $options, $level=Logger::DEBUG, $bubble=true)
static factory( $kafkaServers, array $options=[], $level=Logger::DEBUG, $bubble=true)
Constructs the necessary support objects and returns a KafkaHandler instance.