1 package org.wikimedia.search.extra.superdetectnoop;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.LinkedHashMap;
7 import java.util.Locale;
8 import java.util.Map;
9 import java.util.Objects;
10 import java.util.Set;
11
12 import javax.annotation.Nullable;
13
14 import org.elasticsearch.index.mapper.SourceFieldMapper;
15 import org.elasticsearch.script.ScriptContext;
16 import org.elasticsearch.script.ScriptEngine;
17 import org.elasticsearch.script.UpdateScript;
18
19
20
21
22
23 public class SuperDetectNoopScript extends UpdateScript {
24
25 public static class SuperNoopScriptEngineService implements ScriptEngine {
26 private final Set<ChangeHandler.Recognizer> changeHandlerRecognizers;
27
28 public SuperNoopScriptEngineService(Set<ChangeHandler.Recognizer> changeHandlerRecognizers) {
29 this.changeHandlerRecognizers = changeHandlerRecognizers;
30 }
31
32 @Override
33 public String getType() {
34 return "super_detect_noop";
35 }
36
37 @Override
38 public <T> T compile(String scriptName, String scriptSource, ScriptContext<T> context, Map<String, String> map) {
39 if (!"update".equals(context.name)) {
40 throw new IllegalArgumentException("Unsuppored context [" + context.name + "], " +
41 "super_detect_noop only supports the [update] context");
42 }
43 return context.factoryClazz.cast((UpdateScript.Factory) (params, ctx) -> new SuperDetectNoopScript(params, ctx, this));
44 }
45
46 @Override
47 public void close() throws IOException {
48 }
49
50 @Override
51 public Set<ScriptContext<?>> getSupportedContexts() {
52 return Collections.singleton(UpdateScript.CONTEXT);
53 }
54
55 protected Map<String, ChangeHandler<Object>> handlers(Map<String, Object> params) {
56 @SuppressWarnings("unchecked")
57 Map<String, String> detectorConfigs = (Map<String, String>) params.get("handlers");
58 if (detectorConfigs == null) {
59 return Collections.emptyMap();
60 }
61 Map<String, ChangeHandler<Object>> handlers = new HashMap<>();
62 for (Map.Entry<String, String> detectorConfig : detectorConfigs.entrySet()) {
63 handlers.put(detectorConfig.getKey(), handler(detectorConfig.getValue()));
64 }
65 return Collections.unmodifiableMap(handlers);
66 }
67
68 protected ChangeHandler<Object> handler(String config) {
69 for (ChangeHandler.Recognizer factory : changeHandlerRecognizers) {
70 ChangeHandler<Object> detector = factory.build(config);
71 if (detector != null) {
72 return detector;
73 }
74 }
75 throw new IllegalArgumentException("Don't recognize this type of change handler: " + config);
76 }
77 }
78
79 private final Map<String, Object> source;
80 private final Map<String, ChangeHandler<Object>> pathToHandler;
81
82 public SuperDetectNoopScript(Map<String, Object> params, Map<String, Object> ctx, SuperNoopScriptEngineService service) {
83 super(params, ctx);
84 @SuppressWarnings("unchecked")
85 Map<String, Object> source = (Map<String, Object>) Objects.requireNonNull(params.get("source"), "source must be specified");
86 this.source = source;
87 this.pathToHandler = service.handlers(params);
88 }
89
90 @Override
91 public void execute() {
92 @SuppressWarnings("unchecked")
93 Map<String, Object> oldSource = (Map<String, Object>) super.getCtx().get(SourceFieldMapper.NAME);
94 UpdateStatus changed = update(oldSource, source, "");
95 if (changed != UpdateStatus.UPDATED) {
96 super.getCtx().put("op", "none");
97 }
98 }
99
100 private enum UpdateStatus {
101 NOT_UPDATED, UPDATED, NOOP_DOCUMENT;
102
103
104
105
106 UpdateStatus merge(UpdateStatus other) {
107 return this.compareTo(other) >= 0 ? this : other;
108 }
109 }
110
111 private static void applyUpdate(Map<String, Object> source, String key, @Nullable Object value) {
112 if (value == null) {
113 source.remove(key);
114 } else {
115 source.put(key, value);
116 }
117 }
118
119
120
121
122 UpdateStatus update(Map<String, Object> oldSource, Map<String, Object> newSource, String path) {
123 UpdateStatus modified = UpdateStatus.NOT_UPDATED;
124 for (Map.Entry<String, Object> newEntry : newSource.entrySet()) {
125 String key = newEntry.getKey();
126 String entryPath = path + key;
127 ChangeHandler<Object> handler = pathToHandler.get(entryPath);
128 if (handler == null) {
129 Object newValueRaw = newEntry.getValue();
130 if (newValueRaw instanceof Map) {
131
132
133
134
135 @SuppressWarnings("unchecked")
136 Map<String, Object> oldValue = (Map<String, Object>)oldSource.computeIfAbsent(
137 key, x -> new LinkedHashMap<String, Object>());
138 @SuppressWarnings("unchecked")
139 Map<String, Object> newValue = (Map<String, Object>)newValueRaw;
140 modified = modified.merge(update(oldValue, newValue, entryPath + "."));
141 if (modified == UpdateStatus.NOOP_DOCUMENT) {
142 return modified;
143 }
144 continue;
145 } else {
146 handler = ChangeHandler.Equal.INSTANCE;
147 }
148 }
149 ChangeHandler.Result result;
150 try {
151 result = handler.handle(oldSource.get(key), newEntry.getValue());
152 } catch (IllegalArgumentException e) {
153 throw new IllegalArgumentException(String.format(Locale.ROOT,
154 "Failed updating document property %s", entryPath), e);
155 }
156 if (result.isDocumentNooped()) {
157 return UpdateStatus.NOOP_DOCUMENT;
158 }
159 if (result.isCloseEnough()) {
160 continue;
161 }
162 applyUpdate(oldSource, key, result.newValue());
163 modified = UpdateStatus.UPDATED;
164 }
165
166
167
168
169
170 return modified;
171 }
172 }