// EventUtilitiesConfig.java

package org.wikimedia.eventutilities.core.event;

import static java.util.Collections.singletonList;

import java.io.Serializable;
import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.Immutable;

import org.wikimedia.eventutilities.core.SerializableClock;
import org.wikimedia.eventutilities.core.http.BasicHttpClient;
import org.wikimedia.eventutilities.core.json.JsonLoader;
import org.wikimedia.eventutilities.core.json.JsonSchemaLoader;
import org.wikimedia.eventutilities.core.util.ResourceLoader;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.With;

/**
 * Configuration for Event Utilities.
 *
 * Default values and instances are provided to aid working with Event
 * Streams. Those defaults should only be used for debugging and
 * troubleshooting. If you are writing code for a production job you should
 * properly configure your code externally with non hard-coded values.
 *
 * {@code create*()} methods expect the client to manage wiring by taking
 * the collaborators of the class created as method arguments. This allows
 * the caller to decide on sharing instances or not. And it allows this class
 * to stay lightweight and without any mutable state.
 */
@SuppressFBWarnings("SE_NO_SERIALVERSIONID")
@Value @Builder @Immutable @ParametersAreNonnullByDefault
public class EventUtilitiesConfig implements Serializable {

    public static final String META_WIKIMEDIA_ORG = "meta.wikimedia.org";

    /**
     * MediaWiki API URI on {@link #META_WIKIMEDIA_ORG}. Shared by the
     * hard-coded defaults below so the literal is written only once.
     */
    private static final String META_WIKIMEDIA_API_URI = "https://" + META_WIKIMEDIA_ORG + "/w/api.php";

    /**
     * Default values and instances to aid working with Event Streams in
     * Wikimedia Production.
     *
     * You should only use this instance for troubleshooting and debugging.
     * If you are writing code for a production job you should properly
     * configure your code externally with non hard-coded values.
     */
    public static final EventUtilitiesConfig DEFAULT = EventUtilitiesConfig.builder()
            .eventSchemaBaseUris(ImmutableList.of(
                    "https://schema.discovery.wmnet/repositories/primary/jsonschema",
                    "https://schema.discovery.wmnet/repositories/secondary/jsonschema"
            ))
            .eventloggingSchemaBaseUri(META_WIKIMEDIA_API_URI)
            .eventServiceToUriMap(ImmutableMap.<String, URI>builder()
                    .put("eventgate-main", URI.create("https://eventgate-main.discovery.wmnet:4492/v1/events"))
                    .put("eventgate-main-eqiad", URI.create("https://eventgate-main.svc.eqiad.wmnet:4492/v1/events"))
                    .put("eventgate-main-codfw", URI.create("https://eventgate-main.svc.codfw.wmnet:4492/v1/events"))

                    .put("eventgate-analytics", URI.create("https://eventgate-analytics.discovery.wmnet:4592/v1/events"))
                    .put("eventgate-analytics-eqiad", URI.create("https://eventgate-analytics.svc.eqiad.wmnet:4592/v1/events"))
                    .put("eventgate-analytics-codfw", URI.create("https://eventgate-analytics.svc.codfw.wmnet:4592/v1/events"))

                    .put("eventgate-analytics-external", URI.create("https://eventgate-analytics-external.discovery.wmnet:4692/v1/events"))
                    .put("eventgate-analytics-external-eqiad", URI.create("https://eventgate-analytics-external.svc.eqiad.wmnet:4692/v1/events"))
                    .put("eventgate-analytics-external-codfw", URI.create("https://eventgate-analytics-external.svc.codfw.wmnet:4692/v1/events"))

                    .put("eventgate-logging-external", URI.create("https://eventgate-logging-external.discovery.wmnet:4392/v1/events"))
                    .put("eventgate-logging-external-eqiad", URI.create("https://eventgate-logging-external.svc.eqiad.wmnet:4392/v1/events"))
                    .put("eventgate-logging-external-codfw", URI.create("https://eventgate-logging-external.svc.codfw.wmnet:4392/v1/events"))
                    .build())
            .eventStreamConfigEndpoint("https://mw-api-int-ro.discovery.wmnet:4446")
            .eventStreamConfigUri(META_WIKIMEDIA_API_URI)
            .build();

