Source code for pywikibot.data.sparql

# -*- coding: utf-8 -*-
"""SPARQL Query interface."""
# (C) Pywikibot team, 2016-2020
# Distributed under the terms of the MIT license.
import json

from urllib.parse import quote

from requests.exceptions import Timeout

from pywikibot import config, warning, Site, sleep
from pywikibot.comms import http
from pywikibot.exceptions import Error, TimeoutError

# Headers sent with every request unless the caller supplies its own mapping.
# The Accept value asks the endpoint for SPARQL JSON results.
DEFAULT_HEADERS = {'cache-control': 'no-cache',
                   'Accept': 'application/sparql-results+json'}


class SparqlQuery:
    """
    SPARQL Query class.

    This class allows to run SPARQL queries against any SPARQL endpoint.
    """

    def __init__(self, endpoint=None, entity_url=None, repo=None,
                 max_retries=None, retry_wait=None):
        """
        Create endpoint.

        @param endpoint: SPARQL endpoint URL
        @type endpoint: str
        @param entity_url: URL prefix for any entities returned in a query.
        @type entity_url: str
        @param repo: The Wikibase site which we want to run queries on. If
            provided this overrides any value in endpoint and entity_url.
            Defaults to Wikidata.
        @type repo: site object exposing C{sparql_endpoint} and
            C{concept_base_uri}
        @param max_retries: (optional) Maximum number of times to retry after
            errors, defaults to config.max_retries.
        @type max_retries: int
        @param retry_wait: (optional) Minimum time in seconds to wait after an
            error, defaults to config.retry_wait seconds (doubles each retry
            until config.retry_max is reached).
        @type retry_wait: float
        @raise NotImplementedError: the repo cannot report its SPARQL
            endpoint
        @raise Error: the repo has no SPARQL endpoint, or an endpoint was
            given without an entity_url
        """
        # default to Wikidata
        if not repo and not endpoint:
            repo = Site('wikidata', 'wikidata')

        if repo:
            try:
                self.endpoint = repo.sparql_endpoint
                self.entity_url = repo.concept_base_uri
            except NotImplementedError:
                # Re-raise with a message telling the caller how to proceed.
                raise NotImplementedError(
                    'Wiki version must be 1.28-wmf.23 or newer to '
                    'automatically extract the sparql endpoint. '
                    'Please provide the endpoint and entity_url '
                    'parameters instead of a repo.')
            if not self.endpoint:
                raise Error('The site {0} does not provide a sparql endpoint.'
                            .format(repo))
        else:
            if not entity_url:
                raise Error('If initialised with an endpoint the entity_url '
                            'must be provided.')
            self.endpoint = endpoint
            self.entity_url = entity_url

        self.last_response = None

        if max_retries is None:
            self.max_retries = config.max_retries
        else:
            self.max_retries = max_retries
        if retry_wait is None:
            self.retry_wait = config.retry_wait
        else:
            self.retry_wait = retry_wait

    def get_last_response(self):
        """
        Return last received response.

        @return: Response object from last request or None
        """
        return self.last_response

    def select(self, query: str, full_data=False, headers=DEFAULT_HEADERS):
        """
        Run SPARQL query and return the result.

        The response is assumed to be in format defined by:
        https://www.w3.org/TR/2013/REC-sparql11-results-json-20130321/

        @param query: Query text
        @param full_data: Whether return full data objects or only values
        @type full_data: bool
        @param headers: HTTP headers passed through to L{query}
        @return: List of query results (one dict per row, keyed by variable
            name) or None if query failed
        @raise ValueError: full_data is set and a binding has a type that is
            not listed in VALUE_TYPES
        """
        data = self.query(query, headers=headers)
        if data and 'results' in data:
            result = []
            qvars = data['head']['vars']
            for row in data['results']['bindings']:
                values = {}
                for var in qvars:
                    if var not in row:
                        # var is not available (OPTIONAL is probably used)
                        values[var] = None
                    elif full_data:
                        if row[var]['type'] not in VALUE_TYPES:
                            raise ValueError('Unknown type: {}'
                                             .format(row[var]['type']))
                        valtype = VALUE_TYPES[row[var]['type']]
                        values[var] = valtype(row[var],
                                              entity_url=self.entity_url)
                    else:
                        values[var] = row[var]['value']
                result.append(values)
            return result
        return None

    def query(self, query: str, headers=DEFAULT_HEADERS):
        """
        Run SPARQL query and return parsed JSON result.

        The request is repeated after a timeout until L{wait} gives up.

        @param query: Query text
        @param headers: HTTP headers sent with the request
        @return: Decoded JSON document, or None if the response body is
            empty or is not valid JSON
        """
        url = '{0}?query={1}'.format(self.endpoint, quote(query))
        while True:
            try:
                self.last_response = http.fetch(url, headers=headers)
            except Timeout:
                # wait() raises once the retry budget is exhausted, which is
                # the only way out of this loop on repeated timeouts.
                self.wait()
                continue
            if not self.last_response.text:
                return None
            try:
                return json.loads(self.last_response.text)
            except ValueError:
                return None

    def wait(self):
        """
        Determine how long to wait after a failed request.

        Each call consumes one retry from self.max_retries and doubles
        self.retry_wait; both are instance attributes, so the budget is
        shared by every query issued through this object.

        @raise TimeoutError: no retries are left
        """
        self.max_retries -= 1
        if self.max_retries < 0:
            raise TimeoutError('Maximum retries attempted without success.')
        warning('Waiting {0} seconds before retrying.'.format(self.retry_wait))
        sleep(self.retry_wait)
        # double the next wait, but do not exceed config.retry_max seconds
        self.retry_wait = min(config.retry_max, self.retry_wait * 2)

    def ask(self, query: str, headers=DEFAULT_HEADERS) -> bool:
        """
        Run SPARQL ASK query and return boolean result.

        If L{query} returns None (empty or undecodable response) the
        subscript below fails with a TypeError.

        @param query: Query text
        @param headers: HTTP headers passed through to L{query}
        """
        data = self.query(query, headers=headers)
        return data['boolean']

    def get_items(self, query, item_name='item', result_type=set):
        """
        Retrieve items which satisfy given query.

        Items are returned as Wikibase IDs.

        @param query: Query string. Must contain ?{item_name} as one of the
            projected values.
        @param item_name: Name of the value to extract
        @param result_type: type of the iterable in which
            SPARQL results are stored (default set)
        @type result_type: iterable
        @return: item ids, e.g. Q1234
        @rtype: same as result_type
        """
        # full_data=True is required so that each value is a node object
        # offering getID() rather than a plain string.
        res = self.select(query, full_data=True)
        if res:
            return result_type(r[item_name].getID() for r in res)
        return result_type()
