RdfChunkParser.scala

package org.wikidata.query.rdf.spark.transform.structureddata.dumps

import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.charset.StandardCharsets
import java.util.function.BiFunction

import scala.collection.mutable.ListBuffer

import org.openrdf.model.Statement
import org.openrdf.model.impl.ValueFactoryImpl
import org.openrdf.rio.RDFHandler
import org.wikidata.query.rdf.common.uri.{Ontology, UrisScheme, UrisSchemeFactory}
import org.wikidata.query.rdf.tool.rdf._
import org.wikidata.query.rdf.tool.rdf.EntityMungingRdfHandler.EntityCountListener

/**
 * Parse a chunk of the wikibase rdf dump in turtle format.
 *
 * The InputStream must represent a valid portion of the dump with consistent entities.
 * Entity delimitation is any line of type:
 * data:QID a schema:Dataset ;
 * So any line starting with "data:" should mark the beginning of an entity.
 *
 * The turtle `@prefix` declarations built from `namespaces` are prepended to every parsed
 * stream, so chunks do not need to carry their own prefix header.
 *
 * @param urisScheme URI scheme used by the munger, as the parser base URI and to turn entity ids into context URIs
 * @param munger     munger handed to the [[EntityMungingRdfHandler]] used by [[parseEntityChunk]]
 * @param namespaces prefix -> IRI reference (written as `<...>`) pairs emitted as `@prefix` directives
 */
class RdfChunkParser(urisScheme: UrisScheme, munger: Munger, namespaces: Map[String, String]) {
  val predicates = new NamespaceStatementPredicates(urisScheme)
  val valueFactory = new ValueFactoryImpl()

  /** UTF-8 bytes of the prefix header, computed once and shared by every call to [[parse]]. */
  private val prefixHeaderBytes: Array[Byte] = makePrefixHeader().getBytes(StandardCharsets.UTF_8)

  /**
   * Plain parsing without any munging, only useful to parse some dump metadata triples.
   *
   * @param inputStream turtle content of the dump header (prefixes are prepended automatically)
   * @return the parsed statements, each placed in the `Ontology.DUMP` context
   * @throws IllegalArgumentException if a parsed statement is not a dump statement
   */
  def parseHeader(inputStream: InputStream): List[Statement] = {
    val statements = new ListBuffer[Statement]()
    parse(inputStream, rdfHandler(statements))
    statements.toList.map(headerQuadGenerator)
  }

  /**
   * Parse a chunk of entities, running every statement through an [[EntityMungingRdfHandler]]
   * configured with `munger`; that handler attaches a context to each statement via [[quadGenerator]].
   *
   * @param inputStream turtle content of one or more complete entities (prefixes are prepended automatically)
   * @return the munged statements, in the order they were emitted
   */
  def parseEntityChunk(inputStream: InputStream): List[Statement] = {
    val statements = new ListBuffer[Statement]()
    // Entity counts are not needed when parsing a single chunk, so the listener is a no-op.
    val countListener = new EntityCountListener {
      override def entitiesProcessed(l: Long): Unit = {}
    }
    val mungingRdfHandler =
      new EntityMungingRdfHandler(urisScheme, this.munger, rdfHandler(statements), countListener, quadGenerator)
    parse(inputStream, mungingRdfHandler)
    statements.toList
  }

  /**
   * Parse `inputStream` as turtle, with the prefix header prepended, forwarding statements to `handler`
   * through a [[NormalizingRdfHandler]].
   */
  private def parse(inputStream: InputStream, handler: RDFHandler): Unit = {
    val parser = RDFParserSuppliers.defaultRdfParser().get(new NormalizingRdfHandler(handler))
    val withPrefixes = new SequenceInputStream(new ByteArrayInputStream(prefixHeaderBytes), inputStream)
    parser.parse(withPrefixes, urisScheme.root())
  }

  /** Handler that ignores everything except statements, which are appended to `statements`. */
  private def rdfHandler(statements: ListBuffer[Statement]): RDFHandler = new RDFHandler {
    override def startRDF(): Unit = {}

    override def endRDF(): Unit = {}

    override def handleNamespace(prefix: String, uri: String): Unit = {}

    override def handleComment(comment: String): Unit = {}

    override def handleStatement(st: Statement): Unit = {
      statements += st
    }
  }

