SubgraphPairMetrics.scala
package org.wikidata.query.rdf.spark.metrics.subgraphs.detailed
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
object SubgraphPairMetrics {
/**
* Gets metrics per subgraph pair, that is, number of triples, items, predicates etc common in pairs of subgraphs.
*
* @param allSubgraphs expected columns: subgraph, count
* @param topSubgraphItems expected columns: subgraph, item
* @param subgraphPairTripleCount expected columns: subgraph1, subgraph2, triples_from_1_to_2, triples_from_2_to_1
* @param predicatesPerSubgraph expected columns: subgraph, predicate_code, count
* @param minItems the minimum number of items a subgraph should have to be called a top_subgraph
* @return a spark dataframe with each column as a metric per subgraph-pair.
* Expected columns: subgraph1, subgraph2, triples_from_1_to_2, triples_from_2_to_1, common_predicate_count,
* common_item_count, common_item_percent_of_subgraph1_items, common_item_percent_of_subgraph2_items
*/
def getSubgraphPairMetrics(allSubgraphs: DataFrame,
topSubgraphItems: DataFrame,
subgraphPairTripleCount: DataFrame,
predicatesPerSubgraph: DataFrame,
minItems: Long): DataFrame = {
val topSubgraphs: DataFrame = allSubgraphs.filter(col("count") >= minItems)
// (Q123,Q456) and (Q456,Q123) pairs are the same. Latter will remain.
val commonPredicates = predicatesPerSubgraph
.selectExpr("subgraph as subgraph1", "predicate_code")
.join(predicatesPerSubgraph.selectExpr("subgraph as subgraph2", "predicate_code"), Seq("predicate_code"), "inner")
.filter("subgraph1 > subgraph2") // this makes sure we have unique pairs of subgraphs
.groupBy("subgraph1", "subgraph2")
.count()
.withColumnRenamed("count", "common_predicate_count")
val commonItems = topSubgraphItems.selectExpr("subgraph as subgraph1", "item")
.join(topSubgraphItems.selectExpr("subgraph as subgraph2", "item"), Seq("item"), "inner")
.filter("subgraph1 > subgraph2") // this makes sure we have unique pairs of subgraphs
.groupBy("subgraph1", "subgraph2")
.count()
.join(topSubgraphs.selectExpr("subgraph as subgraph1", "count as subgraph1_item_count"), Seq("subgraph1"), "left")
.join(topSubgraphs.selectExpr("subgraph as subgraph2", "count as subgraph2_item_count"), Seq("subgraph2"), "left")
.withColumn("common_item_percent_of_subgraph1_items", col("count") * 100.0 / col("subgraph1_item_count"))
.withColumn("common_item_percent_of_subgraph2_items", col("count") * 100.0 / col("subgraph2_item_count"))
.drop("subgraph1_item_count", "subgraph2_item_count")
.withColumnRenamed("count", "common_item_count")
subgraphPairTripleCount
.join(commonPredicates, Seq("subgraph1", "subgraph2"), "outer")
.join(commonItems, Seq("subgraph1", "subgraph2"), "outer")
.na.fill(0)
}
}