EventStreamCatalog.java

package org.wikimedia.eventutilities.flink.table.catalog;

import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.DEFAULT_FORMAT_OPTIONS;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.FORWARD_OPTIONS;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.SCHEMA_VERSION;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.DEFAULT_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.DEFAULT_KAFKA_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.OVERRIDE_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.PSEUDO_TABLE_OPTIONS;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_COLUMN;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_DELAY;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;
import org.wikimedia.eventutilities.flink.table.EventTableUtils;

import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;

import lombok.SneakyThrows;

public class EventStreamCatalog extends AbstractEventStreamCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(EventStreamCatalog.class);

    public static final String PLACEHOLDER = "_placeholder_";

    private final List<String> eventSchemasBaseUris;

    private final String eventStreamConfigUri;

    private final Map<String, String> httpRoutes;

    private EventStreamFactory eventStreamFactory;

    private final Map<String, String> options;

    private final EventStreamCatalogConfigurationFactory configurationFactory;

    public EventStreamCatalog(
        String name,
        List<String> eventSchemasBaseUris,
        String eventStreamConfigUri,
        @Nullable Map<String, String> httpRoutes,
        // TODO: do we need to keep all options?  I think maybe FactoryUtil.CatalogFactoryHelper helps?
        Map<String, String> options
    ) {
        super(name);

        this.eventSchemasBaseUris = eventSchemasBaseUris;
        this.eventStreamConfigUri = eventStreamConfigUri;
        this.httpRoutes = httpRoutes;

        Map<String, String> defaultKafkaOptions = new HashMap<>(DEFAULT_KAFKA_OPTIONS);
        defaultKafkaOptions.put(
            KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS.key(),
            options.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS.key())
        );

        this.options = options;

        this.configurationFactory = EventStreamCatalogConfigurationFactory.builder()
            .catalogOptions(this.options)
            .defaultFormatOptions(DEFAULT_FORMAT_OPTIONS)
            .defaultKafkaOptions(defaultKafkaOptions)
            .defaultTableOptions(DEFAULT_OPTIONS)
            .sharedOptions(FORWARD_OPTIONS)
            .pseudoTableOptions(PSEUDO_TABLE_OPTIONS)
            .overrideOptions(OVERRIDE_OPTIONS)
            .build();
    }


    @Override
    public void open() throws CatalogException {
        this.eventStreamFactory = EventStreamFactory.from(
            eventSchemasBaseUris,
            eventStreamConfigUri,
            httpRoutes
        );
    }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        // There are no databases, but return true to pass sql verification
        return true;
    }

    @Override
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        List<String> tables = super.listTables(databaseName);
        tables.addAll(eventStreamFactory.getEventStreamConfig().cachedStreamNames());
        return tables;
    }

    @Override
    public boolean tableExists(ObjectPath tablePath) {
        if (eventStreamFactory.getEventStreamConfig().streamExists(tablePath.getObjectName())) {
            return true;
        }
        return super.tableExists(tablePath);
    }

    @Override
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        if (eventStreamFactory.getEventStreamConfig().streamExists(tablePath.getObjectName())) {
            throw new TableAlreadyExistException(this.getName(), tablePath);
        }

        // Table is a view. Views don't have options so bypass everything
        if (table instanceof CatalogView) {
            super.createTable(tablePath, table, ignoreIfExists);
            return;
        }

        List<Schema.UnresolvedColumn> columns = table.getUnresolvedSchema().getColumns();

        // If there isn't a _placeholder_ column, assume it's a CREATE LIKE
        if (columns.stream().noneMatch(c -> c.getName().equalsIgnoreCase(PLACEHOLDER))) {
            createTableLike(tablePath, table, ignoreIfExists);
        } else {
            createTableOfEventStream(tablePath, table, ignoreIfExists);
        }
    }

    private void createTableLike(
        ObjectPath tablePath,
        CatalogBaseTable table,
        boolean ignoreIfExists
    ) throws TableAlreadyExistException, DatabaseNotExistException {
        LOG.debug("Creating custom table {}", tablePath.getObjectName());

        Map<String, String> finalTableOptions = this.configurationFactory.
            createTableLikeEventConfiguration(table.getOptions(), this.eventStreamFactory)
            .getProcessedOptions();
        CatalogBaseTable normalizedTable = CatalogTable.of(
            table.getUnresolvedSchema(),
            table.getComment(),
            ((ResolvedCatalogTable) table).getPartitionKeys(),
            finalTableOptions
        );
        super.createTable(tablePath, normalizedTable, ignoreIfExists);
    }

    private void createTableOfEventStream(
        ObjectPath tablePath,
        CatalogBaseTable table,
        boolean ignoreIfExists
    ) throws TableAlreadyExistException, DatabaseNotExistException {
        LOG.debug("Creating table from event stream {}", tablePath.getObjectName());

        EventStreamCatalogConfiguration config = this.configurationFactory.
            createTableEventConfiguration(table.getOptions(), this.eventStreamFactory);
        Map<String, String> finalTableOptions = config.getProcessedOptions();

        EventStream eventStream = eventStreamFactory.createEventStream(config.get(STREAM_NAME));

        List<Schema.UnresolvedColumn> incomingColumns = table.getUnresolvedSchema().getColumns().stream().filter(
            column -> !column.getName().equals(PLACEHOLDER)
        ).collect(Collectors.toList());

        CatalogBaseTable eventStreamTable;
        if (config.isKafkaTable()) {
            eventStreamTable = EventTableUtils.getTableOfEventStream(
                eventStream,
                config.get(SCHEMA_VERSION),
                finalTableOptions,
                table.getComment(),
                incomingColumns,
                config.get(WATERMARK_COLUMN),
                config.get(WATERMARK_DELAY)
            );
        } else {
            eventStreamTable = EventTableUtils.getTableOfEventStream(
                eventStream,
                config.get(SCHEMA_VERSION),
                finalTableOptions,
                table.getComment(),
                incomingColumns
            );
        }

        super.createTable(tablePath, eventStreamTable, ignoreIfExists);
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
        if (super.tableExists(tablePath)) {
            return super.getTable(tablePath);
        }

        String streamName = tablePath.getObjectName();

        if (eventStreamFactory.getEventStreamConfig().streamExists(streamName)) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put(STREAM_NAME.key(), streamName);
            tableOptions.put(SCHEMA_VERSION.key(), eventStreamFactory.createEventStream(streamName).latestSchemaVersion());

            EventStream eventStream = eventStreamFactory.createEventStream(streamName);
            Configuration config = Configuration.fromMap(this.options);

            return EventTableUtils.getTableOfEventStream(
                eventStream,
                eventStream.latestSchemaVersion(),
                this.configurationFactory
                    .createTableEventConfiguration(tableOptions, this.eventStreamFactory)
                    .getProcessedOptions(),
                null,
                null,
                config.get(WATERMARK_COLUMN),
                config.get(WATERMARK_DELAY)
            );
        }

        throw new TableNotExistException(this.getName(), tablePath);
    }

    @Override
    public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        if (super.tableExists(objectPath)) {
            super.dropTable(objectPath, ignoreIfNotExists);
        } else if (!ignoreIfNotExists) {
            if (eventStreamFactory.getEventStreamConfig().streamExists(objectPath.getObjectName())) {
                throw new CatalogException("Cannot drop an external table. " +
                    "You can only drop tables created with CREATE TABLE");
            }
            throw new TableNotExistException(getName(), objectPath);
        }
    }

    @SneakyThrows
    @Override
    public void alterTable(
        ObjectPath tablePath,
        CatalogBaseTable catalogBaseTable,
        boolean ignoreIfNotExists
    ) throws TableNotExistException, CatalogException {
        if (eventStreamFactory.getEventStreamConfig().streamExists(tablePath.getObjectName())) {
            throw new CatalogException("Cannot alter an external table. " +
                "You can only alter tables created with CREATE TABLE");
        }

        if (!super.tableExists(tablePath)) {
            if (!ignoreIfNotExists) {
                throw new TableNotExistException(this.getName(), tablePath);
            }
            return;
        }

        Map<String, String> currentTableOptions = catalogBaseTable.getOptions();
        Map<String, String> cachedTableOptions = super.getTable(tablePath).getOptions();
        Set<String> immutableOptions = new HashSet<>();
        immutableOptions.add(STREAM_NAME.key());
        immutableOptions.add(SCHEMA_VERSION.key());

        MapDifference<String, String> diff = Maps.difference(currentTableOptions, cachedTableOptions);
        Set<String> changedOptions = new HashSet<>(diff.entriesDiffering().keySet());
        changedOptions.addAll(diff.entriesOnlyOnLeft().keySet());
        changedOptions.addAll(diff.entriesOnlyOnRight().keySet());

        LOG.debug("Altered options: {}", diff);

        immutableOptions.forEach(o -> {
            if (changedOptions.contains(o) ||
                changedOptions.contains(EventJsonFormatFactory.IDENTIFIER + "." + o)) {
                throw new CatalogException("Cannot alter option: " + o);
            }
        });

        CatalogBaseTable alteredTable = CatalogTable.of(
            catalogBaseTable.getUnresolvedSchema(),
            catalogBaseTable.getComment(),
            ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys(),
            this.configurationFactory
                .createTableEventConfiguration(currentTableOptions, this.eventStreamFactory)
                .getProcessedOptions()
        );

        super.alterTable(tablePath, alteredTable, ignoreIfNotExists);
    }

}