EventStreamCatalogConfiguration.java
package org.wikimedia.eventutilities.flink.table.catalog;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.SCHEMA_VERSION;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;
import lombok.AllArgsConstructor;
/**
* A wrapper for all the configuration of the EventStreamCatalog.
*/
@AllArgsConstructor
public abstract class EventStreamCatalogConfiguration {
/**
* Option values that are passed in when first instantiating the catalog via CREATE CATALOG.
*/
protected final Configuration catalogOptions;
/**
* Option values are passed in when instantiating a table within the catalog via CREATE TABLE.
*/
protected final Configuration tableOptions;
/**
* List of options that can be passed in via the table but not used by the table.
*/
protected final Collection<ConfigOption<?>> pseudoTableOptions;
/**
* List of options that can be overridden by pseudoTableOptions.
*/
protected final Collection<ConfigOption<?>> overrideOptions;
/**
* List of options that are shared across the catalog and format.
*/
protected final Collection<ConfigOption<?>> sharedOptions;
protected Map<String, String> defaultTableOptions;
protected Map<String, String> defaultKafkaOptions;
protected Map<String, String> defaultFormatOptions;
protected EventStreamFactory eventStreamFactory;
/**
* This should implement logic that produces a map of finalized table options
* that is ready to be used in CatalogTable.of.
*/
public abstract Map<String, String> getProcessedOptions();
public boolean isKafkaTable() {
String connector = this.tableOptions.getString(FactoryUtil.CONNECTOR, KafkaDynamicTableFactory.IDENTIFIER);
return connector.equals(KafkaDynamicTableFactory.IDENTIFIER) ||
connector.equals(UpsertKafkaDynamicTableFactory.IDENTIFIER);
}
public boolean isEventJson() {
String format = this.tableOptions.getString(FactoryUtil.FORMAT, EventJsonFormatFactory.IDENTIFIER);
return format.equals(EventJsonFormatFactory.IDENTIFIER);
}
/**
* Gets values from Event Configuration.
* This retrieves the correct values from catalog and table options.
* It figures out overrides and format prefixes automatically.
* It can be thought of retrieving options from an intermediate state
* after overrides have been applied but before any options have been removed.
*
* @param option
* @param <T>
* @return
*/
public <T> T get(ConfigOption<T> option) {
boolean isOverride = this.overrideOptions.contains(option);
boolean isShared = this.sharedOptions.contains(option);
boolean isPseudoTable = this.pseudoTableOptions.contains(option);
// Schema version should dynamically default to the latest version
if (option.equals(SCHEMA_VERSION)) {
option = option.withFallbackKeys(EventJsonFormatFactory.IDENTIFIER + "." + option.key());
String val = tableOptions.getString((ConfigOption<String>) option, "");
if (val.isEmpty() || val.equals("latest")) {
EventStream eventStream = eventStreamFactory.createEventStream(this.get(STREAM_NAME));
// T is always the type of SCHEMA_VERSION (which should be a string)
return (T) eventStream.latestSchemaVersion();
}
return (T) val;
}
if (isShared) {
// Option should also match with prefixed option
option = option.withFallbackKeys(EventJsonFormatFactory.IDENTIFIER + "." + option.key());
}
if (isPseudoTable && isOverride) {
// Need to reassign option to work in lambda function
ConfigOption<T> finalOption = option;
return tableOptions.getOptional(option).orElseGet(() -> catalogOptions.get(finalOption));
}
if (isPseudoTable) {
return tableOptions.get(option);
}
if (isOverride) {
ConfigOption<T> finalOption = option;
return tableOptions.getOptional(option).orElseGet(() -> {
Map<String, String> allDefaults = new HashMap<>(defaultTableOptions);
allDefaults.putAll(defaultKafkaOptions);
allDefaults.putAll(defaultFormatOptions);
Configuration defaults = Configuration.fromMap(allDefaults);
return defaults.get(finalOption);
});
}
ConfigOption<T> finalOption = option;
return tableOptions.getOptional(option).orElseGet(() -> catalogOptions.get(finalOption));
}
}