CanaryEventProducer.java
package org.wikimedia.eventutilities.monitoring;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.joda.time.DateTime;
import org.wikimedia.eventutilities.core.event.EventSchemaLoader;
import org.wikimedia.eventutilities.core.event.EventStream;
import org.wikimedia.eventutilities.core.event.EventStreamConfig;
import org.wikimedia.eventutilities.core.event.EventStreamFactory;
import org.wikimedia.eventutilities.core.http.BasicHttpClient;
import org.wikimedia.eventutilities.core.http.BasicHttpResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * Uses an EventStreamFactory to create and POST Wikimedia canary events to
 * Wikimedia event services. Canary events are constructed from the first entry in the
 * event schema's examples field. It is expected that event schemas have the fields listed
 * at https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#Required_fields, and
 * also that the receiving event intake service (e.g. EventGate) will set meta.dt if it is
 * not present in the event.
 */
public class CanaryEventProducer {

    /**
     * List of data center names that will be used to look up event service urls.
     */
    protected static final List<String> DATACENTERS = ImmutableList.of("eqiad", "codfw");

    /**
     * Used for serializing JsonNode events to Strings.
     */
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Will be used as the value of meta.domain when building canary events.
     * It does not matter what this is, but it should be consistent.
     */
    protected static final String CANARY_DOMAIN = "canary";

    /**
     * EventStreamFactory instance used when constructing EventStreams.
     */
    protected final EventStreamFactory eventStreamFactory;

    /**
     * HTTP client used to POST canary events to event services.
     */
    private final BasicHttpClient httpClient;

    /**
     * Constructs a new instance of CanaryEventProducer with a new instance of EventStreamFactory
     * built from eventSchemaLoader and eventStreamConfig.
     *
     * @param eventSchemaLoader schema loader handed to the EventStreamFactory builder
     * @param eventStreamConfig stream config handed to the EventStreamFactory builder
     * @param httpClient client used to POST events
     */
    public CanaryEventProducer(
        EventSchemaLoader eventSchemaLoader,
        EventStreamConfig eventStreamConfig,
        BasicHttpClient httpClient
    ) {
        this(
            EventStreamFactory.builder()
                .setEventSchemaLoader(eventSchemaLoader)
                .setEventStreamConfig(eventStreamConfig)
                .build(),
            httpClient
        );
    }

    /**
     * Constructs a new CanaryEventProducer using the provided EventStreamFactory.
     *
     * @param eventStreamFactory factory used to create EventStreams from stream names
     * @param client client used to POST events
     */
    public CanaryEventProducer(EventStreamFactory eventStreamFactory, BasicHttpClient client) {
        this.eventStreamFactory = eventStreamFactory;
        this.httpClient = client;
    }

    /**
     * Returns the EventStreamFactory this CanaryEventProducer is using.
     */
    @Nonnull
    public EventStreamFactory getEventStreamFactory() {
        return eventStreamFactory;
    }

    /**
     * Given a streamName, gets its schema and uses the JSONSchema examples to make a canary event.
     * meta.dt is removed from the event so that the receiving event service sets it.
     */
    @Nonnull
    public ObjectNode canaryEvent(String streamName) {
        return canaryEvent(eventStreamFactory.createEventStream(streamName));
    }

    /**
     * Given a streamName, gets its schema and uses the JSONSchema examples to make a canary event.
     *
     * @param timestamp value to use for meta.dt; if null, meta.dt is removed from the event
     */
    @Nonnull
    public ObjectNode canaryEvent(String streamName, DateTime timestamp) {
        return canaryEvent(eventStreamFactory.createEventStream(streamName), timestamp);
    }

    /**
     * Given an EventStream, gets its schema and uses the JSONSchema examples to make a canary event.
     * meta.dt is removed from the event so that the receiving event service sets it.
     */
    @Nonnull
    public ObjectNode canaryEvent(EventStream es) {
        return makeCanaryEvent(
            es.streamName(),
            es.exampleEvent()
        );
    }

    /**
     * Given an EventStream, gets its schema and uses the JSONSchema examples to make a canary event.
     *
     * @param timestamp value to use for meta.dt; if null, meta.dt is removed from the event
     */
    @Nonnull
    public ObjectNode canaryEvent(EventStream es, DateTime timestamp) {
        return makeCanaryEvent(
            es.streamName(),
            es.exampleEvent(),
            timestamp
        );
    }

