// RowDataToJsonConverters.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wikimedia.eventutilities.flink.formats.json;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Locale;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.EqualsAndHashCode;
/**
 * Factory for runtime converters that turn Flink's internal {@link RowData} structures into
 * Jackson {@link JsonNode}s.
 * <p>
 * NOTE: This class was directly copied and modified from upstream Flink in order to
 * use unshaded Jackson dependencies for use with eventutilities-core.
 * <p>
 * WMF changes from upstream Flink:
 * - Use unshaded jackson dependencies.
 */
// Suppress all spotbugs warnings for this class, as it was copy/pasted from upstream Flink.
@SuppressFBWarnings
@SuppressWarnings({"checkstyle:ClassFanoutComplexity", "checkstyle:CyclomaticComplexity"})
@EqualsAndHashCode
public class RowDataToJsonConverters implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Timestamp format used when rendering TIMESTAMP values as JSON strings.
     */
    private final TimestampFormat timestampFormat;

    /**
     * How map entries whose key is null are handled during serialization.
     */
    private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;

    /**
     * The string used as the JSON field name for a null map key when the
     * handling mode is LITERAL.
     */
    private final String mapNullKeyLiteral;

    public RowDataToJsonConverters(
            TimestampFormat timestampFormat,
            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
            String mapNullKeyLiteral) {
        this.timestampFormat = timestampFormat;
        this.mapNullKeyMode = mapNullKeyMode;
        this.mapNullKeyLiteral = mapNullKeyLiteral;
    }

    /**
     * Runtime converter that converts objects of Flink Table and SQL internal data structures to
     * corresponding {@link JsonNode}s.
     */
    public interface RowDataToJsonConverter extends Serializable {
        /**
         * Converts one internal value to a JSON node.
         *
         * @param mapper Jackson mapper supplying the node factory and container nodes
         * @param reuse a node from an earlier conversion that container converters may
         *     recycle; may be null or a null node
         * @param value the internal value to convert
         * @return the JSON representation of {@code value}
         */
        JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
    }

    /**
     * Builds a converter for the given type that maps a null input to a JSON null node.
     */
    public RowDataToJsonConverter createConverter(LogicalType type) {
        final RowDataToJsonConverter notNullConverter = createNotNullConverter(type);
        return wrapIntoNullableConverter(notNullConverter);
    }

    /**
     * Builds a converter for the given type that assumes its input object is never null.
     */
    private RowDataToJsonConverter createNotNullConverter(LogicalType type) {
        // TODO: Handle INSTANT type?
        switch (type.getTypeRoot()) {
            // Composite types delegate to dedicated factory methods.
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
                return createArrayConverter((ArrayType) type);
            case MAP: {
                final MapType asMap = (MapType) type;
                return createMapConverter(
                        asMap.asSummaryString(), asMap.getKeyType(), asMap.getValueType());
            }
            case MULTISET: {
                // A multiset is written like a map from element to an INT value.
                final MultisetType asMultiset = (MultisetType) type;
                return createMapConverter(
                        asMultiset.asSummaryString(),
                        asMultiset.getElementType(),
                        new IntType());
            }

            // Temporal and decimal types.
            case DATE:
                return createDateConverter();
            case TIME_WITHOUT_TIME_ZONE:
                return createTimeConverter();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return createTimestampConverter();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return createTimestampWithLocalZone();
            case DECIMAL:
                return createDecimalConverter();

            // Scalars map straight onto Jackson value nodes.
            case NULL:
                return (json, previous, datum) -> json.getNodeFactory().nullNode();
            case BOOLEAN:
                return (json, previous, datum) ->
                        json.getNodeFactory().booleanNode((boolean) datum);
            case TINYINT:
                return (json, previous, datum) -> json.getNodeFactory().numberNode((byte) datum);
            case SMALLINT:
                return (json, previous, datum) -> json.getNodeFactory().numberNode((short) datum);
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return (json, previous, datum) -> json.getNodeFactory().numberNode((int) datum);
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return (json, previous, datum) -> json.getNodeFactory().numberNode((long) datum);
            case FLOAT:
                return (json, previous, datum) -> json.getNodeFactory().numberNode((float) datum);
            case DOUBLE:
                return (json, previous, datum) -> json.getNodeFactory().numberNode((double) datum);
            case CHAR:
            case VARCHAR:
                // datum is a BinaryString
                return (json, previous, datum) -> json.getNodeFactory().textNode(datum.toString());
            case BINARY:
            case VARBINARY:
                return (json, previous, datum) -> json.getNodeFactory().binaryNode((byte[]) datum);

            case RAW:
            default:
                throw new UnsupportedOperationException("Not support to parse type: " + type);
        }
    }

    /**
     * Converter for DECIMAL: emits the value as a JSON number backed by a {@link BigDecimal}.
     */
    private RowDataToJsonConverter createDecimalConverter() {
        return (json, previous, datum) -> {
            final BigDecimal decimal = ((DecimalData) datum).toBigDecimal();
            return json.getNodeFactory().numberNode(decimal);
        };
    }

    /**
     * Converter for DATE: the input is an int count of days since the epoch, written as an
     * ISO local date string.
     */
    private RowDataToJsonConverter createDateConverter() {
        return (json, previous, datum) -> {
            final LocalDate localDate = LocalDate.ofEpochDay((int) datum);
            final String text = ISO_LOCAL_DATE.format(localDate);
            return json.getNodeFactory().textNode(text);
        };
    }

    /**
     * Converter for TIME: the input is an int count of milliseconds of the day. Only whole
     * seconds are kept; the sub-second part is discarded by the integer division.
     */
    private RowDataToJsonConverter createTimeConverter() {
        return (json, previous, datum) -> {
            final long secondOfDay = ((int) datum) / 1000L;
            final LocalTime localTime = LocalTime.ofSecondOfDay(secondOfDay);
            final String text = SQL_TIME_FORMAT.format(localTime);
            return json.getNodeFactory().textNode(text);
        };
    }

    /**
     * Converter for TIMESTAMP without time zone, formatted according to the configured
     * {@link TimestampFormat}.
     */
    private RowDataToJsonConverter createTimestampConverter() {
        // The formatters are referenced as static constants inside the lambdas rather than
        // being captured, so the serializable lambdas hold no formatter instance.
        switch (timestampFormat) {
            case SQL:
                return (json, previous, datum) -> {
                    final TimestampData ts = (TimestampData) datum;
                    final String text = SQL_TIMESTAMP_FORMAT.format(ts.toLocalDateTime());
                    return json.getNodeFactory().textNode(text);
                };
            case ISO_8601:
                return (json, previous, datum) -> {
                    final TimestampData ts = (TimestampData) datum;
                    final String text = ISO8601_TIMESTAMP_FORMAT.format(ts.toLocalDateTime());
                    return json.getNodeFactory().textNode(text);
                };
            default:
                throw new TableException(
                        "Unsupported timestamp format. Validator should have checked that.");
        }
    }

    /**
     * Converter for TIMESTAMP WITH LOCAL TIME ZONE: the instant is rendered at the UTC offset
     * using the configured {@link TimestampFormat}.
     */
    private RowDataToJsonConverter createTimestampWithLocalZone() {
        switch (timestampFormat) {
            case SQL:
                return (json, previous, datum) -> {
                    final TimestampData ts = (TimestampData) datum;
                    final String text =
                            SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.format(
                                    ts.toInstant().atOffset(ZoneOffset.UTC));
                    return json.getNodeFactory().textNode(text);
                };
            case ISO_8601:
                return (json, previous, datum) -> {
                    final TimestampData ts = (TimestampData) datum;
                    final String text =
                            ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.format(
                                    ts.toInstant().atOffset(ZoneOffset.UTC));
                    return json.getNodeFactory().textNode(text);
                };
            default:
                throw new TableException(
                        "Unsupported timestamp format. Validator should have checked that.");
        }
    }

    /**
     * Converter for ARRAY: every element is converted with a null-safe element converter and
     * appended to a (possibly recycled) {@link ArrayNode}.
     */
    private RowDataToJsonConverter createArrayConverter(ArrayType type) {
        final LogicalType itemType = type.getElementType();
        final RowDataToJsonConverter itemConverter = createConverter(itemType);
        final ArrayData.ElementGetter itemGetter = ArrayData.createElementGetter(itemType);

        return (json, previous, datum) -> {
            final ArrayNode target;
            // previous may be a NullNode when the last record was null.
            if (previous != null && !previous.isNull()) {
                target = (ArrayNode) previous;
                target.removeAll();
            } else {
                target = json.createArrayNode();
            }

            final ArrayData items = (ArrayData) datum;
            final int length = items.size();
            for (int index = 0; index < length; index++) {
                final Object item = itemGetter.getElementOrNull(items, index);
                // Elements are never recycled: each one gets a fresh node.
                target.add(itemConverter.convert(json, null, item));
            }
            return target;
        };
    }

    /**
     * Converter for MAP and MULTISET: writes a JSON object keyed by the string form of each
     * map key. Null keys are handled according to the configured null-key mode.
     *
     * @param typeSummary summary string of the map type, used only in the error message
     * @param keyType logical type of the keys; must be a character string type
     * @param valueType logical type of the values
     * @throws UnsupportedOperationException if {@code keyType} is not a character string type
     */
    private RowDataToJsonConverter createMapConverter(
            String typeSummary, LogicalType keyType, LogicalType valueType) {
        final boolean stringKeyed = keyType.is(LogicalTypeFamily.CHARACTER_STRING);
        if (!stringKeyed) {
            throw new UnsupportedOperationException(
                    "JSON format doesn't support non-string as key type of map. "
                            + "The type is: "
                            + typeSummary);
        }
        final RowDataToJsonConverter entryValueConverter = createConverter(valueType);
        final ArrayData.ElementGetter entryValueGetter = ArrayData.createElementGetter(valueType);

        return (json, previous, datum) -> {
            final ObjectNode target;
            // previous may be a NullNode when the last record was null.
            if (previous != null && !previous.isNull()) {
                target = (ObjectNode) previous;
                target.removeAll();
            } else {
                target = json.createObjectNode();
            }

            final MapData entries = (MapData) datum;
            final ArrayData keys = entries.keyArray();
            final ArrayData values = entries.valueArray();
            final int entryCount = entries.size();

            for (int index = 0; index < entryCount; index++) {
                final String key;
                if (!keys.isNullAt(index)) {
                    key = keys.getString(index).toString();
                } else {
                    // The map key is null: apply the configured handling mode.
                    switch (mapNullKeyMode) {
                        case LITERAL:
                            key = mapNullKeyLiteral;
                            break;
                        case DROP:
                            continue;
                        case FAIL:
                            throw new RuntimeException(
                                    String.format(Locale.ROOT,
                                            "JSON format doesn't support to serialize map data with null keys. "
                                                    + "You can drop null key entries or encode null in literals by specifying %s option.",
                                            JsonFormatOptions.MAP_NULL_KEY_MODE.key()));
                        default:
                            throw new RuntimeException(
                                    "Unsupported map null key mode. Validator should have checked that.");
                    }
                }

                final Object rawValue = entryValueGetter.getElementOrNull(values, index);
                final JsonNode converted =
                        entryValueConverter.convert(json, target.get(key), rawValue);
                target.set(key, converted);
            }
            return target;
        };
    }

    /**
     * Converter for ROW: writes a JSON object with one property per row field, in field order.
     * Any failure while converting a field is rethrown as a {@link RuntimeException} naming
     * that field.
     */
    @SuppressWarnings("checkstyle:IllegalCatch")
    private RowDataToJsonConverter createRowConverter(RowType type) {
        final String[] names = type.getFieldNames().toArray(new String[0]);

        final LogicalType[] types = new LogicalType[type.getFields().size()];
        int position = 0;
        for (RowType.RowField rowField : type.getFields()) {
            types[position++] = rowField.getType();
        }

        // Build every field converter before any field getter.
        final RowDataToJsonConverter[] converters = new RowDataToJsonConverter[types.length];
        for (int i = 0; i < types.length; i++) {
            converters[i] = createConverter(types[i]);
        }

        final int fieldCount = type.getFieldCount();
        final RowData.FieldGetter[] getters = new RowData.FieldGetter[types.length];
        for (int i = 0; i < fieldCount; i++) {
            getters[i] = RowData.createFieldGetter(types[i], i);
        }

        return (json, previous, datum) -> {
            // previous may be a NullNode when the last record was null. A recycled node is
            // not cleared: every field is overwritten below.
            final ObjectNode target =
                    (previous == null || previous.isNull())
                            ? json.createObjectNode()
                            : (ObjectNode) previous;

            final RowData row = (RowData) datum;
            for (int i = 0; i < fieldCount; i++) {
                final String name = names[i];
                try {
                    final Object raw = getters[i].getFieldOrNull(row);
                    final JsonNode child = converters[i].convert(json, target.get(name), raw);
                    target.set(name, child);
                } catch (Throwable cause) {
                    throw new RuntimeException(
                            String.format(Locale.ROOT, "Fail to serialize at field: %s.", name), cause);
                }
            }
            return target;
        };
    }

    /**
     * Decorates a converter so that a null input yields a JSON null node instead of being
     * passed to the wrapped converter.
     */
    private RowDataToJsonConverter wrapIntoNullableConverter(RowDataToJsonConverter converter) {
        return (json, previous, datum) ->
                datum == null
                        ? json.getNodeFactory().nullNode()
                        : converter.convert(json, previous, datum);
    }
}