SubgraphMappingLauncher.scala

package org.wikidata.query.rdf.spark.transform.structureddata.subgraphs

import scopt.OptionParser

case class SubgraphMappingParams(
                                  wikidataTriples: String = "", //in
                                  allSubgraphs: String = "", //out
                                  topSubgraphItems: String = "", //out
                                  topSubgraphTriples: String = "", //out
                                  minItems: Long = 10000
                                )

/**
 * Point of entry for wikidata subgraph mapping .
 * This job lists all subgraphs and maps items and triples to subgraphs in wikidata,
 * and saves parquet files in date snapshot format.
 *
 * Command line example:
 * spark2-submit --master yarn --driver-memory 2G --executor-memory 16G --executor-cores 8 \
 * --class org.wikidata.query.rdf.spark.transform.structureddata.subgraphs.SubgraphMappingLauncher \
 * --name subgraph-mapper-spark \
 * --queue root.default \
 * ~akhatun/rdf-spark-tools-0.3.42-SNAPSHOT-jar-with-dependencies.jar \
 * --wikidata-table discovery.wikibase_rdf/date=20220210/wiki=wikidata \
 * --all-subgraphs-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --top-subgraph-items-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --top-subgraph-triples-table discovery.table_name/snapshot=20220210/wiki=wikidata \
 * --min-items 10000
 */
object SubgraphMappingLauncher {

  def main(args: Array[String]): Unit = {
    argParser.parse(args, SubgraphMappingParams()) match {

      case Some(params) => SubgraphMapper.extractAndSaveSubgraphMapping(
        params.wikidataTriples,
        params.minItems,
        params.allSubgraphs,
        params.topSubgraphItems,
        params.topSubgraphTriples
      )
      case _ => sys.exit(-1)

    }
  }

  def argParser: OptionParser[SubgraphMappingParams] = {
    new OptionParser[SubgraphMappingParams]("") {
      head("Subgraph Mapping Launcher")
      help("help") text "Prints this usage text"

      opt[String]("wikidata-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(wikidataTriples = x) }
        .text("Table holding Wikidata snapshots with partition specs")

      opt[String]("all-subgraphs-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(allSubgraphs = x) }
        .text("Output table holding list of subgraphs and item counts with partition specs")

      opt[String]("top-subgraph-items-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(topSubgraphItems = x) }
        .text("Output table holding items of top subgraphs (<item> P31 <subgraph>) with partition specs")

      opt[String]("top-subgraph-triples-table")
        .required()
        .valueName("<table-path>")
        .action { (x, p) => p.copy(topSubgraphTriples = x) }
        .text("Output table holding all triples of top subgraphs with partition specs")

      opt[Long]("min-items")
        .optional()
        .action { (x, p) => p.copy(minItems = x) }
        .text("Minimum number of items a subgraph should have to be considered as a top subgraph. Defaults to 10,000")
    }
  }

}