EventStreamCatalog.java
package org.wikimedia.eventutilities.flink.table.catalog;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.DEFAULT_FORMAT_OPTIONS;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.FORWARD_OPTIONS;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.SCHEMA_VERSION;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.DEFAULT_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.DEFAULT_KAFKA_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.OVERRIDE_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.PSEUDO_TABLE_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_COLUMN;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_DELAY;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;
import org.wikimedia.eventutilities.flink.table.EventTableUtils;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
/**
 * Flink catalog that exposes the streams declared in an event stream config as tables,
 * next to tables and views the user creates through DDL.
 *
 * <p>Two kinds of objects are served:
 * <ul>
 *   <li><b>External tables</b>: one per stream known to the event stream config. They are
 *       built on every {@link #getTable(ObjectPath)} call and can neither be created, altered
 *       nor dropped through this catalog.</li>
 *   <li><b>User tables and views</b>: created with
 *       {@link #createTable(ObjectPath, CatalogBaseTable, boolean)} and delegated to the parent
 *       catalog after their options have been processed.</li>
 * </ul>
 *
 * <p>{@link #open()} must be called before any table-related method: the
 * {@link EventStreamFactory} those methods rely on is only created there.
 */
public class EventStreamCatalog extends AbstractEventStreamCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(EventStreamCatalog.class);

    /**
     * Name of the marker column telling {@link #createTable} that the table schema has to be
     * derived from an event stream. Tables declared without it are handled as CREATE LIKE.
     */
    public static final String PLACEHOLDER = "_placeholder_";

    private final List<String> eventSchemasBaseUris;
    private final String eventStreamConfigUri;
    private final Map<String, String> httpRoutes;
    /** Created by {@link #open()}; {@code null} until then. */
    private EventStreamFactory eventStreamFactory;
    private final Map<String, String> options;
    private final EventStreamCatalogConfigurationFactory configurationFactory;

    /**
     * Creates the catalog. No event stream resources are touched until {@link #open()}.
     *
     * @param name                 catalog name, handed to the parent catalog
     * @param eventSchemasBaseUris base URIs passed to {@link EventStreamFactory#from} on open
     * @param eventStreamConfigUri event stream config URI passed to {@link EventStreamFactory#from} on open
     * @param httpRoutes           optional HTTP routes passed to {@link EventStreamFactory#from} on open
     * @param options              catalog options; also the source of the Kafka bootstrap servers
     *                             default and of the watermark settings used for external tables
     */
    public EventStreamCatalog(
        String name,
        List<String> eventSchemasBaseUris,
        String eventStreamConfigUri,
        @Nullable Map<String, String> httpRoutes,
        // TODO: do we need to keep all options? I think maybe FactoryUtil.CatalogFactoryHelper helps?
        Map<String, String> options
    ) {
        super(name);
        this.eventSchemasBaseUris = eventSchemasBaseUris;
        this.eventStreamConfigUri = eventStreamConfigUri;
        this.httpRoutes = httpRoutes;
        this.options = options;

        // The static Kafka defaults are copied so the catalog-level bootstrap servers
        // can be added without mutating the shared constant.
        String bootstrapServersKey = KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS.key();
        Map<String, String> defaultKafkaOptions = new HashMap<>(DEFAULT_KAFKA_OPTIONS);
        defaultKafkaOptions.put(bootstrapServersKey, options.get(bootstrapServersKey));

        this.configurationFactory = EventStreamCatalogConfigurationFactory.builder()
            .catalogOptions(this.options)
            .defaultFormatOptions(DEFAULT_FORMAT_OPTIONS)
            .defaultKafkaOptions(defaultKafkaOptions)
            .defaultTableOptions(DEFAULT_OPTIONS)
            .sharedOptions(FORWARD_OPTIONS)
            .pseudoTableOptions(PSEUDO_TABLE_OPTIONS)
            .overrideOptions(OVERRIDE_OPTIONS)
            .build();
    }

    /**
     * Builds the {@link EventStreamFactory} from the URIs and routes given at construction time.
     */
    @Override
    public void open() throws CatalogException {
        this.eventStreamFactory = EventStreamFactory.from(
            eventSchemasBaseUris,
            eventStreamConfigUri,
            httpRoutes
        );
    }

    /**
     * Always reports the database as existing.
     *
     * @param databaseName ignored
     * @return {@code true}, unconditionally
     */
    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        // There are no databases, but return true to pass sql verification
        return true;
    }

    /**
     * Lists the tables of the parent catalog followed by the cached stream names of the
     * event stream config.
     *
     * @param databaseName database name, forwarded to the parent catalog
     * @return parent catalog tables plus the cached stream names
     */
    @Override
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        List<String> tables = super.listTables(databaseName);
        tables.addAll(eventStreamFactory.getEventStreamConfig().cachedStreamNames());
        return tables;
    }

    /**
     * Tells whether the path names either a stream of the event stream config or a table
     * known to the parent catalog.
     *
     * @param tablePath path whose object name is checked against the stream config first
     * @return {@code true} if a stream or a parent catalog table exists under that path
     */
    @Override
    public boolean tableExists(ObjectPath tablePath) {
        return eventStreamFactory.getEventStreamConfig().streamExists(tablePath.getObjectName())
            || super.tableExists(tablePath);
    }

    /**
     * Creates a table or view in the parent catalog.
     *
     * <p>Views are forwarded untouched. Tables declaring a {@link #PLACEHOLDER} column
     * (case-insensitive) get their schema from an event stream; all other tables are handled
     * as CREATE LIKE and only have their options processed.
     *
     * @param tablePath      path of the table to create
     * @param table          table or view definition
     * @param ignoreIfExists when {@code true}, an already existing table is left alone
     *                       instead of raising {@link TableAlreadyExistException}
     * @throws TableAlreadyExistException if the name is taken and {@code ignoreIfExists} is {@code false}
     */
    @Override
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        if (eventStreamFactory.getEventStreamConfig().streamExists(tablePath.getObjectName())) {
            // A stream is reported as an existing table by tableExists(), so honour
            // ignoreIfExists the same way as for any other existing table.
            if (ignoreIfExists) {
                return;
            }
            throw new TableAlreadyExistException(this.getName(), tablePath);
        }

        // Table is a view. Views don't have options so bypass everything
        if (table instanceof CatalogView) {
            super.createTable(tablePath, table, ignoreIfExists);
            return;
        }

        boolean hasPlaceholder = table.getUnresolvedSchema().getColumns().stream()
            .anyMatch(c -> c.getName().equalsIgnoreCase(PLACEHOLDER));

        if (hasPlaceholder) {
            createTableOfEventStream(tablePath, table, ignoreIfExists);
        } else {
            // If there isn't a _placeholder_ column, assume it's a CREATE LIKE
            createTableLike(tablePath, table, ignoreIfExists);
        }
    }

    /**
     * Creates a table whose schema is taken as declared and whose options are run through
     * the configuration factory.
     *
     * <p>Any {@link CatalogTable} is accepted, including {@link ResolvedCatalogTable}.
     */
    private void createTableLike(
        ObjectPath tablePath,
        CatalogBaseTable table,
        boolean ignoreIfExists
    ) throws TableAlreadyExistException, DatabaseNotExistException {
        LOG.debug("Creating custom table {}", tablePath.getObjectName());

        Map<String, String> finalTableOptions = this.configurationFactory
            .createTableLikeEventConfiguration(table.getOptions(), this.eventStreamFactory)
            .getProcessedOptions();

        CatalogBaseTable normalizedTable = CatalogTable.of(
            table.getUnresolvedSchema(),
            table.getComment(),
            // getPartitionKeys() is declared on CatalogTable, no need for the resolved subtype.
            ((CatalogTable) table).getPartitionKeys(),
            finalTableOptions
        );

        super.createTable(tablePath, normalizedTable, ignoreIfExists);
    }

    /**
     * Creates a table whose schema is derived from the event stream named in the table options.
     * The columns declared by the user, minus the {@link #PLACEHOLDER} marker, are passed along
     * to the table builder. Watermark settings are only passed for Kafka tables.
     */
    private void createTableOfEventStream(
        ObjectPath tablePath,
        CatalogBaseTable table,
        boolean ignoreIfExists
    ) throws TableAlreadyExistException, DatabaseNotExistException {
        LOG.debug("Creating table from event stream {}", tablePath.getObjectName());

        EventStreamCatalogConfiguration config = this.configurationFactory
            .createTableEventConfiguration(table.getOptions(), this.eventStreamFactory);
        Map<String, String> finalTableOptions = config.getProcessedOptions();
        EventStream eventStream = eventStreamFactory.createEventStream(config.get(STREAM_NAME));

        // Same case-insensitive match as in createTable(), so the marker column that routed
        // the statement here is always the one being stripped.
        List<Schema.UnresolvedColumn> incomingColumns = table.getUnresolvedSchema().getColumns().stream()
            .filter(column -> !column.getName().equalsIgnoreCase(PLACEHOLDER))
            .collect(Collectors.toList());

        CatalogBaseTable eventStreamTable;
        if (config.isKafkaTable()) {
            eventStreamTable = EventTableUtils.getTableOfEventStream(
                eventStream,
                config.get(SCHEMA_VERSION),
                finalTableOptions,
                table.getComment(),
                incomingColumns,
                config.get(WATERMARK_COLUMN),
                config.get(WATERMARK_DELAY)
            );
        } else {
            eventStreamTable = EventTableUtils.getTableOfEventStream(
                eventStream,
                config.get(SCHEMA_VERSION),
                finalTableOptions,
                table.getComment(),
                incomingColumns
            );
        }

        super.createTable(tablePath, eventStreamTable, ignoreIfExists);
    }

    /**
     * Returns the table stored in the parent catalog if there is one; otherwise builds a table
     * for the stream of the same name, using its latest schema version and the catalog-level
     * watermark settings.
     *
     * @param tablePath path of the table or stream
     * @return the stored table, or a freshly built table for the stream
     * @throws TableNotExistException if neither a stored table nor a stream has that name
     */
    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
        if (super.tableExists(tablePath)) {
            return super.getTable(tablePath);
        }

        String streamName = tablePath.getObjectName();
        if (!eventStreamFactory.getEventStreamConfig().streamExists(streamName)) {
            throw new TableNotExistException(this.getName(), tablePath);
        }

        // Create the stream and look up its latest version once; both are needed for the
        // table options as well as for the table itself.
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);
        String latestSchemaVersion = eventStream.latestSchemaVersion();

        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(STREAM_NAME.key(), streamName);
        tableOptions.put(SCHEMA_VERSION.key(), latestSchemaVersion);

        Configuration config = Configuration.fromMap(this.options);
        Map<String, String> processedOptions = this.configurationFactory
            .createTableEventConfiguration(tableOptions, this.eventStreamFactory)
            .getProcessedOptions();

        return EventTableUtils.getTableOfEventStream(
            eventStream,
            latestSchemaVersion,
            processedOptions,
            // No comment and no user-declared columns for external tables.
            null,
            null,
            config.get(WATERMARK_COLUMN),
            config.get(WATERMARK_DELAY)
        );
    }

    /**
     * Drops a table held by the parent catalog. Streams of the event stream config are
     * never dropped.
     *
     * @param objectPath        path of the table to drop
     * @param ignoreIfNotExists when {@code true}, nothing is thrown if the parent catalog
     *                          has no such table
     * @throws TableNotExistException if nothing exists under that name and
     *                                {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException       if the name refers to a stream and
     *                                {@code ignoreIfNotExists} is {@code false}
     */
    @Override
    public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        if (super.tableExists(objectPath)) {
            super.dropTable(objectPath, ignoreIfNotExists);
            return;
        }
        if (ignoreIfNotExists) {
            return;
        }
        if (eventStreamFactory.getEventStreamConfig().streamExists(objectPath.getObjectName())) {
            throw new CatalogException("Cannot drop an external table. " +
                "You can only drop tables created with CREATE TABLE");
        }
        throw new TableNotExistException(getName(), objectPath);
    }

    /**
     * Alters a table or view held by the parent catalog.
     *
     * <p>Views are forwarded untouched, as in {@link #createTable}. For tables, the stream name
     * and schema version options (with or without the event JSON format prefix) must not be
     * added, removed or changed; the remaining options are re-processed by the configuration
     * factory before the change is delegated.
     *
     * @param tablePath         path of the table to alter
     * @param catalogBaseTable  new table or view definition
     * @param ignoreIfNotExists when {@code true}, a missing table is silently ignored
     * @throws TableNotExistException if the parent catalog has no such table and
     *                                {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException       if the name refers to a stream, or an immutable option changed
     */
    @SneakyThrows
    @Override
    public void alterTable(
        ObjectPath tablePath,
        CatalogBaseTable catalogBaseTable,
        boolean ignoreIfNotExists
    ) throws TableNotExistException, CatalogException {
        if (eventStreamFactory.getEventStreamConfig().streamExists(tablePath.getObjectName())) {
            throw new CatalogException("Cannot alter an external table. " +
                "You can only alter tables created with CREATE TABLE");
        }

        if (!super.tableExists(tablePath)) {
            if (!ignoreIfNotExists) {
                throw new TableNotExistException(this.getName(), tablePath);
            }
            return;
        }

        // Views don't have options to validate or process, and are not CatalogTables:
        // hand them to the parent as-is.
        if (catalogBaseTable instanceof CatalogView) {
            super.alterTable(tablePath, catalogBaseTable, ignoreIfNotExists);
            return;
        }

        Map<String, String> currentTableOptions = catalogBaseTable.getOptions();
        Map<String, String> cachedTableOptions = super.getTable(tablePath).getOptions();

        MapDifference<String, String> diff = Maps.difference(currentTableOptions, cachedTableOptions);
        // An option counts as changed when its value differs or when it exists on one side only.
        Set<String> changedOptions = new HashSet<>(diff.entriesDiffering().keySet());
        changedOptions.addAll(diff.entriesOnlyOnLeft().keySet());
        changedOptions.addAll(diff.entriesOnlyOnRight().keySet());

        LOG.debug("Altered options: {}", diff);

        Set<String> immutableOptions = new HashSet<>();
        immutableOptions.add(STREAM_NAME.key());
        immutableOptions.add(SCHEMA_VERSION.key());

        for (String option : immutableOptions) {
            // Immutable options may show up bare or prefixed with the format identifier.
            String prefixedOption = EventJsonFormatFactory.IDENTIFIER + "." + option;
            if (changedOptions.contains(option) || changedOptions.contains(prefixedOption)) {
                throw new CatalogException("Cannot alter option: " + option);
            }
        }

        CatalogBaseTable alteredTable = CatalogTable.of(
            catalogBaseTable.getUnresolvedSchema(),
            catalogBaseTable.getComment(),
            // getPartitionKeys() is declared on CatalogTable, no need for the resolved subtype.
            ((CatalogTable) catalogBaseTable).getPartitionKeys(),
            this.configurationFactory
                .createTableEventConfiguration(currentTableOptions, this.eventStreamFactory)
                .getProcessedOptions()
        );

        super.alterTable(tablePath, alteredTable, ignoreIfNotExists);
    }
}