    /**
     * Default values and instances to aid working with Event Streams in
     * Wikimedia outside of production networks, e.g. schema.wikimedia.org
     * instead of schema.discovery.wmnet.
     *
     * It will not be able to use non external services like eventgate-main.
     *
     * You should only use this instance for troubleshooting and debugging.
     * If you are writing code for a production job you should properly
     * configure your code externally with non hard-coded values.
     */
    public static final EventUtilitiesConfig EXTERNAL_DEFAULT = EventUtilitiesConfig.builder()
            .eventSchemaBaseUris(ImmutableList.of(
                    "https://schema.wikimedia.org/repositories/primary/jsonschema",
                    "https://schema.wikimedia.org/repositories/secondary/jsonschema"
            ))
            .eventloggingSchemaBaseUri(META_WIKIMEDIA_API_URI)
            .eventServiceToUriMap(ImmutableMap.<String, URI>builder()
                    // eventgate-main and eventgate-analytics are not accessible from external networks.
                    .put("eventgate-analytics-external", URI.create("https://intake-analytics.wikimedia.org/v1/events"))
                    .put("eventgate-analytics-external-eqiad", URI.create("https://intake-analytics.wikimedia.org/v1/events"))
                    .put("eventgate-analytics-external-codfw", URI.create("https://intake-analytics.wikimedia.org/v1/events"))

                    .put("eventgate-logging-external", URI.create("https://intake-logging.wikimedia.org/v1/events"))
                    .put("eventgate-logging-external-eqiad", URI.create("https://intake-logging.wikimedia.org/v1/events"))
                    .put("eventgate-logging-external-codfw", URI.create("https://intake-logging.wikimedia.org/v1/events"))
                    .build())
            // Explicitly no endpoint override: createHttpClient() then adds no custom route.
            .eventStreamConfigEndpoint(null)
            .eventStreamConfigUri(META_WIKIMEDIA_API_URI)
            .build();

    /**
     * Base URIs from which event schemas are loaded. They are used as the
     * base URLs of the loader built by
     * {@link #createEventSchemaLoader(BasicHttpClient)}.
     */
    @Nonnull List<String> eventSchemaBaseUris;

    /** URI from which to get legacy EventLogging on wiki schemas. */
    @Nonnull String eventloggingSchemaBaseUri;

    /**
     * Mapping of event service name (e.g. {@code eventgate-main}) to the URI
     * of that service. It is handed as-is to the {@link EventStreamConfig}
     * built by {@link #createEventStreamConfig(EventStreamConfigLoader)}.
     *
     * For the ports used by the production defaults, see also
     * https://wikitech.wikimedia.org/wiki/Service_ports.
     */
    @Nonnull Map<String, URI> eventServiceToUriMap;

    /**
     * In production, the mediawiki internal api endpoint needs the Host header
     * set to the MediaWiki API that should be accessed.
     *
     * When non-null, {@link #createHttpClient()} adds a route from
     * {@link #eventStreamConfigUri} to this endpoint. When null, no custom
     * route is added.
     */
    @Nullable String eventStreamConfigEndpoint;

    /** URI of the MediaWiki EventStreamConfig API, as used in WMF production, from which stream configs are fetched. */
    @Nonnull String eventStreamConfigUri;

    /**
     * Clock used for various time based operations and timestamp creation.
     *
     * Overriding the default can be used in tests to create predictable
     * timestamps. A default configuration can be used as a starting point,
     * overriding only the clock:
     * {@code EventUtilitiesConfig config = EventUtilitiesConfig.DEFAULT.withClock(SerializableClock.frozenClock(Instant.EPOCH));}
     */
    @Builder.Default
    @Nonnull @With SerializableClock clock = Instant::now;

    /**
     * Creates a new HTTP client for this configuration.
     *
     * If {@link #eventStreamConfigEndpoint} is set, the client gets a custom
     * route from {@link #eventStreamConfigUri} to that endpoint.
     *
     * Because of {@code @SneakyThrows}, any checked exception raised while
     * configuring or building the client is rethrown without being declared.
     *
     * @return a newly built client; a new instance on every call
     */
    @SneakyThrows
    public BasicHttpClient createHttpClient() {
        BasicHttpClient.Builder httpClientBuilder = BasicHttpClient.builder();
        // Add a custom route to the mediawiki internal api endpoint
        if (eventStreamConfigEndpoint != null) {
            httpClientBuilder.addRoute(eventStreamConfigUri, eventStreamConfigEndpoint);
        }
        return httpClientBuilder.build();
    }

