JsonSchemaConverter.java

package org.wikimedia.eventutilities.core.event.types;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;


/**
 * JsonSchemaConverter iterates through the JSONSchema properties
 * and uses the provided {@link SchemaConversions} implementation to
 * convert from JSONSchema types.
 *
 * This was implemented expecting to support JSONSchema Draft 7,
 * but will likely work with many JSONSchema versions.
 * Support for JSONSchema Draft 3 has been added; notably
 * support for Draft 3's method of marking a field as required.
 *
 * @param <T>
 *  {@link SchemaConversions} implementation
 */
@ParametersAreNonnullByDefault
public class JsonSchemaConverter<T> {

    // JSONSchema keywords
    private static final String TITLE = "title";
    private static final String DESCRIPTION = "description";
    private static final String PROPERTIES = "properties";
    private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
    private static final String TYPE = "type";
    private static final String ITEMS = "items";
    private static final String REQUIRED = "required";
    private static final String FORMAT = "format";

    // JSONSchema types
    private static final String TYPE_NULL = "null";
    private static final String TYPE_BOOLEAN = "boolean";
    private static final String TYPE_OBJECT = "object";
    private static final String TYPE_ARRAY = "array";
    private static final String TYPE_NUMBER = "number";
    private static final String TYPE_INTEGER = "integer";
    private static final String TYPE_STRING = "string";


    private static final String FORMAT_DATE_TIME = "date-time";

    // TO DO: Support conversion from these JSONSchema formats to more specific types.
    // See Flink's JsonRowSchemaConverter
    //    private static final String FORMAT_DATE = "date";
    //    private static final String FORMAT_TIME = "time";
    //    private static final String CONTENT_ENCODING_BASE64 = "base64";

    private static final Logger LOG = LoggerFactory.getLogger(JsonSchemaConverter.class);

    private final @Nonnull SchemaConversions<T> schemaConversions;

    public JsonSchemaConverter(SchemaConversions<T> schemaConversions) {
        this.schemaConversions = schemaConversions;
    }

    /**
     * Converts this JSONSchema using the provided SchemaConversions.
     *
     * Note: A special case of converting object with an additionalProperties schema to a Map is
     * included. In JSONSchema, additionalProperties can either be a boolean
     * or an object.  If it is an object, it expected to specify the schema
     * of the unknown properties. This is what we need for a Map type.
     * We want to still allow object schemas to indicate that they have specific
     * property keys in a Map type though, so an object with additionalProperties
     * with a schema can still include a defined properties.  In this case, we will
     * use a Map type here and the defined properties will be ignored in final map type.
     * It is up to the schema author to ensure that the types of the defined
     * properties match the additionalProperties schema; that is, all defined properties
     * must have the same type as the additionalProperties, as this is what will
     * be used for the value type in the Map.
     * See: https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#map_types
     *
     * @param jsonSchema
     *      The JSONSchema Object.  This should have at minimum type.
     *
     * @return jsonSchema converted to T using the provided SchemaConversions.
     */
    @Nonnull
    public T convert(ObjectNode jsonSchema) {
        checkArgument(!jsonSchema.isNull(), "JSONSchema cannot be a \"null\" node.");
        return convert(jsonSchema, "<root>");
    }

