package org.wikidata.query.rdf.spark.metrics.queries.subgraphs

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.countDistinct
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.NumOfSubgraphsPerQueryDist.getNumOfSubgraphsPerQueryDist
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.QueryTimeClassBySubgraphDist.getQueryTimeClassBySubgraphDist
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.UaSubgraphCounts.getUaSubgraphCounts

object GeneralSubgraphQueryMetrics {

  /**
   * Extracts aggregated subgraph query metrics such as the distribution of user-agent counts,
   * query counts, etc. Calls the getUaSubgraphCounts, getNumOfSubgraphsPerQueryDist,
   * and getQueryTimeClassBySubgraphDist methods.
   *
   * @param processedQueries parsed SPARQL queries. Expected columns: id, query, query_time, query_time_class, ua, q_info
   * @param subgraphQueries  mapping of queries to subgraphs. Expected columns: id, subgraph, qid, item, predicate, uri, literal
   * @return tuple of the following dataframes:
   *         - numOfSubgraphsPerQuery: number of subgraphs each query accesses.
   *           Expected columns: id, subgraph_count
   *         - subgraphQueriesInfo: all subgraphQueries (queries mapped to subgraphs) joined with their info from processedQueries.
   *           Expected columns: id, subgraph, query, query_time, query_time_class, ua, q_info
   *         - generalSubgraphQueryMetrics: spark dataframe in which each column contains an aggregated metric on subgraph queries.
   *           Expected columns: total_subgraph_query_count, ua_subgraph_dist, query_subgraph_dist, query_time_class_subgraph_dist
   */
  def getGeneralSubgraphQueryMetrics(processedQueries: DataFrame,
                                     subgraphQueries: DataFrame)(implicit spark: SparkSession): (DataFrame, DataFrame, DataFrame) = {

    // Total number of distinct queries that access at least one subgraph
    val subgraphQueryCount = subgraphQueries.select("id").distinct().count()

    // Left join retains every query-to-subgraph mapping, even when query info is missing
    val subgraphQueriesInfo = subgraphQueries.join(processedQueries, Seq("id"), "left")

    // UA vs. number of subgraphs accessed (how many user agents access how many subgraphs?)
    val uaSubgraphCounts = getUaSubgraphCounts(subgraphQueriesInfo)

    // Number of queries that access `X` subgraphs together.
    // This gives: how many queries use only 1 subgraph, how many use 2, more than 2, etc.
    val numOfSubgraphsPerQuery = subgraphQueries
      .groupBy("id")
      .agg(countDistinct("subgraph").as("subgraph_count"))
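
    // Illustrative example of the aggregation above (hypothetical ids and subgraph
    // values, not taken from real data):
    //   subgraphQueries             numOfSubgraphsPerQuery
    //   id | subgraph               id | subgraph_count
    //   q1 | wd:Q5                  q1 | 2
    //   q1 | wd:Q11424              q2 | 1
    //   q2 | wd:Q5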

    val numOfSubgraphsPerQueryDist = getNumOfSubgraphsPerQueryDist(numOfSubgraphsPerQuery)

    val queryTimeClassBySubgraphAccess: DataFrame = getQueryTimeClassBySubgraphDist(
      subgraphQueries,
      processedQueries,
      subgraphQueriesInfo,
      numOfSubgraphsPerQuery)

    // Each helper result is expected to be a single-row aggregate; cross-joining them
    // onto the single-row total count yields one wide row with one column per metric
    val generalSubgraphQueryMetrics = spark
      .createDataFrame(Seq(Tuple1(subgraphQueryCount)))
      .toDF("total_subgraph_query_count")
      .crossJoin(uaSubgraphCounts)
      .crossJoin(numOfSubgraphsPerQueryDist)
      .crossJoin(queryTimeClassBySubgraphAccess)

    (numOfSubgraphsPerQuery, subgraphQueriesInfo, generalSubgraphQueryMetrics)
  }
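
  // Minimal usage sketch, assuming the inputs are available as Parquet tables; the
  // paths and SparkSession setup below are hypothetical, not part of this codebase:
  //
  //   implicit val spark: SparkSession = SparkSession.builder().appName("subgraph-metrics").getOrCreate()
  //   val processedQueries = spark.read.parquet("/path/to/processed_queries")
  //   val subgraphQueries = spark.read.parquet("/path/to/subgraph_queries")
  //   val (numOfSubgraphsPerQuery, subgraphQueriesInfo, generalMetrics) =
  //     GeneralSubgraphQueryMetrics.getGeneralSubgraphQueryMetrics(processedQueries, subgraphQueries)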
}