// CreateTableEventStreamCatalogConfiguration.java
package org.wikimedia.eventutilities.flink.table.catalog;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.STREAM_PREFIX;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * Catalog configuration used when processing the options of a CREATE TABLE statement.
 *
 * <p>The options given by the user in the statement are the starting point; defaults
 * supplied by the catalog are only filled in where the user did not set a value.
 */
@SuppressFBWarnings("STT_TOSTRING_MAP_KEYING")
public class CreateTableEventStreamCatalogConfiguration extends EventStreamCatalogConfiguration {

    /**
     * Creates the configuration. Every argument is passed unchanged to the
     * {@link EventStreamCatalogConfiguration} constructor.
     *
     * @param catalogOptions options of the catalog
     * @param tableOptions options of the table being created
     * @param pseudoTableOptions options that are removed from the processed result
     * @param overrideOptions forwarded to the superclass; not read by this class
     * @param sharedOptions options that are re-keyed under the event-json format prefix
     * @param defaultTableOptions defaults applied to every table when absent
     * @param defaultKafkaOptions defaults applied to Kafka tables when absent
     * @param defaultFormatOptions defaults applied to event-json tables when absent
     * @param eventStreamFactory used to look up the topics of a stream
     */
    public CreateTableEventStreamCatalogConfiguration(
        Configuration catalogOptions,
        Configuration tableOptions,
        Collection<ConfigOption<?>> pseudoTableOptions,
        Collection<ConfigOption<?>> overrideOptions,
        Collection<ConfigOption<?>> sharedOptions,
        Map<String, String> defaultTableOptions,
        Map<String, String> defaultKafkaOptions,
        Map<String, String> defaultFormatOptions,
        EventStreamFactory eventStreamFactory
    ) {
        super(
            catalogOptions,
            tableOptions,
            pseudoTableOptions,
            overrideOptions,
            sharedOptions,
            defaultTableOptions,
            defaultKafkaOptions,
            defaultFormatOptions,
            eventStreamFactory
        );
    }

    /**
     * Processes options for CREATE TABLE.
     *
     * <p>CREATE TABLE tableOptions are set by the user and should
     * take precedence over any funky things the catalog does.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>copy the table options and fill in missing default table options;</li>
     *   <li>for Kafka tables, fill in missing default Kafka options and, when the table
     *       options do not set a topic, derive one from the stream prefix and stream name,
     *       or else from the topics of the stream joined with {@code ";"};</li>
     *   <li>for event-json tables, fill in missing default format options and move each
     *       shared option under the {@code EventJsonFormatFactory.IDENTIFIER + "."} prefix;</li>
     *   <li>remove pseudo table options and any entry whose value is {@code null}.</li>
     * </ol>
     *
     * @return a new mutable map of the processed options
     */
    @Override
    public Map<String, String> getProcessedOptions() {
        Map<String, String> processedOptions = new HashMap<>(this.tableOptions.toMap());
        this.defaultTableOptions.forEach(processedOptions::putIfAbsent);

        if (isKafkaTable()) {
            this.defaultKafkaOptions.forEach(processedOptions::putIfAbsent);

            // Only derive a topic when the user did not set one explicitly.
            if (!this.tableOptions.containsKey(KafkaConnectorOptions.TOPIC.key())) {
                if (processedOptions.containsKey(STREAM_PREFIX.key())) {
                    processedOptions.put(
                        TOPIC.key(),
                        this.get(STREAM_PREFIX) + "." + this.get(STREAM_NAME)
                    );
                } else {
                    processedOptions.put(
                        TOPIC.key(),
                        String.join(
                            ";",
                            eventStreamFactory.createEventStream(this.get(STREAM_NAME)).topics()
                        )
                    );
                }
            }
        }

        if (isEventJson()) {
            this.defaultFormatOptions.forEach(processedOptions::putIfAbsent);
            Map<String, String> catalogOptionsMap = catalogOptions.toMap();

            for (ConfigOption<?> sharedOption : this.sharedOptions) {
                String key = sharedOption.key();
                // The unprefixed key is always dropped; its value is the fallback
                // when the catalog does not define the option.
                String unprefixedValue = processedOptions.remove(key);
                // NOTE(review): the catalog value wins over the unprefixed table option
                // here, which looks at odds with the precedence stated in the Javadoc -
                // confirm this is intended.
                String value = catalogOptionsMap.getOrDefault(key, unprefixedValue);

                // Only write the prefixed key when a value was actually resolved.
                // Writing null here would overwrite (and, after the null sweep below,
                // delete) a prefixed option the user set directly in CREATE TABLE.
                if (value != null) {
                    processedOptions.put(EventJsonFormatFactory.IDENTIFIER + "." + key, value);
                }
            }
        }

        // Remove options not needed to create the table
        // What happens if a table option just so happens to be the same name as a pseudoTableOption?
        this.pseudoTableOptions.stream().map(ConfigOption::key).forEach(processedOptions::remove);

        // Drop entries with null values (e.g. coming from the default option maps).
        processedOptions.entrySet().removeIf(entry -> entry.getValue() == null);
        return processedOptions;
    }
}