  /**
   * Generate quads for the purpose of attaching together all triples of a given entity.
   */
  private def quadGenerator: BiFunction[Statement, String, Statement] = new BiFunction[Statement, String, Statement] {
    override def apply(st: Statement, entityId: String): Statement = {
      val contextURI =
        if (StatementPredicates.dumpStatement(st)) {
          // Dump statements are bound to the dump file, not a particular entity
          Ontology.DUMP
        } else if (predicates.subjectInReferenceNS(st)) {
          // Values & references cannot be bound to a single entity, let's put them in their rdfs:type for now;
          // this does not make much sense but is preferable to assigning them to a random entity.
          Ontology.REFERENCE
        } else if (predicates.subjectInValueNS(st)) {
          Ontology.VALUE
        } else {
          urisScheme.entityIdToURI(entityId)
        }
      withContext(st, contextURI)
    }
  }

  /**
   * Place a header statement in the dump context.
   *
   * @throws IllegalArgumentException if `st` is not a dump statement
   */
  private def headerQuadGenerator(st: Statement): Statement = {
    // Dump statements are bound to the dump file, not a particular entity;
    // nothing else is expected in the header.
    if (!StatementPredicates.dumpStatement(st)) {
      throw new IllegalArgumentException(s"Received $st but expected a dump statement")
    }
    withContext(st, Ontology.DUMP)
  }

  /** Copy of `st` with `contextURI` as its context. */
  private def withContext(st: Statement, contextURI: String): Statement =
    valueFactory.createStatement(st.getSubject, st.getPredicate, st.getObject, valueFactory.createURI(contextURI))

  /**
   * One `@prefix` directive per namespace, each newline-terminated so the chunk that follows
   * always starts on its own line instead of being glued to the last directive.
   */
  private def makePrefixHeader(): String =
    namespaces.map { case (prefix, iri) => s"@prefix $prefix: $iri .\n" }.mkString
}

object RdfChunkParser {
  // TODO: stop hardcoding prefixes, either generate from the hostname
  //  or read them from the dump
  /** Turtle prefixes prepended to wikidata dump chunks. */
  private val prefixes = Map[String, String](
    "rdf" -> "<http://www.w3.org/1999/02/22-rdf-syntax-ns#>",
    "xsd" -> "<http://www.w3.org/2001/XMLSchema#>",
    "ontolex" -> "<http://www.w3.org/ns/lemon/ontolex#>",
    "dct" -> "<http://purl.org/dc/terms/>",
    "rdfs" -> "<http://www.w3.org/2000/01/rdf-schema#>",
    "owl" -> "<http://www.w3.org/2002/07/owl#>",
    "wikibase" -> "<http://wikiba.se/ontology#>",
    "skos" -> "<http://www.w3.org/2004/02/skos/core#>",
    "schema" -> "<http://schema.org/>",
    "cc" -> "<http://creativecommons.org/ns#>",
    "geo" -> "<http://www.opengis.net/ont/geosparql#>",
    "prov" -> "<http://www.w3.org/ns/prov#>",
    "v" -> "<http://www.wikidata.org/value/>",
    "wd" -> "<http://www.wikidata.org/entity/>",
    "data" -> "<https://www.wikidata.org/wiki/Special:EntityData/>",
    "s" -> "<http://www.wikidata.org/entity/statement/>",
    "ref" -> "<http://www.wikidata.org/reference/>",
    "wdt" -> "<http://www.wikidata.org/prop/direct/>",
    "wdtn" -> "<http://www.wikidata.org/prop/direct-normalized/>",
    "p" -> "<http://www.wikidata.org/prop/>",
    "ps" -> "<http://www.wikidata.org/prop/statement/>",
    "psv" -> "<http://www.wikidata.org/prop/statement/value/>",
    "psn" -> "<http://www.wikidata.org/prop/statement/value-normalized/>",
    "pq" -> "<http://www.wikidata.org/prop/qualifier/>",
    "pqv" -> "<http://www.wikidata.org/prop/qualifier/value/>",
    "pqn" -> "<http://www.wikidata.org/prop/qualifier/value-normalized/>",
    "pr" -> "<http://www.wikidata.org/prop/reference/>",
    "prv" -> "<http://www.wikidata.org/prop/reference/value/>",
    "prn" -> "<http://www.wikidata.org/prop/reference/value-normalized/>",
    "wdno" -> "<http://www.wikidata.org/prop/novalue/>"
  )

