EventRowTypeInfo.java
package org.wikimedia.eventutilities.flink;
import static java.util.function.Function.identity;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.EVENT_TIME_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_ID_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_INGESTION_TIME_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_STREAM_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.SCHEMA_FIELD;
import java.lang.reflect.Array;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.EqualsAndHashCode;
/**
 * Simple subclass of RowTypeInfo that provides support for:
 * <ul>
 *  <li>WMF Event Platform rules</li>
 *  <li>flink application state schema evolution for the Row type (following WMF guidelines)</li>
 * </ul>
 *
 * NOTE: the serialization format is not compatible with the flink RowSerialization and thus RowTypeInfo and EventRowTypeInfo
 * should not be mixed up.
 * <br>
 *
 * If for some reasons you have to work with plain RowTypeInfo (pyflink) you can obtain the corresponding RowTypeInfo
 * using {@link #toRowTypeInfo()}.
 */
@EqualsAndHashCode
public class EventRowTypeInfo extends RowTypeInfo {
    // Position of every top-level field, keyed by field name (iteration order follows the declaration order).
    private final LinkedHashMap<String, Integer> positionByName;

    // map of { alias -> field } used to extract a key from a DataStream Row.
    private final TreeMap<String, String> aliasedRowKey;

    // Type information of the key Row, computed once at construction time; null when no key is declared.
    private final EventRowTypeInfo keyTypeInfo;

    /**
     * Create an EventRowTypeInfo without a key.
     *
     * @param fieldNames names of the top-level fields
     * @param types type information of the top-level fields, in the same order as fieldNames
     */
    public static EventRowTypeInfo create(String[] fieldNames, TypeInformation<?>... types) {
        return EventRowTypeInfo.create(null, fieldNames, types);
    }

    /**
     * Create an EventRowTypeInfo, converting any plain {@link RowTypeInfo} found in types
     * (directly, or as a map value / array component) to an EventRowTypeInfo.
     *
     * @param aliasedRowKey map of { alias -> field path } describing the key, may be null when there is no key
     * @param fieldNames names of the top-level fields
     * @param types type information of the top-level fields, in the same order as fieldNames
     */
    public static EventRowTypeInfo create(Map<String, String> aliasedRowKey, String[] fieldNames, TypeInformation<?>... types) {
        TypeInformation<?>[] fieldTypes = Arrays.stream(types)
                .map(t -> EventRowTypeInfo.switchRowTypeInfoImpl(t, RowTypeInfo.class, EventRowTypeInfo.class, EventRowTypeInfo::create))
                .toArray(TypeInformation[]::new);
        return new EventRowTypeInfo(fieldTypes, fieldNames, aliasedRowKey);
    }

    /**
     * Create an EventRowTypeInfo (without a key) from the field names and types of the given RowTypeInfo.
     */
    public static EventRowTypeInfo create(RowTypeInfo info) {
        return EventRowTypeInfo.create(info.getFieldNames(), info.getFieldTypes());
    }

    /**
     * Create an EventRowTypeInfo from the field names and types of the given RowTypeInfo,
     * using aliasedRowKey as the key definition.
     */
    public static EventRowTypeInfo create(RowTypeInfo info, Map<String, String> aliasedRowKey) {
        return EventRowTypeInfo.create(aliasedRowKey, info.getFieldNames(), info.getFieldTypes());
    }

