/*
 * Copyright (C) 2019 Glenbrook Networks
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.wikimedia.search.glent;

import static java.util.Collections.singletonList;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.wikimedia.search.glent.analysis.Normalizers;
import org.wikimedia.search.glent.editdistance.EDConfig;
import org.wikimedia.search.glent.editdistance.NormType;
import org.wikimedia.search.glent.editdistance.TokenAwareEditDistance;

import com.beust.jcommander.Parameter;
import com.garygregory.jcommander.converters.time.DurationConverter;
import com.garygregory.jcommander.converters.time.InstantConverter;
import com.google.common.collect.ImmutableMap;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
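 * Command line parameter groups for Glent jobs. Each nested class bundles
 * related JCommander {@code @Parameter} definitions together with helpers
 * that turn the parsed values into Spark readers, filters, or writers.
 *
 * <p>A minimal usage sketch, assuming JCommander's standard builder API
 * (the real entry point lives elsewhere in this project):
 * <pre>{@code
 * Params params = new Params();
 * JCommander.newBuilder().addObject(params).build().parse(args);
 * }</pre>
 *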
 * @author Julia.Glen
 */
public class Params {
    @Parameter(names = "--help", help = true)
    boolean help;

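    /**
     * Parameters for loading cirrus search request logs: the source table,
     * the time window to read, the query normalizer, a cap on queries per
     * identity (to exclude robots), and the table mapping wikiid to language.
     */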
    static class WithCirrusLog {
        @Parameter(names = "--wmf-log-name", description = "name of the dataframe with new eventgate wmf search logs")
        String wmfLogName = "event.mediawiki_cirrussearch_request";

        @Parameter(
                names = "--log-ts-from",
                description = "minimum timestamp of log entries to be used in prep or run",
                converter = InstantConverter.class)
        Instant logTsFrom = Instant.EPOCH;

        @Parameter(
                names = "--log-ts-to",
                description = "maximum timestamp of log entries to be used in prep or run",
                converter = InstantConverter.class)
        Instant logTsTo = Instant.MAX;

        @Parameter(
                names = "--query-normalizer",
                description = "Algorithm to use when normalizing the query",
                converter = Normalizers.class
        )
        Function<String, String> normalizer = Normalizers.lowerCase();

        @Parameter(names = "--max-n-queries-per-ident", description = "maximum number of queries per identity allowed; higher means robots")
        int maxNQueriesPerIdent = 2000;

        @Parameter(names = "--map-wikiid-to-lang-name", description = "name of the dataframe that has the mapping between wikiid and lang")
        String mapWikiidToLangName = "canonical_data.wikis";

        private Dataset<Row> loadLangMap(SparkSession spark) {
            return mapWikiidToLangName.isEmpty()
                    ? null
                    : spark.read().table(mapWikiidToLangName);
        }

        private CirrusLogReader reader(SparkSession spark) {
            return new CirrusLogReader(loadLangMap(spark), maxNQueriesPerIdent, normalizer);
        }

        Dataset<Row> load(SparkSession spark) {
            // Converts multiple input datasets into the same shape. This
            // should be removed at some point, once everything has been
            // converted to the new schema.
            Dataset<Row> df = new CirrusLogLoader().load(spark, wmfLogName, logTsFrom, logTsTo);
            // Filters and simplifies input datasets
            return reader(spark).filterLog(df);
        }

        Dataset<Row> renormalizeQuery(Dataset<Row> df) {
            return reader(df.sparkSession()).normalizeQuery(df);
        }
    }

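    /**
     * Earliest timestamp that may be retained in prep dataframes. Defaults
     * to 84 days before now, so that older records are dropped to satisfy
     * legal retention requirements.
     */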
    static class WithEarliestLegalTs {
        @Parameter(
                names = "--earliest-legal-ts",
                description = "earliest timestamp that can be stored in the prep dataframe; used to clean up older records if requred by legal",
                converter = InstantConverter.class)
        Instant ts = Instant.now().minus(Duration.ofDays(84));
    }

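    /**
     * Builds a {@link SuggestionFilter.EditDistanceCalc} backed by a
     * token-aware edit distance, exposed to Spark as a UDF. The weights are
     * currently hard coded; see the linked Phabricator task for their origin.
     */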
    static class WithEditDistance {
        // TODO: Actual CLI args
        @SuppressFBWarnings(value = "CE_CLASS_ENVY", justification = "Class envy is relatively natural in this builder.")
        SuggestionFilter.EditDistanceCalc build() {
            // For now use a hard coded config. This can all be exposed as arguments as necessary.
            // Config from https://phabricator.wikimedia.org/T238151#5977107
            TokenAwareEditDistance impl = new TokenAwareEditDistance(EDConfig.Builder.newInstance()
                    .setDefaultLimit(1.76f)
                    .setDefaultNormLimit(0.3f)
                    .setPerTokenLimit(true)
                    .setNormType(NormType.MIN)
                    .setInsDelCost(0.84f)
                    .setSubstCost(0.92f)
                    .setSwapCost(0.82f)
                    .setDuplicateCost(0.60f)
                    .setSpaceOnlyCost(0.68f)
                    .setDigitChangePenalty(0.26f)
                    .setTokenDeltaPenalty(0f)
                    .setTokenInitialPenalty(2.00f)
                    .setTokenSepSubstPenalty(0.54f)
                    .setTokenSep(' ')
                    // defaults below
                    //.setLocale(Locale.ENGLISH)
                    //.setTokenSplit("[\\p{Z}\\p{P}\\p{S}]+")
                    //.setTokenizer(null)
                    .build());

            UserDefinedFunction func = udf(
                    (UDF2<String, String, Float>)impl::calcEditDistance,
                    DataTypes.FloatType);
            // func::apply is varargs; wrap it in a lambda so the interface
            // sees exactly two arguments.
            return (a, b) -> func.apply(a, b);
        }
    }

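    /** Parameters for reading a single date partition of a Glent table. */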
    static class WithGlentPartition {
        @Parameter(names = "--input-table", required = true,
                description = "Glent table to read from")
        String table;

        @Parameter(names = "--input-partition", required = true,
                description = "Partition of input table to read")
        String partition;

        Dataset<Row> load(SparkSession spark) {
            return spark.read().table(table)
                    .where(col("date").equalTo(partition))
                    .drop("date");
        }
    }

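    /**
     * Like {@link WithGlentPartition}, but further restricts the input to
     * rows produced by a single algorithm.
     */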
    static class WithGlentSuggestionPartition extends WithGlentPartition {
        private final String algo;

        WithGlentSuggestionPartition(String algo) {
            this.algo = algo;
        }

        @Override
        Dataset<Row> load(SparkSession spark) {
            return super.load(spark)
                    .where(col("algo").equalTo(algo))
                    .drop("algo");
        }
    }

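    /**
     * Thresholds for filtering (query, suggestion) pairs. The back-compat
     * {@code --max-leven-dist-sugg} and {@code --max-norm-leven-dist}
     * parameters, when provided, take precedence over the generic edit
     * distance limits.
     */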
    static class WithSuggestionFilter {
        @Parameter(names = "--max-edit-dist-sugg", description = "max edit distance between original query and suggestion")
        int maxEditDistSugg = 3;

        @Parameter(names = "--max-leven-dist-sugg", description = "back-compat max levenshtein distance between original query and suggestion")
        Integer maxLevenDistSuggBC;

        @Parameter(names = "--max-norm-edit-dist", description = "max normalized edit distance")
        double maxNormEditDist = 1.5;

        @Parameter(names = "--max-norm-leven-dist", description = "back-compat max normalized levenshtein distance")
        Double maxNormLevenDistBC;

        @Parameter(names = "--min-hits-diff", description = "min difference in hits between original query and suggestion")
        int minHitsDiff = 100;

        @Parameter(names = "--min-hits-perc-diff", description = "min difference in percentage of hits between original query and suggestion")
        int minHitsPercDiff = 10;

        // TODO: This parameter is only for m0prep, not shared
        @Parameter(
                names = "--sugg-max-ts-diff",
                description = "max difference in timestamp between original query and suggestion",
                converter = DurationConverter.class)
        Duration suggMaxTsDiff = Duration.ofSeconds(120);

        SuggestionFilter build() {
            return build(functions::levenshtein);
        }

        SuggestionFilter build(SuggestionFilter.EditDistanceCalc edCalc) {
            int editDist = maxLevenDistSuggBC == null ? maxEditDistSugg : maxLevenDistSuggBC;
            double normEditDist = maxNormLevenDistBC == null ? maxNormEditDist : maxNormLevenDistBC;
            return new SuggestionFilter(minHitsDiff, minHitsPercDiff,
                    editDist, normEditDist, suggMaxTsDiff, edCalc);
        }
    }

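    /** Parameters for writing prep output to a date partition of a Glent table. */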
    static class WithPrepOutput {
        @Parameter(names = "--output-table", required = true,
                description = "Glent table to write output to")
        String table;

        @Parameter(names = "--output-partition", required = true,
                description = "Partition of output table to overwrite")
        String partition;

        @Parameter(names = "--max-output-partitions", description = "Number of partitions to write result as")
        int maxPartitions;

        void write(Dataset<Row> df) {
            GlentControl.writeDf(df, table, partitionSpec(), maxPartitions);
        }

        Map<String, String> partitionSpec() {
            return ImmutableMap.of(
                    "date", partition
            );
        }
    }

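    /**
     * Extends {@link WithPrepOutput} to additionally partition the output
     * by algorithm name.
     */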
    static class WithSuggestionOutput extends WithPrepOutput {
        @Parameter(names = "--output-algo", description = "Algorithm name for output partition spec")
        String algo;

        WithSuggestionOutput(String algo) {
            this.algo = algo;
        }

        @Override
        Map<String, String> partitionSpec() {
            return ImmutableMap.of(
                    "algo", algo,
                    "date", partition
            );
        }
    }

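    /** Parameters for reading a date partition of a Glent suggestions table. */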
    static class WithSuggestionInput {
        @Parameter(names = "--input-table", required = true,
                description = "Glent suggestions table to read from")
        String table;

        @Parameter(names = "--input-partition", required = true,
                description = "Date portion of partitions in suggestions table to read in YYYYMMDD format.")
        String date;

        Dataset<Row> load(SparkSession spark) {
            // This is done a bit oddly because m0run and m1run need the
            // single specified partition, while m2run needs a range of
            // partitions.
            return spark.read().table(table)
                    .where(col("date").equalTo(date))
                    .drop("date");
        }
    }

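    /**
     * Parameters for writing a dataset out as (optionally compressed) text
     * files, using the provided encoder to turn each dataset into lines of
     * text.
     */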
    static class WithTextOutput<T> {
        @Parameter(names = "--output-path", required = true)
        String path;

        @Parameter(names = "--output-compression")
        String compression = "gzip";

        @Parameter(names = "--max-output-partitions")
        int maxPartitions;

        private final Function<Dataset<T>, Dataset<String>> encoder;

        WithTextOutput(Function<Dataset<T>, Dataset<String>> encoder) {
            this.encoder = encoder;
        }

        void write(Dataset<T> df) {
            if (maxPartitions > 0) {
                df = df.coalesce(maxPartitions);
            }
            this.encoder.apply(df)
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", compression)
                .text(path);
        }
    }

    /**
     * Optionally generates a dataframe with a single suggestion used as a marker.
 * The marker query (set via {@code --version-marker-query}) can be searched
 * for on deployment to verify in a smoke test which version of suggestions
 * is being served.
 *
 * <p>Output schema matches {@link SuggestionAggregator} output.
     */
    static class WithMarkerSuggestion {
        // If the marker query somehow makes it into the suggestions dataset
        // we would like this row to still have a larger score and be returned.
        static final float MARKER_SCORE = 987654321F;

        @Parameter(names = "--version-marker", required = false,
                description = "Emit a marker suggestion with the provided string as the suggested query")
        String marker;

        @Parameter(names = "--version-marker-wiki", required = false,
                description = "Wiki id to embed in version marker suggestion")
        String wiki = "testwiki";

        @Parameter(names = "--version-marker-query", required = false,
                description = "User query to embed in version marker suggestion")
        String query = "__glent_version_query";

        Optional<Dataset<Row>> versionMarker(SparkSession spark) {
            if (marker == null) {
                return Optional.empty();
            }

            Row row = RowFactory.create(
                    Method.algos(), wiki, MARKER_SCORE, query, marker);

            return Optional.of(spark.createDataFrame(
                    singletonList(row),
                    SuggestionAggregator.SCHEMA_OUT));
        }
    }
}