SubgraphQueryComposition.scala
package org.wikidata.query.rdf.spark.metrics.queries.subgraphs
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.countDistinct
object SubgraphQueryComposition {
/** Gets per subgraph query composition, that is, counts of the reasons why queries matched with subgraphs.
*
* @param subgraphQueries mapping of query to subgraph. Expected columns: id, subgraph, qid, item, predicate, uri, literal
* @return spark dataframe with columns: subgraph, qid_count, item_count, pred_count, uri_count, literal_count
*/
def getSubgraphQueryComposition(subgraphQueries: DataFrame): DataFrame = {
val perSubgraphQueryByQID = subgraphQueries
.filter("qid == 1")
.groupBy("subgraph")
.agg(countDistinct("id").as("qid_count"))
val perSubgraphQueryByItem = subgraphQueries
.filter("item == 1")
.groupBy("subgraph")
.agg(countDistinct("id").as("item_count"))
val perSubgraphQueryByPredicate = subgraphQueries
.filter("predicate == 1")
.groupBy("subgraph")
.agg(countDistinct("id").as("pred_count"))
val perSubgraphQueryByUri = subgraphQueries
.filter("uri == 1")
.groupBy("subgraph")
.agg(countDistinct("id").as("uri_count"))
val perSubgraphQueryByLiteral = subgraphQueries
.filter("literal == 1")
.groupBy("subgraph")
.agg(countDistinct("id").as("literal_count"))
perSubgraphQueryByQID
.join(perSubgraphQueryByItem, Seq("subgraph"), "outer")
.join(perSubgraphQueryByPredicate, Seq("subgraph"), "outer")
.join(perSubgraphQueryByUri, Seq("subgraph"), "outer")
.join(perSubgraphQueryByLiteral, Seq("subgraph"), "outer")
.na.fill(0)
}
}