// Method.java
/*
* Copyright (C) 2019 Glenbrook Networks
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.wikimedia.search.glent;
import static java.util.Arrays.asList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.collect.ImmutableMap;
/**
*
* @author Julia.Glen
*
*/
interface Method extends Consumer<SparkSession> {
/**
 * Builds the registry of runnable pipeline stages, keyed by the
 * command name given on the command line.
 *
 * @return immutable name-to-implementation map, in declaration order
 */
static Map<String, Method> methods() {
    ImmutableMap.Builder<String, Method> registry = ImmutableMap.builder();
    registry.put("m0prep", new M0Prep());
    registry.put("m0run", new M0Run());
    registry.put("m1prep", new M1Prep());
    registry.put("m1run", new M1Run());
    registry.put("m1run.candidates", new M1RunCandidates());
    registry.put("m2run", new M2Run());
    registry.put("esbulk", new ESBulk());
    return registry.build();
}
/**
 * @return The list of all valid suggestion algorithms that can be emitted.
 */
static List<String> algos() {
    String[] algos = {M0Run.ALGO, M1Run.ALGO, M2Run.ALGO};
    return asList(algos);
}
@Parameters(commandDescription = "prep for session reformulation based suggestions")
class M0Prep implements Method {
    @ParametersDelegate
    Params.WithCirrusLog cirrus = new Params.WithCirrusLog();
    @ParametersDelegate
    Params.WithGlentPartition glent = new Params.WithGlentPartition();
    @ParametersDelegate
    Params.WithSuggestionFilter suggestionFilter = new Params.WithSuggestionFilter();
    @ParametersDelegate
    Params.WithPrepOutput output = new Params.WithPrepOutput();

