CreateTableLikeEventStreamCatalogConfiguration.java
package org.wikimedia.eventutilities.flink.table.catalog;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.STREAM_PREFIX;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;
public class CreateTableLikeEventStreamCatalogConfiguration extends EventStreamCatalogConfiguration {
public CreateTableLikeEventStreamCatalogConfiguration(
Configuration catalogOptions,
Configuration tableOptions,
Collection<ConfigOption<?>> pseudoTableOptions,
Collection<ConfigOption<?>> overrideOptions,
Collection<ConfigOption<?>> sharedOptions,
Map<String, String> defaultTableOptions,
Map<String, String> defaultKafkaOptions,
Map<String, String> defaultFormatOptions,
EventStreamFactory eventStreamFactory
) {
super(
catalogOptions,
tableOptions,
pseudoTableOptions,
overrideOptions,
sharedOptions,
defaultTableOptions,
defaultKafkaOptions,
defaultFormatOptions,
eventStreamFactory
);
}
/**
* Processes options for CREATE TABLE LIKE.
*
* CREATE TABLE LIKE tableOptions have already been processed once by
* CREATE TABLE, so this handles removing options if the user switches
* from kafka or event-json
*/
@Override
public Map<String, String> getProcessedOptions() {
Map<String, String> processedOptions = new HashMap<>(this.catalogOptions.toMap());
processedOptions.putAll(this.tableOptions.toMap());
if (!isKafkaTable()) {
this.defaultKafkaOptions.forEach(
(k, v) -> processedOptions.entrySet().removeIf(
entry -> entry.getKey().equals(k)
)
);
processedOptions.remove(KafkaConnectorOptions.TOPIC.key());
} else {
if (processedOptions.containsKey(STREAM_PREFIX.key())) {
processedOptions.put(TOPIC.key(), this.get(STREAM_PREFIX) + "." + this.get(STREAM_NAME));
}
}
if (!isEventJson()) {
processedOptions.entrySet().removeIf(entry -> entry.getKey().startsWith(EventJsonFormatFactory.IDENTIFIER));
}
this.pseudoTableOptions.stream().map(ConfigOption::key).forEach(processedOptions::remove);
processedOptions.entrySet().removeIf(entry -> entry.getValue() == null);
return processedOptions;
}
}