/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wikimedia.gobblin.copy;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
import org.apache.gobblin.kafka.client.BaseKafkaConsumerRecord;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.record.TimestampType;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
* A {@link GobblinKafkaConsumerClient} that uses the Kafka 1.1 consumer client. Use {@link Factory#create(Config)} to create
* new {@code Kafka1ConsumerClient}s. The {@link Config} used to create clients must contain the required key
* {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY} (see the example below).
*
* <p>
* This is an updated copy of {@link org.apache.gobblin.kafka.client.Kafka1ConsumerClient}
* in the gobblin-kafka-1 module. This file should be deleted in favor of the upstream version when possible.
* Updates are:
* <ul>
* <li>Expose kafka-record timestamp in {@link Kafka1ConsumerRecord} -- https://github.com/apache/gobblin/pull/3244</li>
* </ul>
* </p>
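*
* <p>
* A minimal creation sketch. The broker-list key is assumed here to be {@code kafka.brokers} (read by the
* {@link AbstractBaseKafkaConsumerClient} base class); the broker address and deserializer are placeholder values:
* <pre>{@code
* Config config = ConfigFactory.parseMap(ImmutableMap.of(
*     "kafka.brokers", "localhost:9092",
*     Kafka1ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
*     "org.apache.kafka.common.serialization.ByteArrayDeserializer"));
* GobblinKafkaConsumerClient client = new Kafka1ConsumerClient.Factory().create(config);
* }</pre>
* </p>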
*
* @param <K> Message key type
* @param <V> Message value type
*
*/
@Slf4j
@SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
public class Kafka1ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient {
private static final String CLIENT_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
private static final String CLIENT_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
private static final String CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
private static final String CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
private static final String CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
private static final String CLIENT_GROUP_ID = "group.id";
private static final String DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
public static final String DEFAULT_KEY_DESERIALIZER =
"org.apache.kafka.common.serialization.StringDeserializer";
private static final String DEFAULT_GROUP_ID = "kafka1";
public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
+ CLIENT_KEY_DESERIALIZER_CLASS_KEY;
public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
+ CLIENT_VALUE_DESERIALIZER_CLASS_KEY;
private static final Config FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
.put(CLIENT_ENABLE_AUTO_COMMIT_KEY, DEFAULT_ENABLE_AUTO_COMMIT)
.put(CLIENT_KEY_DESERIALIZER_CLASS_KEY, DEFAULT_KEY_DESERIALIZER)
.put(CLIENT_GROUP_ID, DEFAULT_GROUP_ID)
.build());
private final Consumer<K, V> consumer;
private Kafka1ConsumerClient(Config config) {
super(config);
Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
"Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);
Properties props = new Properties();
props.put(CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
props.put(CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
// grab all the config under "source.kafka" and add the defaults as fallback.
Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
// get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka with a fallback to the
// shared config that start with "gobblin.kafka.sharedConfig"
Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
// The specific config overrides settings in the base config
Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
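// For illustration (the property name is arbitrary): a value set at "source.kafka.consumerConfig.max.poll.records"
// wins over "gobblin.kafka.sharedConfig.max.poll.records", which wins over "source.kafka.max.poll.records"
// and the FALLBACK defaults; all of them reach the Kafka consumer as the plain property "max.poll.records".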
props.putAll(ConfigUtils.configToProperties(scopedConfig));
this.consumer = new KafkaConsumer<>(props);
}
public Kafka1ConsumerClient(Config config, Consumer<K, V> consumer) {
super(config);
this.consumer = consumer;
}
@Override
public List<KafkaTopic> getTopics() {
return FluentIterable.from(this.consumer.listTopics().entrySet())
.transform(new Function<Entry<String, List<PartitionInfo>>, KafkaTopic>() {
@Override
public KafkaTopic apply(Entry<String, List<PartitionInfo>> filteredTopicEntry) {
return new KafkaTopic(filteredTopicEntry.getKey(), Lists.transform(filteredTopicEntry.getValue(),
PARTITION_INFO_TO_KAFKA_PARTITION));
}
}).toList();
}
@Override
public long getEarliestOffset(KafkaPartition partition) {
TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
this.consumer.assign(topicPartitionList);
this.consumer.seekToBeginning(topicPartitionList);
return this.consumer.position(topicPartition);
}
@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
this.consumer.assign(topicPartitionList);
this.consumer.seekToEnd(topicPartitionList);
return this.consumer.position(topicPartition);
}
@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
if (nextOffset > maxOffset) {
// Nothing to consume in the requested offset range.
return null;
}
this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
return consume();
}
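/**
* Poll the consumer for up to {@code fetchTimeoutMillis} and wrap each returned record in a
* {@link Kafka1ConsumerRecord}.
*/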
@Override
public Iterator<KafkaConsumerRecord> consume() {
try {
ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
return Iterators.transform(consumerRecords.iterator(), input -> {
try {
return new Kafka1ConsumerRecord(input);
} catch (Throwable t) {
throw Throwables.propagate(t);
}
});
} catch (Exception e) {
log.error("Exception on polling records", e);
throw new RuntimeException(e);
}
}
/**
* Subscribe to a kafka topic
* TODO Add multi topic support
* @param topic the topic to subscribe to
*/
@Override
public void subscribe(String topic) {
this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
}
/**
* Subscribe to a kafka topic with a {@link GobblinConsumerRebalanceListener}
* TODO Add multi topic support
* @param topic the topic to subscribe to
* @param listener callback invoked when partitions are assigned to or revoked from this consumer
*/
@Override
public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
listener.onPartitionsRevoked(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
listener.onPartitionsAssigned(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
}
});
}
@Override
public Map<String, Metric> getMetrics() {
Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
Map<String, Metric> codaHaleMetricMap = new HashMap<>();
kafkaMetrics
.forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
return codaHaleMetricMap;
}
/**
* Commit offsets to Kafka asynchronously
*/
@Override
public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
consumer.commitAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
log.error("Exception while committing offsets " + offsets, exception);
}
}
});
}
/**
* Commit offsets to Kafka synchronously
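* <p>
* A minimal usage sketch ({@code client} is a {@link Kafka1ConsumerClient} instance; topic name and offset
* are illustrative):
* <pre>{@code
* Map<KafkaPartition, Long> offsets = ImmutableMap.of(
*     new KafkaPartition.Builder().withTopicName("my-topic").withId(0).build(), 42L);
* client.commitOffsetsSync(offsets);
* }</pre>
* </p>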
*/
@Override
public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
consumer.commitSync(offsets);
}
/**
* Returns the last committed offset for a KafkaPartition
* @param partition the partition to look up
* @return the last committed offset, or -1 if no offset has been committed for the partition
*/
@Override
public long committed(KafkaPartition partition) {
OffsetAndMetadata offsetAndMetadata = consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
}
/**
* Convert a {@link KafkaMetric} instance to a {@link Metric}.
* @param kafkaMetric the Kafka metric to wrap
* @return a Codahale {@link Gauge} reporting the current value of the Kafka metric
*/
private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
if (log.isDebugEnabled()) {
log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
}
Gauge<Object> gauge = kafkaMetric::metricValue;
return gauge;
}
private String canonicalMetricName(KafkaMetric kafkaMetric) {
MetricName name = kafkaMetric.metricName();
return canonicalMetricName(name.group(), name.tags().values(), name.name());
}
@Override
public void close() throws IOException {
this.consumer.close();
}
private static final Function<PartitionInfo, KafkaPartition> PARTITION_INFO_TO_KAFKA_PARTITION =
new Function<PartitionInfo, KafkaPartition>() {
@Override
public KafkaPartition apply(@Nonnull PartitionInfo partitionInfo) {
return new KafkaPartition.Builder().withId(partitionInfo.partition()).withTopicName(partitionInfo.topic())
.withLeaderId(partitionInfo.leader().id())
.withLeaderHostAndPort(partitionInfo.leader().host(), partitionInfo.leader().port()).build();
}
};
/**
* A factory class to instantiate {@link Kafka1ConsumerClient}
*/
public static class Factory implements GobblinKafkaConsumerClientFactory {
@SuppressWarnings("rawtypes")
@Override
public GobblinKafkaConsumerClient create(Config config) {
return new Kafka1ConsumerClient(config);
}
}
/**
* A record returned by {@link Kafka1ConsumerClient}
*
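* <p>
* A brief sketch of the timestamp accessors added in this copy ({@code consumerRecord} is any Kafka
* {@link ConsumerRecord}; variable names are illustrative):
* <pre>{@code
* Kafka1ConsumerRecord<String, String> record = new Kafka1ConsumerRecord<>(consumerRecord);
* long timestampMillis = record.getTimestamp();        // epoch milliseconds
* boolean logAppend = record.isTimestampLogAppend();   // true for LOG_APPEND_TIME
* TimestampType type = record.getTimestampType();
* }</pre>
* </p>
*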
* @param <K> Message key type
* @param <V> Message value type
*/
@EqualsAndHashCode
@ToString
public static class Kafka1ConsumerRecord<K, V> extends BaseKafkaConsumerRecord implements
DecodeableKafkaRecord<K, V> {
private final ConsumerRecord<K, V> consumerRecord;
public Kafka1ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
// The Kafka 0.9 client does not expose the serialized value size; the 0.8 and 1.x clients do.
super(consumerRecord.offset(), consumerRecord.serializedValueSize(), consumerRecord.topic(), consumerRecord.partition());
this.consumerRecord = consumerRecord;
}
/**
* @return the timestamp type of the underlying ConsumerRecord (only for Kafka 1+ records)
*/
public TimestampType getTimestampType() {
return this.consumerRecord.timestampType();
}
/**
* @return true if the timestamp in the ConsumerRecord is the log-append timestamp, i.e. the time the record was written to Kafka.
*/
@Override
public boolean isTimestampLogAppend() {
return this.consumerRecord.timestampType() == TimestampType.LOG_APPEND_TIME;
}
/**
* @return the timestamp of the underlying ConsumerRecord; check {@link #getTimestampType()} to interpret it
*/
@Override
public long getTimestamp() {
return this.consumerRecord.timestamp();
}
@Override
public K getKey() {
return this.consumerRecord.key();
}
@Override
public V getValue() {
return this.consumerRecord.value();
}
}
}