    /**
     * Creates a canary event from an example event for a stream.
     *
     * The example is deep copied, so the provided node is never modified. In the copy,
     * meta.domain is set to {@link #CANARY_DOMAIN} and meta.stream is set to streamName.
     *
     * @param streamName name of the stream, used as the value of meta.stream
     * @param example example event to base the canary event on; must have an object meta field
     * @param timestamp value to use for meta.dt; if null, meta.dt is removed from the event
     * @return a new canary event
     * @throws IllegalArgumentException if example is null or has no object meta field
     */
    @Nonnull
    @SuppressFBWarnings(value = "OCP_OVERLY_CONCRETE_PARAMETER", justification = "Precise typing is useful for intention")
    protected static ObjectNode makeCanaryEvent(String streamName, ObjectNode example, DateTime timestamp) {
        checkArgument(
            example != null,
            "Cannot make canary event for %s, example is null.", streamName
        );

        ObjectNode canaryEvent = example.deepCopy();

        // Fail with a descriptive message rather than a bare NullPointerException or
        // ClassCastException when the example has no usable meta object.
        checkArgument(
            canaryEvent.get("meta") instanceof ObjectNode,
            "Cannot make canary event for %s, example does not have a meta object.", streamName
        );

        // canaryMeta is the very node held by canaryEvent, so it is modified in place
        // and does not need to be set on canaryEvent again.
        ObjectNode canaryMeta = (ObjectNode) canaryEvent.get("meta");
        canaryMeta.put("domain", CANARY_DOMAIN);
        canaryMeta.put("stream", streamName);

        if (timestamp == null) {
            // Remove meta.dt so it is set by the Event Service we will POST this event to.
            canaryMeta.remove("dt");
        } else {
            // Set meta.dt at provided timestamp value.
            canaryMeta.put("dt", timestamp.toString());
        }

        return canaryEvent;
    }

    /**
     * Creates a canary event from an example event for a stream, removing meta.dt
     * so it is set by the Event Service the event is POSTed to.
     *
     * @throws IllegalArgumentException if example is null or has no object meta field
     */
    @Nonnull
    protected static ObjectNode makeCanaryEvent(String streamName, ObjectNode example) {
        return makeCanaryEvent(streamName, example, null);
    }

    /**
     * Gets canary events to POST for all streams that EventStreamConfig knows about.
     * Refer to docs for {@link #getCanaryEventsToPostForStreams(List, DateTime)}.
     */
    @Nonnull
    public Map<URI, List<ObjectNode>> getAllCanaryEventsToPost() {
        return getCanaryEventsToPost(
            eventStreamFactory.getEventStreamConfig().cachedStreamNames()
        );
    }

    /**
     * Gets canary events to POST for a single stream.
     * Refer to docs for {@link #getCanaryEventsToPostForStreams(List, DateTime)}.
     */
    @Nonnull
    public Map<URI, List<ObjectNode>> getCanaryEventsToPost(String streamName) {
        return getCanaryEventsToPost(ImmutableList.of(streamName));
    }

    /**
     * Gets canary events to POST for a single stream, using timestamp for meta.dt.
     * Refer to docs for {@link #getCanaryEventsToPostForStreams(List, DateTime)}.
     */
    @Nonnull
    public Map<URI, List<ObjectNode>> getCanaryEventsToPost(String streamName, DateTime timestamp) {
        return getCanaryEventsToPost(ImmutableList.of(streamName), timestamp);
    }

    /**
     * Gets canary events to POST for a List of stream names.
     * Refer to docs for {@link #getCanaryEventsToPostForStreams(List, DateTime)}.
     */
    @Nonnull
    public Map<URI, List<ObjectNode>> getCanaryEventsToPost(List<String> streamNames) {
        return getCanaryEventsToPostForStreams(eventStreamFactory.createEventStreams(streamNames), null);
    }

    /**
     * Gets canary events to POST for a List of stream names, using timestamp for meta.dt.
     * Refer to docs for {@link #getCanaryEventsToPostForStreams(List, DateTime)}.
     */
    @Nonnull
    public Map<URI, List<ObjectNode>> getCanaryEventsToPost(List<String> streamNames, DateTime timestamp) {
        return getCanaryEventsToPostForStreams(eventStreamFactory.createEventStreams(streamNames), timestamp);
    }

    /**
     * Given a list of streams and a timestamp, this will return a map of
     * datacenter specific event service URIs to a list of canary
     * events that should be POSTed to that event service.
     * These can then be iterated through and posted to each
     * event service URI to post expected canary events for each stream.
     *
     * Every stream is registered under the event service URI it resolves to in each of
     * {@link #DATACENTERS}. Streams that resolve to the same URI are accumulated in one
     * list, and a stream that resolves to the same URI in several datacenters gets only
     * one canary event for that URI.
     *
     * If timestamp is null, the meta.dt field of canary events is removed
     * to be set by EventGate, otherwise it is set to the provided value.
     *
     * @param eventStreams streams to build canary events for
     * @param timestamp value to use for meta.dt, or null to have meta.dt removed
     * @return immutable map of event service URI to the canary events to POST there
     */
    @Nonnull
    public Map<URI, List<ObjectNode>> getCanaryEventsToPostForStreams(
        List<EventStream> eventStreams,
        DateTime timestamp
    ) {
        // Build a map of datacenter specific event service url to EventStreams.
        // Lists are accumulated per URI: blindly overwriting the entry of a URI that
        // has already been seen would silently drop the canary events of other streams.
        Map<URI, ImmutableList.Builder<EventStream>> eventStreamsByEventServiceUrl = new HashMap<>();
        for (EventStream eventStream : eventStreams) {
            // distinct() keeps a stream from being added twice to the same URI
            // when several datacenters resolve to that URI.
            List<URI> eventServiceUris = DATACENTERS.stream()
                .map(eventStream::eventServiceUri)
                .distinct()
                .collect(Collectors.toList());

            for (URI eventServiceUri : eventServiceUris) {
                eventStreamsByEventServiceUrl
                    .computeIfAbsent(eventServiceUri, uri -> ImmutableList.<EventStream>builder())
                    .add(eventStream);
            }
        }

        // Convert the Map of URIs -> EventStreams to URIs -> canary events.
        // Each set of canary events can be POSTed to their keyed
        // event service url.
        return eventStreamsByEventServiceUrl.entrySet().stream()
            .collect(toImmutableMap(
                Map.Entry::getKey,
                entry -> entry.getValue().build().stream()
                    .map(eventStream -> canaryEvent(eventStream, timestamp))
                    .collect(toImmutableList())
            ));
    }