  /** Turtle prefixes for commons (structured data) dump chunks: the wikidata ones plus the `sdc*` ones. */
  private val commonsPrefixes = prefixes ++ Map[String, String](
    "sdc" -> "<https://commons.wikimedia.org/entity/>",
    "sdcdata" -> "<https://commons.wikimedia.org/wiki/Special:EntityData/>",
    "sdcs" -> "<https://commons.wikimedia.org/entity/statement/>",
    "sdcref" -> "<https://commons.wikimedia.org/reference/>",
    "sdcv" -> "<https://commons.wikimedia.org/value/>",
    "sdct" -> "<https://commons.wikimedia.org/prop/direct/>",
    "sdctn" -> "<https://commons.wikimedia.org/prop/direct-normalized/>",
    "sdcp" -> "<https://commons.wikimedia.org/prop/>",
    "sdcps" -> "<https://commons.wikimedia.org/prop/statement/>",
    "sdcpsv" -> "<https://commons.wikimedia.org/prop/statement/value/>",
    "sdcpsn" -> "<https://commons.wikimedia.org/prop/statement/value-normalized/>",
    "sdcpq" -> "<https://commons.wikimedia.org/prop/qualifier/>",
    "sdcpqv" -> "<https://commons.wikimedia.org/prop/qualifier/value/>",
    "sdcpqn" -> "<https://commons.wikimedia.org/prop/qualifier/value-normalized/>",
    "sdcpr" -> "<https://commons.wikimedia.org/prop/reference/>",
    "sdcprv" -> "<https://commons.wikimedia.org/prop/reference/value/>",
    "sdcprn" -> "<https://commons.wikimedia.org/prop/reference/value-normalized/>",
    "sdcno" -> "<https://commons.wikimedia.org/prop/novalue/>"
  )

  /**
   * Parser for wikidata dump chunks, using the `UrisSchemeFactory.WIKIDATA` scheme.
   *
   * @param skolemize passed to the munger's `convertBNodesToSkolemIRIs`
   */
  def forWikidata(skolemize: Boolean = false): RdfChunkParser = {
    val urisScheme = UrisSchemeFactory.WIKIDATA
    new RdfChunkParser(urisScheme, buildMunger(skolemize, urisScheme), prefixes)
  }

  /**
   * Parser for commons (structured data) dump chunks.
   *
   * @param skolemize passed to the munger's `convertBNodesToSkolemIRIs`
   */
  def forCommons(skolemize: Boolean = false): RdfChunkParser = {
    val urisScheme = UrisSchemeFactory.fromConceptUris("http://www.wikidata.org/", "https://commons.wikimedia.org")
    new RdfChunkParser(urisScheme, buildMunger(skolemize, urisScheme), commonsPrefixes)
  }

  /**
   * Parser matching the given site.
   *
   * @throws IllegalArgumentException for a site without a parser (Enumeration matches are not
   *                                  checked for exhaustiveness, so this is reported explicitly)
   */
  def bySite(site: Site.Value, skolemize: Boolean): RdfChunkParser = site match {
    case Site.wikidata => forWikidata(skolemize)
    case Site.commons => forCommons(skolemize)
    case other => throw new IllegalArgumentException(s"No RdfChunkParser available for site $other")
  }

  /** Munger for `urisScheme`, optionally skolemizing blank nodes; format version is hardcoded for now. */
  private def buildMunger(skolemize: Boolean, urisScheme: UrisScheme): Munger = {
    val munger = Munger.builder(urisScheme).convertBNodesToSkolemIRIs(skolemize).build()
    munger.setFormatVersion("1.0.0")
    munger
  }
}