// Method.java

/*
 * Copyright (C) 2019 Glenbrook Networks
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.wikimedia.search.glent;

import static java.util.Arrays.asList;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.collect.ImmutableMap;

/**
 *
 * @author Julia.Glen
 *
 */

interface Method extends Consumer<SparkSession> {

    /**
     * Registry of every runnable pipeline stage, keyed by the command name
     * accepted on the command line.
     *
     * @return immutable name-to-implementation map of all known methods
     */
    static Map<String, Method> methods() {
        ImmutableMap.Builder<String, Method> registry = ImmutableMap.builder();
        registry.put("m0prep", new M0Prep());
        registry.put("m0run", new M0Run());
        registry.put("m1prep", new M1Prep());
        registry.put("m1run", new M1Run());
        registry.put("m1run.candidates", new M1RunCandidates());
        registry.put("m2run", new M2Run());
        registry.put("esbulk", new ESBulk());
        return registry.build();
    }

    /**
     * Lists every suggestion algorithm identifier that may appear in
     * emitted suggestion records.
     *
     * @return the list of all valid suggestion algorithms that can be emitted
     */
    static List<String> algos() {
        List<String> knownAlgos = asList(M0Run.ALGO, M1Run.ALGO, M2Run.ALGO);
        return knownAlgos;
    }

    @Parameters(commandDescription = "prep for session reformulation based suggestions")
    class M0Prep implements Method {
        // Input: cirrus search request log.
        @ParametersDelegate
        Params.WithCirrusLog cirrus = new Params.WithCirrusLog();

        // Input: previously accumulated glent partition.
        @ParametersDelegate
        Params.WithGlentPartition glent = new Params.WithGlentPartition();

        // Filtering applied to candidate (query, suggestion) pairs.
        @ParametersDelegate
        Params.WithSuggestionFilter suggestionFilter = new Params.WithSuggestionFilter();

        // Where the refreshed prep dataframe is written.
        @ParametersDelegate
        Params.WithPrepOutput output = new Params.WithPrepOutput();

        /**
         * Generates fresh M0Prep portion based on the wmf log.
         * Steps include:
         * get log
         * prune and flatten logEntries
         * remove PII, queries by robots, etc.
         * build possible pairs (query,suggestion)
         * remove unsuitable pairs
         * build new M0Prep which is a combination of Sugg dataframe and old M0Prep
         * clean up records if required by legal, e.g. older than a certain date
         * store new M0Prep portion
         *
         * @param spark  session for spark
         */
        @Override
        public void accept(SparkSession spark) {
            SessionReformulationPrep prep =
                    new SessionReformulationPrep(suggestionFilter.build());
            output.write(prep.apply(cirrus.load(spark), glent.load(spark)));
        }
    }

    @Parameters(commandDescription = "generate session reformulation based suggestions")
    class M0Run implements Method {
        public static final String ALGO = "m0run";

        @ParametersDelegate
        Params.WithGlentPartition glent = new Params.WithGlentPartition();

        @ParametersDelegate
        Params.WithSuggestionOutput output = new Params.WithSuggestionOutput(ALGO);

        /**
         * Generates fresh M0Run portion based on existing user queries and M0Prep dataframe.
         * Steps include:
         * get M0Prep Data
         * get user query record, most likely wmf log
         * prune and flatten logEntries
         * get possible suggestions that match user query
         * remove unsuitable suggestions
         * build DYMs
         * clean up records if required by legal, e.g. older than a certain date
         * store new M0Run portion
         *
         * @param spark  session for spark
         */
        public void accept(SparkSession spark) {
            output.write(new SessionReformulationSuggester()
                    .apply(glent.load(spark)));
        }
    }

    @Parameters(commandDescription = "prep for query similarity based suggestions")
    class M1Prep implements Method {
        @ParametersDelegate
        Params.WithCirrusLog cirrus = new Params.WithCirrusLog();

        @ParametersDelegate
        Params.WithGlentPartition glent = new Params.WithGlentPartition();

        @ParametersDelegate
        Params.WithEarliestLegalTs earliestLegalTs = new Params.WithEarliestLegalTs();

        @ParametersDelegate
        Params.WithPrepOutput output = new Params.WithPrepOutput();

        /**
         * Generates fresh M1Prep portion based on the wmf log.
         * Steps include:
         * get log
         * prune and flatten logEntries
         * remove PII, queries by robots, etc.
         * build vector space based on query
         * build new M1Prep which is a combination of Sugg dataframe and old M1Prep
         * clean up records if required by legal, e.g. older than a certain date
         * store new M1Prep portion
         *
         * @param spark  session for spark
         */
        public void accept(SparkSession spark) {
            output.write(new SimilarQueriesPrep(earliestLegalTs.ts)
                    .apply(cirrus.load(spark), glent.load(spark)));
        }
    }

