View Javadoc
1   package org.wikimedia.search.extra.superdetectnoop;
2   
3   import java.io.IOException;
4   import java.util.Collections;
5   import java.util.HashMap;
6   import java.util.LinkedHashMap;
7   import java.util.Locale;
8   import java.util.Map;
9   import java.util.Objects;
10  import java.util.Set;
11  
12  import javax.annotation.Nullable;
13  
14  import org.elasticsearch.index.mapper.SourceFieldMapper;
15  import org.elasticsearch.script.ScriptContext;
16  import org.elasticsearch.script.ScriptEngine;
17  import org.elasticsearch.script.UpdateScript;
18  
19  /**
20   * Like the detect_noop option on updates but with pluggable "close enough"
21   * detectors! So much power!
22   */
23  public class SuperDetectNoopScript extends UpdateScript {
24  
25      public static class SuperNoopScriptEngineService implements ScriptEngine {
26          private final Set<ChangeHandler.Recognizer> changeHandlerRecognizers;
27  
28          public SuperNoopScriptEngineService(Set<ChangeHandler.Recognizer> changeHandlerRecognizers) {
29              this.changeHandlerRecognizers = changeHandlerRecognizers;
30          }
31  
32          @Override
33          public String getType() {
34              return "super_detect_noop";
35          }
36  
37          @Override
38          public <T> T compile(String scriptName, String scriptSource, ScriptContext<T> context, Map<String, String> map) {
39              if (!"update".equals(context.name)) {
40                  throw new IllegalArgumentException("Unsuppored context [" + context.name + "], " +
41                          "super_detect_noop only supports the [update] context");
42              }
43              return context.factoryClazz.cast((UpdateScript.Factory) (params, ctx) -> new SuperDetectNoopScript(params, ctx, this));
44          }
45  
46          @Override
47          public void close() throws IOException {
48          }
49  
50          @Override
51          public Set<ScriptContext<?>> getSupportedContexts() {
52              return Collections.singleton(UpdateScript.CONTEXT);
53          }
54  
55          protected Map<String, ChangeHandler<Object>> handlers(Map<String, Object> params) {
56              @SuppressWarnings("unchecked")
57              Map<String, String> detectorConfigs = (Map<String, String>) params.get("handlers");
58              if (detectorConfigs == null) {
59                  return Collections.emptyMap();
60              }
61              Map<String, ChangeHandler<Object>> handlers = new HashMap<>();
62              for (Map.Entry<String, String> detectorConfig : detectorConfigs.entrySet()) {
63                  handlers.put(detectorConfig.getKey(), handler(detectorConfig.getValue()));
64              }
65              return Collections.unmodifiableMap(handlers);
66          }
67  
68          protected ChangeHandler<Object> handler(String config) {
69              for (ChangeHandler.Recognizer factory : changeHandlerRecognizers) {
70                  ChangeHandler<Object> detector = factory.build(config);
71                  if (detector != null) {
72                      return detector;
73                  }
74              }
75              throw new IllegalArgumentException("Don't recognize this type of change handler:  " + config);
76          }
77      }
78  
79      private final Map<String, Object> source;
80      private final Map<String, ChangeHandler<Object>> pathToHandler;
81  
82      public SuperDetectNoopScript(Map<String, Object> params, Map<String, Object> ctx, SuperNoopScriptEngineService service) {
83          super(params, ctx);
84          @SuppressWarnings("unchecked")
85          Map<String, Object> source = (Map<String, Object>) Objects.requireNonNull(params.get("source"), "source must be specified");
86          this.source = source;
87          this.pathToHandler = service.handlers(params);
88      }
89  
90      @Override
91      public void execute() {
92          @SuppressWarnings("unchecked")
93          Map<String, Object> oldSource = (Map<String, Object>) super.getCtx().get(SourceFieldMapper.NAME);
94          UpdateStatus changed = update(oldSource, source, "");
95          if (changed != UpdateStatus.UPDATED) {
96              super.getCtx().put("op", "none");
97          }
98      }
99  
100     private enum UpdateStatus {
101         NOT_UPDATED, UPDATED, NOOP_DOCUMENT;
102 
103         /**
104          * Return highest priority status.
105          */
106         UpdateStatus merge(UpdateStatus other) {
107             return this.compareTo(other) >= 0 ? this : other;
108         }
109     }
110 
111     private static void applyUpdate(Map<String, Object> source, String key, @Nullable Object value) {
112         if (value == null) {
113             source.remove(key);
114         } else {
115             source.put(key, value);
116         }
117     }
118 
119     /**
120      * Update old with the source and detector configuration of this script.
121      */
122     UpdateStatus update(Map<String, Object> oldSource, Map<String, Object> newSource, String path) {
123         UpdateStatus modified = UpdateStatus.NOT_UPDATED;
124         for (Map.Entry<String, Object> newEntry : newSource.entrySet()) {
125             String key = newEntry.getKey();
126             String entryPath = path + key;
127             ChangeHandler<Object> handler = pathToHandler.get(entryPath);
128             if (handler == null) {
129                 Object newValueRaw = newEntry.getValue();
130                 if (newValueRaw instanceof Map) {
131                     // Apply this::update recursively when provided a map as the value to
132                     // update to and no handler is defined. Boldly assume (i.e. fail if not)
133                     // that if the update is a map, the source document must be either empty
134                     // or a json map.
135                     @SuppressWarnings("unchecked")
136                     Map<String, Object> oldValue = (Map<String, Object>)oldSource.computeIfAbsent(
137                             key, x -> new LinkedHashMap<String, Object>());
138                     @SuppressWarnings("unchecked")
139                     Map<String, Object> newValue = (Map<String, Object>)newValueRaw;
140                     modified = modified.merge(update(oldValue, newValue, entryPath + "."));
141                     if (modified == UpdateStatus.NOOP_DOCUMENT) {
142                         return modified;
143                     }
144                     continue;
145                 } else {
146                     handler = ChangeHandler.Equal.INSTANCE;
147                 }
148             }
149             ChangeHandler.Result result;
150             try {
151                 result = handler.handle(oldSource.get(key), newEntry.getValue());
152             } catch (IllegalArgumentException e) {
153                 throw new IllegalArgumentException(String.format(Locale.ROOT,
154                         "Failed updating document property %s", entryPath), e);
155             }
156             if (result.isDocumentNooped()) {
157                 return UpdateStatus.NOOP_DOCUMENT;
158             }
159             if (result.isCloseEnough()) {
160                 continue;
161             }
162             applyUpdate(oldSource, key, result.newValue());
163             modified = UpdateStatus.UPDATED;
164         }
165         /*
166          * Right now if a field isn't in the source passed to the script the
167          * change handlers never get a chance to look at it - the field is never
168          * changed.
169          */
170         return modified;
171     }
172 }