// EntityRevisionMapGenerator.scala
package org.wikidata.query.rdf.spark.transform.structureddata.dumps
import java.net.URI
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.openrdf.model.Literal
import org.wikidata.query.rdf.common.uri.{FederatedUrisScheme, SchemaDotOrg, UrisScheme, UrisSchemeFactory}
import org.wikidata.query.rdf.spark.utils.SparkUtils
import scopt.OptionParser
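
/**
 * Spark job generating a CSV map of entity id to revision by extracting the
 * schema:version triples from an RDF dump table partition.
 */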
object EntityRevisionMapGenerator {
  val schema: StructType = StructType(Seq(
    StructField("entity", StringType, nullable = false),
    StructField("revision", LongType, nullable = false)
  ))

  /**
   * Case class holding the parsed CLI parameters
   */
  case class Params(
    table: String = "",
    date: String = "",
    outputPath: String = "",
    hostname: String = "www.wikidata.org",
    numPartitions: Int = 8,
    urisScheme: String = "wikidata",
    commonsConceptUri: Option[URI] = None,
    wikidataConceptUri: Option[URI] = None
  )

  /**
   * CLI option parser for job parameters (fills in the Params case class)
   */
  private val argsParser = new OptionParser[Params]("") {
    head("Wikidata Entity Revision map generator", "")
    help("help") text "Prints this usage text"
    opt[String]('t', "input-table") required() valueName "<input-table>" action { (x, p) =>
      p.copy(table = x)
    } text "Table-partition holding the wikidata triples"
    opt[String]('u', "uris-scheme") optional() action { (x, p) =>
      p.copy(urisScheme = x)
    } text "Uris scheme of the data: wikidata (default) or commons"
    opt[String]('h', "hostname") optional() valueName "<hostname>" action { (x, p) =>
      p.copy(hostname = x)
    } text "Hostname of the rdf data"
    opt[String]('o', "output-path") required() valueName "<output-path>" action { (x, p) =>
      p.copy(outputPath = if (x.endsWith("/")) x.dropRight(1) else x)
    } text "Path to the output csv files"
    opt[Int]('n', "num-partitions") optional() action { (x, p) =>
      p.copy(numPartitions = x)
    } text "Number of partitions to use (output files). Defaults to 8"
    opt[String]("commons-concept-uri") optional() action { (x, p) =>
      p.copy(commonsConceptUri = Some(URI.create(x)))
    } text "Overrides the concept uri for commons"
    opt[String]("wikidata-concept-uri") optional() action { (x, p) =>
      p.copy(wikidataConceptUri = Some(URI.create(x)))
    } text "Overrides the concept uri for wikidata"
  }
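
  // Example invocation. The jar name, table spec and paths below are
  // hypothetical, shown for illustration only:
  //   spark-submit \
  //     --class org.wikidata.query.rdf.spark.transform.structureddata.dumps.EntityRevisionMapGenerator \
  //     rdf-spark-tools.jar \
  //     --input-table my_db.rdf_triples/date=20240101 \
  //     --output-path hdfs:///tmp/entity_rev_map \
  //     --num-partitions 8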

  def main(args: Array[String]): Unit = {
    argsParser.parse(args, Params()) match {
      case Some(params) =>
        val spark = SparkSession
          .builder()
          .getOrCreate()
        generateMap(spark, params.table, params.numPartitions,
          params.outputPath, urisScheme(params.urisScheme, params.hostname, params.commonsConceptUri, params.wikidataConceptUri))
      case None => sys.exit(1) // Args parsing failed (the parser prints a helpful error message)
    }
  }
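
  /**
   * Builds the UrisScheme matching the given scheme name. The trailing empty
   * parameter list lets call sites pass a partial application of this method
   * as a () => UrisScheme provider, deferring construction of the scheme to
   * wherever the provider is invoked.
   */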
  private def urisScheme(urisScheme: String, hostname: String, commonsConceptUri: Option[URI], wikidataConceptUri: Option[URI])(): UrisScheme = {
    urisScheme match {
      case "commons" => new FederatedUrisScheme(
        UrisSchemeFactory.forCommons(commonsConceptUri.getOrElse(UrisSchemeFactory.commonsUri(hostname))),
        UrisSchemeFactory.forWikidata(wikidataConceptUri.getOrElse(UrisSchemeFactory.wikidataUri(UrisSchemeFactory.WIKIDATA_HOSTNAME))))
      case "wikidata" =>
        UrisSchemeFactory.forWikidata(wikidataConceptUri.getOrElse(UrisSchemeFactory.wikidataUri(hostname)))
      case _ => throw new IllegalArgumentException(s"Unknown uris_scheme: $urisScheme")
    }
  }
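
  /**
   * Reads the given table partition, keeps the schema:version triples and
   * writes (entity, revision) pairs as bzip2-compressed CSV files.
   * The first parameter list is marked implicit, presumably so that helpers
   * such as SparkUtils.readTablePartition can pick up the SparkSession
   * implicitly; the parameters can still be passed explicitly at call sites.
   */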
  def generateMap(implicit spark: SparkSession,
                  tableAndPartitionSpec: String,
                  numPartitions: Int,
                  outputPath: String,
                  urisSchemeProvider: () => UrisScheme
                 ): Unit = {
    val versionPredicate = new StatementEncoder().encodeURI(SchemaDotOrg.VERSION)
    val df = SparkUtils.readTablePartition(tableAndPartitionSpec)
    val entityRevDf = spark.createDataFrame(
      df
        .filter(df("predicate") === versionPredicate)
        .rdd
        .map(r => {
          // The encoder and the uris scheme are built inside the task, on the
          // executors, so that they never have to be serialized from the driver
          val encoder = new StatementEncoder()
          val statement = encoder.decode(r)
          val entity: String = urisSchemeProvider().entityURItoId(statement.getSubject.toString)
          val rev: Long = statement.getObject match {
            case e: Literal => e.longValue()
            case _: Any => -1 // Non-literal object for schema:version; we should probably fail here?
          }
          Row.fromTuple((entity, rev))
        }), schema)
    entityRevDf
      // coalesce so that the caller can control from the command line how many
      // files will be generated, for the scenario where these files need to be
      // transferred out of hdfs
      .coalesce(numPartitions)
      .write
      .format("csv")
      // force bzip2, flink is currently unable to detect & use the snappy codec
      .option("compression", "bzip2")
      .save(outputPath)
  }
}