EventStreamCatalogFactoryOptions.java
package org.wikimedia.eventutilities.flink.table.catalog;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.SCHEMA_VERSION;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;
import java.util.UUID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
public final class EventStreamCatalogFactoryOptions {
public static final ConfigOption<String> STREAM_PREFIX =
ConfigOptions.key("kafka-topic-prefix")
.stringType()
.noDefaultValue()
.withDescription(
"The prefix to attach to the event stream name when using the Kafka connector."
);
public static final ConfigOption<String> WATERMARK_COLUMN =
ConfigOptions.key("kafka-watermark-column")
.stringType()
.defaultValue("kafka_timestamp")
.withDescription(
"The column name to use for Kafka watermarks." +
"This virtual column is automatically applied to every Kafka table in the catalog and should not be overwritten."
);
public static final ConfigOption<Integer> WATERMARK_DELAY =
ConfigOptions.key("kafka-watermark-delay")
.intType()
.defaultValue(10)
.withDescription(
"The number of seconds to delay the Kafka watermark by."
);
/**
* Default options that are applied to every table that is created.
*/
public static final ImmutableMap<String, String> DEFAULT_OPTIONS = ImmutableMap.<String, String>builder()
.put(FactoryUtil.FORMAT.key(), EventJsonFormatFactory.IDENTIFIER)
.put(FactoryUtil.CONNECTOR.key(), KafkaDynamicTableFactory.IDENTIFIER)
.build();
/**
* Default options that are applied to every kafka table that is created.
*/
public static final ImmutableMap<String, String> DEFAULT_KAFKA_OPTIONS = ImmutableMap.<String, String>builder()
.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), "latest-offset")
.put(KafkaConnectorOptions.PROPS_GROUP_ID.key(), "flink_event_stream_catalog." + UUID.randomUUID().toString())
.build();
/**
* Options that can be overridden by options passed in through the table.
*/
public static final ImmutableList<ConfigOption<?>> OVERRIDE_OPTIONS = ImmutableList.<ConfigOption<?>>builder()
.add(STREAM_NAME)
.add(SCHEMA_VERSION)
.add(WATERMARK_COLUMN)
.add(WATERMARK_DELAY)
.build();
/**
* Options that are provided through the table but only used by the catalog.
*/
public static final ImmutableList<ConfigOption<?>> PSEUDO_TABLE_OPTIONS = ImmutableList.<ConfigOption<?>>builder()
.add(STREAM_PREFIX)
.add(STREAM_NAME)
.add(SCHEMA_VERSION)
.add(WATERMARK_COLUMN)
.add(WATERMARK_DELAY)
.build();
private EventStreamCatalogFactoryOptions() {
}
}