    @Parameters(commandDescription = "Candidate generation stage of m1run")
    class M1RunCandidates implements Method {
        @ParametersDelegate
        Params.WithGlentPartition glent = new Params.WithGlentPartition();

        @Parameter(names = "--num-fst", description = "number of partitions for m1run FST indices")
        int numFst = 2;

        @Parameter(
            names = "--output-directory", required = true,
            description = "Directory for passing data between m1run stages")
        String outputDir;

        public void accept(SparkSession spark) {
            new SimilarQueriesSuggester(null, numFst)
                    .generateCandidates(glent.load(spark))
                    .write()
                    // Simplify retries and overwrite on demand
                    .mode(SaveMode.Overwrite)
                    .parquet(outputDir);
        }
    }

    @Parameters(commandDescription = "generate suggestions based on query similarity")
    class M1Run implements Method {
        public static final String ALGO = "m1run";

        @ParametersDelegate
        Params.WithGlentPartition glent = new Params.WithGlentPartition();

        @ParametersDelegate
        Params.WithEditDistance editDistance = new Params.WithEditDistance();

        @ParametersDelegate
        Params.WithSuggestionFilter suggestionFilter = new Params.WithSuggestionFilter();

        @ParametersDelegate
        Params.WithSuggestionOutput output = new Params.WithSuggestionOutput(ALGO);

        @Parameter(names = "--num-fst", description = "number of partitions for m1run FST indices. "
            + "Only applicable when candidates are not provided.")
        int numFst = 1;

        @Parameter(names = "--candidates-directory", description = "Directory for passing data between stages. "
            + "If not provided candidates will be generated in-process.")
        String tempDir;

        /**
         * Generates fresh M1Run portion based on existing user queries and M1Prep dataframe.
         * Steps include:
         * get M1Prep Data
         * get user query record, most likely wmf log
         * prune and flatten logEntries
         * get possible suggestions that match user query
         * remove unsuitable suggestions
         * build DYMs
         * store new M1Run portion
         *
         * @param spark  session for spark
         */
        public void accept(SparkSession spark) {
            SuggestionFilter filter = suggestionFilter.build(editDistance.build());
            Dataset<Row> candidates = null;
            if (tempDir != null) {
                candidates = spark.read().parquet(tempDir);
            }
            output.write(new SimilarQueriesSuggester(filter, numFst)
                    .apply(glent.load(spark), candidates));
        }
    }

    @Parameters(commandDescription = "generate suggestions based on confusion matrix and dictionary")
    class M2Run implements Method {
        public static final String ALGO = "m2run";

        @ParametersDelegate
        Params.WithCirrusLog cirrus = new Params.WithCirrusLog();

        @ParametersDelegate
        Params.WithGlentSuggestionPartition glent = new Params.WithGlentSuggestionPartition(ALGO);

        @ParametersDelegate
        Params.WithEarliestLegalTs earliestLegalTs = new Params.WithEarliestLegalTs();

        @ParametersDelegate
        Params.WithSuggestionOutput output = new Params.WithSuggestionOutput(ALGO);
        /**
         *  Generates fresh M2Run portion based on existing user queries and confusion matrices and dictionaries.
         *  Steps include:
         *  get confusion matrices and dictionaries
         *  get user query record, most likely wmf log
         *  prune and flatten logEntries
         *  get possible suggestions that match user query
         *  remove unsuitable suggestions
         *  build DYMs
         *  store new M1Run portion
         *
         *  @param spark session for spark
         */
        public void accept(SparkSession spark) {
            output.write(new DictionarySuggester(earliestLegalTs.ts).apply(
                    cirrus.load(spark), cirrus.renormalizeQuery(glent.load(spark))));
        }
    }

    /**
     * Aggregates together the results of the other suggestion generation
     * methods into a single suggestion per wiki+query.
     */
    @Parameters(commandDescription = "aggregate best suggestion from various algorithms")
    class ESBulk implements Method {
        @ParametersDelegate
        Params.WithSuggestionInput suggestions = new Params.WithSuggestionInput();

        @ParametersDelegate
        Params.WithTextOutput<Row> output = new Params.WithTextOutput<>(EsBulkEncoder::encode);

        @ParametersDelegate
        Params.WithMarkerSuggestion marker = new Params.WithMarkerSuggestion();

        public void accept(SparkSession spark) {
            Dataset<Row> df = new SuggestionAggregator().apply(suggestions.load(spark));
            output.write(marker.versionMarker(spark).map(df::union).orElse(df));
        }
    }
}