EventFileSender.java

package org.wikidata.query.rdf.blazegraph.events;

import static com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.concurrent.NotThreadSafe;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;


@NotThreadSafe //EventFileSender is wrapped around in BufferedEventSender that serializes event sending
public class EventFileSender implements EventSender {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private final ObjectWriter objectWriter;
    private final Writer writer;

    public EventFileSender(Path path) {
        objectWriter = new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .disable(AUTO_CLOSE_TARGET)
                .writer();
        try {
            OutputStream outputStream = Files.newOutputStream(path, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
            writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean push(Event event) {
        return push(Collections.singleton(event)) == 1;
    }

    @Override
    public int push(Collection<Event> events) {
        AtomicInteger written = new AtomicInteger();
        try {
            events.forEach(value -> {
                try {
                    objectWriter.writeValue(writer, value);
                    writer.write("\n");
                    written.incrementAndGet();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.flush();
        } catch (IOException e) {
            log.error("Couldn't write events", e);
            return written.intValue();
        }
        return events.size();
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}