// EventUtilitiesConfig.java
package org.wikimedia.eventutilities.core.event;
import static java.util.Collections.singletonList;
import java.io.Serializable;
import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.Immutable;
import org.wikimedia.eventutilities.core.SerializableClock;
import org.wikimedia.eventutilities.core.http.BasicHttpClient;
import org.wikimedia.eventutilities.core.json.JsonLoader;
import org.wikimedia.eventutilities.core.json.JsonSchemaLoader;
import org.wikimedia.eventutilities.core.util.ResourceLoader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.With;
/**
 * Immutable, serializable configuration for Event Utilities.
 *
 * <p>Two pre-built instances, {@link #DEFAULT} and {@link #EXTERNAL_DEFAULT},
 * are provided to aid working with Event Streams. They exist for debugging
 * and troubleshooting only. Code written for a production job should be
 * configured externally, with values that are not hard-coded.
 *
 * <p>The {@code create*()} methods leave wiring to the caller: every
 * collaborator of the created object is taken as a method argument. The
 * caller therefore decides whether instances are shared, and this class
 * stays lightweight and free of mutable state.
 */
@SuppressFBWarnings("SE_NO_SERIALVERSIONID")
@Value
@Builder
@Immutable
@ParametersAreNonnullByDefault
public class EventUtilitiesConfig implements Serializable {

    public static final String META_WIKIMEDIA_ORG = "meta.wikimedia.org";

    /** MediaWiki API URI on {@link #META_WIKIMEDIA_ORG}, shared by the pre-built configurations below. */
    private static final String MEDIAWIKI_API_URI = "https://" + META_WIKIMEDIA_ORG + "/w/api.php";

    /**
     * Default values to aid working with Event Streams in Wikimedia
     * Production.
     *
     * <p>Use this instance for troubleshooting and debugging only. Code
     * written for a production job should be configured externally, with
     * values that are not hard-coded.
     */
    public static final EventUtilitiesConfig DEFAULT = EventUtilitiesConfig.builder()
        .eventSchemaBaseUris(ImmutableList.of(
            "https://schema.discovery.wmnet/repositories/primary/jsonschema",
            "https://schema.discovery.wmnet/repositories/secondary/jsonschema"
        ))
        .eventloggingSchemaBaseUri(MEDIAWIKI_API_URI)
        .eventServiceToUriMap(ImmutableMap.<String, URI>builder()
            // eventgate-main
            .put("eventgate-main", URI.create("https://eventgate-main.discovery.wmnet:4492/v1/events"))
            .put("eventgate-main-eqiad", URI.create("https://eventgate-main.svc.eqiad.wmnet:4492/v1/events"))
            .put("eventgate-main-codfw", URI.create("https://eventgate-main.svc.codfw.wmnet:4492/v1/events"))
            // eventgate-analytics
            .put("eventgate-analytics", URI.create("https://eventgate-analytics.discovery.wmnet:4592/v1/events"))
            .put("eventgate-analytics-eqiad", URI.create("https://eventgate-analytics.svc.eqiad.wmnet:4592/v1/events"))
            .put("eventgate-analytics-codfw", URI.create("https://eventgate-analytics.svc.codfw.wmnet:4592/v1/events"))
            // eventgate-analytics-external
            .put("eventgate-analytics-external", URI.create("https://eventgate-analytics-external.discovery.wmnet:4692/v1/events"))
            .put("eventgate-analytics-external-eqiad", URI.create("https://eventgate-analytics-external.svc.eqiad.wmnet:4692/v1/events"))
            .put("eventgate-analytics-external-codfw", URI.create("https://eventgate-analytics-external.svc.codfw.wmnet:4692/v1/events"))
            // eventgate-logging-external
            .put("eventgate-logging-external", URI.create("https://eventgate-logging-external.discovery.wmnet:4392/v1/events"))
            .put("eventgate-logging-external-eqiad", URI.create("https://eventgate-logging-external.svc.eqiad.wmnet:4392/v1/events"))
            .put("eventgate-logging-external-codfw", URI.create("https://eventgate-logging-external.svc.codfw.wmnet:4392/v1/events"))
            .build())
        .eventStreamConfigEndpoint("https://mw-api-int-ro.discovery.wmnet:4446")
        .eventStreamConfigUri(MEDIAWIKI_API_URI)
        .build();

