@ParametersAreNonnullByDefault public final class JsonSchemaFlinkConverter extends Object
ObjectNode jsonSchema = # ... Get this somehow, perhaps from EventStream schema() method.
# This schemaBuilder will already have the DataType set via the JSONSchema.
Schema.Builder schemaBuilder = JsonSchemaFlinkConverter.toSchemaBuilder(jsonSchema);
# Add the kafka_timestamp as the metadata column and use it as the watermark.
schemaBuilder.columnByMetadata(
"kafka_timestamp",
"TIMESTAMP_LTZ(3) NOT NULL",
"timestamp",
true
);
schemaBuilder.watermark("kafka_timestamp", "kafka_timestamp");
# Create a Dynamic Table from this topic in Kafka.
stEnv.createTemporaryTable(
"my_table",
TableDescriptor.forConnector("kafka")
.schema(schemaBuilderlder.build())
.option("properties.bootstrap.servers", "localhost:9092")
.option("topic", "my_stream_topic")
.option("properties.group.id", "my_consumer_group0")
.option("scan.startup.mode", "latest-offset")
.format("json")
.build()
)
Or, get the RowTypeInfo (AKA TypeInformation of Row) corresponding to an event JSONSchema.
ObjectNode jsonSchema = # ... Get this somehow, perhaps from EventStream schema() method.
RowTypeInfo eventSchemaTypeInfo = JsonSchemaFlinkConverter.toRowTypeInfo(jsonSchema);
Modifier and Type | Method and Description |
---|---|
static org.apache.flink.table.types.DataType |
toDataType(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
Converts this JSONSchema to a Flink Table API DataType.
|
static JsonRowDeserializationSchema |
toDeserializationSchemaRow(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
Gets a JSON deserializer to Row in hybrid named position mode for the jsonSchema.
|
static EventRowTypeInfo |
toRowTypeInfo(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
Converts this JSONSchema to a Flink DataStream API
EventRowTypeInfo ,
which is an instance of
TypeInformation <Row >. |
static EventRowTypeInfo |
toRowTypeInfo(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema,
com.fasterxml.jackson.databind.node.ObjectNode messageKeyFields)
Converts this JSONSchema to a Flink DataStream API
EventRowTypeInfo ,
with stream partitions keys extracted from messageKeyFields mappings. |
static org.apache.flink.table.api.Schema.Builder |
toSchemaBuilder(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
Returns a Table API Schema Builder starting with a Row DataType
converted from the provided JSONSchema.
|
static org.apache.flink.api.common.typeinfo.TypeInformation<?> |
toTypeInformation(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
Converts this JSONSchema to a Flink DataStream API TypeInformation.
|
static org.apache.flink.api.common.typeinfo.TypeInformation<?> |
toTypeInformation(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema,
com.fasterxml.jackson.databind.node.ObjectNode messageKeyFields)
Converts this JSONSchema to a Flink DataStream API TypeInformation with key TypeInformation
extracted from messageKeyFields mappings.
|
@Nonnull public static org.apache.flink.table.api.Schema.Builder toSchemaBuilder(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
jsonSchema
- The JSONSchema ObjectNode. This should have "type": "object"
to properly convert to a logical
RowType
Schema.Builder
with DataType with logical RowType as schema.IllegalArgumentException
- if the JSONSchema is not on "object" type.@Nonnull public static org.apache.flink.table.types.DataType toDataType(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
jsonSchema
- The JSONSchema ObjectNode. This should have at minimum type.DataType
@Nonnull public static org.apache.flink.api.common.typeinfo.TypeInformation<?> toTypeInformation(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
jsonSchema
- The JSONSchema ObjectNode. This should have at minimum "type".TypeInformation
@Nonnull public static org.apache.flink.api.common.typeinfo.TypeInformation<?> toTypeInformation(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema, com.fasterxml.jackson.databind.node.ObjectNode messageKeyFields)
jsonSchema
- the JSONSchema ObjectNode. This should have at minimum "type".messageKeyFields
- message key fields declared in the stream config.@Nonnull public static EventRowTypeInfo toRowTypeInfo(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
EventRowTypeInfo
,
which is an instance of
TypeInformation
<Row
>.
EventRowTypeInfo has some extra logic for working with TypeInformation
when it represents a Row
.
You can use RowTypeInfo as if it were a TypeInformation of Row.jsonSchema
- The JSONSchema ObjectNode. his should have "type": "object"
to property convert to
TypeInformation
<Row
>.TypeInformation
<Row
>.IllegalArgumentException
- if the JSONSchema is not on "object" type.@Nonnull public static EventRowTypeInfo toRowTypeInfo(com.fasterxml.jackson.databind.node.ObjectNode jsonSchema, com.fasterxml.jackson.databind.node.ObjectNode messageKeyFields)
EventRowTypeInfo
,
with stream partitions keys extracted from messageKeyFields mappings.jsonSchema
- messageKeyFields
- public static JsonRowDeserializationSchema toDeserializationSchemaRow(@Nonnull com.fasterxml.jackson.databind.node.ObjectNode jsonSchema)
jsonSchema
- The JSONSchema ObjectNode. his should have "type": "object"
to property convert to
TypeInformation
<Row
>.Copyright © 2024. All rights reserved.