// GeneralQueryMetrics.scala

package org.wikidata.query.rdf.spark.metrics.queries.subgraphs

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.sparkDfColumnsToMap

object GeneralQueryMetrics {

  /**
   * Extracts aggregated SPARQL query metrics such as total query count, total query time,
   * and distinct user-agent count.
   *
   * @param eventQueries     table of raw query events. Required columns: http.status_code, query.
   *                         The event query table is used to get the total query count and the
   *                         distribution of status codes for all queries.
   * @param processedQueries parsed SPARQL queries. Expected columns: id, query, query_time, query_time_class, ua, q_info
   * @return tuple of the following items:
   *         - totalTime: total processed query time in milliseconds
   *         - processedQueryCount: number of queries processed
   *         - uaCount: number of distinct user agents
   *         - generalQueryMetrics: Spark dataframe with each column containing an aggregated metric on queries.
   *           Expected columns: total_query_count, processed_query_count, percent_processed_query, distinct_query_count,
   *           percent_query_repeated, total_ua_count, total_time, status_code_query_count, query_time_class_query_count
   */
  def getGeneralQueryMetrics(eventQueries: DataFrame,
                             processedQueries: DataFrame)(implicit spark: SparkSession): (Long, Long, Long, DataFrame) = {

    val allQueryCount = eventQueries.count()
    // Count queries that were actually executed: responses with status 200 (success)
    // or 500 (server error, typically a query timeout).
    val allOkQueryCount = eventQueries
      .filter("http.status_code IN (200, 500)")
      .filter("query != ' ASK{ ?x ?y ?z }'") // these ASK queries are monitoring "liveness checks" and so are removed from the analysis
      .count()
    val processedQueryCount = processedQueries.count()
    val percentProcessed = processedQueryCount * 100.0 / allOkQueryCount // assumes at least one 200/500 query

    val distinctQueryCount = processedQueries.select("query").distinct().count()
    val percentRepeated = (processedQueryCount - distinctQueryCount) * 100.0 / processedQueryCount

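    // sparkDfColumnsToMap is assumed (from its usage here) to collapse a two-column
    // key/value DataFrame into a single-row DataFrame holding one map-typed column,
    // named by the third argument; the trailing List() is presumably an empty list of
    // extra grouping columns, yielding one global map.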
    val queryCountPerStatusCode = sparkDfColumnsToMap(
      eventQueries
        .groupBy("http.status_code")
        .count(),
      "status_code",
      "count",
      "status_code_query_count",
      List()
    )

    val queryTimeClassDistribution = sparkDfColumnsToMap(
      processedQueries
        .groupBy("query_time_class")
        .count(),
      "query_time_class",
      "count",
      "query_time_class_query_count",
      List()
    )

    val uaCount = processedQueries.select("ua").distinct().count()

    val totalTime = processedQueries.groupBy().sum("query_time").first.getLong(0) // total query time in ms; assumes query_time is a non-null long column

    val data = Seq((allQueryCount, processedQueryCount, percentProcessed, distinctQueryCount,
      percentRepeated, uaCount, totalTime))
    val columns = Seq("total_query_count", "processed_query_count", "percent_processed_query",
      "distinct_query_count", "percent_query_repeated", "total_ua_count", "total_time")

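    // The scalar metrics above form a single-row DataFrame; the two map-column frames
    // are presumably single-row as well (empty grouping list), so the cross joins below
    // simply append the map columns to that one row.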
    val generalQueryMetrics = spark.createDataFrame(data)
      .toDF(columns: _*)
      .crossJoin(queryCountPerStatusCode)
      .crossJoin(queryTimeClassDistribution)

    (totalTime, processedQueryCount, uaCount, generalQueryMetrics)
  }
}
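
// A minimal usage sketch. The table names "event.sparql_queries" and
// "processed.sparql_queries" are hypothetical placeholders for whatever tables
// hold the raw query events and the parsed queries in a given deployment.
object GeneralQueryMetricsExample {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("GeneralQueryMetricsExample")
      .getOrCreate()

    val eventQueries = spark.read.table("event.sparql_queries")         // hypothetical table name
    val processedQueries = spark.read.table("processed.sparql_queries") // hypothetical table name

    val (totalTime, processedQueryCount, uaCount, metrics) =
      GeneralQueryMetrics.getGeneralQueryMetrics(eventQueries, processedQueries)

    println(s"Processed $processedQueryCount queries from $uaCount user agents in $totalTime ms")
    metrics.show(truncate = false)

    spark.stop()
  }
}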