EventStreamCatalogFactory.java

package org.wikimedia.eventutilities.flink.table.catalog;

import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.EVENT_SCHEMAS_BASE_URIS;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.EVENT_STREAM_CONFIG_URI;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.HTTP_ROUTES;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_COLUMN;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_DELAY;

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventStreamCatalogFactory implements CatalogFactory {

    public static final String IDENTIFIER = "eventstream";
    private static final Logger LOG = LoggerFactory.getLogger(EventStreamCatalogFactory.class);

    @Override
    public Catalog createCatalog(CatalogFactory.Context context) {
        final FactoryUtil.CatalogFactoryHelper helper =
            FactoryUtil.createCatalogFactoryHelper(this, context);
        helper.validate();
        LOG.debug("Creating event stream catalog...");
        return new EventStreamCatalog(
            context.getName(),
            helper.getOptions().get(EVENT_SCHEMAS_BASE_URIS),
            helper.getOptions().get(EVENT_STREAM_CONFIG_URI),
            helper.getOptions().getOptional(HTTP_ROUTES).orElse(null),
            context.getOptions()
        );
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
        options.add(EVENT_SCHEMAS_BASE_URIS);
        options.add(EVENT_STREAM_CONFIG_URI);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HTTP_ROUTES);
        options.add(KafkaConnectorOptions.PROPS_GROUP_ID);
        options.add(WATERMARK_COLUMN);
        options.add(WATERMARK_DELAY);
        return options;
    }
}