1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.wikimedia.gobblin.copy;
18  
19  import static java.util.stream.Collectors.toSet;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.regex.Pattern;
32  
33  import org.apache.gobblin.configuration.ConfigurationKeys;
34  import org.apache.gobblin.configuration.SourceState;
35  import org.apache.gobblin.configuration.State;
36  import org.apache.gobblin.configuration.WorkUnitState;
37  import org.apache.gobblin.dataset.DatasetConstants;
38  import org.apache.gobblin.dataset.DatasetDescriptor;
39  import org.apache.gobblin.instrumented.Instrumented;
40  import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
41  import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
42  import org.apache.gobblin.metrics.MetricContext;
43  import org.apache.gobblin.metrics.event.lineage.LineageInfo;
44  import org.apache.gobblin.source.extractor.extract.EventBasedSource;
45  import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
46  import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
47  import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
48  import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
49  import org.apache.gobblin.source.extractor.extract.kafka.PreviousOffsetNotFoundException;
50  import org.apache.gobblin.source.extractor.extract.kafka.StartOffsetOutOfRangeException;
51  import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
52  import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
53  import org.apache.gobblin.source.workunit.Extract;
54  import org.apache.gobblin.source.workunit.MultiWorkUnit;
55  import org.apache.gobblin.source.workunit.WorkUnit;
56  import org.apache.gobblin.util.ClassAliasResolver;
57  import org.apache.gobblin.util.ConfigUtils;
58  import org.apache.gobblin.util.DatasetFilterUtils;
59  import org.apache.gobblin.util.ExecutorsUtils;
60  import org.apache.gobblin.util.dataset.DatasetUtils;
61  import org.slf4j.Logger;
62  import org.slf4j.LoggerFactory;
63  
64  import com.codahale.metrics.Timer;
65  import com.google.common.base.Function;
66  import com.google.common.base.Joiner;
67  import com.google.common.base.Optional;
68  import com.google.common.base.Preconditions;
69  import com.google.common.base.Splitter;
70  import com.google.common.base.Stopwatch;
71  import com.google.common.collect.Iterables;
72  import com.google.common.collect.Lists;
73  import com.google.common.collect.Maps;
74  import com.google.common.collect.Sets;
75  import com.typesafe.config.Config;
76  
77  import de.thetaphi.forbiddenapis.SuppressForbidden;
78  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
79  import lombok.Getter;
80  import lombok.Setter;
81  
82  
83  /**
84   * A {@link org.apache.gobblin.source.Source} implementation for Kafka source.
85   *
86   * <p>
87   * This is an updated copy of {@link org.apache.gobblin.source.extractor.extract.kafka.KafkaSource}
88   * in the gobblin-kafka-common module. This file should be deleted in favor of the upstream version when possible.
89   * Updates are:
90   * <ul>
91   *  <li>Make {@link #getFilteredTopics} method protected instead of private -- https://github.com/apache/gobblin/pull/3408</li>
92   * </ul>
93   * </p>
94   *
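     * <p>
     * A minimal sketch of how a subclass might use the now-protected {@link #getFilteredTopics} hook; the subclass
     * name and the extra filter below are illustrative assumptions, not part of this source:
     * <pre>{@code
     * public class FilteringKafkaSource extends KafkaSource<String, byte[]> {
     *   // ... getExtractor(WorkUnitState) and the other abstract methods implemented as usual ...
     *
     *   @Override
     *   protected List<KafkaTopic> getFilteredTopics(SourceState state) {
     *     // Start from the whitelist/blacklist filtering done by KafkaSource, then apply any extra logic.
     *     List<KafkaTopic> topics = new ArrayList<>(super.getFilteredTopics(state));
     *     topics.removeIf(topic -> topic.getPartitions().isEmpty()); // illustrative extra filter
     *     return topics;
     *   }
     * }
     * }</pre>
     * </p>
     *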
95   *  @author Ziyang Liu
96   */
97  @SuppressForbidden
98  @SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
99  public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
100 
101     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
102 
103     public static final String TOPIC_BLACKLIST = "topic.blacklist";
104     public static final String TOPIC_WHITELIST = "topic.whitelist";
105     public static final String LATEST_OFFSET = "latest";
106     public static final String EARLIEST_OFFSET = "earliest";
107     public static final String NEAREST_OFFSET = "nearest";
108     public static final String OFFSET_LOOKBACK = "offset_lookback";
109     public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
110     public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
111     public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
112     public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
113     public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
114     public static final String DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE = NEAREST_OFFSET;
115     public static final String TOPIC_NAME = "topic.name";
116     public static final String PARTITION_ID = "partition.id";
117     public static final String LEADER_ID = "leader.id";
118     public static final String LEADER_HOSTANDPORT = "leader.hostandport";
119     public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;
120     public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
121     public static final String ALL_TOPICS = "all";
122     // A workunit property that contains the number of topic partitions for a given topic. Useful for
123     // workunit size estimation, to assign weights to a given topic partition.
124     public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
125     public static final String AVG_RECORD_MILLIS = "avg.record.millis";
126     public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
127     public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
128     public static final String PREVIOUS_START_FETCH_EPOCH_TIME = "previousStartFetchEpochTime";
129     public static final String PREVIOUS_STOP_FETCH_EPOCH_TIME = "previousStopFetchEpochTime";
130     public static final String PREVIOUS_LOW_WATERMARK = "previousLowWatermark";
131     public static final String PREVIOUS_HIGH_WATERMARK = "previousHighWatermark";
132     public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
133     public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
134     public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
135     public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
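        // Note: the misspelling "Namspace" in the value below is kept as-is; this class is a copy of the upstream
        // Gobblin source and the configuration key must match the upstream spelling.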
136     public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
137             "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
138     public static final String DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS =
139             "org.apache.gobblin.kafka.client.Kafka08ConsumerClient$Factory";
140     public static final String GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE =
141             "gobblin.kafka.shouldEnableDatasetStateStore";
142     public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
143     public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
144     public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
145     public static final String MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = "gobblin.kafka.maxobservedLatencyInHours";
146     public static final Integer DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = 24;
147     public static final String OBSERVED_LATENCY_PRECISION = "gobblin.kafka.observedLatencyPrecision";
148     public static final Integer DEFAULT_OBSERVED_LATENCY_PRECISION = 3;
149     public static final String OBSERVED_LATENCY_MEASUREMENT_ENABLED = "gobblin.kafka.observedLatencyMeasurementEnabled";
150     public static final Boolean DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED = false;
151     public static final String RECORD_CREATION_TIMESTAMP_FIELD = "gobblin.kafka.recordCreationTimestampField";
152     public static final String RECORD_CREATION_TIMESTAMP_UNIT = "gobblin.kafka.recordCreationTimestampUnit";
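
        // Illustrative example of job properties built from the keys above (the whitelist value is an example only;
        // "earliest" and "nearest" are the constants defined above):
        //   topic.whitelist=eventlogging_.*
        //   bootstrap.with.offset=earliest
        //   reset.on.offset.out.of.range=nearest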
153 
154     private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
155     private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
156     private final Map<KafkaPartition, Long> previousLowWatermarks = Maps.newConcurrentMap();
157     private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
158     private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
159     private final Map<KafkaPartition, Long> previousStartFetchEpochTimes = Maps.newConcurrentMap();
160     private final Map<KafkaPartition, Long> previousStopFetchEpochTimes = Maps.newConcurrentMap();
161 
162     private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
163 
164     private final AtomicInteger failToGetOffsetCount = new AtomicInteger(0);
165     private final AtomicInteger offsetTooEarlyCount = new AtomicInteger(0);
166     private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
167 
168     // sharing the kafka consumer may result in contention, so support thread local consumers
169     protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient> kafkaConsumerClientPool = new ConcurrentLinkedQueue<>();
170     protected static final ThreadLocal<GobblinKafkaConsumerClient> kafkaConsumerClient =
171             new ThreadLocal<GobblinKafkaConsumerClient>();
172     private GobblinKafkaConsumerClient sharedKafkaConsumerClient = null;
173     private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver =
174             new ClassAliasResolver<>(GobblinKafkaConsumerClientFactory.class);
175 
176     private volatile boolean doneGettingAllPreviousOffsets = false;
177     private Extract.TableType tableType;
178     private String extractNamespace;
179     private boolean isFullExtract;
180     private String kafkaBrokers;
181     private boolean shouldEnableDatasetStateStore;
182     private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
183     private Set<String> topicsToProcess;
184 
185     private MetricContext metricContext;
186 
187     protected Optional<LineageInfo> lineageInfo;
188 
189     private List<String> getLimiterExtractorReportKeys() {
190         List<String> keyNames = new ArrayList<>();
191         keyNames.add(KafkaSource.TOPIC_NAME);
192         keyNames.add(KafkaSource.PARTITION_ID);
193         return keyNames;
194     }
195 
196     private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<String> keyNameList) {
197         if (keyNameList.isEmpty()) {
198             return;
199         }
200         String keyList = Joiner.on(',').join(keyNameList.iterator());
201         for (WorkUnit workUnit : workUnits) {
202             workUnit.setProp(LimiterConfigurationKeys.LIMITER_REPORT_KEY_LIST, keyList);
203         }
204     }
205 
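        // Work unit creation proceeds in four broad steps: (1) discover topics and filter them by whitelist/blacklist,
        // (2) create work units for each topic's partitions in parallel, (3) create empty work units for partitions
        // that have previous offsets but are not processed in this run (so their offsets are persisted), and
        // (4) pack the work units into at most maxMapperNum multi-workunits.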
206     @Override
207     public List<WorkUnit> getWorkunits(SourceState state) {
208         this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
209         this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
210 
211         Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
212         if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
213             String tableTypeStr =
214                     state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
215             tableType = Extract.TableType.valueOf(tableTypeStr);
216             extractNamespace =
217                     state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
218         } else {
219             // For backward compatibility, ignore the table type and namespace configuration keys, as in the previous implementation.
220             tableType = KafkaSource.DEFAULT_TABLE_TYPE;
221             extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
222         }
223         isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
224         kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
225         this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
226                 DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
227 
228         try {
229             Config config = ConfigUtils.propertiesToConfig(state.getProperties());
230             GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
231                     .resolveClass(
232                             state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
233                                     DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
234 
235             this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
236 
237             List<KafkaTopic> topics = getFilteredTopics(state);
238             this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());
239 
240             for (String topic : this.topicsToProcess) {
241                 LOG.info("Discovered topic " + topic);
242             }
243             Map<String, State> topicSpecificStateMap =
244                     DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {
245 
246                         @Override
247                         public String apply(KafkaTopic topic) {
248                             return topic.getName();
249                         }
250                     }), state);
251 
252             for (KafkaTopic topic : topics) {
253                 if (topic.getTopicSpecificState().isPresent()) {
254                     topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
255                             .addAllIfNotExist(topic.getTopicSpecificState().get());
256                 }
257             }
258 
259             int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
260                     ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
261             ExecutorService threadPool =
262                     Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));
263 
264             if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
265                     ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
266                 this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
267             } else {
268                 // preallocate one client per thread
269                 populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
270             }
271 
272             Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
273 
274             for (KafkaTopic topic : topics) {
275                 threadPool.submit(
276                         new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
277                                 workUnits));
278             }
279 
280             ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
281             LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
282                     createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
283 
284             // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
285             // but aren't processed).
286             createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
287             // Determine the number of mappers
288             int maxMapperNum =
289                     state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
290             KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
291             int numOfMultiWorkunits = maxMapperNum;
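                // When a target mapper size is configured, derive the number of multi-workunits from the estimated
                // data size; e.g. totalEstDataSize = 10.5 and targetMapperSize = 3.0 gives (int) (10.5 / 3.0) + 1 = 4,
                // still capped at maxMapperNum.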
292             if (state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
293                 double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
294                 LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
295                 double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
296                 numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
297                 numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
298             }
299             addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
300             List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
301             setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
302             return workUnitList;
303         } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
304             throw new RuntimeException("Checked exception caught", e);
305         } catch (Throwable t) {
306             throw new RuntimeException("Unexpected throwable caught", t);
307         } finally {
308             try {
309                 GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get();
310                 if (consumerClient != null) {
311                     consumerClient.close();
312                 }
313                 // cleanup clients from pool
314                 for (GobblinKafkaConsumerClient client: kafkaConsumerClientPool) {
315                     client.close();
316                 }
317             } catch (Throwable t) {
318                 // Swallow any exceptions in the finally block so that exceptions thrown from the main try block,
319                 // if any, are the ones propagated.
320                 LOG.error("Exception encountered closing GobblinKafkaConsumerClient", t);
321             }
322         }
323     }
324 
325     protected void populateClientPool(int count,
326                                       GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
327                                       Config config) {
328         for (int i = 0; i < count; i++) {
329             kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
330         }
331     }
332 
333     private void addTopicSpecificPropsToWorkUnits(Map<String, List<WorkUnit>> workUnits, Map<String, State> topicSpecificStateMap) {
334         for (List<WorkUnit> workUnitList : workUnits.values()) {
335             for (WorkUnit workUnit : workUnitList) {
336                 addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
337             }
338         }
339     }
340 
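        // Recursively expands MultiWorkUnits; for a leaf work unit that carries a topic name, optionally sets the
        // dataset URN and copies any topic-specific properties onto the work unit.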
341     private void addTopicSpecificPropsToWorkUnit(WorkUnit workUnit, Map<String, State> topicSpecificStateMap) {
342         if (workUnit instanceof MultiWorkUnit) {
343             for (WorkUnit wu : ((MultiWorkUnit) workUnit).getWorkUnits()) {
344                 addTopicSpecificPropsToWorkUnit(wu, topicSpecificStateMap);
345             }
346         } else if (!workUnit.contains(TOPIC_NAME)) {
347             return;
348         } else {
349             addDatasetUrnOptionally(workUnit);
350             if (topicSpecificStateMap == null) {
351                 return;
352             } else if (!topicSpecificStateMap.containsKey(workUnit.getProp(TOPIC_NAME))) {
353                 return;
354             } else {
355                 workUnit.addAll(topicSpecificStateMap.get(workUnit.getProp(TOPIC_NAME)));
356             }
357         }
358     }
359 
360     private void addDatasetUrnOptionally(WorkUnit workUnit) {
361         if (!this.shouldEnableDatasetStateStore) {
362             return;
363         }
364         workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, workUnit.getProp(TOPIC_NAME));
365     }
366 
367     private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>> workUnits,
368                                                           Map<String, State> topicSpecificStateMap, SourceState state) {
369 
370         // in case the previous offsets have not been loaded yet
371         getAllPreviousOffsetState(state);
372 
373         // For each partition that has a previous offset, create an empty WorkUnit for it if
374         // it is not in this.partitionsToBeProcessed.
375         for (Map.Entry<KafkaPartition, Long> entry : this.previousOffsets.entrySet()) {
376             KafkaPartition partition = entry.getKey();
377             if (!this.partitionsToBeProcessed.contains(partition)) {
378                 String topicName = partition.getTopicName();
379                 if (!this.isDatasetStateEnabled.get() || this.topicsToProcess.contains(topicName)) {
380                     long previousOffset = entry.getValue();
381                     WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previousOffset,
382                             this.previousOffsetFetchEpochTimes.get(partition),
383                             Optional.fromNullable(topicSpecificStateMap.get(partition.getTopicName())));
384 
385                     if (workUnits.containsKey(topicName)) {
386                         workUnits.get(topicName).add(emptyWorkUnit);
387                     } else {
388                         workUnits.put(topicName, Lists.newArrayList(emptyWorkUnit));
389                     }
390                 }
391             }
392         }
393     }
394 
395     /*
396      * This method needs to be thread-safe since it is called from the WorkUnitCreator Runnable.
397      */
398     private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
399         Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
400         boolean topicQualified = isTopicQualified(topic);
401         context.close();
402 
403         List<WorkUnit> workUnits = Lists.newArrayList();
404         List<KafkaPartition> topicPartitions = topic.getPartitions();
405         for (KafkaPartition partition : topicPartitions) {
406             WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
407             if (workUnit != null) {
408                 // For disqualified topics, for each of its workunits set the high watermark to be the same
409                 // as the low watermark, so that it will be skipped.
410                 if (!topicQualified) {
411                     skipWorkUnit(workUnit);
412                 }
413                 workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
414                 workUnits.add(workUnit);
415             }
416         }
417         this.partitionsToBeProcessed.addAll(topic.getPartitions());
418         return workUnits;
419     }
420 
421     /**
422      * Whether a {@link KafkaTopic} is qualified to be pulled.
423      *
424      * This method can be overridden by subclasses for verifying topic eligibility, e.g., one may want to
425      * skip a topic if its schema cannot be found in the schema registry.
426      */
427     protected boolean isTopicQualified(KafkaTopic topic) {
428         return true;
429     }
430 
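        // Setting the high watermark equal to the low watermark yields an empty pull range, so the work unit is
        // effectively skipped at extraction time.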
431     @SuppressWarnings("deprecation")
432     private static void skipWorkUnit(WorkUnit workUnit) {
433         workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, workUnit.getLowWaterMark());
434     }
435 
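        // Resolves the pull range for a single partition: fetch the earliest/latest offsets from Kafka, look up the
        // previously committed offset, then apply the move-to-latest, bootstrap, and offset-out-of-range policies
        // below to pick the start offset.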
436     private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
437                                                   Optional<State> topicSpecificState) {
438         Offsets offsets = new Offsets();
439 
440         boolean failedToGetKafkaOffsets = false;
441 
442         try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
443             offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
444             offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
445             offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
446         } catch (Throwable t) {
447             failedToGetKafkaOffsets = true;
448             LOG.error("Caught error in creating work unit for {}", partition, t);
449         }
450 
451         long previousOffset = 0;
452         long previousOffsetFetchEpochTime = 0;
453         boolean previousOffsetNotFound = false;
454         try {
455             previousOffset = getPreviousOffsetForPartition(partition, state);
456             offsets.setPreviousEndOffset(previousOffset);
457             offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
458             offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
459             offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
460             offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
461             previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
462             offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
463         } catch (PreviousOffsetNotFoundException e) {
464             previousOffsetNotFound = true;
465         }
466 
467         if (failedToGetKafkaOffsets) {
468 
469             // Increment counts, which will be reported as job metrics
470             this.failToGetOffsetCount.incrementAndGet();
471 
472             // When unable to get earliest/latest offsets from Kafka, skip the partition and create an empty workunit,
473             // so that previousOffset is persisted.
474             LOG.warn(String
475                     .format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
476                             partition));
477             return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
478                     topicSpecificState);
479         }
480 
481         if (shouldMoveToLatestOffset(partition, state)) {
482             offsets.startAtLatestOffset();
483         } else if (previousOffsetNotFound) {
484 
485             /*
486              * When the previous offset cannot be found, either start at the earliest offset, start at the latest offset,
487              * go back by a lookback amount (latest - lookback, a long value deducted from the latest offset to limit
488              * data loss), or skip the partition (no need to create an empty workunit in this case, since there is no
489              * offset to persist). When there is no previous state, OFFSET_LOOKBACK avoids both consuming a huge amount
490              * of data (earliest) and data loss (latest). The lookback can be any long value; if the computed start offset
491              * (latest - lookback) is out of range, the RESET_ON_OFFSET_OUT_OF_RANGE handling below decides where to start.
492              */
493             String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
494             String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
495             if (offsetOption.equals(LATEST_OFFSET)) {
496                 LOG.warn(offsetNotFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
497                 offsets.startAtLatestOffset();
498             } else if (offsetOption.equals(EARLIEST_OFFSET)) {
499                 LOG.warn(
500                         offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
501                 offsets.startAtEarliestOffset();
502             } else if (offsetOption.equals(OFFSET_LOOKBACK)) {
503                 long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK, 0L);
504                 long latestOffset = offsets.getLatestOffset();
505                 long offset = latestOffset - lookbackOffsetRange;
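                    // e.g. latestOffset = 1_000_000 and kafka.offset.lookback = 50_000 gives a start offset of 950_000;
                    // if that falls outside [earliest, latest], startAt throws and the out-of-range handling below applies.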
506                 LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ]  start offset: " + offset);
507                 try {
508                     offsets.startAt(offset);
509                 } catch (StartOffsetOutOfRangeException e) {
510                     // Increment counts, which will be reported as job metrics
511                     if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
512                         this.offsetTooEarlyCount.incrementAndGet();
513                     } else {
514                         this.offsetTooLateCount.incrementAndGet();
515                     }
516 
517                     // When above computed offset (latest-lookback) is out of range, either start at earliest, latest or nearest offset, or skip the
518                     // partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
519                     String offsetOutOfRangeMsg = String.format(
520                             "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
521                             partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
522                     offsetOption =
523                             state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
524                     if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
525                             && offsets.getStartOffset() >= offsets.getLatestOffset())) {
526                         LOG.warn(
527                                 offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
528                         offsets.startAtLatestOffset();
529                     } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
530                         LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
531                                 .getEarliestOffset());
532                         offsets.startAtEarliestOffset();
533                     } else {
534                         LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
535                         return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
536                     }
537                 }
538             } else {
540                 LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
541                 return null;
542             }
543         } else {
544             try {
545                 offsets.startAt(previousOffset);
546             } catch (StartOffsetOutOfRangeException e) {
547 
548                 // Increment counts, which will be reported as job metrics
549                 if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
550                     this.offsetTooEarlyCount.incrementAndGet();
551                 } else {
552                     this.offsetTooLateCount.incrementAndGet();
553                 }
554 
555                 // When previous offset is out of range, either start at earliest, latest or nearest offset, or skip the
556                 // partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
557                 String offsetOutOfRangeMsg = String.format(
558                         "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
559                         partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
560                 String offsetOption =
561                         state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
562                 if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
563                         && offsets.getStartOffset() >= offsets.getLatestOffset())) {
564                     LOG.warn(
565                             offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
566                     offsets.startAtLatestOffset();
567                 } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
568                     LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
569                             .getEarliestOffset());
570                     offsets.startAtEarliestOffset();
571                 } else {
572                     LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
573                     return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
574                 }
575             }
576         }
577         WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
578         addSourceStatePropsToWorkUnit(workUnit, state);
579         return workUnit;
580     }
581 
582     /**
583      * A method to copy specific properties from the {@link SourceState} object to {@link WorkUnitState}.
584      * @param workUnit WorkUnit state
585      * @param state Source state
586      */
587     private void addSourceStatePropsToWorkUnit(WorkUnit workUnit, SourceState state) {
588         //Copy the SLA config from SourceState to WorkUnitState.
589         if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
590             workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
591         }
592         boolean isObservedLatencyMeasurementEnabled = state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED);
593         if (isObservedLatencyMeasurementEnabled) {
594             Preconditions.checkArgument(state.contains(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD), "Missing config key: " + KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD);
595             workUnit.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, isObservedLatencyMeasurementEnabled);
596             workUnit.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
597                     state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS, DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
598             workUnit.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION,
599                     state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION));
600             workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
601             workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
602         }
603     }
604 
605     private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
606         getAllPreviousOffsetState(state);
607         return this.previousStartFetchEpochTimes.containsKey(partition) ?
608                 this.previousStartFetchEpochTimes.get(partition) : 0;
609     }
610 
611     private long getPreviousStopFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
612         getAllPreviousOffsetState(state);
613         return this.previousStopFetchEpochTimes.containsKey(partition) ?
614                 this.previousStopFetchEpochTimes.get(partition) : 0;
615     }
616 
617     private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
618             throws PreviousOffsetNotFoundException {
619 
620         getAllPreviousOffsetState(state);
621 
622         if (this.previousOffsetFetchEpochTimes.containsKey(partition)) {
623             return this.previousOffsetFetchEpochTimes.get(partition);
624         }
625 
626         throw new PreviousOffsetNotFoundException(String
627                 .format("Previous offset fetch epoch time for topic %s, partition %s not found.", partition.getTopicName(),
628                         partition.getId()));
629     }
630 
631     private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
632             throws PreviousOffsetNotFoundException {
633 
634         getAllPreviousOffsetState(state);
635 
636         if (this.previousOffsets.containsKey(partition)) {
637             return this.previousOffsets.get(partition);
638         }
639         throw new PreviousOffsetNotFoundException(String
640                 .format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(), partition.getId()));
641     }
642 
643     private long getPreviousExpectedHighWatermark(KafkaPartition partition, SourceState state)
644             throws PreviousOffsetNotFoundException {
645 
646         getAllPreviousOffsetState(state);
647 
648         if (this.previousExpectedHighWatermarks.containsKey(partition)) {
649             return this.previousExpectedHighWatermarks.get(partition);
650         }
651         throw new PreviousOffsetNotFoundException(String
652                 .format("Previous expected high watermark for topic %s, partition %s not found.", partition.getTopicName(),
653                         partition.getId()));
654     }
655 
656     private long getPreviousLowWatermark(KafkaPartition partition, SourceState state)
657             throws PreviousOffsetNotFoundException {
658 
659         getAllPreviousOffsetState(state);
660 
661         if (this.previousLowWatermarks.containsKey(partition)) {
662             return this.previousLowWatermarks.get(partition);
663         }
664         throw new PreviousOffsetNotFoundException(String
665                 .format("Previous low watermark for topic %s, partition %s not found.", partition.getTopicName(),
666                         partition.getId()));
667     }
668 
669     // Needs to be synchronized, since this.previousOffsets, this.previousExpectedHighWatermarks, and
670     // this.previousOffsetFetchEpochTimes must be initialized only once.
671     private synchronized void getAllPreviousOffsetState(SourceState state) {
672         if (this.doneGettingAllPreviousOffsets) {
673             return;
674         }
675         this.previousOffsets.clear();
676         this.previousLowWatermarks.clear();
677         this.previousExpectedHighWatermarks.clear();
678         this.previousOffsetFetchEpochTimes.clear();
679         this.previousStartFetchEpochTimes.clear();
680         this.previousStopFetchEpochTimes.clear();
681         Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();
682 
683         if (!workUnitStatesByDatasetUrns.isEmpty() &&
684                 !(workUnitStatesByDatasetUrns.size() == 1 && workUnitStatesByDatasetUrns.keySet().iterator().next()
685                         .equals(""))) {
686             this.isDatasetStateEnabled.set(true);
687         }
688 
689         for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
690             List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
691             WorkUnit workUnit = workUnitState.getWorkunit();
692 
693             MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
694             MultiLongWatermark previousLowWatermark = workUnit.getLowWatermark(MultiLongWatermark.class);
695             MultiLongWatermark previousExpectedHighWatermark = workUnit.getExpectedHighWatermark(MultiLongWatermark.class);
696             Preconditions.checkArgument(partitions.size() == watermark.size(), String
697                     .format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
698                             watermark));
699 
700             for (int i = 0; i < partitions.size(); i++) {
701                 KafkaPartition partition = partitions.get(i);
702 
703                 if (watermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
704                     this.previousOffsets.put(partition, watermark.get(i));
705                 }
706 
707                 if (previousLowWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
708                     this.previousLowWatermarks.put(partition, previousLowWatermark.get(i));
709                 }
710 
711                 if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
712                     this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
713                 }
714 
715                 this.previousOffsetFetchEpochTimes.put(partition,
716                         KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, OFFSET_FETCH_EPOCH_TIME, i));
717 
718                 this.previousStartFetchEpochTimes.put(partition,
719                         KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, START_FETCH_EPOCH_TIME, i));
720 
721                 this.previousStopFetchEpochTimes.put(partition,
722                         KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, STOP_FETCH_EPOCH_TIME, i));
723             }
724         }
725 
726         this.doneGettingAllPreviousOffsets = true;
727     }
728 
729     /**
730      * A topic can be configured to move to the latest offset in {@link #TOPICS_MOVE_TO_LATEST_OFFSET}.
731      *
732      * Needs to be synchronized as it is accessed by multiple threads.
733      */
734     private synchronized boolean shouldMoveToLatestOffset(KafkaPartition partition, SourceState state) {
735         if (!state.contains(TOPICS_MOVE_TO_LATEST_OFFSET)) {
736             return false;
737         }
738         if (this.moveToLatestTopics.isEmpty()) {
739             this.moveToLatestTopics.addAll(
740                     Splitter.on(',').trimResults().omitEmptyStrings().splitToList(state.getProp(TOPICS_MOVE_TO_LATEST_OFFSET)));
741         }
742         return this.moveToLatestTopics.contains(partition.getTopicName()) || this.moveToLatestTopics.contains(ALL_TOPICS);
743     }
744 
745     // thread safe
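        // Sets earliest == latest == previousOffset so the resulting work unit covers an empty range while still
        // carrying the previous offset and its fetch epoch time forward into the next job state.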
746     private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset, long previousFetchEpochTime,
747                                          Optional<State> topicSpecificState) {
748         Offsets offsets = new Offsets();
749         offsets.setEarliestOffset(previousOffset);
750         offsets.setLatestOffset(previousOffset);
751         offsets.startAtEarliestOffset();
752         offsets.setOffsetFetchEpochTime(previousFetchEpochTime);
753         return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
754     }
755 
756     private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets,
757                                                   Optional<State> topicSpecificState) {
758         // Default to job level configurations
759         Extract.TableType currentTableType = tableType;
760         String currentExtractNamespace = extractNamespace;
761         String currentExtractTableName = partition.getTopicName();
762         boolean isCurrentFullExtract = isFullExtract;
763         // Update to topic specific configurations if any
764         if (topicSpecificState.isPresent()) {
765             State topicState = topicSpecificState.get();
766             if (topicState.contains(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)) {
767                 currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY));
768             }
769             currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, extractNamespace);
770             currentExtractTableName =
771                     topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName());
772             isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY, isFullExtract);
773         }
774 
775         Extract extract = this.createExtract(currentTableType, currentExtractNamespace, currentExtractTableName);
776         if (isCurrentFullExtract) {
777             extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
778         }
779 
780         WorkUnit workUnit = WorkUnit.create(extract);
781         workUnit.setProp(TOPIC_NAME, partition.getTopicName());
782         addDatasetUrnOptionally(workUnit);
783         workUnit.setProp(PARTITION_ID, partition.getId());
784         workUnit.setProp(LEADER_ID, partition.getLeader().getId());
785         workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
786         workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
787         workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
788         workUnit.setProp(PREVIOUS_START_FETCH_EPOCH_TIME, offsets.getPreviousStartFetchEpochTime());
789         workUnit.setProp(PREVIOUS_STOP_FETCH_EPOCH_TIME, offsets.getPreviousStopFetchEpochTime());
790         workUnit.setProp(PREVIOUS_LOW_WATERMARK, offsets.getPreviousStartOffset());
791         workUnit.setProp(PREVIOUS_HIGH_WATERMARK, offsets.getPreviousEndOffset());
792         workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
793         workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
794         workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
795 
796         // Add lineage info
797         DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
798         source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
799         if (this.lineageInfo.isPresent()) {
800             this.lineageInfo.get().setSource(source, workUnit);
801         }
802 
803         LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
804                 offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));
805 
806         return workUnit;
807     }
808 
809     /**
810      * Return topics to be processed filtered by job-level whitelist and blacklist.
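         * <p>
         * For example (illustrative values), a job might set {@code topic.whitelist=webrequest_.*} and
         * {@code topic.blacklist=.*_canary}; both properties are parsed into {@link Pattern} lists via
         * {@link DatasetFilterUtils#getPatternList} and applied by the consumer client.
         * </p>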
811      */
812     protected List<KafkaTopic> getFilteredTopics(SourceState state) {
813         List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
814         List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
815         return this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
816     }
817 
818     @Override
819     public void shutdown(SourceState state) {
820         state.setProp(ConfigurationKeys.OFFSET_TOO_EARLY_COUNT, this.offsetTooEarlyCount);
821         state.setProp(ConfigurationKeys.OFFSET_TOO_LATE_COUNT, this.offsetTooLateCount);
822         state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount);
823     }
824 
825     /**
826      * This class contains startOffset, earliestOffset and latestOffset for a Kafka partition.
827      */
828     @SuppressForbidden
829     private static class Offsets {
830 
831         @Getter
832         private long startOffset = 0;
833 
834         @Getter
835         @Setter
836         private long earliestOffset = 0;
837 
838         @Getter
839         @Setter
840         private long latestOffset = 0;
841 
842         @Getter
843         @Setter
844         private long offsetFetchEpochTime = 0;
845 
846         @Getter
847         @Setter
848         private long previousOffsetFetchEpochTime = 0;
849 
850         @Getter
851         @Setter
852         private long previousLatestOffset = 0;
853 
854         // previous low watermark
855         @Getter
856         @Setter
857         private long previousStartOffset = 0;
858 
859         // previous actual high watermark
860         @Getter
861         @Setter
862         private long previousEndOffset = 0;
863 
864         @Getter
865         @Setter
866         private long previousStartFetchEpochTime = 0;
867 
868         @Getter
869         @Setter
870         private long previousStopFetchEpochTime = 0;
871 
872         private void startAt(long offset)
873                 throws StartOffsetOutOfRangeException {
874             if (offset < this.earliestOffset || offset > this.latestOffset) {
875                 throw new StartOffsetOutOfRangeException(String
876                         .format("start offset = %d, earliest offset = %d, latest offset = %d", offset, this.earliestOffset,
877                                 this.latestOffset));
878             }
879             this.startOffset = offset;
880         }
881 
882         private void startAtEarliestOffset() {
883             this.startOffset = this.earliestOffset;
884         }
885 
886         private void startAtLatestOffset() {
887             this.startOffset = this.latestOffset;
888         }
889     }
890 
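        // Runnable submitted to the work-unit creation thread pool, one instance per topic. Each run either reuses
        // the shared consumer client or borrows a preallocated client from the pool and returns it when done.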
891     private class WorkUnitCreator implements Runnable {
892         public static final String WORK_UNITS_FOR_TOPIC_TIMER = "workUnitsForTopicTimer";
893         private final KafkaTopic topic;
894         private final SourceState state;
895         private final Optional<State> topicSpecificState;
896         private final Map<String, List<WorkUnit>> allTopicWorkUnits;
897 
898         WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
899                         Map<String, List<WorkUnit>> workUnits) {
900             this.topic = topic;
901             this.state = state;
902             this.topicSpecificState = topicSpecificState;
903             this.allTopicWorkUnits = workUnits;
904         }
905 
906         @Override
907         public void run() {
908             try (Timer.Context context = metricContext.timer(WORK_UNITS_FOR_TOPIC_TIMER).time()) {
909                 // Use the shared client if configured; otherwise set a thread-local one from the pool.
910                 if (KafkaSource.this.sharedKafkaConsumerClient != null) {
911                     KafkaSource.this.kafkaConsumerClient.set(KafkaSource.this.sharedKafkaConsumerClient);
912                 } else {
913                     GobblinKafkaConsumerClient client = KafkaSource.this.kafkaConsumerClientPool.poll();
914                     Preconditions.checkNotNull(client, "Unexpectedly ran out of preallocated consumer clients");
915                     KafkaSource.this.kafkaConsumerClient.set(client);
916                 }
917 
918                 this.allTopicWorkUnits.put(this.topic.getName(),
919                         KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
920             } catch (Throwable t) {
921                 LOG.error("Caught error in creating work unit for " + this.topic.getName(), t);
922                 throw new RuntimeException(t);
923             } finally {
924                 // return the client to the pool
925                 if (KafkaSource.this.sharedKafkaConsumerClient == null) {
926                     KafkaSource.this.kafkaConsumerClientPool.offer(KafkaSource.this.kafkaConsumerClient.get());
927                     KafkaSource.this.kafkaConsumerClient.remove();
928                 }
929             }
930         }
931     }
932 }