EsBulkEncoder.java
package org.wikimedia.search.glent;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
/**
 * Encodes rows into the newline-delimited body of an elasticsearch bulk request.
 *
 * <p>Every row is emitted as an {@code index} action (not {@code create}) with empty action
 * metadata, followed by the row serialized as JSON. No sanity checking is performed; rows must
 * be reasonable elasticsearch docs.
 */
public final class EsBulkEncoder {
    /**
     * Bulk action line prepended to every document. The metadata object is empty, so neither
     * {@code _index} nor {@code _id} is set here: the target index has to be supplied by the bulk
     * request itself, and document ids are left for elasticsearch to assign.
     */
    private static final String BULK_HEADER = "{\"index\":{}}\n";

    /** Static utility holder; not meant to be instantiated. */
    private EsBulkEncoder() {
    }

    /**
     * Converts each row into a two-line bulk entry: the {@code index} action line followed by the
     * row's JSON document.
     *
     * <p>The action line is terminated with {@code '\n'}, but the document line is not: each
     * returned element ends with the JSON text. Whatever writes the dataset out (e.g. one element
     * per line as text) must supply the newline the bulk API expects after every line, including
     * the last one.
     *
     * @param df rows to encode; each row is serialized via {@link Dataset#toJSON()}
     * @return one bulk entry per input row
     */
    public static Dataset<String> encode(Dataset<Row> df) {
        // The explicit MapFunction cast selects the Java-friendly map overload (Spark also has a
        // Scala Function1 overload) and yields a serializable function for the executors.
        return df.toJSON()
                .map((MapFunction<String, String>) doc -> BULK_HEADER + doc, Encoders.STRING());
    }
}