// EventTableUtils.java
package org.wikimedia.eventutilities.flink.table;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.wikimedia.eventutilities.core.event.EventStream;
import com.google.common.collect.LinkedHashMultiset;
/**
 * Static helpers for turning an {@link EventStream} into a Flink catalog table.
 *
 * <p>This class only exposes static methods and cannot be instantiated.
 */
public final class EventTableUtils {

    /**
     * Builds a catalog table for the given event stream without configuring a watermark.
     *
     * <p>Delegates to the seven-argument overload, passing {@code null} for both
     * the watermark column and the watermark delay.
     *
     * @param eventStream event stream the table schema is derived from
     * @param schemaVersion version of the event schema to use
     * @param tableOptions options to attach to the catalog table
     * @param comment catalog table comment; when {@code null} or empty,
     *                {@code eventStream.toString()} is used instead
     * @param otherColumns extra columns to add to the schema, may be {@code null}
     * @return the resulting catalog table
     * @throws CatalogException if two or more columns share the same name
     */
    public static CatalogBaseTable getTableOfEventStream(
        EventStream eventStream,
        String schemaVersion,
        Map<String, String> tableOptions,
        @Nullable String comment,
        @Nullable List<Schema.UnresolvedColumn> otherColumns
    ) throws CatalogException {
        return getTableOfEventStream(
            eventStream,
            schemaVersion,
            tableOptions,
            comment,
            otherColumns,
            null,
            null
        );
    }

    /**
     * Builds a catalog table for the given event stream, optionally extended with
     * additional columns and a kafka timestamp watermark.
     *
     * <p>The watermark is only configured when both {@code watermarkColumn} and
     * {@code watermarkDelay} are non-null; if either one is missing, no watermark
     * is added. The table is created with an empty list of partition keys.
     *
     * @param eventStream event stream the table schema is derived from
     * @param schemaVersion version of the event schema to use
     * @param tableOptions options to attach to the catalog table
     * @param comment catalog table comment; when {@code null} or empty,
     *                {@code eventStream.toString()} is used instead
     * @param otherColumns extra columns to add to the schema, may be {@code null}
     * @param watermarkColumn column name for the kafka timestamp watermark, may be {@code null}
     * @param watermarkDelay delay for the watermark, may be {@code null}
     * @return the resulting catalog table
     * @throws CatalogException if two or more columns share the same name
     */
    public static CatalogBaseTable getTableOfEventStream(
        EventStream eventStream,
        String schemaVersion,
        @Nonnull Map<String, String> tableOptions,
        @Nullable String comment,
        @Nullable List<Schema.UnresolvedColumn> otherColumns,
        @Nullable String watermarkColumn,
        @Nullable Integer watermarkDelay
    ) throws CatalogException {
        Schema.Builder builder = SchemaBuilderUtil.createEventStreamSchemaBuilder(eventStream, schemaVersion);

        if (otherColumns != null) {
            builder.fromColumns(otherColumns);
        }

        // A watermark needs both pieces of information; a lone column or delay is ignored.
        final boolean watermarkRequested = watermarkColumn != null && watermarkDelay != null;
        if (watermarkRequested) {
            builder = SchemaBuilderUtil.withKafkaTimestampAsWatermark(builder, watermarkColumn, watermarkDelay);
        }

        final Schema tableSchema = builder.build();
        ensureUniqueColumnNames(tableSchema);

        // Fall back to the event stream's string form when no usable comment was supplied.
        final String tableComment;
        if (comment != null && !comment.isEmpty()) {
            tableComment = comment;
        } else {
            tableComment = eventStream.toString();
        }

        // TODO: partition keys?
        final List<String> partitionKeys = Lists.newArrayList();
        return CatalogTable.of(tableSchema, tableComment, partitionKeys, tableOptions);
    }

    /**
     * Verifies that no column name occurs more than once in the given schema.
     *
     * @param tableSchema schema whose column names are checked
     * @throws CatalogException listing every name that occurs more than once
     */
    private static void ensureUniqueColumnNames(Schema tableSchema) throws CatalogException {
        final List<String> names = tableSchema.getColumns()
            .stream()
            .map(column -> column.getName())
            .collect(Collectors.toList());

        final long uniqueNames = names.stream().distinct().count();
        if (uniqueNames == names.size()) {
            return;
        }

        // Keep only the names seen more than once, in first-seen order, for the error message.
        final LinkedHashMultiset<String> repeated = LinkedHashMultiset.create(names);
        repeated.entrySet().removeIf(entry -> entry.getCount() < 2);
        throw new CatalogException("Column name conflict: " + repeated);
    }

    private EventTableUtils() {
        // not instantiable: static helpers only
    }
}