// PerSubgraphMetrics.scala

package org.wikidata.query.rdf.spark.metrics.subgraphs.detailed

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, countDistinct, desc, row_number}
import org.wikidata.query.rdf.spark.metrics.subgraphs.detailed.TriplesPerSubgraphItem.getTriplesPerSubgraphItem
import org.wikidata.query.rdf.spark.utils.SubgraphUtils.sparkDfColumnsToMap

object PerSubgraphMetrics {

  /**
   * Computes metrics per subgraph, including the number of triples, items, predicates, etc.
   * Delegates per-item triple statistics to getTriplesPerSubgraphItem(...).
   *
   * @param predicatesPerSubgraph   expected columns: subgraph, predicate_code, count
   * @param fromSubgraphTripleCount expected columns: subgraph, subgraph_to_WD_triples
   * @param toSubgraphTripleCount   expected columns: subgraph, WD_to_subgraph_triples
   * @param topSubgraphTriples      expected columns: subgraph, item, subject, predicate, object, predicate_code
   * @param generalSubgraphMetrics  expected columns: total_items, total_triples, percent_subgraph_item, percent_subgraph_triples,
   *                                num_subgraph, num_top_subgraph, subgraph_size_percentiles, subgraph_size_mean
   * @return a Spark dataframe with one row per subgraph and one column per metric. Expected columns: subgraph, item_count,
   *         triple_count, predicate_count, item_percent, triple_percent, density, item_rank, triple_rank,
   *         triples_per_item_percentiles, triples_per_item_mean, num_direct_triples, num_statements,
   *         num_statement_triples, predicate_counts, subgraph_to_WD_triples, WD_to_subgraph_triples
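   *
   * A minimal usage sketch (the table names passed to spark.table are
   * hypothetical; any dataframes with the expected columns above will do):
   * {{{
   * val perSubgraph = PerSubgraphMetrics.getPerSubgraphMetrics(
   *   predicatesPerSubgraph = spark.table("predicates_per_subgraph"),
   *   fromSubgraphTripleCount = spark.table("from_subgraph_triple_count"),
   *   toSubgraphTripleCount = spark.table("to_subgraph_triple_count"),
   *   topSubgraphTriples = spark.table("top_subgraph_triples"),
   *   generalSubgraphMetrics = spark.table("general_subgraph_metrics")
   * )
   * }}}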
   */
  def getPerSubgraphMetrics(predicatesPerSubgraph: DataFrame,
                            fromSubgraphTripleCount: DataFrame,
                            toSubgraphTripleCount: DataFrame,
                            topSubgraphTriples: DataFrame,
                            generalSubgraphMetrics: DataFrame): DataFrame = {

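    // generalSubgraphMetrics is a single-row summary; pull out the corpus-wide
    // totals used as denominators for the percentage metrics below.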
    val totalItems = generalSubgraphMetrics.select("total_items").first.getLong(0)
    val totalTriples = generalSubgraphMetrics.select("total_triples").first.getLong(0)
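
    // Per-subgraph aggregates: distinct items, distinct (subject, predicate, object)
    // triples and distinct predicates, plus derived percentages, density
    // (triples per item) and ranks by item and triple count. The unpartitioned
    // windows move all rows to a single partition, which is acceptable for the
    // small number of top subgraphs.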
    val topSubgraphMetrics = topSubgraphTriples
      .groupBy("subgraph")
      .agg(
        countDistinct("item").as("item_count"),
        countDistinct("subject", "predicate", "object").as("triple_count"),
        countDistinct("predicate").as("predicate_count")
      )
      .withColumn("item_percent", col("item_count") * 100.0 / totalItems)
      .withColumn("triple_percent", col("triple_count") * 100.0 / totalTriples)
      .withColumn("density", col("triple_count") / col("item_count"))
      .withColumn("item_rank", row_number().over(Window.orderBy(desc("item_count"))))
      .withColumn("triple_rank", row_number().over(Window.orderBy(desc("triple_count"))))

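    // Per-item triple distributions (percentiles and mean) plus per-subgraph
    // counts of direct triples, statements and statement triples.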
    val (triplesPerItem, numDirectTriples, numStatements, numFullStatements) = getTriplesPerSubgraphItem(topSubgraphTriples)

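    // Collapse each subgraph's (predicate_code, count) rows into a single map
    // column named predicate_counts, keyed by subgraph.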
    val predicateListPerSubgraph = sparkDfColumnsToMap(
      predicatesPerSubgraph,
      "predicate_code",
      "count",
      "predicate_counts",
      List("subgraph")
    )

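    // Outer-join all partial metrics on subgraph so that a subgraph missing from
    // any one input is still kept; na.fill(0) then replaces the resulting nulls
    // in numeric columns with 0 (the predicate_counts map column is unaffected).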
    val perSubgraphMetrics = topSubgraphMetrics
      .join(triplesPerItem, Seq("subgraph"), "outer")
      .join(numDirectTriples, Seq("subgraph"), "outer")
      .join(numStatements, Seq("subgraph"), "outer")
      .join(numFullStatements, Seq("subgraph"), "outer")
      .join(predicateListPerSubgraph, Seq("subgraph"), "outer")
      .join(fromSubgraphTripleCount, Seq("subgraph"), "outer")
      .join(toSubgraphTripleCount, Seq("subgraph"), "outer")
      .na.fill(0)

    perSubgraphMetrics
  }
}