SubgraphPairQueryMetricsExtractor.scala

package org.wikidata.query.rdf.spark.metrics.queries.subgraphpairs

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.wikidata.query.rdf.spark.utils.SparkUtils

object SubgraphPairQueryMetricsExtractor {

  /**
   * Reads the subgraph-queries input table, extracts subgraph pair query metrics
   * via getSubgraphPairQueryMetrics(...), and saves the result to the output table.
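   *
   * A hypothetical invocation (the partition paths below are illustrative, not
   * the production table names):
   * {{{
   *   extractAndSaveSubgraphPairQueryMetrics(
   *     "discovery.subgraph_queries/snapshot=2023-01-02",
   *     "discovery.subgraph_pair_query_metrics/snapshot=2023-01-02"
   *   )
   * }}}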
   */
  def extractAndSaveSubgraphPairQueryMetrics(subgraphQueriesPath: String,
                                             subgraphPairQueryMetricsPath: String): Unit = {

    implicit val spark: SparkSession = SparkUtils.getSparkSession("SubgraphPairQueryMetricsExtractor")

    val queriesPerSubgraphPair = getSubgraphPairQueryMetrics(SparkUtils.readTablePartition(subgraphQueriesPath))
    SparkUtils.saveTables(List(queriesPerSubgraphPair.coalesce(1)) zip List(subgraphPairQueryMetricsPath)) // coalesce to a single file; output is small (~8 MB)
  }

  /**
   * Counts, for each pair of subgraphs, the queries that access both.
   *
   * @param subgraphQueries mapping of queries to subgraphs. Expected columns: id, subgraph, qid, item, predicate, uri, literal
   * @return spark dataframe with columns: subgraph1, subgraph2, query_count
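   *
   * A minimal sketch of the behavior on illustrative data (only the id and
   * subgraph columns are consumed; string ids and subgraph names are assumed):
   * {{{
   *   import spark.implicits._
   *   val subgraphQueries = Seq(
   *     ("q1", "A"), ("q1", "B"), ("q1", "C"), // q1 spans three subgraphs
   *     ("q2", "A")                            // q2 spans one and is dropped
   *   ).toDF("id", "subgraph")
   *   getSubgraphPairQueryMetrics(subgraphQueries).show()
   *   // +---------+---------+-----------+
   *   // |subgraph1|subgraph2|query_count|
   *   // +---------+---------+-----------+
   *   // |        B|        A|          1|
   *   // |        C|        A|          1|
   *   // |        C|        B|          1|
   *   // +---------+---------+-----------+
   * }}}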
   */
  def getSubgraphPairQueryMetrics(subgraphQueries: DataFrame): DataFrame = {

    // Queries that touch only a single subgraph produce no pair and are excluded.
    // columns: subgraph1, subgraph2, query_count
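    // A query spanning n subgraphs contributes n * (n - 1) / 2 rows: the self
    // join on id produces n * n ordered pairs, and the filter below keeps
    // exactly one orientation of each distinct pair while dropping self-pairs.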
    val queriesPerSubgraphPair = subgraphQueries
      .selectExpr("subgraph as subgraph1", "id")
      .join(
        subgraphQueries
          .selectExpr("subgraph as subgraph2", "id"),
        Seq("id"),
        "inner"
      )
      .filter("subgraph1 > subgraph2") // this makes sure we have unique pairs of subgraphs
      .groupBy("subgraph1", "subgraph2")
      .count()
      .withColumnRenamed("count", "query_count")

    queriesPerSubgraphPair
  }
}