View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.wikimedia.gobblin.kafka;
19  
20  import static java.util.Collections.emptyMap;
21  import static java.util.stream.Collectors.toMap;
22  
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.regex.Pattern;
28  import java.util.stream.Collectors;
29  
30  import org.apache.gobblin.configuration.SourceState;
31  import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
32  import org.apache.gobblin.util.DatasetFilterUtils;
33  import org.wikimedia.eventutilities.core.event.EventStreamConfig;
34  import org.wikimedia.eventutilities.core.event.WikimediaDefaults;
35  import org.wikimedia.gobblin.copy.KafkaSource;
36  
37  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
38  import lombok.extern.slf4j.Slf4j;
39  
40  @Slf4j
41  public abstract class WikimediaKafkaSource<S, D> extends KafkaSource<S, D> {
42  
43      private static final String TOPIC_INCLUDE_LIST = "topic.include";
44      private static final String TOPIC_EXCLUDE_LIST = "topic.exclude";
45      private static final String EVENT_STREAM_CONFIG_URI = "event_stream_config.uri";
46      private static final String EVENT_STREAM_CONFIG_IS_WMF_PROD = "event_stream_config.is_wmf_production";
47      private static final String EVENT_STREAM_CONFIG_STREAM_NAMES = "event_stream_config.stream_names";
48      private static final String EVENT_STREAM_CONFIG_SETTINGS_FILTERS = "event_stream_config.settings_filters";
49  
50      /**
51       * Finds Kafka topics to ingest.  Defaults to a literal include list.  In absence of that,
52       * uses Wikimedia's EventStreamConfig service to get a list of topics based on:
53       *  - a configured service URI
54       *  - a list of stream names to get topics from
55       *  - a list of settings filters in the form key1:value1,key2:value2
56       *
57       * @param state Work unit state
58       * @return List of configured topics
59       */
60      @Override
61      @SuppressFBWarnings(value = "SLF4J_FORMAT_SHOULD_BE_CONST", justification = "complex log message")
62      protected List<KafkaTopic> getFilteredTopics(SourceState state) {
63          // default to a literal include list
64          List<Pattern> excludeTopicsPatterns = DatasetFilterUtils.getPatternList(state, TOPIC_EXCLUDE_LIST);
65          List<Pattern> includeTopicPatterns = DatasetFilterUtils.getPatternList(state, TOPIC_INCLUDE_LIST);
66  
67          // fetch topics from Wikimedia's EventStreamConfig service
68          if (includeTopicPatterns.isEmpty()) {
69              if (state.contains(EVENT_STREAM_CONFIG_URI)) {
70                  final String configUri = state.getProp(EVENT_STREAM_CONFIG_URI);
71  
72                  // streamNames needs to be null if not defined, otherwise EventStreamConfig uses it
73                  // and returns an empty topics list.
74                  final List<String> streamNames;
75                  if (state.contains(EVENT_STREAM_CONFIG_STREAM_NAMES)) {
76                      streamNames = state.getPropAsList(EVENT_STREAM_CONFIG_STREAM_NAMES);
77                  } else {
78                      streamNames = null;
79                  }
80                  final List<String> settingsFilters = state.getPropAsList(EVENT_STREAM_CONFIG_SETTINGS_FILTERS, "");
81  
82                  String logMessage = "Getting " + TOPIC_INCLUDE_LIST + " from EventStreamConfig at " + configUri;
83                  if (streamNames != null) {
84                      logMessage += " for streams " + String.join(",", streamNames);
85                  }
86                  if (settingsFilters.isEmpty()) {
87                      logMessage += " with settings " + String.join(",", settingsFilters);
88                  }
89                  logMessage += ".";
90                  log.info(logMessage);
91  
92                  // Use WikimediaDefault EventStreamConfig if the provided uri matches
93                  // the default one. This allows for http-routes to be set correctly
94                  // in HTTP headers, and prevent having to use a proxy when querying
95                  // the service from WMF production servers.
96                  final EventStreamConfig eventStreamConfig;
97                  if (configUri.equals(WikimediaDefaults.EVENT_STREAM_CONFIG_URI) &&
98                          state.getPropAsBoolean(EVENT_STREAM_CONFIG_IS_WMF_PROD, false)) {
99                      // Get WikimediaDefaults eventstream config
100                     eventStreamConfig = WikimediaDefaults.EVENT_STREAM_CONFIG;
101                 } else {
102                     // Get an EventStreamConfig instance using eventStreamConfigUri.
103                     eventStreamConfig = EventStreamConfig.builder()
104                             .setEventStreamConfigLoader(configUri)
105                             // This is a hack to make EventStreamConfig work
106                             // EventStreamConfig expects a map to configure
107                             // to which service to produce event. As we're not
108                             // producing events but just reading config, an
109                             // empty map does the trick.
110                             .setEventServiceToUriMap(emptyMap())
111                             .build();
112                 }
113 
114                 // Get the list of topics matching the target streamNames and settingsFilters.
115                 List<String> includeTopics = eventStreamConfig.collectTopicsMatchingSettings(streamNames, settingsListToMap(settingsFilters));
116                 if (includeTopics.isEmpty()) {
117                     throw new RuntimeException("Failed to obtain topics to consume from EventStreamConfig at " + configUri);
118                 }
119                 includeTopicPatterns = DatasetFilterUtils.getPatternsFromStrings(includeTopics);
120             } else {
121                 log.warn("No topic configured (no include-list nor eventStreamConfig)");
122                 return Collections.emptyList();
123             }
124         } else {
125             log.info("Getting {} from include-list: {}", TOPIC_INCLUDE_LIST, state.getProp(TOPIC_INCLUDE_LIST));
126         }
127 
128         return kafkaConsumerClient.get().getFilteredTopics(excludeTopicsPatterns, includeTopicPatterns);
129     }
130 
131     protected Map<String, String> settingsListToMap(Collection<String> settingsFilters) {
132         return settingsFilters.stream()
133             .map(s -> s.split(":"))
134             .collect(toMap(kv -> kv[0], kv -> kv[1]));
135     }
136 }