1 package org.wikimedia.search.extra.latency;
2
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableSet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.averagingDouble;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.wikimedia.search.extra.latency.SearchLatencyProbe.LatencyStat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.AccessLevel;
import lombok.Getter;
38
39 public final class LatencyStatsAction extends ActionType<LatencyStatsAction.LatencyStatsNodesResponse> {
40
41 static final String NAME = "cluster:monitor/extra-latency-stats";
42 public static final LatencyStatsAction INSTANCE = new LatencyStatsAction();
43
44 private LatencyStatsAction() {
45 super(NAME, LatencyStatsNodesResponse::new);
46 }
47
48 @Override
49 public Writeable.Reader<LatencyStatsNodesResponse> getResponseReader() {
50 return LatencyStatsNodesResponse::new;
51 }
52
53 public static class LatencyStatsNodesResponse extends BaseNodesResponse<LatencyStatsNodeResponse>
54 implements ToXContent {
55
56 @Nullable
57 @VisibleForTesting
58 @Getter(AccessLevel.PACKAGE)
59 private StatDetails allNodes;
60
61 LatencyStatsNodesResponse(StreamInput in) throws IOException {
62 super(in);
63 allNodes = new StatDetails(in);
64 }
65
66 LatencyStatsNodesResponse(ClusterName clusterName, List<LatencyStatsNodeResponse> nodes, List<FailedNodeException> failures) {
67 super(clusterName, nodes, failures);
68 allNodes = new StatDetails(nodes.stream().map(n -> n.statDetails));
69 }
70
71 @Override
72 protected List<LatencyStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
73 return in.readList(LatencyStatsNodeResponse::new);
74 }
75
76 @Override
77 protected void writeNodesTo(StreamOutput out, List<LatencyStatsNodeResponse> nodes) throws IOException {
78 out.writeList(nodes);
79 }
80
81 @Override
82 public void writeTo(StreamOutput out) throws IOException {
83 super.writeTo(out);
84 allNodes.writeTo(out);
85 }
86
87 @Override
88 public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
89 builder.field("all", allNodes);
90 builder.startObject("nodes");
91 for (LatencyStatsNodeResponse resp : super.getNodes()) {
92 builder.startObject(resp.getNode().getId());
93 builder.field("name", resp.getNode().getName());
94 builder.field("hostname", resp.getNode().getHostName());
95 builder.field("latencies", resp.statDetails);
96 builder.endObject();
97 }
98 builder.endObject();
99 return builder;
100 }
101
102 }
103
104 static class LatencyStatsNodesRequest extends BaseNodesRequest<LatencyStatsNodesRequest> {
105 LatencyStatsNodesRequest(StreamInput in) throws IOException {
106 super(in);
107 }
108
109 LatencyStatsNodesRequest(String... nodesIds) {
110 super(nodesIds);
111 }
112 }
113
114 public static class LatencyStatsNodeResponse extends BaseNodeResponse {
115 StatDetails statDetails = new StatDetails();
116
117 LatencyStatsNodeResponse(DiscoveryNode node) {
118 super(node);
119 }
120 @SuppressFBWarnings(
121 value = "PCOA_PARTIALLY_CONSTRUCTED_OBJECT_ACCESS",
122 justification = "readFrom has a well understood contract")
123 LatencyStatsNodeResponse(StreamInput in) throws IOException {
124 super(in);
125 statDetails.readFrom(in);
126 }
127
128 @Override
129 public void writeTo(StreamOutput out) throws IOException {
130 super.writeTo(out);
131 statDetails.writeTo(out);
132 }
133
134 LatencyStatsNodeResponse initFromProbe(SearchLatencyProbe latencyProbe) {
135 statDetails = new StatDetails(latencyProbe);
136 return this;
137 }
138 }
139
140 @Getter
141 public static class StatDetails implements Writeable, ToXContent {
142 private static final Set<Double> DEFAULT_LATENCIES = Sets.newHashSet(50D, 75D, 95D, 99D);
143
144 private List<LatencyStat> latencies;
145
146 StatDetails() {
147 latencies = emptyList();
148 }
149
150 StatDetails(SearchLatencyProbe probe) {
151 this(probe.getLatencyStats(DEFAULT_LATENCIES));
152 }
153
154 StatDetails(List<LatencyStat> latencies) {
155 this.latencies = requireNonNull(latencies);
156 }
157
158 StatDetails(Stream<StatDetails> details) {
159
160
161
162
163
164
165 this.latencies = details.flatMap(stat -> stat.latencies.stream())
166
167 .collect(groupingBy(LatencyStat::getBucket,
168 groupingBy(LatencyStat::getPercentile,
169 averagingDouble(stat -> stat.getLatency().nanos()))))
170
171 .entrySet().stream().flatMap(bucketEntry ->
172 bucketEntry.getValue().entrySet().stream().map(latencyEntry -> {
173 TimeValue tv = TimeValue.timeValueNanos(Math.round(latencyEntry.getValue()));
174 return new LatencyStat(bucketEntry.getKey(), latencyEntry.getKey(), tv);
175 }))
176 .collect(toList());
177 }
178
179 @SuppressFBWarnings(
180 value = "PCOA_PARTIALLY_CONSTRUCTED_OBJECT_ACCESS",
181 justification = "readFrom has a well understood contract")
182 StatDetails(StreamInput in) throws IOException {
183 latencies = requireNonNull(in.readList(LatencyStat::new));
184 }
185
186 void readFrom(StreamInput in) throws IOException {
187 latencies = requireNonNull(in.readList(LatencyStat::new));
188 }
189
190 @Override
191 public void writeTo(StreamOutput out) throws IOException {
192 out.writeList(latencies);
193 }
194
195 @Override
196 @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
197 public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
198 Map<String, List<LatencyStat>> byBucket = latencies.stream()
199 .collect(groupingBy(LatencyStat::getBucket));
200
201 builder.startObject();
202 for (Map.Entry<String, List<LatencyStat>> entry : byBucket.entrySet()) {
203 builder.startArray(entry.getKey());
204 for (LatencyStat stat : entry.getValue()) {
205 builder.startObject();
206 builder.field("percentile", stat.getPercentile());
207 builder.field("latencyMs", stat.getLatency().millisFrac());
208 builder.endObject();
209 }
210 builder.endArray();
211 }
212 return builder.endObject();
213 }
214 }
215 }