// StreamingUpdate.java
package org.wikidata.query.rdf.updater.consumer;

import static org.wikidata.query.rdf.tool.HttpClientUtils.buildHttpClient;
import static org.wikidata.query.rdf.tool.HttpClientUtils.buildHttpClientRetryer;
import static org.wikidata.query.rdf.tool.HttpClientUtils.getHttpProxyHost;
import static org.wikidata.query.rdf.tool.HttpClientUtils.getHttpProxyPort;

import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.openrdf.rio.RDFParserRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.uri.UrisScheme;
import org.wikidata.query.rdf.common.uri.UrisSchemeFactory;
import org.wikidata.query.rdf.tool.options.OptionsUtils;
import org.wikidata.query.rdf.tool.rdf.RDFParserSuppliers;
import org.wikidata.query.rdf.tool.rdf.RdfRepositoryUpdater;
import org.wikidata.query.rdf.tool.rdf.client.RdfClient;
import org.wikidata.query.rdf.updater.MutationEventData;
import org.wikidata.query.rdf.updater.RDFChunkDeserializer;
import org.wikidata.query.rdf.updater.consumer.options.StreamingUpdateOptions;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.github.rholder.retry.Retryer;
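
/**
 * Entry point of the streaming updater consumer: reads {@link MutationEventData}
 * events from a Kafka topic and applies the corresponding RDF mutations to the
 * SPARQL backend through a {@link RdfRepositoryUpdater}.
 */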
public final class StreamingUpdate {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingUpdate.class);

    private StreamingUpdate() {}

    public static void main(String[] args) {
        LOG.info("Starting StreamingUpdater");
        StreamingUpdateOptions options = OptionsUtils.handleOptions(StreamingUpdateOptions.class, args);
        MetricRegistry metrics = new MetricRegistry();
        JmxReporter reporter = JmxReporter.forRegistry(metrics).inDomain(options.metricDomain()).build();
        StreamingUpdaterConsumer updater = build(options, metrics);
        // The updater runs on the main thread; keep a handle on it so the
        // shutdown hook can wait for it to terminate.
        Thread streamingUpdaterThread = Thread.currentThread();
        streamingUpdaterThread.setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in the updater thread", e));
        addShutdownHook(updater, streamingUpdaterThread, reporter);
        reporter.start();
        updater.run();
    }
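
    /**
     * Registers a JVM shutdown hook that closes the consumer, waits briefly for
     * the updater thread to finish, and then stops the JMX reporter.
     */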
    static void addShutdownHook(StreamingUpdaterConsumer updater, Thread updaterThread, JmxReporter reporter) {
        Thread t = new Thread(() -> {
            updater.close();
            try {
                // Give the updater thread a couple of seconds to finish its current batch.
                updaterThread.join(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while stopping the streaming updater", e);
            }
            if (updaterThread.isAlive()) {
                LOG.warn("Failed to stop the streaming updater cleanly.");
            }
            reporter.close();
        }, "StreamingUpdate shutdown");
        Runtime.getRuntime().addShutdownHook(t);
    }
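
    /**
     * Wires together the Kafka consumer, the HTTP/RDF clients and the repository
     * updater from the parsed command line options.
     */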
    static StreamingUpdaterConsumer build(StreamingUpdateOptions options, MetricRegistry metrics) {
        RDFChunkDeserializer deser = new RDFChunkDeserializer(new RDFParserSuppliers(RDFParserRegistry.getInstance()));
        KafkaStreamConsumer consumer = KafkaStreamConsumer.build(options.brokers(),
                options.topic(),
                options.partition(),
                options.consumerGroup(),
                options.batchSize(),
                deser,
                parseInitialOffset(options),
                KafkaStreamConsumerMetricsListener.forRegistry(metrics),
                options.bufferedInputMessages(),
                buildFilter(StreamingUpdateOptions.entityFilterPattern(options)));
        HttpClient httpClient = buildHttpClient(getHttpProxyHost(), getHttpProxyPort());
        Retryer<ContentResponse> retryer = buildHttpClientRetryer();
        Duration rdfClientTimeout = RdfRepositoryUpdater.getRdfClientTimeout();
        RdfClient rdfClient = new RdfClient(httpClient, StreamingUpdateOptions.sparqlUri(options), retryer,
                rdfClientTimeout, RdfClient.DEFAULT_MAX_RESPONSE_SIZE);
        UrisScheme uris = UrisSchemeFactory.getURISystem();
        return new StreamingUpdaterConsumer(consumer, new RdfRepositoryUpdater(rdfClient, uris), metrics, options.inconsistenciesWarningThreshold());
    }
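
    /** Sentinel value for {@code options.initialOffsets()}: no explicit seek is performed. */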
    private static final String RESET_OFFSETS_TO_EARLIEST = "earliest";

    /**
     * Translates {@code options.initialOffsets()} into an offset reset callback:
     * {@code null} for "earliest", an absolute offset for a bare number, or a
     * timestamp-based reset for an ISO-8601 instant.
     */
    private static BiConsumer<Consumer<String, MutationEventData>, TopicPartition> parseInitialOffset(StreamingUpdateOptions options) {
        String initialOffsets = options.initialOffsets();
        if (RESET_OFFSETS_TO_EARLIEST.equals(initialOffsets)) {
            // No reset callback is passed to KafkaStreamConsumer.
            return null;
        }
        try {
            if (initialOffsets.matches("[0-9]+")) {
                // A bare number is an absolute Kafka offset.
                return KafkaStreamConsumer.resetToOffset(Long.parseLong(initialOffsets));
            }
            // Anything else must be an ISO-8601 instant, e.g. 2021-01-01T00:00:00Z.
            return KafkaStreamConsumer.resetToTime(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(initialOffsets)));
        } catch (IllegalArgumentException | DateTimeException e) {
            // DateTimeException (and its DateTimeParseException subclass) does not
            // extend IllegalArgumentException, so it must be caught explicitly for
            // invalid dates to produce the friendly error message.
            throw new IllegalArgumentException("Cannot parse initial offset: [" + initialOffsets + "], " +
                    "must be 'earliest', a number or a date", e);
        }
    }
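
    /** Builds a predicate that is true for events whose entity id matches the given pattern. */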
    static Predicate<MutationEventData> buildFilter(Pattern pattern) {
        return m -> pattern.matcher(m.getEntity()).find();
    }
}