public class EventDataStreamFactory extends Object
Instantiate an EventDataStreamFactory from the URIs of the event schema repositories and the event stream config:
EventDataStreamFactory factory = EventDataStreamFactory.from(
    Arrays.asList(
        "https://schema.wikimedia.org/repositories/primary/jsonschema",
        "https://schema.wikimedia.org/repositories/secondary/jsonschema"
    ),
    "https://meta.wikimedia.org/w/api.php"
);
Get a KafkaSource for a declared event stream:
KafkaSource<Row> eventStreamSource = factory.kafkaSourceBuilder(
    "test.event.example", // EventStream name
    "localhost:9092",     // Kafka bootstrap servers
    "my_consumer_group"   // Kafka consumer group id
).build();
Modifier and Type | Class and Description |
---|---|
static class | EventDataStreamFactory.Builder |
Constructor and Description |
---|
EventDataStreamFactory(EventStreamFactory eventStreamFactory, JsonEventGenerator eventGenerator) Deprecated. Use builder() instead. |
Modifier and Type | Method and Description |
---|---|
static EventDataStreamFactory.Builder | builder() |
JsonRowDeserializationSchema | deserializer(String streamName) Gets a JSON to Row DeserializationSchema by streamName and its latest schema version. |
JsonRowDeserializationSchema | deserializer(String streamName, String version) Gets a JSON to Row DeserializationSchema by streamName. |
org.apache.flink.streaming.api.datastream.DataStreamSource<org.apache.flink.types.Row> | fileDataStream(String streamName, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env, org.apache.flink.api.common.eventtime.WatermarkStrategy<org.apache.flink.types.Row> watermarkStrategy, URI... files) Gets a DataStreamSource of Row for streamName that reads JSON events from files using the latest schema version to deserialize. |
org.apache.flink.streaming.api.datastream.DataStreamSource<org.apache.flink.types.Row> | fileDataStream(String streamName, String schemaVersion, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env, org.apache.flink.api.common.eventtime.WatermarkStrategy<org.apache.flink.types.Row> watermarkStrategy, URI... files) Gets a DataStreamSource of Row for streamName that reads JSON events from files. |
org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder<org.apache.flink.types.Row> | fileSinkBuilderRowFormat(String streamName, String schemaVersion, org.apache.flink.core.fs.Path path) A FileSink.RowFormatBuilder used to serialize event Rows to JSON output. |
org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder<org.apache.flink.types.Row> | fileSinkBuilderRowFormat(String streamName, String schemaVersion, String path) Shortcut helper for calling fileSinkBuilderRowFormat with a String path. |
FileSinkJsonRowEncoder | fileSinkEncoder(String streamName, String schemaVersion) A Row to JSON string Encoder for use with FileSinks. |
org.apache.flink.connector.file.src.FileSource.FileSourceBuilder<org.apache.flink.types.Row> | fileStreamSourceBuilder(String streamName, org.apache.flink.core.fs.Path... paths) Gets a FileSource.FileSourceBuilder that generates Rows from the events in JSON files. |
org.apache.flink.connector.file.src.FileSource.FileSourceBuilder<org.apache.flink.types.Row> | fileStreamSourceBuilder(String streamName, String schemaVersion, org.apache.flink.core.fs.Path... paths) Gets a FileSource.FileSourceBuilder that generates Rows from the events in JSON files. |
static EventDataStreamFactory | from(List<String> eventSchemaBaseUris, String eventStreamConfigUri) EventDataStreamFactory factory method. |
static EventDataStreamFactory | from(List<String> eventSchemaBaseUris, String eventStreamConfigUri, Map<String,String> httpClientRoutes) EventDataStreamFactory factory method. |
EventStreamFactory | getEventStreamFactory() Convenience method to get the EventStreamFactory used by this EventDataStreamFactory. |
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<org.apache.flink.types.Row> | kafkaSinkBuilder(String streamName, String schemaVersion, String bootstrapServers, String topic, KafkaRecordTimestampStrategy timestampStrategy) Prepare a KafkaSinkBuilder with all the required components to produce to Kafka using a JSON format matching the provided schema. |
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<org.apache.flink.types.Row> | kafkaSinkBuilder(String streamName, String schemaVersion, String bootstrapServers, String topic, KafkaRecordTimestampStrategy timestampStrategy, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner<org.apache.flink.types.Row> partitioner) Prepare a KafkaSinkBuilder with all the required components to produce to Kafka using a JSON format matching the provided schema. |
org.apache.flink.connector.kafka.source.KafkaSourceBuilder<org.apache.flink.types.Row> | kafkaSourceBuilder(String streamName, String bootstrapServers, String consumerGroup) Get a KafkaSourceBuilder that is primed with the settings needed to consume the streamName from Kafka, from the stream's configured topics that match the topic filter or topic prefix, using the latest schema version. |
org.apache.flink.connector.kafka.source.KafkaSourceBuilder<org.apache.flink.types.Row> | kafkaSourceBuilder(String streamName, String bootstrapServers, String consumerGroup, List<String> topics) Get a KafkaSourceBuilder that is primed with the settings needed to consume the streamName from Kafka using the latest schema version. |
org.apache.flink.connector.kafka.source.KafkaSourceBuilder<org.apache.flink.types.Row> | kafkaSourceBuilder(String streamName, String schemaVersion, String bootstrapServers, String consumerGroup, List<String> topics) Get a KafkaSourceBuilder that is primed with the settings needed to consume the streamName from Kafka. |
EventRowTypeInfo | rowTypeInfo(String streamName) Gets the EventRowTypeInfo (which is a TypeInformation of Row) for the streamName. |
EventRowTypeInfo | rowTypeInfo(String streamName, String version) Gets the EventRowTypeInfo (which is a TypeInformation of Row) for the streamName and a particular schema version. |
JsonRowSerializationSchema | serializer(String streamName, String version) Create a SerializationSchema suited for producing JSON to the stream identified by streamName using the provided schema version. |
SSESourceFunction<org.apache.flink.types.Row> | sseSourceFunction(String streamName, String uri) Returns a SourceFunction that reads JSON event Rows from the SSE EventSource. |
SSESourceFunction<org.apache.flink.types.Row> | sseSourceFunction(String streamName, String schemaVersion, String uri) Returns a SourceFunction that reads JSON event Rows from the SSE EventSource. |

@Deprecated public EventDataStreamFactory(EventStreamFactory eventStreamFactory, JsonEventGenerator eventGenerator)
Deprecated. Use builder() instead.

public static EventDataStreamFactory.Builder builder()

public static EventDataStreamFactory from(@Nonnull List<String> eventSchemaBaseUris, @Nonnull String eventStreamConfigUri)
EventDataStreamFactory factory method.
Parameters:
eventSchemaBaseUris - URIs from which to fetch event JSONSchemas.
eventStreamConfigUri - URI from which to fetch event stream config.

public static EventDataStreamFactory from(@Nonnull List<String> eventSchemaBaseUris, @Nonnull String eventStreamConfigUri, Map<String,String> httpClientRoutes)
EventDataStreamFactory factory method.
Parameters:
eventSchemaBaseUris - URIs from which to fetch event JSONSchemas.
eventStreamConfigUri - URI from which to fetch event stream config.
httpClientRoutes - Map of source to destination HTTP client routes, e.g. "https://meta.wikimedia.org" to "https://api-ro.wikimedia.org". If null, no special routing will be configured.

public EventStreamFactory getEventStreamFactory()
Convenience method to get the EventStreamFactory used by this EventDataStreamFactory.
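The httpClientRoutes parameter of from() maps a source URL to a destination, as in the "https://meta.wikimedia.org" to "https://api-ro.wikimedia.org" example. The sketch below is a simplified model of that routing decision only, assuming prefix matching on the request URL. HttpRouteSketch and resolve are hypothetical names, and the library's HTTP client may route connections differently (for example, by host) rather than rewriting URLs.

```java
import java.util.Map;

/**
 * Hypothetical, simplified model of source-to-destination HTTP routing.
 * It only computes the URL a request would be sent to; it performs no I/O
 * and is not the eventutilities implementation.
 */
final class HttpRouteSketch {
    private HttpRouteSketch() {}

    /** Returns the routed URL, or requestUrl unchanged if routes is null or nothing matches. */
    static String resolve(String requestUrl, Map<String, String> routes) {
        if (routes == null) {
            return requestUrl; // null means no special routing
        }
        for (Map.Entry<String, String> route : routes.entrySet()) {
            String source = route.getKey();
            if (matchesPrefix(requestUrl, source)) {
                // Swap the source prefix for the destination, keeping path and query.
                return route.getValue() + requestUrl.substring(source.length());
            }
        }
        return requestUrl;
    }

    /** True if url starts with prefix at a URL boundary, so a longer host name is not matched by mistake. */
    private static boolean matchesPrefix(String url, String prefix) {
        if (!url.startsWith(prefix)) {
            return false;
        }
        if (url.length() == prefix.length()) {
            return true;
        }
        char next = url.charAt(prefix.length());
        return next == '/' || next == '?' || next == '#';
    }
}
```

With a null map, resolve returns the URL unchanged, mirroring "If null, no special routing will be configured."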

public EventRowTypeInfo rowTypeInfo(String streamName)
Gets the EventRowTypeInfo (which is a TypeInformation of Row) for the streamName.
The corresponding schema is the latest one obtained from the EventStream configuration identified by streamName.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.

public EventRowTypeInfo rowTypeInfo(String streamName, String version)
Gets the EventRowTypeInfo (which is a TypeInformation of Row) for the streamName and a particular schema version.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
version - Version of the schema to use.

public JsonRowDeserializationSchema deserializer(String streamName)
Gets a JSON to Row DeserializationSchema by streamName and its latest schema version.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.

public JsonRowDeserializationSchema deserializer(String streamName, @Nullable String version)
Gets a JSON to Row DeserializationSchema by streamName.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
version - Version of the schema to use. If null, the latest version will be used.

public JsonRowSerializationSchema serializer(String streamName, String version)
Create a SerializationSchema suited for producing JSON to the stream identified by streamName using the provided schema version.

public org.apache.flink.connector.kafka.source.KafkaSourceBuilder<org.apache.flink.types.Row> kafkaSourceBuilder(String streamName, @Nullable String schemaVersion, String bootstrapServers, String consumerGroup, @Nullable List<String> topics)
Get a KafkaSourceBuilder that is primed with the settings needed to consume the streamName from Kafka.
This sets the following:
- bootstrapServers
- topics
- consumer group id
- a value-only deserializer that deserializes to a Row conforming to streamName's JSONSchema at schemaVersion
- starting offsets: committed offsets, resetting to LATEST if no offsets are committed
To change any of these settings, call the appropriate method on the returned KafkaSourceBuilder before calling build().
Note that Flink's KafkaSourceBuilder does not allow setTopics() to be called more than once, so if you need to override the topics, provide them here as the topics parameter.
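Two of the defaults above, the set-once topic subscription and the committed-offset start with a LATEST fallback, can be modeled in plain Java. This is a hypothetical sketch (KafkaSourceDefaultsSketch, setTopics, and startingOffset are illustrative names) and not the Flink or eventutilities API; it only shows why topics must be supplied up front and which offset a partition would start from.

```java
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical miniature of two behaviors described above:
 * (1) topics can be set only once, as with Flink's KafkaSourceBuilder.setTopics();
 * (2) a partition starts at its committed offset, or at LATEST when none is committed.
 */
final class KafkaSourceDefaultsSketch {
    private List<String> topics;

    /** Sets the topics; a second call fails, so overrides must be passed in the first call. */
    KafkaSourceDefaultsSketch setTopics(List<String> newTopics) {
        if (topics != null) {
            throw new IllegalStateException("topics have already been set");
        }
        topics = List.copyOf(newTopics);
        return this;
    }

    List<String> topics() {
        return topics;
    }

    /**
     * Offset at which to start reading one partition.
     * latestOffset stands in for the partition's current end offset.
     */
    static long startingOffset(OptionalLong committedOffset, long latestOffset) {
        return committedOffset.isPresent() ? committedOffset.getAsLong() : latestOffset;
    }
}
```

In the real builder, the analogous Flink setting is an OffsetsInitializer based on committed offsets with a LATEST reset strategy; the sketch only captures the decision, not how Flink looks up offsets.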
Example:
EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...);
KafkaSource<Row> eventStreamSource = eventDataStreamFactory.kafkaSourceBuilder(
    "test.event.example",             // EventStream name
    "1.1.0",                          // schema version
    "localhost:9092",                 // Kafka bootstrap servers
    "my_consumer_group",              // Kafka consumer group id
    Arrays.asList("topic1", "topic2") // topics to subscribe to
).build();
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use. If null, the latest schema version will be used.
bootstrapServers - Kafka bootstrap.servers property.
consumerGroup - Kafka consumer group.id property.
topics - Optional list of Kafka topics to subscribe to. If null, the topics from EventStreamConfig for streamName that match the topic filter or topic prefix will be used.
See Also:
EventDataStreamFactory.Builder.topicPrefix(String), EventDataStreamFactory.Builder.topicFilter(Predicate)
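The fallback for a null topics argument can be sketched as a small selection function. This assumes that a configured prefix and filter are both applied when set; the real semantics of EventDataStreamFactory.Builder.topicPrefix and topicFilter may differ. TopicSelectionSketch and selectTopics are hypothetical names, and the datacenter-prefixed topic names in the test are illustrative only.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of choosing topics to subscribe to.
 * Explicit topics win; otherwise the stream's configured topics are
 * narrowed by an optional prefix and an optional predicate.
 */
final class TopicSelectionSketch {
    private TopicSelectionSketch() {}

    static List<String> selectTopics(
            List<String> explicitTopics,
            List<String> configuredTopics,
            String topicPrefix,
            Predicate<String> topicFilter) {
        if (explicitTopics != null) {
            return explicitTopics; // caller-provided topics are used as-is
        }
        return configuredTopics.stream()
                .filter(t -> topicPrefix == null || t.startsWith(topicPrefix))
                .filter(t -> topicFilter == null || topicFilter.test(t))
                .collect(Collectors.toList());
    }
}
```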

public org.apache.flink.connector.kafka.source.KafkaSourceBuilder<org.apache.flink.types.Row> kafkaSourceBuilder(String streamName, String bootstrapServers, String consumerGroup, @Nullable List<String> topics)
Get a KafkaSourceBuilder that is primed with the settings needed to consume the streamName from Kafka using the latest schema version.

public org.apache.flink.connector.kafka.source.KafkaSourceBuilder<org.apache.flink.types.Row> kafkaSourceBuilder(String streamName, String bootstrapServers, String consumerGroup)
Get a KafkaSourceBuilder that is primed with the settings needed to consume the streamName from Kafka, from the stream's configured topics that match the topic filter or topic prefix, using the latest schema version.

public org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<org.apache.flink.types.Row> kafkaSinkBuilder(String streamName, @Nullable String schemaVersion, String bootstrapServers, String topic, KafkaRecordTimestampStrategy timestampStrategy)
Prepare a KafkaSinkBuilder with all the required components to produce to Kafka using a JSON format matching the provided schema.

public org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<org.apache.flink.types.Row> kafkaSinkBuilder(String streamName, @Nullable String schemaVersion, String bootstrapServers, String topic, KafkaRecordTimestampStrategy timestampStrategy, @Nullable org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner<org.apache.flink.types.Row> partitioner)
Prepare a KafkaSinkBuilder with all the required components to produce to Kafka using a JSON format matching the provided schema.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use.
bootstrapServers - Kafka bootstrap.servers property.
topic - The Kafka topic to write to. If it is not one of the topics referenced by the EventStreamConfig, a warning will be logged.
timestampStrategy - The strategy to use for producer record timestamps and event time.
partitioner - An optional partitioner.
See Also:
KafkaRecordTimestampStrategy
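The topic and partitioner parameters can be illustrated with a self-contained Java sketch. KafkaSinkSketch, checkTopic, and partitionForKey are hypothetical names: checkTopic mirrors the documented "warn, don't fail" behavior for undeclared topics, and partitionForKey shows one common key-hash strategy, not necessarily what any partitioner you pass here does. In the real API you would pass a FlinkKafkaPartitioner implementation (or null) as the partitioner argument.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of two kafkaSinkBuilder concerns described above.
 * Not the eventutilities or Flink implementation.
 */
final class KafkaSinkSketch {
    private static final Logger LOG = Logger.getLogger(KafkaSinkSketch.class.getName());

    private KafkaSinkSketch() {}

    /** Returns true if topic is declared for the stream; otherwise logs a warning and returns false. */
    static boolean checkTopic(String topic, List<String> declaredTopics) {
        boolean declared = declaredTopics.contains(topic);
        if (!declared) {
            LOG.warning("Topic " + topic + " is not declared in the stream config; producing anyway.");
        }
        return declared;
    }

    /**
     * Picks one of the available partitions from the record key, so equal keys
     * always land on the same partition. The (key, partitions) inputs correspond
     * to two of the arguments that FlinkKafkaPartitioner.partition receives.
     */
    static int partitionForKey(byte[] key, int[] partitions) {
        if (partitions.length == 0) {
            throw new IllegalArgumentException("no partitions available");
        }
        int hash = Arrays.hashCode(key); // 0 for a null key
        return partitions[Math.floorMod(hash, partitions.length)];
    }
}
```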

public SSESourceFunction<org.apache.flink.types.Row> sseSourceFunction(String streamName, String uri)
Returns a SourceFunction that reads JSON event Rows from the SSE EventSource.

public SSESourceFunction<org.apache.flink.types.Row> sseSourceFunction(String streamName, @Nullable String schemaVersion, String uri)
Returns a SourceFunction that reads JSON event Rows from the SSE EventSource.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use. If null, the latest version will be used.
uri - EventSource URI endpoint.

public org.apache.flink.connector.file.src.FileSource.FileSourceBuilder<org.apache.flink.types.Row> fileStreamSourceBuilder(String streamName, @Nullable String schemaVersion, org.apache.flink.core.fs.Path... paths)
Gets a FileSource.FileSourceBuilder that generates Rows from the events in JSON files. The Rows will be deserialized using the JSONSchema of streamName.
This is a slightly lower-level method than fileDataStream(String, StreamExecutionEnvironment, WatermarkStrategy, URI...). You'll probably want to use fileDataStream instead, especially if you are only using this FileSource for testing.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use. If null, the latest version will be used.
paths - Flink Paths from which to read JSON events.

public org.apache.flink.connector.file.src.FileSource.FileSourceBuilder<org.apache.flink.types.Row> fileStreamSourceBuilder(String streamName, org.apache.flink.core.fs.Path... paths)
Gets a FileSource.FileSourceBuilder that generates Rows from the events in JSON files.

public org.apache.flink.streaming.api.datastream.DataStreamSource<org.apache.flink.types.Row> fileDataStream(String streamName, @Nullable String schemaVersion, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env, org.apache.flink.api.common.eventtime.WatermarkStrategy<org.apache.flink.types.Row> watermarkStrategy, URI... files)
Gets a DataStreamSource of Row for streamName that reads JSON events from files.
Example:
EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...);
DataStreamSource<Row> eventFileSource = eventDataStreamFactory.fileDataStream(
    "test.event.example",             // EventStream name
    env,                              // Flink StreamExecutionEnvironment
    WatermarkStrategy.noWatermarks(), // no event-time watermarks for simple file input
    URI.create("file:///path/to/test.events.json")
);
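To try fileDataStream in a test, you need a local file of JSON events and its URI. The sketch below writes events to a temporary file, assuming newline-delimited JSON (one event per line); that file layout is an assumption, not something this page states. TestEventFileSketch and writeTestEvents are hypothetical names, and the JSON strings in the test are placeholders: real events must conform to the JSONSchema of the stream.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helper for producing a file:// URI of test events. */
final class TestEventFileSketch {
    private TestEventFileSketch() {}

    /** Writes each JSON event on its own line to a new temp file and returns the file's URI. */
    static URI writeTestEvents(List<String> jsonEvents) throws IOException {
        Path file = Files.createTempFile("test.events", ".json");
        Files.write(file, jsonEvents, StandardCharsets.UTF_8);
        return file.toUri();
    }
}
```

The returned URI can then be passed as one of the files arguments to fileDataStream.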
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use. If null, the latest version will be used.
env - StreamExecutionEnvironment in which to call fromSource.
watermarkStrategy - For simple testing input from files, you'll likely want to use WatermarkStrategy.noWatermarks().
files - URIs of files of JSON events that conform to the JSONSchema of streamName. These will be converted to Flink Paths.

public org.apache.flink.streaming.api.datastream.DataStreamSource<org.apache.flink.types.Row> fileDataStream(String streamName, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env, org.apache.flink.api.common.eventtime.WatermarkStrategy<org.apache.flink.types.Row> watermarkStrategy, URI... files)
Gets a DataStreamSource of Row for streamName that reads JSON events from files using the latest schema version to deserialize.

public FileSinkJsonRowEncoder fileSinkEncoder(String streamName, String schemaVersion)
A Row to JSON string Encoder for use with FileSinks.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use when encoding.

public org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder<org.apache.flink.types.Row> fileSinkBuilderRowFormat(String streamName, String schemaVersion, String path)
Shortcut helper for calling fileSinkBuilderRowFormat with a String path.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use when encoding.
path - Base directory path in which to write partitioned directories and files. Must begin with a Flink-supported file system scheme, e.g. file:// or hdfs://.

public org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder<org.apache.flink.types.Row> fileSinkBuilderRowFormat(String streamName, String schemaVersion, org.apache.flink.core.fs.Path path)
A FileSink.RowFormatBuilder used to serialize event Rows to JSON output. If desired, customize the returned builder with custom bucket assigners and rolling policies, then call .build() to get a FileSink.
Parameters:
streamName - Name of the EventStream; must be declared in EventStreamConfig.
schemaVersion - Version of the schema to use when encoding.
path - Base directory path in which to write partitioned directories and files.

Copyright © 2025. All rights reserved.