SchemaBuilderUtil.java

package org.wikimedia.eventutilities.flink.table;

import javax.annotation.Nonnull;

import org.apache.flink.table.api.Schema;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.flink.formats.json.JsonSchemaFlinkConverter;

import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * Helper functions for creating and adding to Flink Schema.Builder for WMF event streams.
 */
public final class SchemaBuilderUtil {

    /** Column name used by the single-argument watermark helper. */
    private static final String DEFAULT_KAFKA_TIMESTAMP_COLUMN = "kafka_timestamp";

    /** Watermark delay, in seconds, used by the single-argument watermark helper. */
    private static final int DEFAULT_WATERMARK_DELAY_SECONDS = 10;

    /**
     * Number of leading digits an interval literal may carry before an explicit
     * precision such as {@code SECOND(3)} has to be written out.
     * NOTE(review): 2 is the documented Calcite default for interval leading-field
     * precision; confirm against the Flink version in use.
     */
    private static final int IMPLICIT_INTERVAL_LEADING_DIGITS = 2;

    /**
     * Given an EventStream, converts its latest schema to
     * Flink columns and returns a new Schema.Builder with all columns.
     *
     * @param eventStream Stream whose {@code schema()} is converted. The result of
     *                    {@code schema()} is cast to {@link ObjectNode}, so a schema of any
     *                    other node type fails with a ClassCastException.
     * @return the builder produced by {@link JsonSchemaFlinkConverter#toSchemaBuilder}.
     */
    public static Schema.Builder createEventStreamSchemaBuilder(EventStream eventStream) {
        return JsonSchemaFlinkConverter.toSchemaBuilder(
            (ObjectNode) eventStream.schema()
        );
    }

    /**
     * Given an EventStream, converts its schema at the given version to
     * Flink columns and returns a new Schema.Builder with all columns.
     *
     * @param eventStream   Stream whose {@code schema(schemaVersion)} is converted. The result
     *                      is cast to {@link ObjectNode}, so a schema of any other node type
     *                      fails with a ClassCastException.
     * @param schemaVersion Version passed through to {@code eventStream.schema(String)}.
     * @return the builder produced by {@link JsonSchemaFlinkConverter#toSchemaBuilder}.
     */
    public static Schema.Builder createEventStreamSchemaBuilder(EventStream eventStream, String schemaVersion) {
        return JsonSchemaFlinkConverter.toSchemaBuilder(
            (ObjectNode) eventStream.schema(schemaVersion)
        );
    }

    /**
     * Adds a "kafka_timestamp" column to the schema builder and uses
     * it as the watermark field with a default watermark delay of 10 seconds.
     *
     * @param schemaBuilder Builder to add the column and watermark to; it is modified in place.
     * @return the same schemaBuilder instance that was passed in.
     */
    @Nonnull
    public static Schema.Builder withKafkaTimestampAsWatermark(Schema.Builder schemaBuilder) {
        return withKafkaTimestampAsWatermark(
            schemaBuilder,
            DEFAULT_KAFKA_TIMESTAMP_COLUMN,
            DEFAULT_WATERMARK_DELAY_SECONDS
        );
    }

    /**
     * Adds kafka timestamp as a virtual column to the schema builder
     * and uses it as the watermark with a delay of watermarkDelaySeconds.
     *
     * <p>The watermark expression is {@code columnName} itself when the delay is 0, otherwise
     * {@code columnName - INTERVAL '<delay>' SECOND}. Delays with more than two digits get an
     * explicit leading precision (e.g. {@code INTERVAL '300' SECOND(3)}), because an interval
     * literal without one is limited to two leading digits by the SQL parser.
     *
     * <p>columnName is inserted into the expression verbatim (unquoted). For names that need
     * quoting in SQL, build the expression yourself and use
     * {@link #withKafkaTimestampAsWatermark(Schema.Builder, String, String)}.
     *
     * @param schemaBuilder         Builder to add the column and watermark to; it is modified in place.
     * @param columnName            Name of the timestamp column to add to the schema.
     * @param watermarkDelaySeconds Seconds to delay the watermark by. The value is not range-checked;
     *                              a negative value is written into the interval literal as-is.
     * @return the same schemaBuilder instance that was passed in.
     */
    @Nonnull
    public static Schema.Builder withKafkaTimestampAsWatermark(
        Schema.Builder schemaBuilder,
        String columnName,
        int watermarkDelaySeconds
    ) {
        final String watermarkExpression = watermarkDelaySeconds == 0
            ? columnName
            : columnName + " - " + secondsIntervalLiteral(watermarkDelaySeconds);

        return withKafkaTimestampAsWatermark(
            schemaBuilder,
            columnName,
            watermarkExpression
        );
    }

    /**
     * Adds kafka timestamp as a virtual column to the schema builder
     * and uses it as the watermark.
     * See also {@link Schema.Builder#watermark(String, String)}
     *
     * <p>The column is declared as {@code TIMESTAMP_LTZ(3) NOT NULL} and is read from the
     * connector metadata key {@code "timestamp"}.
     *
     * @param schemaBuilder Builder to add the column and watermark to; it is modified in place.
     * @param columnName    Name of the column to add to the schema.
     * @param sqlExpression SQL expression to use for the watermark.
     *                      This should probably refer to the columnName, e.g. "$columnName - INTERVAL '30' SECOND"
     * @return the same schemaBuilder instance that was passed in.
     */
    @Nonnull
    public static Schema.Builder withKafkaTimestampAsWatermark(
        Schema.Builder schemaBuilder,
        String columnName,
        String sqlExpression
    ) {
        schemaBuilder.columnByMetadata(
            columnName,
            "TIMESTAMP_LTZ(3) NOT NULL",
            "timestamp",
            true
        ).watermark(columnName, sqlExpression);

        return schemaBuilder;
    }

    /**
     * Renders a whole number of seconds as a SQL interval literal.
     *
     * <p>Values of up to two digits produce {@code INTERVAL '<n>' SECOND}; larger values
     * produce {@code INTERVAL '<n>' SECOND(<digits>)} so the literal is not rejected for
     * exceeding the implicit leading-field precision.
     *
     * @param seconds Interval length in seconds.
     * @return the interval literal as SQL text.
     */
    private static String secondsIntervalLiteral(int seconds) {
        // Widen before Math.abs so Integer.MIN_VALUE does not overflow back to a negative number.
        final int digits = Long.toString(Math.abs((long) seconds)).length();
        final String qualifier = digits > IMPLICIT_INTERVAL_LEADING_DIGITS
            ? "SECOND(" + digits + ")"
            : "SECOND";
        return "INTERVAL '" + seconds + "' " + qualifier;
    }

    /** Static utility class; not meant to be instantiated. */
    private SchemaBuilderUtil() {
    }
}