EventStreamFactory.java

package org.wikimedia.eventutilities.core.event;

import static com.google.common.collect.ImmutableList.toImmutableList;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.wikimedia.eventutilities.core.http.BasicHttpClient;
import org.wikimedia.eventutilities.core.json.JsonLoader;
import org.wikimedia.eventutilities.core.json.JsonSchemaLoader;
import org.wikimedia.eventutilities.core.util.ResourceLoader;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
 * Class to aide in constructing {@link EventStream} instances and
 * working with groups of event streams using
 * {@link EventStreamConfig} and {@link EventSchemaLoader}.
 *
 * Use EventStreamFactory.builder() or one of the EventStreamFactory.from()
 * methods to construct an EventStreamFactory.
 *
 * Example using builder():
 * <pre>{@code
 *  EventStreamFactory f = EventStreamFactory.builder()
 *      .setEventSchemaLoader(Arrays.asList("file:///path/to/schema/repo"))
 *      // MW Stream Config URL, will use MediawikiEventStreamConfigLoader
 *      .setEventStreamConfig(
 *          "https://meta.wikimedia.org/w/api.php",
 *      )
 *       // in prod networks, we need special routing for MW API, optional.
 *      .setHttpRoute("https://meta.wikimedia.org", "https://api-ro.wikimedia.org")
 *      // event service to URI map config file, optional.
 *      .setEventServiceToUriMap(
 *          "file:///path/to/event_service_map.yaml"
 *       )
 *      .build()
 * }</pre>
 *
 * Example using from():
 * <pre>{@code
 *     EventStreamFactory f = EventStreamFactory.from(
 *          Arrays.asList(
 *              "https://schema.wikimedia.org/repositories/primary",
 *              "https://schema.wikimedia.org/repositories/secondary",
 *          ),
 *          "https://meta.wikimedia.org/w/api.php/",
 *     );
 * }</pre>
 */
public class EventStreamFactory {
    /**
     * EventSchemaLoader instance used when constructing EventStreams.
     */
    protected final EventSchemaLoader eventSchemaLoader;

    /**
     * EventStreamConfig instance used when constructing EventStreams and
     * looking up stream configs.
     */
    protected EventStreamConfig eventStreamConfig;

    /**
     * Constructs a new instance of EventStreamFactory.
     * This is protected; use EventStreamFactory.builder()
     * or one of the EventStreamFactory.of(...) factories.
     * to create your EventStreamFactory instance.
     */
    protected EventStreamFactory(
        EventSchemaLoader eventSchemaLoader,
        EventStreamConfig eventStreamConfig
    ) {
        this.eventSchemaLoader = eventSchemaLoader;
        this.eventStreamConfig = eventStreamConfig;
    }

    /**
     * Helper function to quickly instantiate EventStreamFactory.
     */
    public static EventStreamFactory from(
        List<String> eventSchemaBaseUris,
        String eventStreamConfigUri
    ) {
        return from(
            eventSchemaBaseUris,
            eventStreamConfigUri,
            null,
            null
        );
    }

    /**
     * Helper function to quickly instantiate EventStreamFactory.
     */
    public static EventStreamFactory from(
        List<String> eventSchemaBaseUris,
        String eventStreamConfigUri,
        Map<String, String> httpRoutes
    ) {
        return from(
            eventSchemaBaseUris,
            eventStreamConfigUri,
            httpRoutes,
            null
        );
    }

    /**
     * Helper function to quickly instantiate EventStreamFactory.
     */
    public static EventStreamFactory from(
        List<String> eventSchemaBaseUris,
        String eventStreamConfigUri,
        Map<String, String> httpRoutes,
        String eventServiceToUriMapUrl
    ) {
        Builder builder = EventStreamFactory.builder();

        builder.setEventSchemaLoader(eventSchemaBaseUris);
        builder.setEventStreamConfig(eventStreamConfigUri);

        if (eventServiceToUriMapUrl != null) {
            builder.setEventServiceToUriMap(eventServiceToUriMapUrl);
        }

        if (httpRoutes != null) {
            builder.setHttpRoutes(httpRoutes);
        }

        return builder.build();
    }

