1 package org.wikimedia.search.extra.latency; 2 3 import java.io.IOException; 4 import java.util.List; 5 6 import org.elasticsearch.action.FailedNodeException; 7 import org.elasticsearch.action.support.ActionFilters; 8 import org.elasticsearch.action.support.nodes.BaseNodeRequest; 9 import org.elasticsearch.action.support.nodes.TransportNodesAction; 10 import org.elasticsearch.cluster.service.ClusterService; 11 import org.elasticsearch.common.inject.Inject; 12 import org.elasticsearch.common.io.stream.StreamInput; 13 import org.elasticsearch.common.io.stream.StreamOutput; 14 import org.elasticsearch.threadpool.ThreadPool; 15 import org.elasticsearch.transport.TransportService; 16 import org.wikimedia.search.extra.latency.LatencyStatsAction.LatencyStatsNodeResponse; 17 import org.wikimedia.search.extra.latency.LatencyStatsAction.LatencyStatsNodesRequest; 18 import org.wikimedia.search.extra.latency.LatencyStatsAction.LatencyStatsNodesResponse; 19 20 public class TransportLatencyStatsAction extends TransportNodesAction<LatencyStatsNodesRequest, 21 LatencyStatsNodesResponse, TransportLatencyStatsAction.LatencyStatsNodeRequest, 22 LatencyStatsNodeResponse> { 23 private final SearchLatencyProbe latencyProbe; 24 25 @Inject 26 public TransportLatencyStatsAction(ThreadPool threadPool, 27 ClusterService clusterService, TransportService transportService, 28 ActionFilters actionFilters, 29 SearchLatencyListener latencyProbe) { 30 super(LatencyStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, 31 LatencyStatsNodesRequest::new, LatencyStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT, 32 LatencyStatsNodeResponse.class); 33 this.latencyProbe = latencyProbe; 34 } 35 36 @Override 37 protected LatencyStatsNodesResponse newResponse(LatencyStatsNodesRequest request, List<LatencyStatsNodeResponse> responses, 38 List<FailedNodeException> failures) { 39 return new LatencyStatsNodesResponse(clusterService.getClusterName(), responses, failures); 40 } 41 42 @Override 43 protected LatencyStatsNodeRequest newNodeRequest(LatencyStatsNodesRequest nodesRequest) { 44 return new LatencyStatsNodeRequest(nodesRequest); 45 } 46 47 @Override 48 protected LatencyStatsNodeResponse newNodeResponse(StreamInput streamInput) throws IOException { 49 return new LatencyStatsNodeResponse(streamInput); 50 } 51 52 @Override 53 protected LatencyStatsNodeResponse nodeOperation(LatencyStatsNodeRequest request) { 54 return new LatencyStatsNodeResponse(clusterService.localNode()).initFromProbe(latencyProbe); 55 } 56 57 static class LatencyStatsNodeRequest extends BaseNodeRequest { 58 private final LatencyStatsNodesRequest request; 59 60 LatencyStatsNodeRequest(StreamInput in) throws IOException { 61 super(in); 62 request = new LatencyStatsNodesRequest(in); 63 } 64 65 66 LatencyStatsNodeRequest(LatencyStatsNodesRequest request) { 67 this.request = request; 68 } 69 70 @Override 71 public void writeTo(StreamOutput out) throws IOException { 72 super.writeTo(out); 73 request.writeTo(out); 74 } 75 } 76 }