BufferedEventSender.java

package org.wikidata.query.rdf.blazegraph.events;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BufferedEventSender implements EventSender {
    private final BlockingQueue<Event> queue;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    BufferedEventSender(int maxCap) {
        this.queue = new ArrayBlockingQueue<>(maxCap);
    }

    @Override
    public boolean push(Event event) {
        return offer(event);
    }

    private boolean offer(Event event) {
        if (!queue.offer(event)) {
            log.warn("Cannot buffer event {}, queue full", event.getMetadata().getStream());
            return false;
        }
        return true;
    }

    Worker newSendWorker(EventSender sender, int bufferSize) {
        return new Worker(sender, queue, bufferSize);
    }

    static class Worker extends Thread {
        private volatile boolean run = true;
        private final EventSender sender;
        private final int bufferSize;
        private final BlockingQueue<Event> queue;

        Worker(EventSender sender, BlockingQueue<Event> queue, int bufferSize) {
            super(Worker.class.getName() + "-" + sender.getClass().getSimpleName());
            this.setDaemon(true);
            this.sender = sender;
            this.queue = queue;
            this.bufferSize = bufferSize;
        }

        /**
         * Tell the thread to exist its run loop after the queue has been drained. Wait for the thread
         * completion if it's running.
         * Calling this method before starting the thread will cause the run method to immediately exit after having
         * drained the queue.
         * @throws InterruptedException
         */
        void stopAndWaitForCompletion() throws InterruptedException {
            this.run = false;
            join(2000);
            if (isAlive()) {
                this.interrupt();
            }
        }

        @Override
        public void run() {
            try {
                while (run || !queue.isEmpty()) {
                    Event event = queue.poll(100, MILLISECONDS);
                    if (event == null) {
                        continue;
                    }
                    List<Event> events = new ArrayList<>(bufferSize);
                    events.add(event);
                    queue.drainTo(events, bufferSize - 1);
                    this.sender.push(events);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}