EventStreamCatalogFactory.java
package org.wikimedia.eventutilities.flink.table.catalog;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.EVENT_SCHEMAS_BASE_URIS;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.EVENT_STREAM_CONFIG_URI;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.HTTP_ROUTES;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_COLUMN;
import static org.wikimedia.eventutilities.flink.table.catalog.EventStreamCatalogFactoryOptions.WATERMARK_DELAY;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventStreamCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "eventstream";
private static final Logger LOG = LoggerFactory.getLogger(EventStreamCatalogFactory.class);
@Override
public Catalog createCatalog(CatalogFactory.Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
LOG.debug("Creating event stream catalog...");
return new EventStreamCatalog(
context.getName(),
helper.getOptions().get(EVENT_SCHEMAS_BASE_URIS),
helper.getOptions().get(EVENT_STREAM_CONFIG_URI),
helper.getOptions().getOptional(HTTP_ROUTES).orElse(null),
context.getOptions()
);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
options.add(EVENT_SCHEMAS_BASE_URIS);
options.add(EVENT_STREAM_CONFIG_URI);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HTTP_ROUTES);
options.add(KafkaConnectorOptions.PROPS_GROUP_ID);
options.add(WATERMARK_COLUMN);
options.add(WATERMARK_DELAY);
return options;
}
}