    /**
     * Iterates through jsonSchema and calls appropriate TypeConverter methods
     * on each jsonSchema property type in order to convert it into type T.
     *
     * @param jsonSchema
     *      The JSONSchema Object.  This should have at minimum type.
     *
     * @param nodePath
     *      Fully qualified dotted json path to the current node in the JSONSchema.
     *      Used only for informational error and log messages.
     */
    @Nonnull
    protected T convert(
        ObjectNode jsonSchema,
        String nodePath
    ) {
        // Get the value of the JSONSchema "type".
        String jsonSchemaType = getJsonSchemaType(jsonSchema);
        T convertedType;

        // Convert jsonSchemaType to T.
        switch (jsonSchemaType) {

            case TYPE_NULL:
                convertedType = schemaConversions.typeNull();
                break;

            case TYPE_BOOLEAN:
                convertedType = schemaConversions.typeBoolean();
                break;

            case TYPE_INTEGER:
                convertedType = schemaConversions.typeInteger();
                break;

            case TYPE_NUMBER:
                convertedType = schemaConversions.typeDecimal();
                break;

            case TYPE_STRING:
                if (jsonSchema.hasNonNull(FORMAT)) {
                    String format = getJsonNode(
                        jsonSchema,
                        FORMAT,
                        "Expected field " + nodePath + " to specify " + FORMAT
                    ).asText();

                    if (format.equals(FORMAT_DATE_TIME)) {
                        convertedType = schemaConversions.typeTimestamp();
                    } else {
                        convertedType = schemaConversions.typeString();
                    }
                } else {
                    convertedType = schemaConversions.typeString();
                }
                break;

            case TYPE_ARRAY:
                convertedType = convertArrayType(jsonSchema, nodePath);
                break;

            case TYPE_OBJECT:
                // An object type must have a schema defined either in properties (Row type)
                // or additionalProperties (Map type).

                // Special map-case: additionalProperties has a schema,
                // this is a Map type instead of a Row Type.
                // https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#map_types
                if (
                    jsonSchema.hasNonNull(ADDITIONAL_PROPERTIES) &&
                    jsonSchema.get(ADDITIONAL_PROPERTIES).isObject()
                ) {
                    convertedType = convertMapType(jsonSchema, nodePath);
                    // TO DO: if properties are specified, they should be compatible with the map type
                    // (additionalProperties) schema.
                    // Those properties are used more for hinting and validation
                    // (e.g. requiring certain map keys during JSONSchema validation),
                    // so we don't strictly have to do this here, but it is probably a good idea.
                } else {
                    convertedType = convertRowType(jsonSchema, nodePath);
                }
                break;

            default:
                throw new IllegalArgumentException(
                    "Unknown JSONSchema \"" + TYPE  + "\": \"" + jsonSchemaType +
                    "\" at " + nodePath
                );
        }

        LOG.debug(
            "Converted JSONSchema at {} to Flink Table DataType {}",
            nodePath,
            convertedType
        );

        return convertedType;
    }

    @Nonnull
    protected T convertArrayType(ObjectNode jsonSchema, String nodePath) {
        JsonNode itemsSchema = getJsonNode(
            jsonSchema,
            ITEMS,
            nodePath + " array JSONSchema did not specify the items type"
        );
        checkArgument(
            itemsSchema.isObject() && itemsSchema.has(TYPE),
            nodePath + " array JSONSchema must specify the items type for field, " +
                "e.g. \"" + ITEMS + "\": { \"" + TYPE + "\": \"string\"}"
        );

        T elementType = convert(
            (ObjectNode)itemsSchema,
            nodePath + "." + ITEMS
        );
        return schemaConversions.typeArray(elementType, true);
    }

    @Nonnull
    protected T convertMapType(ObjectNode jsonSchema, String nodePath) {
        String errorMessage = nodePath + "." + ADDITIONAL_PROPERTIES +
            "\" must be a JSONSchema object to be a Map type";

        JsonNode additionalPropertiesSchema = getJsonNode(
            jsonSchema,
            ADDITIONAL_PROPERTIES,
            errorMessage
        );
        checkArgument(additionalPropertiesSchema.isObject(), errorMessage);
        checkArgument(
            additionalPropertiesSchema.has(TYPE),
            nodePath + "." + ADDITIONAL_PROPERTIES +
                "JSONSchema object must specify value \""
                + TYPE + "\"" + "to be a Map type"
        );

        T valueType = convert(
            (ObjectNode)additionalPropertiesSchema,
            nodePath + "." + ADDITIONAL_PROPERTIES
        );


        // Maps made from JSON always have key strings.
        return schemaConversions.typeMap(schemaConversions.typeString(), valueType, true);
    }

