ConsumerConfig.java
package org.wikimedia.discovery.cirrus.updater.consumer.config;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.wikimedia.discovery.cirrus.updater.common.config.ReadableConfigReader;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
/**
 * Immutable configuration of the consumer job, assembled from command-line parameters.
 *
 * <p>Instances are created with {@link #of(ParameterTool)} (or the Lombok-generated builder); the
 * all-args constructor is private. Lombok generates fluent accessors without a {@code get} prefix,
 * e.g. {@code updateStream()} and {@code kafkaSourceStartTime()}.
 */
@Slf4j
@Value
@Builder
@Accessors(fluent = true)
@ParametersAreNonnullByDefault
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class ConsumerConfig {
  /** Key {@code event-stream-config-url}; read with {@code getRequired} in {@link #of}. */
  public static final ConfigOption<String> PARAM_EVENT_STREAM_CONFIG_URL =
      ConfigOptions.key("event-stream-config-url").stringType().noDefaultValue();

  /** Key {@code event-stream-json-schema-urls}; a list, read with {@code getRequired}. */
  public static final ConfigOption<List<String>> PARAM_EVENT_STREAM_JSON_SCHEMA_URLS =
      ConfigOptions.key("event-stream-json-schema-urls").stringType().asList().noDefaultValue();

  /**
   * Key {@code kafka-source-config}; a map of Kafka client properties. {@link #of} additionally
   * requires it to contain the Kafka {@code bootstrap.servers} and {@code group.id} entries.
   */
  public static final ConfigOption<Map<String, String>> PARAM_KAFKA_SOURCE_CONFIG =
      ConfigOptions.key("kafka-source-config").mapType().noDefaultValue();

  /** Key {@code kafka-source-start-time}; optional, parsed with {@link Instant#parse}. */
  public static final ConfigOption<String> PARAM_KAFKA_SOURCE_START_TIME =
      ConfigOptions.key("kafka-source-start-time").stringType().noDefaultValue();

  /** Key {@code kafka-source-end-time}; optional, parsed with {@link Instant#parse}. */
  public static final ConfigOption<String> PARAM_KAFKA_SOURCE_END_TIME =
      ConfigOptions.key("kafka-source-end-time").stringType().noDefaultValue();

  /** Key {@code update-stream}; read with {@code getRequired}. */
  public static final ConfigOption<String> PARAM_UPDATE_STREAM =
      ConfigOptions.key("update-stream").stringType().noDefaultValue();

  /** Key {@code elasticsearch-urls}; a list, read with {@code getRequired}. */
  public static final ConfigOption<List<String>> PARAM_ELASTICSEARCH_URLS =
      ConfigOptions.key("elasticsearch-urls").stringType().asList().noDefaultValue();

  /**
   * Key {@code elasticsearch-bulk-flush-max-actions}; declares a default of 0. Not read by {@link
   * #of}. Note that {@link #optional} does NOT apply this default (see its Javadoc).
   */
  public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS =
      ConfigOptions.key("elasticsearch-bulk-flush-max-actions").intType().defaultValue(0);

  /** Key {@code elasticsearch-bulk-flush-max-size-mb}; not read by {@link #of}. */
  public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_SIZE_MB =
      ConfigOptions.key("elasticsearch-bulk-flush-max-size-mb").intType().noDefaultValue();

  /** Key {@code elasticsearch-bulk-flush-interval}, in milliseconds; not read by {@link #of}. */
  public static final ConfigOption<Long> PARAM_ELASTICSEARCH_BULK_FLUSH_INTERVAL =
      ConfigOptions.key("elasticsearch-bulk-flush-interval")
          .longType()
          .noDefaultValue()
          .withDescription("Interval in ms");

  /** Key {@code elasticsearch-connection-timeout}, in milliseconds; not read by {@link #of}. */
  public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_CONNECTION_TIMEOUT =
      ConfigOptions.key("elasticsearch-connection-timeout")
          .intType()
          .noDefaultValue()
          // This is a timeout, not an interval (description was copy-pasted from the flush option).
          .withDescription("Timeout in ms");

  /**
   * Key {@code elasticsearch-connection-request-timeout}, in milliseconds; not read by {@link
   * #of}.
   */
  public static final ConfigOption<Integer> PARAM_ELASTICSEARCH_CONNECTION_REQUEST_TIMEOUT =
      ConfigOptions.key("elasticsearch-connection-request-timeout")
          .intType()
          .noDefaultValue()
          // This is a timeout, not an interval (description was copy-pasted from the flush option).
          .withDescription("Timeout in ms");

  /** Value of {@link #PARAM_EVENT_STREAM_CONFIG_URL}. */
  String eventStreamConfigUrl;

  /** Value of {@link #PARAM_EVENT_STREAM_JSON_SCHEMA_URLS}. */
  List<String> eventStreamJsonSchemaUrls;

  /** Value of {@link #PARAM_KAFKA_SOURCE_CONFIG}; the full Kafka client property map. */
  Map<String, String> kafkaSourceConfig;

  /** The {@code bootstrap.servers} entry of {@link #kafkaSourceConfig}. */
  String kafkaSourceBootstrapServers;

  /** The {@code group.id} entry of {@link #kafkaSourceConfig}. */
  String kafkaSourceGroupId;

  /** Parsed {@link #PARAM_KAFKA_SOURCE_START_TIME}, or {@code null} when not configured. */
  @Nullable Instant kafkaSourceStartTime;

  /** Parsed {@link #PARAM_KAFKA_SOURCE_END_TIME}, or {@code null} when not configured. */
  @Nullable Instant kafkaSourceEndTime;

  /** Value of {@link #PARAM_UPDATE_STREAM}. */
  String updateStream;

  /** Value of {@link #PARAM_ELASTICSEARCH_URLS}. */
  List<String> elasticSearchUrls;

  /** The raw Flink configuration the fields above were read from; backs {@link #optional}. */
  Configuration configuration;

  /**
   * Builds a {@code ConsumerConfig} from job parameters.
   *
   * <p>Required options are read through {@code ReadableConfigReader.getRequired} /
   * {@code getRequiredMapEntry}; the start/end times are optional and parsed with {@link
   * Instant#parse} (ISO-8601 instant, e.g. {@code 2024-01-01T00:00:00Z}). How missing or malformed
   * values are reported is up to {@code ReadableConfigReader} — not visible here.
   *
   * @param params command-line parameters; converted with {@link ParameterTool#getConfiguration()}
   * @return the populated configuration
   */
  public static ConsumerConfig of(ParameterTool params) {
    final Configuration configuration = params.getConfiguration();
    return ConsumerConfig.builder()
        .configuration(configuration)
        .eventStreamConfigUrl(
            ReadableConfigReader.getRequired(configuration, PARAM_EVENT_STREAM_CONFIG_URL))
        .eventStreamJsonSchemaUrls(
            ReadableConfigReader.getRequired(configuration, PARAM_EVENT_STREAM_JSON_SCHEMA_URLS))
        .kafkaSourceConfig(
            ReadableConfigReader.getRequired(configuration, PARAM_KAFKA_SOURCE_CONFIG))
        // Kafka's ConsumerConfig is fully qualified because it clashes with this class's name.
        .kafkaSourceBootstrapServers(
            ReadableConfigReader.getRequiredMapEntry(
                configuration,
                PARAM_KAFKA_SOURCE_CONFIG,
                org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
        .kafkaSourceGroupId(
            ReadableConfigReader.getRequiredMapEntry(
                configuration,
                PARAM_KAFKA_SOURCE_CONFIG,
                org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG))
        .kafkaSourceStartTime(
            ReadableConfigReader.getOptionalOrNull(
                configuration, PARAM_KAFKA_SOURCE_START_TIME, Instant::parse))
        .kafkaSourceEndTime(
            ReadableConfigReader.getOptionalOrNull(
                configuration, PARAM_KAFKA_SOURCE_END_TIME, Instant::parse))
        .updateStream(ReadableConfigReader.getRequired(configuration, PARAM_UPDATE_STREAM))
        .elasticSearchUrls(
            ReadableConfigReader.getRequired(configuration, PARAM_ELASTICSEARCH_URLS))
        .build();
  }

  /**
   * Returns the Kafka source settings as a fresh {@link Properties} object.
   *
   * @return a new {@code Properties} containing every entry of {@link #kafkaSourceConfig}; callers
   *     may modify it without affecting this config's map
   */
  public Properties kafkaSourceProperties() {
    final Properties properties = new Properties();
    properties.putAll(kafkaSourceConfig);
    return properties;
  }

  /**
   * Looks up an arbitrary option in the underlying Flink configuration.
   *
   * <p>Delegates to Flink's {@code getOptional}, which returns {@link Optional#empty()} when the key
   * is not set — the option's declared default value is NOT substituted (e.g. for {@link
   * #PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS}). Use {@code configuration().get(option)} when the
   * default should apply.
   *
   * @param option the option to read
   * @param <T> the option's value type
   * @return the configured value, or empty if the key is absent
   */
  public <T> Optional<T> optional(ConfigOption<T> option) {
    return configuration.getOptional(option);
  }
}