EventHttpSender.java

package org.wikidata.query.rdf.blazegraph.events;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;

import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.blazegraph.JacksonUtil;

import com.fasterxml.jackson.databind.ObjectWriter;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class EventHttpSender implements EventSender, AutoCloseable {
    public static final int DEFAULT_CON_TIMEOUT = 5000;
    public static final int DEFAULT_READ_TIMEOUT = 5000;

    private final CloseableHttpClient httpClient;
    private final String eventGateUri;
    private final ObjectWriter objectWriter;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public EventHttpSender(CloseableHttpClient httpClient, String eventGateUri, ObjectWriter objectWriter) {
        this.httpClient = httpClient;
        this.eventGateUri = eventGateUri;
        this.objectWriter = objectWriter;
    }

    public static EventHttpSender build(String eventGateUri, int timeOut, int connectionTimeout) {
        RequestConfig config = RequestConfig.custom().setConnectTimeout(connectionTimeout)
                .setSocketTimeout(timeOut)
                .build();
        CloseableHttpClient httpclient = HttpClients.custom()
                .setDefaultRequestConfig(config)
                .build();
        return new EventHttpSender(httpclient, eventGateUri, JacksonUtil.DEFAULT_OBJECT_WRITER);
    }

    public boolean push(Event event) {
        return push(Collections.singletonList(event)) > 0;
    }

    @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE",
            justification = "false positive - known issue with try-with-resources and spotbugs")
    public int push(Collection<Event> events) {
        HttpPost post = new HttpPost(eventGateUri);
        // We're fine dropping messages here if they fail
        // just return 0 in case of errors so that the caller has a chance
        // to verify if something went wrong or not.
        // Adding a retry mechanism might be worthwhile in the future if this
        // class is used to ship something more important events than some
        // used for analysis purposes
        try {
            post.setEntity(httpEntity(events));
            try (CloseableHttpResponse response = httpClient.execute(post)) {
                if (response.getStatusLine().getStatusCode() >= 400) {
                    log.info("Cannot send events to eventgate endpoint: {}: {}", eventGateUri, response.getStatusLine());
                    return 0;
                }
            }
            return events.size();
        } catch (IOException e) {
            log.info("Cannot send events to eventgate endpoint: {}: {}", eventGateUri, e.getMessage());
            return 0;
        }
    }

    @Override
    public void close() {
        try {
            httpClient.close();
        } catch (IOException e) {
            log.error("Cannot close the http client", e);
        }
    }

    private HttpEntity httpEntity(Collection<Event> events) {
        AbstractHttpEntity entity = new AbstractHttpEntity() {
            @Override
            public boolean isRepeatable() {
                return true;
            }

            @Override
            public long getContentLength() {
                return -1L;
            }

            @Override
            public InputStream getContent() {
                throw new UnsupportedOperationException();
            }

            @Override
            public void writeTo(OutputStream outputStream) throws IOException {
                objectWriter.writeValue(outputStream, events);
            }

            @Override
            public boolean isStreaming() {
                return false;
            }
        };
        entity.setContentType(ContentType.APPLICATION_JSON.toString());
        return entity;
    }
}