// EventStream.java

package org.wikimedia.eventutilities.core.event;

import java.net.URI;
import java.util.List;

import org.wikimedia.eventutilities.core.json.JsonLoadingException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;

import lombok.SneakyThrows;


/**
 * Represents a single event stream.  An event stream is a named continuous stream
 * of events.  An event is a well typed single datum with a specific timestamp.
 *
 * An event stream in Kafka is composed of 1 or more topics.
 *
 * This class uses Wikimedia specific event stream configuration and schema repositories
 * to abstract looking up stream configuration, schemas, topics and canary events
 * given a stream name.
 *
 * Every accessor delegates to the {@link EventStreamConfig} or {@link EventSchemaLoader}
 * given at construction time; this class holds no state beyond those two collaborators
 * and the stream name.
 */
public class EventStream {

    /**
     * Name of the JSONSchema field that holds the schema's identifier.
     */
    private static final String JSONSCHEMA_ID_FIELD = "$id";

    /**
     * Name of this stream, used as the lookup key for every stream config setting.
     */
    protected String streamName;

    /**
     * EventSchemaLoader instance used when getting a schema for this stream.
     */
    protected EventSchemaLoader eventSchemaLoader;

    /**
     * EventStreamConfig instance used to lookup stream config settings for this stream.
     */
    protected EventStreamConfig eventStreamConfig;

    /**
     * Consider using EventStreamFactory.createEventStream instead.
     *
     * @param streamName name of the stream; must not be null and must not start with "/",
     *                   since a leading "/" denotes a regex rather than a concrete stream name
     * @param eventSchemaLoader loader used to build schema URIs and fetch schemas
     * @param eventStreamConfig stream config used to look up this stream's settings
     * @throws NullPointerException if streamName is null
     * @throws IllegalArgumentException if streamName starts with "/"
     */
    public EventStream(
        String streamName,
        EventSchemaLoader eventSchemaLoader,
        EventStreamConfig eventStreamConfig
    ) {
        // Fail with an explicit message rather than a bare NPE from startsWith below.
        Preconditions.checkNotNull(streamName, "EventStream name must not be null");
        Preconditions.checkArgument(
                !streamName.startsWith("/"),
                "EventStream name must not be a regex, was %s", streamName
        );

        this.streamName = streamName;
        this.eventSchemaLoader = eventSchemaLoader;
        this.eventStreamConfig = eventStreamConfig;
    }

    /**
     * Gets the stream name of this EventStream instance.
     *
     * @return the stream name given at construction time
     */
    public String streamName() {
        return streamName;
    }

    /**
     * Gets a setting from stream config for this stream.
     *
     * @param settingName name of the stream config setting to look up
     * @return whatever EventStreamConfig returns for this stream and setting name
     */
    public JsonNode getSetting(String settingName) {
        return eventStreamConfig.getSetting(streamName, settingName);
    }

    /**
     * Gets the list of Kafka topics that compose this stream.
     *
     * @return the topics EventStreamConfig reports for this stream
     */
    public List<String> topics() {
        return eventStreamConfig.getTopics(streamName);
    }

    /**
     * Gets this EventStream's destination_event_service name.
     *
     * This is an EventStreamConfig setting that should map
     * to the event service URL where events of this stream
     * can be POSTed.
     *
     * @return the event service name EventStreamConfig reports for this stream
     */
    public String eventServiceName() {
        return eventStreamConfig.getEventServiceName(streamName);
    }

    /**
     * Returns the (discovery) URL to which events belonging to this EventStream should be POSTed.
     *
     * E.g. https://eventgate-main.discovery.wmnet/v1/events
     *
     * @return the event service URI EventStreamConfig reports for this stream
     */
    public URI eventServiceUri() {
        return eventStreamConfig.getEventServiceUri(streamName);
    }

    /**
     * Returns the datacenter specific event service URL to which events belonging to
     * this EventStream should be POSTed.
     *
     * E.g. https://eventgate-main.svc.eqiad.wmnet/v1/events
     * This assumes that EventStreamConfig is configured locally here with a
     * name to URI map of event_service_name-datacenter.  If it isn't, this will return null.
     *
     * @param datacenter name of the datacenter, e.g. eqiad
     * @return the datacenter specific event service URI, or null as described above
     */
    public URI eventServiceUri(String datacenter) {
        return eventStreamConfig.getEventServiceUri(
            streamName, datacenter
        );
    }

    /**
     * Get the Kafka message key fields.
     *
     * @return the message key fields EventStreamConfig reports for this stream
     */
    public JsonNode messageKeyFields() {
        return eventStreamConfig.getMessageKeyFields(streamName);
    }

    /**
     * Gets the JSONSchema title configured for this EventStream.
     *
     * @return the schema title EventStreamConfig reports for this stream
     */
    public String schemaTitle() {
        return eventStreamConfig.getSchemaTitle(streamName);
    }