    /**
     * Switch between two row TypeInformation implementations.
     * Inspect the given type info and tries to convert any fromClass to toClass.
     * Only MapTypeInfo and ObjectArrayTypeInfo are supported, other composite types such as
     * POJO or Tuples are not inspected. Main purpose of this method is to support RowTypeInfo
     * generated via {@link org.wikimedia.eventutilities.flink.formats.json.TypeInformationSchemaConversions}.
     *
     * Classes are compared with an exact match (not instanceof) so that a subclass of fromClass
     * that already is a toClass is returned untouched.
     */
    private static <F extends TypeInformation<?>, T extends TypeInformation<?>> TypeInformation<?> switchRowTypeInfoImpl(
            TypeInformation<?> typeInformation,
            Class<F> fromClass, Class<T> toClass,
            Function<F, T> conversion
    ) {
        if (toClass.equals(typeInformation.getClass())) {
            // already of the requested implementation
            return typeInformation;
        }
        if (fromClass.equals(typeInformation.getClass())) {
            return conversion.apply(fromClass.cast(typeInformation));
        }
        if (typeInformation.getClass().equals(MapTypeInfo.class)) {
            // only the map values may hold rows, keys are kept as they are
            MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) typeInformation;
            return new MapTypeInfo<>(mapTypeInfo.getKeyTypeInfo(), switchRowTypeInfoImpl(mapTypeInfo.getValueTypeInfo(), fromClass, toClass, conversion));
        }
        if (typeInformation.getClass().equals(ObjectArrayTypeInfo.class)) {
            ObjectArrayTypeInfo<?, ?> arrayTypeInfo = (ObjectArrayTypeInfo<?, ?>) typeInformation;
            return ObjectArrayTypeInfo.getInfoFor(arrayTypeInfo.getTypeClass(),
                    switchRowTypeInfoImpl(arrayTypeInfo.getComponentInfo(), fromClass, toClass, conversion));
        }
        // anything else is returned untouched
        return typeInformation;
    }

    /**
     * Build an EventRowTypeInfo without a key.
     * Unlike {@link #create(String[], TypeInformation[])} the given types are used as they are.
     */
    public EventRowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
        this(types, fieldNames, null);
    }

    /**
     * Build an EventRowTypeInfo.
     * Unlike {@link #create(Map, String[], TypeInformation[])} the given types are used as they are.
     *
     * @param types type information of the top-level fields
     * @param fieldNames names of the top-level fields, in the same order as types
     * @param aliasedRowKey map of { alias -> field path } describing the key, null (or empty) when there is no key
     */
    public EventRowTypeInfo(TypeInformation<?>[] types, String[] fieldNames, @Nullable Map<String, String> aliasedRowKey) {
        super(types, fieldNames);
        this.positionByName = IntStream.range(0, fieldNames.length)
                .boxed()
                .collect(Collectors.toMap(i -> fieldNames[i], identity(), (i, j) -> i, LinkedHashMap::new));
        // defensive sorted copy: the alias order defines the field order of the key Row
        this.aliasedRowKey = (aliasedRowKey != null) ? new TreeMap<>(aliasedRowKey) : new TreeMap<>();
        // must come last: resolving the key types reads the state initialized above
        this.keyTypeInfo = buildKeyTypeInfo();
    }

    /**
     * Obtain the equivalent RowTypeInfo. Useful for APIs that expects to work on a plain {@link RowTypeInfo}.
     */
    public RowTypeInfo toRowTypeInfo() {
        TypeInformation<?>[] fieldTypes = Arrays.stream(types)
                .map(t -> EventRowTypeInfo.switchRowTypeInfoImpl(t, EventRowTypeInfo.class, RowTypeInfo.class, EventRowTypeInfo::toRowTypeInfo))
                .toArray(TypeInformation[]::new);
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    @SuppressFBWarnings(value = "COM_COPIED_OVERRIDDEN_METHOD", justification = "not true, they are different")
    public boolean canEqual(Object obj) {
        // add canEqual manually because lombok may want to add one with another visibility
        // failing with: attempting to assign weaker access privileges; was public
        return obj instanceof EventRowTypeInfo;
    }

    /**
     * List of fields that should not be projected by default
     * when projecting a source event to a target event.
     */
    public static final Set<String> DEFAULT_NON_PROJECTED_FIELDS = ImmutableSet.of(
            META_FIELD + "." + META_ID_FIELD,
            META_FIELD + "." + META_INGESTION_TIME_FIELD,
            META_FIELD + "." + META_STREAM_FIELD,
            SCHEMA_FIELD
    );

    /**
     * Creates a new empty Row matching this RowTypeInfo.
     */
    public Row createEmptyRow() {
        return createRowForFieldPath(null);
    }

    /**
     * Creates a new empty instance of a sub Row identified by fieldPath.
     *
     * This path supports dotted notation to access nested subfields.
     * For arrays or maps of Row simply pass the name of the field of the array or map.
     * @throws IllegalArgumentException if the path leads to a field that is not a Row.
     */
    public Row createEmptySubRow(String fieldPath) {
        return createRowForFieldPath(fieldPath);
    }

    /**
     * Creates a new empty instance of a sub Row identified by fieldPath.
     * If fieldPath is null it creates the root Row object.
     */
    private Row createRowForFieldPath(@Nullable String fieldPath) {
        if (fieldPath == null) {
            return RowUtils.createRowWithNamedPositions(RowKind.INSERT, new Object[getArity()], positionByName);
        }
        return unwrapRowTypeInfo(this.getTypeAt(fieldPath)).createEmptyRow();
    }

    /**
     * Recursively inspect MapTypeInfo or ObjectArrayTypeInfo to extract the EventRowTypeInfo of the leaf value.
     * @throws IllegalArgumentException if it leads to a field that is not an EventRowTypeInfo.
     */
    private EventRowTypeInfo unwrapRowTypeInfo(TypeInformation<?> fieldTypeInfo) {
        // Extract the type information from composite types: arrays and maps
        if (fieldTypeInfo instanceof MapTypeInfo) {
            return unwrapRowTypeInfo(((MapTypeInfo<?, ?>) fieldTypeInfo).getValueTypeInfo());
        } else if (fieldTypeInfo instanceof ObjectArrayTypeInfo<?, ?>) {
            return unwrapRowTypeInfo(((ObjectArrayTypeInfo<?, ?>) fieldTypeInfo).getComponentInfo());
        }
        if (!(fieldTypeInfo instanceof EventRowTypeInfo)) {
            throw new IllegalArgumentException("Expected a RowTypeInfo");
        }
        return (EventRowTypeInfo) fieldTypeInfo;
    }

    /**
     * Project an input Row to a new Row matching the {@link TypeInformation} of this class.
     *
     * The passed Row must:
     * <ul>
     *     <li>if intersect is false be compatible with this TypeInformation and must have all the fields
     *     declared by it
     *     <li>support named based access
     * </ul>
     *
     * If intersect is true only the fields declared by both row and this TypeInformation will be copied over.
     * The use-case of this method is to support WMF schema downgrades in case a pipeline
     * reads events in schema V2 but wants to write them as a previous version
     * (only new fields are allowed during schema evolution).
     *
     * The param ignoredFields can be used to explicitly ignore some field from the projection. The content must
     * include full-path using a dot as a path separator (e.g. "meta.dt" to exclude the nested "dt" field).
     *
     * Edge-cases:
     * - when intersect is true the resulting Row might be empty if no common fields are present
     * - if the projection results in an empty nested Row this field will not be projected
     *   (maps and arrays are always copied as containers, the entries holding such a Row are left null).
     *
     * @throws IllegalArgumentException if the source row does not support named based access, or
     *      if intersect is false and the source row misses a field declared by this TypeInformation
     */
    @SuppressFBWarnings(
            value = "SPP_PASSING_THIS_AS_PARM",
            justification = "We are calling copyRowContent() with different parameter in other places, it makes sense to reuse it with 'this' here.")
    public Row projectFrom(Row row, boolean intersect, Set<String> ignoredFields) {
        Row newRow = createEmptyRow();
        copyRowContent(row, newRow, this, null, intersect, ignoredFields);
        return newRow;
    }

    /**
     * Project an input Row to a new Row matching the {@link TypeInformation} of this class.
     *
     * The fields listed in {@link #DEFAULT_NON_PROJECTED_FIELDS} are ignored:
     * <ul>
     *     <li>the id, ingestion time and stream subfields of the meta field
     *     <li>the schema field
     * </ul>
     *
     * @see #projectFrom(Row, boolean, Set)
     * @see #DEFAULT_NON_PROJECTED_FIELDS
     */
    public Row projectFrom(Row row, boolean intersect) {
        return projectFrom(row, intersect, DEFAULT_NON_PROJECTED_FIELDS);
    }

    /**
     * Populate the target row from the source row.
     *
     * @param source the row to read from, must support named based access
     * @param target the row to populate
     * @param targetTypeinfo the type information describing target, it drives the list of fields to copy
     * @param path dotted path of target from the root row, null for the root row itself
     * @param intersect when true the fields missing from source are skipped instead of being rejected
     * @param ignoredFields full dotted paths of the fields that must not be copied
     * @return false if nothing was copied (the target row remained empty)
     * @throws IllegalArgumentException if source does not support named based access, or
     *      if intersect is false and source misses a field declared by targetTypeinfo
     */
    @SuppressWarnings("checkstyle:CyclomaticComplexity")
    private boolean copyRowContent(Row source, Row target, RowTypeInfo targetTypeinfo, @Nullable String path, boolean intersect, Set<String> ignoredFields) {
        Set<String> fieldNames = source.getFieldNames(true);
        boolean empty = true;
        if (fieldNames == null) {
            throw new IllegalArgumentException("Source Row must support named based access");
        }
        for (String field: targetTypeinfo.getFieldNames()) {
            String npath = path != null ? path + "." + field : field;
            boolean sourceHasField = fieldNames.contains(field);
            // skip ignored fields
            if (ignoredFields.contains(npath) || (intersect && !sourceHasField)) {
                continue;
            }
            TypeInformation<?> targetField = targetTypeinfo.getTypeAt(field);
            if (!intersect && !sourceHasField) {
                throw new IllegalArgumentException("The source row does not support the field [" + npath + "].");
            }

            Object sourceField = source.getField(field);
            if (sourceField != null) {
                // copy once and reuse the result: copying is a deep, potentially expensive, operation
                Object copiedSource = copy(sourceField, targetField, npath, intersect, ignoredFields);
                if (copiedSource != null) {
                    empty = false;
                    target.setField(field, copiedSource);
                }
            }
        }
        return !empty;
    }

    /**
     * Copy a non-null value according to its target type information.
     * Rows, maps and object arrays are copied recursively, any other value is returned as it is.
     *
     * @param source the value to copy, must not be null
     * @param typeInformation the target type information of the value
     * @param fieldPath dotted path of the field holding the value (shared by all the members of a map or an array)
     * @return the copy, or null if source is a Row from which nothing was copied
     */
    private Object copy(Object source, TypeInformation<?> typeInformation, String fieldPath, boolean intersect, Set<String> ignoredFields) {
        final Object returnValue;
        if (typeInformation instanceof RowTypeInfo) {
            Row targetSubfield = createEmptySubRow(fieldPath);
            boolean copied = copyRowContent((Row) source, targetSubfield, (RowTypeInfo) typeInformation, fieldPath, intersect, ignoredFields);
            // do not return an empty Row
            returnValue = copied ? targetSubfield : null;
        } else if (typeInformation instanceof MapTypeInfo) {
            Map<?, ?> sourceSubfield = (Map<?, ?>) source;
            TypeInformation<?> valueTypeInfo = ((MapTypeInfo<?, ?>) typeInformation).getValueTypeInfo();
            Map<Object, Object> targetSubfield = new HashMap<>();
            for (Map.Entry<?, ?> entry: sourceSubfield.entrySet()) {
                Object sourceValue = entry.getValue();
                // keep null values as they are (same as for array members): copy() only accepts non-null values
                Object copiedValue = sourceValue != null
                        ? copy(sourceValue, valueTypeInfo, fieldPath, intersect, ignoredFields)
                        : null;
                targetSubfield.put(entry.getKey(), copiedValue);
            }
            returnValue = targetSubfield;
        } else if (typeInformation instanceof ObjectArrayTypeInfo) {
            TypeInformation<?> componentInfo = ((ObjectArrayTypeInfo<?, ?>) typeInformation).getComponentInfo();
            Object[] sourceArray = (Object[]) source;
            Object[] targetSubfield = (Object[]) Array.newInstance(componentInfo.getTypeClass(), sourceArray.length);
            for (int i = 0; i < sourceArray.length; i++) {
                Object sourceMember = sourceArray[i];
                if (sourceMember != null) {
                    targetSubfield[i] = copy(sourceMember, componentInfo, fieldPath, intersect, ignoredFields);
                }
            }
            returnValue = targetSubfield;
        } else {
            // no transformation, we assume that everything that is not a Row, a Map or ObjectArray is immutable
            // (this is probably not true but might cover most WMF use-cases)
            returnValue = source;
        }
        return returnValue;
    }

    /**
     * Set the ingestion time (meta.dt) as per WMF event platform rules.
     * The meta sub Row is created if the element does not have one yet.
     *
     * @see <a href="https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#Required_fields">meta.dt rules</a>
     *
     * @throws IllegalArgumentException if the type information does not declare the meta field.
     */
    public void setIngestionTime(Row element, Instant ingestionTime) {
        if (!this.hasField(META_FIELD)) {
            throw new IllegalArgumentException("This TypeInformation does not declare the " + META_FIELD + " field");
        }
        Row meta = element.getFieldAs(META_FIELD);
        if (meta == null) {
            meta = createEmptySubRow(META_FIELD);
            element.setField(META_FIELD, meta);
        }
        meta.setField(META_INGESTION_TIME_FIELD, ingestionTime);
    }

    /**
     * Set the event time (dt) as per WMF event platform rules.
     *
     * The value is written by name straight to the given Row, this type information is not consulted.
     *
     * @throws IllegalArgumentException if the Row rejects the dt field name
     *      (presumably when the field is not declared, to be confirmed against the flink Row API)
     */
    public void setEventTime(Row element, Instant eventTime) {
        element.setField(EVENT_TIME_FIELD, eventTime);
    }

    /**
     * Read the event time field.
     * May return null if the field is not set.
     */
    @Nullable
    public Instant getEventTime(Row element) {
        return element.getFieldAs(EVENT_TIME_FIELD);
    }

    /**
     * Get the ingestion time (meta.dt) of this event if set.
     * Returns null if the event has no meta field or if the ingestion time is not set.
     */
    @Nullable
    public Instant getIngestionTime(Row element) {
        Row meta = element.getFieldAs(META_FIELD);
        if (meta != null) {
            // read back the very field written by setIngestionTime()
            return meta.getFieldAs(META_INGESTION_TIME_FIELD);
        }
        return null;
    }

    @Deprecated
    @Override
    public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
        return new EventRowSerializer(
                Arrays.stream(types).map(t -> t.createSerializer(config))
                        .toArray(TypeSerializer[]::new), positionByName
        );
    }

    @Override
    public TypeSerializer<Row> createSerializer(SerializerConfig config) {
        return new EventRowSerializer(
                Arrays.stream(types).map(t -> t.createSerializer(config)).toArray(TypeSerializer[]::new),
                positionByName
        );
    }

    /**
     * Check if the EventRowTypeInformation instance has a partition key.
     * @return true if at least one key field was declared
     */
    public boolean hasKey() {
        return !aliasedRowKey.isEmpty();
    }

    /**
     * Read the value found at fieldPath (dotted notation) by traversing nested Rows and Maps.
     *
     * NOTE(review): only a null met before the end of the path is rejected; a null leaf value is returned as it is.
     *
     * @throws IllegalArgumentException if a null value is met while path elements remain to be visited
     */
    private Object getFieldAt(Row row, String fieldPath) {
        final String fieldPathDelimiter = "\\.";
        List<String> path = Arrays.asList(fieldPath.split(fieldPathDelimiter));

        Object target = row;
        for (String field: path) {
            if (target instanceof Row) {
                target = ((Row) target).getField(field);
            } else if (target instanceof Map) {
                target = ((Map<?, ?>) target).get(field);
            } else if (target == null) {
                throw new IllegalArgumentException("Optional (nullable) fields are not allowed in key: " + fieldPath);
            }
        }
        return target;
    }

    // Traverse the path type and extract the leaf type from composite types.
    // Wrapper to RowTypeInfo.getTypeAt() to use in complex WMF Json Schemas.
    // RowTypeInfo.getTypeAt() will throw an InvalidFieldReferenceException when the leaf
    // is contained in a CompositeType; for example, it would fail to resolve
    // Map<String, Row(string_in_subfield: String)>.
    // NOTE(review): every map level met is unwrapped and consumes one path element (the map key),
    // even when the path stops at the map itself; confirm this is the intent.
    private TypeInformation<?> unwrapCompositeTypeAt(String fieldPath) {
        // regex used to split the path, and the literal separator used to build the paths given to getTypeAt()
        final String fieldPathDelimiter = "\\.";
        final char fieldPathSeparator = '.';
        String[] path = fieldPath.split(fieldPathDelimiter);

        TypeInformation<?> fieldType = null;
        // Path visited so far, relative to curRowTypeInfo
        StringBuilder visitedFieldPath = new StringBuilder();
        // Start iterating from the root row. Typed as RowTypeInfo (getTypeAt() is all that is needed)
        // so that a nested plain RowTypeInfo can be traversed as well.
        RowTypeInfo curRowTypeInfo = this;

        int i = 0;
        while (i < path.length) {
            visitedFieldPath.append(path[i]);
            fieldType = curRowTypeInfo.getTypeAt(visitedFieldPath.toString());

            // Iterate over MapTypeInfo of MapTypeInfo (...): every map level consumes the next
            // path element, which is a map key and not a field name.
            while (fieldType instanceof MapTypeInfo) {
                fieldType = ((MapTypeInfo<?, ?>) fieldType).getValueTypeInfo();
                i++;
            }

            if (fieldType instanceof RowTypeInfo) {
                // Use the nested row as the new base and start visiting from it. This is to avoid
                // trying to getTypeAt() on already traversed CompositeType paths.
                curRowTypeInfo = (RowTypeInfo) fieldType;
                visitedFieldPath.setLength(0);
            } else {
                visitedFieldPath.append(fieldPathSeparator);
            }
            i++;
        }
        return fieldType;
    }

    /**
     * Extract a key for a given input Row. Returns null if
     * EventRowTypeInfo has no key.
     *
     * @param row the row to extract the key from
     * @return a new Row holding one field per key alias, or null if no key is declared
     * @throws IllegalArgumentException if a null value is met while traversing the path of a key field
     */
    public @Nullable Row extractKey(Row row) {
        if (hasKey()) {
            // requireNonNull() to control keyTypeInfo() behaviour.
            // keyTypeInfo() will be null only if hasKey() is false.
            // In that case, this block will not be evaluated.
            Row rowKey = Objects.requireNonNull(keyTypeInfo()).createEmptyRow();
            aliasedRowKey.forEach((key, fieldPath) ->
                    rowKey.setField(key, getFieldAt(row, fieldPath))
            );
            return rowKey;
        }
        // no key was defined for this EventRowTypeInfo
        return null;
    }

    // Eagerly build the type information of the key Row; returns null when no key is declared.
    // Called from the constructor once aliasedRowKey is set.
    @Nullable
    private EventRowTypeInfo buildKeyTypeInfo() {
        if (!hasKey()) {
            return null;
        }
        // values() and keySet() of the TreeMap iterate in the same (alias) order
        TypeInformation<?>[] fieldTypes = aliasedRowKey.values().stream()
                .map(this::unwrapCompositeTypeAt)
                .toArray(TypeInformation[]::new);
        // A keyTypeInfo should not have a key. Set aliasedRowKey to null.
        return new EventRowTypeInfo(fieldTypes, aliasedRowKey.keySet().toArray(new String[0]), null);
    }

    /**
     * Return a TypeInfo representing the schema of Row keys.
     * Returns null if EventRowTypeInfo has no key.
     * @return the type information of the key Row, or null if no key is declared
     */
    public @Nullable EventRowTypeInfo keyTypeInfo() {
        return keyTypeInfo;
    }
}