1 package org.wikimedia.search.extra.latency;
2
3 import static java.util.stream.Collectors.toList;
4
5 import java.util.ArrayList;
6 import java.util.Collections;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Optional;
10 import java.util.Set;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
13 import java.util.function.Supplier;
14
15 import javax.annotation.Nullable;
16
17 import org.HdrHistogram.Histogram;
18 import org.HdrHistogram.Recorder;
19 import org.elasticsearch.common.component.AbstractLifecycleComponent;
20 import org.elasticsearch.common.unit.TimeValue;
21 import org.elasticsearch.index.shard.SearchOperationListener;
22 import org.elasticsearch.search.internal.SearchContext;
23 import org.elasticsearch.threadpool.ThreadPool;
24
25 import com.google.common.annotations.VisibleForTesting;
26
27 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
28
29 public class SearchLatencyListener extends AbstractLifecycleComponent implements SearchOperationListener, SearchLatencyProbe {
30
31
32
33 private static final TimeValue ROTATION_DELAY = TimeValue.timeValueSeconds(5);
34 @VisibleForTesting
35 static final int NUM_ROLLING_HISTOGRAMS = (int) (TimeValue.timeValueMinutes(1).millis() / ROTATION_DELAY.millis());
36 private static final TimeValue HIGHEST_TRACKABLE_VALUE = TimeValue.timeValueMinutes(5);
37 private static final TimeValue LOWEST_DISCERNABLE_VALUE = TimeValue.timeValueMillis(1);
38 static final int SIGNIFICANT_DIGITS = 2;
39
40 private final ConcurrentMap<String, RollingHistogram> statBuckets;
41 private final Supplier<ThreadPool> threadPoolSupplier;
42 @Nullable private ThreadPool.Cancellable cancelRotation;
43
44 public SearchLatencyListener(Supplier<ThreadPool> threadPoolSupplier) {
45 super();
46 this.threadPoolSupplier = threadPoolSupplier;
47 statBuckets = new ConcurrentHashMap<>();
48 }
49
50 @Override
51 protected void doStart() {
52 if (cancelRotation == null) {
53 cancelRotation = threadPoolSupplier.get().scheduleWithFixedDelay(this::rotate, ROTATION_DELAY, ThreadPool.Names.GENERIC);
54 }
55 }
56
57 @Override
58 protected void doStop() {
59 if (cancelRotation != null) {
60 cancelRotation.cancel();
61 cancelRotation = null;
62 }
63 }
64
65 @Override
66 protected void doClose() {
67
68 }
69
70 private Optional<RollingHistogram> getBucket(String name) {
71 return Optional.ofNullable(statBuckets.get(name));
72 }
73
74 private RollingHistogram getOrAddBucket(String name) {
75 return statBuckets.computeIfAbsent(name, n -> new RollingHistogram());
76 }
77
78 public long getMillisAtPercentile(String bucket, double percentile) {
79 return getBucket(bucket).map(hist -> Math.round(hist.getMillisAtPercentile(percentile))).orElse(0L);
80 }
81
82 public List<LatencyStat> getLatencyStats(Set<Double> percentiles) {
83 return statBuckets.entrySet().stream()
84 .flatMap(entry -> percentiles.stream().map(percentile -> {
85 TimeValue tv = entry.getValue().getTimeValueAtPercentile(percentile);
86 return new LatencyStat(entry.getKey(), percentile, tv);
87 }))
88 .collect(toList());
89 }
90
91 @SuppressFBWarnings({"PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
92 @Override
93 public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
94 if (searchContext.groupStats() == null) {
95 return;
96 }
97 if (tookInNanos > HIGHEST_TRACKABLE_VALUE.nanos()) {
98
99 tookInNanos = HIGHEST_TRACKABLE_VALUE.nanos();
100 }
101 for (String statBucket : searchContext.groupStats()) {
102 getOrAddBucket(statBucket).recordValue(tookInNanos);
103 }
104 }
105
106 @VisibleForTesting
107 void rotate() {
108 Iterator<RollingHistogram> iter = statBuckets.values().iterator();
109 while (iter.hasNext()) {
110 RollingHistogram hist = iter.next();
111 hist.rotate();
112 if (hist.isEmpty()) {
113 iter.remove();
114 }
115 }
116 }
117
118 private static class RollingHistogram {
119 private final Histogram current;
120 private final List<Histogram> list;
121 private final Recorder recorder;
122
123 RollingHistogram() {
124 current = new Histogram(LOWEST_DISCERNABLE_VALUE.nanos(), HIGHEST_TRACKABLE_VALUE.nanos(), SIGNIFICANT_DIGITS);
125 list = new ArrayList<>();
126 recorder = new Recorder(LOWEST_DISCERNABLE_VALUE.nanos(), HIGHEST_TRACKABLE_VALUE.nanos(), SIGNIFICANT_DIGITS);
127 }
128
129
130 void recordValue(long tookInNanos) {
131 recorder.recordValue(tookInNanos);
132 }
133
134 synchronized void rotate() {
135 Histogram hist;
136 if (list.size() < NUM_ROLLING_HISTOGRAMS) {
137 hist = recorder.getIntervalHistogram();
138 list.add(0, hist);
139 } else {
140 Collections.rotate(list, 1);
141 hist = list.get(0);
142 current.subtract(hist);
143 recorder.getIntervalHistogramInto(hist);
144 }
145 current.add(hist);
146 }
147
148 synchronized double getMillisAtPercentile(double percentile) {
149 return current.getValueAtPercentile(percentile) / ((double)TimeValue.NSEC_PER_MSEC);
150 }
151
152 synchronized TimeValue getTimeValueAtPercentile(double percentile) {
153 double nanos = current.getValueAtPercentile(percentile);
154 return TimeValue.timeValueNanos(Math.round(nanos));
155 }
156
157 synchronized boolean isEmpty() {
158 return current.getTotalCount() == 0;
159 }
160
161 }
162 }