// EventStreamCatalogFactoryOptions.java

package org.wikimedia.eventutilities.flink.table.catalog;

import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.SCHEMA_VERSION;
import static org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatOptions.STREAM_NAME;

import java.util.UUID;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.wikimedia.eventutilities.flink.formats.json.EventJsonFormatFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
 * Option definitions and default option sets declared for the event stream catalog.
 *
 * <p>This class only holds constants and cannot be instantiated.
 */
public final class EventStreamCatalogFactoryOptions {

    /**
     * String option under the key {@code kafka-topic-prefix}; it has no default value.
     */
    public static final ConfigOption<String> STREAM_PREFIX =
        ConfigOptions.key("kafka-topic-prefix")
            .stringType()
            .noDefaultValue()
            .withDescription(
                "The prefix to attach to the event stream name when using the Kafka connector."
            );

    /**
     * String option under the key {@code kafka-watermark-column}; defaults to {@code kafka_timestamp}.
     */
    public static final ConfigOption<String> WATERMARK_COLUMN =
        ConfigOptions.key("kafka-watermark-column")
            .stringType()
            .defaultValue("kafka_timestamp")
            .withDescription(
                // The trailing space keeps the two sentences apart once the literals are joined.
                "The column name to use for Kafka watermarks. "
                    + "This virtual column is automatically applied to every Kafka table in the catalog and should not be overwritten."
            );

    /**
     * Integer option under the key {@code kafka-watermark-delay}, expressed in seconds; defaults to {@code 10}.
     */
    public static final ConfigOption<Integer> WATERMARK_DELAY =
        ConfigOptions.key("kafka-watermark-delay")
            .intType()
            .defaultValue(10)
            .withDescription(
                "The number of seconds to delay the Kafka watermark by."
            );

    /**
     * Default options that are applied to every table that is created.
     *
     * <p>Maps the format key to the event JSON format identifier and the connector key
     * to the Kafka table factory identifier.
     */
    public static final ImmutableMap<String, String> DEFAULT_OPTIONS = ImmutableMap.<String, String>builder()
        .put(FactoryUtil.FORMAT.key(), EventJsonFormatFactory.IDENTIFIER)
        .put(FactoryUtil.CONNECTOR.key(), KafkaDynamicTableFactory.IDENTIFIER)
        .build();

    /**
     * Default options that are applied to every Kafka table that is created.
     *
     * <p>The consumer group id ends in a random UUID that is generated once, when this
     * class is initialized, so every reader of this map within the same class loader
     * sees the same group id.
     */
    public static final ImmutableMap<String, String> DEFAULT_KAFKA_OPTIONS = ImmutableMap.<String, String>builder()
        .put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), "latest-offset")
        .put(KafkaConnectorOptions.PROPS_GROUP_ID.key(), "flink_event_stream_catalog." + UUID.randomUUID())
        .build();

    /**
     * Options that can be overridden by options passed in through the table.
     */
    public static final ImmutableList<ConfigOption<?>> OVERRIDE_OPTIONS = ImmutableList.<ConfigOption<?>>builder()
        .add(STREAM_NAME)
        .add(SCHEMA_VERSION)
        .add(WATERMARK_COLUMN)
        .add(WATERMARK_DELAY)
        .build();

    /**
     * Options that are provided through the table but only used by the catalog.
     */
    public static final ImmutableList<ConfigOption<?>> PSEUDO_TABLE_OPTIONS = ImmutableList.<ConfigOption<?>>builder()
        .add(STREAM_PREFIX)
        .add(STREAM_NAME)
        .add(SCHEMA_VERSION)
        .add(WATERMARK_COLUMN)
        .add(WATERMARK_DELAY)
        .build();

    /** Not instantiable: this class is a holder for constants only. */
    private EventStreamCatalogFactoryOptions() {
    }
}