1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.wikimedia.gobblin.kafka;
19
20 import static java.util.Collections.emptyMap;
21 import static java.util.stream.Collectors.toMap;
22
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.regex.Pattern;
28 import java.util.stream.Collectors;
29
30 import org.apache.gobblin.configuration.SourceState;
31 import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
32 import org.apache.gobblin.util.DatasetFilterUtils;
33 import org.wikimedia.eventutilities.core.event.EventStreamConfig;
34 import org.wikimedia.eventutilities.core.event.WikimediaDefaults;
35 import org.wikimedia.gobblin.copy.KafkaSource;
36
37 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
38 import lombok.extern.slf4j.Slf4j;
39
40 @Slf4j
41 public abstract class WikimediaKafkaSource<S, D> extends KafkaSource<S, D> {
42
43 private static final String TOPIC_INCLUDE_LIST = "topic.include";
44 private static final String TOPIC_EXCLUDE_LIST = "topic.exclude";
45 private static final String EVENT_STREAM_CONFIG_URI = "event_stream_config.uri";
46 private static final String EVENT_STREAM_CONFIG_IS_WMF_PROD = "event_stream_config.is_wmf_production";
47 private static final String EVENT_STREAM_CONFIG_STREAM_NAMES = "event_stream_config.stream_names";
48 private static final String EVENT_STREAM_CONFIG_SETTINGS_FILTERS = "event_stream_config.settings_filters";
49
50
51
52
53
54
55
56
57
58
59
60 @Override
61 @SuppressFBWarnings(value = "SLF4J_FORMAT_SHOULD_BE_CONST", justification = "complex log message")
62 protected List<KafkaTopic> getFilteredTopics(SourceState state) {
63
64 List<Pattern> excludeTopicsPatterns = DatasetFilterUtils.getPatternList(state, TOPIC_EXCLUDE_LIST);
65 List<Pattern> includeTopicPatterns = DatasetFilterUtils.getPatternList(state, TOPIC_INCLUDE_LIST);
66
67
68 if (includeTopicPatterns.isEmpty()) {
69 if (state.contains(EVENT_STREAM_CONFIG_URI)) {
70 final String configUri = state.getProp(EVENT_STREAM_CONFIG_URI);
71
72
73
74 final List<String> streamNames;
75 if (state.contains(EVENT_STREAM_CONFIG_STREAM_NAMES)) {
76 streamNames = state.getPropAsList(EVENT_STREAM_CONFIG_STREAM_NAMES);
77 } else {
78 streamNames = null;
79 }
80 final List<String> settingsFilters = state.getPropAsList(EVENT_STREAM_CONFIG_SETTINGS_FILTERS, "");
81
82 String logMessage = "Getting " + TOPIC_INCLUDE_LIST + " from EventStreamConfig at " + configUri;
83 if (streamNames != null) {
84 logMessage += " for streams " + String.join(",", streamNames);
85 }
86 if (settingsFilters.isEmpty()) {
87 logMessage += " with settings " + String.join(",", settingsFilters);
88 }
89 logMessage += ".";
90 log.info(logMessage);
91
92
93
94
95
96 final EventStreamConfig eventStreamConfig;
97 if (configUri.equals(WikimediaDefaults.EVENT_STREAM_CONFIG_URI) &&
98 state.getPropAsBoolean(EVENT_STREAM_CONFIG_IS_WMF_PROD, false)) {
99
100 eventStreamConfig = WikimediaDefaults.EVENT_STREAM_CONFIG;
101 } else {
102
103 eventStreamConfig = EventStreamConfig.builder()
104 .setEventStreamConfigLoader(configUri)
105
106
107
108
109
110 .setEventServiceToUriMap(emptyMap())
111 .build();
112 }
113
114
115 List<String> includeTopics = eventStreamConfig.collectTopicsMatchingSettings(streamNames, settingsListToMap(settingsFilters));
116 if (includeTopics.isEmpty()) {
117 throw new RuntimeException("Failed to obtain topics to consume from EventStreamConfig at " + configUri);
118 }
119 includeTopicPatterns = DatasetFilterUtils.getPatternsFromStrings(includeTopics);
120 } else {
121 log.warn("No topic configured (no include-list nor eventStreamConfig)");
122 return Collections.emptyList();
123 }
124 } else {
125 log.info("Getting {} from include-list: {}", TOPIC_INCLUDE_LIST, state.getProp(TOPIC_INCLUDE_LIST));
126 }
127
128 return kafkaConsumerClient.get().getFilteredTopics(excludeTopicsPatterns, includeTopicPatterns);
129 }
130
131 protected Map<String, String> settingsListToMap(Collection<String> settingsFilters) {
132 return settingsFilters.stream()
133 .map(s -> s.split(":"))
134 .collect(toMap(kv -> kv[0], kv -> kv[1]));
135 }
136 }