EventRowTypeInfo.java

package org.wikimedia.eventutilities.flink;

import static java.util.function.Function.identity;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.EVENT_TIME_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_ID_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_INGESTION_TIME_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_STREAM_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.SCHEMA_FIELD;

import java.lang.reflect.Array;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.annotation.Nullable;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;

import com.google.common.collect.ImmutableSet;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.EqualsAndHashCode;

/**
 * Simple subclass of RowTypeInfo that provides support for:
 * <ul>
 * <li>WMF Event Platform rules</li>
 * <li>Flink application state schema evolution for the Row type (following WMF guidelines)</li>
 * </ul>
 *
 * NOTE: the serialization format is not compatible with the Flink Row serialization, and thus RowTypeInfo and
 * EventRowTypeInfo should not be mixed up.
 * <br>
 *
 * If for some reason you have to work with a plain RowTypeInfo (pyflink), you can obtain the corresponding RowTypeInfo
 * using {@link #toRowTypeInfo()}.
 */
@EqualsAndHashCode
public class EventRowTypeInfo extends RowTypeInfo {
    // Index of every top-level field keyed by its name, built by the constructor in declaration order.
    // Handed to the rows and to the serializer created by this type information.
    private final LinkedHashMap<String, Integer> positionByName;
    // map of { alias -> field } used to extract a key from a DataStream Row.
    // Never null: an empty map means that no key is declared (see hasKey()).
    private final TreeMap<String, String> aliasedRowKey;
    // Cache key type information across keyTypeInfo() calls.
    // Assigned at construction time; stays null when no key is declared.
    private EventRowTypeInfo keyTypeInfo;

    /**
     * Creates an EventRowTypeInfo that declares no row key.
     *
     * Delegates to {@link #create(Map, String[], TypeInformation...)} passing a null key mapping.
     *
     * @param fieldNames names of the row fields
     * @param types type information of the row fields
     */
    public static EventRowTypeInfo create(String[] fieldNames, TypeInformation<?>... types) {
        return EventRowTypeInfo.create(null, fieldNames, types);
    }

    /**
     * Creates an EventRowTypeInfo, converting every plain RowTypeInfo found in the given field types
     * (directly, or as map value / array component) to an EventRowTypeInfo.
     *
     * @param aliasedRowKey map of { alias -> field path } describing the row key, forwarded as-is to the constructor
     * @param fieldNames names of the row fields
     * @param types type information of the row fields
     */
    public static EventRowTypeInfo create(Map<String, String> aliasedRowKey, String[] fieldNames, TypeInformation<?>... types) {
        final TypeInformation<?>[] convertedTypes = new TypeInformation<?>[types.length];
        for (int idx = 0; idx < types.length; idx++) {
            convertedTypes[idx] = switchRowTypeInfoImpl(types[idx], RowTypeInfo.class, EventRowTypeInfo.class, EventRowTypeInfo::create);
        }
        return new EventRowTypeInfo(convertedTypes, fieldNames, aliasedRowKey);
    }

    /**
     * Creates an EventRowTypeInfo without a row key from the field names and field types of the given RowTypeInfo.
     *
     * @param info the row type information to convert
     */
    public static EventRowTypeInfo create(RowTypeInfo info) {
        return EventRowTypeInfo.create(info.getFieldNames(), info.getFieldTypes());
    }

    /**
     * Creates an EventRowTypeInfo from the field names and field types of the given RowTypeInfo,
     * using aliasedRowKey as the row key definition.
     *
     * @param info the row type information to convert
     * @param aliasedRowKey map of { alias -> field path } describing the row key
     */
    public static EventRowTypeInfo create(RowTypeInfo info, Map<String, String> aliasedRowKey) {
        return EventRowTypeInfo.create(aliasedRowKey, info.getFieldNames(), info.getFieldTypes());
    }

