AsyncEventSender.java
package org.wikidata.query.rdf.blazegraph.events;
import java.io.IOException;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class AsyncEventSender implements EventSender {
private final BufferedEventSender bufferedEventGateSender;
private final BufferedEventSender.Worker worker;
private final EventSender underlyingSender;
private final Logger log = LoggerFactory.getLogger(this.getClass());
@VisibleForTesting
AsyncEventSender(BufferedEventSender bufferedEventGateSender,
BufferedEventSender.Worker worker, EventSender underlyingSender) {
this.bufferedEventGateSender = bufferedEventGateSender;
this.worker = worker;
this.underlyingSender = underlyingSender;
}
public static AsyncEventSender wrap(int queueSize, int maxBatchSize, EventSender sender) {
BufferedEventSender bufferedEventGateSender = new BufferedEventSender(queueSize);
BufferedEventSender.Worker w = bufferedEventGateSender.newSendWorker(sender, maxBatchSize);
w.start();
return new AsyncEventSender(bufferedEventGateSender, w, sender);
}
@Override
public void close() throws IOException {
try {
worker.stopAndWaitForCompletion();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for the executor service to shutdown, some events might be lost.");
Thread.currentThread().interrupt();
} finally {
underlyingSender.close();
}
}
@Override
public boolean push(Event event) {
return bufferedEventGateSender.push(event);
}
@Override
public int push(Collection<Event> events) {
return bufferedEventGateSender.push(events);
}
}