CreateTableEventStreamCatalogConfiguration.java

package org.wikimedia.eventutilities.flink.table.catalog;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.STREAM_PREFIX;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

@SuppressFBWarnings("STT_TOSTRING_MAP_KEYING")
public class CreateTableEventStreamCatalogConfiguration extends EventStreamCatalogConfiguration {

    public CreateTableEventStreamCatalogConfiguration(
        Configuration catalogOptions,
        Configuration tableOptions,
        Collection<ConfigOption<?>> pseudoTableOptions,
        Collection<ConfigOption<?>> overrideOptions,
        Collection<ConfigOption<?>> sharedOptions,
        Map<String, String> defaultTableOptions,
        Map<String, String> defaultKafkaOptions,
        Map<String, String> defaultFormatOptions,
        EventStreamFactory eventStreamFactory
    ) {
        super(
            catalogOptions,
            tableOptions,
            pseudoTableOptions,
            overrideOptions,
            sharedOptions,
            defaultTableOptions,
            defaultKafkaOptions,
            defaultFormatOptions,
            eventStreamFactory
        );
    }

    /**
     * Processes options for CREATE TABLE.
     *
     * CREATE TABLE tableOptions are set by the user and should
     * take precedence over any funky things the catalog does
     */
    @Override
    public Map<String, String> getProcessedOptions() {
        Map<String, String> processedOptions = new HashMap<>(this.tableOptions.toMap());
        this.defaultTableOptions.forEach(processedOptions::putIfAbsent);

        if (isKafkaTable()) {
            this.defaultKafkaOptions.forEach(processedOptions::putIfAbsent);
            if (!this.tableOptions.containsKey(KafkaConnectorOptions.TOPIC.key())) {
                if (processedOptions.containsKey(STREAM_PREFIX.key())) {
                    processedOptions.put(TOPIC.key(), this.get(STREAM_PREFIX) + "." + this.get(STREAM_NAME));
                } else {
                    processedOptions.put(
                        TOPIC.key(),
                        String.join(";",
                            eventStreamFactory.createEventStream(
                                this.get(STREAM_NAME)
                            ).topics()
                        )
                    );
                }
            }
        }

        if (isEventJson()) {
            this.defaultFormatOptions.forEach(processedOptions::putIfAbsent);
            Map<String, String> catalogOptionsMap = catalogOptions.toMap();

            this.sharedOptions.forEach(o -> {
                String value = catalogOptionsMap.getOrDefault(o.key(), processedOptions.get(o.key()));
                processedOptions.remove(o.key());
                String prefixedKey = EventJsonFormatFactory.IDENTIFIER + "." + o.key();
                processedOptions.put(prefixedKey, value);
            });
        }

        // Remove options not needed to create the table
        // What happens if a table option just so happens to be the same name as a pseudoTableOption?
        this.pseudoTableOptions.stream().map(ConfigOption::key).forEach(processedOptions::remove);
        processedOptions.entrySet().removeIf(entry -> entry.getValue() == null);

        return processedOptions;
    }

}