SubgraphQueryMapper.scala
package org.wikidata.query.rdf.spark.transform.queries.subgraphsqueries
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.wikidata.query.rdf.spark.utils.SparkUtils
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.extractItem
class SubgraphQueryMapper() {
/**
* Gets the query mapping given the necessary intermediate calculated dataframes.
*
* @param queryWDnames expected dataframe columns: id, item
* @param queryUri expected dataframe columns: id, item, uri
* @param queryLiterals expected dataframe columns: id, item, literal
* @param selectedPredicates expected dataframe columns: subgraph, predicate_code
* @param selectedUris expected dataframe columns: subgraph, uri
* @param selectedLiterals expected dataframe columns: subgraph, literal
* @param topSubgraphItems expected dataframe columns: subgraph, item
* @return four spark DataFrame containing the query-mapping and some intermediate results
* - subgraphQItemsMatchInQuery: Expected columns: id, subgraph, item
* - subgraphPredicatesMatchInQuery: Expected columns: id, subgraph, predicate_code
* - subgraphUrisMatchInQuery: Expected columns: id, subgraph, uri
* - queryMapping: mapping of query to subgraph. Expected columns: id, subgraph, qid, item, predicate, uri, literal
*/
// method works on multiple inputs, hence the length
// scalastyle:off method.length
def getQueryMapping(queryWDnames: DataFrame,
queryUri: DataFrame,
queryLiterals: DataFrame,
selectedPredicates: DataFrame,
selectedUris: DataFrame,
selectedLiterals: DataFrame,
topSubgraphItems: DataFrame): (DataFrame, DataFrame, DataFrame, DataFrame) = {
// Q-ID (cols: id, subgraph, item)
val subgraphNames = topSubgraphItems
.withColumn("item", extractItem(col("subgraph"), lit("/")))
.distinct()
val subgraphQidMatchInQuery = queryWDnames
.join(subgraphNames, Seq("item"), "inner")
.drop("item")
.distinct()
// Instance ITEMS (cols: id, subgraph, item)
val subgraphQItemsMatchInQuery = topSubgraphItems
.withColumn("item", extractItem(col("item"), lit("/")))
.join(queryWDnames, Seq("item"), "inner")
.distinct()
// PREDICATES (cols: id, subgraph, predicate_code)
val subgraphPredicatesMatchInQuery = queryWDnames
.join(selectedPredicates, queryWDnames("item") === selectedPredicates("predicate_code"), "inner")
.drop("item")
.distinct()
// SUB/OBJ (cols: id, subgraph, uri)
val subgraphUrisMatchInQuery = queryUri
.join(selectedUris, Seq("uri"), "inner")
.distinct()
// LITERALS (cols: id, subgraph, literal)
val subgraphLiteralsMatchInQuery = queryLiterals
.join(selectedLiterals, Seq("literal"), "inner")
.distinct()
// TOTAL QUERIES
val queryMapping = subgraphQItemsMatchInQuery
.select(col("id"), col("subgraph"),
lit(true).alias("item"))
.join(subgraphPredicatesMatchInQuery.select(col("id"), col("subgraph"),
lit(true).alias("predicate")).distinct(), Seq("id", "subgraph"), "outer")
.join(subgraphUrisMatchInQuery.select(col("id"), col("subgraph"),
lit(true).alias("uri")).distinct(), Seq("id", "subgraph"), "outer")
.join(subgraphQidMatchInQuery.select(col("id"), col("subgraph"),
lit(true).alias("qid")).distinct(), Seq("id", "subgraph"), "outer")
.join(subgraphLiteralsMatchInQuery.select(col("id"), col("subgraph"),
lit(true).alias("literal")).distinct(), Seq("id", "subgraph"), "outer")
.na.fill(false)
.distinct()
(subgraphQItemsMatchInQuery, subgraphPredicatesMatchInQuery, subgraphUrisMatchInQuery, queryMapping)
}
// scalastyle:on method.length
}
object SubgraphQueryMapper {
/**
* Reads input tables with spark, gets subgraph-query-mapping, and saves the output tables.
* Calls getSubgraphQueryMapping(...)
*/
// entry point method takes in all parameters for subsequent functions
// scalastyle:off parameter.number
def extractAndSaveSubgraphQueryMapping(wikidataTriplesPath: String,
processedQueryPath: String,
topSubgraphTriplesPath: String,
topSubgraphItemsPath: String,
filteringLimit: Double,
subgraphQItemsMatchInQueryPath: String,
subgraphPredicatesMatchInQueryPath: String,
subgraphUrisMatchInQueryPath: String,
queryMappingPath: String): Unit = {
implicit val spark: SparkSession = SparkUtils.getSparkSession("SubgraphQueryMapper")
val (
subgraphQItemsMatchInQuery,
subgraphPredicatesMatchInQuery,
subgraphUrisMatchInQuery,
queryMapping
) = getSubgraphQueryMapping(
SparkUtils.readTablePartition(wikidataTriplesPath),
SparkUtils.readTablePartition(processedQueryPath),
SparkUtils.readTablePartition(topSubgraphTriplesPath),
SparkUtils.readTablePartition(topSubgraphItemsPath),
filteringLimit
)
SparkUtils.saveTables(
List(
subgraphQItemsMatchInQuery.repartition(25), // file size ~2.5G
subgraphPredicatesMatchInQuery.repartition(4), // file size ~220Mb
subgraphUrisMatchInQuery.repartition(16), // file size ~1.7G
queryMapping.repartition(8) // file size ~550Mb
) zip List(
subgraphQItemsMatchInQueryPath,
subgraphPredicatesMatchInQueryPath,
subgraphUrisMatchInQueryPath,
queryMappingPath
)
)
}
// scalastyle:on parameter.number
/**
* Get subgraph query mapping using wikidata triples and the parsed queries.
* Calls the util functions in SubgraphQueryMapperUtils and finally gets the query mapping
* using subgraphQueryMapper.getQueryMapping(...)
*
* @return spark DataFrame containing the query-mapping and three intermediate results table.
*/
def getSubgraphQueryMapping(wikidataTriples: DataFrame,
processedQueries: DataFrame,
topSubgraphTriples: DataFrame,
topSubgraphItems: DataFrame,
filteringLimit: Double): (DataFrame, DataFrame, DataFrame, DataFrame) = {
val subgraphQueryMapper = new SubgraphQueryMapper()
val subgraphQueryMapperUtils = new SubgraphQueryMapperUtils(wikidataTriples, processedQueries, topSubgraphTriples, filteringLimit)
val queryWDnames = subgraphQueryMapperUtils.getQueryWDnames()
val queryLiterals = subgraphQueryMapperUtils.getQueryLiterals()
val queryUri = subgraphQueryMapperUtils.getQueryURIs()
val selectedPredicates = subgraphQueryMapperUtils.getSelectedPredicates()
val wikidataNodeMetrics = subgraphQueryMapperUtils.getWikidataNodeMetrics()
val subgraphNodeMetrics = subgraphQueryMapperUtils.getSubgraphNodeMetrics()
val selectedUris = subgraphQueryMapperUtils.getSelectedURIs(wikidataNodeMetrics, subgraphNodeMetrics)
val selectedLiterals = subgraphQueryMapperUtils.getSelectedLiterals(wikidataNodeMetrics, subgraphNodeMetrics)
val (
subgraphQItemsMatchInQuery,
subgraphPredicatesMatchInQuery,
subgraphUrisMatchInQuery,
queryMapping
) = subgraphQueryMapper.getQueryMapping(
queryWDnames,
queryUri,
queryLiterals,
selectedPredicates,
selectedUris,
selectedLiterals,
topSubgraphItems
)
(subgraphQItemsMatchInQuery, subgraphPredicatesMatchInQuery, subgraphUrisMatchInQuery, queryMapping)
}
}