ReadableConfigReader.java

package org.wikimedia.discovery.cirrus.updater.common.config;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

import lombok.experimental.UtilityClass;

@UtilityClass
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
public class ReadableConfigReader {

    public static IllegalArgumentException missingArgument(ConfigOption<?> option) {
        return new IllegalArgumentException("Missing required argument for " + option);
    }

    public static <T> T getRequired(ReadableConfig configuration, ConfigOption<T> option) {
        return configuration.getOptional(option).orElseThrow(() -> missingArgument(option));
    }

    public static <T> T getOptionalOrNull(ReadableConfig configuration, ConfigOption<T> option) {
        return getOptionalOrNull(configuration, option, Function.identity());
    }

    public static <T, R> R getOptionalOrNull(
            ReadableConfig configuration, ConfigOption<T> option, Function<T, R> mapper) {
        return configuration.getOptional(option).map(mapper).orElse(null);
    }

    public static Optional<String> getOptionalMapEntry(
            ReadableConfig configuration, ConfigOption<Map<String, String>> option, String key) {
        return configuration.getOptional(option).map(map -> map.get(key));
    }

    public static String getRequiredMapEntry(
            ReadableConfig configuration, ConfigOption<Map<String, String>> option, String key) {
        return configuration
                .getOptional(option)
                .map(map -> map.get(key))
                .orElseThrow(
                        () ->
                                missingArgument(
                                        ConfigOptions.key(option.key() + "." + key).stringType().noDefaultValue()));
    }
}