// CirrusLogReader.java
/*
* Copyright (C) 2019 Glenbrook Networks
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.wikimedia.search.glent;
import static org.apache.spark.sql.functions.array_contains;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.udf;
import java.util.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.wikimedia.search.glent.udf.GetMainSearchRequest;
import com.google.common.annotations.VisibleForTesting;
/**
 * Transform cirrus request logs into a simplified format that glent can work with
 * and attach the language of the wiki to each log entry.
 *
 * @author Julia.Glen
 */
public class CirrusLogReader {
    /** Map of database_code to language_code, or null to derive lang from the wikiid prefix. */
    private final Dataset<Row> dfLangMap;
    /** Identities that issued this many or more queries are dropped as possible robots. */
    private final int maxNQueriesPerIdent;
    /** Query normalization function applied at run time via a spark UDF. */
    private final Function<String, String> normalizer;

    /**
     * @param dfLangMap           canonical wiki data containing database_code and
     *                            language_code columns, or null to fall back to
     *                            deriving the language from the first two letters
     *                            of wikiid
     * @param maxNQueriesPerIdent exclusive upper bound on queries per identity;
     *                            identities at or above it are treated as robots
     * @param normalizer          query normalization function
     */
    public CirrusLogReader(Dataset<Row> dfLangMap, int maxNQueriesPerIdent, Function<String, String> normalizer) {
        // Keep only the two columns needed for the wikiid -> lang join.
        this.dfLangMap = dfLangMap == null
            ? null
            : dfLangMap.select(col("database_code"), col("language_code").alias("lang"));
        this.maxNQueriesPerIdent = maxNQueriesPerIdent;
        this.normalizer = normalizer;
    }

    /**
     * Applies the full filtering / enrichment pipeline to a raw cirrus log.
     *
     * @param df raw cirrus request log dataframe
     * @return simplified log with lang and queryNorm columns attached
     */
    public Dataset<Row> filterLog(Dataset<Row> df) {
        df = flattenWmfLog(df); // prune and flatten logEntries
        df = filterRobotsFromLog(df);
        df = filterComplexSearchesFromLog(df);
        df = filterPIIFromLog(df);
        df = extractLang(df);
        return normalizeQuery(df);
    }

    /**
     * Keeps only web-sourced entries that contain a main search request and
     * flattens that request into top-level columns.
     *
     * @param df raw cirrus request log dataframe
     * @return flattened dataframe with ts, wikiid, source, identity, query,
     *         hitsTotal and syntax columns
     */
    Dataset<Row> flattenWmfLog(Dataset<Row> df) {
        GetMainSearchRequest getMainSearchRequest = new GetMainSearchRequest(
                df.schema().apply("requests"));
        return df
            .where(col("source").equalTo("web"))
            .withColumn("main_request", getMainSearchRequest.apply(
                col("wikiid"), col("requests")))
            // Entries without a recognizable main search request are useless here.
            .where(col("main_request").isNotNull())
            .select(
                col("ts").cast("integer"),
                col("wikiid").cast("string"),
                col("source").cast("string"),
                col("identity").cast("string"),
                col("main_request.query").cast("string"),
                col("main_request.hitsTotal").cast("integer"),
                col("main_request.syntax")
            );
    }

    /**
     * Filters all but simple word matching searches.
     *
     * Complex searches are hard to suggest from, they may contain
     * a variety of meaningful syntax that can't simply be word
     * replaced. Make the problem easy by throwing out everything
     * that isn't a simple set of words.
     *
     * @param df log dataframe
     * @return dataframe with only simple text matching queries
     */
    Dataset<Row> filterComplexSearchesFromLog(Dataset<Row> df) {
        return df.where(array_contains(col("syntax"),
            lit("simple_bag_of_words")));
    }

    /**
     * Filters log entries that were generated by high frequency users that are possible robots.
     *
     * An identity is considered a possible robot when it issued
     * maxNQueriesPerIdent or more queries in the input dataframe; all of its
     * entries are then dropped.
     *
     * @param df log dataframe
     * @return dataframe without queries generated by robots
     */
    Dataset<Row> filterRobotsFromLog(Dataset<Row> df) {
        WindowSpec w = Window.partitionBy("identity");
        return df
            // Count all rows per identity without collapsing them.
            .withColumn("id_count", count(lit(1)).over(w))
            .where(col("id_count").lt(maxNQueriesPerIdent))
            .drop("id_count");
    }

    /**
     * Currently broadly filters any query that has more than 5 digits using a simple regexp.
     *
     * The negative lookahead rejects any query containing 6 or more digits
     * anywhere in the string (e.g. phone numbers, ids).
     *
     * @param df log dataframe
     * @return dataframe without queries potentially containing PII
     */
    Dataset<Row> filterPIIFromLog(Dataset<Row> df) {
        return df.where(col("query").rlike("^((?!\\d.*\\d.*\\d.*\\d.*\\d.*\\d).)*$"));
    }

    /**
     * Extracts lang from wikiid based on canonical_data.wikis.
     * If the map is not available extracts the language from the 2 first letters of wikiid.
     *
     * NOTE(review): when a lang map is provided the join is an inner join, so
     * entries whose wikiid has no database_code mapping are dropped — confirm
     * this is intended.
     *
     * @param df sugg dataframe
     * @return dataframe with lang added to each logEntry
     */
    @VisibleForTesting
    Dataset<Row> extractLang(Dataset<Row> df) {
        if (dfLangMap == null) {
            // spark sql substr is 1-indexed; a start of 0 behaves like 1, so
            // this yields the first two characters of wikiid.
            return df.withColumn("lang", expr("substr(wikiid, 0, 2)"));
        }
        // Look the language up via the canonical database_code -> lang map.
        return df.join(dfLangMap, df.col("wikiid").equalTo(dfLangMap.col("database_code")))
            .drop("database_code");
    }

    /**
     * Normalization of the user query at run time.
     *
     * @param df sugg dataframe
     * @return dataframe with queryNorm added to each logEntry
     */
    Dataset<Row> normalizeQuery(Dataset<Row> df) {
        return df.withColumn("queryNorm",
            udf((UDF1<String, String>) normalizer::apply, DataTypes.StringType).apply(col("query")));
    }
}