Source code for memento_client.memento_client

A Memento Client library.


import requests
from datetime import datetime
import sys
import logging
import os

# Python 2.7 and 3.X support are different for urlparse
if sys.version_info[0] == 3:
    from urllib.parse import urlparse, urljoin
    from urlparse import urlparse, urljoin

if os.environ.get('DEBUG_MEMENTO_CLIENT') == '1':

HTTP_DT_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"

[docs] class MementoClientException(Exception): """ The memento client Exception class. """ def __init__(self, message, data): super(MementoClientException, self).__init__(message) = data
class MementoClient(object): """ A memento client. """ def __init__(self, timegate_uri=DEFAULT_TIMEGATE_BASE_URI, check_native_timegate=True, max_redirects=MAX_REDIRECTS, session=None): """ A Memento Client that makes it straightforward to access the Web of the past as it is to access the current Web. Basic usage: >>> mc = MementoClient() >>> dt = mc.convert_to_datetime("Sun, 01 Apr 2010 12:00:00 GMT") >>> mc = mc.get_memento_info("", dt) >>> print(mc) {'mementos': {'closest': { 'datetime': datetime.datetime(2010, 5, 23, 10, 19, 6), 'http_status_code': 200, 'uri': [u'']}, 'first': {'datetime': datetime.datetime(1998, 12, 2, 21, 26, 10), 'uri': ['']}, 'last': {'datetime': datetime.datetime(2015, 8, 7, 21, 59, 59), 'uri': ['']}}, 'original_uri': '', 'timegate_uri': ''} The output conforms to the Memento API format explained here: By default, MementoClient uses the Memento Aggregator: It is also possible to use different TimeGate, simply initialize with a preferred timegate base uri. Toggle check_native_timegate to see if the original uri has its own timegate. The native timegate, if found will be used instead of the timegate_uri preferred. If no native timegate is found, the preferred timegate_uri will be used. :param timegate_uri: (str) A valid HTTP base uri for a timegate. Must start with http(s):// and end with a /. :param max_redirects: (int) the maximum number of redirects allowed for all HTTP requests to be made. :return: A MementoClient obj. """ self.timegate_uri = timegate_uri self.check_native_timegate = check_native_timegate self.native_redirect_count = 0 self.max_redirects = max_redirects self.sessionSetOutside = False if session: self.session = session self.sessionSetOutside = True else: self.session = requests.Session() def __exit__(self, exc_type, exc_value, traceback): """ Closes session connection if used in a with statement. """ if not self.sessionSetOutside: self.session.close() def __enter__(self): """ Opens session connection if used in a with statement. """ if not self.session: self.session = requests.Session() return self def __del__(self): """ Closes session connection when called by garbage collector. """ if not self.sessionSetOutside: self.session.close() def get_memento_info(self, request_uri, accept_datetime=None, **kwargs): """ Given an original uri and an accept datetime, this method queries the preferred timegate and returns the closest memento uri, along with prev/next/first/last if available. The response format is explained here: :param request_uri: (str) The input http uri. :param accept_datetime: (datetime) The datetime object of the accept datetime. The current datetime is used if none is provided. :return: (dict) A map of uri and datetime for the closest/prev/next/first/last mementos. """ req_uri_response = kwargs.get("req_uri_response") # for reading the headers of the req uri to find uri_r org_response = kwargs.get("org_response") # for checking native tg uri in uri_r tg_response = kwargs.get("tg_response") if not accept_datetime: accept_datetime = logging.debug("getting URI-R {0} at accept-datetime {1}!!!". format(request_uri, str(accept_datetime))) logging.debug("Starting with URI-G stem: " + self.timegate_uri) assert request_uri and accept_datetime # if not request_uri or not accept_datetime: # raise MementoClientException( # "No uri or accept datetime was provided to retrieve mementos.", {}) if not request_uri.startswith("http://") \ and not request_uri.startswith("https://"): raise ValueError("Only HTTP URIs are supported, " "URI %s unrecognized." % request_uri) if type(accept_datetime) != datetime: raise TypeError("Expecting accept_datetime to be of type " "datetime.") http_acc_dt = MementoClient.convert_to_http_datetime(accept_datetime) # finding the actual original_uri in case the input uri is a memento original_uri = self.get_original_uri(request_uri, response=req_uri_response) logging.debug("original uri: " + original_uri) native_tg = None if self.check_native_timegate: native_tg = self.get_native_timegate_uri( original_uri, accept_datetime=accept_datetime, response=org_response) logging.debug("Found native URI-G: " + str(native_tg)) timegate_uri = native_tg if native_tg \ else self.timegate_uri + original_uri logging.debug("Using URI-G: " + timegate_uri) if not tg_response: response = MementoClient.request_head(timegate_uri, accept_datetime=http_acc_dt, follow_redirects=True, session=self.session) else: response = tg_response logging.debug("request method: " + str(response.request.method)) logging.debug("request URI: " + str(response.request.url)) logging.debug("request headers: " + str(response.request.headers)) logging.debug("response status code: " + str(response.status_code)) logging.debug("response headers: " + str(response.headers)) uri_m = response.url dt_m = None link_header = None mem_status = response.status_code # checking if the timegate redirected. Its an error if not. # raising an exception if there are no tg redirects # The timegate can return a 404 when there are no mementos # and a 200 for 200 style conneg if len(response.history) == 0 and \ response.status_code not in [200, 404]: raise MementoClientException( ("The TimeGate (%s) returned with HTTP status %s and did not " "redirect to a Memento.") % (timegate_uri, str(response.status_code)), {"timegate_uri": timegate_uri, "original_uri": original_uri, "request_uri": request_uri, "status_code": str(response.status_code)}) # getting the memento datetime from the memento response headers if self.is_memento(uri_m, response=response, session=self.session): dt_m = self.convert_to_datetime( response.headers.get("Memento-Datetime")) # link_header = response.headers.get("Link") # getting the next, prev, etc from the timegate reponse headers # so that these headers not locked in any one archive # when using the aggr. for res in response.history: if self.is_timegate(timegate_uri, response=res, session=self.session): logging.debug("found URI-M from timegate response: %s" % uri_m) logging.debug("timegate uri: %s" % res.url) # sometimes we get relative URI-Ms, which have no scheme if not urlparse(uri_m).scheme: uri_m = urlparse(timegate_uri).scheme + "://" \ + urlparse(timegate_uri).netloc + uri_m link_header = res.headers.get("link") logging.debug("link header: " + str(link_header)) if not link_header: raise MementoClientException( "The TimeGate (%s) did not return a Link header." % timegate_uri, {"timegate_uri": timegate_uri, "original_uri": original_uri, "request_uri": request_uri, "memento_uri": uri_m}) break memento_info = {} memento_info["original_uri"] = original_uri memento_info["timegate_uri"] = timegate_uri if not uri_m or not link_header: return memento_info memento_info.update( self.__prepare_memento_response(uri_m=uri_m, dt_m=dt_m, link_header=link_header, status_code=mem_status)) return memento_info def get_native_timegate_uri(self, original_uri, accept_datetime, **kwargs): """ Given an original URL and an accept datetime, check the original uri to see if the timegate uri is provided in the Link header. :param original_uri: (str) An HTTP uri of the original resource. :param accept_datetime: (datetime) The datetime object of the accept datetime :return: (str) The timegate uri of the original resource, if provided, else None. """ org_response = kwargs.get("response") if not org_response: try: org_response = MementoClient.request_head( original_uri, accept_datetime=MementoClient.convert_to_http_datetime( accept_datetime), session=self.session ) except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError) as e: logging.warning("Could not connect to URI {}," " returning no native URI-G".format(original_uri)) return logging.debug("Request headers sent to search for URI-G: " + str(org_response.request.headers)) def follow(): """ a recursive func to follow redirects. """ location = org_response.headers.get("Location") if not location.startswith("http") \ and not location.startswith("//"): location = urljoin(org_response.url, location) logging.debug("Following to new URI of " + location) return self.get_native_timegate_uri( location, accept_datetime) if org_response.headers.get("Vary") and\ 'accept-datetime' in org_response.headers.get('Vary').lower(): logging.debug("Vary header with Accept-Datetime found for URI-R: " + original_uri) return if 'Memento-Datetime' in org_response.headers: logging.debug("Memento-Datetime found in headers for URI-R: {0}," " so assuming it is a URI-M.". format(original_uri)) return if 299 < org_response.status_code < 400 \ and self.native_redirect_count < self.max_redirects: logging.debug("Been redirected from URI-R: " + original_uri) self.native_redirect_count += 1 return follow() if "Link" not in org_response.headers: logging.debug("No URI-G found for URI-R: " + original_uri) return logging.debug("Received raw Link header: " + str(org_response.headers.get("Link"))) link_header = self.parse_link_header(org_response.headers.get("Link")) logging.debug("Received Link header: " + str(link_header)) tg = self.get_uri_dt_for_rel(link_header, ["timegate"]) tg_uri = None if "timegate" in tg: tg_uri = tg["timegate"].get("uri") logging.debug("Search for native URI-G yielded: " + str(tg_uri)) return tg_uri def get_original_uri(self, request_uri, **kwargs): """ Returns the original uri of the given request uri. Checks for rel=original in the response headers of the request uri. Useful when the request uri is a memento, so that the original uri can be used to for the timegate, instead of the memento uri. :param request_uri: the requested http uri. :return: (str) the original uri """ response = kwargs.get("response") if not response: try: response = MementoClient.request_head(request_uri, accept_datetime=None, follow_redirects=True, session=self.session) except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError) as e: logging.warning( "Could not connect to {}," " using it as original URI".format(request_uri)) if response.headers.get("Link"): link_header = response.headers.get("Link") links = self.parse_link_header(link_header) org = self.get_uri_dt_for_rel(links, ["original"]) if org.get("original"): logging.debug("Org URI from request uri headers: " + repr(org)) return org.get("original").get("uri") return request_uri @staticmethod def is_timegate(uri, accept_datetime=None, response=None, session=None): """ Checks if the given uri is a valid timegate according to the RFC. :param uri: the http uri to check. :param accept_datetime: (str)[optional] the accept datetime string in http date format. :param response: (request's response obj)[optional] the response object of the uri. :return: (bool) True if a valid timegate, else False. """ if not response: if not accept_datetime: accept_datetime = MementoClient.convert_to_http_datetime( response = MementoClient.request_head( uri, accept_datetime=accept_datetime, session=session) if response.status_code != 302 and response.status_code != 200: raise MementoClientException( ("TimeGate did not respond with a 302 redirect or 200 OK HTTP " "status code\n" "URI: {0}\n" "Accept-Datetime: {1}\n" "Status code received: {2}\n" ).format(uri, accept_datetime, str(response.status_code)), {"status_code": response.status_code, "timegate_uri": uri, "accept_datetime": accept_datetime}) links = MementoClient.parse_link_header(response.headers.get("Link")) original_uri = MementoClient.get_uri_dt_for_rel(links, ["original"]) if response.headers.get("Vary") \ and "accept-datetime" in response.headers.get("Vary").lower() \ and original_uri: if response.status_code == 302 and not response.headers.get("Location"): return False elif response.status_code == 302 and response.headers.get("Memento-Datetime"): return False elif response.status_code == 200 and \ (not response.headers.get("Memento-Datetime") or not response.headers.get("Vary")): return False return True return False @staticmethod def is_memento(uri, response=None, session=None): """ Determines if the URI given is indeed a Memento. The simple case is to look for a Memento-Datetime header in the request, but not all archives are Memento-compliant yet. :param uri: (str) an HTTP URI for testing :param response: (request's response obj)[optional] the response object of the uri. :return: (bool) True if a Memento, False otherwise """ sessionSet = False if not response: response = MementoClient.request_head(uri, follow_redirects=False) if 'Memento-Datetime' in response.headers: if response.status_code == 302 and \ "accept-datetime" in response.headers.get("Vary", "").lower(): return False if 'Link' in response.headers: links = MementoClient.parse_link_header(response.headers.get("Link")) rels = MementoClient.get_uri_dt_for_rel(links, ["original"]) if 'original' in rels: logging.debug("Memento-Datetime found in headers for" " URI-R: {0}, so assuming it is a URI-M.". format(uri)) return True return False @staticmethod def convert_to_datetime(dt): """ Converts a date string in the HTTP date format to a datetime obj. eg: "Sun, 01 Apr 2010 12:00:00 GMT" -> datetime() :param dt: (str) The date string in HTTP date format. :return: (datetime) The datetime object of the string. """ if not dt: return return datetime.strptime(dt, HTTP_DT_FORMAT) @staticmethod def convert_to_http_datetime(dt): """ Converts a datetime object to a date string in HTTP format. eg: datetime() -> "Sun, 01 Apr 2010 12:00:00 GMT" :param dt: (datetime) A datetime object. :return: (str) The date in HTTP format. """ if not dt: return return dt.strftime(HTTP_DT_FORMAT) @staticmethod def get_uri_dt_for_rel(links, rel_types): """ Returns the uri and the datetime (if available) for a rel type from the parsed link header object. :param links: (dict) the output of parse_link_header. :param rel_types: (list) a list of rel types for which the uris should be found. :return: (dict) {rel: {"uri": "", "datetime": }} """ if not links or not rel_types: return uris = {} for uri in links: for rel in rel_types: if rel in links.get(uri).get("rel"): uris[rel] = {"uri": uri, "datetime": links.get(uri).get("datetime")} return uris @staticmethod def parse_link_header(link): """ Parses the link header character by character. More robust than the parser provided by the requests module. :param link: (str) The HTTP link header as a string. :return: (dict) {"uri": {"rel": ["", ""], "datetime": [""]}...} """ if not link: return state = 'start' data = list(link.strip()) links = {} while data: if state == 'start': dat = data.pop(0) while dat.isspace(): dat = data.pop(0) if dat != "<": raise ValueError("Parsing Link Header: Expected < in " "start, got %s" % dat) state = "uri" elif state == "uri": uri = [] dat = data.pop(0) while dat != ";": uri.append(dat) try: dat = data.pop(0) except: raise ValueError("Error! Invalid Link Header.") uri = ''.join(uri) uri = uri[:-1] data.insert(0, ';') # Not an error to have the same URI multiple times (I think!) if uri not in links: links[uri] = {} state = "paramstart" elif state == 'paramstart': dat = data.pop(0) while data and dat.isspace(): dat = data.pop(0) if dat == ";": state = 'linkparam' elif dat == ',': state = 'start' else: raise ValueError("Parsing Link Header: Expected ;" " in paramstart, got %s" % dat) elif state == 'linkparam': dat = data.pop(0) while dat.isspace(): dat = data.pop(0) param_type = [] while not dat.isspace() and dat != "=": param_type.append(dat) dat = data.pop(0) while dat.isspace(): dat = data.pop(0) if dat != "=": raise ValueError("Parsing Link Header: Expected = in" " linkparam, got %s" % dat) state = 'linkvalue' pt = ''.join(param_type) if pt not in links[uri]: links[uri][pt] = [] elif state == 'linkvalue': dat = data.pop(0) while dat.isspace(): dat = data.pop(0) param_value = [] if dat == '"': pd = dat dat = data.pop(0) while dat != '"' and pd != '\\': param_value.append(dat) pd = dat try: dat = data.pop(0) except: raise ValueError("Error, invalid link header.") else: while not dat.isspace() and dat not in (',', ';'): param_value.append(dat) if data: dat = data.pop(0) else: break if data: data.insert(0, dat) state = 'paramstart' pv = ''.join(param_value) if pt == 'rel': # rel types are case insensitive and space separated links[uri][pt].extend([y.lower() for y in pv.split(' ')]) else: if pv not in links[uri][pt]: links[uri][pt].append(pv) return links @staticmethod def request_head(uri, accept_datetime=None, follow_redirects=False, session=None): """ Makes HEAD requests. :param uri: (str) the uri for the request. :param accept_datetime: (str) the accept-datetime in the http format. :param follow_redirects: (boolean) Toggle to follow redirects. False by default, so does not follow any redirects. :return: the response object. """ sessionSet = False headers = {} if accept_datetime: headers["Accept-Datetime"] = accept_datetime # create a session if not supplied if not session: session = requests.Session() sessionSet = True response = session.head(uri, headers=headers, allow_redirects=follow_redirects) if sessionSet: session.close() return response def __prepare_memento_response(self, uri_m=None, dt_m=None, link_header=None, status_code=None): """ Prepares the response for the get_memento_info function. :param uri_m: (str) the memento uri :param dt_m: (datetime) the memento datetime :param links: (str) the link header from the memento/timegate response :param status_code: (int) the http status code of the memento. :return: (dict) a map of the mementos found. """ logging.debug("Preparing memento response.") if not uri_m and not dt_m and not link_header and not status_code: return memento_info = {} memento_info["mementos"] = {} memento_info["mementos"]["closest"] = {} memento_info["mementos"]["closest"]["uri"] = [uri_m] memento_info["mementos"]["closest"]["http_status_code"] = status_code links = self.parse_link_header(link_header) mementos = self.get_uri_dt_for_rel(links, ["prev", "next", "first", "last"]) logging.debug("DT_M provided: %s" % dt_m) memento_info["mementos"]["closest"]["datetime"] = dt_m logging.debug(links) logging.debug(uri_m) if links and not dt_m and uri_m in links: if "datetime" in links.get(uri_m): dt_m = self.convert_to_datetime(links.get(uri_m). get("datetime")[0]) logging.debug("No dt_m found, looking in the link headers: %s" % dt_m) memento_info["mementos"]["closest"]["datetime"] = dt_m elif isinstance(dt_m, str): logging.debug("dt_m is a string, converting to datetime: %s" % dt_m) dt_m = self.convert_to_datetime(dt_m) memento_info["mementos"]["closest"]["datetime"] = dt_m if not mementos: return memento_info for mem in mementos: memento_info["mementos"][mem] = { "uri": [mementos.get(mem).get("uri")], "datetime": self.convert_to_datetime(mementos.get(mem). get("datetime")[0]) } logging.debug("The full response: " + repr(memento_info)) return memento_info