Source code for spicerack.dns

"""DNS module."""
import logging

from typing import List, Optional, Union

from dns import resolver, reversename, rrset
from dns.exception import DNSException
from dns.name import Name

from spicerack.exceptions import SpicerackError


# Module-level logger, named after this module through ``__name__``.
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


class DnsError(SpicerackError):
    """Generic error raised by the Dns class; base class for its more specific exceptions."""
class DnsNotFound(DnsError):
    """Custom exception class to indicate the record was not found.

    Raised by :py:meth:`Dns.resolve` in two situations:

    * the queried name exists but has no record matching the requested resource record type
      (``dns.resolver.NoAnswer``);
    * the queried name does not exist at all (``dns.resolver.NXDOMAIN``).

    """
class Dns:
    """Class to interact with the DNS.

    Wraps a ``dns.resolver.Resolver`` instance and exposes helpers that return plain strings for the most common
    record types (A, AAAA, PTR, CNAME), converting the resolver's exceptions into :py:class:`DnsError` and
    :py:class:`DnsNotFound`.

    """

    def __init__(self, *, nameserver_address: Optional[str] = None, port: Optional[int] = None) -> None:
        """Initialize the instance.

        Arguments:
            nameserver_address (str, optional): the nameserver address to use, if not set uses the OS configuration.
            port (int, optional): the port the ``nameserver_address`` nameserver is listening to, if different from
                the default 53. This applies only if a nameserver is explicitly specified.

        """
        if nameserver_address is not None:
            # Do not load the OS resolver configuration: only the given nameserver must be queried.
            self._resolver = resolver.Resolver(configure=False)
            if port is not None:
                self._resolver.port = port
            self._resolver.nameservers = [nameserver_address]
        else:
            self._resolver = resolver.Resolver()

    def resolve_ipv4(self, name: str) -> List[str]:
        """Perform a DNS lookup for an A record for the given name.

        Arguments:
            name (str): the name to resolve.

        Returns:
            list: the list of IPv4 addresses as strings returned by the DNS response.

        Raises:
            spicerack.dns.DnsNotFound: when no A record is found.
            spicerack.dns.DnsError: on generic error.

        """
        return self._resolve_addresses(name, 'A')

    def resolve_ipv6(self, name: str) -> List[str]:
        """Perform a DNS lookup for an AAAA record for the given name.

        Arguments:
            name (str): the name to resolve.

        Returns:
            list: the list of IPv6 addresses as strings returned by the DNS response.

        Raises:
            spicerack.dns.DnsNotFound: when no AAAA record is found.
            spicerack.dns.DnsError: on generic error.

        """
        return self._resolve_addresses(name, 'AAAA')

    def resolve_ips(self, name: str) -> List[str]:
        """Perform a DNS lookup for A and AAAA records for the given name.

        Arguments:
            name (str): the name to resolve.

        Returns:
            list: the list of IPv4 and IPv6 addresses as strings returned by the DNS response, IPv4 first.

        Raises:
            spicerack.dns.DnsNotFound: when no address is found.
            spicerack.dns.DnsError: on generic error.

        """
        addresses = []
        for lookup in (self.resolve_ipv4, self.resolve_ipv6):
            try:
                addresses += lookup(name)
            except DnsNotFound:
                pass  # Allow single stack answers

        if not addresses:
            raise DnsNotFound('Record A or AAAA not found for {name}'.format(name=name))

        return addresses

    def resolve_ptr(self, address: str) -> List[str]:
        """Perform a DNS lookup for PTR record for the given address.

        Arguments:
            address (str): the IPv4 or IPv6 address to resolve.

        Returns:
            list: the list of absolute target PTR records as strings, without the trailing dot.

        Raises:
            spicerack.dns.DnsNotFound: when no PTR record is found.
            spicerack.dns.DnsError: on generic error or if a relative target is found.

        """
        # The address is converted to its reverse-map name before being queried.
        response = self.resolve(reversename.from_address(address), 'PTR')
        return self._parse_targets(response.rrset)  # type: ignore

    def resolve_cname(self, name: str) -> str:
        """Perform a DNS lookup for CNAME record for the given name.

        Arguments:
            name (str): the name to resolve.

        Returns:
            str: the absolute target name for this CNAME, without the trailing dot.

        Raises:
            spicerack.dns.DnsNotFound: when no CNAME record is found.
            spicerack.dns.DnsError: on generic error, if a relative target is found or if the response does not
                contain exactly one target.

        """
        targets = self._parse_targets(self.resolve(name, 'CNAME').rrset)  # type: ignore
        if len(targets) != 1:
            raise DnsError('Found multiple CNAMEs target for {name}: {targets}'.format(name=name, targets=targets))

        return targets[0]

    def resolve(self, qname: Union[str, Name], record_type: str) -> resolver.Answer:
        """Perform a DNS lookup for the given qname and record type.

        Arguments:
            qname (str, dns.name.Name): the name or address to resolve.
            record_type (str): the DNS record type to lookup for, like 'A', 'AAAA', 'PTR', etc.

        Returns:
            dns.resolver.Answer: the DNS response.

        Raises:
            spicerack.dns.DnsNotFound: if there are no records for the given record type (``NoAnswer``) or the
                qname does not exist (``NXDOMAIN``).
            spicerack.dns.DnsError: on generic error.

        """
        # NOTE(review): dnspython 2.x is understood to deprecate Resolver.query() in favour of Resolver.resolve(),
        # whose search-list default differs; confirm the supported dnspython version before switching.
        try:
            response = self._resolver.query(qname, record_type)
        # The not-found exceptions must be caught before the generic DNSException they derive from.
        except (resolver.NoAnswer, resolver.NXDOMAIN) as e:
            raise DnsNotFound('Record {record_type} not found for {qname}'.format(
                record_type=record_type, qname=qname)) from e
        except DNSException as e:
            raise DnsError('Unable to resolve {record_type} record for {qname}'.format(
                record_type=record_type, qname=qname)) from e

        logger.debug('Resolved %s record for %s: %s', record_type, qname, response.rrset)
        return response

    def _resolve_addresses(self, name: str, record_type: str) -> List[str]:
        """Extract and return all the matching addresses for the given name and record type.

        Arguments:
            name (str): the name to resolve.
            record_type (str): the DNS record type to lookup for, like 'A' and 'AAAA'.

        Returns:
            list: the list of IPv4 or IPv6 addresses as strings returned by the DNS response.

        Raises:
            spicerack.dns.DnsNotFound: when no record of the given type is found.
            spicerack.dns.DnsError: on generic error.

        """
        return [rdata.address for rdata in self.resolve(name, record_type).rrset]  # type: ignore

    @staticmethod
    def _parse_targets(response_set: rrset.RRset) -> List[str]:
        """Extract and return all the matching names from the given rrset without the trailing dot.

        Arguments:
            response_set (dns.rrset.RRset): the RRset to parse.

        Returns:
            list: the list of absolute target record names as strings without the trailing dot.

        Raises:
            spicerack.dns.DnsError: if a relative record is found.

        """
        targets = []
        for rdata in response_set:
            target = rdata.target.to_text()
            # Absolute names end with a dot; endswith() also copes with an empty text without raising IndexError.
            if not target.endswith('.'):
                raise DnsError('Unsupported relative target {target} found'.format(target=target))

            targets.append(target[:-1])

        return targets