MediaWiki  1.34.0
KafkaHandler.php
Go to the documentation of this file.
1 <?php
21 namespace MediaWiki\Logger\Monolog;
22 
23 use Kafka\MetaDataFromKafka;
24 use Kafka\Produce;
25 use Kafka\Protocol\Decoder;
27 use Monolog\Handler\AbstractProcessingHandler;
28 use Monolog\Logger;
29 use Psr\Log\LoggerInterface;
30 
/**
 * Log handler that sends log events to a kafka server.
 *
 * Each monolog channel is written to its own kafka topic. The topic is looked
 * up in the 'alias' option and otherwise defaults to "monolog_$channel". For
 * every topic one partition is picked at random the first time it is needed
 * and then reused for as long as this handler instance lives.
 *
 * Recognised options:
 *  - alias: map from monolog channel to kafka topic
 *  - swallowExceptions: when true, problems talking to kafka are not thrown
 *  - logExceptions: a PSR-3 logger (or, via factory(), a channel name) that is
 *    told about problems talking to kafka
 *  - requireAck: handed to Produce::setRequireAck() by factory()
 *  - sendTimeout / recvTimeout: only read by factory(), see there
 */
class KafkaHandler extends AbstractProcessingHandler {
	/**
	 * @var Produce Sends requests to kafka
	 */
	protected $produce;

	/**
	 * @var array Optional handler configuration
	 */
	protected $options;

	/**
	 * @var array Map from topic name to partition this request produces to
	 */
	protected $partitions = [];

	/**
	 * @var array Defaults for constructor options
	 */
	private static $defaultOptions = [
		'alias' => [], // map from monolog channel to kafka topic
		'swallowExceptions' => false, // swallow exceptions sending records
		'logExceptions' => null, // A PSR3 logger to inform about errors
		'requireAck' => 0,
	];

	/**
	 * @param Produce $produce Kafka instance to produce through
	 * @param array $options Optional handler configuration, merged over the
	 *  defaults (caller supplied keys win)
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Passed through to the parent handler
	 */
	public function __construct(
		Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
	) {
		parent::__construct( $level, $bubble );
		$this->produce = $produce;
		$this->options = array_merge( self::$defaultOptions, $options );
	}

