// UpdateElasticsearchEmitter.java
package org.wikimedia.discovery.cirrus.updater.consumer.graph;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.apache.flink.types.Row;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.wikimedia.discovery.cirrus.updater.common.model.UpdateFields;
import org.wikimedia.eventutilities.flink.formats.json.JsonRowSerializationSchema;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
public class UpdateElasticsearchEmitter implements ElasticsearchEmitter<Row>, Serializable {
private static final long serialVersionUID = -5524475722893532508L;
private static final String SCRIPT_LANG = "super_detect_noop";
private transient JsonRowSerializationSchema schema;
private transient ObjectMapper mapper;
private final RowTypeInfo updateTypeInfo;
public UpdateElasticsearchEmitter(RowTypeInfo updateTypeInfo) {
this.updateTypeInfo = updateTypeInfo;
}
@Override
public void emit(Row element, Context context, RequestIndexer indexer) {
try {
final String operation = UpdateFields.getOperation(element);
if (operation.equals(UpdateFields.OPERATION_UPDATE_REVISION)) {
indexer.add(createUpdateRequest(element));
} else if (operation.equals(UpdateFields.OPERATION_DELETE)) {
indexer.add(createDeleteRequest(element));
} else {
throw new UnsupportedOperationException(
"Unable to emit elasticsearch command for update operation " + operation);
}
} catch (IOException e) {
throw new RuntimeException("Failed to encode update fields", e);
}
}
@Nonnull
private static DeleteRequest createDeleteRequest(Row element) {
return new DeleteRequest(
UpdateFields.getIndexName(element), Long.toString(UpdateFields.getPageId(element)));
}
@Nonnull
private UpdateRequest createUpdateRequest(Row updateEvent) throws IOException {
final Row fields = UpdateFields.getFields(updateEvent);
final Map<String, String> hints = UpdateFields.getNoopHints(updateEvent);
final Map<String, Object> source = toMap(fields);
final Script script =
new Script(
ScriptType.INLINE,
SCRIPT_LANG,
SCRIPT_LANG,
ImmutableMap.of("source", source, "handlers", hints));
final String index = UpdateFields.getIndexName(updateEvent);
final String id = Long.toString(UpdateFields.getPageId(updateEvent));
return new UpdateRequest(index, id).script(script).upsert(source).retryOnConflict(5);
}
private Map<String, Object> toMap(Row fields) throws IOException {
final byte[] encodedJson = getSchema().serialize(fields);
final ObjectMapper mapper = getMapper();
final JsonNode decodedJson = mapper.reader().readTree(encodedJson);
return mapper.convertValue(decodedJson, new TypeReference<Map<String, Object>>() {});
}
private ObjectMapper getMapper() {
if (mapper == null) {
mapper = new ObjectMapper();
}
return mapper;
}
private JsonRowSerializationSchema getSchema() {
if (schema == null) {
schema = UpdateFields.buildFieldsRowSerializer(updateTypeInfo);
}
return schema;
}
}