Params.java
/*
* Copyright (C) 2019 Glenbrook Networks
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.wikimedia.search.glent;
import static java.util.Collections.singletonList;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.wikimedia.search.glent.analysis.Normalizers;
import org.wikimedia.search.glent.editdistance.EDConfig;
import org.wikimedia.search.glent.editdistance.NormType;
import org.wikimedia.search.glent.editdistance.TokenAwareEditDistance;
import com.beust.jcommander.Parameter;
import com.garygregory.jcommander.converters.time.DurationConverter;
import com.garygregory.jcommander.converters.time.InstantConverter;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * CLI parameter groups, parsed with JCommander, that are shared across the
 * Glent prep and run jobs.
 *
 * @author Julia.Glen
 */
public class Params {
@Parameter(names = "--help", help = true)
boolean help;
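
/**
 * Parameters for loading and filtering the CirrusSearch request logs: the
 * source table, the timestamp range, the query normalization algorithm, the
 * wikiid-to-language mapping table, and the per-identity query limit used to
 * exclude likely robots.
 */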
static class WithCirrusLog {
@Parameter(names = "--wmf-log-name", description = "name of the dataframe with new eventgate wmf search logs")
String wmfLogName = "event.mediawiki_cirrussearch_request";
@Parameter(
names = "--log-ts-from",
description = "minimum timestamp of log entries to be used in prep or run",
converter = InstantConverter.class)
Instant logTsFrom = Instant.EPOCH;
@Parameter(
names = "--log-ts-to",
description = "maximum timestamp of log entries to be used in prep or run",
converter = InstantConverter.class)
Instant logTsTo = Instant.MAX;
@Parameter(
names = "--query-normalizer",
description = "Algorithm to use when normalizing the query",
converter = Normalizers.class
)
Function<String, String> normalizer = Normalizers.lowerCase();
@Parameter(names = "--max-n-queries-per-ident", description = "maximum number of queries per identity allowed; higher means robots")
int maxNQueriesPerIdent = 2000;
@Parameter(names = "--map-wikiid-to-lang-name", description = "name of the dataframe that has the mapping between wikiid and lang")
String mapWikiidToLangName = "canonical_data.wikis";
private Dataset<Row> loadLangMap(SparkSession spark) {
return mapWikiidToLangName.isEmpty()
? null
: spark.read().table(mapWikiidToLangName);
}
private CirrusLogReader reader(SparkSession spark) {
return new CirrusLogReader(loadLangMap(spark), maxNQueriesPerIdent, normalizer);
}
Dataset<Row> load(SparkSession spark) {
// Converts multiple input datasets into the same shape. Should be
// removed at some point, once everything is converted to the new
// schema.
Dataset<Row> df = new CirrusLogLoader().load(spark, wmfLogName, logTsFrom, logTsTo);
// Filters and simplifies input datasets
return reader(spark).filterLog(df);
}
Dataset<Row> renormalizeQuery(Dataset<Row> df) {
return reader(df.sparkSession()).normalizeQuery(df);
}
}
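
/**
 * Parameter for the earliest timestamp that may be retained in the prep
 * dataframe; older records are cleaned up when required by legal.
 */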
static class WithEarliestLegalTs {
@Parameter(
names = "--earliest-legal-ts",
description = "earliest timestamp that can be stored in the prep dataframe; used to clean up older records if requred by legal",
converter = InstantConverter.class)
Instant ts = Instant.now().minus(Duration.ofDays(84));
}
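
/**
 * Builds the token-aware edit distance calculator used by the suggestion
 * filter. The configuration is currently hard coded (see T238151); the TODO
 * is to expose it as CLI arguments.
 */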
static class WithEditDistance {
// TODO: Actual CLI args
@SuppressFBWarnings(value = "CE_CLASS_ENVY", justification = "Class envy is relatively natural in this builder.")
SuggestionFilter.EditDistanceCalc build() {
// For now use a hard coded config. This can all be exposed as arguments as necessary.
// Config from https://phabricator.wikimedia.org/T238151#5977107
TokenAwareEditDistance impl = new TokenAwareEditDistance(EDConfig.Builder.newInstance()
.setDefaultLimit(1.76f)
.setDefaultNormLimit(0.3f)
.setPerTokenLimit(true)
.setNormType(NormType.MIN)
.setInsDelCost(0.84f)
.setSubstCost(0.92f)
.setSwapCost(0.82f)
.setDuplicateCost(0.60f)
.setSpaceOnlyCost(0.68f)
.setDigitChangePenalty(0.26f)
.setTokenDeltaPenalty(0f)
.setTokenInitialPenalty(2.00f)
.setTokenSepSubstPenalty(0.54f)
.setTokenSep(' ')
// defaults below
//.setLocale(Locale.ENGLISH)
//.setTokenSplit("[\\p{Z}\\p{P}\\p{S}]+")
//.setTokenizer(null)
.build());
UserDefinedFunction func = udf(
(UDF2<String, String, Float>)impl::calcEditDistance,
DataTypes.FloatType);
// func::apply is varargs; wrap it in a lambda so exactly two
// arguments are exposed for the EditDistanceCalc interface.
return (a, b) -> func.apply(a, b);
}
}
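
/**
 * Parameters for reading a single date partition of a Glent table.
 */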
static class WithGlentPartition {
@Parameter(names = "--input-table", required = true,
description = "Glent table to read from")
String table;
@Parameter(names = "--input-partition", required = true,
description = "Partition of input table to read")
String partition;
Dataset<Row> load(SparkSession spark) {
return spark.read().table(table)
.where(col("date").equalTo(partition))
.drop("date");
}
}
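
/**
 * Extends {@link WithGlentPartition} to additionally restrict the input to a
 * single algorithm's partition.
 */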
static class WithGlentSuggestionPartition extends WithGlentPartition {
private final String algo;
WithGlentSuggestionPartition(String algo) {
this.algo = algo;
}
Dataset<Row> load(SparkSession spark) {
return super.load(spark)
.where(col("algo").equalTo(algo))
.drop("algo");
}
}
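
/**
 * Parameters controlling how candidate suggestions are filtered: edit
 * distance limits (with back-compat Levenshtein variants), minimum hit count
 * differences, and the maximum timestamp difference between the original
 * query and the suggestion.
 */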
static class WithSuggestionFilter {
@Parameter(names = "--max-edit-dist-sugg", description = "max edit distance between original query and suggestion")
int maxEditDistSugg = 3;
@Parameter(names = "--max-leven-dist-sugg", description = "back-compat max levenshtein distance between original query and suggestion")
Integer maxLevenDistSuggBC;
@Parameter(names = "--max-norm-edit-dist", description = "max normalized edit distance")
double maxNormEditDist = 1.5;
@Parameter(names = "--max-norm-leven-dist", description = "back-compat max normalized levenshtein distance")
Double maxNormLevenDistBC;
@Parameter(names = "--min-hits-diff", description = "min difference in hits between original query and suggestion")
int minHitsDiff = 100;
@Parameter(names = "--min-hits-perc-diff", description = "min difference in percentage of hits between original query and suggestion")
int minHitsPercDiff = 10;
// TODO: This parameter is only for m0prep, not shared
@Parameter(
names = "--sugg-max-ts-diff",
description = "max difference in timestamp between original query and suggestion",
converter = DurationConverter.class)
Duration suggMaxTsDiff = Duration.ofSeconds(120);
SuggestionFilter build() {
return build(functions::levenshtein);
}
SuggestionFilter build(SuggestionFilter.EditDistanceCalc edCalc) {
int editDist = maxLevenDistSuggBC == null ? maxEditDistSugg : maxLevenDistSuggBC;
double normEditDist = maxNormLevenDistBC == null ? maxNormEditDist : maxNormLevenDistBC;
return new SuggestionFilter(minHitsDiff, minHitsPercDiff,
editDist, normEditDist, suggMaxTsDiff, edCalc);
}
}
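
/**
 * Parameters for writing prep output to a date-partitioned Glent table,
 * overwriting the target partition.
 */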
static class WithPrepOutput {
@Parameter(names = "--output-table", required = true,
description = "Glent table to write output to")
String table;
@Parameter(names = "--output-partition", required = true,
description = "Partition of output table to overwrite")
String partition;
@Parameter(names = "--max-output-partitions", description = "Number of partitions to write result as")
int maxPartitions;
void write(Dataset<Row> df) {
GlentControl.writeDf(df, table, partitionSpec(), maxPartitions);
}
Map<String, String> partitionSpec() {
return ImmutableMap.of(
"date", partition
);
}
}
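
/**
 * Extends {@link WithPrepOutput} to also partition output by algorithm name.
 */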
static class WithSuggestionOutput extends WithPrepOutput {
@Parameter(names = "--output-algo", description = "Algorithm name for output partition spec")
String algo;
WithSuggestionOutput(String algo) {
this.algo = algo;
}
Map<String, String> partitionSpec() {
return ImmutableMap.of(
"algo", algo,
"date", partition
);
}
}
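
/**
 * Parameters for reading a single date partition from the Glent suggestions
 * table.
 */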
static class WithSuggestionInput {
@Parameter(names = "--input-table", required = true,
description = "Glent suggestions table to read from")
String table;
@Parameter(names = "--input-partition", required = true,
description = "Date portion of partitions in suggestions table to read in YYYYMMDD format.")
String date;
Dataset<Row> load(SparkSession spark) {
// This is a bit awkward: m0run and m1run need the single specified
// partition, while m2run needs a range of partitions.
return spark.read().table(table)
.where(col("date").equalTo(date))
.drop("date");
}
}
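
/**
 * Parameters for writing a dataset out as (optionally compressed) text files,
 * using the supplied encoder to turn rows into strings.
 */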
static class WithTextOutput<T> {
@Parameter(names = "--output-path", required = true)
String path;
@Parameter(names = "--output-compression")
String compression = "gzip";
@Parameter(names = "--max-output-partitions")
int maxPartitions;
Function<Dataset<T>, Dataset<String>> encoder;
WithTextOutput(Function<Dataset<T>, Dataset<String>> encoder) {
this.encoder = encoder;
}
void write(Dataset<T> df) {
if (maxPartitions > 0) {
df = df.coalesce(maxPartitions);
}
this.encoder.apply(df)
.write()
.mode(SaveMode.Overwrite)
.option("compression", compression)
.text(path);
}
}
/**
 * Optionally generates a dataframe with a single suggestion used as a marker.
 * The marker query can be searched for after deployment, as a smoke test, to
 * verify which version of suggestions is being provided.
 *
 * Output schema matches SuggestionAggregator output.
 */
static class WithMarkerSuggestion {
// If the marker query somehow makes it into the suggestions dataset
// we would like this row to still have a larger score and be returned.
static final float MARKER_SCORE = 987654321F;
@Parameter(names = "--version-marker", required = false,
description = "Emit a marker suggestion with the provided string as the suggested query")
String marker;
@Parameter(names = "--version-marker-wiki", required = false,
description = "Wiki id to embed in version marker suggestion")
String wiki = "testwiki";
@Parameter(names = "--version-marker-query", required = false,
description = "User query to embed in version marker suggestion")
String query = "__glent_version_query";
Optional<Dataset<Row>> versionMarker(SparkSession spark) {
if (marker == null) {
return Optional.empty();
}
Row row = RowFactory.create(
Method.algos(), wiki, MARKER_SCORE, query, marker);
return Optional.of(spark.createDataFrame(
singletonList(row),
SuggestionAggregator.SCHEMA_OUT));
}
}
}