View Javadoc
1   package org.wikimedia.search.extra.latency;
2   
3   import static java.util.Collections.emptyList;
4   import static java.util.Objects.requireNonNull;
5   import static java.util.stream.Collectors.averagingDouble;
6   import static java.util.stream.Collectors.groupingBy;
7   import static java.util.stream.Collectors.toList;
8   
9   import java.io.IOException;
10  import java.util.List;
11  import java.util.Map;
12  import java.util.Set;
13  import java.util.stream.Stream;
14  
15  import javax.annotation.Nullable;
16  
17  import org.elasticsearch.action.ActionType;
18  import org.elasticsearch.action.FailedNodeException;
19  import org.elasticsearch.action.support.nodes.BaseNodeResponse;
20  import org.elasticsearch.action.support.nodes.BaseNodesRequest;
21  import org.elasticsearch.action.support.nodes.BaseNodesResponse;
22  import org.elasticsearch.cluster.ClusterName;
23  import org.elasticsearch.cluster.node.DiscoveryNode;
24  import org.elasticsearch.common.io.stream.StreamInput;
25  import org.elasticsearch.common.io.stream.StreamOutput;
26  import org.elasticsearch.common.io.stream.Writeable;
27  import org.elasticsearch.common.unit.TimeValue;
28  import org.elasticsearch.common.xcontent.ToXContent;
29  import org.elasticsearch.common.xcontent.XContentBuilder;
30  import org.wikimedia.search.extra.latency.SearchLatencyProbe.LatencyStat;
31  
32  import com.google.common.annotations.VisibleForTesting;
33  import com.google.common.collect.Sets;
34  
35  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
36  import lombok.AccessLevel;
37  import lombok.Getter;
38  
39  public final class LatencyStatsAction extends ActionType<LatencyStatsAction.LatencyStatsNodesResponse> {
40  
41      static final String NAME = "cluster:monitor/extra-latency-stats";
42      public static final LatencyStatsAction INSTANCE = new LatencyStatsAction();
43  
44      private LatencyStatsAction() {
45          super(NAME, LatencyStatsNodesResponse::new);
46      }
47  
48      @Override
49      public Writeable.Reader<LatencyStatsNodesResponse> getResponseReader() {
50          return LatencyStatsNodesResponse::new;
51      }
52  
53      public static class LatencyStatsNodesResponse extends BaseNodesResponse<LatencyStatsNodeResponse>
54              implements ToXContent {
55  
56          @Nullable
57          @VisibleForTesting
58          @Getter(AccessLevel.PACKAGE)
59          private StatDetails allNodes;
60  
61          LatencyStatsNodesResponse(StreamInput in) throws IOException {
62              super(in);
63              allNodes = new StatDetails(in);
64          }
65  
66          LatencyStatsNodesResponse(ClusterName clusterName, List<LatencyStatsNodeResponse> nodes, List<FailedNodeException> failures) {
67              super(clusterName, nodes, failures);
68              allNodes = new StatDetails(nodes.stream().map(n -> n.statDetails));
69          }
70  
71          @Override
72          protected List<LatencyStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
73              return in.readList(LatencyStatsNodeResponse::new);
74          }
75  
76          @Override
77          protected void writeNodesTo(StreamOutput out, List<LatencyStatsNodeResponse> nodes) throws IOException {
78              out.writeList(nodes);
79          }
80  
81          @Override
82          public void writeTo(StreamOutput out) throws IOException {
83              super.writeTo(out);
84              allNodes.writeTo(out);
85          }
86  
87          @Override
88          public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
89              builder.field("all", allNodes);
90              builder.startObject("nodes");
91              for (LatencyStatsNodeResponse resp : super.getNodes()) {
92                  builder.startObject(resp.getNode().getId());
93                  builder.field("name", resp.getNode().getName());
94                  builder.field("hostname", resp.getNode().getHostName());
95                  builder.field("latencies", resp.statDetails);
96                  builder.endObject();
97              }
98              builder.endObject();
99              return builder;
100         }
101 
102     }
103 
104     static class LatencyStatsNodesRequest extends BaseNodesRequest<LatencyStatsNodesRequest> {
105         LatencyStatsNodesRequest(StreamInput in) throws IOException {
106             super(in);
107         }
108 
109         LatencyStatsNodesRequest(String... nodesIds) {
110             super(nodesIds);
111         }
112     }
113 
114     public static class LatencyStatsNodeResponse extends BaseNodeResponse {
115         StatDetails statDetails = new StatDetails();
116 
117         LatencyStatsNodeResponse(DiscoveryNode node) {
118             super(node);
119         }
120         @SuppressFBWarnings(
121                 value = "PCOA_PARTIALLY_CONSTRUCTED_OBJECT_ACCESS",
122                 justification = "readFrom has a well understood contract")
123         LatencyStatsNodeResponse(StreamInput in) throws IOException {
124             super(in);
125             statDetails.readFrom(in);
126         }
127 
128         @Override
129         public void writeTo(StreamOutput out) throws IOException {
130             super.writeTo(out);
131             statDetails.writeTo(out);
132         }
133 
134         LatencyStatsNodeResponse initFromProbe(SearchLatencyProbe latencyProbe) {
135             statDetails = new StatDetails(latencyProbe);
136             return this;
137         }
138     }
139 
140     @Getter
141     public static class StatDetails implements Writeable, ToXContent {
142         private static final Set<Double> DEFAULT_LATENCIES = Sets.newHashSet(50D, 75D, 95D, 99D);
143 
144         private List<LatencyStat> latencies;
145 
146         StatDetails() {
147             latencies = emptyList();
148         }
149 
150         StatDetails(SearchLatencyProbe probe) {
151             this(probe.getLatencyStats(DEFAULT_LATENCIES));
152         }
153 
154         StatDetails(List<LatencyStat> latencies) {
155             this.latencies = requireNonNull(latencies);
156         }
157 
158         StatDetails(Stream<StatDetails> details) {
159             // Note that the results of this are NOT percentiles anymore. The results
160             // are the average percentile across nodes. Imagine, for example, averaging
161             // the minimum (0) percentile or the maximum (100) percentile. After averaging
162             // they are not the the minimum or maximum latency across the cluster, they
163             // are instead the average per-node minimum or maximum. We could get real
164             // percentiles by shipping around the full histograms, but that seems unnecessary.
165             this.latencies = details.flatMap(stat -> stat.latencies.stream())
166                     // Group things up so we have Map<Bucket, Map<Percentile, AvgNodeLatency>>
167                     .collect(groupingBy(LatencyStat::getBucket,
168                             groupingBy(LatencyStat::getPercentile,
169                                     averagingDouble(stat -> stat.getLatency().nanos()))))
170                     // Flatten it back out into List<LatencyStat>
171                     .entrySet().stream().flatMap(bucketEntry ->
172                             bucketEntry.getValue().entrySet().stream().map(latencyEntry -> {
173                                 TimeValue tv = TimeValue.timeValueNanos(Math.round(latencyEntry.getValue()));
174                                 return new LatencyStat(bucketEntry.getKey(), latencyEntry.getKey(), tv);
175                             }))
176                     .collect(toList());
177         }
178 
179         @SuppressFBWarnings(
180                 value = "PCOA_PARTIALLY_CONSTRUCTED_OBJECT_ACCESS",
181                 justification = "readFrom has a well understood contract")
182         StatDetails(StreamInput in) throws IOException {
183             latencies = requireNonNull(in.readList(LatencyStat::new));
184         }
185 
186         void readFrom(StreamInput in) throws IOException {
187             latencies = requireNonNull(in.readList(LatencyStat::new));
188         }
189 
190         @Override
191         public void writeTo(StreamOutput out) throws IOException {
192             out.writeList(latencies);
193         }
194 
195         @Override
196         @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
197         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
198             Map<String, List<LatencyStat>> byBucket = latencies.stream()
199                     .collect(groupingBy(LatencyStat::getBucket));
200 
201             builder.startObject();
202             for (Map.Entry<String, List<LatencyStat>> entry : byBucket.entrySet()) {
203                 builder.startArray(entry.getKey());
204                 for (LatencyStat stat : entry.getValue()) {
205                     builder.startObject();
206                     builder.field("percentile", stat.getPercentile());
207                     builder.field("latencyMs", stat.getLatency().millisFrac());
208                     builder.endObject();
209                 }
210                 builder.endArray();
211             }
212             return builder.endObject();
213         }
214     }
215 }