// EventTableUtils.java

package org.wikimedia.eventutilities.flink.table;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.wikimedia.eventutilities.core.event.EventStream;

import com.google.common.collect.LinkedHashMultiset;

public final class EventTableUtils {

    /**
     * Creates a table with a schema from the given event stream.
     *
     * @param eventStream   to get the schema from
     * @param schemaVersion version of the schema
     * @param tableOptions  normalized options
     * @param comment catalog table comment
     * @param otherColumns additional columns
     * @return CatalogBaseTable
     * @throws CatalogException if there is a column conflict
     */
    public static CatalogBaseTable getTableOfEventStream(
        EventStream eventStream,
        String schemaVersion,
        Map<String, String> tableOptions,
        @Nullable String comment,
        @Nullable List<Schema.UnresolvedColumn> otherColumns
    ) throws CatalogException {
        return getTableOfEventStream(eventStream, schemaVersion, tableOptions, comment, otherColumns, null, null);
    }

    /**
     * Creates a table with a schema from the given event stream and merges it with custom columns.
     *
     * @param eventStream     to get the schema from
     * @param schemaVersion   version of the schema
     * @param tableOptions    processed options
     * @param comment         catalog table comment
     * @param otherColumns    additional columns
     * @param watermarkColumn column name for kafka timestamp watermark
     * @param watermarkDelay delay for watermark
     * @return CatalogBaseTable
     * @throws CatalogException if there is a column conflict
     */
    public static CatalogBaseTable getTableOfEventStream(
        EventStream eventStream,
        String schemaVersion,
        @Nonnull Map<String, String> tableOptions,
        @Nullable String comment,
        @Nullable List<Schema.UnresolvedColumn> otherColumns,
        @Nullable String watermarkColumn,
        @Nullable Integer watermarkDelay
    ) throws CatalogException {
        Schema.Builder schemaBuilder = SchemaBuilderUtil.createEventStreamSchemaBuilder(eventStream, schemaVersion);
        if (otherColumns != null) {
            schemaBuilder.fromColumns(otherColumns);
        }
        if (watermarkColumn != null && watermarkDelay != null) {
            schemaBuilder = SchemaBuilderUtil.withKafkaTimestampAsWatermark(schemaBuilder, watermarkColumn, watermarkDelay);
        }
        Schema schema = schemaBuilder.build();

        List<String> columns = schema.getColumns()
            .stream().map(Schema.UnresolvedColumn::getName).collect(Collectors.toList());
        if (columns.size() != columns.stream().distinct().count()) {
            LinkedHashMultiset<String> duplicates = LinkedHashMultiset.create(columns);
            duplicates.entrySet().removeIf(entry -> entry.getCount() == 1);

            throw new CatalogException("Column name conflict: " + duplicates);
        }

        comment = (comment == null || comment.isEmpty()) ? eventStream.toString() : comment;

        // TODO: partition keys?
        return CatalogTable.of(schema, comment, Lists.newArrayList(), tableOptions);
    }

    private EventTableUtils() {
    }
}