23use Kafka\MetaDataFromKafka;
25use Kafka\Protocol\Decoder;
27use Monolog\Handler\AbstractProcessingHandler;
29use Psr\Log\LoggerInterface;
70 'swallowExceptions' =>
false,
71 'logExceptions' =>
null,
84 parent::__construct( $level, $bubble );
86 $this->options = array_merge( self::$defaultOptions,
$options );
100 $kafkaServers, array
$options = [], $level = Logger::DEBUG, $bubble =
true
102 $metadata =
new MetaDataFromKafka( $kafkaServers );
103 $produce =
new Produce( $metadata );
105 if ( isset(
$options[
'sendTimeout'] ) ) {
107 $produce->getClient()->setStreamOption(
'SendTimeoutSec', 0 );
108 $produce->getClient()->setStreamOption(
'SendTimeoutUSec',
109 intval( $timeOut * 1000000 )
112 if ( isset(
$options[
'recvTimeout'] ) ) {
114 $produce->getClient()->setStreamOption(
'RecvTimeoutSec', 0 );
115 $produce->getClient()->setStreamOption(
'RecvTimeoutUSec',
116 intval( $timeOut * 1000000 )
119 if ( isset(
$options[
'logExceptions'] ) && is_string(
$options[
'logExceptions'] ) ) {
123 if ( isset(
$options[
'requireAck'] ) ) {
133 protected function write( array $record ) {
134 if ( $record[
'formatted'] !==
null ) {
135 $this->
addMessages( $record[
'channel'], [ $record[
'formatted'] ] );
145 foreach ( $batch as $record ) {
146 if ( $record[
'level'] < $this->level ) {
149 $channels[$record[
'channel']][] = $this->processRecord( $record );
152 $formatter = $this->getFormatter();
153 foreach ( $channels as $channel => $records ) {
155 foreach ( $records as $idx => $record ) {
156 $message = $formatter->format( $record );
157 if ( $message !==
null ) {
158 $messages[] = $message;
175 }
catch ( \Kafka\Exception $e ) {
177 'Error sending records to kafka: {exception}',
178 [
'exception' => $e ] );
191 foreach (
$response as $topicName => $partitionResponse ) {
192 foreach ( $partitionResponse as $partition => $info ) {
193 if ( $info[
'errCode'] === 0 ) {
198 'Error producing to %s (errno %d): %s',
201 Decoder::getError( $info[
'errCode'] )
207 $error = implode(
"\n", $errors );
208 if ( !$this->
warning( $error ) ) {
209 throw new \RuntimeException( $error );
220 if ( !array_key_exists( $topic, $this->partitions ) ) {
222 $partitions = $this->produce->getAvailablePartitions( $topic );
223 }
catch ( \Kafka\Exception $e ) {
225 'Error getting metadata for kafka topic {topic}: {exception}',
226 [
'topic' => $topic,
'exception' => $e ] );
236 $details = $this->produce->getClient()->getTopicDetail( $topic );
238 'No partitions available for kafka topic {topic}',
239 [
'topic' => $topic,
'kafka' => $details ]
242 throw new \RuntimeException(
"No partitions available for kafka topic $topic" );
244 $this->partitions[$topic] =
null;
247 return $this->partitions[$topic];
257 $topic = $this->options[
'alias'][$channel] ??
"monolog_$channel";
259 if ( $partition !==
null ) {
260 $this->produce->setMessages( $topic, $partition, $records );
270 if ( $this->options[
'logExceptions'] instanceof LoggerInterface ) {
271 $this->options[
'logExceptions']->warning( $message,
$context );
273 return $this->options[
'swallowExceptions'];