EventDataStreamFactory.java
package org.wikimedia.eventutilities.flink.stream;
import java.net.URI;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.core.event.JsonEventGenerator;
import org.wikimedia.eventutilities.flink.EventRowTypeInfo;
import org.wikimedia.eventutilities.flink.formats.json.JsonRowDeserializationSchema;
import org.wikimedia.eventutilities.flink.formats.json.JsonRowSerializationSchema;
import org.wikimedia.eventutilities.flink.formats.json.JsonSchemaFlinkConverter;
import org.wikimedia.eventutilities.flink.formats.json.KafkaEventSerializationSchema;
import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import lombok.Setter;
import lombok.experimental.Accessors;
/**
 * Wraps EventStreamFactory with Flink DataStream API to provide
 * helper methods to instantiate DataStream of Row using
 * EventStream JSONSchemas and other metadata.
 *
 * Examples:
 *
 * Instantiate an EventDataStreamFactory from URIs:
 * <pre>{@code
 * EventDataStreamFactory factory = EventDataStreamFactory.from(
 *     Arrays.asList(
 *         "https://schema.wikimedia.org/repositories/primary/jsonschema",
 *         "https://schema.wikimedia.org/repositories/secondary/jsonschema"
 *     ),
 *     "https://meta.wikimedia.org/w/api.php"
 * );
 * }</pre>
 *
 * Or use the builder, e.g. to only consume the topics starting with a given prefix:
 * <pre>{@code
 * EventDataStreamFactory factory = EventDataStreamFactory.builder()
 *     .schemaBaseUris(Arrays.asList("https://schema.wikimedia.org/repositories/primary/jsonschema"))
 *     .eventStreamConfigUri("https://meta.wikimedia.org/w/api.php")
 *     .topicPrefix("eqiad.")
 *     .build();
 * }</pre>
 *
 * Get a {@link KafkaSource} for a declared event stream:
 * <pre>{@code
 * KafkaSource<Row> eventStreamSource = factory.kafkaSourceBuilder(
 *     "test.event.example", // EventStream name
 *     "localhost:9092",
 *     "my_consumer_group"
 * ).build();
 * }</pre>
 *
 */
@SuppressWarnings("checkstyle:classfanoutcomplexity")
public class EventDataStreamFactory {
    private static final Logger LOG = LoggerFactory.getLogger(EventDataStreamFactory.class);

    /** Source of the EventStream instances (schemas, topics, message key fields). */
    private final EventStreamFactory eventStreamFactory;

    /** Used to create the event normalizer applied by {@link #serializer(String, String)}. */
    private final JsonEventGenerator eventGenerator;

    /**
     * Selects which of a stream's configured topics are consumed by
     * {@link #kafkaSourceBuilder(String, String, String, String, List)} when no explicit topics are given.
     */
    private final Predicate<String> topicFilter;

    /**
     * Helper method to enforce default configuration settings for
     * Flink Kafka producers. These are meant to ensure that topics
     * produced by Flink applications have configuration parity with
     * EventGate.
     *
     * References:
     * - https://phabricator.wikimedia.org/T345805
     *
     * @return a freshly created Properties instance holding the default producer settings.
     */
    private static Properties getKafkaProducerConfig() {
        Properties properties = new Properties();
        // Specify the final compression type for a given topic.
        // We set snappy because it's the compression algorithm used
        // by our MirrorMaker deployment.
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        return properties;
    }

    /**
     * Creates a factory that accepts every configured topic of a stream.
     *
     * @deprecated use {@link #builder()}
     */
    @Deprecated
    public EventDataStreamFactory(
        EventStreamFactory eventStreamFactory,
        JsonEventGenerator eventGenerator
    ) {
        this(eventStreamFactory, eventGenerator, t -> true);
    }

    private EventDataStreamFactory(
        EventStreamFactory eventStreamFactory,
        JsonEventGenerator eventGenerator,
        Predicate<String> topicFilter
    ) {
        this.eventStreamFactory = eventStreamFactory;
        this.eventGenerator = eventGenerator;
        this.topicFilter = topicFilter;
    }

