// EventTableDescriptorBuilder.java
package org.wikimedia.eventutilities.flink.table;
import static java.util.Collections.emptyMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import com.google.common.base.Preconditions;
/**
 * Builder wrapper to aid in constructing Flink TableDescriptors
 * using Wikimedia Event Streams. TableDescriptors are passed
 * to the Flink TableEnvironment when instantiating Table objects
 * or when creating tables in a catalog.
 *
 * This class uses EventStreamFactory to get information
 * about Event Streams declared in EventStreamConfig, e.g.
 * schema and topics.
 *
 * You must at minimum call eventStream() and connector() before
 * calling build(). (setupKafka() calls connector() for you.)
 *
 * It is possible to pass in your own Flink Schema.Builder
 * while using this wrapper. If you need to augment the schema, for example
 * to add computed / virtual fields, calling getSchemaBuilder()
 * will get you the Schema.Builder with the stream's schema set as
 * the physical DataType. After modifying the Schema.Builder as desired,
 * call schemaBuilder(modifiedSchemaBuilder) again to use it for the
 * TableDescriptor that will be built.
 *
 * When the table format is "json", build() defaults the
 * "json.timestamp-format.standard" option to "ISO-8601" unless that option
 * has been set explicitly.
 *
 * Examples:
 *
 * Instantiate a EventTableDescriptorBuilder from URIs:
 * <pre>{@code
 * EventTableDescriptorBuilder builder = EventTableDescriptorBuilder.from(
 *     Arrays.asList(
 *         "https://schema.wikimedia.org/repositories/primary/jsonschema",
 *         "https://schema.wikimedia.org/repositories/secondary/jsonschema"
 *     ),
 *     "https://meta.wikimedia.org/w/api.php"
 * );
 * }</pre>
 *
 * Build a DataGen Table:
 * <pre>{@code
 * Table t = tableEnv.from(
 *     builder
 *         .connector("datagen")
 *         .eventStream("mediawiki.page-create")
 *         .option("number-of-rows", "10")
 *         .build()
 * );
 * }</pre>
 *
 * Build a Kafka Table:
 * <pre>{@code
 * Table t = tableEnv.from(
 *     builder
 *         .eventStream("mediawiki.page-create")
 *         .setupKafka(
 *             "localhost:9092",
 *             "my_consumer_group"
 *         )
 *         // Optional: add a kafka_timestamp virtual column and use it as the watermark.
 *         .withKafkaTimestampAsWatermark()
 *         .build()
 * );
 * }</pre>
 *
 */
@ParametersAreNonnullByDefault
public class EventTableDescriptorBuilder {

    // We want to default to using ISO-8601 timestamp format, not SQL,
    // since WMF Event Platform timestamps are ISO-8601 formatted strings.
    private static final String JSON_TIMESTAMP_FORMAT_KEY = "json.timestamp-format.standard";
    private static final String JSON_TIMESTAMP_FORMAT_DEFAULT = "ISO-8601";

    // Option key that TableDescriptor.Builder#format(String) writes to, and the
    // identifier of the JSON format that the json.* options above belong to.
    private static final String FORMAT_OPTION_KEY = "format";
    private static final String JSON_FORMAT_IDENTIFIER = "json";

    private final EventStreamFactory eventStreamFactory;

    // Collected state; all of it is reset by clear().
    private EventStream eventStream;
    private String connectorIdentifier;
    private String formatIdentifier;
    private String[] partitionKeys;
    private Schema.Builder schemaBuilder;
    private Map<String, String> options;

    /**
     * Creates a builder that looks up EventStreams with the given factory.
     *
     * @param eventStreamFactory
     *  Factory used by {@link #eventStream(String)} to create EventStreams by name.
     */
    public EventTableDescriptorBuilder(EventStreamFactory eventStreamFactory) {
        this.eventStreamFactory = eventStreamFactory;
        // clear() is final, so it is safe to call from the constructor;
        // it initializes the collected state (notably the options map).
        this.clear();
    }

