23use Kafka\MetaDataFromKafka;
25use Kafka\Protocol\Decoder;
27use Monolog\Handler\AbstractProcessingHandler;
29use Psr\Log\LoggerInterface;
68 private const DEFAULT_OPTIONS = [
70 'swallowExceptions' =>
false,
71 'logExceptions' =>
null,
84 parent::__construct( $level, $bubble );
86 $this->options = array_merge( self::DEFAULT_OPTIONS,
$options );
100 $kafkaServers, array
$options = [], $level = Logger::DEBUG, $bubble =
true
102 $metadata =
new MetaDataFromKafka( $kafkaServers );
103 $produce =
new Produce( $metadata );
105 if ( isset(
$options[
'sendTimeout'] ) ) {
107 $produce->getClient()->setStreamOption(
'SendTimeoutSec', 0 );
108 $produce->getClient()->setStreamOption(
'SendTimeoutUSec',
109 intval( $timeOut * 1000000 )
112 if ( isset(
$options[
'recvTimeout'] ) ) {
114 $produce->getClient()->setStreamOption(
'RecvTimeoutSec', 0 );
115 $produce->getClient()->setStreamOption(
'RecvTimeoutUSec',
116 intval( $timeOut * 1000000 )
119 if ( isset(
$options[
'logExceptions'] ) && is_string(
$options[
'logExceptions'] ) ) {
123 if ( isset(
$options[
'requireAck'] ) ) {
133 protected function write( array $record ): void {
134 if ( $record[
'formatted'] !== null ) {
135 $this->
addMessages( $record[
'channel'], [ $record[
'formatted'] ] );
146 foreach ( $batch as $record ) {
147 if ( $record[
'level'] < $this->level ) {
150 $channels[$record[
'channel']][] = $this->processRecord( $record );
153 $formatter = $this->getFormatter();
154 foreach ( $channels as $channel => $records ) {
156 foreach ( $records as $idx => $record ) {
157 $message = $formatter->format( $record );
158 if ( $message !==
null ) {
159 $messages[] = $message;
163 $this->addMessages( $channel, $messages );
175 $response = $this->produce->send();
176 }
catch ( \Kafka\Exception $e ) {
177 $ignore = $this->warning(
178 'Error sending records to kafka: {exception}',
179 [
'exception' => $e ] );
187 if ( is_bool( $response ) ) {
192 foreach ( $response as $topicName => $partitionResponse ) {
193 foreach ( $partitionResponse as $partition => $info ) {
194 if ( $info[
'errCode'] === 0 ) {
199 'Error producing to %s (errno %d): %s',
202 Decoder::getError( $info[
'errCode'] )
208 $error = implode(
"\n", $errors );
209 if ( !$this->warning( $error ) ) {
210 throw new \RuntimeException( $error );
221 if ( !array_key_exists( $topic, $this->partitions ) ) {
223 $partitions = $this->produce->getAvailablePartitions( $topic );
224 }
catch ( \Kafka\Exception $e ) {
225 $ignore = $this->warning(
226 'Error getting metadata for kafka topic {topic}: {exception}',
227 [
'topic' => $topic,
'exception' => $e ] );
234 $key = array_rand( $partitions );
235 $this->partitions[$topic] = $partitions[$key];
237 $details = $this->produce->getClient()->getTopicDetail( $topic );
238 $ignore = $this->warning(
239 'No partitions available for kafka topic {topic}',
240 [
'topic' => $topic,
'kafka' => $details ]
243 throw new \RuntimeException(
"No partitions available for kafka topic $topic" );
245 $this->partitions[$topic] =
null;
248 return $this->partitions[$topic];
258 $topic = $this->options[
'alias'][$channel] ??
"monolog_$channel";
259 $partition = $this->getRandomPartition( $topic );
260 if ( $partition !==
null ) {
261 $this->produce->setMessages( $topic, $partition, $records );
270 protected function warning( $message, array $context = [] ) {
271 if ( $this->options[
'logExceptions'] instanceof LoggerInterface ) {
272 $this->options[
'logExceptions']->warning( $message, $context );
274 return $this->options[
'swallowExceptions'];
if(ini_get('mbstring.func_overload')) if(!defined('MW_ENTRY_POINT'))
Pre-config setup: Before loading LocalSettings.php.