package org.wikimedia.eventutilities.flink.formats.json;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.wikimedia.eventutilities.core.event.types.JsonSchemaConverter;
import org.wikimedia.eventutilities.flink.EventRowTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
* Converts from JSONSchemas to Flink types.
* Adheres to a few Event Platform specific schema conventions,
* specifically support for specifying Map types in JSONSchema
* via additionalProperties schema.
* Example:
* Create a streaming Table from JSON data in a Kafka topic using a JSONSchema.
* <pre>{@code
* ObjectNode jsonSchema = # ... Get this somehow, perhaps from EventStream schema() method.
* # This schemaBuilder will already have the DataType set via the JSONSchema.
* Schema.Builder schemaBuilder = JsonSchemaFlinkConverter.toSchemaBuilder(jsonSchema);
* # Add the kafka_timestamp as the metadata column and use it as the watermark.
* schemaBuilder.columnByMetadata(
* "kafka_timestamp",
* "timestamp",
* true
* );
* schemaBuilder.watermark("kafka_timestamp", "kafka_timestamp");
* # Create a Dynamic Table from this topic in Kafka.
* stEnv.createTemporaryTable(
* "my_table",
* TableDescriptor.forConnector("kafka")
* .schema(schemaBuilderlder.build())
* .option("properties.bootstrap.servers", "localhost:9092")
* .option("topic", "my_stream_topic")
* .option("properties.group.id", "my_consumer_group0")
* .option("scan.startup.mode", "latest-offset")
* .format("json")
* .build()
* )
* }</pre>
* Or, get the RowTypeInfo (AKA TypeInformation of Row) corresponding to an event JSONSchema.
* <pre>{@code
* ObjectNode jsonSchema = # ... Get this somehow, perhaps from EventStream schema() method.
* RowTypeInfo eventSchemaTypeInfo = JsonSchemaFlinkConverter.toRowTypeInfo(jsonSchema);
* }</pre>
public final class JsonSchemaFlinkConverter {
* JsonSchemaConverter instance to convert to Flink Table API DataType.
private static final JsonSchemaConverter<DataType> DATA_TYPE_CONVERTER =
new JsonSchemaConverter<>(new DataTypeSchemaConversions());
* JsonSchemaConverter to convert to Flink DataStream API TypeInformation.
private static final JsonSchemaConverter<TypeInformation<?>> TYPE_INFORMATION_CONVERTER =
new JsonSchemaConverter<>(new TypeInformationSchemaConversions());
* Returns a Table API Schema Builder starting with a Row DataType
* converted from the provided JSONSchema.
* @param jsonSchema
* The JSONSchema ObjectNode. This should have "type": "object"
* to properly convert to a logical
* {@link org.apache.flink.table.types.logical.RowType}
* @throws IllegalArgumentException
* if the JSONSchema is not on "object" type.
* @return
* {@link org.apache.flink.table.api.Schema.Builder}
* with DataType with logical RowType as schema.
public static Schema.Builder toSchemaBuilder(ObjectNode jsonSchema) {
DataType dataType = toDataType(jsonSchema);
// Assert that the converted DataType's LogicalType is of type Row.
LogicalType logicalType = dataType.getLogicalType();
if (!logicalType.is(LogicalTypeRoot.ROW)) {
throw new IllegalStateException(
"Converted JSONSchema to Flink LogicalType " + logicalType +
" but expected " + LogicalTypeRoot.ROW + ". This should not happen."
Schema.Builder builder = Schema.newBuilder();
return builder;
* Converts this JSONSchema to a Flink Table API DataType.
* @param jsonSchema
* The JSONSchema ObjectNode. This should have at minimum type.
* @return
* jsonSchema converted to
* {@link DataType}
public static DataType toDataType(ObjectNode jsonSchema) {
return DATA_TYPE_CONVERTER.convert(jsonSchema);
* Converts this JSONSchema to a Flink DataStream API TypeInformation.
* @param jsonSchema
* The JSONSchema ObjectNode. This should have at minimum "type".
* @return
* jsonSchema converted to {@link TypeInformation}
public static TypeInformation<?> toTypeInformation(ObjectNode jsonSchema) {
RowTypeInfo typeInfo = (RowTypeInfo) TYPE_INFORMATION_CONVERTER.convert(jsonSchema);
return EventRowTypeInfo.create(typeInfo);
* Converts this JSONSchema to a Flink DataStream API TypeInformation with key TypeInformation
* extracted from messageKeyFields mappings.
* Returns jsonSchema converted to TypeInformation
* @param jsonSchema the JSONSchema ObjectNode. This should have at minimum "type".
* @param messageKeyFields message key fields declared in the stream config.
* @return
public static TypeInformation<?> toTypeInformation(ObjectNode jsonSchema, ObjectNode messageKeyFields) {
RowTypeInfo typeInfo = (RowTypeInfo) TYPE_INFORMATION_CONVERTER.convert(jsonSchema);
Map<String, String> aliasedRowKey = new ObjectMapper().convertValue(messageKeyFields, Map.class);
if (aliasedRowKey != null) {
return EventRowTypeInfo.create(typeInfo, aliasedRowKey);
return EventRowTypeInfo.create(typeInfo);
* Converts this JSONSchema to a Flink DataStream API {@link EventRowTypeInfo},
* which is an instance of
* {@link TypeInformation}<{@link org.apache.flink.types.Row}>.
* EventRowTypeInfo has some extra logic for working with TypeInformation
* when it represents a {@link org.apache.flink.types.Row}.
* You can use RowTypeInfo as if it were a TypeInformation of Row.
* @param jsonSchema
* The JSONSchema ObjectNode. his should have "type": "object"
* to property convert to
* {@link TypeInformation}<{@link org.apache.flink.types.Row}>.
* @throws IllegalArgumentException
* if the JSONSchema is not on "object" type.
* @return
* jsonSchema converted to {@link TypeInformation}<{@link org.apache.flink.types.Row}>.
public static EventRowTypeInfo toRowTypeInfo(ObjectNode jsonSchema
) {
return (EventRowTypeInfo) toTypeInformation(jsonSchema);
/** Converts this JSONSchema to a Flink DataStream API {@link EventRowTypeInfo},
* with stream partitions keys extracted from messageKeyFields mappings.
* @param jsonSchema
* @param messageKeyFields
* @return
public static EventRowTypeInfo toRowTypeInfo(ObjectNode jsonSchema, ObjectNode messageKeyFields
) {
return (EventRowTypeInfo) toTypeInformation(jsonSchema, messageKeyFields);
* Gets a JSON deserializer to Row in hybrid named position mode for the jsonSchema.
* Missing fields are allowed (and set to null), and parse errors
* are always thrown.
* @param jsonSchema
* The JSONSchema ObjectNode. his should have "type": "object"
* to property convert to
* {@link TypeInformation}<{@link org.apache.flink.types.Row}>.
* @return
* DeserializationSchema of Row that can deserialize JSON data for the jsonSchema to a FLink Row.
public static JsonRowDeserializationSchema toDeserializationSchemaRow(
@Nonnull ObjectNode jsonSchema
) {
return new JsonRowDeserializationSchema.Builder(toRowTypeInfo(jsonSchema)).build();
* Constructor to make maven checkstyle plugin happy.
* See: https://checkstyle.sourceforge.io/config_design.html#HideUtilityClassConstructor
private JsonSchemaFlinkConverter() {}