    /**
     * @return a new {@link Builder} for configuring an EventDataStreamFactory.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builder for {@link EventDataStreamFactory}.
     *
     * Either provide an {@code eventStreamFactory}, or both {@code schemaBaseUris}
     * and {@code eventStreamConfigUri} (plus optional {@code httpClientRoutes})
     * so that one can be created. When {@code generator} is not provided, one is
     * derived from the EventStreamFactory's stream config and schema loader.
     */
    @Setter
    @Accessors(fluent = true, chain = true)
    public static class Builder {
        /** URIs from which to fetch event JSONSchemas; required if eventStreamFactory is not set. */
        private List<String> schemaBaseUris;
        /** URI from which to fetch event stream config; required if eventStreamFactory is not set. */
        private String eventStreamConfigUri;
        /** Pre-built EventStreamFactory; when set, schemaBaseUris, eventStreamConfigUri and httpClientRoutes are unused. */
        private EventStreamFactory eventStreamFactory;
        /** Pre-built JsonEventGenerator; when not set, one is derived from the EventStreamFactory. */
        private JsonEventGenerator generator;
        /** Optional map of source to dest http client routes, used only when creating the EventStreamFactory. */
        private Map<String, String> httpClientRoutes;
        /**
         * The filter to apply when extracting the configured topics of a stream.
         * Accept all topics by default.
         */
        @Nonnull
        private Predicate<String> topicFilter = t -> true;

        /**
         * Filter source topics with a particular prefix (generally the datacenter name).
         * This is based on the naming convention that topics are prefixed with datacenter name they're populated from.
         * Replaces any previously configured topic filter.
         *
         * @param topicPrefix the prefix to use (usually the name of a datacenter); must not be null
         * @return this builder
         */
        public Builder topicPrefix(String topicPrefix) {
            // Fail here rather than with an opaque NPE later when the filter is applied.
            Preconditions.checkNotNull(topicPrefix, "topicPrefix must not be null");
            topicFilter = t -> t.startsWith(topicPrefix);
            return this;
        }

        /**
         * Builds the EventDataStreamFactory.
         *
         * The builder's own fields are left untouched, so calling build() again after
         * changing a setting picks up that change.
         *
         * @return a new EventDataStreamFactory
         * @throws IllegalArgumentException if neither eventStreamFactory nor both
         *         schemaBaseUris and eventStreamConfigUri are set.
         */
        public EventDataStreamFactory build() {
            EventStreamFactory resolvedStreamFactory = eventStreamFactory;
            if (resolvedStreamFactory == null) {
                Preconditions.checkArgument(schemaBaseUris != null,
                    "schemaBaseUris must be set if eventStreamFactory is not provided");
                Preconditions.checkArgument(eventStreamConfigUri != null,
                    "eventStreamConfigUri must be set if eventStreamFactory is not provided");
                resolvedStreamFactory = EventStreamFactory.from(
                    schemaBaseUris, eventStreamConfigUri, httpClientRoutes
                );
            }
            JsonEventGenerator resolvedGenerator = generator;
            if (resolvedGenerator == null) {
                resolvedGenerator = JsonEventGenerator.builder()
                    .eventStreamConfig(resolvedStreamFactory.getEventStreamConfig())
                    .schemaLoader(resolvedStreamFactory.getEventSchemaLoader())
                    .build();
            }
            return new EventDataStreamFactory(
                resolvedStreamFactory,
                resolvedGenerator,
                topicFilter
            );
        }
    }

    /**
     * EventDataStreamFactory factory method.
     *
     * @param eventSchemaBaseUris
     *  URIs from which to fetch event JSONSchemas.
     *
     * @param eventStreamConfigUri
     *  URI from which to fetch event stream config.
     *
     * @return a factory accepting every configured topic of a stream.
     */
    public static EventDataStreamFactory from(
        @Nonnull List<String> eventSchemaBaseUris,
        @Nonnull String eventStreamConfigUri
    ) {
        return EventDataStreamFactory.from(
            eventSchemaBaseUris, eventStreamConfigUri, null
        );
    }

    /**
     * EventDataStreamFactory factory method.
     *
     * @param eventSchemaBaseUris
     *  URIs from which to fetch event JSONSchemas.
     *
     * @param eventStreamConfigUri
     *  URI from which to fetch event stream config.
     *
     * @param httpClientRoutes
     *  Map of source to dest http client routes.
     *  E.g. "https://meta.wikimedia.org" to "https://api-ro.wikimedia.org"
     *  If null, no special routing will be configured.
     *
     * @return a factory accepting every configured topic of a stream.
     */
    public static EventDataStreamFactory from(
        @Nonnull List<String> eventSchemaBaseUris,
        @Nonnull String eventStreamConfigUri,
        Map<String, String> httpClientRoutes
    ) {
        // The builder creates both the EventStreamFactory and the default JsonEventGenerator.
        return EventDataStreamFactory.builder()
            .schemaBaseUris(eventSchemaBaseUris)
            .eventStreamConfigUri(eventStreamConfigUri)
            .httpClientRoutes(httpClientRoutes)
            .build();
    }

