CirrusLogReader.java

/*
 * Copyright (C) 2019 Glenbrook Networks
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.wikimedia.search.glent;

import static org.apache.spark.sql.functions.array_contains;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.udf;

import java.util.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.wikimedia.search.glent.udf.GetMainSearchRequest;

import com.google.common.annotations.VisibleForTesting;

/**
 * Transforms cirrus request logs into a simplified format that glent can
 * work with and attaches the language of the wiki to each log entry.
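 *
 * <p>A minimal usage sketch; the threshold, the lowercasing normalizer and
 * {@code dfRawCirrusLog} (the raw cirrus request log dataset) are
 * illustrative, not prescribed by this class:
 * <pre>{@code
 * Dataset<Row> dfWikis = spark.table("canonical_data.wikis");
 * CirrusLogReader reader = new CirrusLogReader(dfWikis, 1000, String::toLowerCase);
 * Dataset<Row> dfSimplified = reader.filterLog(dfRawCirrusLog);
 * }</pre>
 *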
 * @author Julia.Glen
 */
public class CirrusLogReader {
    private final Dataset<Row> dfLangMap;
    private final int maxNQueriesPerIdent;
    private final Function<String, String> normalizer;

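    /**
     * @param dfLangMap map from wiki database_code to language_code, e.g.
     *                  canonical_data.wikis, or null to fall back to the
     *                  wikiid prefix
     * @param maxNQueriesPerIdent identities issuing this many queries or
     *                  more are considered robots and dropped
     * @param normalizer query normalization function, applied as a UDF in
     *                  normalizeQuery
     */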
    public CirrusLogReader(Dataset<Row> dfLangMap, int maxNQueriesPerIdent, Function<String, String> normalizer) {
        this.dfLangMap = dfLangMap == null
            ? null
            : dfLangMap.select(col("database_code"), col("language_code").alias("lang"));
        this.maxNQueriesPerIdent = maxNQueriesPerIdent;
        this.normalizer = normalizer;
    }

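    /**
     * Runs the full cleanup pipeline over a raw cirrus request log:
     * flattens the log, then drops robot traffic, complex searches and
     * potential PII, attaches the wiki language and normalizes the query.
     *
     * @param df raw cirrus request log dataframe
     * @return simplified log dataframe
     */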
    public Dataset<Row> filterLog(Dataset<Row> df) {
        df = flattenWmfLog(df);                // prune and flatten logEntries
        df = filterRobotsFromLog(df);
        df = filterComplexSearchesFromLog(df);
        df = filterPIIFromLog(df);
        df = extractLang(df);
        return normalizeQuery(df);
    }

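    /**
     * Restricts the log to web searches, extracts the main full-text
     * request from the nested requests column and flattens it into
     * top-level columns; rows with no main search request are dropped.
     *
     * @param df raw cirrus request log dataframe
     * @return flattened dataframe with one row per main search request
     */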
    Dataset<Row> flattenWmfLog(Dataset<Row> df) {
        GetMainSearchRequest getMainSearchRequest = new GetMainSearchRequest(
                df.schema().apply("requests"));
        return df
            .where(col("source").equalTo("web"))
            .withColumn("main_request", getMainSearchRequest.apply(
                    col("wikiid"), col("requests")))
            .where(col("main_request").isNotNull())
            .select(
                    col("ts").cast("integer"),
                    col("wikiid").cast("string"),
                    col("source").cast("string"),
                    col("identity").cast("string"),
                    col("main_request.query").cast("string"),
                    col("main_request.hitsTotal").cast("integer"),
                    col("main_request.syntax")
            );
    }

    /**
     * Filters all but simple word matching searches.
     *
     * Complex searches are hard to suggest from: they may contain a
     * variety of meaningful syntax that can't simply be word-replaced.
     * Make the problem easy by throwing out everything that isn't a
     * simple set of words, including quoted phrase searches that
     * typically come from advanced users or robots.
     *
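     * <p>Illustrative effect; syntax markers other than
     * {@code simple_bag_of_words} are assumed examples of what
     * CirrusSearch may report:
     * <pre>{@code
     * syntax = ["simple_bag_of_words"]  -> kept
     * syntax = ["query_string"]         -> dropped
     * }</pre>
     *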
     * @param df log dataframe
     * @return dataframe with only simple text matching queries
     */
    Dataset<Row> filterComplexSearchesFromLog(Dataset<Row> df) {
        return df.where(array_contains(col("syntax"),
                lit("simple_bag_of_words")));
    }

    /**
     * Filters out log entries generated by high-frequency identities that
     * are likely robots.
     *
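     * <p>The comparison is strict, so an identity with
     * {@code maxNQueriesPerIdent} or more entries loses all of its rows.
     * A sketch with an illustrative threshold:
     * <pre>{@code
     * // maxNQueriesPerIdent = 3
     * // identity "a": 2 entries -> both kept
     * // identity "b": 3 entries -> all three dropped
     * }</pre>
     *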
     * @param df log dataframe
     * @return dataframe without queries generated by robots
     */
    Dataset<Row> filterRobotsFromLog(Dataset<Row> df) {
        WindowSpec w = Window.partitionBy("identity");
        return df
            .withColumn("id_count", count(lit(1)).over(w))
            .where(col("id_count").lt(maxNQueriesPerIdent))
            .drop("id_count");
    }

    /**
     * Broadly filters out any query containing more than 5 digits, using a
     * simple regexp, as a cheap guard against PII such as phone numbers.
     *
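     * <p>Illustrative queries:
     * <pre>{@code
     * "boeing 747 wingspan"  // 3 digits  -> kept
     * "call 555 867 5309"    // 10 digits -> dropped as potential PII
     * }</pre>
     *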
     * @param df log dataframe
     * @return dataframe without queries potentially containing PII
     *
     */
    Dataset<Row> filterPIIFromLog(Dataset<Row> df) {
        return df.where(col("query").rlike("^((?!\\d.*\\d.*\\d.*\\d.*\\d.*\\d).)*$"));
    }

    /**
     * Extracts the language from wikiid based on canonical_data.wikis.
     * If the map is not available, extracts the language from the first
     * two letters of wikiid.
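     *
     * <p>Illustrative fallback behaviour when no map is supplied:
     * <pre>{@code
     * "enwiki"       -> lang "en"
     * "dewiktionary" -> lang "de"
     * }</pre>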
     *
     * @param df log dataframe
     * @return dataframe with added lang to logEntry
     *
     */
    @VisibleForTesting
    Dataset<Row> extractLang(Dataset<Row> df) {
        if (dfLangMap == null) {
            // No language map: fall back to the first two letters of wikiid.
            // Spark's substr is 1-based; a start position of 0 behaves like
            // 1, so this returns the first two characters.
            return df.withColumn("lang", expr("substr(wikiid, 0, 2)"));
        } else {
            // Look up the language via the database_code -> lang map. This
            // is an inner join, so wikis missing from the map are dropped.
            return df.join(dfLangMap, df.col("wikiid").equalTo(dfLangMap.col("database_code")))
                     .drop("database_code");
        }
    }

    /**
     * Normalizes the user query at run time.
     *
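     * <p>A sketch of a normalizer passed at construction time; lowercasing
     * and trimming are illustrative, not the required normalization:
     * <pre>{@code
     * new CirrusLogReader(dfWikis, 1000, q -> q.toLowerCase(Locale.ROOT).trim())
     * }</pre>
     *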
     * @param df log dataframe
     * @return dataframe with queryNorm added to each logEntry
     *
     */
    Dataset<Row> normalizeQuery(Dataset<Row> df) {
        return df.withColumn("queryNorm",
                udf((UDF1<String, String>) normalizer::apply, DataTypes.StringType)
                    .apply(col("query")));
    }
}