    @Nonnull
    protected T convertRowType(ObjectNode jsonSchema, String nodePath) {
        String errorMessage = nodePath + " object JSONSchema \"" + PROPERTIES + "\" is not an object.";
        JsonNode properties = getJsonNode(jsonSchema, PROPERTIES, errorMessage);
        checkArgument(properties.isObject(), errorMessage);

        // Collect info about each JsonSchema property into
        // a RowField DTO.
        List<SchemaConversions.RowField<T>> rowFields = new ArrayList<>();
        Set<String> requiredFields = new HashSet<>();

        if (jsonSchema.hasNonNull(REQUIRED)) {
            JsonNode requiredNode = jsonSchema.get(REQUIRED);
            checkArgument(
                requiredNode.isArray() || requiredNode.isBoolean(),
                "JSONSchema at " + nodePath +  "\" " + REQUIRED + "\" " +
                    "must be an array or a boolean (JSONSchema Draft-3)."
            );

            if (requiredNode.isArray()) {
                requiredNode.elements().forEachRemaining(j -> requiredFields.add(j.asText()));
            }
        }

        // Convert each property to a RowField and collect in a list of RowFields,
        // and then call typeRow on the schemaConversions implementation.
        properties.fields().forEachRemaining((Map.Entry<String, JsonNode> propertiesField) -> {
            String propertyName = propertiesField.getKey();

            String propertyNodePath = nodePath + "." + PROPERTIES + "." + propertyName;
            checkArgument(
                propertiesField.getValue().isObject(),
                "JSONSchema at \"" + propertyNodePath + "\" must be an object node."
            );

            ObjectNode propertyJsonSchema = (ObjectNode)propertiesField.getValue();

            // Covert the property schema type into T
            T propertyType = convert(propertyJsonSchema, propertyNodePath);

            // property is nullable (optional) if it is not in the list of required fields,
            // or if the field itself does not set required (Draft 3 JSONSchema).
            boolean isNullable = !(requiredFields.contains(propertyName) || isRequiredDraft3(propertyJsonSchema));

            // description can be null.
            String description = propertyJsonSchema.hasNonNull(DESCRIPTION) ?
                propertyJsonSchema.get(DESCRIPTION).asText() :
                null;

            rowFields.add(new SchemaConversions.RowField<T>(propertyName, propertyType, isNullable, description));
        });

        return schemaConversions.typeRow(rowFields);

    }

    /**
     * If a field has a 'required' property as a boolean, assume this is a JSONSchema Draft3
     * style schema and the required property is referring to the current field, not a list of
     * required sub properties.  This returns true if the jsonSchema has required: true
     */
    private boolean isRequiredDraft3(ObjectNode jsonSchema) {
        return jsonSchema.has(REQUIRED) &&
            jsonSchema.get(REQUIRED).isBoolean() &&
            jsonSchema.get(REQUIRED).booleanValue();
    }

    /**
     * DRY helper function to get a JsonNode out of an ObjectNode by key,
     * throwing IllegalArgumentException if the key does not exist, or is set to "null".
     * @param objectNode ObjectNode from which to get JsonNode by key
     * @param key Field name key to get out of the ObjectNode.
     * @param errorMessage Error message for IllegalArgumentException if key is not set or is "null".
     * @return JsonNode in objectNode at key
     */
    @Nonnull
    public static JsonNode getJsonNode(ObjectNode objectNode, String key, String errorMessage) {
        JsonNode jsonNode = objectNode.get(key);
        checkArgument(jsonNode != null && !jsonNode.isNull(), errorMessage);
        return jsonNode;
    }

    /**
     * DRY helper function to extract the JSONSchema "type".
     */
    @Nonnull
    public static String getJsonSchemaType(ObjectNode jsonSchema) {
        String getTypeErrorMessage = "jsonSchema does not contain valid JSONSchema \"" + TYPE + "\".";
        JsonNode schemaTypeNode = getJsonNode(jsonSchema, TYPE, getTypeErrorMessage);
        checkArgument(schemaTypeNode.isTextual(), getTypeErrorMessage);

        return schemaTypeNode.asText();
    }

    /**
     * DRY helper function that asserts that the jsonSchema has "type": "object".
     * @throws IllegalArgumentException if jsonSchema "type" != "object"
     */
    @Nonnull
    public static void checkJsonSchemaIsObject(ObjectNode jsonSchema) {
        String schemaTitle;
        if (jsonSchema.hasNonNull(TITLE)) {
            schemaTitle = jsonSchema.get(TITLE).textValue();
        } else {
            schemaTitle = "Untitled";
        }

        String jsonSchemaType = getJsonSchemaType(jsonSchema);
        checkArgument(
            jsonSchemaType.equals(TYPE_OBJECT),
            "When converting JSONSchema " + schemaTitle +
                " to Flink row schema, expected JSONSchema \"" + TYPE +
                "\" to be \" + " + TYPE_OBJECT + "\" but was " + jsonSchemaType
        );
    }

}