JsonEventGenerator.java
package org.wikimedia.eventutilities.core.event;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.wikimedia.eventutilities.core.SerializableClock;
import org.wikimedia.eventutilities.core.json.JsonLoadingException;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.fge.jsonschema.core.exceptions.ProcessingException;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
* Helper class suited to assist java clients producing json events directly to kafka.
* This object must be constructed using {@link JsonEventGenerator#builder()}.
*
* It provides:
* - initialization of default values
* - meta.dt as the kafka ingestion time with the provided clock
* - meta.stream
* - dt as the event time, currently optional, provided via a parameter
* - $schema as the schema that can validate the generated event
* - verification of schema vs stream
* - validation of the event against its schema
*/
@ParametersAreNonnullByDefault
public final class JsonEventGenerator {
/** Top-level field identifying the schema that can validate the event. */
public static final String SCHEMA_FIELD = "$schema";
/** Top-level field holding the (currently optional) event time. */
public static final String EVENT_TIME_FIELD = "dt";
/** Top-level field under which the event metadata (stream, dt, id) is grouped. */
public static final String META_FIELD = "meta";
/** Field under {@code meta} naming the stream of the event. */
public static final String META_STREAM_FIELD = "stream";
/** Field under {@code meta} holding the ingestion time. */
public static final String META_INGESTION_TIME_FIELD = "dt";
/** Field under {@code meta} holding the event id. */
public static final String META_ID_FIELD = "id";
/** Loads schemas from their URI and checks that they can be turned into a usable JSON schema. */
private final EventSchemaLoader schemaLoader;
/** Stream configuration; queried for the schema titles allowed on a given stream. */
private final EventStreamConfig eventStreamConfig;
/** Clock handed to the ingestion-time normalization step. */
private final SerializableClock ingestionTimeClock;
/** Mapper used by the normalizers to create nodes and by {@code serializeAsBytes} to write them. */
private final ObjectMapper jsonMapper;
/** Source of the ids handed to the event-id normalization step. */
private final Supplier<UUID> uuidSupplier;
/**
* Cache of the normalizers that include schema validation, keyed by stream name / schema URI pair.
* Building an entry loads the schema and verifies it against the stream configuration.
*/
private final Cache<SchemaStreamNamesPair, EventNormalizer> validatingGeneratorCache = Caffeine.newBuilder().build();
/**
* Cache of the normalizers that do not validate events, keyed by stream name / schema URI pair.
*/
private final Cache<SchemaStreamNamesPair, EventNormalizer> nonValidatingGeneratorCache = Caffeine.newBuilder().build();
/**
 * Stores the collaborators of this generator as-is.
 * Private on purpose: instances are obtained through {@link Builder#build()}.
 */
private JsonEventGenerator(
    EventSchemaLoader schemaLoader,
    EventStreamConfig eventStreamConfig,
    SerializableClock ingestionTimeClock,
    ObjectMapper jsonMapper,
    Supplier<UUID> uuidSupplier
) {
    // schema / stream configuration lookups
    this.eventStreamConfig = eventStreamConfig;
    this.schemaLoader = schemaLoader;

    // helpers used while generating events
    this.jsonMapper = jsonMapper;
    this.uuidSupplier = uuidSupplier;
    this.ingestionTimeClock = ingestionTimeClock;
}
/**
 * Generates a JSON event and validates it against its schema.
 *
 * Equivalent to calling the five-argument overload with {@code validate} set to {@code true};
 * delegating to it keeps the argument checks of both overloads identical.
 *
 * @param streamName the stream this event will be pushed to
 * @param schemaUri the schema this event is built against
 * @param eventData consumer receiving an empty ObjectNode to attach data to
 * @param eventTime the optional eventTime to be set to the dt field (might become mandatory)
 * @return the generated event
 * @throws NullPointerException if streamName or schemaUri is null
 * @throws IllegalArgumentException if the schema does not match what is expected from the stream configuration
 * @throws IllegalArgumentException if the schema cannot be found/loaded
 * @throws IllegalArgumentException if the event is not valid against the provided schema
 */
public ObjectNode generateEvent(
    String streamName,
    String schemaUri,
    Consumer<ObjectNode> eventData,
    @Nullable Instant eventTime
) {
    return generateEvent(streamName, schemaUri, eventData, eventTime, true);
}
/**
 * Generates a JSON event, optionally validating it against its schema.
 *
 * Passing {@code validate = false} is highly discouraged: it should only be used when the event
 * is shipped to event-gate, which takes care of the schema validation itself.
 *
 * @param streamName the stream this event will be pushed to
 * @param schemaUri the schema this event is built against
 * @param eventData consumer receiving an empty ObjectNode to attach data to
 * @param eventTime the optional eventTime to be set to the dt field (might become mandatory)
 * @param validate true to validate the event against its schema
 * @return the generated event
 *
 * @throws NullPointerException if streamName or schemaUri is null
 * @throws IllegalArgumentException if the schema does not match what is expected from the stream configuration
 * @throws IllegalArgumentException if the schema cannot be found/loaded
 * @throws IllegalArgumentException if the event is not valid against the provided schema
 */
public ObjectNode generateEvent(
    String streamName,
    String schemaUri,
    Consumer<ObjectNode> eventData,
    @Nullable Instant eventTime,
    boolean validate
) {
    final String checkedStream = Objects.requireNonNull(streamName, "stream must not be null");
    final String checkedSchema = Objects.requireNonNull(schemaUri, "schema must not be null");

    // The normalizer is looked up (or built once) per stream/schema pair.
    EventNormalizer normalizer = createEventStreamEventGenerator(checkedStream, checkedSchema, validate);
    return normalizer.generateEvent(eventData, eventTime);
}
/**
 * Creates an event generator that validates the events it produces.
 * Suited to ship events directly to kafka.
 *
 * @param streamName the stream the events will be pushed to
 * @param schemaUri the schema the events are built against
 * @return the validating normalizer for this stream/schema pair
 */
public EventNormalizer createEventStreamEventGenerator(String streamName, String schemaUri) {
    final boolean validating = true;
    return createEventStreamEventGenerator(streamName, schemaUri, validating);
}
/**
 * Creates an event generator that does not validate the events it produces.
 * Use with care: this should ONLY be used to send events via event-gate, which performs the
 * schema validation itself. It must never be used to ship events directly to kafka.
 *
 * @param streamName the stream the events will be pushed to
 * @param schemaUri the schema the events are built against
 * @return the non-validating normalizer for this stream/schema pair
 */
public EventNormalizer createNonValidatingEventStreamEventGenerator(String streamName, String schemaUri) {
    final boolean validating = false;
    return createEventStreamEventGenerator(streamName, schemaUri, validating);
}
/**
 * Returns the normalizer for the given stream/schema pair, building it on first use.
 *
 * Validating and non-validating normalizers are kept in separate caches. The null checks live
 * here because every public entry point funnels through this method, so none of them can
 * build a normalizer around a null stream name or schema URI.
 *
 * @param streamName the stream the events will be pushed to
 * @param schemaUri the schema the events are built against
 * @param validating true to append the schema validation step
 * @return the cached or freshly built normalizer
 * @throws NullPointerException if streamName or schemaUri is null
 * @throws IllegalArgumentException if validating and the schema cannot be loaded or verified
 */
private EventNormalizer createEventStreamEventGenerator(String streamName, String schemaUri, boolean validating) {
    Objects.requireNonNull(streamName, "stream must not be null");
    Objects.requireNonNull(schemaUri, "schema must not be null");

    Cache<SchemaStreamNamesPair, EventNormalizer> generatorCache =
        validating ? validatingGeneratorCache : nonValidatingGeneratorCache;
    SchemaStreamNamesPair cacheKey = new SchemaStreamNamesPair(streamName, schemaUri);

    // The mapping function reads everything from the key so that the cached value can only
    // ever correspond to the pair it is stored under.
    return generatorCache.get(cacheKey, key -> {
        List<EventNormalizationStep> steps = validating
            ? normalizeAndValidationSteps(key.streamName, key.schemaUri)
            : normalizationStep(key.streamName, key.schemaUri);
        return new StepBasedEventNormalizer(jsonMapper, steps);
    });
}
/**
 * Non-validating normalization steps, in the order they are applied.
 * - $schema and meta.stream
 * - meta.dt
 * - meta.id
 */
private List<EventNormalizationStep> normalizationStep(String streamName, String schemaUri) {
    ImmutableList.Builder<EventNormalizationStep> ordered = ImmutableList.builder();
    ordered.add(EventNormalizationStep.streamAndSchema(streamName, schemaUri));
    ordered.add(EventNormalizationStep.ingestionTime(ingestionTimeClock));
    ordered.add(EventNormalizationStep.eventId(uuidSupplier));
    return ordered.build();
}
/**
 * Normalization steps followed by schema validation, in the order they are applied.
 * - $schema and meta.stream
 * - meta.dt
 * - meta.id
 * - schema validation
 *
 * The schema is loaded and verified against the stream configuration before any step is built.
 */
private List<EventNormalizationStep> normalizeAndValidationSteps(String streamName, String schemaUri) {
    // Load first: a schema that cannot be loaded or verified aborts before any step is created.
    ObjectNode verifiedSchema = loadAndVerifyJsonSchema(streamName, schemaUri);

    ImmutableList.Builder<EventNormalizationStep> ordered = ImmutableList.builder();
    ordered.addAll(normalizationStep(streamName, schemaUri));
    ordered.add(EventNormalizationStep.schemaValidation(verifiedSchema, streamName));
    return ordered.build();
}
/**
 * Loads the schema at {@code schemaUri} and verifies that it can be used for {@code streamName}.
 *
 * Checks run in this order: the schema can be loaded, its title is allowed for the stream,
 * it can be turned into a usable JSON schema, and it is an {@link ObjectNode}.
 *
 * @throws IllegalArgumentException if any of these checks fails
 */
private ObjectNode loadAndVerifyJsonSchema(String streamName, String schemaUri) {
    JsonNode schema;
    try {
        schema = schemaLoader.getSchema(URI.create(schemaUri));
    } catch (JsonLoadingException cause) {
        throw new IllegalArgumentException("Cannot load schema for event", cause);
    }

    checkSchemaTitleAndStream(schema, streamName, schemaUri);

    try {
        // The result is discarded: the call is only made to check that the schema is valid.
        schemaLoader.getJsonSchema(schema);
    } catch (ProcessingException cause) {
        throw new IllegalArgumentException("Cannot obtain schema [" + schemaUri + "]", cause);
    }

    if (schema instanceof ObjectNode) {
        return (ObjectNode) schema;
    }
    throw new IllegalArgumentException("schemaLoader must return an ObjectNode");
}
/**
 * Cache key made of a stream name and a schema URI.
 * Two keys are equal when both components are equal.
 */
private static final class SchemaStreamNamesPair {
    private final String streamName;
    private final String schemaUri;

    private SchemaStreamNamesPair(String streamName, String schemaUri) {
        this.streamName = streamName;
        this.schemaUri = schemaUri;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // The class is final, so an instanceof test accepts exactly the same objects as a
        // getClass() comparison (and rejects null).
        if (!(o instanceof SchemaStreamNamesPair)) {
            return false;
        }
        SchemaStreamNamesPair other = (SchemaStreamNamesPair) o;
        boolean sameStream = Objects.equals(streamName, other.streamName);
        return sameStream && Objects.equals(schemaUri, other.schemaUri);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, schemaUri);
    }
}
/**
 * Verifies that the title of the schema is one of the titles the stream configuration allows
 * for {@code streamName}.
 *
 * @param eventSchema the loaded schema
 * @param streamName the stream the events will be pushed to
 * @param schemaUri the URI the schema was loaded from, only used in error messages
 * @throws IllegalArgumentException if the schema has no string title, if the stream has no
 *         allowed titles, or if the title is not among the allowed ones
 */
private void checkSchemaTitleAndStream(
    JsonNode eventSchema,
    String streamName,
    String schemaUri
) {
    JsonNode titleNode = eventSchema.get("title");
    boolean hasStringTitle = titleNode != null && titleNode.getNodeType() == JsonNodeType.STRING;
    if (!hasStringTitle) {
        throw new IllegalArgumentException("Missing or invalid title for schema [" + schemaUri + "]");
    }

    List<String> permittedTitles = eventStreamConfig.collectSettingAsString(
        streamName,
        EventStreamConfig.SCHEMA_TITLE_SETTING
    );
    if (permittedTitles.isEmpty()) {
        throw new IllegalArgumentException("Cannot find any schema titles for stream [" + streamName + "]");
    }

    if (permittedTitles.contains(titleNode.textValue())) {
        return;
    }

    String joinedTitles = String.join(",", permittedTitles);
    throw new IllegalArgumentException(
        "Schema [" + schemaUri + "] with title [" + titleNode.asText() + "]"
            + " does not match allowed titles for stream [" + streamName + "],"
            + " allowed titles are: [" + joinedTitles + "]"
    );
}
/**
 * Helper method to serialize the event as UTF-8 encoded JSON bytes.
 *
 * @param root the event to serialize
 * @return the serialized event
 * @throws IOException if the event cannot be written
 */
public byte[] serializeAsBytes(ObjectNode root) throws IOException {
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    // try-with-resources closes the generator even when serialization fails; closing it also
    // flushes whatever it still buffers into the byte array stream.
    try (JsonGenerator generator = jsonMapper.getFactory().createGenerator(
        byteArrayOutputStream,
        JsonEncoding.UTF8
    )) {
        root.serialize(generator, jsonMapper.getSerializerProviderInstance());
    }
    return byteArrayOutputStream.toByteArray();
}
/**
 * Entry point to construct a {@link JsonEventGenerator}.
 *
 * @return a new, unconfigured {@link Builder}
 */
public static Builder builder() {
return new Builder();
}
/**
 * Builder of {@link JsonEventGenerator}.
 *
 * {@code schemaLoader} and {@code eventStreamConfig} are mandatory. The ingestion time clock
 * defaults to {@code Instant::now}, the json mapper to a new {@link ObjectMapper} and the uuid
 * supplier to {@code UUID::randomUUID}.
 */
public static final class Builder {
    private EventSchemaLoader schemaLoader;
    private EventStreamConfig eventStreamConfig;
    private SerializableClock ingestionTimeClock;
    private ObjectMapper jsonMapper;
    private Supplier<UUID> uuidSupplier;

    private Builder() {}

    /** Sets the clock used for the ingestion time; must not be null. */
    public Builder ingestionTimeClock(SerializableClock ingestionTimeClock) {
        Objects.requireNonNull(ingestionTimeClock);
        this.ingestionTimeClock = ingestionTimeClock;
        return this;
    }

    /** Sets the mapper used to create and serialize events. */
    public Builder jsonMapper(ObjectMapper mapper) {
        jsonMapper = mapper;
        return this;
    }

    /** Sets the loader used to fetch schemas; must not be null. */
    public Builder schemaLoader(EventSchemaLoader schemaLoader) {
        Objects.requireNonNull(schemaLoader);
        this.schemaLoader = schemaLoader;
        return this;
    }

    /** Sets the stream configuration; must not be null. */
    public Builder eventStreamConfig(EventStreamConfig eventStreamConfig) {
        Objects.requireNonNull(eventStreamConfig);
        this.eventStreamConfig = eventStreamConfig;
        return this;
    }

    /** Sets the supplier of event ids. */
    public Builder withUuidSupplier(Supplier<UUID> uuidSupplier) {
        this.uuidSupplier = uuidSupplier;
        return this;
    }

    /**
     * Builds the generator, applying defaults for the optional components.
     *
     * @throws IllegalArgumentException if schemaLoader or eventStreamConfig was not provided
     */
    public JsonEventGenerator build() {
        requireConfigured(schemaLoader, "schemaLoader");
        requireConfigured(eventStreamConfig, "eventStreamConfig");

        if (ingestionTimeClock == null) {
            ingestionTimeClock = Instant::now;
        }

        ObjectMapper mapper = jsonMapper;
        if (mapper == null) {
            mapper = new ObjectMapper();
        }

        Supplier<UUID> ids = uuidSupplier;
        if (ids == null) {
            // The intersection cast makes the default supplier serializable.
            ids = (Supplier<UUID> & Serializable) UUID::randomUUID;
        }

        return new JsonEventGenerator(schemaLoader, eventStreamConfig, ingestionTimeClock, mapper, ids);
    }

    /** Fails when a mandatory component was never provided through its setter. */
    private static void requireConfigured(@Nullable Object component, String setterName) {
        if (component == null) {
            throw new IllegalArgumentException("Must call " + setterName + "() before calling build().");
        }
    }
}
/**
 * The object mapper this generator was built with; it is handed to the normalizers this
 * generator creates and used by {@code serializeAsBytes}.
 */
public ObjectMapper getJsonMapper() {
return jsonMapper;
}
/**
 * A component that applies various normalization and verification steps to the events it receives.
 * This object is serializable and is suited for use with frameworks like spark or flink.
 *
 * It can also be used as a {@link Function}: {@link #apply(Consumer)} generates an event without
 * an event time.
 */
public interface EventNormalizer extends Serializable, Function<Consumer<ObjectNode>, ObjectNode> {

    /**
     * Generates an event.
     *
     * @param eventData consumer receiving an empty ObjectNode to attach data to
     * @param eventTime the optional event time of the event
     * @return the generated event
     */
    ObjectNode generateEvent(Consumer<ObjectNode> eventData, @Nullable Instant eventTime);

    /**
     * Generates an event without an event time.
     */
    @Override
    default ObjectNode apply(Consumer<ObjectNode> eventData) {
        final Instant noEventTime = null;
        return generateEvent(eventData, noEventTime);
    }

    /**
     * The object mapper this normalizer will use to generate json nodes.
     */
    ObjectMapper getObjectMapper();
}
/**
 * {@link EventNormalizer} that runs a fixed list of steps over each event.
 * The Lombok-generated getter for {@code objectMapper} implements
 * {@link EventNormalizer#getObjectMapper()}.
 */
@Getter
@AllArgsConstructor
@ToString(exclude = {"objectMapper"})
@SuppressFBWarnings({"SE_NO_SERIALVERSIONID"})
private static final class StepBasedEventNormalizer implements Serializable, EventNormalizer {
    private final ObjectMapper objectMapper;
    private final List<EventNormalizationStep> steps;

    /**
     * Creates an empty node, lets {@code eventData} populate it, sets the event time when one
     * is given and finally runs every step in list order.
     */
    @Override
    public ObjectNode generateEvent(Consumer<ObjectNode> eventData, @Nullable Instant eventTime) {
        final ObjectNode event = objectMapper.createObjectNode();
        eventData.accept(event);

        if (eventTime != null) {
            EventNormalizationStep.eventTime(eventTime).accept(event);
        }

        for (EventNormalizationStep step : steps) {
            step.accept(event);
        }
        return event;
    }
}
}