QueryTimeClassBySubgraphDist.scala
package org.wikidata.query.rdf.spark.metrics.queries.subgraphs
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.{sparkDfColumnsToListOfStruct, sparkDfColumnsToMap}
object QueryTimeClassBySubgraphDist {
  /**
   * Distribution of query time classes, bucketed by how many subgraphs each query accesses.
   *
   * Every query is counted once. Queries that access at least one subgraph fall into the
   * bucket "1", "2", "3", "4" or "4+" (more than 4 subgraphs). Queries with no row in
   * `subgraphQueries` fall into the bucket "n/a".
   *
   * @param subgraphQueries mapping of query to subgraph. Expected columns: id, subgraph, qid, item, predicate, uri, literal
   * @param processedQueries parsed SPARQL queries. Expected columns: id, query, query_time, query_time_class, ua, q_info
   * @param subgraphQueriesInfo all subgraphQueries and their info from processedQueries; may hold several
   *                            rows per query id (one per subgraph).
   *                            Expected columns: id, subgraph, query, query_time, query_time_class, ua, q_info
   * @param numOfSubgraphsPerQuery number of subgraphs accessed by each query. Expected columns: id, subgraph_count
   * @return spark dataframe. Expected columns:
   *         query_time_class_subgraph_dist: array< struct< subgraph_count: string, query_time_class: map< string, bigint> > >
   */
  def getQueryTimeClassBySubgraphDist(subgraphQueries: DataFrame,
                                      processedQueries: DataFrame,
                                      subgraphQueriesInfo: DataFrame,
                                      numOfSubgraphsPerQuery: DataFrame): DataFrame = {
    // subgraphQueriesInfo can contain one row per (query, subgraph) pair. Keep a single row per
    // query so that a query accessing N subgraphs is counted once in its bucket, not N times.
    val queryTimeClassPerQuery = subgraphQueriesInfo
      .select("id", "query_time_class")
      .distinct()

    // Queries that access at least one subgraph, bucketed by their subgraph count.
    // Counts greater than 4 are collapsed into the "4+" bucket. The explicit string cast keeps
    // the column type deterministic and union-compatible with the "n/a" literal used below,
    // instead of relying on implicit string/numeric coercion.
    val queryTimeClassByNumSubgraphs = numOfSubgraphsPerQuery
      .join(queryTimeClassPerQuery, Seq("id"), "left")
      .withColumn(
        "subgraph_count",
        when(col("subgraph_count") > 4, lit("4+"))
          .otherwise(col("subgraph_count").cast("string"))
      )
      .groupBy("subgraph_count", "query_time_class")
      .count()

    // Queries with no matching row in subgraphQueries, i.e. queries that access no subgraph.
    val queryTimeClassOfNonSubgraphQueries = processedQueries
      .join(subgraphQueries, Seq("id"), "leftanti")
      .groupBy("query_time_class")
      .count()
      .withColumn("subgraph_count", lit("n/a"))

    // Combine both results; subgraph_count buckets: 1, 2, 3, 4, 4+, n/a
    val queryTimeClassBySubgraphDist = queryTimeClassOfNonSubgraphQueries
      .unionByName(queryTimeClassByNumSubgraphs)

    // NOTE(review): the helper semantics are not visible here. Presumably sparkDfColumnsToMap folds
    // (query_time_class -> count) into a map per subgraph_count, and sparkDfColumnsToListOfStruct
    // packs the rows into the single array column described in @return. Confirm against SubgraphUtils.
    sparkDfColumnsToListOfStruct(
      sparkDfColumnsToMap(
        queryTimeClassBySubgraphDist,
        "query_time_class",
        "count",
        "query_time_class",
        List("subgraph_count")
      ),
      List("subgraph_count", "query_time_class"),
      "query_time_class_subgraph_dist",
      List()
    )
  }
}