TopPathsPerSubgraph.scala
package org.wikidata.query.rdf.spark.metrics.queries.subgraphs
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions.{col, explode, row_number}
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.sparkDfColumnsToMap
object TopPathsPerSubgraph {
/** Gets the number of times the topN paths are used for queries in each subgraph.
*
* @param topN Number of top paths to extract.
* @param subgraphQueriesInfo all subgraphQueries and their info from processedQueries.
* Expected columns: id, subgraph, query, query_time, query_time_class, ua, q_info
* @param subgraphWindow Window.partitionBy("subgraph").orderBy(desc("count"))
* @return spark dataframe with columns: subgraph, path_counts: map< string, bigint >
*/
def getTopPaths(topN: Long, subgraphQueriesInfo: DataFrame, subgraphWindow: WindowSpec): DataFrame = {
sparkDfColumnsToMap(
subgraphQueriesInfo
.select(col("subgraph"), explode(col("q_info.triples")).as("triple"))
.filter(col("triple.predicateNode.nodeType").startsWith("PATH"))
.selectExpr("subgraph", "triple.predicateNode.nodeValue as path")
.groupBy("subgraph", "path")
.count()
.withColumn("rank", row_number().over(subgraphWindow))
.filter(col("rank") <= topN),
"path",
"count",
"path_counts",
List("subgraph")
)
}
}