DetailedSubgraphMetricsExtractor.scala
package org.wikidata.query.rdf.spark.metrics.subgraphs.detailed
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.wikidata.query.rdf.spark.metrics.subgraphs.detailed.PerSubgraphMetrics.getPerSubgraphMetrics
import org.wikidata.query.rdf.spark.metrics.subgraphs.detailed.PredicatesPerSubgraph.getPredicatesPerSubgraph
import org.wikidata.query.rdf.spark.metrics.subgraphs.detailed.SubgraphPairMetrics.getSubgraphPairMetrics
import org.wikidata.query.rdf.spark.metrics.subgraphs.detailed.SubgraphPairTriples.getSubgraphPairTriples
import org.wikidata.query.rdf.spark.utils.SparkUtils
object DetailedSubgraphMetricsExtractor {

  /**
   * Job entry point: loads the four input table partitions, derives the detailed subgraph
   * metrics via `getDetailedSubgraphMetrics`, and persists the two resulting tables.
   *
   * Each output is coalesced to a single partition before it is handed to
   * `SparkUtils.saveTables`.
   *
   * @param topSubgraphItemsPath       location of the top-subgraph items table partition
   * @param topSubgraphTriplesPath     location of the top-subgraph triples table partition
   * @param generalSubgraphMetricsPath location of the general subgraph metrics table partition
   * @param allSubgraphsPath           location of the all-subgraphs table partition
   * @param minItems                   threshold forwarded unchanged to `getSubgraphPairMetrics`
   * @param perSubgraphMetricsPath     destination of the per-subgraph metrics table
   * @param subgraphPairMetricsPath    destination of the subgraph-pair metrics table
   */
  def extractAndSaveDetailedSubgraphMetrics(topSubgraphItemsPath: String,
                                            topSubgraphTriplesPath: String,
                                            generalSubgraphMetricsPath: String,
                                            allSubgraphsPath: String,
                                            minItems: Long,
                                            perSubgraphMetricsPath: String,
                                            subgraphPairMetricsPath: String): Unit = {
    implicit val spark: SparkSession =
      SparkUtils.getSparkSession("DetailedSubgraphMetricsExtractor")

    // Read the inputs in the order getDetailedSubgraphMetrics declares its parameters.
    val generalMetrics = SparkUtils.readTablePartition(generalSubgraphMetricsPath)
    val items = SparkUtils.readTablePartition(topSubgraphItemsPath)
    val triples = SparkUtils.readTablePartition(topSubgraphTriplesPath)
    val subgraphs = SparkUtils.readTablePartition(allSubgraphsPath)

    val (perSubgraph, perPair) =
      getDetailedSubgraphMetrics(generalMetrics, items, triples, subgraphs, minItems)

    // Both outputs are small, so each one is collapsed into a single partition.
    val outputs = List(
      perSubgraph.coalesce(1) -> perSubgraphMetricsPath, // file size ~2Mb
      perPair.coalesce(1) -> subgraphPairMetricsPath     // file size ~9Mb
    )
    SparkUtils.saveTables(outputs)
  }

  /**
   * Builds the detailed subgraph metrics: a per-subgraph table and a per-subgraph-pair table.
   *
   * Steps, in order:
   *  1. `getSubgraphPairTriples` yields the pair, "from" and "to" triple counts from the
   *     top-subgraph items and triples.
   *  2. `getPredicatesPerSubgraph` yields the predicates per subgraph from the triples.
   *  3. `getPerSubgraphMetrics` combines step 2, the "from"/"to" counts of step 1, the
   *     triples and the general metrics.
   *  4. `getSubgraphPairMetrics` combines all subgraphs, the items, the pair counts of
   *     step 1, step 2 and `minItems`.
   *
   * @param generalSubgraphMetrics general subgraph metrics
   * @param topSubgraphItems       items of the top subgraphs
   * @param topSubgraphTriples     triples of the top subgraphs
   * @param allSubgraphs           all subgraphs
   * @param minItems               threshold forwarded unchanged to `getSubgraphPairMetrics`
   * @return the pair (per-subgraph metrics, subgraph-pair metrics)
   */
  def getDetailedSubgraphMetrics(generalSubgraphMetrics: DataFrame,
                                 topSubgraphItems: DataFrame,
                                 topSubgraphTriples: DataFrame,
                                 allSubgraphs: DataFrame,
                                 minItems: Long): (DataFrame, DataFrame) = {
    val (pairCounts, fromCounts, toCounts) =
      getSubgraphPairTriples(topSubgraphItems, topSubgraphTriples)
    val predicates = getPredicatesPerSubgraph(topSubgraphTriples)

    val perSubgraph = getPerSubgraphMetrics(
      predicates,
      fromCounts,
      toCounts,
      topSubgraphTriples,
      generalSubgraphMetrics
    )
    val perPair = getSubgraphPairMetrics(
      allSubgraphs,
      topSubgraphItems,
      pairCounts,
      predicates,
      minItems
    )

    (perSubgraph, perPair)
  }
}