    /**
     * Builds schema URI for stream based on WMF conventions.
     *
     * This expects that the stream's schema_title will easily map to
     * a schema URI namespace hierarchy in a schema repository.  E.g.
     *   schema_title: my/cool/schema and version 1.0.0 returns /my/cool/schema/1.0.0
     *
     * @param version version of the schema to load
     * @return the schema URI built by eventSchemaLoader from the schema title and version
     */
    public URI schemaUri(String version) {
        return eventSchemaLoader.schemaUri(schemaTitle(), version);
    }

    /**
     * Builds a latest relative schema URI for stream based on WMF conventions.
     *
     * This expects that the stream's schema_title will easily map to
     * a schema URI namespace hierarchy in a schema repository.  E.g.
     *   schema_title: my/cool/schema returns /my/cool/schema/latest
     *
     * @return the latest schema URI built by eventSchemaLoader from the schema title
     */
    public URI schemaUri() {
        // No version is passed here: the loader derives the "latest" URI
        // from the schema title alone.
        return eventSchemaLoader.latestSchemaURI(schemaTitle());
    }

    /**
     * Infers the latest relative schemaUri for the stream from its schema_title
     * stream config setting and fetches and returns the schema at that URI using eventSchemaLoader.
     *
     * @return the schema loaded from {@link #schemaUri()}
     */
    public JsonNode schema() {
        return loadSchema(schemaUri());
    }

    /**
     * Get the version relative schemaUri for the stream from its schema_title
     * stream config setting and fetches and returns the schema at that URI using eventSchemaLoader.
     *
     * @param version version of the schema to load
     * @return the schema loaded from {@link #schemaUri(String)}
     */
    public JsonNode schema(String version) {
        return loadSchema(schemaUri(version));
    }

    /**
     * Gets the $id from the latest schema.
     *
     * @return the text value of the latest schema's $id field
     * @throws IllegalStateException if the latest schema is missing or has no (non-null) $id field
     */
    public String latestSchemaId() {
        JsonNode latestSchema = schema();
        JsonNode schemaId = latestSchema == null ? null : latestSchema.get(JSONSCHEMA_ID_FIELD);
        // A missing $id would otherwise surface as a bare NullPointerException, and a
        // JSON null $id would be returned as the string "null".
        Preconditions.checkState(
            schemaId != null && !schemaId.isNull(),
            "Latest schema of stream %s does not have a %s field", streamName, JSONSCHEMA_ID_FIELD
        );
        return schemaId.asText();
    }

    /**
     * Infers the latest schema version number from the schema's $id field.
     *
     * The version is taken to be the last "/" separated part of the $id.
     *
     * @return the last path part of the latest schema's $id
     * @throws IllegalStateException if the $id has no path part to use as a version
     */
    public String latestSchemaVersion() {
        String schemaId = latestSchemaId();
        // String.split drops trailing empty strings, so an $id made only of
        // slashes yields an empty array; guard before indexing into it.
        String[] parts = schemaId.split("/");
        Preconditions.checkState(
            parts.length > 0,
            "Cannot infer schema version of stream %s from %s '%s'",
            streamName, JSONSCHEMA_ID_FIELD, schemaId
        );
        return parts[parts.length - 1];
    }

    /**
     * Loads the schema at the relative schemaUri using our eventSchemaLoader.
     *
     * JsonLoadingException is rethrown without being declared (via Lombok's SneakyThrows).
     *
     * @param schemaUri URI of the schema to load
     * @return the schema returned by eventSchemaLoader for that URI
     */
    @SneakyThrows(JsonLoadingException.class)
    protected JsonNode loadSchema(URI schemaUri) {
        return eventSchemaLoader.getSchema(schemaUri);
    }

    /**
     * Gets the schema for stream and returns the first element in its JSONSchema examples.
     *
     * If the schema does not have any examples, returns null.
     *
     * @return the first example of the latest schema, or null if there is none
     */
    public ObjectNode exampleEvent() {
        return extractExampleEvent(schema());
    }

    /**
     * Gets the schema for stream and returns the first element in its JSONSchema examples.
     * If the schema does not have any examples, returns null.
     *
     * @param schemaVersion the schemaVersion
     * @return the first example of the schema at that version, or null if there is none
     */
    public ObjectNode exampleEvent(String schemaVersion) {
        return extractExampleEvent(schema(schemaVersion));
    }

    /**
     * Returns the first element of the schema's "examples" field.
     *
     * @param schema schema to read the examples from
     * @return the first example, or null if "examples" is absent or empty
     */
    private static ObjectNode extractExampleEvent(JsonNode schema) {
        JsonNode examples = schema.get("examples");
        if (examples == null || examples.isEmpty()) {
            return null;
        }
        return (ObjectNode) examples.get(0);
    }

    /**
     * Describes this stream by its name and schema title.
     *
     * Note that this looks up the schema title in eventStreamConfig on every call.
     */
    @Override
    public String toString() {
        return "EventStream(" + streamName + ") of schema " + schemaTitle();
    }
}