Source code for spicerack.peeringdb

"""PeeringDB module."""
import json
import logging
import time
from collections.abc import MutableMapping
from pathlib import Path
from typing import Optional, cast

from wmflib.requests import http_session

from spicerack.exceptions import SpicerackError

logger = logging.getLogger(__name__)


class PeeringDBError(SpicerackError):
    """Error raised by the PeeringDB class, e.g. when the API answers with a non-OK status."""
class CacheMiss(SpicerackError):
    """Signal used by the cache management code when a resource cannot be served from the cache."""
class PeeringDB:
    """Basic dumb wrapper over the PeeringDB API.

    Implements the beta/v0 PeeringDB API. Tries to be smart by:
    a) keeping a persistent keep-alived session for multiple requests
    b) operating a local filesystem cache, if so desired.

    """

    baseurl: str = "https://www.peeringdb.com/api/"
    """The PeeringDB base API URL."""

    def __init__(
        self,
        *,
        ttl: int = 86400,
        cachedir: Optional[Path] = None,
        proxies: Optional[MutableMapping[str, str]] = None,
        token: str = "",
    ):
        """Initialize the module.

        Arguments:
            ttl: TTL for cached objects, compared against the age in seconds of the cache files. The cache
                is used only if this value is positive.
            cachedir: Root path for objects caching. The cache is used only if this is set. The directory is
                created if missing, its parent directories are not.
            proxies: Proxies for Internet access.
            token: PeeringDB read-only token.

        """
        self.session = http_session(".".join((self.__module__, self.__class__.__name__)))
        if token:
            self.session.headers.update({"Authorization": f"Api-Key {token}"})
        if proxies:
            self.session.proxies = proxies

        self.ttl = ttl
        self.use_cache = cachedir is not None and self.ttl > 0
        # Left unset when no cachedir is given: every access is guarded by use_cache.
        self.cachedir: Path
        if cachedir is not None:
            self.cachedir = cachedir
            self.cachedir.mkdir(exist_ok=True)

    @staticmethod
    def _get_cache_key(resource: str, *, resource_id: Optional[int] = None, filters: Optional[dict] = None) -> str:
        """Return a cache key based on the resource requested.

        Arguments:
            resource: The PeeringDB resource requested.
            resource_id: Optional resource number.
            filters: A dictionary of additional filter parameters. An empty dictionary is treated like
                :py:data:`None`.

        Returns:
            The cache key, a relative path with ``/`` separated components.

        """
        # NOTE(review): one key can be a path prefix of another (e.g. "net/5" and "net/5/depth/2"), in which
        # case the two entries cannot coexist on disk. Not changed here to keep existing caches readable.
        prefix = resource if resource_id is None else f"{resource}/{resource_id}"

        # An empty filters dictionary carries no parameters: map it to the same key as the unfiltered request
        # instead of generating a key with a trailing slash, that would resolve to a directory on disk.
        if not filters:
            return f"{prefix}/index" if resource_id is None else prefix

        filter_key = "/".join(f"{name}/{value}" for name, value in filters.items())
        return f"{prefix}/{filter_key}"

    def fetch_asn(self, asn: int) -> dict:
        """Fetch a specific asn data.

        Arguments:
            asn: The Autonomous system number.

        Returns:
            The content of the ``data`` key of the response for the ``net`` resource filtered by ASN.

        Raises:
            spicerack.peeringdb.PeeringDBError: if the server replies with a non-OK status.

        """
        return self.fetch("net", filters={"asn": asn, "depth": 2})

    def fetch(self, resource: str, resource_id: Optional[int] = None, filters: Optional[dict] = None) -> dict:
        """Get a PeeringDB resource.

        The resource is read from the local cache when available, otherwise it is requested to the API and
        saved into the cache, if the cache is enabled.

        Arguments:
            resource: The PeeringDB resource requested.
            resource_id: Optional resource number.
            filters: A dictionary of additional filter parameters.

        Returns:
            The content of the ``data`` key of the JSON response.

        Raises:
            spicerack.peeringdb.PeeringDBError: if the server replies with a non-OK status.

        """
        endpoint = resource if resource_id is None else f"{resource}/{resource_id}"
        cache_key = self._get_cache_key(resource=resource, resource_id=resource_id, filters=filters)

        try:
            json_response = self._cache_get(cache_key)
        except CacheMiss as e:
            url = self.baseurl + endpoint
            # Use the module logger, not the root one, so that the message is attributed to this module.
            logger.debug("Fetching %s from PeeringDB (params: %s)", url, filters)
            raw_response = self.session.get(url, params=filters)
            if not raw_response.ok:
                raise PeeringDBError(
                    f"Server response with status {raw_response.status_code}" f" ({raw_response.text})"
                ) from e

            json_response = raw_response.json()
            self._cache_put(json_response, cache_key)

        return json_response["data"]

    def _cache_get(self, cache_key: str) -> dict:
        """Get the resource from the on disk cache if present.

        Arguments:
            cache_key: The on-disk path of the resource.

        Returns:
            The cached content, decoded from JSON.

        Raises:
            spicerack.peeringdb.CacheMiss: if the cache is disabled or the cached file is missing, unreadable,
                older than the TTL or does not contain valid JSON.

        """
        if not self.use_cache:
            raise CacheMiss()

        cachefile = self.cachedir / cache_key
        try:
            age = time.time() - cachefile.stat().st_mtime
            if age > self.ttl:
                raise CacheMiss()

            return json.loads(cachefile.read_text())
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError: a corrupted or truncated
            # cache file must trigger a new fetch, not a failure.
            raise CacheMiss() from e

    def _cache_put(self, content: dict, cache_key: str) -> None:
        """Write the resource to the on disk cache if configured to do so.

        Arguments:
            content: Dictionary of data to cache.
            cache_key: The on-disk path of the resource.

        """
        if not self.use_cache:
            return

        cachefile = self.cachedir / cache_key
        # The cache key can have multiple path components, create the intermediate directories.
        cachefile.parent.mkdir(exist_ok=True, parents=True)
        cachefile.write_text(json.dumps(content, indent=2))