Source code for wmflib.dns

"""DNS module."""
import logging

from typing import cast, List, Optional, Sequence, Union

from dns import resolver, reversename, rrset
from dns.exception import DNSException
from dns.name import Name

from wmflib.constants import PUBLIC_AUTHDNS
from wmflib.exceptions import WmflibError


logger = logging.getLogger(__name__)


[docs] class DnsError(WmflibError): """Custom exception class for errors of the Dns class."""
[docs] class DnsNotFound(DnsError): """Custom exception class to indicate the record was not found. One or more resource records might exist for this domain but no record matches the resource record type requested. """
[docs] class Dns: """Class to interact with the DNS.""" def __init__(self, *, nameserver_addresses: Optional[Sequence[str]] = None, port: Optional[int] = None) -> None: """Initialize the instance optionally specifying the nameservers to use. Examples: Using the host's default DNS resolvers:: >>> from wmflib.dns import Dns >>> dns = Dns() Using a specific set of resolvers and port:: >>> from wmflib.dns import Dns >>> dns = Dns(nameserver_addresses=['10.0.0.1', '10.0.0.2'], port=5353) Arguments: nameserver_addresses (Sequence, optional): the nameserveres address to use, if not set uses the OS configuration. port (int, optional): the port the ``nameserver_addresses`` nameserveres is listening to, if different from the default 53. This applies only if a nameserveres is explicitelyes specified. """ if nameserver_addresses is not None: self._resolver = resolver.Resolver(configure=False) if port is not None: self._resolver.port = port self._resolver.nameservers = list(nameserver_addresses) else: self._resolver = resolver.Resolver()
[docs] def resolve_ipv4(self, name: str) -> List[str]: """Perform a DNS lookup for an A record for the given name. Examples: :: >>> dns.resolve_ipv4('api.svc.eqiad.wmnet') ['10.2.2.22'] Arguments: name (str): the name to resolve. Returns: list: the list of IPv4 addresses as strings returned by the DNS response. """ return self._resolve_addresses(name, 'A')
[docs] def resolve_ipv6(self, name: str) -> List[str]: """Perform a DNS lookup for an AAAA record for the given name. Examples: :: >>> dns.resolve_ipv6('wikimedia.org') ['2620:0:861:ed1a::1'] Arguments: name (str): the name to resolve. Returns: list: the list of IPv6 addresses as strings returned by the DNS response. """ return self._resolve_addresses(name, 'AAAA')
[docs] def resolve_ips(self, name: str) -> List[str]: """Perform a DNS lookup for A and AAAA records for the given name. Examples: :: >>> dns.resolve_ips('wikimedia.org') ['208.80.154.224', '2620:0:861:ed1a::1'] Arguments: name (str): the name to resolve. Returns: list: the list of IPv4 and IPv6 addresses as strings returned by the DNS response. Raises: wmflib.dns.DnsNotFound: when no address is found. """ addresses = [] for func in ('resolve_ipv4', 'resolve_ipv6'): try: addresses += getattr(self, func)(name) except DnsNotFound: pass # Allow single stack answers if not addresses: raise DnsNotFound(f'Record A or AAAA not found for {name}') return addresses
[docs] def resolve_ptr(self, address: str) -> List[str]: """Perform a DNS lookup for PTR record for the given address. Examples: :: >>> dns.resolve_ptr('208.80.154.224') ['text-lb.eqiad.wikimedia.org'] Arguments: address (str): the IPv4 or IPv6 address to resolve. Returns: list: the list of absolute target PTR records as strings, without the trailing dot. """ response = self.resolve(reversename.from_address(address), 'PTR') return self._parse_targets(cast(rrset.RRset, response.rrset))
[docs] def resolve_cname(self, name: str) -> str: """Perform a DNS lookup for CNAME record for the given name. Examples: :: >>> dns.resolve_cname('puppet.codfw.wmnet') 'puppetmaster2001.codfw.wmnet' Arguments: name (str): the name to resolve. Returns: str: the absolute target name for this CNAME, without the trailing dot. """ targets = self._parse_targets(cast(rrset.RRset, self.resolve(name, 'CNAME').rrset)) if len(targets) != 1: raise DnsError(f'Found multiple CNAMEs target for {name}: {targets}') return targets[0]
[docs] def resolve(self, qname: Union[str, Name], record_type: str) -> resolver.Answer: """Perform a DNS lookup for the given qname and record type. Examples: :: >>> response = dns.resolve('wikimedia.org', 'MX') >>> [rdata.to_text() for rdata in response.rrset] ['10 mx1001.wikimedia.org.', '50 mx2001.wikimedia.org.'] Arguments: qname (str): the name or address to resolve. record_type (str): the DNS record type to lookup for, like 'A', 'AAAA', 'PTR', etc. Returns: dns.resolver.Answer: the DNS response. Raises: wmflib.dns.DnsNotFound: if there are no records for the given record type but the qname has records for different record type(s). wmflib.dns.DnsError: on generic error. """ try: response = self._resolver.query(qname, record_type) logger.debug('Resolved %s record for %s: %s', record_type, qname, response.rrset) except (resolver.NoAnswer, resolver.NXDOMAIN) as e: raise DnsNotFound(f'Record {record_type} not found for {qname}') from e except DNSException as e: raise DnsError(f'Unable to resolve {record_type} record for {qname}') from e return response
def _resolve_addresses(self, name: str, record_type: str) -> List[str]: """Extract and return all the matching addresses for the given name and record type. Arguments: name (str): the name to resolve. record_type (str): the DNS record type to lookup for, like 'A' and 'AAAA'. Returns: list: the list of IPv4 or IPv6 addresses as strings returned by the DNS response. """ return [rdata.address for rdata in cast(rrset.RRset, self.resolve(name, record_type).rrset)] @staticmethod def _parse_targets(response_set: rrset.RRset) -> List[str]: """Extract and return all the matching names from the given rrset without the trailing dot. Arguments: response_set (dns.rrset.RRset): the RRset to parse. Returns: list: the list of absolute target record names as strings without the trailing dot. Raises: wmflib.dns.DnsError: if a relative record is found. """ targets = [] for rdata in response_set: target = rdata.target.to_text() if target[-1] != '.': raise DnsError(f'Unsupported relative target {target} found') targets.append(target[:-1]) return targets
[docs] class PublicAuthDns(Dns): """Class to interact with the DNS using the wikimedia foundation authoritative servers.""" def __init__(self) -> None: """Initialize the instance with the WMF public authoritative namerservers. It uses the nameservers defined in :py:const:`wmflib.constants.PUBLIC_AUTHDNS`. Examples: :: >>> from wmflib.dns import PublicAuthDns >>> dns = PublicAuthDns() """ super().__init__(nameserver_addresses=PUBLIC_AUTHDNS)