SimilarQueriesPrep.java

package org.wikimedia.search.glent;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.sum;

import java.time.Instant;
import java.util.function.BiFunction;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Builds the next M1Prep dataframe by merging newly logged queries into the
 * previous M1Prep dataframe and then dropping rows whose timestamp is older
 * than the configured legal cut-off.
 */
public class SimilarQueriesPrep implements BiFunction<Dataset<Row>, Dataset<Row>, Dataset<Row>> {
    /** Cut-off instant; its epoch second is the lowest {@code ts} value that is kept. */
    private final Instant earliestLegalTs;

    /**
     * @param earliestLegalTs rows with {@code ts} below this instant's epoch second
     *                        are removed from the result of {@link #apply}
     */
    public SimilarQueriesPrep(Instant earliestLegalTs) {
        this.earliestLegalTs = earliestLegalTs;
    }

    /**
     * Merges the new log rows into the previous M1Prep dataframe and applies
     * the legal retention filter to the merged result.
     *
     * @param dfLog dataframe of newly logged queries
     * @param dfOld previous M1Prep dataframe
     * @return merged M1Prep dataframe restricted to legally retainable rows
     */
    @Override
    public Dataset<Row> apply(Dataset<Row> dfLog, Dataset<Row> dfOld) {
        return legalReqsM1(buildM1Prep(dfLog, dfOld));
    }

    /**
     * Build the new version of M1Prep from the new log rows and the previous M1Prep.
     * Steps:
     * project the new rows to the M1Prep columns, giving each a suggCount of 1
     * append the previous M1Prep rows
     * group by (query, queryNorm, wikiid, lang), keeping the maximum ts, the
     * maximum hitsTotal and the sum of suggCount for each group
     *
     * @param df dataframe of new rows; must provide query, queryNorm, wikiid,
     *           lang, ts and hitsTotal
     * @param dfOld previous M1Prep dataframe; must provide the same columns plus suggCount
     * @return M1Prep dataframe with columns query, queryNorm, wikiid, lang, ts,
     *         hitsTotal, suggCount
     *
     */
    Dataset<Row> buildM1Prep(Dataset<Row> df, Dataset<Row> dfOld) {
        Dataset<Row> newRows = df
            .select(col("query"), col("queryNorm"),
                    col("wikiid"), col("lang"),
                    col("ts"), col("hitsTotal"),
                    lit(1).alias("suggCount"));

        // union() pairs columns by position, not by name, so project the old
        // dataframe onto the same columns in the same order before appending it.
        Dataset<Row> oldRows = dfOld
            .select("query", "queryNorm", "wikiid", "lang",
                    "ts", "hitsTotal", "suggCount");

        return newRows
            .union(oldRows)
            .groupBy("query", "queryNorm", "wikiid", "lang")
            .agg(
                max("ts").alias("ts"),
                max("hitsTotal").alias("hitsTotal"),
                sum("suggCount").alias("suggCount"))
            .select(col("query"), col("queryNorm"), col("wikiid"), col("lang"),
                    col("ts"), col("hitsTotal"), col("suggCount"));
    }

    /**
     * Removes dataframe entries that have a timestamp earlier than required by legal.
     * A row is kept when its ts is greater than or equal to the epoch second of
     * the configured cut-off instant.
     *
     * @param df M1Prep dataframe
     * @return dataframe that satisfies legal requirements
     *
     */
    Dataset<Row> legalReqsM1(Dataset<Row> df) {
        // NOTE(review): presumably ts holds epoch seconds - confirm against the producer.
        long cutoffEpochSecond = earliestLegalTs.getEpochSecond();
        return df.where(col("ts").geq(cutoffEpochSecond));
    }
}