SubgraphMetricsLauncher.scala

package org.wikidata.query.rdf.spark.metrics

import org.wikidata.query.rdf.spark.metrics.queries.subgraphpairs.SubgraphPairQueryMetricsExtractor.extractAndSaveSubgraphPairQueryMetrics
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.QueryMetricsExtractor.extractAndSaveQueryMetrics
import org.wikidata.query.rdf.spark.metrics.subgraphs.detailed.DetailedSubgraphMetricsExtractor.extractAndSaveDetailedSubgraphMetrics
import org.wikidata.query.rdf.spark.metrics.subgraphs.general.GeneralSubgraphMetricsExtractor.extractAndSaveGeneralSubgraphMetrics
import scopt.OptionParser

/**
 * Parsed command-line parameters for the subgraph metrics launcher.
 *
 * One instance carries the options of every command. Each command only fills in (and its
 * extractor only reads) the fields it declares; all other fields keep their defaults.
 *
 * @param metric                          name of the command to run; empty until a command is parsed
 * @param wikidataTriples                 Wikidata triples input table (with partition specs)
 * @param allSubgraphs                    table listing subgraphs and their item counts
 * @param topSubgraphItems                table of items of the top subgraphs
 * @param topSubgraphTriples              table of triples of the top subgraphs
 * @param generalSubgraphMetrics          general subgraph metrics table, read and written (in and out table)
 * @param eventQueries                    SPARQL query input table
 * @param processedQueries                parsed SPARQL query input table
 * @param subgraphQueries                 table mapping query ids to the subgraphs they access
 * @param perSubgraphMetrics              output table for per-subgraph metrics
 * @param subgraphPairMetrics             output table for subgraph-pair metrics
 * @param generalQueryMetrics             output table for overall query metrics
 * @param generalSubgraphQueryMetricsPath output table for overall subgraph query metrics
 * @param perSubgraphQueryMetrics         output table for per-subgraph query metrics
 * @param subgraphPairQueryMetrics        output table for subgraph-pair query metrics
 * @param subgraphQItemsMatchInQuery      table of Q-items contained in each query
 * @param subgraphPredicatesMatchInQuery  table of predicates contained in each query
 * @param subgraphUrisMatchInQuery        table of URIs contained in each query
 * @param topN                            number of top entries to keep for query analyses (default 100)
 * @param minItems                        item-count threshold passed to the subgraph extractors (default 10000)
 */
case class SubgraphMetricsParams(
  metric: String = "",
  wikidataTriples: String = "",
  allSubgraphs: String = "",
  topSubgraphItems: String = "",
  topSubgraphTriples: String = "",
  generalSubgraphMetrics: String = "",
  eventQueries: String = "",
  processedQueries: String = "",
  subgraphQueries: String = "",
  perSubgraphMetrics: String = "",
  subgraphPairMetrics: String = "",
  generalQueryMetrics: String = "",
  generalSubgraphQueryMetricsPath: String = "",
  perSubgraphQueryMetrics: String = "",
  subgraphPairQueryMetrics: String = "",
  subgraphQItemsMatchInQuery: String = "",
  subgraphPredicatesMatchInQuery: String = "",
  subgraphUrisMatchInQuery: String = "",
  topN: Long = 100,
  minItems: Long = 10000
)

/**
 * Dispatches parsed [[SubgraphMetricsParams]] to the extractor of the selected command.
 */
class SubgraphMetricsLauncher {

