/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.wikimedia.gobblin.copy;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
import org.apache.gobblin.kafka.client.BaseKafkaConsumerRecord;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.record.TimestampType;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;


/**
 * A {@link GobblinKafkaConsumerClient} that uses the Kafka 1.1 consumer client. Use {@link Factory#create(Config)}
 * to create new {@code Kafka1ConsumerClient} instances. The {@link Config} used to create clients must contain the
 * required key {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY}.
 *
 * <p>
 * This is an updated copy of {@link org.apache.gobblin.kafka.client.Kafka1ConsumerClient} in the gobblin-kafka-1
 * module. This file should be deleted in favor of the upstream version when possible. Updates are:
 * <ul>
 *  <li>Expose the Kafka record timestamp in {@link Kafka1ConsumerRecord} -- https://github.com/apache/gobblin/pull/3244</li>
 * </ul>
 * </p>
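 *
 * <p>
 * A minimal usage sketch (illustrative only: the broker address and deserializer class below are
 * placeholders, and the exact keys required depend on the surrounding Gobblin job configuration):
 * <pre>{@code
 * Config config = ConfigFactory.parseMap(ImmutableMap.of(
 *     ConfigurationKeys.KAFKA_BROKERS, "localhost:9092",
 *     Kafka1ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
 *         "org.apache.kafka.common.serialization.ByteArrayDeserializer"));
 * GobblinKafkaConsumerClient client = new Kafka1ConsumerClient.Factory().create(config);
 * List<KafkaTopic> topics = client.getTopics();
 * client.close();
 * }</pre>
 * </p>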
 *
 * @param <K> Message key type
 * @param <V> Message value type
 *
 */
@Slf4j
@SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
public class Kafka1ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient {

    private static final String CLIENT_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    private static final String CLIENT_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
    private static final String CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
    private static final String CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
    private static final String CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
    private static final String CLIENT_GROUP_ID = "group.id";

    private static final String DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
    public static final String DEFAULT_KEY_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String DEFAULT_GROUP_ID = "kafka1";

    public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
            + CLIENT_KEY_DESERIALIZER_CLASS_KEY;
    public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
            + CLIENT_VALUE_DESERIALIZER_CLASS_KEY;

    private static final Config FALLBACK =
            ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
                    .put(CLIENT_ENABLE_AUTO_COMMIT_KEY, DEFAULT_ENABLE_AUTO_COMMIT)
                    .put(CLIENT_KEY_DESERIALIZER_CLASS_KEY, DEFAULT_KEY_DESERIALIZER)
                    .put(CLIENT_GROUP_ID, DEFAULT_GROUP_ID)
                    .build());

    private final Consumer<K, V> consumer;

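    /**
     * Builds a {@link KafkaConsumer} from the Gobblin job config. Consumer properties are layered,
     * most specific first: {@code source.kafka.consumerConfig.*}, then {@code gobblin.kafka.sharedConfig.*},
     * then the remaining {@code source.kafka.*} keys, and finally the defaults in {@code FALLBACK}.
     * An illustrative (not prescriptive) job config could look like:
     * <pre>{@code
     * source.kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
     * source.kafka.consumerConfig.max.poll.records=500
     * gobblin.kafka.sharedConfig.security.protocol=PLAINTEXT
     * }</pre>
     */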
    private Kafka1ConsumerClient(Config config) {
        super(config);
        Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
                "Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);

        Properties props = new Properties();
        props.put(CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
        props.put(CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);

        // Grab all the config under "source.kafka" and add the defaults as a fallback.
        Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
        // Get the "source.kafka.consumerConfig" config for extra settings to pass along to Kafka, with a fallback
        // to the shared config keys that start with "gobblin.kafka.sharedConfig".
        Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
                ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
        // The specific config overrides settings in the base config.
        Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
        props.putAll(ConfigUtils.configToProperties(scopedConfig));

        this.consumer = new KafkaConsumer<>(props);
    }

    public Kafka1ConsumerClient(Config config, Consumer<K, V> consumer) {
        super(config);
        this.consumer = consumer;
    }

    @Override
    public List<KafkaTopic> getTopics() {
        return FluentIterable.from(this.consumer.listTopics().entrySet())
                .transform(new Function<Entry<String, List<PartitionInfo>>, KafkaTopic>() {
                    @Override
                    public KafkaTopic apply(Entry<String, List<PartitionInfo>> filteredTopicEntry) {
                        return new KafkaTopic(filteredTopicEntry.getKey(), Lists.transform(filteredTopicEntry.getValue(),
                                PARTITION_INFO_TO_KAFKA_PARTITION));
                    }
                }).toList();
    }

    @Override
    public long getEarliestOffset(KafkaPartition partition) {
        TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
        List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
        this.consumer.assign(topicPartitionList);
        this.consumer.seekToBeginning(topicPartitionList);

        return this.consumer.position(topicPartition);
    }

    @Override
    public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
        TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
        List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
        this.consumer.assign(topicPartitionList);
        this.consumer.seekToEnd(topicPartitionList);

        return this.consumer.position(topicPartition);
    }

    @Override
    public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {

        if (nextOffset > maxOffset) {
            return null;
        }

        this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
        this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
        return consume();
    }

    @Override
    public Iterator<KafkaConsumerRecord> consume() {
        try {
            ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);

            return Iterators.transform(consumerRecords.iterator(), input -> {
                try {
                    return new Kafka1ConsumerRecord(input);
                } catch (Throwable t) {
                    throw Throwables.propagate(t);
                }
            });
        } catch (Exception e) {
            log.error("Exception on polling records", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Subscribe to a Kafka topic.
     * TODO Add multi topic support
     * @param topic the topic to subscribe to
     */
    @Override
    public void subscribe(String topic) {
        this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
    }

    /**
     * Subscribe to a Kafka topic with a {@link GobblinConsumerRebalanceListener}.
     * TODO Add multi topic support
     * @param topic the topic to subscribe to
     * @param listener listener notified when the consumer's partition assignment changes
     */
    @Override
    public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
        this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                listener.onPartitionsRevoked(partitions.stream()
                        .map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build())
                        .collect(Collectors.toList()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                listener.onPartitionsAssigned(partitions.stream()
                        .map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build())
                        .collect(Collectors.toList()));
            }
        });
    }

    @Override
    public Map<String, Metric> getMetrics() {
        Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
        Map<String, Metric> codaHaleMetricMap = new HashMap<>();

        kafkaMetrics
                .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
        return codaHaleMetricMap;
    }

    /**
     * Commit offsets to Kafka asynchronously
     */
    @Override
    public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
                        e -> new OffsetAndMetadata(e.getValue())));
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    log.error("Exception while committing offsets " + offsets, exception);
                }
            }
        });
    }

    /**
     * Commit offsets to Kafka synchronously
     */
    @Override
    public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
        Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
                        e -> new OffsetAndMetadata(e.getValue())));
        consumer.commitSync(offsets);
    }

    /**
     * Returns the last committed offset for a KafkaPartition.
     * @param partition the partition to look up
     * @return the last committed offset, or -1 if no offset has been committed for the partition
     */
    @Override
    public long committed(KafkaPartition partition) {
        OffsetAndMetadata offsetAndMetadata =
                consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
        return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
    }

    /**
     * Convert a {@link KafkaMetric} instance to a Coda Hale {@link Metric}.
     * @param kafkaMetric the Kafka metric to convert
     * @return a {@link Gauge} that reports the Kafka metric's current value
     */
    private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
        if (log.isDebugEnabled()) {
            log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
        }
        Gauge<Object> gauge = kafkaMetric::metricValue;
        return gauge;
    }

    private String canonicalMetricName(KafkaMetric kafkaMetric) {
        MetricName name = kafkaMetric.metricName();
        return canonicalMetricName(name.group(), name.tags().values(), name.name());
    }

    @Override
    public void close() throws IOException {
        this.consumer.close();
    }

    private static final Function<PartitionInfo, KafkaPartition> PARTITION_INFO_TO_KAFKA_PARTITION =
            new Function<PartitionInfo, KafkaPartition>() {
                @Override
                public KafkaPartition apply(@Nonnull PartitionInfo partitionInfo) {
                    return new KafkaPartition.Builder().withId(partitionInfo.partition()).withTopicName(partitionInfo.topic())
                            .withLeaderId(partitionInfo.leader().id())
                            .withLeaderHostAndPort(partitionInfo.leader().host(), partitionInfo.leader().port()).build();
                }
            };

    /**
     * A factory class to instantiate {@link Kafka1ConsumerClient}
     */
    public static class Factory implements GobblinKafkaConsumerClientFactory {
        @SuppressWarnings("rawtypes")
        @Override
        public GobblinKafkaConsumerClient create(Config config) {
            return new Kafka1ConsumerClient(config);
        }
    }

    /**
     * A record returned by {@link Kafka1ConsumerClient}
     *
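     * <p>
     * Records returned by {@link Kafka1ConsumerClient#consume()} can be cast to this type to read the
     * Kafka timestamp exposed by this copy (illustrative sketch):
     * <pre>{@code
     * Iterator<KafkaConsumerRecord> records = client.consume();
     * while (records.hasNext()) {
     *     Kafka1ConsumerRecord<?, ?> record = (Kafka1ConsumerRecord<?, ?>) records.next();
     *     long timestampMs = record.getTimestamp();
     *     boolean isBrokerTime = record.isTimestampLogAppend();
     * }
     * }</pre>
     * </p>
     *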
     * @param <K> Message key type
     * @param <V> Message value type
     */
    @EqualsAndHashCode
    @ToString
    public static class Kafka1ConsumerRecord<K, V> extends BaseKafkaConsumerRecord implements
            DecodeableKafkaRecord<K, V> {
        private final ConsumerRecord<K, V> consumerRecord;

        public Kafka1ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
            // Unlike the Kafka 0.9 consumer, Kafka 1.x ConsumerRecords provide the serialized value size.
            super(consumerRecord.offset(), consumerRecord.serializedValueSize(), consumerRecord.topic(), consumerRecord.partition());
            this.consumerRecord = consumerRecord;
        }

        /**
         * @return the timestamp type of the underlying ConsumerRecord (only for Kafka 1+ records)
         */
        public TimestampType getTimestampType() {
            return this.consumerRecord.timestampType();
        }

        /**
         * @return true if the timestamp in the ConsumerRecord is the time the record was written to Kafka
         * (log-append time)
         */
        @Override
        public boolean isTimestampLogAppend() {
            return this.consumerRecord.timestampType() == TimestampType.LOG_APPEND_TIME;
        }

        /**
         * @return the timestamp of the underlying ConsumerRecord; check {@link #getTimestampType()} to
         * determine how to interpret it
         */
        @Override
        public long getTimestamp() {
            return this.consumerRecord.timestamp();
        }

        @Override
        public K getKey() {
            return this.consumerRecord.key();
        }

        @Override
        public V getValue() {
            return this.consumerRecord.value();
        }
    }
}