CreateTableLikeEventStreamCatalogConfiguration.java

package org.wikimedia.eventutilities.flink.table.catalog;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.STREAM_PREFIX;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;

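/**
 * {@link EventStreamCatalogConfiguration} used for tables created via
 * CREATE TABLE ... LIKE. It overrides {@link #getProcessedOptions()} to remove
 * options that no longer apply once the user has switched away from the kafka
 * connector or the event-json format.
 */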
public class CreateTableLikeEventStreamCatalogConfiguration extends EventStreamCatalogConfiguration {

    public CreateTableLikeEventStreamCatalogConfiguration(
        Configuration catalogOptions,
        Configuration tableOptions,
        Collection<ConfigOption<?>> pseudoTableOptions,
        Collection<ConfigOption<?>> overrideOptions,
        Collection<ConfigOption<?>> sharedOptions,
        Map<String, String> defaultTableOptions,
        Map<String, String> defaultKafkaOptions,
        Map<String, String> defaultFormatOptions,
        EventStreamFactory eventStreamFactory
    ) {
        super(
            catalogOptions,
            tableOptions,
            pseudoTableOptions,
            overrideOptions,
            sharedOptions,
            defaultTableOptions,
            defaultKafkaOptions,
            defaultFormatOptions,
            eventStreamFactory
        );
    }

    /**
     * Processes options for CREATE TABLE LIKE.
     *
     * <p>CREATE TABLE LIKE tableOptions have already been processed once by
     * CREATE TABLE, so this handles removing options that no longer apply when
     * the user switches away from the kafka connector or the event-json format.
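     *
     * <p>Illustrative sketch only: the table name, path, and overriding options
     * below are assumptions, not values used by this class. A catalog table backed
     * by the kafka connector and event-json format is copied with CREATE TABLE LIKE
     * but given a different connector and format, so this method drops the inherited
     * kafka options (including the derived {@code topic}) and the event-json options:
     * <pre>{@code
     * CREATE TABLE my_stream_files WITH (
     *     'connector' = 'filesystem',
     *     'path' = 'file:///tmp/my_stream',
     *     'format' = 'json'
     * ) LIKE my_stream;
     * }</pre>
     *
     * @return the processed table options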
     */
    @Override
    public Map<String, String> getProcessedOptions() {
        Map<String, String> processedOptions = new HashMap<>(this.catalogOptions.toMap());
        processedOptions.putAll(this.tableOptions.toMap());

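        // If the connector was overridden so the table is no longer kafka-backed,
        // drop the catalog's default kafka options and the topic; otherwise, if a
        // stream prefix is configured, (re)derive the topic from it and the stream name.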
        if (!isKafkaTable()) {
            this.defaultKafkaOptions.keySet().forEach(processedOptions::remove);
            processedOptions.remove(TOPIC.key());
        } else if (processedOptions.containsKey(STREAM_PREFIX.key())) {
            processedOptions.put(TOPIC.key(), this.get(STREAM_PREFIX) + "." + this.get(STREAM_NAME));
        }

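        // Similarly, drop all event-json format options if the format was switched
        // away from event-json.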
        if (!isEventJson()) {
            processedOptions.entrySet().removeIf(entry -> entry.getKey().startsWith(EventJsonFormatFactory.IDENTIFIER));
        }

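        // Finally, strip the pseudo table options and any null-valued entries so they
        // are not passed along as real table options.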
        this.pseudoTableOptions.stream().map(ConfigOption::key).forEach(processedOptions::remove);
        processedOptions.entrySet().removeIf(entry -> entry.getValue() == null);

        return processedOptions;
    }

}