    /**
     * Default values to aid working with Event Streams from outside of the
     * Wikimedia production networks, e.g. schema.wikimedia.org instead of
     * schema.discovery.wmnet.
     *
     * <p>Services that are not external, like eventgate-main, cannot be
     * used through this configuration.
     *
     * <p>Use this instance for troubleshooting and debugging only. Code
     * written for a production job should be configured externally, with
     * values that are not hard-coded.
     */
    public static final EventUtilitiesConfig EXTERNAL_DEFAULT = EventUtilitiesConfig.builder()
        .eventSchemaBaseUris(ImmutableList.of(
            "https://schema.wikimedia.org/repositories/primary/jsonschema",
            "https://schema.wikimedia.org/repositories/secondary/jsonschema"
        ))
        .eventloggingSchemaBaseUri(MEDIAWIKI_API_URI)
        .eventServiceToUriMap(ImmutableMap.<String, URI>builder()
            // eventgate-main and eventgate-analytics are not accessible from external networks.
            .put("eventgate-analytics-external", URI.create("https://intake-analytics.wikimedia.org/v1/events"))
            .put("eventgate-analytics-external-eqiad", URI.create("https://intake-analytics.wikimedia.org/v1/events"))
            .put("eventgate-analytics-external-codfw", URI.create("https://intake-analytics.wikimedia.org/v1/events"))
            .put("eventgate-logging-external", URI.create("https://intake-logging.wikimedia.org/v1/events"))
            .put("eventgate-logging-external-eqiad", URI.create("https://intake-logging.wikimedia.org/v1/events"))
            .put("eventgate-logging-external-codfw", URI.create("https://intake-logging.wikimedia.org/v1/events"))
            .build())
        // No internal endpoint: createHttpClient() adds no custom route for this configuration.
        .eventStreamConfigEndpoint(null)
        .eventStreamConfigUri(MEDIAWIKI_API_URI)
        .build();

    /** Base URIs handed to the resource loader from which event schemas are loaded. */
    @Nonnull List<String> eventSchemaBaseUris;

    /** URI from which to get legacy EventLogging on wiki schemas. */
    @Nonnull String eventloggingSchemaBaseUri;

    /**
     * Mapping of event service name (e.g. {@code eventgate-main}) to the URI
     * of that service. It is handed to the {@link EventStreamConfig} built by
     * {@link #createEventStreamConfig(EventStreamConfigLoader)}.
     * See also https://wikitech.wikimedia.org/wiki/Service_ports.
     */
    @Nonnull Map<String, URI> eventServiceToUriMap;

    /**
     * Optional endpoint to which requests for {@link #eventStreamConfigUri}
     * are routed by the client built in {@link #createHttpClient()}.
     *
     * <p>In production, the mediawiki internal api endpoint needs the Host
     * header set to the MediaWiki API that should be accessed. When this is
     * {@code null} no custom route is added.
     */
    @Nullable String eventStreamConfigEndpoint;

    /** MediaWiki EventStreamConfig API used in WMF production to fetch stream configs. */
    @Nonnull String eventStreamConfigUri;

    /**
     * Clock used for various time based operations and timestamp creation.
     *
     * <p>Tests can override the default to get predictable timestamps. A
     * default configuration can serve as the starting point, replacing only
     * the clock:
     * {@code EventUtilitiesConfig config = EventUtilitiesConfig.DEFAULT.withClock(SerializableClock.frozenClock(Instant.EPOCH));}
     */
    @Builder.Default
    @Nonnull @With SerializableClock clock = Instant::now;

    /**
     * Builds the HTTP client matching this configuration.
     *
     * <p>When {@link #eventStreamConfigEndpoint} is set, a route from
     * {@link #eventStreamConfigUri} to that endpoint is registered on the
     * client. Checked exceptions raised while building are rethrown as-is
     * through {@code @SneakyThrows}.
     *
     * @return a newly built HTTP client
     */
    @SneakyThrows
    public BasicHttpClient createHttpClient() {
        BasicHttpClient.Builder clientBuilder = BasicHttpClient.builder();
        if (eventStreamConfigEndpoint != null) {
            // Add a custom route to the mediawiki internal api endpoint
            clientBuilder.addRoute(eventStreamConfigUri, eventStreamConfigEndpoint);
        }
        return clientBuilder.build();
    }

