SimilarQueriesPrep.java
package org.wikimedia.search.glent;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.sum;
import java.time.Instant;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
/**
 * Prepares the similar-queries (M1) intermediate dataset ("M1Prep").
 *
 * <p>Newly logged suggestion rows are merged into the previously accumulated M1Prep rows.
 * Afterwards, rows whose timestamp is older than the configured legal cutoff are dropped.
 */
public class SimilarQueriesPrep implements BiFunction<Dataset<Row>, Dataset<Row>, Dataset<Row>> {
    /** Rows whose {@code ts} is before this instant (compared in epoch seconds) are discarded. */
    private final Instant earliestLegalTs;

    /**
     * Creates the preparation step with the given legal retention cutoff.
     *
     * @param earliestLegalTs oldest timestamp that may be retained; must not be null
     * @throws NullPointerException if {@code earliestLegalTs} is null
     */
    public SimilarQueriesPrep(Instant earliestLegalTs) {
        // Fail fast at construction instead of later, when apply() builds the legal filter.
        this.earliestLegalTs = Objects.requireNonNull(earliestLegalTs, "earliestLegalTs");
    }

    /**
     * Merges new log rows into the previous M1Prep, then applies the legal retention filter.
     *
     * @param dfLog new suggestion log rows
     * @param dfOld previously accumulated M1Prep rows
     * @return merged M1Prep rows whose {@code ts} is not before the legal cutoff
     */
    @Override
    public Dataset<Row> apply(Dataset<Row> dfLog, Dataset<Row> dfOld) {
        Dataset<Row> df = buildM1Prep(dfLog, dfOld);
        return legalReqsM1(df);
    }

    /**
     * Builds the new version of M1Prep by combining new log rows with the previous M1Prep.
     *
     * <p>Each row of {@code df} contributes a {@code suggCount} of 1. Rows from both inputs are
     * grouped by (query, queryNorm, wikiid, lang). Each group keeps the latest {@code ts}, the
     * largest {@code hitsTotal} and the summed {@code suggCount}.
     *
     * <p>No de-duplication against {@code dfOld} happens here. A log row that is already
     * reflected in {@code dfOld} is counted again. NOTE(review): presumably the caller restricts
     * {@code df} to records newer than those in {@code dfOld}; confirm.
     *
     * @param df sugg dataframe; must contain query, queryNorm, wikiid, lang, ts, hitsTotal
     * @param dfOld previous M1Prep dataframe; must contain query, queryNorm, wikiid, lang, ts,
     *     hitsTotal, suggCount (matched by name; any other columns are ignored)
     * @return M1Prep dataframe with columns query, queryNorm, wikiid, lang, ts, hitsTotal,
     *     suggCount
     */
    Dataset<Row> buildM1Prep(Dataset<Row> df, Dataset<Row> dfOld) {
        Dataset<Row> newRows = df.select(
                col("query"), col("queryNorm"), col("wikiid"), col("lang"),
                col("ts"), col("hitsTotal"),
                lit(1).alias("suggCount"));
        // Dataset.union resolves columns by position, not by name. Project dfOld onto the same
        // named columns in the same order, so that a reordered (or wider) previous partition
        // cannot silently mix up fields.
        Dataset<Row> oldRows = dfOld.select(
                col("query"), col("queryNorm"), col("wikiid"), col("lang"),
                col("ts"), col("hitsTotal"), col("suggCount"));
        return newRows
                .union(oldRows)
                .groupBy("query", "queryNorm", "wikiid", "lang")
                .agg(
                        max("ts").alias("ts"),
                        max("hitsTotal").alias("hitsTotal"),
                        sum("suggCount").alias("suggCount"))
                .select(col("query"), col("queryNorm"), col("wikiid"), col("lang"),
                        col("ts"), col("hitsTotal"), col("suggCount"));
    }

    /**
     * Removes rows whose {@code ts} is earlier than the legal retention cutoff.
     *
     * <p>The cutoff is compared as {@link Instant#getEpochSecond()}. NOTE(review): this assumes
     * {@code ts} holds epoch seconds; confirm against the upstream schema.
     *
     * @param df M1Prep dataframe
     * @return rows of {@code df} whose {@code ts} is greater than or equal to the cutoff
     */
    Dataset<Row> legalReqsM1(Dataset<Row> df) {
        return df.where(col("ts").geq(earliestLegalTs.getEpochSecond()));
    }
}