    /**
     * Switch between two row TypeInformation implementations.
     *
     * Inspects the given type info and converts any occurrence of fromClass to toClass. The match is done
     * on the exact class (not on subclasses). Only MapTypeInfo values and ObjectArrayTypeInfo components are
     * inspected recursively, other composite types such as POJO or Tuples are returned untouched.
     * The main purpose of this method is to support RowTypeInfo generated via
     * {@link org.wikimedia.eventutilities.flink.formats.json.TypeInformationSchemaConversions}.
     *
     * @param typeInformation the type information to inspect
     * @param fromClass the implementation to replace
     * @param toClass the implementation to obtain
     * @param conversion function turning an instance of fromClass into an instance of toClass
     * @return the converted type information, or typeInformation itself when nothing had to be converted
     */
    private static <F extends TypeInformation<?>, T extends TypeInformation<?>> TypeInformation<?> switchRowTypeInfoImpl(
            TypeInformation<?> typeInformation,
            Class<F> fromClass, Class<T> toClass,
            Function<F, T> conversion
    ) {
        final Class<?> actualClass = typeInformation.getClass();
        final TypeInformation<?> switched;
        if (toClass.equals(actualClass)) {
            // already the wanted implementation
            switched = typeInformation;
        } else if (fromClass.equals(actualClass)) {
            switched = conversion.apply(fromClass.cast(typeInformation));
        } else if (MapTypeInfo.class.equals(actualClass)) {
            // keep the key type, convert the value type
            MapTypeInfo<?, ?> asMap = (MapTypeInfo<?, ?>) typeInformation;
            TypeInformation<?> valueType = switchRowTypeInfoImpl(asMap.getValueTypeInfo(), fromClass, toClass, conversion);
            switched = new MapTypeInfo<>(asMap.getKeyTypeInfo(), valueType);
        } else if (ObjectArrayTypeInfo.class.equals(actualClass)) {
            // keep the array class, convert the component type
            ObjectArrayTypeInfo<?, ?> asArray = (ObjectArrayTypeInfo<?, ?>) typeInformation;
            TypeInformation<?> componentType = switchRowTypeInfoImpl(asArray.getComponentInfo(), fromClass, toClass, conversion);
            switched = ObjectArrayTypeInfo.getInfoFor(asArray.getTypeClass(), componentType);
        } else {
            switched = typeInformation;
        }
        return switched;
    }

