// DataTypeSchemaConversions.java
package org.wikimedia.eventutilities.spark.sql;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.wikimedia.eventutilities.core.event.types.SchemaConversions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * {@link SchemaConversions} implementation whose conversions produce
 * Spark SQL {@link DataType} instances.
 *
 * <p>Timestamps are converted to {@code DataTypes.TimestampType} by default, or to
 * {@code DataTypes.StringType} when the instance is constructed with
 * {@code timestampsAsStrings = true}.
 */
public class DataTypeSchemaConversions implements SchemaConversions<DataType> {

    /**
     * Zero-length array handed to {@code List.toArray(T[])} purely so that the
     * returned array has the {@code StructField[]} runtime type.
     */
    private static final StructField[] EMPTY_STRUCT_FIELD_ARRAY = new StructField[0];

    /** If true, {@link #typeTimestamp()} returns a string type instead of a timestamp type. */
    private final boolean timestampsAsStrings;

    /**
     * Creates conversions with {@code timestampsAsStrings} set to {@code false}.
     */
    public DataTypeSchemaConversions() {
        this(false);
    }

    /**
     * Creates conversions with an explicit timestamp representation.
     *
     * @param timestampsAsStrings if true, {@link #typeTimestamp()} returns
     *                            {@code DataTypes.StringType}; otherwise it returns
     *                            {@code DataTypes.TimestampType}
     */
    public DataTypeSchemaConversions(boolean timestampsAsStrings) {
        this.timestampsAsStrings = timestampsAsStrings;
    }

    /**
     * @return {@code DataTypes.NullType}
     */
    @Nonnull
    @Override
    public DataType typeNull() {
        return DataTypes.NullType;
    }

    /**
     * @return {@code DataTypes.BooleanType}
     */
    @Nonnull
    @Override
    public DataType typeBoolean() {
        return DataTypes.BooleanType;
    }

    /**
     * @return {@code DataTypes.StringType}
     */
    @Nonnull
    @Override
    public DataType typeString() {
        return DataTypes.StringType;
    }

    /**
     * Integers are represented with Spark's long type.
     *
     * @return {@code DataTypes.LongType}
     */
    @Nonnull
    @Override
    public DataType typeInteger() {
        return DataTypes.LongType;
    }

    /**
     * Decimals are represented with Spark's double type.
     *
     * @return {@code DataTypes.DoubleType}
     */
    @Nonnull
    @Override
    public DataType typeDecimal() {
        return DataTypes.DoubleType;
    }

    /**
     * Picks the timestamp representation according to the flag given at construction.
     *
     * @return {@code DataTypes.StringType} if this instance was built with
     *         {@code timestampsAsStrings = true}, otherwise {@code DataTypes.TimestampType}
     */
    @Nonnull
    @Override
    public DataType typeTimestamp() {
        // TODO https://phabricator.wikimedia.org/T278467
        return timestampsAsStrings ? DataTypes.StringType : DataTypes.TimestampType;
    }

    /**
     * Builds a Spark {@link MapType}.
     *
     * @param keyType           type of the map keys
     * @param valueType         type of the map values
     * @param valuesAreNullable passed through as the map's "value contains null" flag
     * @return a new {@link MapType} built from the three arguments
     */
    @Nonnull
    @Override
    public DataType typeMap(
        @Nonnull DataType keyType,
        @Nonnull DataType valueType,
        boolean valuesAreNullable
    ) {
        return new MapType(keyType, valueType, valuesAreNullable);
    }

    /**
     * Builds a Spark {@link ArrayType}.
     *
     * @param elementType         type of the array elements
     * @param elementsAreNullable passed through as the array's "contains null" flag
     * @return a new {@link ArrayType} built from the two arguments
     */
    @Nonnull
    @Override
    public DataType typeArray(
        @Nonnull DataType elementType,
        boolean elementsAreNullable
    ) {
        return new ArrayType(elementType, elementsAreNullable);
    }

    /**
     * Builds a Spark {@link StructType} with one {@link StructField} per row field,
     * in the iteration order of {@code rowFields}.
     *
     * <p>Each field starts with empty metadata; when the row field has a non-null
     * description it is attached to the struct field as its comment.
     *
     * @param rowFields the fields of the row to convert
     * @return a {@link StructType} containing the converted fields
     */
    @Nonnull
    @Override
    @SuppressFBWarnings(
        value = {"PCAIL_POSSIBLE_CONSTANT_ALLOCATION_IN_LOOP"},
        justification = "No reusable constant allocation here"
    )
    public DataType typeRow(List<RowField<DataType>> rowFields) {
        final List<StructField> converted = new ArrayList<>();

        for (RowField<DataType> source : rowFields) {
            final StructField plainField = new StructField(
                source.getName(),
                source.getType(),
                source.isNullable(),
                new MetadataBuilder().build()
            );

            // Only fields that carry a description get a comment attached.
            final String description = source.getDescription();
            converted.add(description == null ? plainField : plainField.withComment(description));
        }

        // toArray needs an array instance of the desired component type; the shared
        // empty StructField[] serves only that purpose.
        final StructField[] fieldArray = converted.toArray(EMPTY_STRUCT_FIELD_ARRAY);
        return new StructType(fieldArray);
    }
}