    /**
     * POSTs canary events for all known streams.
     *
     * Refer to docs for {@link #postCanaryEvents(List)}.
     * Refer to docs for {@link #postCanaryEventsForStreams(List)}.
     */
    @Nonnull
    public Map<URI, BasicHttpResult> postAllCanaryEvents() {
        return postCanaryEvents(
            eventStreamFactory.getEventStreamConfig().cachedStreamNames()
        );
    }

    /**
     * Posts canary events for a single streamName.
     * Refer to docs for {@link #postCanaryEventsForStreams(List)}.
     */
    @Nonnull
    public Map<URI, BasicHttpResult> postCanaryEvents(String streamName) {
        return postCanaryEvents(ImmutableList.of(streamName));
    }

    /**
     * Posts canary events for each named event stream.
     * Refer to docs for {@link #postCanaryEventsForStreams(List)}.
     */
    @Nonnull
    public Map<URI, BasicHttpResult> postCanaryEvents(List<String> streamNames) {
        return postCanaryEventsForStreams(eventStreamFactory.createEventStreams(streamNames));
    }

    /**
     * Gets canary events for each eventStream, POSTs them to the appropriate
     * event service url(s), and collects the results of each POST
     * into a Map of event service url to BasicHttpResult.
     *
     * We want to attempt every POST we are supposed to do without bailing
     * when an error is encountered. This is why the results are collected in
     * this way. The results should be examined after this method returns
     * to check for any failures.
     */
    @Nonnull
    public Map<URI, BasicHttpResult> postCanaryEventsForStreams(List<EventStream> eventStreams) {
        return postEventsToUris(getCanaryEventsToPostForStreams(eventStreams, null));
    }

    /**
     * Given a List of ObjectNodes, returns an ArrayNode of those ObjectNodes.
     */
    @Nonnull
    public static ArrayNode eventsToArrayNode(List<ObjectNode> events) {
        ArrayNode eventsArray = JsonNodeFactory.instance.arrayNode();
        for (ObjectNode event : events) {
            eventsArray.add(event);
        }
        return eventsArray;
    }

    /**
     * Iterates over the Map of URI to events and posts events to the URI.
     *
     * @return immutable map of each URI to the result of the POST made to it
     */
    @Nonnull
    public Map<URI, BasicHttpResult> postEventsToUris(Map<URI, List<ObjectNode>> uriToEvents) {
        return uriToEvents.entrySet().stream()
            .collect(toImmutableMap(
                Map.Entry::getKey,
                entry -> postEvents(entry.getKey(), entry.getValue())
            ));
    }

    /**
     * POSTs the given list of events to the eventServiceUri.
     * Expects that eventServiceUri returns a JSON response.
     * EventGate returns 201 if guaranteed success, 202 if hasty success,
     * and 207 if partial success (some events were accepted, others were not).
     * We want to only consider 201 and 202 as full success so we pass
     * the http client a custom isSuccess function to determine this.
     * https://github.com/wikimedia/eventgate/blob/master/spec.yaml#L72
     *
     * The returned BasicHttpResult will look like:
     * <pre>
     * {
     *     success: true,
     *     status: 201,
     *     message: "HTTP response message",
     *     body: response body if any
     * }
     * </pre>
     * If ANY events failed POSTing, success will be false, and the reasons
     * for the failure will be in message and body.
     * If there is a local exception during POSTing, success will be false,
     * the Exception message will be in message, and the exception field
     * will have the original Exception.
     */
    @Nonnull
    public BasicHttpResult postEvents(URI eventServiceUri, List<ObjectNode> events) {
        // Convert List of events to ArrayNode of events to allow
        // jackson to serialize them as an array of events.
        ArrayNode eventsArray = eventsToArrayNode(events);

        return httpClient.post(
            eventServiceUri,
            OBJECT_MAPPER,
            eventsArray,
            // Only consider 201 and 202 from EventGate as fully successful POSTs.
            statusCode -> statusCode == 201 || statusCode == 202
        );
    }
}