SubgraphQueryMappingLauncher.scala

package org.wikidata.query.rdf.spark.transform.queries.subgraphsqueries

import scopt.OptionParser

case class SubgraphQueryMappingParams(
                                       wikidataTriples: String = "",
                                       processedQuery: String = "",
                                       topSubgraphItems: String = "",
                                       topSubgraphTriples: String = "",
                                       subgraphQItemsMatchInQuery: String = "", //out
                                       subgraphPredicatesMatchInQuery: String = "", //out
                                       subgraphUrisMatchInQuery: String = "", //out
                                       queryMapping: String = "", //out
                                       filteringLimit: Double = 99
                                     )

/**
 * Point of entry for wikidata subgraph query mapping .
 * This job maps all SPARQL queries in WDQS to one or more subgraphs they access
 * and saves parquet files in daily format.
 *
 * Command line example:
 * spark2-submit --master yarn --driver-memory 2G --executor-memory 16G --executor-cores 8 \
 * --class org.wikidata.query.rdf.spark.transform.queries.subgraphsqueries.SubgraphQueryMappingLauncher \
 * --name subgraph-query-mapper-spark \
 * --queue root.default \
 * ~akhatun/rdf-spark-tools-0.3.42-SNAPSHOT-jar-with-dependencies.jar \
 * --wikidata-table discovery.wikibase_rdf/date=20220210/wiki=wikidata \
 * --processed-query-table discovery.processed_external_sparql_query/year=2021/month=5/day=1/wiki=wikidata \
 * --top-subgraph-items-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --top-subgraph-triples-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --subgraph-qitem-match-query-table table_name/year=2021/month=5/day=1/wiki=wikidata \
 * --subgraph-predicate-match-query-table table_name/year=2021/month=5/day=1/wiki=wikidata \
 * --subgraph-uri-match-query-table table_name/year=2021/month=5/day=1/wiki=wikidata \
 * --subgraph-query-mapping-table table_name/year=2021/month=5/day=1/wiki=wikidata \
 * --filtering-limit 99
 */
object SubgraphQueryMappingLauncher {

  def main(args: Array[String]): Unit = {
    argParser.parse(args, SubgraphQueryMappingParams()) match {

      case Some(params) => SubgraphQueryMapper.extractAndSaveSubgraphQueryMapping(
        params.wikidataTriples,
        params.processedQuery,
        params.topSubgraphTriples,
        params.topSubgraphItems,
        params.filteringLimit,
        params.subgraphQItemsMatchInQuery,
        params.subgraphPredicatesMatchInQuery,
        params.subgraphUrisMatchInQuery,
        params.queryMapping
      )
      case _ => sys.exit(-1)

    }
  }

  // method has several input arguments to define
  // scalastyle:off method.length
  def argParser: OptionParser[SubgraphQueryMappingParams] = {
    new OptionParser[SubgraphQueryMappingParams]("") {
      head("Subgraph Query Mapping Launcher")
      help("help") text "Prints this usage text"

      opt[String]("wikidata-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(wikidataTriples = x) }
        .text("Table holding Wikidata snapshots with partition specs")

      opt[String]("processed-query-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(processedQuery = x) }
        .text("Table holding the parsed SPARQL queries with partition specs")

      opt[String]("top-subgraph-items-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(topSubgraphItems = x) }
        .text("Output table holding items of top subgraphs (<item> P31 <subgraph>) with partition specs")

      opt[String]("top-subgraph-triples-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(topSubgraphTriples = x) }
        .text("Output table holding all triples of top subgraphs with partition specs")

      opt[String]("subgraph-qitem-match-query-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(subgraphQItemsMatchInQuery = x) }
        .text("Output table holding query id and qitem match with subgraphs with partition specs")

      opt[String]("subgraph-predicate-match-query-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(subgraphPredicatesMatchInQuery = x) }
        .text("Output table holding query id and predicate match with subgraphs with partition specs")

      opt[String]("subgraph-uri-match-query-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(subgraphUrisMatchInQuery = x) }
        .text("Output table holding query id and uris match with subgraphs with partition specs")

      opt[String]("subgraph-query-mapping-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(queryMapping = x) }
        .text("Output table holding mapping of query id and subgraph with partition specs")

      opt[Int]("filtering-limit")
        .required()
        .valueName("<number>")
        .action { (x, p) => p.copy(filteringLimit = x) }
        .text("The percent limit used to associate an item, uri, or property to a particular subgraph. Value is in range [0,100]")
        .validate(x =>
          if (x >= 0) {
            success
          }
          else {
            failure("Option --filtering-limit must be >=0")
          })
        .validate(x =>
          if (x <= 100) {
            success
          }
          else {
            failure("Option --filtering-limit must be <=100")
          })
    }
  }
  // scalastyle:on method.length

}