  /**
   * Runs the extraction selected by `params.metric`, passing it only the fields that command uses.
   *
   * If `params.metric` does not name a known command (including the empty default when no
   * command was parsed), an error is written to stderr and the JVM exits with status 1.
   *
   * @param params parsed command-line parameters
   */
  def extractSubgraphMetrics(params: SubgraphMetricsParams): Unit = {

    params.metric match {
      case "general-subgraph-metrics" => extractAndSaveGeneralSubgraphMetrics(
        params.minItems,
        params.wikidataTriples,
        params.allSubgraphs,
        params.topSubgraphItems,
        params.topSubgraphTriples,
        params.generalSubgraphMetrics
      )
      case "detailed-subgraph-metrics" => extractAndSaveDetailedSubgraphMetrics(
        params.topSubgraphItems,
        params.topSubgraphTriples,
        params.generalSubgraphMetrics,
        params.allSubgraphs,
        params.minItems,
        params.perSubgraphMetrics,
        params.subgraphPairMetrics
      )
      case "query-metrics" => extractAndSaveQueryMetrics(
        params.topN,
        params.eventQueries,
        params.processedQueries,
        params.subgraphQueries,
        params.subgraphQItemsMatchInQuery,
        params.subgraphPredicatesMatchInQuery,
        params.subgraphUrisMatchInQuery,
        params.generalQueryMetrics,
        params.generalSubgraphQueryMetricsPath,
        params.perSubgraphQueryMetrics
      )
      case "subgraph-pair-query-metrics" =>
        extractAndSaveSubgraphPairQueryMetrics(params.subgraphQueries, params.subgraphPairQueryMetrics)
      case unknown =>
        // Explain the failure instead of terminating the job silently.
        Console.err.println(s"Unknown or missing metric command: '$unknown'")
        sys.exit(1)
    }

  }
}

/**
 * Point of entry to extract subgraph metrics. This includes metrics on wikidata subgraphs
 * and on queries that access these subgraphs, also known as `subgraph queries`.
 * This job lists all subgraph metrics and saves parquet files in date snapshot format.
 * This job also lists all subgraph query metrics and saves parquet files in daily format.
 *
 * Command line example:
 * spark2-submit --master yarn --driver-memory 2G --executor-memory 16G --executor-cores 8 \
 * --class org.wikidata.query.rdf.spark.metrics.SubgraphMetricsLauncher \
 * --name subgraph-metrics-spark \
 * --queue root.default \
 * ~akhatun/rdf-spark-tools-0.3.42-SNAPSHOT-jar-with-dependencies.jar \
 * general-subgraph-metrics \
 * --wikidata-triples-table discovery.wikibase_rdf/date=20220210/wiki=wikidata \
 * --all-subgraphs-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --top-subgraph-items-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --top-subgraph-triples-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --general-subgraph-metrics-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --min-items 10000
 */
object SubgraphMetricsLauncher {

  /**
   * Parses `args` and runs the selected metric command.
   *
   * Exits the JVM with status -1 when the arguments are rejected by [[argParser]].
   *
   * @param args command-line arguments: a command name followed by that command's options
   */
  def main(args: Array[String]): Unit = {
    argParser.parse(args, SubgraphMetricsParams()) match {

      case Some(params) => new SubgraphMetricsLauncher().extractSubgraphMetrics(params)
      case None => sys.exit(-1)

    }
  }

