// QueryMetricsExtractor.scala
package org.wikidata.query.rdf.spark.metrics.queries.subgraphs
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.GeneralQueryMetrics.getGeneralQueryMetrics
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.GeneralSubgraphQueryMetrics.getGeneralSubgraphQueryMetrics
import org.wikidata.query.rdf.spark.metrics.queries.subgraphs.PerSubgraphQueryMetrics.getPerSubgraphQueryMetrics
import org.wikidata.query.rdf.spark.utils.SparkUtils
object QueryMetricsExtractor {

  /**
   * Job entry point: loads the six input tables, hands them to [[getQueryMetrics]] and writes the
   * three resulting metric tables to their output locations.
   *
   * This method accepts every parameter required by the functions it calls, which is why the
   * scalastyle parameter-count rule is switched off around its signature.
   *
   * @param topN                               forwarded unchanged to [[getQueryMetrics]]
   * @param eventQueriesPath                   input passed to `SparkUtils.readTablePartition` (event queries)
   * @param processedQueriesPath               input passed to `SparkUtils.readTablePartition` (processed queries)
   * @param subgraphQueriesPath                input passed to `SparkUtils.readTablePartition` (subgraph queries)
   * @param subgraphQItemsMatchInQueryPath     input passed to `SparkUtils.readTablePartition` (Q-item matches)
   * @param subgraphPredicatesMatchInQueryPath input passed to `SparkUtils.readTablePartition` (predicate matches)
   * @param subgraphUrisMatchInQueryPath       input passed to `SparkUtils.readTablePartition` (URI matches)
   * @param generalQueryMetricsPath            output location of the general query metrics
   * @param generalSubgraphQueryMetricsPath    output location of the general subgraph query metrics
   * @param perSubgraphQueryMetricsPath        output location of the per-subgraph query metrics
   */
  // scalastyle:off parameter.number
  def extractAndSaveQueryMetrics(topN: Long,
                                 eventQueriesPath: String,
                                 processedQueriesPath: String,
                                 subgraphQueriesPath: String,
                                 subgraphQItemsMatchInQueryPath: String,
                                 subgraphPredicatesMatchInQueryPath: String,
                                 subgraphUrisMatchInQueryPath: String,
                                 generalQueryMetricsPath: String,
                                 generalSubgraphQueryMetricsPath: String,
                                 perSubgraphQueryMetricsPath: String): Unit = {
    // scalastyle:on parameter.number
    implicit val spark: SparkSession = SparkUtils.getSparkSession("QueryMetricsExtractor")

    // Inputs, loaded in the same order as they are handed to getQueryMetrics.
    val eventQueries = SparkUtils.readTablePartition(eventQueriesPath)
    val processedQueries = SparkUtils.readTablePartition(processedQueriesPath)
    val subgraphQueries = SparkUtils.readTablePartition(subgraphQueriesPath)
    val qItemMatches = SparkUtils.readTablePartition(subgraphQItemsMatchInQueryPath)
    val predicateMatches = SparkUtils.readTablePartition(subgraphPredicatesMatchInQueryPath)
    val uriMatches = SparkUtils.readTablePartition(subgraphUrisMatchInQueryPath)

    val (generalMetrics, generalSubgraphMetrics, perSubgraphMetrics) = getQueryMetrics(
      topN,
      eventQueries,
      processedQueries,
      subgraphQueries,
      qItemMatches,
      predicateMatches,
      uriMatches
    )

    // Each output is collapsed to a single partition before saving; the original author notes the
    // resulting files are small (<1Mb, <1Mb and ~5Mb respectively).
    val singlePartitionOutputs =
      List(generalMetrics, generalSubgraphMetrics, perSubgraphMetrics).map(_.coalesce(1))
    val outputPaths =
      List(generalQueryMetricsPath, generalSubgraphQueryMetricsPath, perSubgraphQueryMetricsPath)

    SparkUtils.saveTables(singlePartitionOutputs.zip(outputPaths))
  }

  /**
   * Computes the general, general-subgraph and per-subgraph query metrics.
   *
   * Runs `getGeneralQueryMetrics` and `getGeneralSubgraphQueryMetrics` first, then feeds their
   * intermediate results, together with the subgraph match tables and `topN`, into
   * `getPerSubgraphQueryMetrics`.
   *
   * @return the tuple (general query metrics, general subgraph query metrics, per-subgraph query metrics)
   */
  def getQueryMetrics(topN: Long,
                      eventQueries: DataFrame,
                      processedQueries: DataFrame,
                      subgraphQueries: DataFrame,
                      subgraphQItemsMatchInQuery: DataFrame,
                      subgraphPredicatesMatchInQuery: DataFrame,
                      subgraphUrisMatchInQuery: DataFrame)(implicit spark: SparkSession): (DataFrame, DataFrame, DataFrame) = {

    // Step 1: metrics over all queries, plus the values reused by the per-subgraph step.
    val (overallTime, overallQueryCount, overallUaCount, generalMetrics) =
      getGeneralQueryMetrics(eventQueries, processedQueries)

    // Step 2: metrics over queries in relation to subgraphs, plus reusable intermediate tables.
    val (subgraphCountPerQuery, subgraphQueryDetails, generalSubgraphMetrics) =
      getGeneralSubgraphQueryMetrics(processedQueries, subgraphQueries)

    // Step 3: per-subgraph metrics built from the outputs of steps 1 and 2.
    val perSubgraphMetrics = getPerSubgraphQueryMetrics(
      overallTime,
      overallQueryCount,
      overallUaCount,
      subgraphCountPerQuery,
      subgraphQueryDetails,
      subgraphQueries,
      subgraphQItemsMatchInQuery,
      subgraphPredicatesMatchInQuery,
      subgraphUrisMatchInQuery,
      topN
    )

    (generalMetrics, generalSubgraphMetrics, perSubgraphMetrics)
  }
}