class SparqlNode:
    """Base class for SPARQL nodes."""

    def __init__(self, value):
        """
        Create a SparqlNode.

        @param value: raw value of the node, stored unchanged as self.value
        """
        self.value = value

    def __str__(self):
        """Return the stored value; it is handed back as-is, not converted."""
        return self.value
class URI(SparqlNode):
    """Representation of URI result type."""

    def __init__(self, data: dict, entity_url, **kwargs):
        """
        Create URI object.

        @param data: binding dict of one result value; its 'value' entry
            becomes the node value
        @param entity_url: URL prefix used by L{getID} to recognise entities
        @param kwargs: accepted and ignored, so every node class can be
            constructed with the same call
        """
        super().__init__(data.get('value'))
        self.entity_url = entity_url

    def getID(self):
        """
        Get ID of Wikibase object identified by the URI.

        @return: ID of Wikibase object, e.g. Q1234, or None if the URI does
            not start with the entity URL prefix
        """
        if not self.value.startswith(self.entity_url):
            return None
        # The ID is whatever follows the entity URL prefix.
        return self.value[len(self.entity_url):]

    def __repr__(self):
        """Return the URI wrapped in angle brackets."""
        return '<' + self.value + '>'
class Literal(SparqlNode):
    """Representation of RDF literal result type."""

    def __init__(self, data: dict, **kwargs):
        """
        Create Literal object.

        @param data: binding dict of one result value; 'value', 'datatype'
            and 'xml:lang' are read, missing entries become None
        @param kwargs: accepted and ignored, so every node class can be
            constructed with the same call
        """
        super().__init__(data.get('value'))
        self.type = data.get('datatype')
        self.language = data.get('xml:lang')

    def __repr__(self):
        """
        Return the literal with its datatype or language suffix.

        A datatype takes precedence over a language tag; with neither the
        bare value is returned.
        """
        if self.type:
            return self.value + '^^' + self.type
        if self.language:
            return self.value + '@' + self.language
        return self.value
class Bnode(SparqlNode):
    """Representation of blank node."""

    def __init__(self, data: dict, **kwargs):
        """
        Create Bnode.

        @param data: binding dict of one result value; its 'value' entry
            becomes the node value
        @param kwargs: accepted and ignored, so every node class can be
            constructed with the same call
        """
        super().__init__(data.get('value'))

    def __repr__(self):
        """Return the node value prefixed with '_:'."""
        return '_:' + self.value
# Maps the 'type' field of a result binding to the class that wraps it.
VALUE_TYPES = {
    'uri': URI,
    'literal': Literal,
    'bnode': Bnode,
}