// ConsumerGraphFactory.java
package org.wikimedia.discovery.cirrus.updater.consumer.graph;
import java.net.URI;
import java.time.Instant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.wikimedia.discovery.cirrus.updater.consumer.config.ConsumerConfig;
import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory;
/**
 * Assembles the consumer's Flink stream graph: a Kafka source emitting update
 * events as {@link Row}s, connected to an Elasticsearch 7 bulk sink.
 *
 * <p>All endpoints and tuning knobs come from the supplied {@link ConsumerConfig}.
 */
public class ConsumerGraphFactory {
    private final EventDataStreamFactory eventDataStreamFactory;
    private final StreamExecutionEnvironment env;
    private final ConsumerConfig config;

    /**
     * @param env execution environment the graph is built on
     * @param config consumer settings (stream name, Kafka and Elasticsearch endpoints)
     */
    public ConsumerGraphFactory(StreamExecutionEnvironment env, ConsumerConfig config) {
        this.env = env;
        // The factory resolves event JSON schemas and stream configuration from the
        // URLs provided in the config.
        eventDataStreamFactory =
            EventDataStreamFactory.from(
                config.eventStreamJsonSchemaUrls(), config.eventStreamConfigUrl());
        this.config = config;
    }

    /**
     * Creates the Kafka-backed source of update-event rows.
     *
     * <p>No watermarks are generated; downstream operators must not rely on event time.
     *
     * @return a stream named {@code <updateStream>-source}
     */
    public DataStream<Row> createDataStreamSource() {
        KafkaSourceBuilder<Row> sourceBuilder =
            eventDataStreamFactory.kafkaSourceBuilder(
                config.updateStream(),
                config.kafkaSourceBootstrapServers(),
                config.kafkaSourceGroupId());
        setKafkaOffsetBounds(sourceBuilder);
        // Applied after the offset bounds so explicitly configured Kafka properties
        // take effect on top of the builder defaults.
        sourceBuilder.setProperties(config.kafkaSourceProperties());
        KafkaSource<Row> source = sourceBuilder.build();
        return env.fromSource(
            source, WatermarkStrategy.noWatermarks(), config.updateStream() + "-source");
    }

    /**
     * Creates the Elasticsearch sink for update rows.
     *
     * @param updateTypeInfo row type of the incoming stream, handed to the emitter so it
     *     can serialize row fields into bulk actions
     * @return a configured Elasticsearch 7 sink
     */
    public ElasticsearchSink<Row> createElasticSearchSink(RowTypeInfo updateTypeInfo) {
        final HttpHost[] httpHosts =
            config.elasticSearchUrls().stream()
                .map(URI::create)
                .map(uri -> new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()))
                .toArray(HttpHost[]::new);
        final UpdateElasticsearchEmitter emitter = new UpdateElasticsearchEmitter(updateTypeInfo);
        final Elasticsearch7SinkBuilder<Row> builder =
            new Elasticsearch7SinkBuilder<Row>().setHosts(httpHosts).setEmitter(emitter);
        // Optional bulk/connection tuning knobs; each is applied only when configured.
        config
            .optional(ConsumerConfig.PARAM_ELASTICSEARCH_BULK_FLUSH_INTERVAL)
            .ifPresent(builder::setBulkFlushInterval);
        config
            .optional(ConsumerConfig.PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS)
            .ifPresent(builder::setBulkFlushMaxActions);
        config
            .optional(ConsumerConfig.PARAM_ELASTICSEARCH_BULK_FLUSH_MAX_SIZE_MB)
            .ifPresent(builder::setBulkFlushMaxSizeMb);
        config
            .optional(ConsumerConfig.PARAM_ELASTICSEARCH_CONNECTION_TIMEOUT)
            .ifPresent(builder::setConnectionTimeout);
        config
            .optional(ConsumerConfig.PARAM_ELASTICSEARCH_CONNECTION_REQUEST_TIMEOUT)
            .ifPresent(builder::setConnectionRequestTimeout);
        // Use the path of the first configured URL that has one beyond a bare "/" as the
        // connection path prefix. (The length > 1 check already guarantees non-emptiness,
        // so no further filtering is needed.)
        config.elasticSearchUrls().stream()
            .map(URI::create)
            .map(URI::getPath)
            .filter(path -> path != null && path.length() > 1)
            .findFirst()
            .ifPresent(builder::setConnectionPathPrefix);
        return builder.build();
    }

    /**
     * Applies optional start/end offsets to the Kafka source. A configured end time makes
     * the source bounded (the job terminates once that offset is reached).
     */
    private void setKafkaOffsetBounds(KafkaSourceBuilder<Row> sourceBuilder) {
        final Instant startTime = config.kafkaSourceStartTime();
        if (startTime != null) {
            sourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startTime.toEpochMilli()));
        }
        final Instant endTime = config.kafkaSourceEndTime();
        if (endTime != null) {
            sourceBuilder.setBounded(OffsetsInitializer.timestamp(endTime.toEpochMilli()));
        }
    }

    /** Wires source to sink, reusing the source's runtime row type for the emitter. */
    public void createStreamGraph() {
        final DataStream<Row> dataStreamSource = createDataStreamSource();
        final TypeInformation<Row> type = dataStreamSource.getType();
        // The event-stream factory produces RowTypeInfo for these streams; the cast
        // surfaces a misconfiguration early if that ever changes.
        dataStreamSource.sinkTo(createElasticSearchSink((RowTypeInfo) type));
    }
}