KafkaStreamConsumerMetricsListener.java
package org.wikidata.query.rdf.updater.consumer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
public class KafkaStreamConsumerMetricsListener {
private final Counter triplesAccum;
private final Counter invalidDuplicates;
private final Counter triplesOffered;
private final Counter deletedEntities;
private final AtomicLong lag = new AtomicLong();
private final Clock clock;
KafkaStreamConsumerMetricsListener(MetricRegistry registry, Clock clock) {
triplesAccum = registry.counter("kafka-stream-consumer-triples-accumulated");
invalidDuplicates = registry.counter("kafka-stream-consumer-triples-invalid-duplicates");
triplesOffered = registry.counter("kafka-stream-consumer-triples-offered");
deletedEntities = registry.counter("kafka-stream-consumer-deleted-entities");
registry.gauge("kafka-stream-consumer-lag", () -> lag::get);
this.clock = clock;
}
public static KafkaStreamConsumerMetricsListener forRegistry(MetricRegistry registry) {
return new KafkaStreamConsumerMetricsListener(registry, Clock.systemUTC());
}
void triplesAccum(int triples) {
triplesAccum.inc(triples);
}
void invalidDuplicates(int triples) {
invalidDuplicates.inc(triples);
}
void triplesOffered(int triples) {
triplesOffered.inc(triples);
}
void deletedEntities(int deletes) {
deletedEntities.inc(deletes);
}
void lag(Instant lastEventTime) {
lag.set(Duration.between(lastEventTime, clock.instant()).toMillis());
}
}