EventNormalizationStep.java

package org.wikimedia.eventutilities.core.event;

import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.EVENT_TIME_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_ID_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.META_STREAM_FIELD;
import static org.wikimedia.eventutilities.core.event.JsonEventGenerator.SCHEMA_FIELD;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.time.Instant;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.wikimedia.eventutilities.core.SerializableClock;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.fge.jsonschema.core.exceptions.ProcessingException;
import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;


/**
 * Define the steps to "normalize" an event before sending it to kafka.
 * Normalize here stands for "standardization" to make outgoing events compliant to the WMF event platform recommendations.
 */
@FunctionalInterface
public interface EventNormalizationStep extends Serializable, Consumer<ObjectNode> {

    /**
     * Set the ingestionTime (meta.dt) if not already set.
     */
    static EventNormalizationStep ingestionTime(SerializableClock clock) {
        return node -> {
            ObjectNode meta = getOrSetMeta(node);
            // set meta.dt only if not already provided, some users prefer to provide their own "processing-time" rather than
            // using this object clock.
            if (!meta.has(JsonEventGenerator.META_INGESTION_TIME_FIELD)) {
                // ingestionTimeClock cannot be null
                meta.put(JsonEventGenerator.META_INGESTION_TIME_FIELD, clock.get().toString());
            }
        };
    }

    /**
     * Set the event time (dt) if not already set.
     */
    static EventNormalizationStep eventTime(Instant eventTime) {
        return node -> {
            if (!node.has(EVENT_TIME_FIELD)) {
                // Should event time be mandatory?
                node.put(EVENT_TIME_FIELD, eventTime.toString());
            }
        };
    }

    /**
     * Set the event if (meta.id) if not already set.
     */
    static EventNormalizationStep eventId(Supplier<UUID> uuidSupplier) {
        return node -> {
            ObjectNode meta = getOrSetMeta(node);
            if (!meta.has(META_ID_FIELD)) {
                meta.put(META_ID_FIELD, uuidSupplier.get().toString());
            }
        };
    }

    /**
     * Forcibly set the stream (meta.stream) and the schema URI.
     */
    static EventNormalizationStep streamAndSchema(String streamName, String schemaUri) {
        return node -> {
            node.put(SCHEMA_FIELD, schemaUri);
            getOrSetMeta(node).put(META_STREAM_FIELD, streamName);
        };
    }

    /**
     * Validates the event against the provided schema (must be the last step to apply).
     */
    static EventNormalizationStep schemaValidation(ObjectNode jsonSchema, String schemaUri) {
        return new Validation(jsonSchema, schemaUri);
    }

    static ObjectNode getOrSetMeta(ObjectNode node) {
        JsonNode metaAsJsonNode = node.get(JsonEventGenerator.META_FIELD);
        ObjectNode meta;
        if (metaAsJsonNode == null || metaAsJsonNode.isNull()) {
            meta = node.putObject(JsonEventGenerator.META_FIELD);
        } else if (!metaAsJsonNode.isObject()) {
            throw new IllegalArgumentException("The field [meta] must be an object. [" + metaAsJsonNode.getNodeType() + "] found.");
        } else {
            meta = (ObjectNode) metaAsJsonNode;
        }
        return meta;
    }

    @SuppressFBWarnings("SE_NO_SERIALVERSIONID")
    final class Validation implements EventNormalizationStep {
        private final ObjectNode jsonSchema;
        private final String schemaUri;

        /**
         * This object is not serializable and will be managed manually in constructor and readObject.
         */
        private transient JsonSchema schemaValidator;

        private Validation(ObjectNode jsonSchema, String schemaUri) {
            this.jsonSchema = jsonSchema;
            this.schemaUri = schemaUri;
            this.schemaValidator = parseSchema(jsonSchema, schemaUri);
        }

        /**
         * Parse the schema when deserializing to fail early in case this stream
         * is rarely pushing events.
         */
        private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
            // Default serialization mechanism do not call the constructor, so we have to re-initialize
            // this transient field explicitly when it happens
            // see https://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
            in.defaultReadObject();
            schemaValidator = parseSchema(jsonSchema, schemaUri);
        }

        private static JsonSchema parseSchema(JsonNode jsonSchema, String schemaUri) {
            JsonSchemaFactory schemaFactory = JsonSchemaFactory.byDefault();
            try {
                return schemaFactory.getJsonSchema(jsonSchema);
            } catch (ProcessingException e) {
                throw new IllegalArgumentException("The schema [" + schemaUri + "] is invalid", e);
            }
        }
        @Override
        public void accept(ObjectNode root) {
            validateEvent(root, schemaValidator);
        }

        private void validateEvent(ObjectNode root, JsonSchema eventSchema) {
            ProcessingReport report;
            try {
                report = eventSchema.validate(root);
            } catch (ProcessingException e) {
                throw new IllegalArgumentException("Cannot validate the generated event", e);
            }
            if (!report.isSuccess()) {
                throw new IllegalArgumentException("Cannot validate the generated event: " + report);
            }
        }
    }

}