KafkaEventSerializationSchema.java

package org.wikimedia.eventutilities.flink.formats.json;

import java.time.Instant;

import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.wikimedia.eventutilities.core.SerializableClock;
import org.wikimedia.eventutilities.flink.EventRowTypeInfo;


/**
 * Kafka record serialization schema producing events compliant with the WMF Event Platform.
 *
 * <p>For each {@link Row}, it stamps the ingestion time, picks the Kafka record timestamp
 * according to the configured {@link KafkaRecordTimestampStrategy}, serializes the row body
 * with the provided {@link SerializationSchema}, and, when the row type declares a key,
 * serializes that key as JSON for use as the Kafka message key.
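 *
 * <p>A minimal construction sketch; {@code rowTypeInfo} and {@code bodySerializer} are
 * illustrative placeholders, and it assumes a {@code SerializableClock} can be supplied
 * as a method reference:
 * <pre>{@code
 * KafkaRecordSerializationSchema<Row> schema = new KafkaEventSerializationSchema(
 *         rowTypeInfo,                                 // RowTypeInfo of the produced rows
 *         bodySerializer,                              // SerializationSchema<Row> for the record body
 *         Instant::now,                                // clock used to stamp ingestion time
 *         "my.topic",
 *         KafkaRecordTimestampStrategy.ROW_EVENT_TIME, // row event time becomes the Kafka timestamp
 *         null);                                       // no custom partitioner: Kafka picks the partition
 * }</pre>
 */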
@ParametersAreNonnullByDefault
public class KafkaEventSerializationSchema implements KafkaRecordSerializationSchema<Row> {
    private final String topic;
    @Nullable
    private final FlinkKafkaPartitioner<Row> partitioner;
    private final SerializationSchema<Row> serializationSchema;

    private final SerializableClock ingestionTimeClock;

    private final EventRowTypeInfo eventRowTypeInfo;
    private final KafkaRecordTimestampStrategy timestampStrategy;
    /** JSON serializer for the Kafka message key; null when the row type declares no key. */
    @Nullable private final JsonRowSerializationSchema jsonKeySerializer;

    public KafkaEventSerializationSchema(
            RowTypeInfo typeInformation,
            SerializationSchema<Row> serializationSchema,
            SerializableClock ingestionTimeClock,
            String topic,
            KafkaRecordTimestampStrategy timestampStrategy,
            @Nullable FlinkKafkaPartitioner<Row> partitioner
    ) {
        this.timestampStrategy = timestampStrategy;
        this.topic = topic;
        this.partitioner = partitioner;
        this.serializationSchema = serializationSchema;
        // Ideally we'd like to use the ProcessingTimeService, but it does not seem trivial to access it from here
        this.ingestionTimeClock = ingestionTimeClock;
        this.eventRowTypeInfo = typeInformation instanceof EventRowTypeInfo
                ? (EventRowTypeInfo) typeInformation
                : EventRowTypeInfo.create(typeInformation);

        if (eventRowTypeInfo.hasKey()) {
            // The key is not an event, so it is serialized without normalization.
            this.jsonKeySerializer = JsonRowSerializationSchema
                    .builder()
                    .withTypeInfo(eventRowTypeInfo.keyTypeInfo())
                    .withoutNormalization()
                    .build();
        } else {
            this.jsonKeySerializer = null;
        }
    }

    @Override
    public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) {
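        // Give the custom partitioner, if any, its position in the parallel topology.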
        if (partitioner != null) {
            partitioner.open(sinkContext.getParallelInstanceId(), sinkContext.getNumberOfParallelInstances());
        }
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row element, KafkaSinkContext context, @Nullable Long timestamp) {
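        // Ingestion time is stamped on the row regardless of the chosen timestamp strategy.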
        Instant ingestionTime = ingestionTimeClock.get();
        eventRowTypeInfo.setIngestionTime(element, ingestionTime);
        Instant kafkaTimestamp;
        switch (timestampStrategy) {
            case ROW_EVENT_TIME:
                // Use the event time already set on the row.
                kafkaTimestamp = eventRowTypeInfo.getEventTime(element);
                if (kafkaTimestamp == null) {
                    throw new IllegalArgumentException("The row " + element + " does not have its event time set");
                }
                break;
            case FLINK_RECORD_EVENT_TIME:
                // Use the Flink stream record timestamp and propagate it back to the row.
                if (timestamp == null) {
                    throw new IllegalArgumentException("The stream record timestamp is null; " +
                            "the pipeline must set record timestamps");
                }
                kafkaTimestamp = Instant.ofEpochMilli(timestamp);
                eventRowTypeInfo.setEventTime(element, kafkaTimestamp);
                break;
            case ROW_INGESTION_TIME:
                // Use the ingestion time stamped above.
                kafkaTimestamp = ingestionTime;
                break;
            default:
                throw new UnsupportedOperationException("Unsupported timestamp strategy: " + timestampStrategy);
        }

        // When the row type declares a key, serialize it as JSON to use as the Kafka message key.
        byte[] messageKey = null;
        if (eventRowTypeInfo.hasKey()) {
            messageKey = jsonKeySerializer.serialize(eventRowTypeInfo.extractKey(element));
        }
        byte[] messageBody = serializationSchema.serialize(element);

        // Delegate partition choice to the custom partitioner; a null partition lets Kafka decide.
        Integer partition = partitioner != null
                ? partitioner.partition(element, messageKey, messageBody, topic, context.getPartitionsForTopic(topic))
                : null;
        return new ProducerRecord<>(topic, partition, kafkaTimestamp.toEpochMilli(), messageKey, messageBody);
    }
}