BlazegraphBasedTPFRequestProcessor.java

package org.wikidata.query.rdf.blazegraph.ldf;

import java.util.HashSet;
import java.util.Set;

import org.openrdf.model.Value;
import org.linkeddatafragments.datasource.AbstractRequestProcessorForTriplePatterns;
import org.linkeddatafragments.fragments.ILinkedDataFragment;
import org.linkeddatafragments.fragments.tpf.ITriplePatternElement;
import org.linkeddatafragments.fragments.tpf.ITriplePatternFragment;
import org.linkeddatafragments.fragments.tpf.ITriplePatternFragmentRequest;

import com.bigdata.rdf.internal.IV;
import com.bigdata.rdf.internal.impl.TermId;
import com.bigdata.rdf.model.BigdataValue;
import com.bigdata.rdf.spo.ISPO;
import com.bigdata.rdf.spo.SPOFilter;
import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.relation.accesspath.IAccessPath;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * A Blazegraph-based data source.
 *
 * @author <a href="http://olafhartig.de">Olaf Hartig</a>
 */
public class BlazegraphBasedTPFRequestProcessor extends
            AbstractRequestProcessorForTriplePatterns<BigdataValue, String, String> {
    /**
     * Blazegraph data store.
     */
    private final AbstractTripleStore store;

    public BlazegraphBasedTPFRequestProcessor(final AbstractTripleStore store) {
        this.store = store;
    }

    @Override
    protected Worker getTPFSpecificWorker(final ITriplePatternFragmentRequest<BigdataValue, String, String> req) {
        return new Worker(req);
    }

    /**
     * Blazegraph worker class.
     */
    protected class Worker extends
                AbstractRequestProcessorForTriplePatterns.Worker<BigdataValue, String, String> {
        public Worker(
                final ITriplePatternFragmentRequest<BigdataValue, String, String> req) {
            super(req);
        }

        @SuppressWarnings({"checkstyle:cyclomaticcomplexity", "checkstyle:NPathComplexity"})
        @Override
        protected ILinkedDataFragment createFragment(
                final ITriplePatternElement<BigdataValue, String, String> s,
                final ITriplePatternElement<BigdataValue, String, String> p,
                final ITriplePatternElement<BigdataValue, String, String> o,
                final long offset, final long limit) {
            int numOfValues = 0;
            if (!s.isVariable())
                numOfValues++;
            if (!p.isVariable())
                numOfValues++;
            if (!o.isVariable())
                numOfValues++;

            if (numOfValues > 0) {
                final BigdataValue[] values = new BigdataValue[numOfValues];
                int i = 0;
                if (!s.isVariable())
                    values[i++] = s.asConstantTerm();
                if (!p.isVariable())
                    values[i++] = p.asConstantTerm();
                if (!o.isVariable())
                    values[i++] = o.asConstantTerm();

                // Initialize the IVs of the values.
                store.getLexiconRelation().addTerms(values, numOfValues, true); // readOnly=true

                // Check if some IVs have not been initialized. In this case
                // the corresponding values are not used in any triple in the
                // store. Hence, there cannot be any matching triple for the
                // requested triple pattern and, thus, we return an empty TPF.
                for (int j = 0; j < numOfValues; j++) {
                    @SuppressWarnings("rawtypes")
                    final IV iv = values[j].getIV();
                    if (iv == null || (iv instanceof TermId)
                            && ((TermId<?>) iv).getTermId() == 0L)
                        return createEmptyTriplePatternFragment();
                }
            }

            final VariablesBasedFilter filter = createFilterIfNeeded(s, p, o);
            final IAccessPath<ISPO> ap = store.getSPORelation().getAccessPath(
                    asIVorNull(s), asIVorNull(p), asIVorNull(o), null, // c=null
                    filter);

            return createTriplePatternFragment(ap, offset, limit);
        }

        /**
         * Create filter based on triple request.
         */
        @SuppressWarnings({"checkstyle:npathcomplexity", "checkstyle:cyclomaticcomplexity"})
        public VariablesBasedFilter createFilterIfNeeded(
                final ITriplePatternElement<BigdataValue, String, String> s,
                final ITriplePatternElement<BigdataValue, String, String> p,
                final ITriplePatternElement<BigdataValue, String, String> o) {
            if (!s.isSpecificVariable() && !p.isSpecificVariable()
                    && !o.isSpecificVariable())
                return null;

            final Set<String> varNames = new HashSet<>();

            if (s.isNamedVariable()) {
                varNames.add(s.asNamedVariable());
            }

            if (p.isNamedVariable()) {
                if (varNames.contains(p.asNamedVariable()))
                    return new VariablesBasedFilter(s, p, o);

                varNames.add(p.asNamedVariable());
            }

            if (o.isNamedVariable() && varNames.contains(o.asNamedVariable())) {
                return new VariablesBasedFilter(s, p, o);
            }

            varNames.clear();

            if (s.isAnonymousVariable()) {
                varNames.add(s.asAnonymousVariable());
            }

            if (p.isAnonymousVariable()) {
                if (varNames.contains(p.asAnonymousVariable()))
                    return new VariablesBasedFilter(s, p, o);

                varNames.add(p.asAnonymousVariable());
            }

            if (o.isAnonymousVariable()
                    && varNames.contains(o.asAnonymousVariable())) {
                return new VariablesBasedFilter(s, p, o);
            }

            return null;
        }

        /**
         * Create the fragment.
         *
         * @param ap Access path
         */
        protected ITriplePatternFragment createTriplePatternFragment(
                final IAccessPath<ISPO> ap, final long offset,
                final long limit) {
            final long count = ap.rangeCount(false); // exact=false, i.e., fast
                                                     // range count

            if (count == 0L) {
                return createEmptyTriplePatternFragment();
            } else if (offset >= count) {
                return createEmptyTriplePatternFragment();
            } else {
                return new BlazegraphBasedTPF(ap, store, count, offset, limit,
                        request.getFragmentURL(), request.getDatasetURL(),
                        request.getPageNumber());
            }
        }

    } // end of class Worker

    /**
     * Convert element to IV if it's constant.
     *
     * @return IV or null if not constant.
     */
    @SuppressWarnings("rawtypes")
    public static IV asIVorNull(
            final ITriplePatternElement<BigdataValue, String, String> tpe) {
        return (tpe.isVariable()) ? null : tpe.asConstantTerm().getIV();
    }

    /**
     * Filter based on variables.
     */
    public static class VariablesBasedFilter extends SPOFilter<ISPO> {
        @SuppressFBWarnings(
                value = "IMC_IMMATURE_CLASS_BAD_SERIALVERSIONUID",
                justification = "We need to keep serialVersionUID for blazegraph correctness sake.")
        private static final long serialVersionUID = 6979067019748992496L;

        /**
         * Check subject.
         */
        private final boolean checkS;
        /**
         * Check predicate.
         */
        private final boolean checkP;
        /**
         * Check object.
         */
        private final boolean checkO;

        @SuppressWarnings({"checkstyle:localvariablename", "checkstyle:cyclomaticcomplexity", "checkstyle:NPathComplexity"})
        public VariablesBasedFilter(
                final ITriplePatternElement<BigdataValue, String, String> s,
                final ITriplePatternElement<BigdataValue, String, String> p,
                final ITriplePatternElement<BigdataValue, String, String> o) {
            boolean _checkS = false;
            boolean _checkP = false;
            boolean _checkO = false;

            if (s.isNamedVariable()) {
                final String sVarName = s.asNamedVariable();
                if (p.isNamedVariable()
                        && p.asNamedVariable().equals(sVarName)) {
                    _checkS = true;
                    _checkP = true;
                }

                if (o.isNamedVariable()
                        && o.asNamedVariable().equals(sVarName)) {
                    _checkS = true;
                    _checkO = true;
                }
            }

            if (s.isAnonymousVariable()) {
                final String sVarName = s.asAnonymousVariable();
                if (p.isAnonymousVariable()
                        && p.asAnonymousVariable().equals(sVarName)) {
                    _checkS = true;
                    _checkP = true;
                }

                if (o.isAnonymousVariable()
                        && o.asAnonymousVariable().equals(sVarName)) {
                    _checkS = true;
                    _checkO = true;
                }
            }

            if (p.isNamedVariable() && o.isNamedVariable()
                    && p.asNamedVariable().equals(o.asNamedVariable())) {
                _checkP = true;
                _checkO = true;
            }

            if (p.isAnonymousVariable() && o.isAnonymousVariable() && p
                    .asAnonymousVariable().equals(o.asAnonymousVariable())) {
                _checkP = true;
                _checkO = true;
            }

            checkS = _checkS;
            checkP = _checkP;
            checkO = _checkO;
        }

        @Override
        @SuppressWarnings("checkstyle:cyclomaticcomplexity")
        public boolean isValid(Object obj) {
            if (!canAccept(obj))
                return true;

            final ISPO spo = (ISPO) obj;
            final Value s = spo.getSubject();
            final Value p = spo.getPredicate();
            final Value o = spo.getObject();

            if (checkS && checkP && !s.equals(p))
                return false;

            if (checkS && checkO && !s.equals(o))
                return false;

            if (checkP && checkO && !p.equals(o))
                return false;

            return true;
        }

    } // end of class VariablesBasedFilter

}