Source code for data.sparql

"""SPARQL Query interface."""
#
# (C) Pywikibot team, 2016-2022
#
# Distributed under the terms of the MIT license.
#
from contextlib import suppress
from typing import Optional
from urllib.parse import quote

from requests.exceptions import Timeout

from pywikibot import Site, config, sleep, warning
from pywikibot.backports import Dict, List
from pywikibot.comms import http
from pywikibot.exceptions import Error, TimeoutError


try:
    from requests import JSONDecodeError
except ImportError:  # requests < 2.27.0
    from json import JSONDecodeError

DEFAULT_HEADERS = {'cache-control': 'no-cache',
                   'Accept': 'application/sparql-results+json'}


[docs]class SparqlQuery: """ SPARQL Query class. This class allows to run SPARQL queries against any SPARQL endpoint. """ def __init__(self, endpoint: Optional[str] = None, entity_url: Optional[str] = None, repo=None, max_retries: Optional[int] = None, retry_wait: Optional[float] = None) -> None: """ Create endpoint. :param endpoint: SPARQL endpoint URL :param entity_url: URL prefix for any entities returned in a query. :param repo: The Wikibase site which we want to run queries on. If provided this overrides any value in endpoint and entity_url. Defaults to Wikidata. :type repo: pywikibot.site.DataSite :param max_retries: (optional) Maximum number of times to retry after errors, defaults to config.max_retries. :param retry_wait: (optional) Minimum time in seconds to wait after an error, defaults to config.retry_wait seconds (doubles each retry until config.retry_max is reached). """ # default to Wikidata if not repo and not endpoint: repo = Site('wikidata') if repo: try: self.endpoint = repo.sparql_endpoint self.entity_url = repo.concept_base_uri except NotImplementedError: raise NotImplementedError( 'Wiki version must be 1.28-wmf.23 or newer to ' 'automatically extract the sparql endpoint. ' 'Please provide the endpoint and entity_url ' 'parameters instead of a repo.') if not self.endpoint: raise Error('The site {} does not provide a sparql endpoint.' .format(repo)) else: if not entity_url: raise Error('If initialised with an endpoint the entity_url ' 'must be provided.') self.endpoint = endpoint self.entity_url = entity_url self.last_response = None if max_retries is None: self.max_retries = config.max_retries else: self.max_retries = max_retries if retry_wait is None: self.retry_wait = config.retry_wait else: self.retry_wait = retry_wait
[docs] def get_last_response(self): """ Return last received response. :return: Response object from last request or None """ return self.last_response
[docs] def select(self, query: str, full_data: bool = False, headers: Optional[Dict[str, str]] = None ) -> Optional[List[Dict[str, str]]]: """ Run SPARQL query and return the result. The response is assumed to be in format defined by: https://www.w3.org/TR/2013/REC-sparql11-results-json-20130321/ :param query: Query text :param full_data: Whether return full data objects or only values """ if headers is None: headers = DEFAULT_HEADERS data = self.query(query, headers=headers) if not data or 'results' not in data: return None result = [] qvars = data['head']['vars'] for row in data['results']['bindings']: values = {} for var in qvars: if var not in row: # var is not available (OPTIONAL is probably used) values[var] = None elif full_data: if row[var]['type'] not in VALUE_TYPES: raise ValueError('Unknown type: {}' .format(row[var]['type'])) valtype = VALUE_TYPES[row[var]['type']] values[var] = valtype(row[var], entity_url=self.entity_url) else: values[var] = row[var]['value'] result.append(values) return result
[docs] def query(self, query: str, headers: Optional[Dict[str, str]] = None): """ Run SPARQL query and return parsed JSON result. :param query: Query text """ if headers is None: headers = DEFAULT_HEADERS url = f'{self.endpoint}?query={quote(query)}' while True: try: self.last_response = http.fetch(url, headers=headers) except Timeout: self.wait() continue with suppress(JSONDecodeError): return self.last_response.json() break return None
[docs] def wait(self): """Determine how long to wait after a failed request.""" self.max_retries -= 1 if self.max_retries < 0: raise TimeoutError('Maximum retries attempted without success.') warning(f'Waiting {self.retry_wait} seconds before retrying.') sleep(self.retry_wait) # double the next wait, but do not exceed config.retry_max seconds self.retry_wait = min(config.retry_max, self.retry_wait * 2)
[docs] def ask(self, query: str, headers: Optional[Dict[str, str]] = None) -> bool: """ Run SPARQL ASK query and return boolean result. :param query: Query text """ if headers is None: headers = DEFAULT_HEADERS data = self.query(query, headers=headers) return data['boolean']
[docs] def get_items(self, query, item_name: str = 'item', result_type=set): """ Retrieve items which satisfy given query. Items are returned as Wikibase IDs. :param query: Query string. Must contain ?{item_name} as one of the projected values. :param item_name: Name of the value to extract :param result_type: type of the iterable in which SPARQL results are stored (default set) :type result_type: iterable :return: item ids, e.g. Q1234 :rtype: same as result_type """ res = self.select(query, full_data=True) if res: return result_type(r[item_name].getID() for r in res) return result_type()
[docs]class SparqlNode: """Base class for SPARQL nodes.""" def __init__(self, value) -> None: """Create a SparqlNode.""" self.value = value def __str__(self) -> str: return self.value
[docs]class URI(SparqlNode): """Representation of URI result type.""" def __init__(self, data: dict, entity_url, **kwargs) -> None: """Create URI object.""" super().__init__(data.get('value')) self.entity_url = entity_url
[docs] def getID(self): # noqa: N802 """ Get ID of Wikibase object identified by the URI. :return: ID of Wikibase object, e.g. Q1234 """ urllen = len(self.entity_url) if self.value.startswith(self.entity_url): return self.value[urllen:] return None
def __repr__(self) -> str: return '<' + self.value + '>'
[docs]class Literal(SparqlNode): """Representation of RDF literal result type.""" def __init__(self, data: dict, **kwargs) -> None: """Create Literal object.""" super().__init__(data.get('value')) self.type = data.get('datatype') self.language = data.get('xml:lang') def __repr__(self) -> str: if self.type: return self.value + '^^' + self.type if self.language: return self.value + '@' + self.language return self.value
[docs]class Bnode(SparqlNode): """Representation of blank node.""" def __init__(self, data: dict, **kwargs) -> None: """Create Bnode.""" super().__init__(data.get('value')) def __repr__(self) -> str: return '_:' + self.value
VALUE_TYPES = {'uri': URI, 'literal': Literal, 'bnode': Bnode}