ConsumerConfig.java

package org.wikimedia.discovery.cirrus.updater.consumer.config;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.wikimedia.discovery.cirrus.updater.common.config.ReadableConfigReader;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

/**
 * Immutable value object holding the consumer's settings.
 *
 * <p>Instances are created with {@link #of(ParameterTool)}, which reads the options declared as
 * {@code PARAM_*} constants from the Flink {@link Configuration} backing the given parameters.
 * Only a subset of the declared options is materialized as fields; the remaining ones (the
 * {@code elasticsearch-bulk-flush-*} and {@code elasticsearch-connection-*} options) are not read
 * by {@link #of(ParameterTool)} and can be looked up on demand with {@link
 * #optional(ConfigOption)}.
 *
 * <p>Accessors are generated by Lombok in fluent style (for example {@code updateStream()}).
 */
@Slf4j
@Value
@Builder
@Accessors(fluent = true)
@ParametersAreNonnullByDefault
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class ConsumerConfig {

    /** Key {@code event-stream-config-url}: string, no default. */
    public static final ConfigOption<String> PARAM_EVENT_STREAM_CONFIG_URL =
            ConfigOptions.key("event-stream-config-url").stringType().noDefaultValue();

    /** Key {@code event-stream-json-schema-urls}: list of strings, no default. */
    public static final ConfigOption<List<String>> PARAM_EVENT_STREAM_JSON_SCHEMA_URLS =
            ConfigOptions.key("event-stream-json-schema-urls").stringType().asList().noDefaultValue();

    /**
     * Key {@code kafka-source-config}: string-to-string map, no default. {@link
     * #of(ParameterTool)} additionally extracts the bootstrap servers and the group id entries
     * from this map.
     */
    public static final ConfigOption<Map<String, String>> PARAM_KAFKA_SOURCE_CONFIG =
            ConfigOptions.key("kafka-source-config").mapType().noDefaultValue();

    /**
     * Key {@code kafka-source-start-time}: string, no default. Converted with {@link
     * Instant#parse(CharSequence)} when read by {@link #of(ParameterTool)}.
     */
    public static final ConfigOption<String> PARAM_KAFKA_SOURCE_START_TIME =
            ConfigOptions.key("kafka-source-start-time").stringType().noDefaultValue();

    /**
     * Key {@code kafka-source-end-time}: string, no default. Converted with {@link
     * Instant#parse(CharSequence)} when read by {@link #of(ParameterTool)}.
     */
    public static final ConfigOption<String> PARAM_KAFKA_SOURCE_END_TIME =
            ConfigOptions.key("kafka-source-end-time").stringType().noDefaultValue();

    /** Key {@code update-stream}: string, no default. */
    public static final ConfigOption<String> PARAM_UPDATE_STREAM =
            ConfigOptions.key("update-stream").stringType().noDefaultValue();

    /** Key {@code elasticsearch-urls}: list of strings, no default. */
    public static final ConfigOption<List<String>> PARAM_ELASTICSEARCH_URLS =
            ConfigOptions.key("elasticsearch-urls").stringType().asList().noDefaultValue();

    /**
     * Key {@code elasticsearch-bulk-flush-max-actions}: integer, default {@code 0}.
     *
     * <p>NOTE(review): this is the only option here that declares a default value. A lookup
     * through {@link #optional(ConfigOption)} delegates to {@code Configuration#getOptional},
     * which presumably yields an empty result when the key is unset rather than the default -
     * confirm the default is intended.
     */
    public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS =
            ConfigOptions.key("elasticsearch-bulk-flush-max-actions").intType().defaultValue(0);

    /** Key {@code elasticsearch-bulk-flush-max-size-mb}: integer, no default. */
    public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_SIZE_MB =
            ConfigOptions.key("elasticsearch-bulk-flush-max-size-mb").intType().noDefaultValue();

    /** Key {@code elasticsearch-bulk-flush-interval}: long, in milliseconds, no default. */
    public static final ConfigOption<Long> PARAM_ELASTICSEARCH_BULK_FLUSH_INTERVAL =
            ConfigOptions.key("elasticsearch-bulk-flush-interval")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Interval in ms");

    /** Key {@code elasticsearch-connection-timeout}: integer, in milliseconds, no default. */
    public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_CONNECTION_TIMEOUT =
            ConfigOptions.key("elasticsearch-connection-timeout")
                    .intType()
                    .noDefaultValue()
                    // A timeout, not an interval: do not reuse the bulk-flush-interval wording.
                    .withDescription("Connection timeout in ms");

    /**
     * Key {@code elasticsearch-connection-request-timeout}: integer, in milliseconds, no default.
     */
    public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_CONNECTION_REQUEST_TIMEOUT =
            ConfigOptions.key("elasticsearch-connection-request-timeout")
                    .intType()
                    .noDefaultValue()
                    // A timeout, not an interval: do not reuse the bulk-flush-interval wording.
                    .withDescription("Connection request timeout in ms");

    /** Value of {@link #PARAM_EVENT_STREAM_CONFIG_URL}. */
    String eventStreamConfigUrl;

    /** Value of {@link #PARAM_EVENT_STREAM_JSON_SCHEMA_URLS}. */
    List<String> eventStreamJsonSchemaUrls;

    /** Value of {@link #PARAM_KAFKA_SOURCE_CONFIG}; see also {@link #kafkaSourceProperties()}. */
    Map<String, String> kafkaSourceConfig;

    /** The {@code bootstrap.servers} entry of {@link #PARAM_KAFKA_SOURCE_CONFIG}. */
    String kafkaSourceBootstrapServers;

    /** The {@code group.id} entry of {@link #PARAM_KAFKA_SOURCE_CONFIG}. */
    String kafkaSourceGroupId;

    /** Parsed value of {@link #PARAM_KAFKA_SOURCE_START_TIME}; may be {@code null}. */
    @Nullable Instant kafkaSourceStartTime;

    /** Parsed value of {@link #PARAM_KAFKA_SOURCE_END_TIME}; may be {@code null}. */
    @Nullable Instant kafkaSourceEndTime;

    /** Value of {@link #PARAM_UPDATE_STREAM}. */
    String updateStream;

    /** Value of {@link #PARAM_ELASTICSEARCH_URLS}. */
    List<String> elasticSearchUrls;

    /** The configuration all values were read from; backs {@link #optional(ConfigOption)}. */
    Configuration configuration;

    /**
     * Builds a {@link ConsumerConfig} from the configuration exposed by the given parameters.
     *
     * <p>Values are read through {@link ReadableConfigReader}: the {@code getRequired} and {@code
     * getRequiredMapEntry} helpers for everything except the Kafka start/end times, which go
     * through {@code getOptionalOrNull} with {@link Instant#parse(CharSequence)} as converter.
     * Presumably the {@code getRequired*} helpers reject a missing value - see {@link
     * ReadableConfigReader} for the exact failure mode.
     *
     * @param params parameters whose {@link ParameterTool#getConfiguration() configuration} is read
     * @return the populated configuration object
     */
    public static ConsumerConfig of(ParameterTool params) {
        final Configuration configuration = params.getConfiguration();

        return ConsumerConfig.builder()
                .configuration(configuration)
                .eventStreamConfigUrl(
                        ReadableConfigReader.getRequired(configuration, PARAM_EVENT_STREAM_CONFIG_URL))
                .eventStreamJsonSchemaUrls(
                        ReadableConfigReader.getRequired(configuration, PARAM_EVENT_STREAM_JSON_SCHEMA_URLS))
                .kafkaSourceConfig(
                        ReadableConfigReader.getRequired(configuration, PARAM_KAFKA_SOURCE_CONFIG))
                // Bootstrap servers and group id live inside the kafka-source-config map; they are
                // looked up under the standard Kafka consumer property names.
                .kafkaSourceBootstrapServers(
                        ReadableConfigReader.getRequiredMapEntry(
                                configuration,
                                PARAM_KAFKA_SOURCE_CONFIG,
                                org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
                .kafkaSourceGroupId(
                        ReadableConfigReader.getRequiredMapEntry(
                                configuration,
                                PARAM_KAFKA_SOURCE_CONFIG,
                                org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG))
                .kafkaSourceStartTime(
                        ReadableConfigReader.getOptionalOrNull(
                                configuration, PARAM_KAFKA_SOURCE_START_TIME, Instant::parse))
                .kafkaSourceEndTime(
                        ReadableConfigReader.getOptionalOrNull(
                                configuration, PARAM_KAFKA_SOURCE_END_TIME, Instant::parse))
                .updateStream(ReadableConfigReader.getRequired(configuration, PARAM_UPDATE_STREAM))
                .elasticSearchUrls(
                        ReadableConfigReader.getRequired(configuration, PARAM_ELASTICSEARCH_URLS))
                .build();
    }

    /**
     * Returns the Kafka source configuration as {@link Properties}.
     *
     * @return a new {@link Properties} instance populated with every entry of {@code
     *     kafkaSourceConfig}; a fresh object is created on each call
     */
    public Properties kafkaSourceProperties() {
        final Properties properties = new Properties();
        properties.putAll(kafkaSourceConfig);
        return properties;
    }

    /**
     * Looks up an arbitrary option in the underlying {@link Configuration}.
     *
     * @param option the option to read
     * @param <T> the value type of the option
     * @return the result of {@code configuration.getOptional(option)}
     */
    public <T> Optional<T> optional(ConfigOption<T> option) {
        return configuration.getOptional(option);
    }
}