// SubgraphUaMetrics.scala

package org.wikidata.query.rdf.spark.metrics.queries.subgraphs

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.{getPercentileExpr, sparkDfColumnsToListOfStruct}

object SubgraphUaMetrics {

  /** Gets the top-N user-agents per subgraph and their corresponding metrics.
   *
   * @param subgraphQueriesInfo     all subgraphQueries and their info from processedQueries.
   *                                Expected columns: id, subgraph, query, query_time, query_time_class, ua, q_info
   * @param topN                    Number of top user-agents to extract information for.
   * @param perSubgraphQueryMetrics spark dataframe with columns: subgraph, query_count, query_time, ua_count, query_type, percent_query_count,
   *                                percent_query_time, percent_ua_count, query_count_rank, query_time_rank, avg_query_time
   * @return a tuple of two spark dataframes:
   *         - user-agent metrics per subgraph with columns:
   *           - subgraph
   *           - ua_info: array< struct<
   *             ua_rank, ua_query_count, ua_query_time, ua_query_type, ua_query_percent,
   *             ua_query_time_percent, ua_avg_query_time, ua_query_type_percent> >
   *         - aggregate user-agent query count stats per subgraph with columns:
   *           - subgraph
   *           - ua_query_count_percentiles
   *           - ua_query_count_mean
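   *
   * A minimal usage sketch (the dataframe names and the topN value below are
   * illustrative assumptions, not part of this pipeline):
   * {{{
   * val (topUaMetrics, uaQueryCountDistribution) =
   *   SubgraphUaMetrics.getPerSubgraphUaMetrics(subgraphQueriesInfoDf, 10L, perSubgraphQueryMetricsDf)
   * }}}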
   */
  def getPerSubgraphUaMetrics(subgraphQueriesInfo: DataFrame, topN: Long, perSubgraphQueryMetrics: DataFrame): (DataFrame, DataFrame) = {

    // dropDuplicates ensures query_time is summed over distinct queries per subgraph and user-agent
    var perSubgraphUaMetrics = subgraphQueriesInfo
      .dropDuplicates("subgraph", "ua", "id")
      .groupBy("subgraph", "ua")
      .agg(
        count("id").as("ua_query_count"),
        sum("query_time").as("ua_query_time"),
        countDistinct("q_info.opList").as("ua_query_type")
      )
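      // join in per-subgraph totals (query_count, query_time, query_type) so per-UA percentages can be computed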
      .join(
        perSubgraphQueryMetrics.select("subgraph", "query_count", "query_time", "query_type"),
        Seq("subgraph"),
        "inner"
      )
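      // each user-agent's share of the subgraph's total query count, query time, and distinct query types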
      .withColumn("ua_query_percent", col("ua_query_count") * 100.0 / col("query_count"))
      .withColumn("ua_query_time_percent", col("ua_query_time") * 100.0 / col("query_time"))
      .withColumn("ua_avg_query_time", col("ua_query_time") / col("ua_query_count"))
      .withColumn("ua_query_type_percent", col("ua_query_type") * 100.0 / col("query_type"))
      .drop("query_count", "query_time", "query_type")

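    // per-subgraph distribution of query counts across user-agents: percentiles and mean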
    val perSubgraphUAQueryDistribution = perSubgraphUaMetrics
      .groupBy("subgraph")
      .agg(
        expr(getPercentileExpr("ua_query_count", "ua_query_count_percentiles")),
        expr("mean(ua_query_count) as ua_query_count_mean")
      )

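    // rank user-agents within each subgraph by descending query count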
    val subgraphWindow = Window.partitionBy("subgraph").orderBy(desc("ua_query_count"))

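    // keep only the topN user-agents per subgraph and pack their metric
    // columns into a single ua_info array of structs (one row per subgraph)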
    perSubgraphUaMetrics = sparkDfColumnsToListOfStruct(
      perSubgraphUaMetrics
        .withColumn("ua_rank", row_number().over(subgraphWindow))
        .filter(col("ua_rank") <= topN),
      List("ua_rank", "ua_query_count", "ua_query_time", "ua_query_type", "ua_query_percent",
        "ua_query_time_percent", "ua_avg_query_time", "ua_query_type_percent"),
      "ua_info",
      List("subgraph")
    )

    (perSubgraphUaMetrics, perSubgraphUAQueryDistribution)
  }
}