// DataTypeSchemaConversions.java

package org.wikimedia.eventutilities.spark.sql;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nonnull;

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.wikimedia.eventutilities.core.event.types.SchemaConversions;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
 * {@link SchemaConversions} implementation whose target type is the Spark SQL {@link DataType}.
 *
 * <p>Every {@code type*} method returns the Spark type used for the corresponding schema type.
 * The timestamp mapping is the only configurable one: depending on the
 * {@code timestampsAsStrings} constructor flag, {@link #typeTimestamp()} yields either
 * {@code DataTypes.TimestampType} or {@code DataTypes.StringType}.
 */
public class DataTypeSchemaConversions implements SchemaConversions<DataType> {

    /**
     * Zero-length array handed to {@code List.toArray(T[])} so that the returned
     * array is typed as {@code StructField[]}.
     */
    private static final StructField[] EMPTY_STRUCT_FIELD_ARRAY = new StructField[0];

    /** When true, {@link #typeTimestamp()} returns a string type instead of a timestamp type. */
    private final boolean timestampsAsStrings;

    /**
     * Creates conversions with {@code timestampsAsStrings} set to {@code false}.
     */
    public DataTypeSchemaConversions() {
        this(false);
    }

    /**
     * Creates conversions with an explicit timestamp mapping.
     *
     * @param timestampsAsStrings if true, timestamps are mapped to {@code DataTypes.StringType};
     *                            otherwise to {@code DataTypes.TimestampType}
     */
    public DataTypeSchemaConversions(boolean timestampsAsStrings) {
        this.timestampsAsStrings = timestampsAsStrings;
    }

    /**
     * @return {@code DataTypes.NullType}
     */
    @Nonnull
    @Override
    public DataType typeNull() {
        return DataTypes.NullType;
    }

    /**
     * @return {@code DataTypes.BooleanType}
     */
    @Nonnull
    @Override
    public DataType typeBoolean() {
        return DataTypes.BooleanType;
    }

    /**
     * @return {@code DataTypes.StringType}
     */
    @Nonnull
    @Override
    public DataType typeString() {
        return DataTypes.StringType;
    }

    /**
     * Integers are represented with Spark's long type.
     *
     * @return {@code DataTypes.LongType}
     */
    @Nonnull
    @Override
    public DataType typeInteger() {
        return DataTypes.LongType;
    }

    /**
     * Decimals are represented with Spark's double type.
     *
     * @return {@code DataTypes.DoubleType}
     */
    @Nonnull
    @Override
    public DataType typeDecimal() {
        return DataTypes.DoubleType;
    }

    /**
     * Picks the timestamp representation according to the constructor flag.
     *
     * @return {@code DataTypes.StringType} if this instance was built with
     *         {@code timestampsAsStrings == true}, {@code DataTypes.TimestampType} otherwise
     */
    @Nonnull
    @Override
    public DataType typeTimestamp() {
        // TODO https://phabricator.wikimedia.org/T278467
        return timestampsAsStrings ? DataTypes.StringType : DataTypes.TimestampType;
    }

    /**
     * Builds a Spark {@link MapType}.
     *
     * @param keyType           type of the map keys
     * @param valueType         type of the map values
     * @param valuesAreNullable passed through as the third {@link MapType} constructor argument
     * @return a new {@link MapType} built from the three arguments
     */
    @Nonnull
    @Override
    public DataType typeMap(@Nonnull DataType keyType, @Nonnull DataType valueType, boolean valuesAreNullable) {
        return new MapType(keyType, valueType, valuesAreNullable);
    }

    /**
     * Builds a Spark {@link ArrayType}.
     *
     * @param elementType         type of the array elements
     * @param elementsAreNullable passed through as the second {@link ArrayType} constructor argument
     * @return a new {@link ArrayType} built from the two arguments
     */
    @Nonnull
    @Override
    public DataType typeArray(@Nonnull DataType elementType, boolean elementsAreNullable) {
        return new ArrayType(elementType, elementsAreNullable);
    }

    /**
     * Builds a Spark {@link StructType} with one {@link StructField} per row field,
     * in the iteration order of {@code rowFields}.
     *
     * <p>Each field carries the row field's name, type and nullability together with
     * freshly built, empty metadata. If the row field has a non-null description it is
     * attached through {@link StructField#withComment(String)}.
     *
     * @param rowFields the fields of the row to convert
     * @return a new {@link StructType} holding the converted fields
     */
    @Nonnull
    @Override
    @SuppressFBWarnings(
        value = {"PCAIL_POSSIBLE_CONSTANT_ALLOCATION_IN_LOOP"},
        justification = "No reusable constant allocation here"
    )
    public DataType typeRow(List<RowField<DataType>> rowFields) {
        List<StructField> converted = new ArrayList<>();

        for (RowField<DataType> field : rowFields) {
            StructField plain = new StructField(
                field.getName(),
                field.getType(),
                field.isNullable(),
                new MetadataBuilder().build()
            );

            // Only fields that actually have a description get a comment attached.
            String description = field.getDescription();
            converted.add(description == null ? plain : plain.withComment(description));
        }

        // toArray needs an array instance of the desired component type; the shared
        // empty StructField[] serves that purpose.
        StructField[] asArray = converted.toArray(EMPTY_STRUCT_FIELD_ARRAY);
        return new StructType(asArray);
    }

}