PerSubgraphQueryMetrics.scala
package org.wikidata.query.rdf.spark.metrics.queries.subgraphs
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.desc
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.SubgraphQueriesBasicMetrics.getSubgraphQueriesBasicMetrics
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.SubgraphQueryComposition.getSubgraphQueryComposition
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.SubgraphUaMetrics.getPerSubgraphUaMetrics
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.TopPathsPerSubgraph.getTopPaths
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.TopServicesPerSubgraph.getTopServices
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.TopSubgraphItems.getSubgraphItemsInfo
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.TopSubgraphProperties.getSubgraphPropertiesInfo
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.TopSubgraphUris.getSubgraphUrisInfo
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.{sparkDfColumnsToListOfStruct, sparkDfColumnsToMap}
object PerSubgraphQueryMetrics {
/** Gets per subgraph query metrics.
* Calls getSubgraphQueriesBasicMetrics, getSubgraphQueryComposition, getPerSubgraphUaMetrics, getTopPaths,
* getTopServices, getSubgraphItemsInfo, getSubgraphPropertiesInfo, getSubgraphUrisInfo
*
* @param totalTime total query time for all processed queries
* @param processedQueryCount total number of processed queries
* @param uaCount total number of unique user-agents
* @param numOfSubgraphsPerQuery Number of Queries that access `X` Number of Subgraphs together.
* Expected columns: id, subgraph_count
* @param subgraphQueriesInfo all subgraphQueries and their info from processedQueries.
* Expected columns: id, subgraph, query, query_time, query_time_class, ua, q_info
* @param subgraphQueries mapping of query to subgraph. Expected columns: id, subgraph, qid, item, predicate, uri, literal
* @param subgraphQItemsMatchInQuery List of queries that matched with subgraphs due to an item.
* Expected columns: id, subgraph, item
* @param subgraphPredicatesMatchInQuery List of queries that matched with subgraphs due to a predicate.
* Expected columns: id, subgraph, predicate_code
* @param subgraphUrisMatchInQuery List of queries that matched with subgraphs due to a URI.
* Expected columns: id, subgraph, uri
* @param topN Number of top item, predicates, uris etc to save.
* @return spark dataframe with columns: subgraph, query_count, query_time, ua_count, query_type, percent_query_count,
* percent_query_time, percent_ua_count, query_count_rank, query_time_rank, avg_query_time, qid_count, item_count,
* pred_count, uri_count, literal_count, query_time_class_counts, ua_info, ua_query_count_percentiles,
* ua_query_count_mean, subgraph_composition, query_only_accessing_this_subgraph, top_items, matched_items_percentiles,
* matched_items_mean, top_predicates, matched_predicates_percentiles, matched_predicates_mean, top_uris,
* matched_uris_percentiles, matched_uris_mean, service_counts, path_counts
*/
// method has multiple small pieces, cannot be split further
// scalastyle:off method.length
// method also requires all the inputs and are separated for readability
// scalastyle:off parameter.number
def getPerSubgraphQueryMetrics(totalTime: Long,
processedQueryCount: Long,
uaCount: Long,
numOfSubgraphsPerQuery: DataFrame,
subgraphQueriesInfo: DataFrame,
subgraphQueries: DataFrame,
subgraphQItemsMatchInQuery: DataFrame,
subgraphPredicatesMatchInQuery: DataFrame,
subgraphUrisMatchInQuery: DataFrame,
topN: Long): DataFrame = {
var perSubgraphQueryMetrics = getSubgraphQueriesBasicMetrics(totalTime, processedQueryCount, uaCount, subgraphQueriesInfo)
val subgraphQueryCompositionCount = getSubgraphQueryComposition(subgraphQueries)
// Query Time classes
val queryTimeClass = sparkDfColumnsToMap(
subgraphQueriesInfo
.groupBy("subgraph", "query_time_class")
.count(),
"query_time_class",
"count",
"query_time_class_counts",
List("subgraph")
)
// UA usage distribution
val (perSubgraphUaMetrics, perSubgraphUAQueryDistribution) = getPerSubgraphUaMetrics(subgraphQueriesInfo, topN, perSubgraphQueryMetrics)
// Composition of queries in each subgraph
val subgraphQueryComposition = sparkDfColumnsToListOfStruct(
subgraphQueries
.groupBy("subgraph", "item", "predicate", "uri", "qid", "literal")
.count(),
List("item", "predicate", "uri", "qid", "literal", "count"),
"subgraph_composition",
List("subgraph")
)
val queriesOnlyInThisSubgraphsVsWithOthers = numOfSubgraphsPerQuery
.filter("subgraph_count == 1")
.join(subgraphQueries, Seq("id"), "left")
.groupBy("subgraph")
.count()
.withColumnRenamed("count", "query_only_accessing_this_subgraph")
val subgraphWindow = Window.partitionBy("subgraph").orderBy(desc("count"))
val (topMatchedItems: DataFrame, matchedItemsDistribution: DataFrame) = getSubgraphItemsInfo(subgraphQItemsMatchInQuery, topN, subgraphWindow)
val (topMatchedPreds: DataFrame, matchedPredsDistribution: DataFrame) = getSubgraphPropertiesInfo(subgraphPredicatesMatchInQuery, topN, subgraphWindow)
val (topMatchedUris: DataFrame, matchedUrisDistribution: DataFrame) = getSubgraphUrisInfo(subgraphUrisMatchInQuery, topN, subgraphWindow)
val services: DataFrame = getTopServices(subgraphQueriesInfo)
val topPaths: DataFrame = getTopPaths(topN, subgraphQueriesInfo, subgraphWindow)
perSubgraphQueryMetrics = perSubgraphQueryMetrics
.join(subgraphQueryCompositionCount, Seq("subgraph"), "outer")
.join(queryTimeClass, Seq("subgraph"), "outer")
.join(perSubgraphUaMetrics, Seq("subgraph"), "outer")
.join(perSubgraphUAQueryDistribution, Seq("subgraph"), "outer")
.join(subgraphQueryComposition, Seq("subgraph"), "outer")
.join(queriesOnlyInThisSubgraphsVsWithOthers, Seq("subgraph"), "outer")
.join(topMatchedItems, Seq("subgraph"), "outer")
.join(matchedItemsDistribution, Seq("subgraph"), "outer")
.join(topMatchedPreds, Seq("subgraph"), "outer")
.join(matchedPredsDistribution, Seq("subgraph"), "outer")
.join(topMatchedUris, Seq("subgraph"), "outer")
.join(matchedUrisDistribution, Seq("subgraph"), "outer")
.join(services, Seq("subgraph"), "outer")
.join(topPaths, Seq("subgraph"), "outer")
perSubgraphQueryMetrics
}
// scalastyle:on parameter.number
// scalastyle:on method.length
}