KafkaSource.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wikimedia.gobblin.copy;
import static java.util.stream.Collectors.toSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.PreviousOffsetNotFoundException;
import org.apache.gobblin.source.extractor.extract.kafka.StartOffsetOutOfRangeException;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.dataset.DatasetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.Getter;
import lombok.Setter;
/**
* A {@link org.apache.gobblin.source.Source} implementation for Kafka sources.
*
* <p>
* This is an updated copy of {@link org.apache.gobblin.source.extractor.extract.kafka.KafkaSource}
* in the gobblin-kafka-common module. This file should be deleted in favor of the upstream version when possible.
* Updates are:
* <ul>
* <li>Make {@link #getFilteredTopics} method protected instead of private -- https://github.com/apache/gobblin/pull/3408</li>
* </ul>
* </p>
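* <p>
* For illustration, a job might configure topic filtering and bootstrap offsets with the property
* keys defined on this class (the patterns and lookback value below are hypothetical):
* <pre>{@code
* topic.whitelist=eventlogging_.*
* topic.blacklist=.*_test
* bootstrap.with.offset=offset_lookback
* kafka.offset.lookback=100000
* }</pre>
* </p>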
*
* @author Ziyang Liu
*/
@SuppressForbidden
@SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
public static final String TOPIC_BLACKLIST = "topic.blacklist";
public static final String TOPIC_WHITELIST = "topic.whitelist";
public static final String LATEST_OFFSET = "latest";
public static final String EARLIEST_OFFSET = "earliest";
public static final String NEAREST_OFFSET = "nearest";
public static final String OFFSET_LOOKBACK = "offset_lookback";
public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
public static final String DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE = NEAREST_OFFSET;
public static final String TOPIC_NAME = "topic.name";
public static final String PARTITION_ID = "partition.id";
public static final String LEADER_ID = "leader.id";
public static final String LEADER_HOSTANDPORT = "leader.hostandport";
public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;
public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
public static final String ALL_TOPICS = "all";
//A workunit property that contains the number of topic partitions for a given topic. Useful for
//workunit size estimation to assign weights to a given topic partition.
public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
public static final String AVG_RECORD_MILLIS = "avg.record.millis";
public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
public static final String PREVIOUS_START_FETCH_EPOCH_TIME = "previousStartFetchEpochTime";
public static final String PREVIOUS_STOP_FETCH_EPOCH_TIME = "previousStopFetchEpochTime";
public static final String PREVIOUS_LOW_WATERMARK = "previousLowWatermark";
public static final String PREVIOUS_HIGH_WATERMARK = "previousHighWatermark";
public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
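// Note: the misspelled "Namspace" below matches the upstream Gobblin key and must be preserved.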
public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
"gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
public static final String DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS =
"org.apache.gobblin.kafka.client.Kafka08ConsumerClient$Factory";
public static final String GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE =
"gobblin.kafka.shouldEnableDatasetStateStore";
public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
public static final String MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = "gobblin.kafka.maxobservedLatencyInHours";
public static final Integer DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = 24;
public static final String OBSERVED_LATENCY_PRECISION = "gobblin.kafka.observedLatencyPrecision";
public static final Integer DEFAULT_OBSERVED_LATENCY_PRECISION = 3;
public static final String OBSERVED_LATENCY_MEASUREMENT_ENABLED = "gobblin.kafka.observedLatencyMeasurementEnabled";
public static final Boolean DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED = false;
public static final String RECORD_CREATION_TIMESTAMP_FIELD = "gobblin.kafka.recordCreationTimestampField";
public static final String RECORD_CREATION_TIMESTAMP_UNIT = "gobblin.kafka.recordCreationTimestampUnit";
private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousLowWatermarks = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousStartFetchEpochTimes = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousStopFetchEpochTimes = Maps.newConcurrentMap();
private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
private final AtomicInteger failToGetOffsetCount = new AtomicInteger(0);
private final AtomicInteger offsetTooEarlyCount = new AtomicInteger(0);
private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
// sharing the kafka consumer may result in contention, so support thread local consumers
protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient> kafkaConsumerClientPool = new ConcurrentLinkedQueue<>();
protected static final ThreadLocal<GobblinKafkaConsumerClient> kafkaConsumerClient =
new ThreadLocal<GobblinKafkaConsumerClient>();
private GobblinKafkaConsumerClient sharedKafkaConsumerClient = null;
private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver =
new ClassAliasResolver<>(GobblinKafkaConsumerClientFactory.class);
private volatile boolean doneGettingAllPreviousOffsets = false;
private Extract.TableType tableType;
private String extractNamespace;
private boolean isFullExtract;
private String kafkaBrokers;
private boolean shouldEnableDatasetStateStore;
private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
private Set<String> topicsToProcess;
private MetricContext metricContext;
protected Optional<LineageInfo> lineageInfo;
private List<String> getLimiterExtractorReportKeys() {
List<String> keyNames = new ArrayList<>();
keyNames.add(KafkaSource.TOPIC_NAME);
keyNames.add(KafkaSource.PARTITION_ID);
return keyNames;
}
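/**
* Tags each workunit with the list of workunit property names (here, topic name and partition id)
* whose values the Gobblin limiter should include in its reports.
*/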
private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<String> keyNameList) {
if (keyNameList.isEmpty()) {
return;
}
String keyList = Joiner.on(',').join(keyNameList.iterator());
for (WorkUnit workUnit : workUnits) {
workUnit.setProp(LimiterConfigurationKeys.LIMITER_REPORT_KEY_LIST, keyList);
}
}
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr =
state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
tableType = Extract.TableType.valueOf(tableTypeStr);
extractNamespace =
state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
} else {
// For backward compatibility, ignore the table type and namespace configuration keys, as the previous implementation did
tableType = KafkaSource.DEFAULT_TABLE_TYPE;
extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
try {
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
List<KafkaTopic> topics = getFilteredTopics(state);
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());
for (String topic : this.topicsToProcess) {
LOG.info("Discovered topic " + topic);
}
Map<String, State> topicSpecificStateMap =
DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {
@Override
public String apply(KafkaTopic topic) {
return topic.getName();
}
}), state);
for (KafkaTopic topic : topics) {
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
}
int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));
if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
}
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
for (KafkaTopic topic : topics) {
threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
workUnits));
}
ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if (state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
}
addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
return workUnitList;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new RuntimeException("Checked exception caught", e);
} catch (Throwable t) {
throw new RuntimeException("Unexpected throwable caught, ", t);
} finally {
try {
GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get();
if (consumerClient != null) {
consumerClient.close();
}
// clean up clients from the pool
for (GobblinKafkaConsumerClient client: kafkaConsumerClientPool) {
client.close();
}
} catch (Throwable t) {
//Swallow any exceptions in the finally{..} block to allow potential exceptions from the main try{..} block to be
//propagated
LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t);
}
}
}
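/**
* Pre-creates {@code count} consumer clients and adds them to {@link #kafkaConsumerClientPool},
* one per work-unit-creation thread, so that worker threads never share a consumer.
*/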
protected void populateClientPool(int count,
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
Config config) {
for (int i = 0; i < count; i++) {
kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
}
}
private void addTopicSpecificPropsToWorkUnits(Map<String, List<WorkUnit>> workUnits, Map<String, State> topicSpecificStateMap) {
for (List<WorkUnit> workUnitList : workUnits.values()) {
for (WorkUnit workUnit : workUnitList) {
addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
}
}
}
private void addTopicSpecificPropsToWorkUnit(WorkUnit workUnit, Map<String, State> topicSpecificStateMap) {
if (workUnit instanceof MultiWorkUnit) {
for (WorkUnit wu : ((MultiWorkUnit) workUnit).getWorkUnits()) {
addTopicSpecificPropsToWorkUnit(wu, topicSpecificStateMap);
}
} else if (!workUnit.contains(TOPIC_NAME)) {
return;
} else {
addDatasetUrnOptionally(workUnit);
if (topicSpecificStateMap == null) {
return;
} else if (!topicSpecificStateMap.containsKey(workUnit.getProp(TOPIC_NAME))) {
return;
} else {
workUnit.addAll(topicSpecificStateMap.get(workUnit.getProp(TOPIC_NAME)));
}
}
}
private void addDatasetUrnOptionally(WorkUnit workUnit) {
if (!this.shouldEnableDatasetStateStore) {
return;
}
workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, workUnit.getProp(TOPIC_NAME));
}
private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>> workUnits,
Map<String, State> topicSpecificStateMap, SourceState state) {
// Load all previous offset state if it has not been loaded yet
getAllPreviousOffsetState(state);
// For each partition that has a previous offset, create an empty WorkUnit for it if
// it is not in this.partitionsToBeProcessed.
for (Map.Entry<KafkaPartition, Long> entry : this.previousOffsets.entrySet()) {
KafkaPartition partition = entry.getKey();
if (!this.partitionsToBeProcessed.contains(partition)) {
String topicName = partition.getTopicName();
if (!this.isDatasetStateEnabled.get() || this.topicsToProcess.contains(topicName)) {
long previousOffset = entry.getValue();
WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previousOffset,
this.previousOffsetFetchEpochTimes.get(partition),
Optional.fromNullable(topicSpecificStateMap.get(partition.getTopicName())));
if (workUnits.containsKey(topicName)) {
workUnits.get(topicName).add(emptyWorkUnit);
} else {
workUnits.put(topicName, Lists.newArrayList(emptyWorkUnit));
}
}
}
}
}
/*
* This method needs to be thread safe since it is called from multiple WorkUnitCreator runnables.
*/
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();
List<WorkUnit> workUnits = Lists.newArrayList();
List<KafkaPartition> topicPartitions = topic.getPartitions();
for (KafkaPartition partition : topicPartitions) {
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high watermark to be the same
// as the low watermark, so that it will be skipped.
if (!topicQualified) {
skipWorkUnit(workUnit);
}
workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
workUnits.add(workUnit);
}
}
this.partitionsToBeProcessed.addAll(topic.getPartitions());
return workUnits;
}
/**
* Whether a {@link KafkaTopic} is qualified to be pulled.
*
* This method can be overridden by subclasses for verifying topic eligibility, e.g., one may want to
* skip a topic if its schema cannot be found in the schema registry.
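*
* <p>A minimal, hypothetical override sketch:
* <pre>{@code
* protected boolean isTopicQualified(KafkaTopic topic) {
*   return !topic.getName().endsWith("_test");
* }
* }</pre>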
*/
protected boolean isTopicQualified(KafkaTopic topic) {
return true;
}
@SuppressWarnings("deprecation")
private static void skipWorkUnit(WorkUnit workUnit) {
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, workUnit.getLowWaterMark());
}
private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
Optional<State> topicSpecificState) {
Offsets offsets = new Offsets();
boolean failedToGetKafkaOffsets = false;
try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
} catch (Throwable t) {
failedToGetKafkaOffsets = true;
LOG.error("Caught error in creating work unit for {}", partition, t);
}
long previousOffset = 0;
long previousOffsetFetchEpochTime = 0;
boolean previousOffsetNotFound = false;
try {
previousOffset = getPreviousOffsetForPartition(partition, state);
offsets.setPreviousEndOffset(previousOffset);
offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
} catch (PreviousOffsetNotFoundException e) {
previousOffsetNotFound = true;
}
if (failedToGetKafkaOffsets) {
// Increment counts, which will be reported as job metrics
this.failToGetOffsetCount.incrementAndGet();
// When unable to get earliest/latest offsets from Kafka, skip the partition and create an empty workunit,
// so that previousOffset is persisted.
LOG.warn(String
.format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
partition));
return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
topicSpecificState);
}
if (shouldMoveToLatestOffset(partition, state)) {
offsets.startAtLatestOffset();
} else if (previousOffsetNotFound) {
/*
* When the previous offset cannot be found, either start at the earliest offset, start at the latest
* offset, go back to (latest - lookback), or skip the partition (no need to create an empty workunit
* in this case, since there is no offset to persist).
* With no previous state, OFFSET_LOOKBACK avoids both consuming a huge amount of data (earliest) and
* data loss (latest): lookback can be set to any long value, and each partition starts at
* (latest - lookback). If the computed offset is out of range, the partition falls back to the
* RESET_ON_OFFSET_OUT_OF_RANGE policy handled below.
*/
String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET)) {
LOG.warn(offsetNotFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET)) {
LOG.warn(
offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
offsets.startAtEarliestOffset();
} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK, 0L);
long latestOffset = offsets.getLatestOffset();
long offset = latestOffset - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ] start offset: " + offset);
try {
offsets.startAt(offset);
} catch (StartOffsetOutOfRangeException e) {
// Increment counts, which will be reported as job metrics
if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
this.offsetTooEarlyCount.incrementAndGet();
} else {
this.offsetTooLateCount.incrementAndGet();
}
// When above computed offset (latest-lookback) is out of range, either start at earliest, latest or nearest offset, or skip the
// partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
offsetOption =
state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
&& offsets.getStartOffset() >= offsets.getLatestOffset())) {
LOG.warn(
offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
} else {
LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
return null;
}
} else {
try {
offsets.startAt(previousOffset);
} catch (StartOffsetOutOfRangeException e) {
// Increment counts, which will be reported as job metrics
if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
this.offsetTooEarlyCount.incrementAndGet();
} else {
this.offsetTooLateCount.incrementAndGet();
}
// When previous offset is out of range, either start at earliest, latest or nearest offset, or skip the
// partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
String offsetOption =
state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
&& offsets.getStartOffset() >= offsets.getLatestOffset())) {
LOG.warn(
offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
}
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
addSourceStatePropsToWorkUnit(workUnit, state);
return workUnit;
}
/**
* Copies specific properties from the {@link SourceState} to the given {@link WorkUnit}.
* @param workUnit the target work unit
* @param state the source state
*/
private void addSourceStatePropsToWorkUnit(WorkUnit workUnit, SourceState state) {
//Copy the SLA config from SourceState to WorkUnitState.
if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
}
boolean isObservedLatencyMeasurementEnabled = state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED);
if (isObservedLatencyMeasurementEnabled) {
Preconditions.checkArgument(state.contains(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD), "Missing config key: " + KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD);
workUnit.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, isObservedLatencyMeasurementEnabled);
workUnit.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS, DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
workUnit.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION,
state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION));
workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
}
}
private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
getAllPreviousOffsetState(state);
return this.previousStartFetchEpochTimes.containsKey(partition) ?
this.previousStartFetchEpochTimes.get(partition) : 0;
}
private long getPreviousStopFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
getAllPreviousOffsetState(state);
return this.previousStopFetchEpochTimes.containsKey(partition) ?
this.previousStopFetchEpochTimes.get(partition) : 0;
}
private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
getAllPreviousOffsetState(state);
if (this.previousOffsetFetchEpochTimes.containsKey(partition)) {
return this.previousOffsetFetchEpochTimes.get(partition);
}
throw new PreviousOffsetNotFoundException(String
.format("Previous offset fetch epoch time for topic %s, partition %s not found.", partition.getTopicName(),
partition.getId()));
}
private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
getAllPreviousOffsetState(state);
if (this.previousOffsets.containsKey(partition)) {
return this.previousOffsets.get(partition);
}
throw new PreviousOffsetNotFoundException(String
.format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(), partition.getId()));
}
private long getPreviousExpectedHighWatermark(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
getAllPreviousOffsetState(state);
if (this.previousExpectedHighWatermarks.containsKey(partition)) {
return this.previousExpectedHighWatermarks.get(partition);
}
throw new PreviousOffsetNotFoundException(String
.format("Previous expected high watermark for topic %s, partition %s not found.", partition.getTopicName(),
partition.getId()));
}
private long getPreviousLowWatermark(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
getAllPreviousOffsetState(state);
if (this.previousLowWatermarks.containsKey(partition)) {
return this.previousLowWatermarks.get(partition);
}
throw new PreviousOffsetNotFoundException(String
.format("Previous low watermark for topic %s, partition %s not found.", partition.getTopicName(),
partition.getId()));
}
// Needs to be synchronized, as this.previousOffsets, this.previousExpectedHighWatermarks, and
// this.previousOffsetFetchEpochTimes must be initialized exactly once
private synchronized void getAllPreviousOffsetState(SourceState state) {
if (this.doneGettingAllPreviousOffsets) {
return;
}
this.previousOffsets.clear();
this.previousLowWatermarks.clear();
this.previousExpectedHighWatermarks.clear();
this.previousOffsetFetchEpochTimes.clear();
this.previousStartFetchEpochTimes.clear();
this.previousStopFetchEpochTimes.clear();
Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();
if (!workUnitStatesByDatasetUrns.isEmpty() &&
!(workUnitStatesByDatasetUrns.size() == 1 && workUnitStatesByDatasetUrns.keySet().iterator().next()
.equals(""))) {
this.isDatasetStateEnabled.set(true);
}
for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
WorkUnit workUnit = workUnitState.getWorkunit();
MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
MultiLongWatermark previousLowWatermark = workUnit.getLowWatermark(MultiLongWatermark.class);
MultiLongWatermark previousExpectedHighWatermark = workUnit.getExpectedHighWatermark(MultiLongWatermark.class);
Preconditions.checkArgument(partitions.size() == watermark.size(), String
.format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
watermark));
for (int i = 0; i < partitions.size(); i++) {
KafkaPartition partition = partitions.get(i);
if (watermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
this.previousOffsets.put(partition, watermark.get(i));
}
if (previousLowWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
this.previousLowWatermarks.put(partition, previousLowWatermark.get(i));
}
if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
}
this.previousOffsetFetchEpochTimes.put(partition,
KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, OFFSET_FETCH_EPOCH_TIME, i));
this.previousStartFetchEpochTimes.put(partition,
KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, START_FETCH_EPOCH_TIME, i));
this.previousStopFetchEpochTimes.put(partition,
KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, STOP_FETCH_EPOCH_TIME, i));
}
}
this.doneGettingAllPreviousOffsets = true;
}
/**
* A topic can be configured to move to the latest offset in {@link #TOPICS_MOVE_TO_LATEST_OFFSET}.
*
* Needs to be synchronized since it is accessed by multiple threads.
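*
* <p>For example (hypothetical topic names; the value {@code all} moves every topic):
* <pre>{@code
* topics.move.to.latest.offset=TopicA,TopicB
* }</pre>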
*/
private synchronized boolean shouldMoveToLatestOffset(KafkaPartition partition, SourceState state) {
if (!state.contains(TOPICS_MOVE_TO_LATEST_OFFSET)) {
return false;
}
if (this.moveToLatestTopics.isEmpty()) {
this.moveToLatestTopics.addAll(
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(state.getProp(TOPICS_MOVE_TO_LATEST_OFFSET)));
}
return this.moveToLatestTopics.contains(partition.getTopicName()) || this.moveToLatestTopics.contains(ALL_TOPICS);
}
// Thread safe. The empty workunit's low and high watermarks are both set to previousOffset, so
// nothing is pulled but the previous offset is still persisted.
private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset, long previousFetchEpochTime,
Optional<State> topicSpecificState) {
Offsets offsets = new Offsets();
offsets.setEarliestOffset(previousOffset);
offsets.setLatestOffset(previousOffset);
offsets.startAtEarliestOffset();
offsets.setOffsetFetchEpochTime(previousFetchEpochTime);
return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
}
private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets,
Optional<State> topicSpecificState) {
// Default to job level configurations
Extract.TableType currentTableType = tableType;
String currentExtractNamespace = extractNamespace;
String currentExtractTableName = partition.getTopicName();
boolean isCurrentFullExtract = isFullExtract;
// Update to topic specific configurations if any
if (topicSpecificState.isPresent()) {
State topicState = topicSpecificState.get();
if (topicState.contains(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)) {
currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY));
}
currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, extractNamespace);
currentExtractTableName =
topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName());
isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY, isFullExtract);
}
Extract extract = this.createExtract(currentTableType, currentExtractNamespace, currentExtractTableName);
if (isCurrentFullExtract) {
extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
}
WorkUnit workUnit = WorkUnit.create(extract);
workUnit.setProp(TOPIC_NAME, partition.getTopicName());
addDatasetUrnOptionally(workUnit);
workUnit.setProp(PARTITION_ID, partition.getId());
workUnit.setProp(LEADER_ID, partition.getLeader().getId());
workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
workUnit.setProp(PREVIOUS_START_FETCH_EPOCH_TIME, offsets.getPreviousStartFetchEpochTime());
workUnit.setProp(PREVIOUS_STOP_FETCH_EPOCH_TIME, offsets.getPreviousStopFetchEpochTime());
workUnit.setProp(PREVIOUS_LOW_WATERMARK, offsets.getPreviousStartOffset());
workUnit.setProp(PREVIOUS_HIGH_WATERMARK, offsets.getPreviousEndOffset());
workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
// Add lineage info
DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
if (this.lineageInfo.isPresent()) {
this.lineageInfo.get().setSource(source, workUnit);
}
LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));
return workUnit;
}
/**
* Returns the topics to be processed, filtered by the job-level whitelist and blacklist.
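*
* <p>Both lists are interpreted as regex patterns, e.g. {@code topic.whitelist=eventlogging_.*}
* (a hypothetical pattern).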
*/
protected List<KafkaTopic> getFilteredTopics(SourceState state) {
List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
return this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
}
@Override
public void shutdown(SourceState state) {
state.setProp(ConfigurationKeys.OFFSET_TOO_EARLY_COUNT, this.offsetTooEarlyCount);
state.setProp(ConfigurationKeys.OFFSET_TOO_LATE_COUNT, this.offsetTooLateCount);
state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount);
}
/**
* This class contains startOffset, earliestOffset and latestOffset for a Kafka partition.
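* In the generated workunit, {@code startOffset} becomes the low watermark and
* {@code latestOffset} the (expected) high watermark.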
*/
@SuppressForbidden
private static class Offsets {
@Getter
private long startOffset = 0;
@Getter
@Setter
private long earliestOffset = 0;
@Getter
@Setter
private long latestOffset = 0;
@Getter
@Setter
private long offsetFetchEpochTime = 0;
@Getter
@Setter
private long previousOffsetFetchEpochTime = 0;
@Getter
@Setter
private long previousLatestOffset = 0;
// previous low watermark
@Getter
@Setter
private long previousStartOffset = 0;
// previous actual high watermark
@Getter
@Setter
private long previousEndOffset = 0;
@Getter
@Setter
private long previousStartFetchEpochTime = 0;
@Getter
@Setter
private long previousStopFetchEpochTime = 0;
private void startAt(long offset)
throws StartOffsetOutOfRangeException {
if (offset < this.earliestOffset || offset > this.latestOffset) {
throw new StartOffsetOutOfRangeException(String
.format("start offset = %d, earliest offset = %d, latest offset = %d", offset, this.earliestOffset,
this.latestOffset));
}
this.startOffset = offset;
}
private void startAtEarliestOffset() {
this.startOffset = this.earliestOffset;
}
private void startAtLatestOffset() {
this.startOffset = this.latestOffset;
}
}
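/**
* A {@link Runnable} that creates the workunits for a single topic. Each instance runs on the
* work-unit-creation thread pool and binds a consumer client (the shared one, or one taken from
* the pool) to its thread for the duration of the run.
*/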
private class WorkUnitCreator implements Runnable {
public static final String WORK_UNITS_FOR_TOPIC_TIMER = "workUnitsForTopicTimer";
private final KafkaTopic topic;
private final SourceState state;
private final Optional<State> topicSpecificState;
private final Map<String, List<WorkUnit>> allTopicWorkUnits;
WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
}
@Override
public void run() {
try (Timer.Context context = metricContext.timer(WORK_UNITS_FOR_TOPIC_TIMER).time()) {
// Use the shared client if configured; otherwise bind a thread-local one from the pool
if (KafkaSource.this.sharedKafkaConsumerClient != null) {
KafkaSource.this.kafkaConsumerClient.set(KafkaSource.this.sharedKafkaConsumerClient);
} else {
GobblinKafkaConsumerClient client = KafkaSource.this.kafkaConsumerClientPool.poll();
Preconditions.checkNotNull(client, "Unexpectedly ran out of preallocated consumer clients");
KafkaSource.this.kafkaConsumerClient.set(client);
}
this.allTopicWorkUnits.put(this.topic.getName(),
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
} catch (Throwable t) {
LOG.error("Caught error in creating work unit for " + this.topic.getName(), t);
throw new RuntimeException(t);
} finally {
// return the client to the pool
if (KafkaSource.this.sharedKafkaConsumerClient == null) {
KafkaSource.this.kafkaConsumerClientPool.offer(KafkaSource.this.kafkaConsumerClient.get());
KafkaSource.this.kafkaConsumerClient.remove();
}
}
}
}
}