    /**
     * Generates a fresh M0Prep portion from the wmf log.
     * Steps include:
     *   get log
     *   prune and flatten logEntries
     *   remove PII, queries by robots, etc.
     *   build possible pairs (query,suggestion)
     *   remove unsuitable pairs
     *   build new M0Prep combining the Sugg dataframe with the old M0Prep
     *   clean up records if required by legal, e.g. older than a certain date
     *   store the new M0Prep portion
     *
     * @param spark session for spark
     */
    @Override
    public void accept(SparkSession spark) {
        SessionReformulationPrep prep =
            new SessionReformulationPrep(suggestionFilter.build());
        output.write(prep.apply(cirrus.load(spark), glent.load(spark)));
    }
}
@Parameters(commandDescription = "generate session reformulation based suggestions")
class M0Run implements Method {
public static final String ALGO = "m0run";
@ParametersDelegate
Params.WithGlentPartition glent = new Params.WithGlentPartition();
@ParametersDelegate
Params.WithSuggestionOutput output = new Params.WithSuggestionOutput(ALGO);
/**
* Generates fresh M0Run portion based on existing user queries and M0Prep dataframe.
* Steps include:
* get M0Prep Data
* get user query record, most likely wmf log
* prune and flatten logEntries
* get possible suggestions that match user query
* remove unsuitable suggestions
* build DYMs
* clean up records if required by legal, e.g. older than a certain date
* store new M0Run portion
*
* @param spark session for spark
*/
public void accept(SparkSession spark) {
output.write(new SessionReformulationSuggester()
.apply(glent.load(spark)));
}
}
@Parameters(commandDescription = "prep for query similarity based suggestions")
class M1Prep implements Method {
@ParametersDelegate
Params.WithCirrusLog cirrus = new Params.WithCirrusLog();
@ParametersDelegate
Params.WithGlentPartition glent = new Params.WithGlentPartition();
@ParametersDelegate
Params.WithEarliestLegalTs earliestLegalTs = new Params.WithEarliestLegalTs();
@ParametersDelegate
Params.WithPrepOutput output = new Params.WithPrepOutput();
/**
* Generates fresh M1Prep portion based on the wmf log.
* Steps include:
* get log
* prune and flatten logEntries
* remove PII, queries by robots, etc.
* build vector space based on query
* build new M1Prep which is a combination of Sugg dataframe and old M1Prep
* clean up records if required by legal, e.g. older than a certain date
* store new M1Prep portion
*
* @param spark session for spark
*/
public void accept(SparkSession spark) {
output.write(new SimilarQueriesPrep(earliestLegalTs.ts)
.apply(cirrus.load(spark), glent.load(spark)));
}
}
@Parameters(commandDescription = "Candidate generation stage of m1run")
class M1RunCandidates implements Method {
@ParametersDelegate
Params.WithGlentPartition glent = new Params.WithGlentPartition();
@Parameter(names = "--num-fst", description = "number of partitions for m1run FST indices")
int numFst = 2;
@Parameter(
names = "--output-directory", required = true,
description = "Directory for passing data between m1run stages")
String outputDir;
public void accept(SparkSession spark) {
new SimilarQueriesSuggester(null, numFst)
.generateCandidates(glent.load(spark))
.write()
// Simplify retries and overwrite on demand
.mode(SaveMode.Overwrite)
.parquet(outputDir);
}
}
@Parameters(commandDescription = "generate suggestions based on query similarity")
class M1Run implements Method {
public static final String ALGO = "m1run";
@ParametersDelegate
Params.WithGlentPartition glent = new Params.WithGlentPartition();
@ParametersDelegate
Params.WithEditDistance editDistance = new Params.WithEditDistance();
@ParametersDelegate
Params.WithSuggestionFilter suggestionFilter = new Params.WithSuggestionFilter();
@ParametersDelegate
Params.WithSuggestionOutput output = new Params.WithSuggestionOutput(ALGO);
@Parameter(names = "--num-fst", description = "number of partitions for m1run FST indices. "
+ "Only applicable when candidates are not provided.")
int numFst = 1;
@Parameter(names = "--candidates-directory", description = "Directory for passing data between stages. "
+ "If not provided candidates will be generated in-process.")
String tempDir;
/**
* Generates fresh M1Run portion based on existing user queries and M1Prep dataframe.
* Steps include:
* get M1Prep Data
* get user query record, most likely wmf log
* prune and flatten logEntries
* get possible suggestions that match user query
* remove unsuitable suggestions
* build DYMs
* store new M1Run portion
*
* @param spark session for spark
*/
public void accept(SparkSession spark) {
SuggestionFilter filter = suggestionFilter.build(editDistance.build());
Dataset<Row> candidates = null;
if (tempDir != null) {
candidates = spark.read().parquet(tempDir);
}
output.write(new SimilarQueriesSuggester(filter, numFst)
.apply(glent.load(spark), candidates));
}
}
@Parameters(commandDescription = "generate suggestions based on confusion matrix and dictionary")
class M2Run implements Method {
public static final String ALGO = "m2run";
@ParametersDelegate
Params.WithCirrusLog cirrus = new Params.WithCirrusLog();
@ParametersDelegate
Params.WithGlentSuggestionPartition glent = new Params.WithGlentSuggestionPartition(ALGO);
@ParametersDelegate
Params.WithEarliestLegalTs earliestLegalTs = new Params.WithEarliestLegalTs();
@ParametersDelegate
Params.WithSuggestionOutput output = new Params.WithSuggestionOutput(ALGO);
/**
* Generates fresh M2Run portion based on existing user queries and confusion matrices and dictionaries.
* Steps include:
* get confusion matrices and dictionaries
* get user query record, most likely wmf log
* prune and flatten logEntries
* get possible suggestions that match user query
* remove unsuitable suggestions
* build DYMs
* store new M1Run portion
*
* @param spark session for spark
*/
public void accept(SparkSession spark) {
output.write(new DictionarySuggester(earliestLegalTs.ts).apply(
cirrus.load(spark), cirrus.renormalizeQuery(glent.load(spark))));
}
}
/**
* Aggregates together the results of the other suggestion generation
* methods into a single suggestion per wiki+query.
*/
@Parameters(commandDescription = "aggregate best suggestion from various algorithms")
class ESBulk implements Method {
@ParametersDelegate
Params.WithSuggestionInput suggestions = new Params.WithSuggestionInput();
@ParametersDelegate
Params.WithTextOutput<Row> output = new Params.WithTextOutput<>(EsBulkEncoder::encode);
@ParametersDelegate
Params.WithMarkerSuggestion marker = new Params.WithMarkerSuggestion();
public void accept(SparkSession spark) {
Dataset<Row> df = new SuggestionAggregator().apply(suggestions.load(spark));
output.write(marker.versionMarker(spark).map(df::union).orElse(df));
}
}
}