    /**
     * Builds an EventRowTypeInfo that declares no row key.
     *
     * Unlike the create() factories, the given types are used as-is.
     *
     * @param types type information of the row fields
     * @param fieldNames names of the row fields
     */
    public EventRowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
        this(types, fieldNames, null);
    }

    /**
     * Builds an EventRowTypeInfo with an optional row key.
     *
     * Unlike the create() factories, the given types are used as-is.
     *
     * @param types type information of the row fields
     * @param fieldNames names of the row fields
     * @param aliasedRowKey map of { alias -> field path } describing the row key; null means no key.
     *                      The content is copied into a sorted map.
     */
    public EventRowTypeInfo(TypeInformation<?>[] types, String[] fieldNames, @Nullable Map<String, String> aliasedRowKey) {
        super(types, fieldNames);
        // name -> position index in declaration order; on a duplicated name the first position is kept
        this.positionByName = IntStream.range(0, fieldNames.length)
                .boxed()
                .collect(Collectors.toMap(i -> fieldNames[i], identity(), (i, j) -> i, LinkedHashMap::new));

        this.aliasedRowKey =  (aliasedRowKey != null) ? new TreeMap<>(aliasedRowKey) : new TreeMap<>();
        // must run after aliasedRowKey is assigned: resolves the type of every key field
        initializeKeyTypeInfo();
    }

    /**
     * Obtain the equivalent plain {@link RowTypeInfo}. Useful for APIs that expect to work on a plain RowTypeInfo.
     *
     * EventRowTypeInfo instances found in the field types (directly, or as map value / array component)
     * are converted as well.
     */
    public RowTypeInfo toRowTypeInfo() {
        final TypeInformation<?>[] plainTypes = new TypeInformation<?>[types.length];
        for (int idx = 0; idx < types.length; idx++) {
            plainTypes[idx] = switchRowTypeInfoImpl(types[idx], EventRowTypeInfo.class, RowTypeInfo.class, EventRowTypeInfo::toRowTypeInfo);
        }
        return new RowTypeInfo(plainTypes, fieldNames);
    }

    /**
     * Tells whether obj may be equal to an instance of this class.
     *
     * @return true only if obj is an EventRowTypeInfo
     */
    @Override
    @SuppressFBWarnings(value = "COM_COPIED_OVERRIDDEN_METHOD", justification = "not true, they are different")
    public boolean canEqual(Object obj) {
        // add canEqual manually because lombok may want to add one with another visibility
        // failing with: attempting to assign weaker access privileges; was public
        return obj instanceof EventRowTypeInfo;
    }

    /**
     * List of fields that should not be projected by default
     * when projecting a source event to a target event.
     *
     * Entries are full field paths using a dot as separator: the id, ingestion time and stream
     * sub-fields of the meta field, plus the top-level schema field.
     */
    public static final Set<String> DEFAULT_NON_PROJECTED_FIELDS = ImmutableSet.of(
            META_FIELD + "." + META_ID_FIELD,
            META_FIELD + "." + META_INGESTION_TIME_FIELD,
            META_FIELD + "." + META_STREAM_FIELD,
            SCHEMA_FIELD
    );

    /**
     * Creates a new empty Row matching this RowTypeInfo.
     *
     * @return a new Row for the root of this type information, with no field set
     */
    public Row createEmptyRow() {
        return createRowForFieldPath(null);
    }

    /**
     * Creates a new empty instance of a sub Row identified by fieldPath.
     *
     * This path supports dotted notation to access nested subfields.
     * For arrays or maps of Row simply pass the name of the field of the array or map.
     *
     * @param fieldPath path of the field holding the Row; passing null yields the root Row
     * @return a new empty Row matching the type found at fieldPath
     * @throws IllegalArgumentException if the path leads to a non-existent field or a field that is not a Row.
     */
    public Row createEmptySubRow(String fieldPath) {
        return createRowForFieldPath(fieldPath);
    }

    /**
     * Creates a new empty Row for the type found at fieldPath.
     * A null fieldPath designates the root Row described by this type information.
     */
    private Row createRowForFieldPath(@Nullable String fieldPath) {
        if (fieldPath != null) {
            // nested row: resolve its type information and let it build its own row
            EventRowTypeInfo nestedTypeInfo = unwrapRowTypeInfo(getTypeAt(fieldPath));
            return nestedTypeInfo.createEmptyRow();
        }
        Object[] emptyValues = new Object[getArity()];
        return RowUtils.createRowWithNamedPositions(RowKind.INSERT, emptyValues, positionByName);
    }

    /**
     * Inspect nested MapTypeInfo or ObjectArrayTypeInfo to extract the EventRowTypeInfo of the leaf value.
     *
     * @param fieldTypeInfo the type information to unwrap
     * @return the EventRowTypeInfo found after unwrapping all map values and array components
     * @throws IllegalArgumentException if the leaf is not an EventRowTypeInfo (a plain RowTypeInfo is rejected too).
     */
    private EventRowTypeInfo unwrapRowTypeInfo(TypeInformation<?> fieldTypeInfo) {
        // Extract the type information from composite types: arrays and maps
        TypeInformation<?> leaf = fieldTypeInfo;
        while (leaf instanceof MapTypeInfo || leaf instanceof ObjectArrayTypeInfo) {
            if (leaf instanceof MapTypeInfo) {
                leaf = ((MapTypeInfo<?, ?>) leaf).getValueTypeInfo();
            } else {
                leaf = ((ObjectArrayTypeInfo<?, ?>) leaf).getComponentInfo();
            }
        }
        if (!(leaf instanceof EventRowTypeInfo)) {
            // name the type that is actually required and the one that was found
            throw new IllegalArgumentException("Expected an EventRowTypeInfo but found: " + leaf);
        }
        return (EventRowTypeInfo) leaf;
    }

    /**
     * Project an input Row to a new Row matching the {@link TypeInformation} of this class.
     *
     * The passed Row must:
     * <ul>
     *  <li>if intersect is false be compatible with this TypeInformation and must have all the fields
     *    declared by it</li>
     *  <li>support named based access</li>
     * </ul>
     *
     * If intersect is true only the fields declared by both row and this TypeInformation will be copied over.
     * The use-case of this method is to support WMF schema downgrades in case a pipeline
     * reads events in schema V2 but wants to write them as a previous version
     * (only new fields are allowed during schema evolution).
     *
     * The param ignoredFields can be used to explicitly ignore some fields from the projection. The content must
     * include the full path using a dot as a path separator (e.g. "meta.dt" to exclude the nested "dt" field).
     *
     * Edge-cases:
     * - when intersect is true the resulting Row might be empty if no common fields are present
     * - if the intersection results in an empty composite field (map, row, arrays) this field will not be projected.
     *
     * @param row the source Row
     * @param intersect whether fields missing from the source row are skipped instead of rejected
     * @param ignoredFields full paths of the fields to leave out of the projection
     * @return a new Row created from this type information and populated from row
     * @throws IllegalArgumentException if row does not support named based access, or if intersect is false
     *   and row lacks a non-ignored field declared by this type information
     */
    @SuppressFBWarnings(
            value = "SPP_PASSING_THIS_AS_PARM",
            justification = "We are calling copyRowContent() with different parameter in other places, it makes sense to reuse it with 'this' here.")
    public Row projectFrom(Row row, boolean intersect, Set<String> ignoredFields) {
        Row newRow = createEmptyRow();
        copyRowContent(row, newRow, this, null, intersect, ignoredFields);
        return newRow;
    }

    /**
     * Project an input Row to a new Row matching the {@link TypeInformation} of this class,
     * ignoring the fields listed in {@link #DEFAULT_NON_PROJECTED_FIELDS}.
     *
     * The following data will be ignored:
     * <ul>
     * <li>the id, ingestion time and stream sub-fields of the meta field</li>
     * <li>the $schema field</li>
     * </ul>
     *
     * @see #projectFrom(Row, boolean, Set)
     * @see #DEFAULT_NON_PROJECTED_FIELDS
     */
    public Row projectFrom(Row row, boolean intersect) {
        return projectFrom(row, intersect, DEFAULT_NON_PROJECTED_FIELDS);
    }

    /**
     * Populate the target row from the source row.
     *
     * Only the fields declared by targetTypeinfo are considered. Null source values and values
     * whose copy turns out to be empty are left unset in the target.
     *
     * @param source the row to read from, must support named based access
     * @param target the row to populate
     * @param targetTypeinfo type information describing the fields of target
     * @param path full path of the row being populated, null for the root row
     * @param intersect if true fields missing from source are skipped, otherwise they are rejected
     * @param ignoredFields full paths (dot separated) of the fields that must not be copied
     * @return false if nothing was copied (the target row remained empty)
     * @throws IllegalArgumentException if source does not support named based access, or if intersect is false
     *   and source lacks a non-ignored field
     */
    @SuppressWarnings("checkstyle:CyclomaticComplexity")
    private boolean copyRowContent(Row source, Row target, RowTypeInfo targetTypeinfo, @Nullable String path, boolean intersect, Set<String> ignoredFields) {
        Set<String> fieldNames = source.getFieldNames(true);
        boolean empty = true;
        if (fieldNames == null) {
            throw new IllegalArgumentException("Source Row must support named based access");
        }
        for (String field: targetTypeinfo.getFieldNames()) {
            String npath = path != null ? path + "." + field : field;
            boolean sourceHasField = fieldNames.contains(field);
            // skip ignored fields
            if (ignoredFields.contains(npath) || (intersect && !sourceHasField)) {
                continue;
            }
            if (!sourceHasField) {
                throw new IllegalArgumentException("The source row does not support the field [" + npath + "].");
            }
            Object sourceField = source.getField(field);
            if (sourceField != null) {
                // copy once and reuse the result: copying may deep-copy a whole nested structure
                Object copiedSource = copy(sourceField, targetTypeinfo.getTypeAt(field), npath, intersect, ignoredFields);
                if (copiedSource != null) {
                    empty = false;
                    target.setField(field, copiedSource);
                }
            }
        }
        return !empty;
    }

    /**
     * Copy a non-null source value according to the type information of the target field.
     *
     * Rows, maps and object arrays are copied recursively; any other value is returned as-is.
     *
     * @param source the value to copy
     * @param typeInformation type information of the target field
     * @param fieldPath full path of the field being copied (maps and arrays do not add path elements)
     * @param intersect forwarded to the copy of nested rows
     * @param ignoredFields forwarded to the copy of nested rows
     * @return the copied value, or null when source is a Row from which nothing was copied
     */
    private Object copy(Object source, TypeInformation<?> typeInformation, String fieldPath, boolean intersect, Set<String> ignoredFields) {
        final Object returnValue;
        if (typeInformation instanceof RowTypeInfo) {
            // Build the row straight from its own type information when possible: resolving fieldPath again from
            // the root fails for rows nested below a map or an array (getTypeAt() cannot traverse them).
            Row targetSubfield = typeInformation instanceof EventRowTypeInfo
                    ? ((EventRowTypeInfo) typeInformation).createEmptyRow()
                    : createEmptySubRow(fieldPath);
            boolean copied = copyRowContent((Row) source, targetSubfield, (RowTypeInfo) typeInformation, fieldPath, intersect, ignoredFields);
            // do not return an empty Row
            returnValue = copied ? targetSubfield : null;
        } else if (typeInformation instanceof MapTypeInfo) {
            Map<?, ?> sourceSubfield = (Map<?, ?>) source;
            TypeInformation<?> valueTypeInfo = ((MapTypeInfo<?, ?>) typeInformation).getValueTypeInfo();
            Map<Object, Object> targetSubfield = new HashMap<>();
            for (Map.Entry<?, ?> entry: sourceSubfield.entrySet()) {
                Object sourceValue = entry.getValue();
                // keep null values as-is, the same way null array members are handled below
                Object targetValue = sourceValue != null ? copy(sourceValue, valueTypeInfo, fieldPath, intersect, ignoredFields) : null;
                targetSubfield.put(entry.getKey(), targetValue);
            }
            returnValue = targetSubfield;
        } else if (typeInformation instanceof ObjectArrayTypeInfo) {
            TypeInformation<?> componentInfo = ((ObjectArrayTypeInfo<?, ?>) typeInformation).getComponentInfo();
            Object[] sourceArray = (Object[]) source;
            Object[] targetSubfield = (Object[]) Array.newInstance(componentInfo.getTypeClass(), sourceArray.length);
            for (int i = 0; i < sourceArray.length; i++) {
                Object sourceMember = sourceArray[i];
                if (sourceMember != null) {
                    targetSubfield[i] = copy(sourceMember, componentInfo, fieldPath, intersect, ignoredFields);
                }
            }
            returnValue = targetSubfield;
        } else {
            // no transformation, we assume that everything that is not a Row, a Map or ObjectArray is immutable
            // (this is probably not true but might cover most WMF use-cases)
            returnValue = source;
        }
        return returnValue;
    }

    /**
     * Set the ingestion time (meta.dt) as per WMF event platform rules.
     * The meta sub Row is created and attached to the element when it is not set yet.
     *
     * @see <a href="https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#Required_fields">meta.dt rules</a>
     *
     * @param element the event to update
     * @param ingestionTime the ingestion time to store
     * @throws IllegalArgumentException if the type information does not declare the meta field.
     */
    public void setIngestionTime(Row element, Instant ingestionTime) {
        if (!hasField(META_FIELD)) {
            throw new IllegalArgumentException("This TypeInformation does not declare the " + META_FIELD + " field");
        }
        final Row existingMeta = element.getFieldAs(META_FIELD);
        final Row meta;
        if (existingMeta != null) {
            meta = existingMeta;
        } else {
            // no meta yet: attach a fresh one to the event
            meta = createEmptySubRow(META_FIELD);
            element.setField(META_FIELD, meta);
        }
        meta.setField(META_INGESTION_TIME_FIELD, ingestionTime);
    }

    /**
     * Set the event time (dt) as per WMF event platform rules.
     *
     * @param element the event to update
     * @param eventTime the event time to store
     * @throws IllegalArgumentException if the type information does not declare the dt field
     */
    public void setEventTime(Row element, Instant eventTime) {
        // Enforce the documented contract for every kind of Row, the same way setIngestionTime() does for meta.
        if (!this.hasField(EVENT_TIME_FIELD)) {
            throw new IllegalArgumentException("This TypeInformation does not declare the " + EVENT_TIME_FIELD + " field");
        }
        element.setField(EVENT_TIME_FIELD, eventTime);
    }

    /**
     * Read the event time field.
     * May return null if the field is not set.
     *
     * @param element the event to read from
     * @return the value of the event time field, possibly null
     */
    @Nullable
    public Instant getEventTime(Row element) {
        return element.getFieldAs(EVENT_TIME_FIELD);
    }

    /**
     * Get the ingestion time of this event if set.
     *
     * @param element the event to read from
     * @return the ingestion time stored in the meta sub Row, or null if the meta field or the
     *   ingestion time is not set
     */
    @Nullable
    public Instant getIngestionTime(Row element) {
        Row meta = element.getFieldAs(META_FIELD);
        if (meta != null) {
            // read the very field written by setIngestionTime()
            return meta.getFieldAs(META_INGESTION_TIME_FIELD);
        }
        return null;
    }

    /**
     * Creates the serializer for rows of this type: an EventRowSerializer built from one serializer
     * per field type (in field order) and from the name-to-position index of the fields.
     */
    @Override
    public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
        return new EventRowSerializer(
                Arrays.stream(types).map(t -> t.createSerializer(config)).toArray(TypeSerializer[]::new),
                positionByName
        );
    }

    /**
     * Check if the EventRowTypeInformation instance has a partition key.
     * @return true if at least one key field alias is declared
     */
    public boolean hasKey() {
        return !aliasedRowKey.isEmpty();
    }

    /**
     * Read the value found at fieldPath, starting from the given row.
     *
     * Every element of the dot separated path is resolved against the current value: as a field name
     * when the value is a Row, as a key when it is a Map.
     *
     * @param row the row to read from
     * @param fieldPath dot separated path of the value to read
     * @return the value found at fieldPath; null if the last element of the path is not set
     * @throws IllegalArgumentException if a value on the path (other than the last one) is null, or is
     *   neither a Row nor a Map
     */
    private Object getFieldAt(Row row, String fieldPath) {
        List<String> path = Arrays.asList(fieldPath.split("\\."));
        Object target = row;

        for (String field: path) {
            if (target instanceof Row) {
                target = ((Row) target).getField(field);
            } else if (target instanceof Map) {
                target = ((Map<?, ?>) target).get(field);
            } else if (target == null) {
                throw new IllegalArgumentException("Optional (nullable) fields are not allowed in key: " + fieldPath);
            } else {
                // do not silently return an intermediate value as if it was the requested one
                throw new IllegalArgumentException(
                        "Cannot resolve [" + field + "] in key " + fieldPath + ": its parent is neither a Row nor a Map");
            }
        }
        return target;
    }

    /**
     * Traverse the field path and extract the type of its leaf.
     *
     * Wrapper to RowTypeInfo.getTypeAt() to use in complex WMF Json Schemas.
     * RowTypeInfo.getTypeAt() will throw an InvalidFieldReferenceException when the leaf
     * is contained in a map; for example, it would fail to resolve
     * Map&lt;String, Row(string_in_subfield: String)&gt;.
     *
     * Path elements are consumed as follows: a field name for a Row, then one element (the map key)
     * for every level of map that is traversed. A map is only traversed when a path element is left
     * to be used as its key, so the resolved type describes the value found by following the same
     * path on an actual Row.
     *
     * @param fieldPath dot separated path of the field
     * @return the type information of the leaf designated by fieldPath
     */
    private TypeInformation<?> unwrapCompositeTypeAt(String fieldPath) {
        final String[] path = fieldPath.split("\\.");

        TypeInformation<?> fieldType = null;
        // Path visited since the last Row, relative to curRowTypeInfo.
        StringBuilder visitedFieldPath = new StringBuilder();
        // Row type against which visitedFieldPath is resolved, the root row to begin with.
        RowTypeInfo curRowTypeInfo = this;
        int i = 0;
        while (i < path.length) {
            visitedFieldPath.append(path[i]);
            fieldType = curRowTypeInfo.getTypeAt(visitedFieldPath.toString());
            // Iterate over MapTypeInfo of MapTypeInfo (...): every level consumes the next path element as its key.
            while (fieldType instanceof MapTypeInfo && i + 1 < path.length) {
                fieldType = ((MapTypeInfo<?, ?>) fieldType).getValueTypeInfo();
                i++;
            }
            if (fieldType instanceof RowTypeInfo) {
                // Use the nested row as the new base and start visiting from it. This is to avoid trying to
                // getTypeAt() on already traversed map paths.
                curRowTypeInfo = (RowTypeInfo) fieldType;
                visitedFieldPath = new StringBuilder();
            } else {
                // The split delimiter is a regular expression, field paths are joined with a plain dot.
                visitedFieldPath.append('.');
            }
            i++;
        }
        return fieldType;
    }

    /**
     * Extract a key for a given input Row.
     *
     * @param row the row to extract the key from
     * @return a new Row holding, for every key alias, the value found at the aliased field path;
     *   null if this EventRowTypeInfo has no key
     */
    public @Nullable Row extractKey(Row row) {
        if (!hasKey()) {
            // no key was defined for this EventRowTypeInfo
            return null;
        }
        // keyTypeInfo() is only null when hasKey() is false, which was ruled out above;
        // requireNonNull() makes that expectation explicit.
        final Row keyRow = Objects.requireNonNull(keyTypeInfo()).createEmptyRow();
        for (Map.Entry<String, String> aliasToPath: aliasedRowKey.entrySet()) {
            keyRow.setField(aliasToPath.getKey(), getFieldAt(row, aliasToPath.getValue()));
        }
        return keyRow;
    }

    // Eagerly builds keyTypeInfo: one field per key alias (in alias order), typed after the field the
    // alias points to. Does nothing when no key is declared.
    private void initializeKeyTypeInfo() {
        if (!hasKey()) {
            return;
        }
        final String[] aliases = aliasedRowKey.keySet().toArray(new String[0]);
        final TypeInformation<?>[] keyFieldTypes = new TypeInformation<?>[aliases.length];
        for (int idx = 0; idx < aliases.length; idx++) {
            keyFieldTypes[idx] = unwrapCompositeTypeAt(aliasedRowKey.get(aliases[idx]));
        }
        // A keyTypeInfo should not have a key. Set aliasedRowKey to null.
        keyTypeInfo = new EventRowTypeInfo(keyFieldTypes, aliases, null);
    }

    /**
     * Return a TypeInfo representing the schema of Row keys.
     * Returns null if EventRowTypeInfo has no key.
     * @return the type information of the key rows, computed once at construction time, or null
     */
    public @Nullable EventRowTypeInfo keyTypeInfo() {
        return keyTypeInfo;
    }
}