QueryEventSenderFilter.java
package org.wikidata.query.rdf.blazegraph.filters;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.blazegraph.WikibaseContextListener;
import org.wikidata.query.rdf.blazegraph.events.AsyncEventSender;
import org.wikidata.query.rdf.blazegraph.events.EventFileSender;
import org.wikidata.query.rdf.blazegraph.events.EventHttpSender;
import org.wikidata.query.rdf.blazegraph.events.EventSender;
import org.wikidata.query.rdf.blazegraph.events.QueryEvent;
import org.wikidata.query.rdf.blazegraph.events.QueryEventGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
* This filter assumes that it is configured to only track
* read queries.
*/
public class QueryEventSenderFilter extends MonitoredFilter implements QueryEventSenderMXBean {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private EventSender sender;
private QueryEventGenerator queryEventGenerator;
private String enableIfHeader;
private String disableIfHeader;
private final OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
private final AtomicInteger queryCounter = new AtomicInteger(0);
private final AtomicLong startedQueries = new AtomicLong(0);
public QueryEventSenderFilter() {
}
@VisibleForTesting
QueryEventSenderFilter(EventSender sender, QueryEventGenerator eventGenerator, String enableIfHeader, String disableIfHeader) {
this.sender = sender;
this.queryEventGenerator = eventGenerator;
this.enableIfHeader = enableIfHeader;
this.disableIfHeader = disableIfHeader;
}
@Override
@SuppressFBWarnings(value = "MDM_INETADDRESS_GETLOCALHOST", justification = "are there alternatives?")
public void init(FilterConfig filterConfig) throws ServletException {
super.init(filterConfig);
FilterConfiguration config = new FilterConfiguration(filterConfig, FilterConfiguration.WDQS_CONFIG_PREFIX);
String wdqsHostname;
try {
wdqsHostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new ServletException(e);
}
String graphName = config.loadStringParam("graph-name");
String streamName = config.loadStringParam("event-gate-sparql-query-stream", "blazegraph.sparql-query");
this.queryEventGenerator = new QueryEventGenerator(() -> UUID.randomUUID().toString(),
Clock.systemUTC(),
wdqsHostname,
graphName,
streamName,
operatingSystemMXBean::getSystemLoadAverage);
this.enableIfHeader = config.loadStringParam("enable-event-sender-if-header");
this.disableIfHeader = config.loadStringParam("disable-event-sender-if-header");
boolean eventFileSenderEnabled = config.loadBooleanParam("file-event-sender", false);
int maxCap = config.loadIntParam("queue-size", 1000);
int maxEventsPerHttpRequest = config.loadIntParam("http-max-events-per-request", 10);
EventSender eventSender = eventFileSenderEnabled ? createFileEventSender(config) : createHttpEventSender(config);
this.sender = AsyncEventSender.wrap(maxCap, maxEventsPerHttpRequest, eventSender);
}
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "We trust the FilterConfiguration")
private EventSender createFileEventSender(FilterConfiguration config) {
Path path = Paths.get(config.loadStringParam("file-event-sender-filepath"));
return new EventFileSender(path);
}
private EventSender createHttpEventSender(FilterConfiguration config) {
String httpEndPoint = config.loadStringParam("event-gate-endpoint");
int httpReadTimeout = config.loadIntParam("http-read-timeout", EventHttpSender.DEFAULT_READ_TIMEOUT);
int httpConTimeout = config.loadIntParam("http-con-timeout", EventHttpSender.DEFAULT_CON_TIMEOUT);
if (httpEndPoint == null) {
return e -> true; // /dev/null
}
return EventHttpSender.build(httpEndPoint, httpReadTimeout, httpConTimeout);
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
if (!canLogQueryEvent(httpRequest)) {
filterChain.doFilter(httpRequest, httpResponse);
return;
}
Instant start = queryEventGenerator.instant();
boolean succeed = false;
int countBefore = queryCounter.getAndIncrement();
startedQueries.incrementAndGet();
try {
filterChain.doFilter(servletRequest, servletResponse);
succeed = true;
} finally {
int countAfter = queryCounter.decrementAndGet();
int responseStatus;
Instant end = queryEventGenerator.instant();
if (succeed) {
responseStatus = httpResponse.getStatus();
} else {
responseStatus = 500;
}
String defaultNamespace = (String) servletRequest.getServletContext().getAttribute(WikibaseContextListener.BLAZEGRAPH_DEFAULT_NAMESPACE);
QueryEvent event = queryEventGenerator.generateQueryEvent(httpRequest, responseStatus, Duration.between(start, end),
start, defaultNamespace, countBefore, countAfter);
if (!sender.push(event)) {
log.warn("Cannot sent event for {} (queue full?)", event.getMetadata().getStream());
}
}
}
private boolean canLogQueryEvent(HttpServletRequest httpRequest) {
if (!Strings.isNullOrEmpty(enableIfHeader) && httpRequest.getHeader(enableIfHeader) == null) {
return false;
}
if (!Strings.isNullOrEmpty(disableIfHeader) && httpRequest.getHeader(disableIfHeader) != null) {
return false;
}
if (!queryEventGenerator.hasValidPath(httpRequest)) {
return false;
}
return httpRequest.getParameter(QueryEventGenerator.QUERY_PARAM) != null;
}
@Override
public void destroy() {
try {
sender.close();
} catch (IOException e) {
log.error("Exception thrown while closing event sender.", e);
}
}
@Override
public int getRunningQueries() {
return queryCounter.get();
}
@Override
public long getStartedQueries() {
return startedQueries.get();
}
}