    /**
     * EventTableDescriptorBuilder factory method.
     * Delegates to {@link #from(List, String, Map)} with an empty route map.
     *
     * @param eventSchemaBaseUris
     *  URIs from which to fetch event JSONSchemas.
     *
     * @param eventStreamConfigUri
     *  URI from which to fetch event stream config.
     */
    @Nonnull
    public static EventTableDescriptorBuilder from(
        List<String> eventSchemaBaseUris,
        String eventStreamConfigUri
    ) {
        return EventTableDescriptorBuilder.from(
            eventSchemaBaseUris, eventStreamConfigUri, emptyMap()
        );
    }

    /**
     * EventTableDescriptorBuilder factory method.
     * All arguments are forwarded unchanged to {@code EventStreamFactory.from(...)}.
     *
     * @param eventSchemaBaseUris
     *  URIs from which to fetch event JSONSchemas.
     *
     * @param eventStreamConfigUri
     *  URI from which to fetch event stream config.
     *
     * @param httpClientRoutes
     *  Map of source to dest http client routes.
     *  E.g. "https://meta.wikimedia.org" to "https://api-ro.wikimedia.org"
     *  If null, no special routing will be configured.
     */
    @Nonnull
    public static EventTableDescriptorBuilder from(
        List<String> eventSchemaBaseUris,
        String eventStreamConfigUri,
        Map<String, String> httpClientRoutes
    ) {
        return new EventTableDescriptorBuilder(
            EventStreamFactory.from(
                eventSchemaBaseUris, eventStreamConfigUri, httpClientRoutes
            )
        );
    }

    /**
     * Convenience method to get the EventStreamFactory used
     * by this EventTableDescriptorBuilder.
     */
    @Nonnull
    public EventStreamFactory getEventStreamFactory() {
        return eventStreamFactory;
    }

    /**
     * Sets the connector.
     *
     * @param connectorIdentifier
     *  A valid and registered Flink Table Connector Factory Identifier string.
     *  e.g. "kafka" or "filesystem".
     */
    @Nonnull
    public EventTableDescriptorBuilder connector(String connectorIdentifier) {
        this.connectorIdentifier = connectorIdentifier;
        return this;
    }

    /**
     * Sets the EventStream that will be used to build the TableDescriptor.
     *
     * Note: this does not reset a Schema.Builder that was already set via
     * schemaBuilder(), or derived and cached by an earlier getSchemaBuilder() /
     * withKafkaTimestampAsWatermark() call. Call clear() first if you need
     * to start over with a different stream.
     */
    @Nonnull
    public EventTableDescriptorBuilder eventStream(EventStream eventStream) {
        this.eventStream = eventStream;
        return this;
    }

    /**
     * Sets the EventStream by streamName that will be used to build the TableDescriptor.
     *
     * @param streamName
     *  Name of the EventStream, must be declared in EventStreamConfig.
     */
    @Nonnull
    public EventTableDescriptorBuilder eventStream(String streamName) {
        return eventStream(eventStreamFactory.createEventStream(streamName));
    }

    /**
     * Sets the Schema.Builder. If you don't set this the Table Schema will be
     * converted from the eventStream's JSONSchema.
     */
    @Nonnull
    public EventTableDescriptorBuilder schemaBuilder(Schema.Builder schemaBuilder) {
        this.schemaBuilder = schemaBuilder;
        return this;
    }

    /**
     * Sets the TableDescriptor.Builder options.
     * This clears out any previously set options.
     * The given map is copied, so later changes to it do not affect this builder.
     */
    @Nonnull
    public EventTableDescriptorBuilder options(Map<String, String> options) {
        this.options = new HashMap<>(options);
        return this;
    }

    /**
     * Sets a single TableDescriptor.Builder option,
     * replacing any value previously set for the same key.
     */
    @Nonnull
    public EventTableDescriptorBuilder option(String key, String value) {
        this.options.put(key, value);
        return this;
    }

    /**
     * Sets the format for the TableDescriptor.Builder.
     * This is needed by most, but not all, connectors.
     */
    @Nonnull
    public EventTableDescriptorBuilder format(String format) {
        this.formatIdentifier = format;
        return this;
    }

