Row serializer that supports simple schema evolution as defined by the WMF event platform.
General notes about Flink serializers:
- They must be Serializable because they are constructed when the stream graph is built and must then be
transferred over the wire to the taskmanager nodes. This serialized form is not stored durably,
so we do not have to worry about reading another version of this same class.
- At runtime the Rows transferred from one operator to another are serialized using
serialize(Row, DataOutputView) and deserialized using deserialize(DataInputView).
- When Rows have to be stored in a durable state, the shape of this serializer must be stored in the header of
this state. This is done by first extracting the snapshotConfiguration() of this serializer and then
storing it using EventRowSerializer.EventRowSerializerSnapshot.writeSnapshot(DataOutputView).
- When restoring a state (possibly generated by a previous version of this serializer), Flink reads
back the EventRowSerializer.EventRowSerializerSnapshot that was previously stored and determines the level
of compatibility of the previous version with the runtime version of the serializer using
TypeSerializerSnapshot.resolveSchemaCompatibility(TypeSerializer).
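The snapshot round trip described above can be illustrated with a minimal sketch. It assumes, purely for illustration, that the "shape" stored in the state header is just the ordered list of field names; the class SnapshotHeaderSketch and its two methods are hypothetical stand-ins and not the actual EventRowSerializerSnapshot, which presumably also has to carry information about the field serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a serializer snapshot: only field names are stored.
final class SnapshotHeaderSketch {
    /** Writes the ordered field names, as a state header would record them. */
    static byte[] writeSnapshot(List<String> fieldNames) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fieldNames.size());
            for (String name : fieldNames) {
                out.writeUTF(name);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads back the field names in the order they were written. */
    static List<String> readSnapshot(byte[] header) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(header))) {
            int count = in.readInt();
            List<String> names = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                names.add(in.readUTF());
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On restore, the list read back from the header would be compared with the field list of the runtime serializer to decide the compatibility level.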
This serializer is meant to support only rows that have named positions, for instance rows created from
EventRowTypeInfo.createEmptyRow(). This is the main difference from the upstream RowSerializer, which
supports either position-based or name-based rows: position-based rows cannot support migration in
compatibleAfterMigration mode, and name-based rows are not usable everywhere (the JSON serializer expects positions).
The nature of aliased partition keys carried by EventRowTypeInfo.keyTypeInfo() does not affect how events
are serialized. Keys do not have to be taken into consideration when evaluating whether stored events
are compatible with a newer version of this class or its underlying schema. They are omitted
from EventRowSerializer.EventRowSerializerSnapshot configurations.
Compatibility levels:
- if the fields are exactly the same and are at the same positions, compatibility is delegated to the
worst of the field serializers' compatibility levels, from worst to best: incompatible,
compatibleAfterMigration, compatibleWithReconfiguredSerializer, compatibleAsIs.
- if new fields are present and/or the field positions changed, we migrate the state, returning
compatibleAfterMigration (or incompatible if one of the field serializers is incompatible).
- in any other case (e.g. a field is missing) we return incompatible.
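The three rules above can be sketched as a small decision function. This is an illustration under stated assumptions, not the actual resolveSchemaCompatibility implementation: CompatibilitySketch, Level and resolve are hypothetical names, fields are identified by name only, and the per-field compatibility is passed in as a precomputed map (defaulting to compatibleAsIs) instead of being obtained from the field serializers' own snapshots.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the compatibility rules; not the real Flink types.
final class CompatibilitySketch {
    /** Ordered from worst to best, so a lower ordinal means a worse level. */
    enum Level {
        INCOMPATIBLE,
        COMPATIBLE_AFTER_MIGRATION,
        COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
        COMPATIBLE_AS_IS
    }

    static Level resolve(List<String> oldFields, List<String> newFields, Map<String, Level> perField) {
        // A field present in the stored state but absent from the new schema cannot be restored.
        if (!newFields.containsAll(oldFields)) {
            return Level.INCOMPATIBLE;
        }
        // Worst compatibility level among the serializers of the stored fields.
        Level worst = Level.COMPATIBLE_AS_IS;
        for (String field : oldFields) {
            Level level = perField.getOrDefault(field, Level.COMPATIBLE_AS_IS);
            if (level.ordinal() < worst.ordinal()) {
                worst = level;
            }
        }
        // Same fields at the same positions: delegate to the field serializers.
        if (oldFields.equals(newFields)) {
            return worst;
        }
        // Fields were added and/or moved: the state has to be migrated,
        // unless one of the field serializers is itself incompatible.
        return worst == Level.INCOMPATIBLE ? Level.INCOMPATIBLE : Level.COMPATIBLE_AFTER_MIGRATION;
    }
}
```

For example, under these assumptions, restoring a state written with fields (a, b) into a serializer declaring (a, b, c) resolves to COMPATIBLE_AFTER_MIGRATION, while restoring it into a serializer declaring only (a) resolves to INCOMPATIBLE.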
Note on the migration: migrating a state (compatibleAfterMigration) is done when restoring the state. All the state
is then read with the old version of the serializer and written back using serialize(Row, DataOutputView).
In other words, the serialize(Row, DataOutputView) function must accept Rows created from a different serializer
(with possibly missing fields and/or fields at different positions), but the serialization format must be the same as
if a properly constructed Row had been passed.
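The constraint on the write path can be illustrated with a minimal sketch. It makes several assumptions that are not taken from the actual implementation: a Map from field name to value stands in for a named Row, all values are strings, and a field missing from the incoming row is written exactly like a null field. NamedRowWriteSketch and its wire format (a null flag followed by the value) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: fields are looked up by name and written in the
// target serializer's order, regardless of how the incoming row is laid out.
final class NamedRowWriteSketch {
    static byte[] serialize(List<String> targetFields, Map<String, String> row) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String field : targetFields) {
                // Assumption: a field unknown to the old row is treated as null.
                String value = row.get(field);
                out.writeBoolean(value == null);
                if (value != null) {
                    out.writeUTF(value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

With this approach a row read by an older serializer, holding (b, a) and no c, yields the same bytes as a row built for the new schema (a, b, c) with c set to null, which is the property the migration relies on.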