UpdateElasticsearchEmitter.java

package org.wikimedia.discovery.cirrus.updater.consumer.graph;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;

import javax.annotation.Nonnull;

import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.apache.flink.types.Row;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.wikimedia.discovery.cirrus.updater.common.model.UpdateFields;
import org.wikimedia.eventutilities.flink.formats.json.JsonRowSerializationSchema;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

public class UpdateElasticsearchEmitter implements ElasticsearchEmitter<Row>, Serializable {

    private static final long serialVersionUID = -5524475722893532508L;

    private static final String SCRIPT_LANG = "super_detect_noop";
    private transient JsonRowSerializationSchema schema;

    private transient ObjectMapper mapper;
    private final RowTypeInfo updateTypeInfo;

    public UpdateElasticsearchEmitter(RowTypeInfo updateTypeInfo) {
        this.updateTypeInfo = updateTypeInfo;
    }

    @Override
    public void emit(Row element, Context context, RequestIndexer indexer) {
        try {
            final String operation = UpdateFields.getOperation(element);

            if (operation.equals(UpdateFields.OPERATION_UPDATE_REVISION)) {
                indexer.add(createUpdateRequest(element));
            } else if (operation.equals(UpdateFields.OPERATION_DELETE)) {
                indexer.add(createDeleteRequest(element));
            } else {
                throw new UnsupportedOperationException(
                        "Unable to emit elasticsearch command for update operation " + operation);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to encode update fields", e);
        }
    }

    @Nonnull
    private static DeleteRequest createDeleteRequest(Row element) {
        return new DeleteRequest(
                UpdateFields.getIndexName(element), Long.toString(UpdateFields.getPageId(element)));
    }

    @Nonnull
    private UpdateRequest createUpdateRequest(Row updateEvent) throws IOException {
        final Row fields = UpdateFields.getFields(updateEvent);
        final Map<String, String> hints = UpdateFields.getNoopHints(updateEvent);
        final Map<String, Object> source = toMap(fields);

        final Script script =
                new Script(
                        ScriptType.INLINE,
                        SCRIPT_LANG,
                        SCRIPT_LANG,
                        ImmutableMap.of("source", source, "handlers", hints));

        final String index = UpdateFields.getIndexName(updateEvent);
        final String id = Long.toString(UpdateFields.getPageId(updateEvent));

        return new UpdateRequest(index, id).script(script).upsert(source).retryOnConflict(5);
    }

    private Map<String, Object> toMap(Row fields) throws IOException {
        final byte[] encodedJson = getSchema().serialize(fields);
        final ObjectMapper mapper = getMapper();
        final JsonNode decodedJson = mapper.reader().readTree(encodedJson);
        return mapper.convertValue(decodedJson, new TypeReference<Map<String, Object>>() {});
    }

    private ObjectMapper getMapper() {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return mapper;
    }

    private JsonRowSerializationSchema getSchema() {
        if (schema == null) {
            schema = UpdateFields.buildFieldsRowSerializer(updateTypeInfo);
        }
        return schema;
    }
}