    /**
     * Builder builder pattern to construct EventStreamFactory instance.
     */
    public static class Builder {
        private List<String> eventSchemaBaseUris;
        private EventSchemaLoader eventSchemaLoader;

        private String eventStreamConfigUri;
        private String eventServiceToUriMapUrl;
        private Map<String, String> eventServiceToUriMap;
        private EventStreamConfig eventStreamConfig;

        private Map<String, String> httpRoutes = new HashMap<>();


        // setEventSchemaLoader by URIs
        public Builder setEventSchemaLoader(List<String> eventSchemaBaseUris) {
            this.eventSchemaBaseUris = ImmutableList.copyOf(eventSchemaBaseUris);
            return this;
        }
        // setEventSchemaLoader by instantiated eventSchemaLoader.
        // If you use this, any provided eventSchemaBaseUris
        // or httpRoutes will be ignored.
        public Builder setEventSchemaLoader(EventSchemaLoader eventSchemaLoader) {
            this.eventSchemaLoader = eventSchemaLoader;
            return this;
        }

        // setEventServiceToUriMap by config file
        public Builder setEventServiceToUriMap(String eventServiceToUriMapUrl) {
            this.eventServiceToUriMapUrl = eventServiceToUriMapUrl;
            return this;
        }
        // setEventServiceToUriMap by instantiated Map
        public Builder setEventServiceToUriMap(Map<String, String> eventServiceToUriMap) {
            this.eventServiceToUriMap = ImmutableMap.copyOf(eventServiceToUriMap);
            return this;
        }

        // Set setEventStreamConfig by URI
        public Builder setEventStreamConfig(String eventStreamConfigUri) {
            this.eventStreamConfigUri = eventStreamConfigUri;
            return this;
        }
        // setEventStreamConfig by instantiated eventStreamConfig.
        // If you use this, any provided httpRoutes or setEventServiceToUriMap
        // will be ignored.
        public Builder setEventStreamConfig(EventStreamConfig eventStreamConfig) {
            this.eventStreamConfig = eventStreamConfig;
            return this;
        }

        // Add an httpRoute by source and dest url
        public Builder setHttpRoute(String sourceUrl, String destUrl) {
            this.httpRoutes.put(sourceUrl, destUrl);
            return this;
        }
        // Set the httpRoutes, if you use this, any previously set
        // http routes will be ignored.
        public Builder setHttpRoutes(Map<String, String> httpRoutes) {
            this.httpRoutes = new HashMap<>(httpRoutes);
            return this;
        }

