WikimediaKafkaSource.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.wikimedia.gobblin.kafka;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.wikimedia.eventutilities.core.event.EventStreamConfig;
import org.wikimedia.eventutilities.core.event.WikimediaDefaults;
import org.wikimedia.gobblin.copy.KafkaSource;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class WikimediaKafkaSource<S, D> extends KafkaSource<S, D> {

    private static final String TOPIC_INCLUDE_LIST = "topic.include";
    private static final String TOPIC_EXCLUDE_LIST = "topic.exclude";
    private static final String EVENT_STREAM_CONFIG_URI = "event_stream_config.uri";
    private static final String EVENT_STREAM_CONFIG_IS_WMF_PROD = "event_stream_config.is_wmf_production";
    private static final String EVENT_STREAM_CONFIG_STREAM_NAMES = "event_stream_config.stream_names";
    private static final String EVENT_STREAM_CONFIG_SETTINGS_FILTERS = "event_stream_config.settings_filters";

    /**
     * Finds Kafka topics to ingest.  Defaults to a literal include list.  In absence of that,
     * uses Wikimedia's EventStreamConfig service to get a list of topics based on:
     *  - a configured service URI
     *  - a list of stream names to get topics from
     *  - a list of settings filters in the form key1:value1,key2:value2
     *
     * @param state Work unit state
     * @return List of configured topics
     */
    @Override
    @SuppressFBWarnings(value = "SLF4J_FORMAT_SHOULD_BE_CONST", justification = "complex log message")
    protected List<KafkaTopic> getFilteredTopics(SourceState state) {
        // default to a literal include list
        List<Pattern> excludeTopicsPatterns = DatasetFilterUtils.getPatternList(state, TOPIC_EXCLUDE_LIST);
        List<Pattern> includeTopicPatterns = DatasetFilterUtils.getPatternList(state, TOPIC_INCLUDE_LIST);

        // fetch topics from Wikimedia's EventStreamConfig service
        if (includeTopicPatterns.isEmpty()) {
            if (state.contains(EVENT_STREAM_CONFIG_URI)) {
                final String configUri = state.getProp(EVENT_STREAM_CONFIG_URI);

                // streamNames needs to be null if not defined, otherwise EventStreamConfig uses it
                // and returns an empty topics list.
                final List<String> streamNames;
                if (state.contains(EVENT_STREAM_CONFIG_STREAM_NAMES)) {
                    streamNames = state.getPropAsList(EVENT_STREAM_CONFIG_STREAM_NAMES);
                } else {
                    streamNames = null;
                }
                final List<String> settingsFilters = state.getPropAsList(EVENT_STREAM_CONFIG_SETTINGS_FILTERS, "");

                String logMessage = "Getting " + TOPIC_INCLUDE_LIST + " from EventStreamConfig at " + configUri;
                if (streamNames != null) {
                    logMessage += " for streams " + String.join(",", streamNames);
                }
                if (settingsFilters.isEmpty()) {
                    logMessage += " with settings " + String.join(",", settingsFilters);
                }
                logMessage += ".";
                log.info(logMessage);

                // Use WikimediaDefault EventStreamConfig if the provided uri matches
                // the default one. This allows for http-routes to be set correctly
                // in HTTP headers, and prevent having to use a proxy when querying
                // the service from WMF production servers.
                final EventStreamConfig eventStreamConfig;
                if (configUri.equals(WikimediaDefaults.EVENT_STREAM_CONFIG_URI) &&
                        state.getPropAsBoolean(EVENT_STREAM_CONFIG_IS_WMF_PROD, false)) {
                    // Get WikimediaDefaults eventstream config
                    eventStreamConfig = WikimediaDefaults.EVENT_STREAM_CONFIG;
                } else {
                    // Get an EventStreamConfig instance using eventStreamConfigUri.
                    eventStreamConfig = EventStreamConfig.builder()
                            .setEventStreamConfigLoader(configUri)
                            // This is a hack to make EventStreamConfig work
                            // EventStreamConfig expects a map to configure
                            // to which service to produce event. As we're not
                            // producing events but just reading config, an
                            // empty map does the trick.
                            .setEventServiceToUriMap(emptyMap())
                            .build();
                }

                // Get the list of topics matching the target streamNames and settingsFilters.
                List<String> includeTopics = eventStreamConfig.collectTopicsMatchingSettings(streamNames, settingsListToMap(settingsFilters));
                if (includeTopics.isEmpty()) {
                    throw new RuntimeException("Failed to obtain topics to consume from EventStreamConfig at " + configUri);
                }
                includeTopicPatterns = DatasetFilterUtils.getPatternsFromStrings(includeTopics);
            } else {
                log.warn("No topic configured (no include-list nor eventStreamConfig)");
                return Collections.emptyList();
            }
        } else {
            log.info("Getting {} from include-list: {}", TOPIC_INCLUDE_LIST, state.getProp(TOPIC_INCLUDE_LIST));
        }

        return kafkaConsumerClient.get().getFilteredTopics(excludeTopicsPatterns, includeTopicPatterns);
    }

    protected Map<String, String> settingsListToMap(Collection<String> settingsFilters) {
        return settingsFilters.stream()
            .map(s -> s.split(":"))
            .collect(toMap(kv -> kv[0], kv -> kv[1]));
    }
}