  // arguments of multiple commands
  // scalastyle:off method.length
  /**
   * Builds the command-line parser.
   *
   * Each command (`general-subgraph-metrics`, `detailed-subgraph-metrics`, `query-metrics`,
   * `subgraph-pair-query-metrics`) sets `metric` to its own name and declares its options as
   * children. Options shared by several commands are `def`s, so every command registers a
   * fresh option instance rather than sharing one.
   *
   * @return a parser that also fails when no command is given
   */
  def argParser: OptionParser[SubgraphMetricsParams] = {
    new OptionParser[SubgraphMetricsParams]("SubgraphMetricsLauncher") {
      head("Subgraph Metrics Launcher")
      help("help").text("Prints this usage text")

      def allSubgraphsOption = opt[String]("all-subgraphs-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(allSubgraphs = x) }
        .text("Table holding list of subgraphs and item counts with partition specs")

      def topSubgraphItemsOption = opt[String]("top-subgraph-items-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(topSubgraphItems = x) }
        .text("Table holding items of top subgraphs (<item> P31 <subgraph>) with partition specs")

      def topSubgraphTriplesOption = opt[String]("top-subgraph-triples-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(topSubgraphTriples = x) }
        .text("Table holding all triples of top subgraphs with partition specs")

      def subgraphQueryOption = opt[String]("subgraph-query-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(subgraphQueries = x) }
        .text("Table holding SPARQL query ids mapped to the subgraph it accesses with partition specs")

      def generalSubgraphMetricsOption = opt[String]("general-subgraph-metrics-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(generalSubgraphMetrics = x) }
        .text("Table holding some basic statistics on wikidata and its subgraphs with partition specs")

      // Long to match SubgraphMetricsParams.minItems; opt[Int] rejected values above Int.MaxValue.
      def minItemsOption = opt[Long]("min-items")
        .optional()
        .valueName("<number>")
        .action { (x, p) => p.copy(minItems = x) }
        .text("Minimum number of items a subgraph must have to be considered. Defaults to 10000")

      cmd("general-subgraph-metrics")
        .action((_, c) => c.copy(metric = "general-subgraph-metrics"))
        .text("general-subgraph-metrics is a command")
        .children(
          minItemsOption,
          opt[String]("wikidata-triples-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(wikidataTriples = x) }
            .text("Table holding Wikidata snapshots with partition specs"),
          allSubgraphsOption,
          topSubgraphItemsOption,
          topSubgraphTriplesOption,
          generalSubgraphMetricsOption
        )

      cmd("detailed-subgraph-metrics")
        .action((_, c) => c.copy(metric = "detailed-subgraph-metrics"))
        .text("detailed-subgraph-metrics is a command")
        .children(
          minItemsOption,
          generalSubgraphMetricsOption,
          allSubgraphsOption,
          topSubgraphItemsOption,
          topSubgraphTriplesOption,
          opt[String]("per-subgraph-metrics-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(perSubgraphMetrics = x) }
            .text("Output path to save per subgraph metrics with partition specs."),
          opt[String]("subgraph-pair-metrics-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(subgraphPairMetrics = x) }
            .text("Output path to save per subgraph-pair metrics with partition specs.")
        )

      cmd("query-metrics")
        .action((_, c) => c.copy(metric = "query-metrics"))
        .text("query-metrics is a command")
        .children(
          opt[String]("event-query-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(eventQueries = x) }
            .text("Table holding the SPARQL queries with partition specs"),
          opt[String]("processed-query-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(processedQueries = x) }
            .text("Table holding the parsed SPARQL queries with partition specs"),
          subgraphQueryOption,
          opt[String]("matched-Qitems-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(subgraphQItemsMatchInQuery = x) }
            .text("Table holding list of Q-items that are contained in each query with partition specs"),
          opt[String]("matched-predicates-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(subgraphPredicatesMatchInQuery = x) }
            .text("Table holding list of predicates that are contained in each query with partition specs"),
          opt[String]("matched-uris-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(subgraphUrisMatchInQuery = x) }
            .text("Table holding list of URIs that are contained in each query with partition specs"),
          // Long to match SubgraphMetricsParams.topN.
          opt[Long]("top-n")
            .optional()
            .valueName("<number>")
            .action { (x, p) => p.copy(topN = x) }
            .text("Number of top entries to save for various analysis. Defaults to 100"),
          opt[String]("general-query-metrics-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(generalQueryMetrics = x) }
            .text("Output path to save overall query metrics with partition specs."),
          opt[String]("general-subgraph-query-metrics-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(generalSubgraphQueryMetricsPath = x) }
            .text("Output path to save overall subgraph query metrics with partition specs."),
          opt[String]("per-subgraph-query-metrics-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(perSubgraphQueryMetrics = x) }
            .text("Output path to save per subgraph query metrics with partition specs.")
        )

      cmd("subgraph-pair-query-metrics")
        .action((_, c) => c.copy(metric = "subgraph-pair-query-metrics"))
        .text("subgraph-pair-query-metrics is a command")
        .children(
          subgraphQueryOption,
          opt[String]("subgraph-pair-query-metrics-table")
            .required()
            .valueName("<table-path>")
            .action { (x, p) => p.copy(subgraphPairQueryMetrics = x) }
            .text("Output path to save per subgraph-pair query metrics with partition specs.")
        )

      // Without a command, `metric` stays empty and no extractor would run;
      // reject the arguments with an explicit error instead of exiting silently later.
      checkConfig { c =>
        if (c.metric.isEmpty) {
          failure("A command is required. Use --help to list the available commands.")
        } else {
          success
        }
      }
    }
  }
  // scalastyle:on method.length
}