JsonRowDeserializationSchema.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wikimedia.eventutilities.flink.formats.json;
import static com.google.common.collect.Streams.stream;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.util.Collections.unmodifiableMap;
import static java.util.Locale.ENGLISH;
import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIME_FORMAT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
* Deserialization schema from JSON to Flink Row type.
*
* NOTE: This class was directly copied and modified from Flink 1.15.0's deprecated
* org.apache.flink.formats.json.JsonRowSerializationSchema.
* It is used to bypass the Table API when you want a DataStream of Row
* without needing to initialize a TableEnvironment.
*
* WMF changes from upstream Flink:
* - Deserialize into name based Rows instead of position based Rows.
* - Supports ISO8601 date times with timezone offsets (+00:00), upstream does only support trailing 'Z'
*
* <p>Deserializes a {@code byte[]} message as a JSON object and reads the specified fields.
*
* <p>Failures during deserialization are forwarded as wrapped IOExceptions.
*
* TODO:
* - Should we return a Row in name based mode?
* - Should we infer the RowKind automatically? https://phabricator.wikimedia.org/T310082
*/
// Suppress all spotbugs warnings for this class, as it was copy/pasted from upstream Flink.
@SuppressFBWarnings
@SuppressWarnings({"checkstyle:ClassFanoutComplexity"})
public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
private static final long serialVersionUID = -228294330688809195L;
/** Type information describing the result type. */
private final RowTypeInfo typeInfo;
private final boolean failOnMissingField;
/** Object mapper for parsing the JSON. */
private final ObjectMapper objectMapper = new ObjectMapper();
private final DeserializationRuntimeConverter runtimeConverter;
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
// -- BEGIN WMF MODIFICATION --
/**
* 'field mode' of deserialized Rows.
* See {@link Row} documentation for more info.
*/
private final RowFieldMode rowFieldMode;
// -- END WMF MODIFICATION --
protected JsonRowDeserializationSchema(
TypeInformation<Row> typeInfo,
boolean failOnMissingField,
boolean ignoreParseErrors,
// -- BEGIN WMF MODIFICATION --
RowFieldMode rowFieldMode
// -- END WMF MODIFICATION --
) {
checkNotNull(typeInfo, "Type information");
checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField and ignoreParseErrors are both true.");
}
this.typeInfo = (RowTypeInfo) typeInfo;
this.failOnMissingField = failOnMissingField;
this.runtimeConverter = createConverter(this.typeInfo);
this.ignoreParseErrors = ignoreParseErrors;
// -- BEGIN WMF MODIFICATION --
this.rowFieldMode = rowFieldMode;
// -- END WMF MODIFICATION --
RowType rowType = (RowType) fromLegacyInfoToDataType(this.typeInfo).getLogicalType();
boolean hasDecimalType =
LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
}
// -- BEGIN WMF MODIFICATION --
/**
* Default constructor that will deserialize Rows in hybrid POSITION_WITH_NAMES mode.
*/
protected JsonRowDeserializationSchema(
TypeInformation<Row> typeInfo,
boolean failOnMissingField,
boolean ignoreParseErrors
) {
this(
typeInfo,
failOnMissingField,
ignoreParseErrors,
RowFieldMode.NAMED_POSITION
);
}
// -- END WMF MODIFICATION --
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
public Row deserialize(byte[] message) throws IOException {
try {
final JsonNode root = objectMapper.readTree(message);
return convert(root);
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
}
throw new IOException(
format(
ENGLISH,
"Failed to deserialize JSON '%s'.",
new String(message, UTF_8)
),
t
);
}
}
/**
* Convert a JsonNode to a Row following this schema.
*/
public Row convert(JsonNode root) {
return (Row) runtimeConverter.convert(objectMapper, root);
}
@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}
@Override
public TypeInformation<Row> getProducedType() {
return typeInfo;
}
/** Builder for {@link JsonRowDeserializationSchema}. */
public static class Builder {
private final RowTypeInfo typeInfo;
private boolean failOnMissingField;
private boolean ignoreParseErrors;
private RowFieldMode rowFieldMode = RowFieldMode.NAMED_POSITION;
/**
* Creates a JSON deserialization schema for the given type information.
*
* @param typeInfo Type information describing the result type. The field names of {@link
* Row} are used to parse the JSON properties.
*/
public Builder(TypeInformation<Row> typeInfo) {
checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
this.typeInfo = (RowTypeInfo) typeInfo;
}
/**
* Configures schema to fail if a JSON field is missing.
*
* <p>By default, a missing field is ignored and the field is set to null.
*/
public Builder failOnMissingField() {
this.failOnMissingField = true;
return this;
}
/**
* Configures schema to fail when parsing json failed.
*
* <p>By default, an exception will be thrown when parsing json fails.
*/
public Builder ignoreParseErrors() {
this.ignoreParseErrors = true;
return this;
}
// -- BEGIN WMF MODIFICATION --
public Builder rowFieldMode(RowFieldMode rowFieldMode) {
this.rowFieldMode = rowFieldMode;
return this;
}
// -- END WMF MODIFICATION --
public JsonRowDeserializationSchema build() {
return new JsonRowDeserializationSchema(
typeInfo,
failOnMissingField,
ignoreParseErrors,
rowFieldMode
);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final JsonRowDeserializationSchema that = (JsonRowDeserializationSchema) o;
return Objects.equals(typeInfo, that.typeInfo)
&& Objects.equals(failOnMissingField, that.failOnMissingField)
&& Objects.equals(ignoreParseErrors, that.ignoreParseErrors);
}
@Override
public int hashCode() {
return Objects.hash(typeInfo, failOnMissingField, ignoreParseErrors);
}
/*
Runtime converter
*/
/** Runtime converter that maps between {@link JsonNode}s and Java objects. */
@FunctionalInterface
private interface DeserializationRuntimeConverter extends Serializable {
Object convert(ObjectMapper mapper, JsonNode jsonNode);
}
private DeserializationRuntimeConverter createConverter(TypeInformation<?> typeInfo) {
DeserializationRuntimeConverter baseConverter =
createConverterForSimpleType(typeInfo)
.orElseGet(
() ->
createContainerConverter(typeInfo)
.orElseGet(
() ->
createFallbackConverter(
typeInfo.getTypeClass())));
return wrapIntoNullableConverter(baseConverter);
}
@SuppressWarnings("checkstyle:IllegalCatch")
private DeserializationRuntimeConverter wrapIntoNullableConverter(
DeserializationRuntimeConverter converter) {
return (mapper, jsonNode) -> {
if (jsonNode.isNull()) {
return null;
}
try {
return converter.convert(mapper, jsonNode);
} catch (Throwable t) {
if (!ignoreParseErrors) {
throw t;
}
return null;
}
};
}
private Optional<DeserializationRuntimeConverter> createContainerConverter(
TypeInformation<?> typeInfo) {
if (typeInfo instanceof RowTypeInfo) {
return Optional.of(createRowConverter((RowTypeInfo) typeInfo));
} else if (typeInfo instanceof ObjectArrayTypeInfo) {
return Optional.of(
createObjectArrayConverter(
((ObjectArrayTypeInfo) typeInfo).getComponentInfo()));
} else if (typeInfo instanceof BasicArrayTypeInfo) {
return Optional.of(
createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(createByteArrayConverter());
} else if (typeInfo instanceof MapTypeInfo) {
MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) typeInfo;
return Optional.of(
createMapConverter(
mapTypeInfo.getKeyTypeInfo(), mapTypeInfo.getValueTypeInfo()));
} else {
return Optional.empty();
}
}
private DeserializationRuntimeConverter createMapConverter(
TypeInformation keyType,
TypeInformation valueType
) {
DeserializationRuntimeConverter valueConverter = createConverter(valueType);
DeserializationRuntimeConverter keyConverter = createConverter(keyType);
return (mapper, jsonNode) -> {
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
Map<Object, Object> result = new HashMap<>();
stream(fields).forEach(entry -> {
Object key = keyConverter.convert(mapper, TextNode.valueOf(entry.getKey()));
Object value = valueConverter.convert(mapper, entry.getValue());
result.put(key, value);
});
return unmodifiableMap(result);
};
}
private DeserializationRuntimeConverter createByteArrayConverter() {
return (mapper, jsonNode) -> {
try {
return jsonNode.binaryValue();
} catch (IOException e) {
throw new JsonParseException("Unable to deserialize byte array.", e);
}
};
}
private boolean isPrimitiveByteArray(TypeInformation<?> typeInfo) {
return typeInfo instanceof PrimitiveArrayTypeInfo
&& ((PrimitiveArrayTypeInfo) typeInfo).getComponentType() == Types.BYTE;
}
private DeserializationRuntimeConverter createObjectArrayConverter(
TypeInformation elementTypeInfo) {
DeserializationRuntimeConverter elementConverter = createConverter(elementTypeInfo);
return assembleArrayConverter(elementTypeInfo, elementConverter);
}
private DeserializationRuntimeConverter createRowConverter(RowTypeInfo typeInfo) {
List<DeserializationRuntimeConverter> fieldConverters =
Arrays.stream(typeInfo.getFieldTypes())
.map(this::createConverter)
.collect(Collectors.toList());
return assembleRowConverter(typeInfo.getFieldNames(), fieldConverters);
}
private DeserializationRuntimeConverter createFallbackConverter(Class<?> valueType) {
return (mapper, jsonNode) -> {
// for types that were specified without JSON schema
// e.g. POJOs
try {
return mapper.treeToValue(jsonNode, valueType);
} catch (JsonProcessingException e) {
throw new JsonParseException(
format(Locale.getDefault(), "Could not convert node: %s", jsonNode),
e
);
}
};
}
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private Optional<DeserializationRuntimeConverter> createConverterForSimpleType(
TypeInformation<?> simpleTypeInfo) {
if (simpleTypeInfo == Types.VOID) {
return Optional.of((mapper, jsonNode) -> null);
} else if (simpleTypeInfo == Types.BOOLEAN) {
return Optional.of(this::convertToBoolean);
} else if (simpleTypeInfo == Types.STRING) {
return Optional.of(this::convertToString);
} else if (simpleTypeInfo == Types.INT) {
return Optional.of(this::convertToInt);
} else if (simpleTypeInfo == Types.LONG) {
return Optional.of(this::convertToLong);
} else if (simpleTypeInfo == Types.DOUBLE) {
return Optional.of(this::convertToDouble);
} else if (simpleTypeInfo == Types.FLOAT) {
return Optional.of((mapper, jsonNode) -> Float.parseFloat(jsonNode.asText().trim()));
} else if (simpleTypeInfo == Types.SHORT) {
return Optional.of((mapper, jsonNode) -> Short.parseShort(jsonNode.asText().trim()));
} else if (simpleTypeInfo == Types.BYTE) {
return Optional.of((mapper, jsonNode) -> Byte.parseByte(jsonNode.asText().trim()));
} else if (simpleTypeInfo == Types.BIG_DEC) {
return Optional.of(this::convertToBigDecimal);
} else if (simpleTypeInfo == Types.BIG_INT) {
return Optional.of(this::convertToBigInteger);
} else if (simpleTypeInfo == Types.SQL_DATE) {
return Optional.of(this::convertToDate);
} else if (simpleTypeInfo == Types.SQL_TIME) {
return Optional.of(this::convertToTime);
} else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
return Optional.of(this::convertToTimestamp);
} else if (simpleTypeInfo == Types.LOCAL_DATE) {
return Optional.of(this::convertToLocalDate);
} else if (simpleTypeInfo == Types.LOCAL_TIME) {
return Optional.of(this::convertToLocalTime);
} else if (simpleTypeInfo == Types.LOCAL_DATE_TIME) {
return Optional.of(this::convertToLocalDateTime);
// -- BEGIN WMF MODIFICATION --
} else if (simpleTypeInfo == Types.INSTANT) {
return Optional.of(this::convertToInstant);
// -- END WMF MODIFICATION --
} else {
return Optional.empty();
}
}
private String convertToString(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isContainerNode()) {
return jsonNode.toString();
} else {
return jsonNode.asText();
}
}
private boolean convertToBoolean(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isBoolean()) {
// avoid redundant toString and parseBoolean, for better performance
return jsonNode.asBoolean();
} else {
return Boolean.parseBoolean(jsonNode.asText().trim());
}
}
private int convertToInt(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.canConvertToInt()) {
// avoid redundant toString and parseInt, for better performance
return jsonNode.asInt();
} else {
return Integer.parseInt(jsonNode.asText().trim());
}
}
private long convertToLong(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.canConvertToLong()) {
// avoid redundant toString and parseLong, for better performance
return jsonNode.asLong();
} else {
return Long.parseLong(jsonNode.asText().trim());
}
}
private double convertToDouble(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isDouble()) {
// avoid redundant toString and parseDouble, for better performance
return jsonNode.asDouble();
} else {
return Double.parseDouble(jsonNode.asText().trim());
}
}
private BigDecimal convertToBigDecimal(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isBigDecimal()) {
// avoid redundant toString and toDecimal, for better performance
return jsonNode.decimalValue();
} else {
return new BigDecimal(jsonNode.asText().trim());
}
}
private BigInteger convertToBigInteger(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isBigInteger()) {
// avoid redundant toString and toBigInteger, for better performance
return jsonNode.bigIntegerValue();
} else {
return new BigInteger(jsonNode.asText().trim());
}
}
private LocalDate convertToLocalDate(ObjectMapper mapper, JsonNode jsonNode) {
return ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
}
private Date convertToDate(ObjectMapper mapper, JsonNode jsonNode) {
return Date.valueOf(convertToLocalDate(mapper, jsonNode));
}
private LocalDateTime convertToLocalDateTime(ObjectMapper mapper, JsonNode jsonNode) {
// according to RFC 3339 every date-time must have a timezone;
// until we have full timezone support, we only support UTC;
// users can parse their time as string as a workaround
// -- BEGIN WMF MODIFICATION --
// we use DateTimeFormatter.ISO_DATE_TIME to properly parse iso8601 date times, flink does use a custom
// RFC 3339 formatter that does not support zone offsets like "+00:00" it always expects a trailing 'Z'
TemporalAccessor parsedTimestamp = DateTimeFormatter.ISO_DATE_TIME.parse(jsonNode.asText());
// -- END WMF MODIFICATION --
ZoneOffset zoneOffset = parsedTimestamp.query(TemporalQueries.offset());
if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
throw new IllegalStateException(
"Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. "
+ "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
}
LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
return LocalDateTime.of(localDate, localTime);
}
private Timestamp convertToTimestamp(ObjectMapper mapper, JsonNode jsonNode) {
return Timestamp.valueOf(convertToLocalDateTime(mapper, jsonNode));
}
// -- BEGIN WMF MODIFICATION --
/**
* Converts using convertToLocalDateTime at UTC offset as an Instant.
*/
private Instant convertToInstant(ObjectMapper mapper, JsonNode jsonNode) {
LocalDateTime localDateTime = convertToLocalDateTime(mapper, jsonNode);
return localDateTime.toInstant(ZoneOffset.UTC);
}
// -- END WMF MODIFICATION --
private LocalTime convertToLocalTime(ObjectMapper mapper, JsonNode jsonNode) {
// according to RFC 3339 every full-time must have a timezone;
// until we have full timezone support, we only support UTC;
// users can parse their time as string as a workaround
TemporalAccessor parsedTime = RFC3339_TIME_FORMAT.parse(jsonNode.asText());
ZoneOffset zoneOffset = parsedTime.query(TemporalQueries.offset());
LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || localTime.getNano() != 0) {
throw new IllegalStateException(
"Invalid time format. Only a time in UTC timezone without milliseconds is supported yet.");
}
return localTime;
}
private Time convertToTime(ObjectMapper mapper, JsonNode jsonNode) {
return Time.valueOf(convertToLocalTime(mapper, jsonNode));
}
private DeserializationRuntimeConverter assembleRowConverter(
String[] fieldNames, List<DeserializationRuntimeConverter> fieldConverters) {
// Pre-build the positionByName map so that we can re-use accross events
LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
positionByName.put(fieldNames[i], i);
}
return (mapper, jsonNode) -> {
ObjectNode node = (ObjectNode) jsonNode;
int arity = fieldNames.length;
// -- BEGIN WMF MODIFICATION --
// DRY closure that converts from the field at fieldIndex using the fieldConverter at fieldIndex.
IntFunction<Object> convertJsonField = (int fieldIndex) -> {
String fieldName = fieldNames[fieldIndex];
JsonNode field = node.get(fieldName);
return convertField(mapper, fieldConverters.get(fieldIndex), fieldName, field);
};
// This deserializer instance is configured to deserialize Rows in one of the
// supported RowFieldModes. Create the Row in the configured mode.
Row row;
switch (rowFieldMode) {
// This is what upstream Flink 1.15.0 returns.
case POSITION:
row = new Row(arity);
for (int i = 0; i < arity; i++) {
Object convertedFieldValue = convertJsonField.apply(i);
row.setField(i, convertedFieldValue);
}
break;
// Named mode will allow setting additional new fields on the returned row.
// You might want a named row if you want to augment the Row with new fields.
case NAMED:
row = Row.withNames();
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Object convertedFieldValue = convertJsonField.apply(i);
row.setField(fieldName, convertedFieldValue);
}
break;
// Named position hybrid supports both position and named based field lookups,
// but the schema of the Row is immutable. I.e. you cannot add new fields.
// Make a copy or projection of the Row to do that.
case NAMED_POSITION:
// For named-position mode, Row needs to be able to map from
// names to positions, and then from positions to fields.
// By providing these to the Row constructor, the Row will
// operate in position based mode, but allow lookups by name.
Object[] fieldByPosition = new Object[arity];
// Build the converted fieldByPosition array.
for (int i = 0; i < arity; i++) {
Object convertedFieldValue = convertJsonField.apply(i);
fieldByPosition[i] = convertedFieldValue;
}
row = RowUtils.createRowWithNamedPositions(
RowKind.INSERT,
fieldByPosition,
positionByName
);
break;
default:
throw new IllegalStateException("Unexpected value: " + rowFieldMode);
}
// -- END WMF MODIFICATION --
return row;
};
}
private Object convertField(
ObjectMapper mapper,
DeserializationRuntimeConverter fieldConverter,
String fieldName,
JsonNode field) {
if (field == null) {
if (failOnMissingField) {
throw new IllegalStateException(
"Could not find field with name '" + fieldName + "'.");
} else {
return null;
}
} else {
return fieldConverter.convert(mapper, field);
}
}
private DeserializationRuntimeConverter assembleArrayConverter(
TypeInformation<?> elementType, DeserializationRuntimeConverter elementConverter) {
final Class<?> elementClass = elementType.getTypeClass();
return (mapper, jsonNode) -> {
final ArrayNode node = (ArrayNode) jsonNode;
final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
for (int i = 0; i < node.size(); i++) {
final JsonNode innerNode = node.get(i);
array[i] = elementConverter.convert(mapper, innerNode);
}
return array;
};
}
/** Exception which refers to parse errors in converters. */
private static final class JsonParseException extends RuntimeException {
private static final long serialVersionUID = 1L;
JsonParseException(String message, Throwable cause) {
super(message, cause);
}
}
// -- BEGIN WMF MODIFICATION --\
/**
* Used to determine which of the Row field modes in which this deserializer should create Rows.
*/
public enum RowFieldMode {
POSITION, NAMED, NAMED_POSITION
}
// -- END WMF MODIFICATION --\
}