// DataTypeSchemaConversions.java
package org.wikimedia.eventutilities.flink.formats.json;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.wikimedia.eventutilities.core.event.types.SchemaConversions;
/**
 * {@link SchemaConversions} implementation whose target type is the
 * Flink Table API {@link DataType}.
 * Used by JsonSchemaFlinkConverter.
 *
 * <p>Any change made here should be mirrored in {@link TypeInformationSchemaConversions}:
 * for conversions between the Table API and the DataStream API to work properly,
 * Flink needs the types consistently converted to the same underlying representations.
 */
@ParametersAreNonnullByDefault
public class DataTypeSchemaConversions implements SchemaConversions<DataType> {

    /**
     * Returns the Table API NULL type.
     *
     * @return {@link DataTypes#NULL}
     */
    @Override
    public DataType typeNull() {
        return DataTypes.NULL();
    }

    /**
     * Returns the Table API BOOLEAN type.
     *
     * @return {@link DataTypes#BOOLEAN}
     */
    @Override
    public DataType typeBoolean() {
        return DataTypes.BOOLEAN();
    }

    /**
     * Returns the Table API STRING type.
     *
     * @return {@link DataTypes#STRING}
     */
    @Override
    public DataType typeString() {
        return DataTypes.STRING();
    }

    /**
     * Returns the Table API DOUBLE type, which is what this converter uses for decimals.
     *
     * @return {@link DataTypes#DOUBLE}
     */
    @Override
    public DataType typeDecimal() {
        return DataTypes.DOUBLE();
    }

    /**
     * Returns the Table API BIGINT type.
     *
     * <p>Beware: the default conversion of LogicalType BigIntType is Long,
     * which is NOT the same thing as Flink Types.BIG_INT (in the DataStream API).
     *
     * @return {@link DataTypes#BIGINT}
     */
    @Override
    public DataType typeInteger() {
        return DataTypes.BIGINT();
    }

    /**
     * Returns the Table API TIMESTAMP_LTZ type.
     *
     * @return {@link DataTypes#TIMESTAMP_LTZ} with precision 3
     */
    @Override
    public DataType typeTimestamp() {
        return DataTypes.TIMESTAMP_LTZ(3);
    }

    /**
     * Wraps the given element type in a Table API ARRAY type.
     *
     * <p>{@code elementsAreNullable} is ignored; all Flink Table API elements are nullable.
     *
     * @param elementType type of the array's elements
     * @param elementsAreNullable unused
     * @return {@link DataTypes#ARRAY} of {@code elementType}
     */
    @Override
    public DataType typeArray(DataType elementType, boolean elementsAreNullable) {
        return DataTypes.ARRAY(elementType);
    }

    /**
     * Builds a Table API MAP type from the given key and value types.
     *
     * <p>{@code valuesAreNullable} is ignored; all Flink Table API elements are nullable.
     *
     * @param keyType type of the map's keys
     * @param valueType type of the map's values
     * @param valuesAreNullable unused
     * @return {@link DataTypes#MAP} of {@code keyType} to {@code valueType}
     */
    @Override
    public DataType typeMap(DataType keyType, DataType valueType, boolean valuesAreNullable) {
        return DataTypes.MAP(keyType, valueType);
    }

    /**
     * Builds a Table API ROW type with one {@link DataTypes.Field} per given RowField,
     * in the same order.
     *
     * <p>A RowField's description, when not null, becomes the description of its
     * {@link DataTypes.Field}. RowField nullable-ness is not relevant;
     * all Flink Table API fields are nullable.
     *
     * @param rowFields the fields of the row
     * @return {@link DataTypes#ROW}
     */
    @Override
    public DataType typeRow(List<RowField<DataType>> rowFields) {
        final List<DataTypes.Field> fields = new ArrayList<>();
        for (final RowField<DataType> rowField : rowFields) {
            fields.add(toField(rowField));
        }
        return DataTypes.ROW(fields);
    }

    /**
     * Converts a single RowField into a {@link DataTypes.Field}
     * by delegating to {@link #buildFieldDataType}.
     */
    private DataTypes.Field toField(RowField<DataType> rowField) {
        return buildFieldDataType(
            rowField.getName(),
            rowField.getType(),
            rowField.getDescription()
        );
    }

    /**
     * Helper that creates a {@link DataTypes.Field} from a name, a DataType
     * and an optional description.
     *
     * @param name field name
     * @param dataType field type
     * @param description field description; when null, the field is created without one
     * @return the constructed field
     */
    @Nonnull
    protected DataTypes.Field buildFieldDataType(
        String name,
        DataType dataType,
        @Nullable String description
    ) {
        // No description available: use the two-argument factory.
        if (description == null) {
            return DataTypes.FIELD(name, dataType);
        }
        return DataTypes.FIELD(name, dataType, description);
    }
}