StatementEncoder.scala

package org.wikidata.query.rdf.spark.transform.structureddata.dumps

import javax.annotation.concurrent.NotThreadSafe

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.openrdf.model.{Statement, Value}
import org.openrdf.model.impl.ValueFactoryImpl
import org.openrdf.rio.ntriples.NTriplesUtil

/**
 * Encode statement Resource and Value using N3 format:
 * URI are wrapped inside <>
 * Strings are wrapped inside ""
 * Localized strings are wrapped in "" and suffixed with @lang
 * Data is wrapped inside "" and suffixed with type info ^^TypeURI
 */
@NotThreadSafe
class StatementEncoder() extends Serializable {
  // on large graph (10B triples for wikidata as of march 2020)
  // we convert 4 values per triple.
  // Keeping a string buffer here we decrease StringBuffer allocations from
  // 40B (one per value) to 80M (one per entity)
  // drawback is that this class becomes not thread safe
  @transient
  private lazy val stringBuffer = new StringBuffer()

  @transient
  private lazy val valueFactory = new ValueFactoryImpl()

  def encode(st: Statement): (String, String, String, String) = {
      if (st.getContext == null) {
        throw new IllegalArgumentException(s"Invalid context provided from triple: $st")
      }
      (
        encode(st.getContext),
        encode(st.getSubject),
        encode(st.getPredicate),
        encode(st.getObject)
      )
  }

  def decode(row: Row): Statement = {
    valueFactory.createStatement(
      NTriplesUtil.parseResource(row.getString(StatementEncoder.baseSchema.fieldIndex("subject")), valueFactory),
      NTriplesUtil.parseURI(row.getString(StatementEncoder.baseSchema.fieldIndex("predicate")), valueFactory),
      NTriplesUtil.parseValue(row.getString(StatementEncoder.baseSchema.fieldIndex("object")), valueFactory),
      NTriplesUtil.parseResource(row.getString(StatementEncoder.baseSchema.fieldIndex("context")), valueFactory))
  }

  def decode(value: String): Value = {
    NTriplesUtil.parseValue(value, valueFactory)
  }

  def encode(elt: Value): String = {
    stringBuffer.setLength(0)
    NTriplesUtil.append(elt, stringBuffer)
    stringBuffer.toString
  }

  def encodeURI(uri: String): String = {
    encode(valueFactory.createURI(uri))
  }
}

object StatementEncoder {
  def baseSchema: StructType = StructType(Seq(
    StructField("context", StringType, nullable = false),
    StructField("subject", StringType, nullable = false),
    StructField("predicate", StringType, nullable = false),
    StructField("object", StringType, nullable = false)
  ))
}