    /**
     * Sets the partitionKeys for the TableDescriptor.Builder.
     * The given keys are copied, so later changes to the caller's array
     * do not affect this builder.
     */
    @Nonnull
    public EventTableDescriptorBuilder partitionedBy(String... partitionKeys) {
        // A null array has always been accepted here and means "no partition keys";
        // keep tolerating it while copying everything else defensively.
        this.partitionKeys = partitionKeys == null ? null : partitionKeys.clone();
        return this;
    }

    /**
     * Helper to aid in setting up a kafka connector for the EventStream.
     * You must first call eventStream() before calling this method so that
     * it can fill in the details for the connector from the EventStream.
     *
     * Note: The Kafka Table Connector option scan.start.mode will not be set,
     * relying on the default value of "group-offsets".
     * The Kafka consumer property auto.offset.reset will be set to "latest",
     * which will be used in the case that there are no committed group offsets.
     * Override this behavior by calling e.g.
     * {@code option("properties.auto.offset.reset", "earliest")}
     *
     * This does not set the key format or hoist any metadata fields
     * (like kafka_timestamp) into the schema.
     * See withKafkaTimestampAsWatermark to help use the kafka timestamp
     * as the watermark.
     *
     * @param bootstrapServers
     *  Kafka bootstrap.servers property.
     *
     * @param consumerGroup
     *  Kafka consumer group.id property.
     *
     * @throws IllegalStateException
     *  If eventStream() has not been called yet.
     */
    @Nonnull
    public EventTableDescriptorBuilder setupKafka(
        String bootstrapServers,
        String consumerGroup
    ) {
        Preconditions.checkState(
            this.eventStream != null,
            "Must call eventStream() before calling setupKafka()."
        );

        connector("kafka");
        // EventStreams in Kafka are JSON.
        format(JSON_FORMAT_IDENTIFIER);

        option("properties.bootstrap.servers", bootstrapServers);
        // The Kafka table connector takes multiple topics as a semicolon separated list.
        option("topic", String.join(";", eventStream.topics()));
        option("properties.group.id", consumerGroup);
        option("properties.auto.offset.reset", "latest");

        return this;
    }

    /**
     * Adds a "kafka_timestamp" column to the schema and uses
     * it as the watermark field with a default watermark delay of 10 seconds.
     */
    @Nonnull
    public EventTableDescriptorBuilder withKafkaTimestampAsWatermark() {
        return schemaBuilder(
            SchemaBuilderUtil.withKafkaTimestampAsWatermark(getSchemaBuilder())
        );
    }

    /**
     * Adds kafka timestamp as a virtual column to the schema,
     * and uses it as the watermark with a delay of watermarkDelaySeconds.
     *
     * @param columnName
     *  Name of the timestamp column to add to the schema.
     * @param watermarkDelaySeconds
     *  Seconds to delay the watermark by.
     *
     */
    @Nonnull
    public EventTableDescriptorBuilder withKafkaTimestampAsWatermark(
        String columnName,
        int watermarkDelaySeconds
    ) {
        return schemaBuilder(
            SchemaBuilderUtil.withKafkaTimestampAsWatermark(
                getSchemaBuilder(),
                columnName,
                watermarkDelaySeconds
            )
        );
    }

    /**
     * Adds kafka timestamp as a virtual column to the schema,
     * and uses it as the watermark.
     * See also {@link Schema.Builder#watermark(String, String)}
     *
     * @param columnName
     *  Name of the column to add to the schema.
     *
     * @param sqlExpression
     *  SQL expression to use for the watermark.
     *  This should probably refer to the columnName, e.g. "$columnName - INTERVAL '30' seconds"
     *
     */
    @Nonnull
    public EventTableDescriptorBuilder withKafkaTimestampAsWatermark(
        String columnName,
        String sqlExpression
    ) {
        return schemaBuilder(
            SchemaBuilderUtil.withKafkaTimestampAsWatermark(
                getSchemaBuilder(),
                columnName,
                sqlExpression
            )
        );
    }

