// EsBulkEncoder.java

package org.wikimedia.search.glent;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;


/**
 * Encodes rows into elasticsearch bulk index statements.
 * No sanity checking is performed; rows must be reasonable elasticsearch docs.
 */
public final class EsBulkEncoder {

    /** Bulk action line written ahead of every document; includes its terminating newline. */
    private static final String BULK_HEADER = "{\"index\":{}}\n";

    /** Static-only holder; never instantiated. */
    private EsBulkEncoder() {
    }

    /**
     * Renders every row as JSON and prefixes it with the bulk action line.
     *
     * <p>Each resulting string is {@code BULK_HEADER} followed directly by the row's
     * JSON text; nothing is appended after the document.
     *
     * @param df rows to serialize, converted with {@link Dataset#toJSON()}
     * @return one string per input row holding the action line and the JSON document
     */
    public static Dataset<String> encode(Dataset<Row> df) {
        final Dataset<String> jsonDocs = df.toJSON();
        // Typed as Spark's MapFunction so the Java overload of Dataset.map is selected.
        final MapFunction<String, String> prependHeader = json -> BULK_HEADER + json;
        return jsonDocs.map(prependHeader, Encoders.STRING());
    }
}