SchemaBuilderUtil.java
package org.wikimedia.eventutilities.flink.table;
import java.util.Objects;

import javax.annotation.Nonnull;
import org.apache.flink.table.api.Schema;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.flink.formats.json.JsonSchemaFlinkConverter;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Helper functions for creating and adding to Flink Schema.Builder for WMF event streams.
*/
public final class SchemaBuilderUtil {

    /**
     * Name of the Kafka timestamp column added by
     * {@link #withKafkaTimestampAsWatermark(Schema.Builder)}.
     */
    public static final String DEFAULT_KAFKA_TIMESTAMP_COLUMN_NAME = "kafka_timestamp";

    /**
     * Watermark delay, in seconds, used by
     * {@link #withKafkaTimestampAsWatermark(Schema.Builder)}.
     */
    public static final int DEFAULT_WATERMARK_DELAY_SECONDS = 10;

    /**
     * Given an EventStream, converts its latest schema to
     * Flink columns and returns a new Schema.Builder with all columns.
     *
     * @param eventStream the stream whose latest schema is converted. The schema
     *                    is cast to {@link ObjectNode}, so it must be a JSON object.
     * @return a Schema.Builder holding the columns converted from the schema.
     * @throws NullPointerException if {@code eventStream} is null.
     */
    public static Schema.Builder createEventStreamSchemaBuilder(EventStream eventStream) {
        Objects.requireNonNull(eventStream, "eventStream");
        return JsonSchemaFlinkConverter.toSchemaBuilder((ObjectNode) eventStream.schema());
    }

    /**
     * Given an EventStream, converts its schema at the given version to
     * Flink columns and returns a new Schema.Builder with all columns.
     *
     * @param eventStream   the stream whose schema is converted. The schema is cast
     *                      to {@link ObjectNode}, so it must be a JSON object.
     * @param schemaVersion the schema version to look up on the stream.
     * @return a Schema.Builder holding the columns converted from the schema.
     * @throws NullPointerException if {@code eventStream} is null.
     */
    public static Schema.Builder createEventStreamSchemaBuilder(EventStream eventStream, String schemaVersion) {
        Objects.requireNonNull(eventStream, "eventStream");
        return JsonSchemaFlinkConverter.toSchemaBuilder((ObjectNode) eventStream.schema(schemaVersion));
    }

    /**
     * Adds a {@value #DEFAULT_KAFKA_TIMESTAMP_COLUMN_NAME} column to the schema builder and uses
     * it as the watermark field with a default watermark delay of
     * {@value #DEFAULT_WATERMARK_DELAY_SECONDS} seconds.
     *
     * @param schemaBuilder the builder to add the column and watermark to.
     * @return the same {@code schemaBuilder} instance, for chaining.
     * @throws NullPointerException if {@code schemaBuilder} is null.
     */
    @Nonnull
    public static Schema.Builder withKafkaTimestampAsWatermark(Schema.Builder schemaBuilder) {
        return withKafkaTimestampAsWatermark(
            schemaBuilder,
            DEFAULT_KAFKA_TIMESTAMP_COLUMN_NAME,
            DEFAULT_WATERMARK_DELAY_SECONDS
        );
    }

    /**
     * Adds kafka timestamp as a virtual column to the schema builder
     * and uses it as the watermark with a delay of watermarkDelaySeconds.
     *
     * @param schemaBuilder         the builder to add the column and watermark to.
     * @param columnName            name of the timestamp column to add to the schema.
     * @param watermarkDelaySeconds seconds to delay the watermark by. Must be {@code >= 0}.
     *                              With {@code 0}, the column itself is the watermark.
     * @return the same {@code schemaBuilder} instance, for chaining.
     * @throws NullPointerException     if {@code schemaBuilder} or {@code columnName} is null.
     * @throws IllegalArgumentException if {@code watermarkDelaySeconds} is negative.
     */
    @Nonnull
    public static Schema.Builder withKafkaTimestampAsWatermark(
        Schema.Builder schemaBuilder,
        String columnName,
        int watermarkDelaySeconds
    ) {
        // Validate before the name is concatenated into SQL text, where null would become "null".
        Objects.requireNonNull(columnName, "columnName");
        // A negative delay would render as "INTERVAL '-N' SECOND", putting the
        // watermark ahead of the timestamp column instead of behind it.
        if (watermarkDelaySeconds < 0) {
            throw new IllegalArgumentException(
                "watermarkDelaySeconds must be >= 0, got " + watermarkDelaySeconds);
        }
        String watermarkExpression;
        if (watermarkDelaySeconds == 0) {
            watermarkExpression = columnName;
        } else {
            watermarkExpression = columnName + " - INTERVAL '" + watermarkDelaySeconds + "' SECOND";
        }
        return withKafkaTimestampAsWatermark(
            schemaBuilder,
            columnName,
            watermarkExpression
        );
    }

    /**
     * Adds kafka timestamp as a virtual column to the schema builder
     * and uses it as the watermark.
     * See also {@link Schema.Builder#watermark(String, String)}
     *
     * @param schemaBuilder the builder to add the column and watermark to.
     * @param columnName    name of the column to add to the schema.
     * @param sqlExpression SQL expression to use for the watermark.
     *                      This should probably refer to the columnName,
     *                      e.g. {@code "$columnName - INTERVAL '30' SECOND"}.
     * @return the same {@code schemaBuilder} instance, for chaining.
     * @throws NullPointerException if any argument is null.
     */
    @Nonnull
    public static Schema.Builder withKafkaTimestampAsWatermark(
        Schema.Builder schemaBuilder,
        String columnName,
        String sqlExpression
    ) {
        Objects.requireNonNull(schemaBuilder, "schemaBuilder");
        Objects.requireNonNull(columnName, "columnName");
        Objects.requireNonNull(sqlExpression, "sqlExpression");
        // The column's value comes from the connector's "timestamp" metadata key.
        // The final 'true' is columnByMetadata's isVirtual flag.
        schemaBuilder.columnByMetadata(
            columnName,
            "TIMESTAMP_LTZ(3) NOT NULL",
            "timestamp",
            true
        ).watermark(columnName, sqlExpression);
        return schemaBuilder;
    }

    private SchemaBuilderUtil() {
    }
}