        /**
         * Returns a new EventStreamFactory.
         * If not enough has been provided to build an EventStreamFactory
         * an IllegalArgumentException will be thrown.
         */
        public EventStreamFactory build() {
            Preconditions.checkState(
                !(eventSchemaLoader == null && eventSchemaBaseUris == null),
                "Must call setEventSchemaLoader() before calling build().");

            Preconditions.checkState(
                !(eventStreamConfig == null && eventStreamConfigUri == null),
                "Must call eventStreamConfig() before calling build().");

            // Build a BasicHttpClient with configured routes
            // that will be used for one or both of our eventSchemaLoader
            // and eventStreamConfig.
            BasicHttpClient.Builder httpClientBuilder = BasicHttpClient.builder();
            httpRoutes.forEach((source, dest) -> {
                try {
                    httpClientBuilder.addRoute(source, dest);
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException(e);
                }
            });
            BasicHttpClient httpClient = httpClientBuilder.build();

            // Build the eventSchemaLoader from eventSchemaBaseUris.
            if (this.eventSchemaLoader == null) {
                List<URL> baseUrls = eventSchemaBaseUris.stream().map((s) -> {
                    try {
                        return URI.create(s).toURL();
                    } catch (MalformedURLException e) {
                        throw new IllegalArgumentException(e);
                    }
                }).collect(Collectors.toList());

                JsonLoader jsonLoader = new JsonLoader(ResourceLoader.builder()
                    .withHttpClient(httpClient)
                    .setBaseUrls(baseUrls)
                    .build()
                );

                this.eventSchemaLoader = EventSchemaLoader.builder()
                    .setJsonSchemaLoader(new JsonSchemaLoader(jsonLoader))
                    .build();
            }

            // Build the eventStreamConfig from eventStreamConfigUri
            if (this.eventStreamConfig == null) {
                JsonLoader jsonLoader = new JsonLoader(ResourceLoader.builder()
                        .withHttpClient(httpClient)
                        .build()
                );
                EventStreamConfig.Builder eventStreamConfigBuilder = EventStreamConfig.builder()
                    .setJsonLoader(jsonLoader)
                    .setEventStreamConfigLoader(eventStreamConfigUri);

                // If either eventServiceToUriMap or eventServiceToUriMapUrl are set,
                // then call setEventServiceToUriMap on the builder with one of them.
                if (this.eventServiceToUriMap != null) {
                    // Convert our String, String eventServiceToUriMap to String, URI.
                    Map<String, URI> uriMap = new HashMap<>();
                    for (Map.Entry<String, String> entry : this.eventServiceToUriMap.entrySet()) {
                        uriMap.put(entry.getKey(), URI.create(entry.getValue()));
                    }
                    eventStreamConfigBuilder.setEventServiceToUriMap(uriMap);
                } else if (this.eventServiceToUriMapUrl != null) {
                    eventStreamConfigBuilder.setEventServiceToUriMap(this.eventServiceToUriMapUrl);
                }

                this.eventStreamConfig = eventStreamConfigBuilder.build();
            }

            // Finally, return the new EventStreamFactory
            return new EventStreamFactory(
                this.eventSchemaLoader,
                this.eventStreamConfig
            );
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    // --- actual instance methods below ---

    /**
     * Creates EventStreams for all streams in our EventStreamConfig's cache.
     * This will exclude any stream name that looks like a regex in the config,
     * as it doesn't make sense to construct an EventStream without a concrete stream name.
     */
    public List<EventStream> createAllCachedEventStreams() {
        return createEventStreams(
            eventStreamConfig.cachedStreamNames().stream()
                .filter(streamName -> !streamName.startsWith("/"))
                .collect(toImmutableList())
        );
    }

    /**
     * Returns a new EventStream for this streamName using eventSchemaLoader and
     * eventStreamConfig.
     */
    public EventStream createEventStream(String streamName) {
        return new EventStream(streamName, eventSchemaLoader, eventStreamConfig);
    }

    /**
     * Returns a List of new EventStreams using eventSchemaLoader and
     * eventStreamConfig.
     */
    public List<EventStream> createEventStreams(Collection<String> streamNames) {
        return streamNames.stream()
            .map(this::createEventStream)
            .collect(toImmutableList());
    }

    /**
     * Creates EventStreams for the list of specified streams
     * with settings that match the provided settingsFilters.
     * If streamNames is null, it is assumed you don't want to match on stream names,
     * and only setttingsFilters will be considered.
     *
     * Since settingsFilters must all be strings, this only allows filtering
     * on string stream config settings, or at least ones for which JsonNode.asText()
     * returns something sane (which is true for most primitive types).
     */
    public List<EventStream> createEventStreamsMatchingSettings(
        Collection<String> streamNames,
        Map<String, String> settingsFilters
    ) {
        List<EventStream> eventStreams;
        if (streamNames != null) {
            eventStreams = createEventStreams(streamNames);
        } else {
            eventStreams = createAllCachedEventStreams();
        }

        return eventStreams.stream()
            .filter(eventStream -> settingsFilters.entrySet().stream()
                .allMatch(settingEntry -> {
                    JsonNode streamSetting = eventStream.getSetting(settingEntry.getKey());
                    return streamSetting != null && streamSetting.asText().equals(settingEntry.getValue());
                }))
            .collect(toImmutableList());
    }

    /**
     * Returns the EventStreamConfig instance this EventStreamFactory is using.
     */
    public EventStreamConfig getEventStreamConfig() {
        return eventStreamConfig;
    }

    /**
     * Returns the EventSchemaLoader instance this EventStreamFactory is using.
     */
    public EventSchemaLoader getEventSchemaLoader() {
        return eventSchemaLoader;
    }

}