View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.wikimedia.gobblin.kafka;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.NoSuchElementException;
23  import java.util.Optional;
24  
25  import org.apache.kafka.common.record.TimestampType;
26  import org.apache.gobblin.configuration.WorkUnitState;
27  import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
28  import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
29  import org.apache.gobblin.source.extractor.DataRecordException;
30  import org.apache.gobblin.source.extractor.extract.kafka.KafkaExtractor;
31  import org.wikimedia.gobblin.copy.Kafka1ConsumerClient;
32  import org.wikimedia.gobblin.TimestampedRecord;
33  
34  
35  import com.google.common.collect.ImmutableSet;
36  
37  import lombok.extern.slf4j.Slf4j;
38  
39  
40  /**
41   * Extracts {@link TimestampedRecord} records from Kafka using Kafka1 client.
42   * The extracted record timestamp is set to the kafka record timestamp if the kafka
43   * record timestamp-type is in the list of extracted-timestamp-types defined using
44   * the property source.kafka.extract.timestampTypes (accepted values are CreateTime
45   * and LogAppendTime, or unset-property). If the timestamp is not extracted from the
46   * kafka record, it is set to current-timestamp if the state property
47   * source.kafka.extract.timestamp.defaultCurrent is set to true, and is absent otherwise.
48   * @param <P> The type of the deserialized payload from the kafka record.
49   */
50  @Slf4j
51  public class Kafka1TimestampedRecordExtractor<P> extends KafkaExtractor<Object, Object> {
52  
53      public static final String EXTRACTED_TIMESTAMP_TYPES_KEY = "source.kafka.extract.timestampTypes";
54      public static final String CURRENT_TIMESTAMP_AS_DEFAULT_KEY = "source.kafka.extract.timestamp.defaultCurrent";
55  
56      private final ImmutableSet<TimestampType> extractedTimestampTypes;
57      private final boolean useCurrentTimestampAsDefault;
58  
59      public Kafka1TimestampedRecordExtractor(WorkUnitState state) {
60          super(state);
61          this.extractedTimestampTypes = getExtractedTimestampTypes(state);
62          this.useCurrentTimestampAsDefault = getUseCurrentTimestampAsDefault(state);
63      }
64  
65      /**
66       * Uses state EXTRACTED_TIMESTAMP_TYPES_KEY property to construct a list
67       * of {@link TimestampType} defining the types of kafka-timestamps that
68       * will lead to extracted records having kafka timestamp set.
69       * Accepted values are CreateTime and LogAppendTime, or unset-property (leading
70       * to all records having the default timestamp).
71       *
72       * @return The {@link TimestampType} list, empty if the property is not present.
73       * @Throws NoSuchElementException in case of invalid {@link TimestampType}
74       * @throws IllegalArgumentException if {@link TimestampType} is not of accepted values
75       */
76      private ImmutableSet<TimestampType> getExtractedTimestampTypes(WorkUnitState state) {
77          ImmutableSet.Builder<TimestampType> result = ImmutableSet.builder();
78          if (state.contains(EXTRACTED_TIMESTAMP_TYPES_KEY)) {
79              for (String timestampTypeName : state.getPropAsList(EXTRACTED_TIMESTAMP_TYPES_KEY)) {
80                  TimestampType timestampType = TimestampType.forName(timestampTypeName);
81                  if (timestampType == TimestampType.CREATE_TIME || timestampType == TimestampType.LOG_APPEND_TIME) {
82                      result.add(timestampType);
83                  } else {
84                      throw new IllegalArgumentException("Invalid TimestampType to extract: " + timestampType +
85                              ". Accepted values are CreateTime and/or LogAppendTime");
86                  }
87              }
88          }
89          return result.build();
90      }
91  
92      /**
93       * Uses state CURRENT_TIMESTAMP_AS_DEFAULT_KEY property to define if record timestamp
94       * should be set to current-time if kafka timestamp can't be used (by opposition to be absent).
95       *
96       * @return The {@link TimestampType} list, empty if the property is not present.
97       */
98      private boolean getUseCurrentTimestampAsDefault(WorkUnitState state) throws NoSuchElementException {
99          return  (state.contains(CURRENT_TIMESTAMP_AS_DEFAULT_KEY) &&
100                 state.getPropAsBoolean(CURRENT_TIMESTAMP_AS_DEFAULT_KEY));
101     }
102 
103     /**
104      * We need to override this protected method as the kafka timestamp information is not
105      * available in the {@link KafkaExtractor#decodeRecord(ByteArrayBasedKafkaRecord)} abstract method.
106      */
107     @Override
108     protected TimestampedRecord<P> decodeKafkaMessage(KafkaConsumerRecord message) throws DataRecordException, IOException {
109         if (message instanceof Kafka1ConsumerClient.Kafka1ConsumerRecord) {
110             Kafka1ConsumerClient.Kafka1ConsumerRecord typedRecord = (Kafka1ConsumerClient.Kafka1ConsumerRecord)message;
111             // if value is null then this is a bad record that is returned for further error handling, so raise an error
112             if (typedRecord.getValue() == null) {
113                 throw new DataRecordException("Could not decode Kafka record");
114             }
115 
116             Optional<Long> timestamp = Optional.empty();
117             if (extractedTimestampTypes.contains(typedRecord.getTimestampType())) {
118                 timestamp = Optional.of(typedRecord.getTimestamp());
119             } else if (useCurrentTimestampAsDefault) {
120                 timestamp = Optional.of(System.currentTimeMillis());
121             }
122             return new TimestampedRecord(typedRecord.getValue(), timestamp);
123         } else {
124             throw new IllegalStateException(
125                     "Unsupported KafkaConsumerRecord type. The record should be Kafka1ConsumerRecord but is " +
126                     message.getClass().getCanonicalName());
127         }
128     }
129 
130     /**
131      * Unreachable, as {@link KafkaExtractor#decodeKafkaMessage(KafkaConsumerRecord)} is override not to use this method.
132      */
133     @Override
134     protected Object decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
135         throw new IllegalArgumentException("Could not decode Kafka record");
136     }
137 
138     // TODO: Implement schema getter option (for events!)
139     @Override
140     public Object getSchema() {
141         return null;
142     }
143 
144 }