    /**
     * Creates a loader for legacy EventLogging schemas, resolving them
     * against {@link #eventloggingSchemaBaseUri}.
     *
     * @param httpClient HTTP client used by the underlying resource loader
     * @return a new EventLogging schema loader
     */
    public EventLoggingSchemaLoader createEventloggingSchemaLoader(BasicHttpClient httpClient) {
        JsonSchemaLoader jsonSchemaLoader = jsonSchemaLoaderFor(httpClient, singletonList(eventloggingSchemaBaseUri));
        return new EventLoggingSchemaLoader(jsonSchemaLoader);
    }

    /**
     * Creates a stream config loader that reads from
     * {@link #eventStreamConfigUri}.
     *
     * @param httpClient HTTP client used by the underlying resource loader
     * @return a new MediaWiki event stream config loader
     */
    public MediawikiEventStreamConfigLoader createMediawikiEventStreamConfigLoader(BasicHttpClient httpClient) {
        ResourceLoader resourceLoader = ResourceLoader.builder()
            .withHttpClient(httpClient)
            .build();
        JsonLoader jsonLoader = new JsonLoader(resourceLoader);
        return new MediawikiEventStreamConfigLoader(eventStreamConfigUri, jsonLoader);
    }

    /**
     * Creates an {@link EventStreamConfig} from the given loader and this
     * configuration's {@link #eventServiceToUriMap}.
     *
     * @param eventStreamConfigLoader loader the stream config is backed by
     * @return a new event stream config
     */
    public EventStreamConfig createEventStreamConfig(EventStreamConfigLoader eventStreamConfigLoader) {
        return EventStreamConfig.builder()
            .setEventStreamConfigLoader(eventStreamConfigLoader)
            .setEventServiceToUriMap(eventServiceToUriMap)
            .build();
    }

    /**
     * Creates an event schema loader resolving schemas against
     * {@link #eventSchemaBaseUris}.
     *
     * @param httpClient HTTP client used by the underlying resource loader
     * @return a new event schema loader
     */
    public EventSchemaLoader createEventSchemaLoader(BasicHttpClient httpClient) {
        return EventSchemaLoader.builder()
            .setJsonSchemaLoader(jsonSchemaLoaderFor(httpClient, eventSchemaBaseUris))
            .build();
    }

    /**
     * Creates an event stream factory from the given collaborators.
     *
     * @param eventSchemaLoader schema loader given to the factory
     * @param eventStreamConfig stream config given to the factory
     * @return a new event stream factory
     */
    public EventStreamFactory createEventStreamFactory(
        EventSchemaLoader eventSchemaLoader,
        EventStreamConfig eventStreamConfig
    ) {
        return EventStreamFactory.builder()
            .setEventSchemaLoader(eventSchemaLoader)
            .setEventStreamConfig(eventStreamConfig)
            .build();
    }

    /**
     * Creates an event schema validator on top of the given schema loader.
     *
     * @param eventSchemaLoader schema loader given to the validator
     * @return a new event schema validator
     */
    public EventSchemaValidator createEventSchemaValidator(EventSchemaLoader eventSchemaLoader) {
        return new EventSchemaValidator(eventSchemaLoader);
    }

    /**
     * Creates a JSON event generator from the given collaborators, using
     * this configuration's {@link #clock} as its ingestion time clock.
     *
     * @param eventSchemaLoader schema loader given to the generator
     * @param eventStreamConfig stream config given to the generator
     * @return a new JSON event generator
     */
    public JsonEventGenerator createJsonEventGenerator(
        EventSchemaLoader eventSchemaLoader,
        EventStreamConfig eventStreamConfig
    ) {
        return JsonEventGenerator.builder()
            .schemaLoader(eventSchemaLoader)
            .eventStreamConfig(eventStreamConfig)
            .ingestionTimeClock(clock)
            .build();
    }

    /** Builds a JSON schema loader that fetches over the given client and resolves against the given base URIs. */
    private static JsonSchemaLoader jsonSchemaLoaderFor(BasicHttpClient httpClient, List<String> baseUris) {
        ResourceLoader resourceLoader = ResourceLoader.builder()
            .withHttpClient(httpClient)
            .setBaseUrls(ResourceLoader.asURLs(baseUris))
            .build();
        return JsonSchemaLoader.build(resourceLoader);
    }
}