    /**
     * Creates a loader for legacy EventLogging schemas, with
     * {@link #eventloggingSchemaBaseUri} as its only base URL.
     *
     * @param httpClient HTTP client used by the underlying resource loader
     * @return a new loader
     */
    public EventLoggingSchemaLoader createEventloggingSchemaLoader(BasicHttpClient httpClient) {
        return new EventLoggingSchemaLoader(
                JsonSchemaLoader.build(ResourceLoader.builder()
                        .withHttpClient(httpClient)
                        .setBaseUrls(ResourceLoader.asURLs(singletonList(eventloggingSchemaBaseUri)))
                        .build()
                )
        );
    }

    /**
     * Creates a stream config loader that reads from
     * {@link #eventStreamConfigUri}.
     *
     * @param httpClient HTTP client used by the underlying resource loader
     * @return a new loader
     */
    public MediawikiEventStreamConfigLoader createMediawikiEventStreamConfigLoader(BasicHttpClient httpClient) {
        return new MediawikiEventStreamConfigLoader(
                eventStreamConfigUri,
                new JsonLoader(ResourceLoader.builder().withHttpClient(httpClient).build())
        );
    }

    /**
     * Creates an {@link EventStreamConfig} from the given loader and this
     * configuration's {@link #eventServiceToUriMap}.
     *
     * @param eventStreamConfigLoader loader used to fetch stream configs
     * @return a new stream config
     */
    public EventStreamConfig createEventStreamConfig(EventStreamConfigLoader eventStreamConfigLoader) {
        return EventStreamConfig.builder()
                .setEventStreamConfigLoader(eventStreamConfigLoader)
                .setEventServiceToUriMap(eventServiceToUriMap)
                .build();
    }

    /**
     * Creates an event schema loader with {@link #eventSchemaBaseUris} as
     * its base URLs.
     *
     * @param httpClient HTTP client used by the underlying resource loader
     * @return a new loader
     */
    public EventSchemaLoader createEventSchemaLoader(BasicHttpClient httpClient) {
        return EventSchemaLoader.builder()
                .setJsonSchemaLoader(
                        JsonSchemaLoader.build(
                                ResourceLoader.builder()
                                        .withHttpClient(httpClient)
                                        .setBaseUrls(ResourceLoader.asURLs(eventSchemaBaseUris))
                                        .build()
                        )
                )
                .build();
    }

    /**
     * Creates an {@link EventStreamFactory} wired with the given collaborators.
     *
     * @param eventSchemaLoader schema loader the factory should use
     * @param eventStreamConfig stream config the factory should use
     * @return a new factory
     */
    public EventStreamFactory createEventStreamFactory(
            EventSchemaLoader eventSchemaLoader,
            EventStreamConfig eventStreamConfig
    ) {
        return EventStreamFactory.builder()
                .setEventSchemaLoader(eventSchemaLoader)
                .setEventStreamConfig(eventStreamConfig)
                .build();
    }

    /**
     * Creates an {@link EventSchemaValidator} backed by the given schema loader.
     *
     * @param eventSchemaLoader schema loader the validator should use
     * @return a new validator
     */
    public EventSchemaValidator createEventSchemaValidator(EventSchemaLoader eventSchemaLoader) {
        return new EventSchemaValidator(eventSchemaLoader);
    }

    /**
     * Creates a {@link JsonEventGenerator} wired with the given collaborators
     * and using this configuration's {@link #clock} as its ingestion time clock.
     *
     * @param eventSchemaLoader schema loader the generator should use
     * @param eventStreamConfig stream config the generator should use
     * @return a new generator
     */
    public JsonEventGenerator createJsonEventGenerator(
            EventSchemaLoader eventSchemaLoader,
            EventStreamConfig eventStreamConfig
    ) {
        return JsonEventGenerator.builder()
                .schemaLoader(eventSchemaLoader)
                .eventStreamConfig(eventStreamConfig)
                .ingestionTimeClock(clock)
                .build();
    }
}