    /**
     * Use this to get the schemaBuilder for the eventStream
     * in order to augment and reset it before calling build().
     * If you change the Schema.Builder, make sure you call
     * {@code schemaBuilder(modifiedSchemaBuilder)} to apply your changes
     * before calling {@code build()}.
     *
     * If no Schema.Builder has been set yet, one is created from the
     * eventStream and kept for subsequent calls.
     *
     * @return
     *  The Schema.Builder that will be used to build the Table Schema.
     *
     * @throws IllegalStateException
     *  If neither eventStream() nor schemaBuilder() has been called yet.
     */
    @Nonnull
    public Schema.Builder getSchemaBuilder() {
        Preconditions.checkState(
            !(this.eventStream == null && this.schemaBuilder == null),
            "Must call eventStream() or schemaBuilder() before calling getSchemaBuilder()."
        );

        if (this.schemaBuilder == null) {
            this.schemaBuilder = SchemaBuilderUtil.createEventStreamSchemaBuilder(eventStream);
        }

        return this.schemaBuilder;
    }

    /**
     * Applies all the collected configs and options to
     * build a TableDescriptor using information from the EventStream.
     * You can use the returned TableDescriptor to instantiate a Table object
     * or create a Table in a catalog.
     *
     * If the table format is "json" and "json.timestamp-format.standard" has not
     * been set, it is defaulted to "ISO-8601". The default is not added for other
     * formats or for connectors without a format, since json.* options are
     * not valid options for them.
     *
     * Any collected state will be cleared before returning, so you
     * can re-use this EventTableDescriptorBuilder again. If this method throws,
     * the collected state is left untouched.
     *
     * @throws IllegalStateException
     *  If connector() or eventStream() has not been called yet.
     */
    @Nonnull
    public TableDescriptor build() {
        Preconditions.checkState(
            connectorIdentifier != null,
            "Must call connector() before calling build()."
        );

        Preconditions.checkState(
            eventStream != null,
            "Must call eventStream() before calling build()."
        );

        TableDescriptor.Builder tdb = TableDescriptor.forConnector(connectorIdentifier)
            .schema(getSchemaBuilder().build())
            .comment(eventStream.toString());

        // Not all connectors use format.
        if (formatIdentifier != null) {
            tdb.format(formatIdentifier);
        }

        if (partitionKeys != null) {
            tdb.partitionedBy(partitionKeys);
        }

        // Work on a copy so that a failure below does not leave
        // a half-modified options map behind.
        Map<String, String> effectiveOptions = new HashMap<>(options);

        // If not specified, set the JSON timestamp format to our default,
        // but only for tables that actually use the JSON format.
        if (usesJsonFormat() && !effectiveOptions.containsKey(JSON_TIMESTAMP_FORMAT_KEY)) {
            effectiveOptions.put(JSON_TIMESTAMP_FORMAT_KEY, JSON_TIMESTAMP_FORMAT_DEFAULT);
        }

        effectiveOptions.forEach(tdb::option);

        TableDescriptor td = tdb.build();
        this.clear();
        return td;
    }

    /**
     * Returns true if the table being built will use the "json" format.
     * Options are applied after format() in build(), so an explicit "format"
     * option takes precedence over the identifier given to format().
     */
    private boolean usesJsonFormat() {
        String effectiveFormat = options.getOrDefault(FORMAT_OPTION_KEY, formatIdentifier);
        return JSON_FORMAT_IDENTIFIER.equals(effectiveFormat);
    }

    /**
     * Clears all collected configs and options from this EventTableDescriptorBuilder
     * so that it can be used to build another TableDescriptor.
     * The EventStreamFactory is kept.
     */
    @Nonnull
    public final EventTableDescriptorBuilder clear() {
        this.options = new HashMap<>();
        this.eventStream = null;
        this.connectorIdentifier = null;
        this.formatIdentifier = null;
        this.partitionKeys = null;
        this.schemaBuilder = null;
        return this;
    }
}