TopSubgraphProperties.scala

package org.wikidata.query.rdf.spark.metrics.queries.subgraphs

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions.{col, countDistinct, expr, row_number}
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.{getPercentileExpr, sparkDfColumnsToMap}

object TopSubgraphProperties {

  /** Gets the list and distribution of the topN matched predicates in queries per subgraph.
   *
   * @param subgraphPredicatesMatchInQuery List of queries that matched with subgraphs due to an predicate.
   *                                       Expected columns: id, subgraph, predicate_code
   * @param topN                           Number of top matched predicates to extract.
   * @param subgraphWindow                 Window.partitionBy("subgraph").orderBy(desc("count"))
   * @return spark dataframes:
   *         - topMatchedPreds: expected columns: subgraph, top_predicates
   *         - matchedPredsDistribution: expected columns: subgraph, matched_predicates_percentiles, matched_predicates_mean
   */
  def getSubgraphPropertiesInfo(subgraphPredicatesMatchInQuery: DataFrame, topN: Long, subgraphWindow: WindowSpec): (DataFrame, DataFrame) = {

    // Top matched properties and distribution
    // Add percent of query if necessary (q*100/total_q)
    val matchedPredsPerSubgraph = subgraphPredicatesMatchInQuery
      .groupBy("subgraph", "predicate_code")
      .agg(countDistinct("id").alias("count"))

    // Top prodicates
    val topMatchedPreds = sparkDfColumnsToMap(
      matchedPredsPerSubgraph
        .withColumn("rank", row_number().over(subgraphWindow))
        .filter(col("rank") <= topN),
      "predicate_code",
      "count",
      "top_predicates",
      List("subgraph")
    )

    // distribution of predicate usage in each subgraph
    val matchedPredsDistribution = matchedPredsPerSubgraph
      .groupBy("subgraph")
      .agg(
        expr(getPercentileExpr("count", "matched_predicates_percentiles")),
        expr("mean(count) as matched_predicates_mean")
      )
    (topMatchedPreds, matchedPredsDistribution)
  }
}