ScholarlyArticleSplit.scala
package org.wikidata.query.rdf.spark.transform.structureddata.dumps
import org.apache.spark.sql.SparkSession
import org.wikidata.query.rdf.spark.utils.SparkUtils
import org.wikidata.query.rdf.tool.subgraph.{SubgraphDefinitions, SubgraphDefinitionsParser}
import scopt.OptionParser
import scala.collection.JavaConverters._
/**
* This job creates a partitioned table with scholarly article-related
* triples from Wikidata in one partition and everything else in another.
* There is technically some level of duplication between the partitions,
* because there are common Value and Reference types associated with both.
*
* The job depends upon WikibaseRDFDumpConverter having successfully
* imported Wikidata triples.
*
* Command line example:
* spark3-submit \
* --master yarn \
* --driver-memory 16G \
* --executor-memory 12G \
* --executor-cores 4 \
* --conf spark.driver.cores=2 \
* --conf spark.executor.memoryOverhead=4g \
* --conf spark.sql.shuffle.partitions=512 \
* --conf spark.dynamicAllocation.maxExecutors=128 \
* --conf spark.sql.autoBroadcastJoinThreshold=-1 \
* --conf spark.yarn.maxAppAttempts=1 \
* --class org.wikidata.query.rdf.spark.transform.structureddata.dumps.ScholarlyArticleSplit \
* --name scholarly-article-spark \
* ~yourusername/rdf-spark-tools-0.3.137-SNAPSHOT-jar-with-dependencies.jar \
* --input-table-partition-spec discovery.wikibase_rdf/date=20231106/wiki=wikidata
* --output-table-partition-spec yourusername.wikibase_rdf_scholarly_split/snapshot=20231106/wiki=wikidata
*/
case class ScholarlyArticleSplitParams(
inputPartition: String = "",
outputPartitionParent: String = "",
subgraphDefinitions: SubgraphDefinitions = ScholarlyArticleSplit.NULL_SUBGRAPH_DEF,
subgraphs: List[String] = List.empty
)
object ScholarlyArticleSplit {
val V1_SUBGRAPHS: List[String] = List("scholarly_articles", "wikidata_main")
val V1_SUBGRAPH_DEFINITIONS: String = "wdqs-subgraph-definitions-v1"
val NULL_SUBGRAPH_DEF: SubgraphDefinitions = new SubgraphDefinitions(List.empty.asJava)
implicit val sparkSession: SparkSession = {
SparkUtils.getSparkSession("ScholarlyArticleSplitWorker")
}
/**
* Main method, parsing args and launching the partitioning
*
* @param args the arguments to parse
*/
def main(args: Array[String]): Unit = {
parseParams(args) match {
case Some(params) => split(params)
case _ => sys.exit(-1)
}
}
def split(params: ScholarlyArticleSplitParams): Unit = {
ScholarlyArticleSplitter.splitIntoPartitions(params)
}
/**
* CLI Option Parser for job parameters (fill-in Params case class)
*/
private def argsParser: OptionParser[ScholarlyArticleSplitParams] = {
new OptionParser[ScholarlyArticleSplitParams]("") {
head("Wikidata Scholarly Article Split", "")
help("help") text "Prints this usage text"
opt[String]('i', "input-table-partition-spec") required() valueName "<input-table-partition-spec>" action { (x, p) =>
p.copy(inputPartition = x)
} text "Input partition as source_database_name.source_table_name/date=YYYYMMDD/wiki=wikidata"
opt[String]('o', "output-table-partition-spec") required() valueName "<output-table-partition-spec>" action { (x, p) =>
p.copy(outputPartitionParent = x)
} text "Output partition parent as target_database_name.target_table_name/snapshot=YYYYMMDD/wiki=wikidata"
opt[String]('s', "subgraph-definitions") optional() valueName "<subgraphs-definition>" action { (x, p) =>
p.copy(subgraphDefinitions = loadSubGraphDefinitions(x))
} text "Name of the subgraph definition strategy"
opt[Seq[String]]('n', "subgraph-names") optional() valueName "<subgraph-name-1>,<subgraph-name-2>" action { (x, p) =>
p.copy(subgraphs = x.toList)
} text "List of subgraph names to extract"
}
}
def parseParams(args: Array[String]): Option[ScholarlyArticleSplitParams] =
argsParser.parse(args, ScholarlyArticleSplitParams()) match {
case Some(params) =>
if (params.inputPartition.isEmpty) {
Console.err.print("--input-table-partition-spec must be provided\n")
None
} else if (params.outputPartitionParent.isEmpty) {
Console.err.print("--output-table-partition-spec must be provided\n")
None
} else if (params.subgraphDefinitions == NULL_SUBGRAPH_DEF) {
Some(params.copy(subgraphDefinitions = loadSubGraphDefinitions(V1_SUBGRAPH_DEFINITIONS), subgraphs = V1_SUBGRAPHS))
} else if (params.subgraphs.isEmpty) {
Console.err.print("--subgraph-names must be provided\n")
None
} else {
Some(params)
}
case _ => None
}
def loadSubGraphDefinitions(strategy: String): SubgraphDefinitions = {
SubgraphDefinitionsParser.parseYaml(classOf[SubgraphDefinitionsParser].getResourceAsStream(s"/$strategy.yaml"))
}
}