    /**
     * Convenience method to get the EventStreamFactory used
     * by this EventDataStreamFactory.
     *
     * @return the wrapped EventStreamFactory.
     */
    public EventStreamFactory getEventStreamFactory() {
        return eventStreamFactory;
    }

    /**
     * Gets the {@link EventRowTypeInfo} (which is a TypeInformation of Row)
     * for the streamName.
     * The corresponding schema is the latest obtained from the EventStream configuration
     * identified by streamName.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @return the row type info, including the stream's message key fields.
     */
    public EventRowTypeInfo rowTypeInfo(String streamName) {
        return rowTypeInfo(streamName, null);
    }

    /**
     * Gets the {@link EventRowTypeInfo} (which is a TypeInformation of Row)
     * for the streamName and a particular schema version.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param version
     *  Version of the schema to use. If null, the latest version will be used.
     *
     * @return the row type info, including the stream's message key fields.
     */
    public EventRowTypeInfo rowTypeInfo(String streamName, @Nullable String version) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);
        ObjectNode jsonSchema = schemaOf(eventStream, version);
        ObjectNode messageKeyFields = (ObjectNode) eventStream.messageKeyFields();
        return JsonSchemaFlinkConverter.toRowTypeInfo(jsonSchema, messageKeyFields);
    }

    /**
     * Gets a JSON to Row DeserializationSchema by streamName and its latest
     * schema version.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @return a deserializer producing Rows conforming to the latest schema.
     */
    public JsonRowDeserializationSchema deserializer(String streamName) {
        return deserializer(streamName, null);
    }

    /**
     * Gets a JSON to Row DeserializationSchema by streamName.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param version
     *  Version of the schema to use. If null, the latest version will be used.
     *
     * @return a deserializer producing Rows conforming to the selected schema.
     */
    public JsonRowDeserializationSchema deserializer(String streamName, @Nullable String version) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);
        return JsonSchemaFlinkConverter.toDeserializationSchemaRow(schemaOf(eventStream, version));
    }

    /**
     * Looks up the JSONSchema of eventStream at version, or its latest schema when version is null.
     */
    private static ObjectNode schemaOf(EventStream eventStream, @Nullable String version) {
        if (version != null) {
            return (ObjectNode) eventStream.schema(version);
        }
        return (ObjectNode) eventStream.schema();
    }

    /**
     * Create a {@link org.apache.flink.api.common.serialization.SerializationSchema} suited for
     * producing json to the stream identified by streamName using the provided schema version.
     *
     * The serialized events are normalized by a generator obtained from the JsonEventGenerator
     * for streamName and the schema URI of version.
     * NOTE: this method does not substitute the latest version for a null version; version is
     * passed as-is to EventStream#schemaUri(String).
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param version
     *  Version of the schema to use.
     *
     * @return a Row to JSON serialization schema.
     */
    public JsonRowSerializationSchema serializer(String streamName, String version) {
        EventStream stream = eventStreamFactory.createEventStream(streamName);
        JsonEventGenerator.EventNormalizer generator = eventGenerator
            .createEventStreamEventGenerator(streamName, stream.schemaUri(version).toString());
        TypeInformation<Row> typeInformation = rowTypeInfo(streamName, version);
        return JsonRowSerializationSchema.builder()
            .withNormalizationFunction(generator)
            .withObjectMapper(generator.getObjectMapper())
            .withTypeInfo(typeInformation)
            .build();
    }

    /**
     * Get a {@link KafkaSourceBuilder} that is primed with settings needed to consume
     * the streamName from Kafka.
     *
     * This sets the following:
     * - bootstrapServers,
     * - topics
     * - consumer group id
     * - value only deserializer that will deserialize to a Row conforming to streamName's JSONSchema at schemaVersion.
     * - starting offsets will use committed offsets, resetting to LATEST if no offsets are committed.
     *
     * If you want to change any of these settings, then call the appropriate
     * method on the returned KafkaSourceBuilder before calling build().
     * Note that Flink's KafkaSourceBuilder does not allow you to call setTopics() more than once,
     * so if you need to override the topics used, be sure to provide them here as the topics parameter.
     *
     * Example:
     *
     * <pre>{@code
     * EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...)
     * KafkaSource<Row> eventStreamSource = eventDataStreamFactory.kafkaSourceBuilder(
     *     "test.event.example", // EventStream name
     *     "1.1.0",
     *     "localhost:9092",
     *     "my_consumer_group",
     *     Arrays.asList("topic1", "topic2")
     * ).build();
     * }</pre>
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, the latest schema version will be used.
     *
     * @param bootstrapServers
     *  Kafka bootstrap.servers property.
     *
     * @param consumerGroup
     *  Kafka consumer.group.id property.
     *
     * @param topics
     *  Optional List of Kafka topics to subscribe to.
     *  If null, the topics from EventStreamConfig matching the topic filter or topic prefix for streamName will be used.
     *
     * @return a primed KafkaSourceBuilder.
     *
     * @throws IllegalArgumentException if topics is provided but empty.
     * @throws com.google.common.base.VerifyException if topics is null and no configured topic matches the filter.
     *
     * @see Builder#topicPrefix(String)
     * @see Builder#topicFilter(Predicate)
     */
    public KafkaSourceBuilder<Row> kafkaSourceBuilder(
        String streamName,
        @Nullable String schemaVersion,
        String bootstrapServers,
        String consumerGroup,
        @Nullable List<String> topics
    ) {
        EventStream eventStream = eventStreamFactory.createEventStream(streamName);
        List<String> sourceTopics;
        if (topics == null) {
            sourceTopics = eventStream.topics().stream().filter(topicFilter).collect(Collectors.toList());
            Verify.verify(!sourceTopics.isEmpty(),
                "No topics matching the filter were found for stream %s", streamName);
        } else {
            Preconditions.checkArgument(!topics.isEmpty(), "The list of provided topics must not be empty");
            sourceTopics = topics;
        }
        KafkaSourceBuilder<Row> builder = KafkaSource.builder();
        builder
            .setBootstrapServers(bootstrapServers)
            .setGroupId(consumerGroup)
            .setTopics(sourceTopics)
            .setValueOnlyDeserializer(deserializer(eventStream.streamName(), schemaVersion))
            .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
            );
        return builder;
    }

    /**
     * Get a {@link KafkaSourceBuilder} that is primed with settings needed to consume
     * the streamName from Kafka using the latest schema version.
     *
     * @return a primed KafkaSourceBuilder.
     *
     * @see #kafkaSourceBuilder(String, String, String, String, List)
     */
    public KafkaSourceBuilder<Row> kafkaSourceBuilder(
        String streamName,
        String bootstrapServers,
        String consumerGroup,
        @Nullable List<String> topics
    ) {
        return kafkaSourceBuilder(streamName, null, bootstrapServers, consumerGroup, topics);
    }

    /**
     * Get a {@link KafkaSourceBuilder} that is primed with settings needed to consume
     * the streamName from Kafka from the stream's configured topics that match the
     * topic filter or topic prefix, using the latest schema version.
     *
     * @return a primed KafkaSourceBuilder.
     *
     * @see #kafkaSourceBuilder(String, String, String, String, List)
     */
    public KafkaSourceBuilder<Row> kafkaSourceBuilder(
        String streamName,
        String bootstrapServers,
        String consumerGroup
    ) {
        return kafkaSourceBuilder(streamName, null, bootstrapServers, consumerGroup, null);
    }

    /**
     * Prepare a KafkaSinkBuilder with all the required components to produce to kafka using a json format
     * matching the provided schema.
     *
     * No custom partitioner is set (a null partitioner is passed along).
     *
     * @return a primed KafkaSinkBuilder.
     *
     * @see #kafkaSinkBuilder(String, String, String, String, KafkaRecordTimestampStrategy, FlinkKafkaPartitioner)
     */
    public KafkaSinkBuilder<Row> kafkaSinkBuilder(
        String streamName,
        @Nullable String schemaVersion,
        String bootstrapServers,
        String topic,
        KafkaRecordTimestampStrategy timestampStrategy
    ) {
        return kafkaSinkBuilder(streamName, schemaVersion, bootstrapServers, topic, timestampStrategy, null);
    }

    /**
     * Prepare a KafkaSinkBuilder with all the required components to produce to kafka using a json format
     * matching the provided schema. The instantiated KafkaSink will produce snappy compressed records.
     *
     * The produced messages match the WMF Event Platform Rules:
     * - the meta.dt field is filled and will be used as the kafka timestamp
     * - the topic to produce to is one of the topics defined in the EventStream configuration
     * - the provided schema must match the one defined in the EventStream configuration
     * - the resulting json is validated against this schema
     * - the dt (event time) field remains optional and must be set beforehand in the pipeline
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use.
     *
     * @param bootstrapServers
     *  Kafka bootstrap.servers property.
     *
     * @param topic
     *  The Kafka topic to write to.
     *  If not one of the topics referenced by the EventStreamConfig, a warning will be logged.
     *
     * @param timestampStrategy
     *  The strategy to use regarding producer records timestamps and event time.
     *
     * @param partitioner
     *  An optional partitioner.
     *
     * @return a primed KafkaSinkBuilder.
     *
     * @see KafkaRecordTimestampStrategy
     */
    public KafkaSinkBuilder<Row> kafkaSinkBuilder(
        String streamName,
        @Nullable String schemaVersion,
        String bootstrapServers,
        String topic,
        KafkaRecordTimestampStrategy timestampStrategy,
        @Nullable FlinkKafkaPartitioner<Row> partitioner
    ) {
        EventStream stream = eventStreamFactory.createEventStream(streamName);
        if (!stream.topics().contains(topic)) {
            LOG.warn(
                "The topic '{}' is not configured for stream '{}'. Configured topics are [{}]. Using provided topic anyway.",
                topic,
                stream,
                String.join(",", stream.topics())
            );
        }
        JsonRowSerializationSchema serializationSchema = serializer(streamName, schemaVersion);
        KafkaRecordSerializationSchema<Row> kafkaRecordSerializationSchema = new KafkaEventSerializationSchema(
            serializationSchema.getTypeInformation(),
            serializationSchema,
            Instant::now,
            topic,
            timestampStrategy,
            partitioner
        );
        return KafkaSink.<Row>builder()
            .setBootstrapServers(bootstrapServers)
            .setRecordSerializer(kafkaRecordSerializationSchema)
            .setKafkaProducerConfig(getKafkaProducerConfig());
    }

    /**
     * Returns a SourceFunction that reads JSON event Rows from the SSE EventSource.
     * The Rows will be deserialized using the latest version JSONSchema of streamName.
     *
     * @return an SSE source function.
     *
     * @see #sseSourceFunction(String, String, String)
     */
    public SSESourceFunction<Row> sseSourceFunction(
        String streamName,
        String uri
    ) {
        return sseSourceFunction(streamName, null, uri);
    }

    /**
     * Returns a SourceFunction that reads JSON event Rows from the SSE EventSource.
     * The Rows will be deserialized using the JSONSchema of streamName at schemaVersion.
     *
     * NOTE: This should not be used for 'production' jobs.
     * It uses the deprecated SourceFunction, and has not been thoroughly tested.
     * This is mostly useful for development and testing.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, latest version will be used.
     *
     * @param uri
     *  EventSource URI endpoint.
     *
     * @return an SSE source function.
     */
    public SSESourceFunction<Row> sseSourceFunction(
        String streamName,
        @Nullable String schemaVersion,
        String uri
    ) {
        return new SSESourceFunction<>(
            uri, deserializer(streamName, schemaVersion)
        );
    }

    /**
     * Gets a {@link FileSource.FileSourceBuilder} that will generate Rows of events in JSON
     * files. The Rows will be deserialized using the JSONSchema of streamName.
     * This is a slightly lower level method than
     * {@link #fileDataStream(String, StreamExecutionEnvironment, WatermarkStrategy, URI...)}.
     * You'll probably want to use fileDataStream, especially if you are just using this FileSource
     * for testing purposes.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, latest version will be used.
     *
     * @param paths
     *  Flink Paths from which to read JSON events, one event per line.
     *
     * @return a FileSourceBuilder of Rows.
     */
    public FileSource.FileSourceBuilder<Row> fileStreamSourceBuilder(
        String streamName,
        @Nullable String schemaVersion,
        Path... paths
    ) {
        return FileSource.forRecordStreamFormat(
            new LineStreamFormat<>(deserializer(streamName, schemaVersion)),
            paths
        );
    }

    /**
     * Gets a {@link FileSource.FileSourceBuilder} that will generate Rows of events in JSON
     * files, using the latest schema version to deserialize.
     *
     * @return a FileSourceBuilder of Rows.
     *
     * @see #fileStreamSourceBuilder(String, String, Path...)
     */
    public FileSource.FileSourceBuilder<Row> fileStreamSourceBuilder(
        String streamName,
        Path... paths
    ) {
        return fileStreamSourceBuilder(streamName, null, paths);
    }

    /**
     * Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from files.
     *
     * Example:
     *
     * <pre>{@code
     * EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...)
     * DataStreamSource<Row> eventFileSource = eventDataStreamFactory.fileDataStream(
     *     "test.event.example", // EventStream name
     *     env, // Flink StreamExecutionEnvironment
     *     WatermarkStrategy.noWatermarks(),
     *     new URI("file:///path/to/test.events.json")
     * );
     * }</pre>
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use. If null, latest version will be used.
     *
     * @param env
     *  StreamExecutionEnvironment in which to call fromSource.
     *
     * @param watermarkStrategy
     *  For simple testing input from files, you'll likely want to use WatermarkStrategy.noWatermarks();
     *
     * @param files
     *  URIs to files of JSON events that conform to the JSONSchema of streamName.
     *  These will be converted to Flink {@link Path}s.
     *
     * @return the DataStreamSource, described by the stream and the list of source files.
     */
    public DataStreamSource<Row> fileDataStream(
        String streamName,
        @Nullable String schemaVersion,
        StreamExecutionEnvironment env,
        WatermarkStrategy<Row> watermarkStrategy,
        URI... files
    ) {
        // Convert file URIs into Flink Paths
        Path[] paths = Arrays.stream(files).map(Path::new).toArray(Path[]::new);
        EventStream eventStream = getEventStreamFactory().createEventStream(streamName);
        // Make a nice DataStreamSource description including all the source files.
        String dataSourceDescription = eventStream +
            " from files [" +
            Arrays.stream(paths).map(Path::toString).collect(Collectors.joining(",")) +
            "]";
        return env.fromSource(
            fileStreamSourceBuilder(streamName, schemaVersion, paths).build(),
            watermarkStrategy,
            dataSourceDescription
        );
    }

    /**
     * Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from files using
     * the latest schema version to deserialize.
     *
     * @return the DataStreamSource.
     *
     * @see #fileDataStream(String, String, StreamExecutionEnvironment, WatermarkStrategy, URI...)
     */
    public DataStreamSource<Row> fileDataStream(
        String streamName,
        StreamExecutionEnvironment env,
        WatermarkStrategy<Row> watermarkStrategy,
        URI... files
    ) {
        return fileDataStream(streamName, null, env, watermarkStrategy, files);
    }

    /**
     * A Row to JSON string Encoder for use with FileSinks.
     *
     * @param streamName Name of the EventStream, must be declared in EventStreamConfig.
     * @param schemaVersion Version of the schema to use when encoding.
     * @return an encoder combining the stream's row type info and JSON serializer.
     */
    public FileSinkJsonRowEncoder fileSinkEncoder(
        String streamName,
        String schemaVersion
    ) {
        return new FileSinkJsonRowEncoder(
            rowTypeInfo(streamName, schemaVersion),
            serializer(streamName, schemaVersion),
            Instant::now
        );
    }

    /**
     * Shortcut helper for calling fileSinkBuilderRowFormat using a String path.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use when encoding.
     *
     * @param path
     *  Base directory path location in which to write partitioned directories and files.
     *  Must begin with a Flink supported file protocol, e.g. file://, hdfs:// etc.
     *
     * @return a FileSink row format builder.
     */
    public FileSink.DefaultRowFormatBuilder<Row> fileSinkBuilderRowFormat(
        String streamName,
        String schemaVersion,
        String path
    ) {
        return fileSinkBuilderRowFormat(streamName, schemaVersion, new Path(path));
    }

    /**
     * A {@link FileSink} {@link FileSink.RowFormatBuilder} used to serialize
     * event Rows to JSON output files. If desired, customize the returned Builder
     * with custom bucket assigners and rolling policies, and then call
     * .build() to get a FileSink.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     *
     * @param schemaVersion
     *  Version of the schema to use when encoding.
     *
     * @param path
     *  Base directory path location in which to write partitioned directories and files.
     *
     * @return a FileSink row format builder.
     */
    public FileSink.DefaultRowFormatBuilder<Row> fileSinkBuilderRowFormat(
        String streamName,
        String schemaVersion,
        Path path
    ) {
        // TODO add support for custom bucketAssigner and other rollingPolicies
        return FileSink.forRowFormat(
            path,
            fileSinkEncoder(streamName, schemaVersion)
        );
    }
}