// EventStreamCatalogConfiguration.java

package org.wikimedia.eventutilities.flink.table.catalog;

import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.SCHEMA_VERSION;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;

import lombok.AllArgsConstructor;

/**
 * Bundles every piece of configuration the EventStreamCatalog works with:
 * the options given at catalog level, the options given at table level,
 * the option groups that steer how the two are combined, and the default
 * option maps used as a last-resort fallback.
 *
 * <p>Subclasses decide how the final option map is produced by
 * implementing {@link #getProcessedOptions()}; {@link #get(ConfigOption)}
 * offers a single lookup that already applies overrides and format prefixes.
 *
 * <p>The constructor is generated by Lombok from the fields below, in
 * declaration order.
 */
@AllArgsConstructor
public abstract class EventStreamCatalogConfiguration {

    /**
     * Options supplied when the catalog itself is created (CREATE CATALOG).
     */
    protected final Configuration catalogOptions;

    /**
     * Options supplied when a table is created inside the catalog (CREATE TABLE).
     */
    protected final Configuration tableOptions;

    /**
     * Options that may be given on a table but are not used by the table itself.
     */
    protected final Collection<ConfigOption<?>> pseudoTableOptions;

    /**
     * Options whose value can be overridden by pseudoTableOptions.
     */
    protected final Collection<ConfigOption<?>> overrideOptions;

    /**
     * Options that the catalog and the format have in common.
     * In {@link #get(ConfigOption)} these are also looked up under the
     * format-identifier-prefixed key.
     */
    protected final Collection<ConfigOption<?>> sharedOptions;

    /**
     * Default option maps. When an override-only option is not set on the table,
     * {@link #get(ConfigOption)} merges them in the order table, kafka, format
     * (a later map wins on a key clash) and reads the option from the result.
     */
    protected Map<String, String> defaultTableOptions;
    protected Map<String, String> defaultKafkaOptions;
    protected Map<String, String> defaultFormatOptions;

    /**
     * Used to build the EventStream from which the latest schema version is read.
     */
    protected EventStreamFactory eventStreamFactory;

    /**
     * Produces the finalized table options.
     * Implementations return a map that can be handed to CatalogTable.of as is.
     *
     * @return the fully processed table options
     */
    public abstract Map<String, String> getProcessedOptions();

    /**
     * Tells whether the table's connector is one of the two Kafka connectors.
     * A table without an explicit connector option counts as a Kafka table.
     *
     * @return true for the kafka and upsert-kafka connector identifiers
     */
    public boolean isKafkaTable() {
        final String connectorId =
            tableOptions.getString(FactoryUtil.CONNECTOR, KafkaDynamicTableFactory.IDENTIFIER);
        if (connectorId.equals(KafkaDynamicTableFactory.IDENTIFIER)) {
            return true;
        }
        return connectorId.equals(UpsertKafkaDynamicTableFactory.IDENTIFIER);
    }

    /**
     * Tells whether the table's format is the event json format.
     * A table without an explicit format option counts as event json.
     *
     * @return true when the format identifier is the event json one
     */
    public boolean isEventJson() {
        return tableOptions
            .getString(FactoryUtil.FORMAT, EventJsonFormatFactory.IDENTIFIER)
            .equals(EventJsonFormatFactory.IDENTIFIER);
    }

    /**
     * Looks up the effective value of an option across table and catalog options.
     * The result reflects the state after overrides were applied
     * and before any option was stripped.
     *
     * <p>Resolution rules:
     * <ul>
     *   <li>SCHEMA_VERSION: read from the table options (plain or format-prefixed key);
     *       an empty value or "latest" resolves to the latest schema version of the
     *       configured stream.</li>
     *   <li>shared options: the format-prefixed key is accepted as a fallback key.</li>
     *   <li>pseudo-table only: table options only.</li>
     *   <li>override only: table options, then the merged default option maps.</li>
     *   <li>anything else (including pseudo-table and override together):
     *       table options, then catalog options.</li>
     * </ul>
     *
     * @param option the option to resolve
     * @param <T> the value type of the option
     * @return the resolved value
     */
    public <T> T get(ConfigOption<T> option) {
        // Classification uses the option exactly as passed in, before fallback keys are attached.
        final boolean overridable = this.overrideOptions.contains(option);
        final boolean shared = this.sharedOptions.contains(option);
        final boolean pseudoTable = this.pseudoTableOptions.contains(option);

        // The schema version defaults dynamically to the latest version of the stream.
        if (option.equals(SCHEMA_VERSION)) {
            final ConfigOption<T> versionOption = withFormatPrefixFallback(option);
            final String requested = tableOptions.getString((ConfigOption<String>) versionOption, "");
            final boolean pinned = !requested.isEmpty() && !requested.equals("latest");
            if (pinned) {
                return (T) requested;
            }
            final EventStream stream = eventStreamFactory.createEventStream(this.get(STREAM_NAME));
            // T is always the type of SCHEMA_VERSION (which should be a string)
            return (T) stream.latestSchemaVersion();
        }

        // Shared options must also match when given with the format prefix.
        final ConfigOption<T> effective = shared ? withFormatPrefixFallback(option) : option;

        if (pseudoTable && !overridable) {
            return tableOptions.get(effective);
        }

        if (overridable && !pseudoTable) {
            return tableOptions.getOptional(effective).orElseGet(() -> readFromDefaults(effective));
        }

        // Both pseudo-table and override, or neither: the catalog options are the fallback.
        return tableOptions.getOptional(effective).orElseGet(() -> catalogOptions.get(effective));
    }

    /**
     * Returns the option extended with a fallback key made of the event json format
     * identifier, a dot, and the option's own key.
     */
    private static <T> ConfigOption<T> withFormatPrefixFallback(ConfigOption<T> option) {
        return option.withFallbackKeys(EventJsonFormatFactory.IDENTIFIER + "." + option.key());
    }

    /**
     * Reads the option from the default maps merged in the order table, kafka, format.
     */
    private <T> T readFromDefaults(ConfigOption<T> option) {
        final Map<String, String> merged = new HashMap<>(defaultTableOptions);
        merged.putAll(defaultKafkaOptions);
        merged.putAll(defaultFormatOptions);
        return Configuration.fromMap(merged).get(option);
    }
}