MediaWiki REL1_33
KafkaHandler.php
Go to the documentation of this file.
1<?php
22
30
49class KafkaHandler extends AbstractProcessingHandler {
53 protected $produce;
54
58 protected $options;
59
63 protected $partitions = [];
64
68 private static $defaultOptions = [
69 'alias' => [], // map from monolog channel to kafka topic
70 'swallowExceptions' => false, // swallow exceptions sending records
71 'logExceptions' => null, // A PSR3 logger to inform about errors
72 'requireAck' => 0,
73 ];
74
81 public function __construct(
82 Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
83 ) {
84 parent::__construct( $level, $bubble );
85 $this->produce = $produce;
86 $this->options = array_merge( self::$defaultOptions, $options );
87 }
88
99 public static function factory(
100 $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
101 ) {
102 $metadata = new MetaDataFromKafka( $kafkaServers );
103 $produce = new Produce( $metadata );
104
105 if ( isset( $options['sendTimeout'] ) ) {
106 $timeOut = $options['sendTimeout'];
107 $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
108 $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
109 intval( $timeOut * 1000000 )
110 );
111 }
112 if ( isset( $options['recvTimeout'] ) ) {
113 $timeOut = $options['recvTimeout'];
114 $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
115 $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
116 intval( $timeOut * 1000000 )
117 );
118 }
119 if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
120 $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
121 }
122
123 if ( isset( $options['requireAck'] ) ) {
124 $produce->setRequireAck( $options['requireAck'] );
125 }
126
127 return new self( $produce, $options, $level, $bubble );
128 }
129
133 protected function write( array $record ) {
134 if ( $record['formatted'] !== null ) {
135 $this->addMessages( $record['channel'], [ $record['formatted'] ] );
136 $this->send();
137 }
138 }
139
143 public function handleBatch( array $batch ) {
144 $channels = [];
145 foreach ( $batch as $record ) {
146 if ( $record['level'] < $this->level ) {
147 continue;
148 }
149 $channels[$record['channel']][] = $this->processRecord( $record );
150 }
151
152 $formatter = $this->getFormatter();
153 foreach ( $channels as $channel => $records ) {
154 $messages = [];
155 foreach ( $records as $idx => $record ) {
156 $message = $formatter->format( $record );
157 if ( $message !== null ) {
158 $messages[] = $message;
159 }
160 }
161 if ( $messages ) {
162 $this->addMessages( $channel, $messages );
163 }
164 }
165
166 $this->send();
167 }
168
172 protected function send() {
173 try {
174 $response = $this->produce->send();
175 } catch ( \Kafka\Exception $e ) {
176 $ignore = $this->warning(
177 'Error sending records to kafka: {exception}',
178 [ 'exception' => $e ] );
179 if ( !$ignore ) {
180 throw $e;
181 } else {
182 return;
183 }
184 }
185
186 if ( is_bool( $response ) ) {
187 return;
188 }
189
190 $errors = [];
191 foreach ( $response as $topicName => $partitionResponse ) {
192 foreach ( $partitionResponse as $partition => $info ) {
193 if ( $info['errCode'] === 0 ) {
194 // no error
195 continue;
196 }
197 $errors[] = sprintf(
198 'Error producing to %s (errno %d): %s',
199 $topicName,
200 $info['errCode'],
201 Decoder::getError( $info['errCode'] )
202 );
203 }
204 }
205
206 if ( $errors ) {
207 $error = implode( "\n", $errors );
208 if ( !$this->warning( $error ) ) {
209 throw new \RuntimeException( $error );
210 }
211 }
212 }
213
219 protected function getRandomPartition( $topic ) {
220 if ( !array_key_exists( $topic, $this->partitions ) ) {
221 try {
222 $partitions = $this->produce->getAvailablePartitions( $topic );
223 } catch ( \Kafka\Exception $e ) {
224 $ignore = $this->warning(
225 'Error getting metadata for kafka topic {topic}: {exception}',
226 [ 'topic' => $topic, 'exception' => $e ] );
227 if ( $ignore ) {
228 return null;
229 }
230 throw $e;
231 }
232 if ( $partitions ) {
233 $key = array_rand( $partitions );
234 $this->partitions[$topic] = $partitions[$key];
235 } else {
236 $details = $this->produce->getClient()->getTopicDetail( $topic );
237 $ignore = $this->warning(
238 'No partitions available for kafka topic {topic}',
239 [ 'topic' => $topic, 'kafka' => $details ]
240 );
241 if ( !$ignore ) {
242 throw new \RuntimeException( "No partitions available for kafka topic $topic" );
243 }
244 $this->partitions[$topic] = null;
245 }
246 }
247 return $this->partitions[$topic];
248 }
249
256 protected function addMessages( $channel, array $records ) {
257 $topic = $this->options['alias'][$channel] ?? "monolog_$channel";
258 $partition = $this->getRandomPartition( $topic );
259 if ( $partition !== null ) {
260 $this->produce->setMessages( $topic, $partition, $records );
261 }
262 }
263
269 protected function warning( $message, array $context = [] ) {
270 if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
271 $this->options['logExceptions']->warning( $message, $context );
272 }
273 return $this->options['swallowExceptions'];
274 }
275}
and that you know you can do these things To protect your we need to make restrictions that forbid anyone to deny you these rights or to ask you to surrender the rights These restrictions translate to certain responsibilities for you if you distribute copies of the or if you modify it For if you distribute copies of such a whether gratis or for a you must give the recipients all the rights that you have You must make sure that receive or can get the source code And you must show them these terms so they know their rights We protect your rights with two and(2) offer you this license which gives you legal permission to copy
$messages
PSR-3 logger instance factory.
static getInstance( $channel)
Get a named logger instance from the currently configured logger factory.
Log handler sends log events to a kafka server.
addMessages( $channel, array $records)
Adds records for a channel to the Kafka client internal queue.
static array $defaultOptions
defaults for constructor options
array $partitions
Map from topic name to partition this request produces to.
send()
Send any records in the kafka client internal queue.
warning( $message, array $context=[])
Produce $produce
Sends requests to kafka.
handleBatch(array $batch)
@inheritDoc
write(array $record)
@inheritDoc
array $options
Optional handler configuration.
__construct(Produce $produce, array $options, $level=Logger::DEBUG, $bubble=true)
static factory( $kafkaServers, array $options=[], $level=Logger::DEBUG, $bubble=true)
Constructs the necessary support objects and returns a KafkaHandler instance.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction you ll probably need to make sure the header is varied on and they can depend only on the ResourceLoaderContext $context
Definition hooks.txt:2848
Using a hook running we can avoid having all this option specific stuff in our mainline code Using the function We ve cleaned up the code here by removing clumps of infrequently used code and moving them off somewhere else It s much easier for someone working with this code to see what s _really_ going and make changes or fix bugs In we can take all the code that deals with the little used title reversing options(say) and put it in one place. Instead of having little title-reversing if-blocks spread all over the codebase in showAnArticle
this hook is for auditing only $response
Definition hooks.txt:780
returning false will NOT prevent logging $e
Definition hooks.txt:2175
$batch
Definition linkcache.txt:23
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))