// ReadableConfigReader.java
package org.wikimedia.discovery.cirrus.updater.common.config;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import lombok.experimental.UtilityClass;
/**
 * Static helpers for reading values out of a {@link ReadableConfig}.
 *
 * <p>Every lookup goes through {@link ReadableConfig#getOptional(ConfigOption)}; an option is
 * treated as "missing" exactly when that call yields an empty {@link Optional}.
 */
@UtilityClass
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
public class ReadableConfigReader {

  /**
   * Creates (but does not throw) the exception used to report a missing option.
   *
   * @param option the option that could not be resolved; its string form is appended to the
   *     message
   * @return a new {@link IllegalArgumentException} describing the missing option
   */
  public static IllegalArgumentException missingArgument(ConfigOption<?> option) {
    return new IllegalArgumentException("Missing required argument for " + option);
  }

  /**
   * Reads an option that must be present.
   *
   * @param configuration the configuration to read from
   * @param option the option to look up
   * @param <T> the option's value type
   * @return the configured value
   * @throws IllegalArgumentException if {@code configuration.getOptional(option)} is empty
   */
  public static <T> T getRequired(ReadableConfig configuration, ConfigOption<T> option) {
    return configuration.getOptional(option).orElseThrow(() -> missingArgument(option));
  }

  /**
   * Reads an option, yielding {@code null} instead of an empty {@link Optional} when it is absent.
   *
   * @param configuration the configuration to read from
   * @param option the option to look up
   * @param <T> the option's value type
   * @return the configured value, or {@code null} if {@code configuration.getOptional(option)} is
   *     empty
   */
  public static <T> T getOptionalOrNull(ReadableConfig configuration, ConfigOption<T> option) {
    return getOptionalOrNull(configuration, option, Function.identity());
  }

  /**
   * Reads an option and converts it with {@code mapper}, yielding {@code null} when there is
   * nothing to return.
   *
   * <p>The mapper is declared with bounded wildcards so that any function accepting a supertype
   * of {@code T} and producing a subtype of {@code R} can be passed, mirroring the signature of
   * {@link Optional#map(Function)}.
   *
   * @param configuration the configuration to read from
   * @param option the option to look up
   * @param mapper conversion applied to the value when the option is present
   * @param <T> the option's value type
   * @param <R> the result type
   * @return the mapped value, or {@code null} if the option is absent or {@code mapper} returns
   *     {@code null}
   */
  public static <T, R> R getOptionalOrNull(
      ReadableConfig configuration,
      ConfigOption<T> option,
      Function<? super T, ? extends R> mapper) {
    return configuration.getOptional(option).<R>map(mapper).orElse(null);
  }

  /**
   * Looks up a single entry of a map-valued option.
   *
   * @param configuration the configuration to read from
   * @param option the map-valued option to look up
   * @param key the key to fetch from the option's map
   * @return the entry's value, or an empty {@link Optional} if the option is absent or its map
   *     holds no non-null value for {@code key}
   */
  public static Optional<String> getOptionalMapEntry(
      ReadableConfig configuration, ConfigOption<Map<String, String>> option, String key) {
    return configuration.getOptional(option).map(entries -> entries.get(key));
  }

  /**
   * Looks up a single entry of a map-valued option that must be present.
   *
   * @param configuration the configuration to read from
   * @param option the map-valued option to look up
   * @param key the key to fetch from the option's map
   * @return the entry's value
   * @throws IllegalArgumentException if the option is absent or its map holds no non-null value
   *     for {@code key}; the message names a synthetic option keyed {@code <option key>.<key>}
   */
  public static String getRequiredMapEntry(
      ReadableConfig configuration, ConfigOption<Map<String, String>> option, String key) {
    // Same lookup as the optional variant; only the handling of the empty case differs.
    return getOptionalMapEntry(configuration, option, key)
        .orElseThrow(
            () ->
                // A throwaway option is built purely so the error names the nested key.
                missingArgument(
                    ConfigOptions.key(option.key() + "." + key).stringType().noDefaultValue()));
  }
}