EventDataStreamFactory.java

package org.wikimedia.eventutilities.flink.stream;

import java.net.URI;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.core.event.JsonEventGenerator;
import org.wikimedia.eventutilities.flink.EventRowTypeInfo;
import org.wikimedia.eventutilities.flink.formats.json.JsonRowDeserializationSchema;
import org.wikimedia.eventutilities.flink.formats.json.JsonRowSerializationSchema;
import org.wikimedia.eventutilities.flink.formats.json.JsonSchemaFlinkConverter;
import org.wikimedia.eventutilities.flink.formats.json.KafkaEventSerializationSchema;
import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;

import lombok.Setter;
import lombok.experimental.Accessors;

/**
 * Wraps EventStreamFactory with the Flink DataStream API to provide
 * helper methods to instantiate DataStreams of Row using
 * EventStream JSONSchemas and other metadata.
 *
 * Examples:
 *
 * Instantiate an EventDataStreamFactory from URIs:
 * <pre>{@code
 *     EventDataStreamFactory factory = EventDataStreamFactory.from(
 *          Arrays.asList(
 *              "https://schema.wikimedia.org/repositories/primary/jsonschema",
 *              "https://schema.wikimedia.org/repositories/secondary/jsonschema",
 *          ),
 *          "https://meta.wikimedia.org/w/api.php"
 *     );
 * }</pre>
 *
 * Get a {@link KafkaSource} for a declared event stream:
 * <pre>{@code
 *     KafkaSource<Row> eventStreamSource = factory.kafkaSourceBuilder(
 *          "test.event.example",  // EventStream name
 *          "localhost:9092",
 *          "my_consumer_group"
 *     ).build();
 * }</pre>
 *
 */
@SuppressWarnings("checkstyle:classfanoutcomplexity")
public class EventDataStreamFactory {

    private static final Logger LOG = LoggerFactory.getLogger(EventDataStreamFactory.class);

    private final EventStreamFactory eventStreamFactory;
    private final JsonEventGenerator eventGenerator;
    private final Predicate<String> topicFilter;


    /**
     * Helper method to enforce default configuration settings for
     * Flink Kafka producers. These are meant to ensure that topics
     * produced by Flink applications have configuration parity with
     * EventGate.
     *
     * References:
     *  - https://phabricator.wikimedia.org/T345805
     */
    private static Properties getKafkaProducerConfig() {
        Properties properties = new Properties();
        // Specify the final compression type for a given topic.
        // We set snappy because it's the compression algorithm used
        // by our MirrorMaker deployment.
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return properties;
    }

    /**
     * @deprecated use {@link #builder()}
     */
    @Deprecated
    public EventDataStreamFactory(
        EventStreamFactory eventStreamFactory,
        JsonEventGenerator eventGenerator
    ) {
        this(eventStreamFactory, eventGenerator, t -> true);
    }

    private EventDataStreamFactory(
            EventStreamFactory eventStreamFactory,
            JsonEventGenerator eventGenerator,
            Predicate<String> topicFilter
    ) {
        this.eventStreamFactory = eventStreamFactory;
        this.eventGenerator = eventGenerator;
        this.topicFilter = topicFilter;
    }

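    /**
     * Returns a {@link Builder} for assembling an EventDataStreamFactory.
     *
     * Example (a sketch; the URIs and the "eqiad." topic prefix are illustrative):
     * <pre>{@code
     *     EventDataStreamFactory factory = EventDataStreamFactory.builder()
     *         .schemaBaseUris(Arrays.asList(
     *             "https://schema.wikimedia.org/repositories/primary/jsonschema"))
     *         .eventStreamConfigUri("https://meta.wikimedia.org/w/api.php")
     *         .topicPrefix("eqiad.")
     *         .build();
     * }</pre>
     */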
    public static Builder builder() {
        return new Builder();
    }

    @Setter
    @Accessors(fluent = true, chain = true)
    public static class Builder {
        private List<String> schemaBaseUris;
        private String eventStreamConfigUri;
        private EventStreamFactory eventStreamFactory;
        private JsonEventGenerator generator;
        private Map<String, String> httpClientRoutes;
        /**
         * The filter to apply when extracting the configured topics of a stream.
         * Accept all topics by default.
         */
        @Nonnull
        private Predicate<String> topicFilter = t -> true;

        /**
         * Filter source topics with a particular prefix (generally the datacenter name).
         * This is based on the naming convention that topics are prefixed with the name of the datacenter they are populated from.
         * @param topicPrefix the prefix to use (usually the name of a datacenter)
         */
        public Builder topicPrefix(String topicPrefix) {
            topicFilter = t -> t.startsWith(topicPrefix);
            return this;
        }

        public EventDataStreamFactory build() {
            if (eventStreamFactory == null) {
                Preconditions.checkArgument(schemaBaseUris != null,
                        "eventSchemaBaseUris must be set if eventStreamFactory is not provided");
                Preconditions.checkArgument(eventStreamConfigUri != null,
                        "eventStreamConfigUri must be set if eventStreamFactory is not provided");
                eventStreamFactory = EventStreamFactory.from(
                        schemaBaseUris, eventStreamConfigUri, httpClientRoutes
                );
            }

            if (generator == null) {
                generator = JsonEventGenerator.builder()
                        .eventStreamConfig(eventStreamFactory.getEventStreamConfig())
                        .schemaLoader(eventStreamFactory.getEventSchemaLoader())
                        .build();
            }

            return new EventDataStreamFactory(
                    eventStreamFactory,
                    generator,
                    topicFilter
            );
        }
    }

    /**
     * EventDataStreamFactory factory method.
     *
     * @param eventSchemaBaseUris
     *  URIs from which to fetch event JSONSchemas.
     *
     * @param eventStreamConfigUri
     *  URI from which to fetch event stream config.
     */
    public static EventDataStreamFactory from(
        @Nonnull List<String> eventSchemaBaseUris,
        @Nonnull String eventStreamConfigUri
    ) {

        return EventDataStreamFactory.from(
            eventSchemaBaseUris, eventStreamConfigUri, null
        );
    }

    /**
     * EventDataStreamFactory factory method.
     *
     * @param eventSchemaBaseUris
     *  URIs from which to fetch event JSONSchemas.
     *
     * @param eventStreamConfigUri
     *  URI from which to fetch event stream config.
     *
     * @param httpClientRoutes
     *  Map of source to destination HTTP client routes,
     *  e.g. "https://meta.wikimedia.org" to "https://api-ro.wikimedia.org".
     *  If null, no special routing will be configured.
     */
    public static EventDataStreamFactory from(
        @Nonnull List<String> eventSchemaBaseUris,
        @Nonnull String eventStreamConfigUri,
        Map<String, String> httpClientRoutes
    ) {
        EventStreamFactory eventStreamFactory = EventStreamFactory.from(
                eventSchemaBaseUris, eventStreamConfigUri, httpClientRoutes
        );

        JsonEventGenerator generator = JsonEventGenerator.builder()
                .eventStreamConfig(eventStreamFactory.getEventStreamConfig())
                .schemaLoader(eventStreamFactory.getEventSchemaLoader())
                .build();

        return EventDataStreamFactory.builder()
                .eventStreamFactory(eventStreamFactory)
                .generator(generator)
                .build();
    }

    /**
     * Convenience method to get the EventStreamFactory used
     * by this EventDataStreamFactory.
     */
    public EventStreamFactory getEventStreamFactory() {
        return eventStreamFactory;
    }

    /**
     * Gets the {@link EventRowTypeInfo} (which is a TypeInformation of Row)
     * for the streamName.
     * The corresponding schema is the latest version obtained from the EventStream
     * configuration identified by streamName.
     *
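     * <p>For instance, the type info can be used to declare the return type of an
     * operator that produces event Rows (a sketch; {@code toRow} is a hypothetical
     * conversion function and {@code inputStream} an existing DataStream):
     * <pre>{@code
     *     EventRowTypeInfo typeInfo = factory.rowTypeInfo("test.event.example");
     *     DataStream<Row> events = inputStream
     *         .map(value -> toRow(value))
     *         .returns(typeInfo);
     * }</pre>
     *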
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     */
    public EventRowTypeInfo rowTypeInfo(String streamName) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);
        ObjectNode jsonSchema = (ObjectNode)eventStream.schema();
        ObjectNode messageKeyFields = (ObjectNode) eventStream.messageKeyFields();
        return JsonSchemaFlinkConverter.toRowTypeInfo(jsonSchema, messageKeyFields);
    }

    /**
     * Gets the {@link EventRowTypeInfo} (which is a TypeInformation of Row)
     * for the streamName and a particular schema version.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     * @param version
     *  Version of the schema to use
     */
    public EventRowTypeInfo rowTypeInfo(String streamName, String version) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);
        ObjectNode jsonSchema = (ObjectNode)eventStream.schema(version);
        ObjectNode messageKeyFields = (ObjectNode)eventStream.messageKeyFields();
        return JsonSchemaFlinkConverter.toRowTypeInfo(jsonSchema, messageKeyFields);
    }

    /**
     * Gets a JSON to Row DeserializationSchema by streamName and its latest
     * schema version.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     */
    public JsonRowDeserializationSchema deserializer(String streamName) {
        return deserializer(streamName, null);
    }

    /**
     * Gets a JSON to Row DeserializationSchema by streamName.
     *
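     * <p>The deserializer is typically passed to a source, but can also be used
     * directly, e.g. in a test (a sketch; {@code jsonBytes} stands for a UTF-8
     * encoded JSON event, and deserialize may throw IOException):
     * <pre>{@code
     *     JsonRowDeserializationSchema deser = factory.deserializer("test.event.example");
     *     Row event = deser.deserialize(jsonBytes);
     * }</pre>
     *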
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param version
     *  Version of the schema to use.  If null, the latest version will be used.
     */
    public JsonRowDeserializationSchema deserializer(String streamName, @Nullable String version) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);

        ObjectNode jsonSchema;
        if (version != null) {
            jsonSchema = (ObjectNode)eventStream.schema(version);
        } else {
            jsonSchema = (ObjectNode)eventStream.schema();
        }
        return JsonSchemaFlinkConverter.toDeserializationSchemaRow(jsonSchema);
    }

    /**
     * Create a {@link org.apache.flink.api.common.serialization.SerializationSchema} suited for
     * producing JSON to the stream identified by streamName, using the provided schema version.
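     *
     * <p>For instance (a sketch; the stream name and version are illustrative, and
     * {@code eventRow} is a Row matching the stream's type information):
     * <pre>{@code
     *     JsonRowSerializationSchema serializer = factory.serializer("test.event.example", "1.1.0");
     *     byte[] json = serializer.serialize(eventRow);
     * }</pre>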
     */
    public JsonRowSerializationSchema serializer(String streamName, String version) {
        EventStream stream = eventStreamFactory.createEventStream(streamName);
        JsonEventGenerator.EventNormalizer generator = eventGenerator
                .createEventStreamEventGenerator(streamName, stream.schemaUri(version).toString());

        TypeInformation<Row> typeInformation = rowTypeInfo(streamName, version);
        return JsonRowSerializationSchema.builder()
                .withNormalizationFunction(generator)
                .withObjectMapper(generator.getObjectMapper())
                .withTypeInfo(typeInformation)
                .build();
    }

    /**
     * Get a {@link KafkaSourceBuilder} that is primed with settings needed to consume
     * the streamName from Kafka.
     *
     * This sets the following:
     * - bootstrapServers
     * - topics
     * - consumer group id
     * - a value-only deserializer that will deserialize to a Row conforming to streamName's JSONSchema at schemaVersion
     * - starting offsets, using committed offsets and resetting to LATEST if no offsets are committed
     *
     * If you want to change any of these settings, then call the appropriate
     * method on the returned KafkaSourceBuilder before calling build().
     * Note that Flink's KafkaSourceBuilder does not allow you to call setTopics() more than once,
     * so if you need to override the topics used, be sure to provide them here as the topics parameter.
     *
     * Example:
     *
     * <pre>{@code
     *     EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...);
     *     KafkaSource<Row> eventStreamSource = eventDataStreamFactory.kafkaSourceBuilder(
     *          "test.event.example",  // EventStream name
     *          "1.1.0",
     *          "localhost:9092",
     *          "my_consumer_group",
     *          Arrays.asList("topic1", "topic2")
     *     ).build();
     * }</pre>
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, the latest schema version will be used.
     *
     * @param bootstrapServers
     *  Kafka bootstrap.servers property.
     *
     * @param consumerGroup
     *  Kafka consumer.group.id property.
     *
     * @param topics
     *  Optional List of Kafka topics to subscribe to.
     *  If null, the topics from EventStreamConfig matching the topic filter or topic prefix for streamName will be used.
     *
     * @see Builder#topicPrefix(String)
     * @see Builder#topicFilter(Predicate)
     */
    public KafkaSourceBuilder<Row> kafkaSourceBuilder(
        String streamName,
        @Nullable String schemaVersion,
        String bootstrapServers,
        String consumerGroup,
        @Nullable List<String> topics
    ) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);

        if (topics == null) {
            topics = eventStream.topics().stream().filter(topicFilter).collect(Collectors.toList());
            Verify.verify(!topics.isEmpty(), "No topics matching the filter were found");
        } else {
            Preconditions.checkArgument(!topics.isEmpty(), "The list of provided topics must not be empty");
        }

        KafkaSourceBuilder<Row> builder = KafkaSource.builder();
        builder
            .setBootstrapServers(bootstrapServers)
            .setGroupId(consumerGroup)
            .setTopics(topics)
            .setValueOnlyDeserializer(deserializer(eventStream.streamName(), schemaVersion))
            .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
            );

        return builder;
    }

    /**
     * Get a {@link KafkaSourceBuilder} that is primed with settings needed to consume
     * the streamName from Kafka using the latest schema version.
     *
     * @see #kafkaSourceBuilder(String, String, String, String, List)
     */
    public KafkaSourceBuilder<Row> kafkaSourceBuilder(
        String streamName,
        String bootstrapServers,
        String consumerGroup,
        @Nullable List<String> topics
    ) {
        return kafkaSourceBuilder(streamName, null, bootstrapServers, consumerGroup, topics);
    }

    /**
     * Get a {@link KafkaSourceBuilder} that is primed with settings needed to consume
     * the streamName from Kafka, reading from the stream's configured topics that match the
     * topic filter or topic prefix, using the latest schema version.
     *
     * @see #kafkaSourceBuilder(String, String, String, String, List)
     */
    public KafkaSourceBuilder<Row> kafkaSourceBuilder(
        String streamName,
        String bootstrapServers,
        String consumerGroup
    ) {
        return kafkaSourceBuilder(streamName, null, bootstrapServers, consumerGroup, null);
    }

    /**
     * Prepare a KafkaSinkBuilder with all the required components to produce to Kafka using a JSON format
     * matching the provided schema.
     *
     * Produces events using the default Kafka partitioning.
     *
     * @see #kafkaSinkBuilder(String, String, String, String, KafkaRecordTimestampStrategy, FlinkKafkaPartitioner)
     */
    public KafkaSinkBuilder<Row> kafkaSinkBuilder(
            String streamName,
            @Nullable String schemaVersion,
            String bootstrapServers,
            String topic,
            KafkaRecordTimestampStrategy timestampStrategy
    ) {
        return kafkaSinkBuilder(streamName, schemaVersion, bootstrapServers, topic, timestampStrategy, null);
    }

    /**
     * Prepare a KafkaSinkBuilder with all the required components to produce to Kafka using a JSON format
     * matching the provided schema. The instantiated KafkaSink will produce snappy-compressed records.
     *
     * The produced messages match the WMF Event Platform Rules:
     * - the meta.dt field is filled and will be used as the Kafka timestamp
     * - the topic to produce to is one of the topics defined in the EventStream configuration
     * - the provided schema must match the one defined in the EventStream configuration
     * - the resulting JSON is validated against this schema
     * - the dt (event time) field remains optional and must be set beforehand in the pipeline
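     *
     * <p>Example (a sketch; the broker, topic, and version strings are illustrative,
     * {@code timestampStrategy} is any {@link KafkaRecordTimestampStrategy}, and
     * {@code eventRowStream} is a DataStream of event Rows):
     * <pre>{@code
     *     KafkaSink<Row> sink = factory.kafkaSinkBuilder(
     *          "test.event.example",        // EventStream name
     *          "1.1.0",                     // schema version
     *          "localhost:9092",
     *          "eqiad.test.event.example",  // Kafka topic
     *          timestampStrategy
     *     ).build();
     *     eventRowStream.sinkTo(sink);
     * }</pre>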
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use.
     *
     * @param bootstrapServers
     *  Kafka bootstrap.servers property.
     *
     * @param topic
     *  The Kafka topic to write to.
     *  If not one of the topics referenced by the EventStreamConfig, a warning will be logged.
     *
     * @param timestampStrategy
     *  The strategy to use regarding producer records timestamps and event time.
     *
     * @param partitioner
     *  An optional partitioner. If null, the default Kafka partitioning is used.
     *
     * @see KafkaRecordTimestampStrategy
     */
    public KafkaSinkBuilder<Row> kafkaSinkBuilder(
            String streamName,
            @Nullable String schemaVersion,
            String bootstrapServers,
            String topic,
            KafkaRecordTimestampStrategy timestampStrategy,
            @Nullable FlinkKafkaPartitioner<Row> partitioner
    ) {
        EventStream stream = eventStreamFactory.createEventStream(streamName);

        if (!stream.topics().contains(topic)) {
            LOG.warn(
                "The topic '{}' is not configured for  stream '{}'. Configured topics are [{}]. Using provided topic anyway.",
                topic,
                stream,
                String.join(",", stream.topics())
            );
        }

        JsonRowSerializationSchema serializationSchema = serializer(streamName, schemaVersion);

        KafkaRecordSerializationSchema<Row> kafkaRecordSerializationSchema = new KafkaEventSerializationSchema(
                serializationSchema.getTypeInformation(),
                serializationSchema,
                Instant::now,
                topic,
                timestampStrategy,
                partitioner
        );
        return KafkaSink.<Row>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(kafkaRecordSerializationSchema)
                .setKafkaProducerConfig(getKafkaProducerConfig());
    }

    /**
     * Returns a SourceFunction that reads JSON event Rows from the SSE EventSource.
     * The Rows will be deserialized using the latest JSONSchema version of streamName.
     * @see #sseSourceFunction(String, String, String)
     */
    public SSESourceFunction<Row> sseSourceFunction(
        String streamName,
        String uri
    ) {
        return sseSourceFunction(streamName, null, uri);
    }

    /**
     * Returns a SourceFunction that reads JSON event Rows from the SSE EventSource.
     * The Rows will be deserialized using the JSONSchema of streamName at schemaVersion.
     *
     * NOTE: This should not be used for 'production' jobs.
     * It uses the deprecated SourceFunction, and has not been thoroughly tested.
     * This is mostly useful for development and testing.
     *
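     * <p>Example (a sketch; the endpoint URI is illustrative and {@code env} is a
     * StreamExecutionEnvironment):
     * <pre>{@code
     *     SSESourceFunction<Row> source = factory.sseSourceFunction(
     *         "test.event.example", "https://stream.example.org/v2/stream/test");
     *     DataStream<Row> events = env.addSource(source, factory.rowTypeInfo("test.event.example"));
     * }</pre>
     *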
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, latest version will be used.
     *
     * @param uri
     *  EventSource URI endpoint.
     */
    public SSESourceFunction<Row> sseSourceFunction(
        String streamName,
        @Nullable String schemaVersion,
        String uri
    ) {
        return new SSESourceFunction<>(
            uri, deserializer(streamName, schemaVersion)
        );
    }

    /**
     * Gets a {@link FileSource.FileSourceBuilder} that will generate Rows from events in JSON
     * files.  The Rows will be deserialized using the JSONSchema of streamName.
     * This is a slightly lower level method than
     * fileDataStream(String, StreamExecutionEnvironment, WatermarkStrategy, URI...).
     * You'll probably want to use fileDataStream, especially if you are just using this FileSource
     * for testing purposes.
     *
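     * <p>For instance (a sketch; the file path is illustrative, and null is cast to
     * String to disambiguate the overloads):
     * <pre>{@code
     *     FileSource<Row> fileSource = factory.fileStreamSourceBuilder(
     *         "test.event.example",
     *         (String) null,  // use the latest schema version
     *         new Path("file:///path/to/test.events.json")
     *     ).build();
     * }</pre>
     *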
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, latest version will be used.
     *
     * @param paths
     *  Flink Paths from which to read JSON events.
     */
    public FileSource.FileSourceBuilder<Row> fileStreamSourceBuilder(
        String streamName,
        @Nullable String schemaVersion,
        Path... paths
    ) {

        return FileSource.forRecordStreamFormat(
            new LineStreamFormat<>(deserializer(streamName, schemaVersion)),
            paths
        );
    }

    /**
     * Gets a {@link FileSource.FileSourceBuilder} that will generate Rows from events in JSON
     * files.
     *
     * @see #fileStreamSourceBuilder(String, String, Path...)
     */
    public FileSource.FileSourceBuilder<Row> fileStreamSourceBuilder(
        String streamName,
        Path... paths
    ) {
        return fileStreamSourceBuilder(streamName, null, paths);
    }

    /**
     * Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from files.
     *
     * Example:
     *
     * <pre>{@code
     *     EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...);
     *     DataStreamSource<Row> eventFileSource = eventDataStreamFactory.fileDataStream(
     *          "test.event.example",       // EventStream name
     *          env,                        // Flink StreamExecutionEnvironment
     *          WatermarkStrategy.noWatermarks(),
     *          new URI("file:///path/to/test.events.json")
     *     );
     * }</pre>
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, latest version will be used.
     *
     * @param env
     *  StreamExecutionEnvironment in which to call fromSource.
     *
     * @param watermarkStrategy
     *  For simple testing input from files, you'll likely want to use WatermarkStrategy.noWatermarks();
     *
     * @param files
     *  URIs to files of JSON events that conform to the JSONSchema of streamName.
     *  These will be converted to Flink {@link Path}s.
     */
    public DataStreamSource<Row> fileDataStream(
        String streamName,
        @Nullable String schemaVersion,
        StreamExecutionEnvironment env,
        WatermarkStrategy<Row> watermarkStrategy,
        URI... files
    ) {
        // Convert file URIs into Flink Paths
        Path[] paths = Arrays.stream(files).map(Path::new).toArray(Path[]::new);
        EventStream eventStream = getEventStreamFactory().createEventStream(streamName);

        // Make a nice DataStreamSource description including all the source files.
        String dataSourceDescription = eventStream +
            " from files [" +
            Arrays.stream(paths).map(Path::toString).collect(Collectors.joining(",")) +
            "]";

        return env.fromSource(
            fileStreamSourceBuilder(streamName, schemaVersion, paths).build(),
            watermarkStrategy,
            dataSourceDescription
        );
    }

    /**
     * Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from files using
     * the latest schema version to deserialize.
     *
     * @see #fileDataStream(String, String, StreamExecutionEnvironment, WatermarkStrategy, URI...)
     */
    public DataStreamSource<Row> fileDataStream(
        String streamName,
        StreamExecutionEnvironment env,
        WatermarkStrategy<Row> watermarkStrategy,
        URI... files
    ) {
        return fileDataStream(streamName, null, env, watermarkStrategy, files);
    }


    /**
     * A Row to JSON string Encoder for use with FileSinks.
     *
     * @param streamName    Name of the EventStream, must be declared in EventStreamConfig.
     * @param schemaVersion Version of the schema to use when encoding.
     */
    public FileSinkJsonRowEncoder fileSinkEncoder(
        String streamName,
        String schemaVersion
    ) {
        return new FileSinkJsonRowEncoder(
            rowTypeInfo(streamName, schemaVersion),
            serializer(streamName, schemaVersion),
            Instant::now
        );
    }


    /**
     * Shortcut helper for calling fileSinkBuilderRowFormat using a String path.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use when encoding.
     *
     * @param path
     *  Base directory path location in which to write partitioned directories and files.
     *  Must begin with a Flink supported file protocol, e.g. file://, hdfs:// etc.
     *
     */
    public FileSink.DefaultRowFormatBuilder<Row> fileSinkBuilderRowFormat(
        String streamName,
        String schemaVersion,
        String path
    ) {
        return fileSinkBuilderRowFormat(streamName, schemaVersion, new Path(path));
    }


    /**
     * A {@link FileSink} {@link FileSink.RowFormatBuilder} used to serialize
     * event Rows to JSON output files. If desired, customize the returned Builder
     * with custom bucket assigners and rolling policies, and then call
     * .build() to get a FileSink.
     *
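     * <p>Example (a sketch; the path and version are illustrative, and
     * {@code eventRowStream} is a DataStream of event Rows):
     * <pre>{@code
     *     FileSink<Row> fileSink = factory.fileSinkBuilderRowFormat(
     *         "test.event.example",
     *         "1.1.0",
     *         new Path("file:///tmp/events_out")
     *     ).build();
     *     eventRowStream.sinkTo(fileSink);
     * }</pre>
     *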
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use when encoding.
     *
     * @param path
     *  Base directory path location in which to write partitioned directories and files.
     */
    public FileSink.DefaultRowFormatBuilder<Row> fileSinkBuilderRowFormat(
        String streamName,
        String schemaVersion,
        Path path
        // TODO add support for custom bucketAssigner and other rollingPolicies
    ) {
        return FileSink.forRowFormat(
            path,
            fileSinkEncoder(streamName, schemaVersion)
        );
    }

}