// ConsumerGraphFactory.java

package org.wikimedia.discovery.cirrus.updater.consumer.graph;

import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.wikimedia.discovery.cirrus.updater.consumer.config.ConsumerConfig;
import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory;

/**
 * Assembles the Flink job graph for the cirrus update consumer: a Kafka source of
 * update-event {@link Row}s wired to an Elasticsearch 7 sink.
 *
 * <p>All connection and tuning parameters come from the supplied {@link ConsumerConfig}.
 */
public class ConsumerGraphFactory {

    private final EventDataStreamFactory eventDataStreamFactory;
    private final StreamExecutionEnvironment env;
    private final ConsumerConfig config;

    /**
     * @param env execution environment the graph is attached to
     * @param config consumer configuration (stream names, Kafka and Elasticsearch settings)
     */
    public ConsumerGraphFactory(StreamExecutionEnvironment env, ConsumerConfig config) {
        this.env = env;
        this.config = config;
        this.eventDataStreamFactory =
                EventDataStreamFactory.from(
                        config.eventStreamJsonSchemaUrls(), config.eventStreamConfigUrl());
    }

    /**
     * Creates the Kafka-backed source stream of update-event rows.
     *
     * <p>Offsets are bounded by the optional start/end times from the config. The extra Kafka
     * properties are applied after the offset bounds so config-supplied properties can override
     * builder defaults.
     *
     * @return the source stream, named {@code <updateStream>-source}
     */
    public DataStream<Row> createDataStreamSource() {
        final KafkaSourceBuilder<Row> sourceBuilder =
                eventDataStreamFactory.kafkaSourceBuilder(
                        config.updateStream(),
                        config.kafkaSourceBootstrapServers(),
                        config.kafkaSourceGroupId());
        setKafkaOffsetBounds(sourceBuilder);
        sourceBuilder.setProperties(config.kafkaSourceProperties());
        final KafkaSource<Row> source = sourceBuilder.build();

        // No watermarks: nothing downstream in this graph uses event time.
        return env.fromSource(
                source, WatermarkStrategy.noWatermarks(), config.updateStream() + "-source");
    }

    /**
     * Creates the Elasticsearch sink for update rows.
     *
     * @param updateTypeInfo row type of the incoming update stream, used by the emitter to map
     *     row fields onto bulk-request actions
     * @return the configured sink
     */
    public ElasticsearchSink<Row> createElasticSearchSink(RowTypeInfo updateTypeInfo) {
        // Parse the configured URLs once; the URIs are reused for both the host list
        // and the connection path prefix below.
        final List<URI> elasticSearchUris =
                config.elasticSearchUrls().stream()
                        .map(URI::create)
                        .collect(Collectors.toList());

        final HttpHost[] httpHosts =
                elasticSearchUris.stream()
                        .map(uri -> new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()))
                        .toArray(HttpHost[]::new);

        final UpdateElasticsearchEmitter emitter = new UpdateElasticsearchEmitter(updateTypeInfo);
        final Elasticsearch7SinkBuilder<Row> builder =
                new Elasticsearch7SinkBuilder<Row>().setHosts(httpHosts).setEmitter(emitter);

        // Optional tuning knobs: each is applied only when present in the config.
        config
                .optional(ConsumerConfig.PARAM_ELASTICSEARCH_BULK_FLUSH_INTERVAL)
                .ifPresent(builder::setBulkFlushInterval);
        config
                .optional(ConsumerConfig.PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS)
                .ifPresent(builder::setBulkFlushMaxActions);
        config
                .optional(ConsumerConfig.PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_SIZE_MB)
                .ifPresent(builder::setBulkFlushMaxSizeMb);
        config
                .optional(ConsumerConfig.PARAM_ELASTICSEARCH_CONNECTION_TIMEOUT)
                .ifPresent(builder::setConnectionTimeout);
        config
                .optional(ConsumerConfig.PARAM_ELASTICSEARCH_CONNECTION_REQUEST_TIMEOUT)
                .ifPresent(builder::setConnectionRequestTimeout);

        // Use the path of the first URL carrying a non-root path as the connection path prefix.
        // length() > 1 already excludes both "" and the bare "/" root, so no further
        // emptiness check is needed.
        elasticSearchUris.stream()
                .filter(uri -> uri.getPath() != null && uri.getPath().length() > 1)
                .findFirst()
                .map(URI::getPath)
                .ifPresent(builder::setConnectionPathPrefix);

        return builder.build();
    }

    /**
     * Applies the optional start/end time bounds from the config to the Kafka source.
     * A null start or end time leaves the corresponding bound at the builder's default.
     */
    private void setKafkaOffsetBounds(KafkaSourceBuilder<Row> sourceBuilder) {
        final Instant startTime = config.kafkaSourceStartTime();
        if (startTime != null) {
            sourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startTime.toEpochMilli()));
        }
        final Instant endTime = config.kafkaSourceEndTime();
        if (endTime != null) {
            // A bounded end offset makes the source (and thus the job) finite.
            sourceBuilder.setBounded(OffsetsInitializer.timestamp(endTime.toEpochMilli()));
        }
    }

    /** Wires source to sink, producing the complete consumer stream graph on {@code env}. */
    public void createStreamGraph() {
        final DataStream<Row> dataStreamSource = createDataStreamSource();
        final TypeInformation<Row> type = dataStreamSource.getType();
        // The event stream factory always produces Row-typed streams, so this cast is safe.
        dataStreamSource.sinkTo(createElasticSearchSink((RowTypeInfo) type));
    }
}