/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.wikimedia.gobblin.copy;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
import org.apache.gobblin.kafka.client.BaseKafkaConsumerRecord;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.record.TimestampType;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * A {@link GobblinKafkaConsumerClient} backed by the Kafka 1.x {@link KafkaConsumer}. All
 * operations are blocking. Copied from upstream Gobblin, as noted in the
 * {@code @SuppressFBWarnings} justification.
 */
@Slf4j
@SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
public class Kafka1ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient {

    private static final String CLIENT_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    private static final String CLIENT_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
    private static final String CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
    private static final String CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
    private static final String CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
    private static final String CLIENT_GROUP_ID = "group.id";

    private static final String DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
    public static final String DEFAULT_KEY_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String DEFAULT_GROUP_ID = "kafka1";

    public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
            + CLIENT_KEY_DESERIALIZER_CLASS_KEY;
    public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
            + CLIENT_VALUE_DESERIALIZER_CLASS_KEY;

    private static final Config FALLBACK =
            ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
                    .put(CLIENT_ENABLE_AUTO_COMMIT_KEY, DEFAULT_ENABLE_AUTO_COMMIT)
                    .put(CLIENT_KEY_DESERIALIZER_CLASS_KEY, DEFAULT_KEY_DESERIALIZER)
                    .put(CLIENT_GROUP_ID, DEFAULT_GROUP_ID)
                    .build());

    private final Consumer<K, V> consumer;

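    /**
     * Builds a consumer from job config: brokers and timeouts come from the base class, the
     * required value deserializer is validated, and config under {@code CONFIG_NAMESPACE}
     * (plus the {@code CONSUMER_CONFIG} scope and shared Kafka config) is passed through to
     * the underlying {@link KafkaConsumer}.
     */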
    private Kafka1ConsumerClient(Config config) {
        super(config);
        Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
                "Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);

        Properties props = new Properties();
        props.put(CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
        props.put(CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);

        // Grab all the config under the Kafka namespace and add the defaults as a fallback.
        Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);

        // Consumer-specific config to pass along to Kafka, falling back to the shared Kafka config.
        Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
                ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));

        // The consumer-specific config overrides settings in the base config.
        Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
        props.putAll(ConfigUtils.configToProperties(scopedConfig));

        this.consumer = new KafkaConsumer<>(props);
    }
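
    // A hypothetical config sketch illustrating the layering above; the "source.kafka" and
    // "consumerConfig" scopes and the "gobblin.kafka.sharedConfig" prefix follow upstream
    // Gobblin's constants, and the concrete values are illustrative only:
    //
    //   source.kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    //   source.kafka.consumerConfig.max.poll.records=500      (passed through to the KafkaConsumer)
    //   gobblin.kafka.sharedConfig.security.protocol=SSL      (shared fallback for all Kafka clients)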
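    /**
     * Constructor that accepts a pre-built {@link Consumer}, mainly useful for injecting a
     * mock consumer in tests.
     */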
    public Kafka1ConsumerClient(Config config, Consumer<K, V> consumer) {
        super(config);
        this.consumer = consumer;
    }

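    /** Lists all topics visible to this consumer as Gobblin {@link KafkaTopic}s. */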
    @Override
    public List<KafkaTopic> getTopics() {
        return FluentIterable.from(this.consumer.listTopics().entrySet())
                .transform(topicEntry -> new KafkaTopic(topicEntry.getKey(),
                        Lists.transform(topicEntry.getValue(), PARTITION_INFO_TO_KAFKA_PARTITION)))
                .toList();
    }

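    /**
     * Determines the earliest available offset for a partition by assigning the partition,
     * seeking to the beginning, and reading the resulting position.
     */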
    @Override
    public long getEarliestOffset(KafkaPartition partition) {
        TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
        List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
        this.consumer.assign(topicPartitionList);
        this.consumer.seekToBeginning(topicPartitionList);

        return this.consumer.position(topicPartition);
    }

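    /**
     * Determines the latest offset for a partition by assigning the partition, seeking to the
     * end, and reading the resulting position.
     */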
    @Override
    public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
        TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
        List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
        this.consumer.assign(topicPartitionList);
        this.consumer.seekToEnd(topicPartitionList);

        return this.consumer.position(topicPartition);
    }

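    /**
     * Seeks to {@code nextOffset} on the given partition and returns an iterator over one poll
     * of records, or {@code null} when the requested range is empty
     * ({@code nextOffset > maxOffset}).
     */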
    @Override
    public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
        if (nextOffset > maxOffset) {
            return null;
        }

        TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
        this.consumer.assign(Collections.singletonList(topicPartition));
        this.consumer.seek(topicPartition, nextOffset);
        return consume();
    }

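    /**
     * Polls once, waiting at most {@code fetchTimeoutMillis}, wrapping each raw
     * {@link ConsumerRecord} in a {@link Kafka1ConsumerRecord}.
     */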
    @Override
    public Iterator<KafkaConsumerRecord> consume() {
        try {
            ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);

            return Iterators.transform(consumerRecords.iterator(), input -> {
                try {
                    return new Kafka1ConsumerRecord<>(input);
                } catch (Throwable t) {
                    throw Throwables.propagate(t);
                }
            });
        } catch (Exception e) {
            log.error("Exception on polling records", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Subscribes to a single topic with a no-op rebalance listener.
     */
    @Override
    public void subscribe(String topic) {
        this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
    }

    /**
     * Subscribes to a single topic, adapting partition-rebalance callbacks to the given
     * {@link GobblinConsumerRebalanceListener}.
     */
    @Override
    public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
        this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                listener.onPartitionsRevoked(partitions.stream()
                        .map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build())
                        .collect(Collectors.toList()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                listener.onPartitionsAssigned(partitions.stream()
                        .map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build())
                        .collect(Collectors.toList()));
            }
        });
    }

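    /** Exposes the Kafka consumer's internal metrics as Codahale {@link Metric}s keyed by canonical name. */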
    @Override
    public Map<String, Metric> getMetrics() {
        @SuppressWarnings("unchecked")
        Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
        Map<String, Metric> codaHaleMetricMap = new HashMap<>();

        kafkaMetrics.forEach(
                (key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
        return codaHaleMetricMap;
    }

    /**
     * Commits the given partition offsets asynchronously; commit failures are logged rather
     * than propagated.
     */
    @Override
    public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
                        e -> new OffsetAndMetadata(e.getValue())));
        consumer.commitAsync(offsets, (committedOffsets, exception) -> {
            if (exception != null) {
                log.error("Exception while committing offsets " + committedOffsets, exception);
            }
        });
    }

    /**
     * Commits the given partition offsets synchronously, blocking until the commit completes.
     */
    @Override
    public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
                        e -> new OffsetAndMetadata(e.getValue())));
        consumer.commitSync(offsets);
    }

    /**
     * Returns the last committed offset for the given partition, or -1 if nothing has been
     * committed.
     */
    @Override
    public long committed(KafkaPartition partition) {
        OffsetAndMetadata offsetAndMetadata =
                consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
        return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
    }

    /**
     * Converts a {@link KafkaMetric} to a Codahale {@link Metric} (a {@link Gauge} reading the
     * metric's current value on demand).
     */
    private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
        if (log.isDebugEnabled()) {
            log.debug("Processing a metric change for {}", kafkaMetric.metricName());
        }
        Gauge<Object> gauge = kafkaMetric::metricValue;
        return gauge;
    }

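    /**
     * Derives a flat metric name from the Kafka {@link MetricName}'s group, tag values, and
     * name via the base class helper.
     */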
    private String canonicalMetricName(KafkaMetric kafkaMetric) {
        MetricName name = kafkaMetric.metricName();
        return canonicalMetricName(name.group(), name.tags().values(), name.name());
    }

    @Override
    public void close() throws IOException {
        this.consumer.close();
    }

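    /** Maps Kafka's {@link PartitionInfo} to Gobblin's {@link KafkaPartition}, including leader id, host, and port. */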
    private static final Function<PartitionInfo, KafkaPartition> PARTITION_INFO_TO_KAFKA_PARTITION =
            new Function<PartitionInfo, KafkaPartition>() {
                @Override
                public KafkaPartition apply(@Nonnull PartitionInfo partitionInfo) {
                    return new KafkaPartition.Builder()
                            .withId(partitionInfo.partition())
                            .withTopicName(partitionInfo.topic())
                            .withLeaderId(partitionInfo.leader().id())
                            .withLeaderHostAndPort(partitionInfo.leader().host(), partitionInfo.leader().port())
                            .build();
                }
            };

    /**
     * {@link GobblinKafkaConsumerClientFactory} that creates a {@link Kafka1ConsumerClient}
     * from job config.
     */
    public static class Factory implements GobblinKafkaConsumerClientFactory {
        @SuppressWarnings("rawtypes")
        @Override
        public GobblinKafkaConsumerClient create(Config config) {
            return new Kafka1ConsumerClient(config);
        }
    }
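
    // A minimal usage sketch, assuming ConfigurationKeys.KAFKA_BROKERS ("kafka.brokers") as in
    // upstream Gobblin; the deserializer entry satisfies the Preconditions check in the
    // constructor, and all values here are illustrative only:
    //
    //   Config config = ConfigFactory.parseMap(ImmutableMap.of(
    //       "kafka.brokers", "localhost:9092",
    //       GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
    //       "org.apache.kafka.common.serialization.ByteArrayDeserializer"));
    //   try (GobblinKafkaConsumerClient client = new Kafka1ConsumerClient.Factory().create(config)) {
    //       client.getTopics().forEach(topic -> log.info("Found topic {}", topic.getName()));
    //   }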

    /**
     * A {@link KafkaConsumerRecord} wrapping the Kafka 1.x {@link ConsumerRecord}, exposing
     * its key, value, and timestamp via {@link DecodeableKafkaRecord}.
     */
    @EqualsAndHashCode
    @ToString
    public static class Kafka1ConsumerRecord<K, V> extends BaseKafkaConsumerRecord implements
            DecodeableKafkaRecord<K, V> {
        private final ConsumerRecord<K, V> consumerRecord;

        public Kafka1ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
            // The record size reported to Gobblin is the serialized value size in bytes.
            super(consumerRecord.offset(), consumerRecord.serializedValueSize(), consumerRecord.topic(),
                    consumerRecord.partition());
            this.consumerRecord = consumerRecord;
        }

        /** Returns the Kafka {@link TimestampType} of the underlying record. */
        public TimestampType getTimestampType() {
            return this.consumerRecord.timestampType();
        }

        /** True when the broker assigned the timestamp at log-append time. */
        @Override
        public boolean isTimestampLogAppend() {
            return this.consumerRecord.timestampType() == TimestampType.LOG_APPEND_TIME;
        }

        /** Returns the record timestamp, in epoch milliseconds. */
        @Override
        public long getTimestamp() {
            return this.consumerRecord.timestamp();
        }

        @Override
        public K getKey() {
            return this.consumerRecord.key();
        }

        @Override
        public V getValue() {
            return this.consumerRecord.value();
        }
    }
}