	/**
	 * Constructs the necessary support objects and returns a KafkaHandler
	 * instance.
	 *
	 * 'sendTimeout' and 'recvTimeout' are multiplied by 1,000,000 and handed to
	 * the client as microseconds, so they are expressed in seconds and may be
	 * fractional. A string 'logExceptions' is treated as a logger channel name
	 * and replaced by the matching logger instance.
	 *
	 * @param string[] $kafkaServers Passed to MetaDataFromKafka
	 *  (presumably the list of brokers - confirm against the kafka client)
	 * @param array $options Optional handler configuration
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Passed through to the parent handler
	 * @return KafkaHandler
	 */
	public static function factory(
		$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
	) {
		$metadata = new MetaDataFromKafka( $kafkaServers );
		$produce = new Produce( $metadata );

		if ( isset( $options['sendTimeout'] ) ) {
			$timeOut = $options['sendTimeout'];
			// The whole timeout is carried by the microsecond option
			$produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
			$produce->getClient()->setStreamOption( 'SendTimeoutUSec',
				intval( $timeOut * 1000000 )
			);
		}
		if ( isset( $options['recvTimeout'] ) ) {
			$timeOut = $options['recvTimeout'];
			$produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
			$produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
				intval( $timeOut * 1000000 )
			);
		}
		if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
			// Fully qualified on purpose: LoggerFactory lives in MediaWiki\Logger,
			// not in this file's MediaWiki\Logger\Monolog namespace, so the bare
			// name would only resolve if a matching import is present.
			$options['logExceptions'] =
				\MediaWiki\Logger\LoggerFactory::getInstance( $options['logExceptions'] );
		}

		if ( isset( $options['requireAck'] ) ) {
			$produce->setRequireAck( $options['requireAck'] );
		}

		return new self( $produce, $options, $level, $bubble );
	}

	/**
	 * Queues a single formatted record for its channel and sends it
	 * immediately. Records whose formatted value is null are dropped.
	 *
	 * @inheritDoc
	 */
	protected function write( array $record ) {
		if ( $record['formatted'] !== null ) {
			$this->addMessages( $record['channel'], [ $record['formatted'] ] );
			$this->send();
		}
	}

	/**
	 * Handles a set of records with a single send: records below the handler
	 * level are skipped, the rest are processed, grouped by channel, formatted
	 * and queued per channel.
	 *
	 * @inheritDoc
	 */
	public function handleBatch( array $batch ) {
		$channels = [];
		foreach ( $batch as $record ) {
			if ( $record['level'] < $this->level ) {
				continue;
			}
			$channels[$record['channel']][] = $this->processRecord( $record );
		}

		$formatter = $this->getFormatter();
		foreach ( $channels as $channel => $records ) {
			$messages = [];
			foreach ( $records as $record ) {
				$message = $formatter->format( $record );
				// A formatter may decline a record by returning null
				if ( $message !== null ) {
					$messages[] = $message;
				}
			}
			if ( $messages ) {
				$this->addMessages( $channel, $messages );
			}
		}

		$this->send();
	}

	/**
	 * Send any records in the kafka client internal queue.
	 *
	 * Failures are reported through warning(). Unless that asks for them to be
	 * ignored, a \Kafka\Exception from the client is rethrown and error codes
	 * in the response are turned into a \RuntimeException.
	 *
	 * @throws \Kafka\Exception
	 * @throws \RuntimeException
	 */
	protected function send() {
		try {
			$response = $this->produce->send();
		} catch ( \Kafka\Exception $e ) {
			$ignore = $this->warning(
				'Error sending records to kafka: {exception}',
				[ 'exception' => $e ] );
			if ( !$ignore ) {
				throw $e;
			} else {
				return;
			}
		}

		// A boolean response carries no per-partition results to inspect
		if ( is_bool( $response ) ) {
			return;
		}

		$errors = [];
		foreach ( $response as $topicName => $partitionResponse ) {
			foreach ( $partitionResponse as $info ) {
				if ( $info['errCode'] === 0 ) {
					// no error
					continue;
				}
				$errors[] = sprintf(
					'Error producing to %s (errno %d): %s',
					$topicName,
					$info['errCode'],
					Decoder::getError( $info['errCode'] )
				);
			}
		}

		if ( $errors ) {
			$error = implode( "\n", $errors );
			if ( !$this->warning( $error ) ) {
				throw new \RuntimeException( $error );
			}
		}
	}

	/**
	 * Returns the partition used for a topic, choosing one at random from the
	 * available partitions on first use and remembering the choice.
	 *
	 * A topic without available partitions is remembered as null. A failed
	 * metadata lookup is not remembered, so it is retried on the next call.
	 *
	 * @param string $topic Name of topic to get partition for
	 * @return int|null The chosen partition (presumably an integer id, as
	 *  returned by Produce::getAvailablePartitions()), or null when none
	 *  could be determined and the problem was ignored
	 * @throws \Kafka\Exception
	 * @throws \RuntimeException
	 */
	protected function getRandomPartition( $topic ) {
		if ( !array_key_exists( $topic, $this->partitions ) ) {
			try {
				$partitions = $this->produce->getAvailablePartitions( $topic );
			} catch ( \Kafka\Exception $e ) {
				$ignore = $this->warning(
					'Error getting metadata for kafka topic {topic}: {exception}',
					[ 'topic' => $topic, 'exception' => $e ] );
				if ( $ignore ) {
					return null;
				}
				throw $e;
			}
			if ( $partitions ) {
				$key = array_rand( $partitions );
				$this->partitions[$topic] = $partitions[$key];
			} else {
				$details = $this->produce->getClient()->getTopicDetail( $topic );
				$ignore = $this->warning(
					'No partitions available for kafka topic {topic}',
					[ 'topic' => $topic, 'kafka' => $details ]
				);
				if ( !$ignore ) {
					throw new \RuntimeException( "No partitions available for kafka topic $topic" );
				}
				$this->partitions[$topic] = null;
			}
		}
		return $this->partitions[$topic];
	}

	/**
	 * Adds records for a channel to the Kafka client internal queue.
	 *
	 * Nothing is queued when no partition could be determined for the topic.
	 *
	 * @param string $channel Name of Monolog channel records belong to
	 * @param array $records List of already formatted records
	 */
	protected function addMessages( $channel, array $records ) {
		$topic = $this->options['alias'][$channel] ?? "monolog_$channel";
		$partition = $this->getRandomPartition( $topic );
		if ( $partition !== null ) {
			$this->produce->setMessages( $topic, $partition, $records );
		}
	}

	/**
	 * Reports a problem to the 'logExceptions' logger, if one is configured.
	 *
	 * @param string $message PSR-3 message, may contain {placeholders}
	 * @param array $context PSR-3 context
	 * @return bool The 'swallowExceptions' option: true when the caller should
	 *  ignore the problem rather than throw
	 */
	protected function warning( $message, array $context = [] ) {
		if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
			$this->options['logExceptions']->warning( $message, $context );
		}
		return $this->options['swallowExceptions'];
	}
}
MediaWiki\Logger\Monolog\KafkaHandler\warning
warning( $message, array $context=[])
Definition: KafkaHandler.php:269
$response
$response
Definition: opensearch_desc.php:38
MediaWiki\Logger\Monolog\KafkaHandler\addMessages
addMessages( $channel, array $records)
Adds records for a channel to the Kafka client internal queue.
Definition: KafkaHandler.php:256
MediaWiki\Logger\LoggerFactory\getInstance
static getInstance( $channel)
Get a named logger instance from the currently configured logger factory.
Definition: LoggerFactory.php:92
MediaWiki\Logger\Monolog\KafkaHandler\write
write(array $record)
Definition: KafkaHandler.php:133
MediaWiki\Logger\Monolog
Definition: AvroFormatter.php:21
MediaWiki\Logger\Monolog\KafkaHandler
Log handler that sends log events to a Kafka server.
Definition: KafkaHandler.php:49
MediaWiki\Logger\LoggerFactory
PSR-3 logger instance factory.
Definition: LoggerFactory.php:45
MediaWiki\Logger\Monolog\KafkaHandler\__construct
__construct(Produce $produce, array $options, $level=Logger::DEBUG, $bubble=true)
Definition: KafkaHandler.php:81
MediaWiki\Logger\Monolog\KafkaHandler\$produce
Produce $produce
Sends requests to kafka.
Definition: KafkaHandler.php:53
MediaWiki\Logger\Monolog\KafkaHandler\$options
array $options
Optional handler configuration.
Definition: KafkaHandler.php:58
MediaWiki\Logger\Monolog\KafkaHandler\handleBatch
handleBatch(array $batch)
Definition: KafkaHandler.php:143
MediaWiki\Logger\Monolog\KafkaHandler\send
send()
Send any records in the kafka client internal queue.
Definition: KafkaHandler.php:172
MediaWiki\Logger\Monolog\KafkaHandler\$defaultOptions
static array $defaultOptions
Defaults for constructor options.
Definition: KafkaHandler.php:68
MediaWiki\Logger\Monolog\KafkaHandler\factory
static factory( $kafkaServers, array $options=[], $level=Logger::DEBUG, $bubble=true)
Constructs the necessary support objects and returns a KafkaHandler instance.
Definition: KafkaHandler.php:99
MediaWiki\Logger\Monolog\KafkaHandler\getRandomPartition
getRandomPartition( $topic)
Definition: KafkaHandler.php:219
MediaWiki\$context
IContextSource $context
Definition: MediaWiki.php:38
MediaWiki\Logger\Monolog\KafkaHandler\$partitions
array $partitions
Map from topic name to partition this request produces to.
Definition: KafkaHandler.php:63