/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.wikimedia.gobblin.copy;

import static java.util.stream.Collectors.toSet;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.PreviousOffsetNotFoundException;
import org.apache.gobblin.source.extractor.extract.kafka.StartOffsetOutOfRangeException;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.dataset.DatasetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Timer;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;

import de.thetaphi.forbiddenapis.SuppressForbidden;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.Getter;
import lombok.Setter;
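
/**
 * A {@link EventBasedSource} for Kafka. It discovers the topics and partitions to pull,
 * determines the start and end offsets for each partition, and creates one {@link WorkUnit}
 * per partition, which the configured {@link KafkaWorkUnitPacker} then packs into
 * {@link MultiWorkUnit}s for the job's mappers.
 */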
@SuppressForbidden
@SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);

  public static final String TOPIC_BLACKLIST = "topic.blacklist";
  public static final String TOPIC_WHITELIST = "topic.whitelist";
  public static final String LATEST_OFFSET = "latest";
  public static final String EARLIEST_OFFSET = "earliest";
  public static final String NEAREST_OFFSET = "nearest";
  public static final String OFFSET_LOOKBACK = "offset_lookback";
  public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
  public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
  public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
  public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
  public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
  public static final String DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE = NEAREST_OFFSET;
  public static final String TOPIC_NAME = "topic.name";
  public static final String PARTITION_ID = "partition.id";
  public static final String LEADER_ID = "leader.id";
  public static final String LEADER_HOSTANDPORT = "leader.hostandport";
  public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;
  public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
  public static final String ALL_TOPICS = "all";

  public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
  public static final String AVG_RECORD_MILLIS = "avg.record.millis";
  public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
  public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
  public static final String PREVIOUS_START_FETCH_EPOCH_TIME = "previousStartFetchEpochTime";
  public static final String PREVIOUS_STOP_FETCH_EPOCH_TIME = "previousStopFetchEpochTime";
  public static final String PREVIOUS_LOW_WATERMARK = "previousLowWatermark";
  public static final String PREVIOUS_HIGH_WATERMARK = "previousHighWatermark";
  public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
  public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
  public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
  public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
  public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
      "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
  public static final String DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS =
      "org.apache.gobblin.kafka.client.Kafka08ConsumerClient$Factory";
  public static final String GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE =
      "gobblin.kafka.shouldEnableDatasetStateStore";
  public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
  public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
  public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
  public static final String MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = "gobblin.kafka.maxobservedLatencyInHours";
  public static final Integer DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = 24;
  public static final String OBSERVED_LATENCY_PRECISION = "gobblin.kafka.observedLatencyPrecision";
  public static final Integer DEFAULT_OBSERVED_LATENCY_PRECISION = 3;
  public static final String OBSERVED_LATENCY_MEASUREMENT_ENABLED = "gobblin.kafka.observedLatencyMeasurementEnabled";
  public static final Boolean DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED = false;
  public static final String RECORD_CREATION_TIMESTAMP_FIELD = "gobblin.kafka.recordCreationTimestampField";
  public static final String RECORD_CREATION_TIMESTAMP_UNIT = "gobblin.kafka.recordCreationTimestampUnit";

  private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
  private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
  private final Map<KafkaPartition, Long> previousLowWatermarks = Maps.newConcurrentMap();
  private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
  private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
  private final Map<KafkaPartition, Long> previousStartFetchEpochTimes = Maps.newConcurrentMap();
  private final Map<KafkaPartition, Long> previousStopFetchEpochTimes = Maps.newConcurrentMap();

  private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();

  private final AtomicInteger failToGetOffsetCount = new AtomicInteger(0);
  private final AtomicInteger offsetTooEarlyCount = new AtomicInteger(0);
  private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
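
  // Pool of consumer clients used by the work unit creation threads when a shared client is not configured.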
  protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient> kafkaConsumerClientPool =
      new ConcurrentLinkedQueue<>();
  protected static final ThreadLocal<GobblinKafkaConsumerClient> kafkaConsumerClient = new ThreadLocal<>();
  private GobblinKafkaConsumerClient sharedKafkaConsumerClient = null;
  private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver =
      new ClassAliasResolver<>(GobblinKafkaConsumerClientFactory.class);

  private volatile boolean doneGettingAllPreviousOffsets = false;
  private Extract.TableType tableType;
  private String extractNamespace;
  private boolean isFullExtract;
  private String kafkaBrokers;
  private boolean shouldEnableDatasetStateStore;
  private final AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
  private Set<String> topicsToProcess;

  private MetricContext metricContext;

  protected Optional<LineageInfo> lineageInfo;

  private List<String> getLimiterExtractorReportKeys() {
    List<String> keyNames = new ArrayList<>();
    keyNames.add(KafkaSource.TOPIC_NAME);
    keyNames.add(KafkaSource.PARTITION_ID);
    return keyNames;
  }

  private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<String> keyNameList) {
    if (keyNameList.isEmpty()) {
      return;
    }
    String keyList = Joiner.on(',').join(keyNameList.iterator());
    for (WorkUnit workUnit : workUnits) {
      workUnit.setProp(LimiterConfigurationKeys.LIMITER_REPORT_KEY_LIST, keyList);
    }
  }

  @Override
  public List<WorkUnit> getWorkunits(SourceState state) {
    this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

    Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
    if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
      String tableTypeStr =
          state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
      tableType = Extract.TableType.valueOf(tableTypeStr);
      extractNamespace =
          state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
    } else {
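      // Table type and namespace customization is not allowed; fall back to the Kafka defaults.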
      tableType = KafkaSource.DEFAULT_TABLE_TYPE;
      extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
    }
    isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
    kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
    this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
        DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);

    try {
      Config config = ConfigUtils.propertiesToConfig(state.getProperties());
      GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
          .resolveClass(
              state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
                  DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();

      this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

      List<KafkaTopic> topics = getFilteredTopics(state);
      this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

      for (String topic : this.topicsToProcess) {
        LOG.info("Discovered topic " + topic);
      }
      Map<String, State> topicSpecificStateMap =
          DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {

            @Override
            public String apply(KafkaTopic topic) {
              return topic.getName();
            }
          }), state);

      for (KafkaTopic topic : topics) {
        if (topic.getTopicSpecificState().isPresent()) {
          topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
              .addAllIfNotExist(topic.getTopicSpecificState().get());
        }
      }

      int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
          ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
      ExecutorService threadPool =
          Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

      if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
          ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
        this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
      } else {
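        // Preallocate one consumer client per work unit creation thread.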
        populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
      }

      Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

      for (KafkaTopic topic : topics) {
        threadPool.submit(
            new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
                workUnits));
      }

      ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
      LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
          createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
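
      // Create empty work units for skipped partitions, i.e. partitions that have a previous
      // offset but are not pulled in this run, so that their offsets are persisted.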
      createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);

      int maxMapperNum =
          state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
      KafkaWorkUnitPacker kafkaWorkUnitPacker =
          KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
      int numOfMultiWorkunits = maxMapperNum;
      if (state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
        double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
        LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
        double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
        numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
        numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
      }
      addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
      List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
      setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
      return workUnitList;
    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
      throw new RuntimeException("Checked exception caught", e);
    } catch (Throwable t) {
      throw new RuntimeException("Unexpected throwable caught", t);
    } finally {
      try {
        GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get();
        if (consumerClient != null) {
          consumerClient.close();
        }

        for (GobblinKafkaConsumerClient client : kafkaConsumerClientPool) {
          client.close();
        }
      } catch (Throwable t) {
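        // Log and swallow: a failure to close a consumer client should not fail the job at this point.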
        LOG.error("Exception encountered closing GobblinKafkaConsumerClient", t);
      }
    }
  }

  protected void populateClientPool(int count,
      GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
      Config config) {
    for (int i = 0; i < count; i++) {
      kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
    }
  }

  private void addTopicSpecificPropsToWorkUnits(Map<String, List<WorkUnit>> workUnits,
      Map<String, State> topicSpecificStateMap) {
    for (List<WorkUnit> workUnitList : workUnits.values()) {
      for (WorkUnit workUnit : workUnitList) {
        addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
      }
    }
  }

  private void addTopicSpecificPropsToWorkUnit(WorkUnit workUnit, Map<String, State> topicSpecificStateMap) {
    if (workUnit instanceof MultiWorkUnit) {
      for (WorkUnit wu : ((MultiWorkUnit) workUnit).getWorkUnits()) {
        addTopicSpecificPropsToWorkUnit(wu, topicSpecificStateMap);
      }
    } else if (!workUnit.contains(TOPIC_NAME)) {
      return;
    } else {
      addDatasetUrnOptionally(workUnit);
      if (topicSpecificStateMap == null) {
        return;
      } else if (!topicSpecificStateMap.containsKey(workUnit.getProp(TOPIC_NAME))) {
        return;
      } else {
        workUnit.addAll(topicSpecificStateMap.get(workUnit.getProp(TOPIC_NAME)));
      }
    }
  }

  private void addDatasetUrnOptionally(WorkUnit workUnit) {
    if (!this.shouldEnableDatasetStateStore) {
      return;
    }
    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, workUnit.getProp(TOPIC_NAME));
  }

  private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>> workUnits,
      Map<String, State> topicSpecificStateMap, SourceState state) {
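    // Make sure the previous offsets have been loaded from the previous run's state.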
    getAllPreviousOffsetState(state);
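
    // For each partition with a previous offset that is not processed in this run, create an
    // empty work unit so the offset is carried forward into the next run's state.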
    for (Map.Entry<KafkaPartition, Long> entry : this.previousOffsets.entrySet()) {
      KafkaPartition partition = entry.getKey();
      if (!this.partitionsToBeProcessed.contains(partition)) {
        String topicName = partition.getTopicName();
        if (!this.isDatasetStateEnabled.get() || this.topicsToProcess.contains(topicName)) {
          long previousOffset = entry.getValue();
          WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previousOffset,
              this.previousOffsetFetchEpochTimes.get(partition),
              Optional.fromNullable(topicSpecificStateMap.get(partition.getTopicName())));

          if (workUnits.containsKey(topicName)) {
            workUnits.get(topicName).add(emptyWorkUnit);
          } else {
            workUnits.put(topicName, Lists.newArrayList(emptyWorkUnit));
          }
        }
      }
    }
  }
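
  /*
   * This method is called concurrently by the WorkUnitCreator tasks, so it must remain thread safe.
   */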
  private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state,
      Optional<State> topicSpecificState) {
    Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
    boolean topicQualified = isTopicQualified(topic);
    context.close();

    List<WorkUnit> workUnits = Lists.newArrayList();
    List<KafkaPartition> topicPartitions = topic.getPartitions();
    for (KafkaPartition partition : topicPartitions) {
      WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
      if (workUnit != null) {
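        // For disqualified topics, set the high watermark equal to the low watermark so that the
        // work unit is effectively skipped while its watermarks are still persisted.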
        if (!topicQualified) {
          skipWorkUnit(workUnit);
        }
        workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
        workUnits.add(workUnit);
      }
    }
    this.partitionsToBeProcessed.addAll(topic.getPartitions());
    return workUnits;
  }
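
  /**
   * Whether a {@link KafkaTopic} is qualified to be pulled. Returns true by default; subclasses
   * may override to add verification logic.
   */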
  protected boolean isTopicQualified(KafkaTopic topic) {
    return true;
  }

  @SuppressWarnings("deprecation")
  private static void skipWorkUnit(WorkUnit workUnit) {
    workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, workUnit.getLowWaterMark());
  }

  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
      Optional<State> topicSpecificState) {
    Offsets offsets = new Offsets();

    boolean failedToGetKafkaOffsets = false;

    try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
      offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
      offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
      offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
    } catch (Throwable t) {
      failedToGetKafkaOffsets = true;
      LOG.error("Caught error in creating work unit for {}", partition, t);
    }

    long previousOffset = 0;
    long previousOffsetFetchEpochTime = 0;
    boolean previousOffsetNotFound = false;
    try {
      previousOffset = getPreviousOffsetForPartition(partition, state);
      offsets.setPreviousEndOffset(previousOffset);
      offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
      offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
      offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
      offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
      previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
      offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
    } catch (PreviousOffsetNotFoundException e) {
      previousOffsetNotFound = true;
    }

    if (failedToGetKafkaOffsets) {
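      // Increment the count, which is reported as a job metric.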
      this.failToGetOffsetCount.incrementAndGet();
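
      // Without the earliest/latest offsets the partition cannot be pulled; skip it and, if a
      // previous offset exists, create an empty work unit so that offset is persisted.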
      LOG.warn(String
          .format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
              partition));
      return previousOffsetNotFound ? null
          : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
    }

    if (shouldMoveToLatestOffset(partition, state)) {
      offsets.startAtLatestOffset();
    } else if (previousOffsetNotFound) {
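
      // A previous offset was not found for this partition. Bootstrap it according to
      // bootstrap.with.offset:
      //   latest: start from the latest offset (pull no historical data);
      //   earliest: start from the earliest offset (pull all available data);
      //   offset_lookback: start from (latest - kafka.offset.lookback), falling back to
      //     reset.on.offset.out.of.range if that offset is out of range;
      //   anything else: skip the partition (there is no offset to persist, so no empty work unit).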
      String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
      String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
      if (offsetOption.equals(LATEST_OFFSET)) {
        LOG.warn(offsetNotFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
        offsets.startAtLatestOffset();
      } else if (offsetOption.equals(EARLIEST_OFFSET)) {
        LOG.warn(
            offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
        offsets.startAtEarliestOffset();
      } else if (offsetOption.equals(OFFSET_LOOKBACK)) {
        long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK, 0L);
        long latestOffset = offsets.getLatestOffset();
        long offset = latestOffset - lookbackOffsetRange;
        LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [" + latestOffset + " - "
            + lookbackOffsetRange + "] start offset: " + offset);
        try {
          offsets.startAt(offset);
        } catch (StartOffsetOutOfRangeException e) {
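          // Record whether the out-of-range offset was too early or too late, for job metrics.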
          if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
            this.offsetTooEarlyCount.incrementAndGet();
          } else {
            this.offsetTooLateCount.incrementAndGet();
          }
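
          // The lookback offset is out of range: start from the earliest, latest or nearest
          // offset, or skip the partition, per reset.on.offset.out.of.range.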
          String offsetOutOfRangeMsg = String.format(
              "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
              partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
          offsetOption =
              state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
          if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
              && offsets.getStartOffset() >= offsets.getLatestOffset())) {
            LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: "
                + offsets.getLatestOffset());
            offsets.startAtLatestOffset();
          } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
            LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: "
                + offsets.getEarliestOffset());
            offsets.startAtEarliestOffset();
          } else {
            LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
            return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
          }
        }
      } else {
        LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
        return null;
      }
    } else {
      try {
        offsets.startAt(previousOffset);
      } catch (StartOffsetOutOfRangeException e) {
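        // Record whether the out-of-range offset was too early or too late, for job metrics.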
        if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
          this.offsetTooEarlyCount.incrementAndGet();
        } else {
          this.offsetTooLateCount.incrementAndGet();
        }
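
        // The previous offset is out of range: start from the earliest, latest or nearest offset,
        // or skip the partition and persist the previous offset via an empty work unit,
        // per reset.on.offset.out.of.range.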
        String offsetOutOfRangeMsg = String.format(
            "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
            partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
        String offsetOption =
            state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
        if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
            && offsets.getStartOffset() >= offsets.getLatestOffset())) {
          LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: "
              + offsets.getLatestOffset());
          offsets.startAtLatestOffset();
        } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
          LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: "
              + offsets.getEarliestOffset());
          offsets.startAtEarliestOffset();
        } else {
          LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
          return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
        }
      }
    }
    WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
    addSourceStatePropsToWorkUnit(workUnit, state);
    return workUnit;
  }
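
  /**
   * Propagate job-level configs that are consumed downstream of work unit creation,
   * e.g. record-level SLA and observed-latency measurement settings.
   */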
  private void addSourceStatePropsToWorkUnit(WorkUnit workUnit, SourceState state) {
    if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
      workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY,
          state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
    }
    boolean isObservedLatencyMeasurementEnabled = state.getPropAsBoolean(
        KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED);
    if (isObservedLatencyMeasurementEnabled) {
      Preconditions.checkArgument(state.contains(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD),
          "Missing config key: " + KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD);
      workUnit.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, isObservedLatencyMeasurementEnabled);
      workUnit.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
          state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
              DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
      workUnit.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION,
          state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION));
      workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD,
          state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
      workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT,
          state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
    }
  }

  private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
    getAllPreviousOffsetState(state);
    return this.previousStartFetchEpochTimes.containsKey(partition)
        ? this.previousStartFetchEpochTimes.get(partition) : 0;
  }

  private long getPreviousStopFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
    getAllPreviousOffsetState(state);
    return this.previousStopFetchEpochTimes.containsKey(partition)
        ? this.previousStopFetchEpochTimes.get(partition) : 0;
  }

  private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
      throws PreviousOffsetNotFoundException {
    getAllPreviousOffsetState(state);

    if (this.previousOffsetFetchEpochTimes.containsKey(partition)) {
      return this.previousOffsetFetchEpochTimes.get(partition);
    }

    throw new PreviousOffsetNotFoundException(String
        .format("Previous offset fetch epoch time for topic %s, partition %s not found.", partition.getTopicName(),
            partition.getId()));
  }

  private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
      throws PreviousOffsetNotFoundException {
    getAllPreviousOffsetState(state);

    if (this.previousOffsets.containsKey(partition)) {
      return this.previousOffsets.get(partition);
    }
    throw new PreviousOffsetNotFoundException(String
        .format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(), partition.getId()));
  }

  private long getPreviousExpectedHighWatermark(KafkaPartition partition, SourceState state)
      throws PreviousOffsetNotFoundException {
    getAllPreviousOffsetState(state);

    if (this.previousExpectedHighWatermarks.containsKey(partition)) {
      return this.previousExpectedHighWatermarks.get(partition);
    }
    throw new PreviousOffsetNotFoundException(String
        .format("Previous expected high watermark for topic %s, partition %s not found.", partition.getTopicName(),
            partition.getId()));
  }

  private long getPreviousLowWatermark(KafkaPartition partition, SourceState state)
      throws PreviousOffsetNotFoundException {
    getAllPreviousOffsetState(state);

    if (this.previousLowWatermarks.containsKey(partition)) {
      return this.previousLowWatermarks.get(partition);
    }
    throw new PreviousOffsetNotFoundException(String
        .format("Previous low watermark for topic %s, partition %s not found.", partition.getTopicName(),
            partition.getId()));
  }
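
  // Synchronized: the previous-offset maps must be initialized exactly once even though this is
  // called from multiple work unit creation threads.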
  private synchronized void getAllPreviousOffsetState(SourceState state) {
    if (this.doneGettingAllPreviousOffsets) {
      return;
    }
    this.previousOffsets.clear();
    this.previousLowWatermarks.clear();
    this.previousExpectedHighWatermarks.clear();
    this.previousOffsetFetchEpochTimes.clear();
    this.previousStartFetchEpochTimes.clear();
    this.previousStopFetchEpochTimes.clear();
    Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();

    if (!workUnitStatesByDatasetUrns.isEmpty()
        && !(workUnitStatesByDatasetUrns.size() == 1 && workUnitStatesByDatasetUrns.keySet().iterator().next()
            .equals(""))) {
      this.isDatasetStateEnabled.set(true);
    }

    for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
      List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
      WorkUnit workUnit = workUnitState.getWorkunit();

      MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
      MultiLongWatermark previousLowWatermark = workUnit.getLowWatermark(MultiLongWatermark.class);
      MultiLongWatermark previousExpectedHighWatermark = workUnit.getExpectedHighWatermark(MultiLongWatermark.class);
      Preconditions.checkArgument(partitions.size() == watermark.size(), String
          .format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
              watermark));

      for (int i = 0; i < partitions.size(); i++) {
        KafkaPartition partition = partitions.get(i);

        if (watermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
          this.previousOffsets.put(partition, watermark.get(i));
        }

        if (previousLowWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
          this.previousLowWatermarks.put(partition, previousLowWatermark.get(i));
        }

        if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
          this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
        }

        this.previousOffsetFetchEpochTimes.put(partition,
            KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, OFFSET_FETCH_EPOCH_TIME, i));

        this.previousStartFetchEpochTimes.put(partition,
            KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, START_FETCH_EPOCH_TIME, i));

        this.previousStopFetchEpochTimes.put(partition,
            KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, STOP_FETCH_EPOCH_TIME, i));
      }
    }

    this.doneGettingAllPreviousOffsets = true;
  }
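
  /**
   * Whether a partition should always start from the latest offset. Topics can be listed in
   * topics.move.to.latest.offset ("all" covers every topic). Synchronized because
   * moveToLatestTopics is lazily populated and this is called from multiple threads.
   */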
  private synchronized boolean shouldMoveToLatestOffset(KafkaPartition partition, SourceState state) {
    if (!state.contains(TOPICS_MOVE_TO_LATEST_OFFSET)) {
      return false;
    }
    if (this.moveToLatestTopics.isEmpty()) {
      this.moveToLatestTopics.addAll(
          Splitter.on(',').trimResults().omitEmptyStrings().splitToList(state.getProp(TOPICS_MOVE_TO_LATEST_OFFSET)));
    }
    return this.moveToLatestTopics.contains(partition.getTopicName()) || this.moveToLatestTopics.contains(ALL_TOPICS);
  }
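
  // Creates a zero-length work unit (start offset == high watermark == previousOffset) whose only
  // purpose is to carry the partition's previous offset forward into the next run's state.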
  private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset, long previousFetchEpochTime,
      Optional<State> topicSpecificState) {
    Offsets offsets = new Offsets();
    offsets.setEarliestOffset(previousOffset);
    offsets.setLatestOffset(previousOffset);
    offsets.startAtEarliestOffset();
    offsets.setOffsetFetchEpochTime(previousFetchEpochTime);
    return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
  }

  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets,
      Optional<State> topicSpecificState) {

    Extract.TableType currentTableType = tableType;
    String currentExtractNamespace = extractNamespace;
    String currentExtractTableName = partition.getTopicName();
    boolean isCurrentFullExtract = isFullExtract;

    if (topicSpecificState.isPresent()) {
      State topicState = topicSpecificState.get();
      if (topicState.contains(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)) {
        currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY));
      }
      currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, extractNamespace);
      currentExtractTableName =
          topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName());
      isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY, isFullExtract);
    }

    Extract extract = this.createExtract(currentTableType, currentExtractNamespace, currentExtractTableName);
    if (isCurrentFullExtract) {
      extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
    }

    WorkUnit workUnit = WorkUnit.create(extract);
    workUnit.setProp(TOPIC_NAME, partition.getTopicName());
    addDatasetUrnOptionally(workUnit);
    workUnit.setProp(PARTITION_ID, partition.getId());
    workUnit.setProp(LEADER_ID, partition.getLeader().getId());
    workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
    workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
    workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
    workUnit.setProp(PREVIOUS_START_FETCH_EPOCH_TIME, offsets.getPreviousStartFetchEpochTime());
    workUnit.setProp(PREVIOUS_STOP_FETCH_EPOCH_TIME, offsets.getPreviousStopFetchEpochTime());
    workUnit.setProp(PREVIOUS_LOW_WATERMARK, offsets.getPreviousStartOffset());
    workUnit.setProp(PREVIOUS_HIGH_WATERMARK, offsets.getPreviousEndOffset());
    workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
    workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
    workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
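
    // Attach lineage information describing the Kafka source dataset.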
    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
    source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
    if (this.lineageInfo.isPresent()) {
      this.lineageInfo.get().setSource(source, workUnit);
    }

    LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
        offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));

    return workUnit;
  }
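
  /**
   * Return the topics to be pulled, filtered by the topic.blacklist and topic.whitelist patterns.
   */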
  protected List<KafkaTopic> getFilteredTopics(SourceState state) {
    List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
    List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
    return this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
  }

  @Override
  public void shutdown(SourceState state) {
    state.setProp(ConfigurationKeys.OFFSET_TOO_EARLY_COUNT, this.offsetTooEarlyCount);
    state.setProp(ConfigurationKeys.OFFSET_TOO_LATE_COUNT, this.offsetTooLateCount);
    state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount);
  }
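
  /**
   * Holds the start, earliest and latest offsets of a Kafka partition for the current run,
   * together with the watermarks and fetch times of the previous run.
   */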
  @SuppressForbidden
  private static class Offsets {

    @Getter
    private long startOffset = 0;

    @Getter
    @Setter
    private long earliestOffset = 0;

    @Getter
    @Setter
    private long latestOffset = 0;

    @Getter
    @Setter
    private long offsetFetchEpochTime = 0;

    @Getter
    @Setter
    private long previousOffsetFetchEpochTime = 0;

    @Getter
    @Setter
    private long previousLatestOffset = 0;
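
    // Low watermark of the previous run.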
    @Getter
    @Setter
    private long previousStartOffset = 0;
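
    // High watermark of the previous run.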
    @Getter
    @Setter
    private long previousEndOffset = 0;

    @Getter
    @Setter
    private long previousStartFetchEpochTime = 0;

    @Getter
    @Setter
    private long previousStopFetchEpochTime = 0;

    private void startAt(long offset)
        throws StartOffsetOutOfRangeException {
      if (offset < this.earliestOffset || offset > this.latestOffset) {
        throw new StartOffsetOutOfRangeException(String
            .format("start offset = %d, earliest offset = %d, latest offset = %d", offset, this.earliestOffset,
                this.latestOffset));
      }
      this.startOffset = offset;
    }

    private void startAtEarliestOffset() {
      this.startOffset = this.earliestOffset;
    }

    private void startAtLatestOffset() {
      this.startOffset = this.latestOffset;
    }
  }

  private class WorkUnitCreator implements Runnable {
    public static final String WORK_UNITS_FOR_TOPIC_TIMER = "workUnitsForTopicTimer";
    private final KafkaTopic topic;
    private final SourceState state;
    private final Optional<State> topicSpecificState;
    private final Map<String, List<WorkUnit>> allTopicWorkUnits;

    WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
        Map<String, List<WorkUnit>> workUnits) {
      this.topic = topic;
      this.state = state;
      this.topicSpecificState = topicSpecificState;
      this.allTopicWorkUnits = workUnits;
    }

    @Override
    public void run() {
      try (Timer.Context context = metricContext.timer(WORK_UNITS_FOR_TOPIC_TIMER).time()) {
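        // Acquire a consumer client: reuse the shared client when configured, otherwise take one from the pool.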
        if (KafkaSource.this.sharedKafkaConsumerClient != null) {
          KafkaSource.this.kafkaConsumerClient.set(KafkaSource.this.sharedKafkaConsumerClient);
        } else {
          GobblinKafkaConsumerClient client = KafkaSource.this.kafkaConsumerClientPool.poll();
          Preconditions.checkNotNull(client, "Unexpectedly ran out of preallocated consumer clients");
          KafkaSource.this.kafkaConsumerClient.set(client);
        }

        this.allTopicWorkUnits.put(this.topic.getName(),
            KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
      } catch (Throwable t) {
        LOG.error("Caught error in creating work unit for " + this.topic.getName(), t);
        throw new RuntimeException(t);
      } finally {
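        // Return the pooled client so another work unit creation task can reuse it.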
        if (KafkaSource.this.sharedKafkaConsumerClient == null) {
          KafkaSource.this.kafkaConsumerClientPool.offer(KafkaSource.this.kafkaConsumerClient.get());
          KafkaSource.this.kafkaConsumerClient.remove();
        }
      }
    }
  }
}