View Javadoc
1   package org.wikimedia.search.extra.latency;
2   
3   import static java.util.stream.Collectors.toList;
4   
5   import java.util.ArrayList;
6   import java.util.Collections;
7   import java.util.Iterator;
8   import java.util.List;
9   import java.util.Optional;
10  import java.util.Set;
11  import java.util.concurrent.ConcurrentHashMap;
12  import java.util.concurrent.ConcurrentMap;
13  import java.util.function.Supplier;
14  
15  import javax.annotation.Nullable;
16  
17  import org.HdrHistogram.Histogram;
18  import org.HdrHistogram.Recorder;
19  import org.elasticsearch.common.component.AbstractLifecycleComponent;
20  import org.elasticsearch.common.unit.TimeValue;
21  import org.elasticsearch.index.shard.SearchOperationListener;
22  import org.elasticsearch.search.internal.SearchContext;
23  import org.elasticsearch.threadpool.ThreadPool;
24  
25  import com.google.common.annotations.VisibleForTesting;
26  
27  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
28  
29  public class SearchLatencyListener extends AbstractLifecycleComponent implements SearchOperationListener, SearchLatencyProbe {
30      // Keep a rolling histogram over the last minute with 5 second rotation. This allows
31      // latencies to represent the last minute worth of activity, but respond to changes in
32      // latency at 5 second intervals.
33      private static final TimeValue ROTATION_DELAY = TimeValue.timeValueSeconds(5);
34      @VisibleForTesting
35      static final int NUM_ROLLING_HISTOGRAMS = (int) (TimeValue.timeValueMinutes(1).millis() / ROTATION_DELAY.millis());
36      private static final TimeValue HIGHEST_TRACKABLE_VALUE = TimeValue.timeValueMinutes(5);
37      private static final TimeValue LOWEST_DISCERNABLE_VALUE = TimeValue.timeValueMillis(1);
38      static final int SIGNIFICANT_DIGITS = 2;
39  
40      private final ConcurrentMap<String, RollingHistogram> statBuckets;
41      private final Supplier<ThreadPool> threadPoolSupplier;
42      @Nullable private ThreadPool.Cancellable cancelRotation;
43  
44      public SearchLatencyListener(Supplier<ThreadPool> threadPoolSupplier) {
45          super();
46          this.threadPoolSupplier = threadPoolSupplier;
47          statBuckets = new ConcurrentHashMap<>();
48      }
49  
50      @Override
51      protected void doStart() {
52          if (cancelRotation == null) {
53              cancelRotation = threadPoolSupplier.get().scheduleWithFixedDelay(this::rotate, ROTATION_DELAY, ThreadPool.Names.GENERIC);
54          }
55      }
56  
57      @Override
58      protected void doStop() {
59          if (cancelRotation != null) {
60              cancelRotation.cancel();
61              cancelRotation = null;
62          }
63      }
64  
65      @Override
66      protected void doClose() {
67          // Should we do anything for final shutdown? Clear all the data?
68      }
69  
70      private Optional<RollingHistogram> getBucket(String name) {
71          return Optional.ofNullable(statBuckets.get(name));
72      }
73  
74      private RollingHistogram getOrAddBucket(String name) {
75          return statBuckets.computeIfAbsent(name, n -> new RollingHistogram());
76      }
77  
78      public long getMillisAtPercentile(String bucket, double percentile) {
79          return getBucket(bucket).map(hist -> Math.round(hist.getMillisAtPercentile(percentile))).orElse(0L);
80      }
81  
82      public List<LatencyStat> getLatencyStats(Set<Double> percentiles) {
83          return statBuckets.entrySet().stream()
84                  .flatMap(entry -> percentiles.stream().map(percentile -> {
85                      TimeValue tv = entry.getValue().getTimeValueAtPercentile(percentile);
86                      return new LatencyStat(entry.getKey(), percentile, tv);
87                  }))
88                  .collect(toList());
89      }
90  
91      @SuppressFBWarnings({"PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
92      @Override
93      public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
94          if (searchContext.groupStats() == null) {
95              return;
96          }
97          if (tookInNanos > HIGHEST_TRACKABLE_VALUE.nanos()) {
98              // While this is a bit of a lie, it's probably better than not adding anything.
99              tookInNanos = HIGHEST_TRACKABLE_VALUE.nanos();
100         }
101         for (String statBucket : searchContext.groupStats()) {
102             getOrAddBucket(statBucket).recordValue(tookInNanos);
103         }
104     }
105 
106     @VisibleForTesting
107     void rotate() {
108         Iterator<RollingHistogram> iter = statBuckets.values().iterator();
109         while (iter.hasNext()) {
110             RollingHistogram hist = iter.next();
111             hist.rotate();
112             if (hist.isEmpty()) {
113                 iter.remove();
114             }
115         }
116     }
117 
118     private static class RollingHistogram {
119         private final Histogram current;
120         private final List<Histogram> list;
121         private final Recorder recorder;
122 
123         RollingHistogram() {
124             current = new Histogram(LOWEST_DISCERNABLE_VALUE.nanos(), HIGHEST_TRACKABLE_VALUE.nanos(), SIGNIFICANT_DIGITS);
125             list = new ArrayList<>();
126             recorder = new Recorder(LOWEST_DISCERNABLE_VALUE.nanos(), HIGHEST_TRACKABLE_VALUE.nanos(), SIGNIFICANT_DIGITS);
127         }
128 
129         // Recorder is explicitly thread safe and requires no synchronization
130         void recordValue(long tookInNanos) {
131             recorder.recordValue(tookInNanos);
132         }
133 
134         synchronized void rotate() {
135             Histogram hist;
136             if (list.size() < NUM_ROLLING_HISTOGRAMS) {
137                 hist = recorder.getIntervalHistogram();
138                 list.add(0, hist);
139             } else {
140                 Collections.rotate(list, 1);
141                 hist = list.get(0);
142                 current.subtract(hist);
143                 recorder.getIntervalHistogramInto(hist);
144             }
145             current.add(hist);
146         }
147 
148         synchronized double getMillisAtPercentile(double percentile) {
149             return current.getValueAtPercentile(percentile) / ((double)TimeValue.NSEC_PER_MSEC);
150         }
151 
152         synchronized TimeValue getTimeValueAtPercentile(double percentile) {
153             double nanos = current.getValueAtPercentile(percentile);
154             return TimeValue.timeValueNanos(Math.round(nanos));
155         }
156 
157         synchronized boolean isEmpty() {
158             return current.getTotalCount() == 0;
159         }
160 
161     }
162 }