# Source code for homer.netbox

"""Netbox module."""
import ipaddress
import logging

from collections import UserDict
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union

import pynetbox

from requests.exceptions import RequestException

from homer.devices import Device
from homer.exceptions import HomerError


# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class BaseNetboxData(UserDict):
    """Lazy dictionary whose values are fetched from Netbox on first access.

    Looking up ``instance[key]`` calls the ``_get_<key>`` method of the instance, if one exists, and stores the
    result in the underlying ``data`` mapping so that later lookups of the same key reuse it.

    """

    def __init__(self, api: pynetbox.api, base_paths: dict[str, str]):
        """Initialize the dictionary.

        Arguments:
            api: the Netbox API instance.
            base_paths: the paths to the public and private directories; the ``public`` entry is used to
                locate the GraphQL query files.

        """
        super().__init__()
        self._api = api
        self._base_paths = base_paths

    def __getitem__(self, key: Any) -> Any:
        """Return the data for the given key, gathering it through the matching ``_get_<key>`` method.

        Parameters according to Python's datamodel, see:
        https://docs.python.org/3/reference/datamodel.html#object.__getitem__

        Raises:
            KeyError: if the instance has no ``_get_<key>`` attribute.
            homer.exceptions.HomerError: if the getter method raises any exception.

        Returns:
            The dynamically gathered data.

        """
        getter_name = f'_get_{key}'
        if not hasattr(self, getter_name):
            raise KeyError(key)

        # Already gathered once: serve the stored value.
        if key in self.data:
            return self.data[key]

        try:
            value = getattr(self, getter_name)()
        except Exception as error:
            raise HomerError(f'Failed to get key {key}') from error

        self.data[key] = value
        return value

    def _gql_execute(self, query_name: str, variables: Optional[dict] = None) -> dict[str, Any]:
        """Run a GraphQL query stored in the ``graphql`` directory of the public base path.

        Arguments:
            query_name: the name of the file in the ``graphql`` directory without the ``.gql`` extension.
            variables: the variables to pass to the GraphQL query.

        Returns:
            The result of the queried data.

        """
        query_file = Path(self._base_paths['public']).joinpath('graphql', f'{query_name}.gql')
        query = query_file.read_text()
        return gql_execute(self._api, query, variables)
class BaseNetboxDeviceData(BaseNetboxData):
    """Base class to gather device-specific data dynamically from Netbox."""

    def __init__(self, api: pynetbox.api, base_paths: dict[str, str], device: Device):
        """Initialize the dictionary.

        The Netbox record of the device is fetched immediately and stored in the device metadata under the
        ``netbox_object`` key.

        Arguments:
            api: the Netbox API instance.
            base_paths: The path to the public and private directories.
            device: the device for which to gather the data.

        """
        super().__init__(api, base_paths)
        self._device = device
        # None means "not fetched yet"; an empty list is a valid, cacheable result.
        self._device_interfaces: Optional[List[Dict[str, Any]]] = None
        self._device.metadata['netbox_object'] = api.dcim.devices.get(id=device.metadata['id'])

    @property
    def _gql_variables(self) -> dict[str, Any]:
        """Returns the default GraphQL variables to inject into all queries.

        Returns:
            The variables dictionary; a new dictionary is built on every access.

        """
        # Inject device_id
        variables = {'device_id': str(self._device.metadata['id'])}
        # Inject virtual_chassis_id if defined:
        virtual_chassis = self._device.metadata['netbox_object'].virtual_chassis
        if virtual_chassis:
            variables['virtual_chassis_id'] = str(virtual_chassis.id)
        return variables

    def _gql_execute(self, query_name: str, variables: Optional[dict] = None) -> dict[str, Any]:
        """Exposes gql_execute to BaseNetboxDeviceData while injecting device variables.

        Arguments:
            query_name: the name of the file in the ``graphql`` directory without the ``.gql`` extension.
            variables: the variables to pass to the GraphQL query.

        Returns:
            The result of the queried data.

        """
        default_variables = self._gql_variables
        # Variables passed as parameter take priority if any conflict
        if variables:
            default_variables.update(variables)
        return super()._gql_execute(query_name, default_variables)

    def fetch_device_interfaces(self) -> List[Dict[str, Any]]:
        """Fetch interfaces from Netbox.

        The GraphQL query is executed only on the first call; its result is kept on the instance and reused
        afterwards, including when the device has no interfaces at all.

        Returns:
            the ``interface_list`` entry of the GraphQL response, iterated by callers as a list of
            interface dictionaries.

        """
        if self._device_interfaces is None:
            # Virtual chassis members use a dedicated query provided by Homer.
            if self._device.metadata['netbox_object'].virtual_chassis:
                query_name = 'interface_list_virtual_chassis'
            else:
                query_name = 'interface_list'
            self._device_interfaces = gql_execute(
                self._api, get_gql_query(query_name), self._gql_variables)['interface_list']

        return self._device_interfaces
class NetboxData(BaseNetboxData):
    """Lazy dictionary exposing the generic (not device-specific) data gathered from Netbox."""

    def _get_vlans(self) -> List[Dict[str, Any]]:
        """Gather every vlan returned by the Netbox ``ipam.vlans`` endpoint.

        Returns:
            A list with one dictionary per vlan.

        """
        all_vlans = self._api.ipam.vlans.all()
        return list(map(dict, all_vlans))
class NetboxDeviceData(BaseNetboxDeviceData):
    """Dynamic dictionary to gather the required device-specific data from Netbox."""

    def _get_virtual_chassis_members(self) -> Optional[List[Dict[str, Any]]]:
        """Returns a list of devices part of the same virtual chassis or None.

        Returns:
            A list of devices or :py:data:`None` if the device is not part of a virtual chassis.

        """
        virtual_chassis = self._device.metadata['netbox_object'].virtual_chassis
        if not virtual_chassis:
            return None

        return [dict(i) for i in self._api.dcim.devices.filter(virtual_chassis_id=virtual_chassis.id)]

    def _get_circuits(self) -> Optional[Dict[str, Dict[str, Any]]]:
        """Returns the circuits connected to the device.

        Returns:
            A dictionary keyed by the name of the device interface, with the connected circuit as value.
            Interfaces that do not lead to a circuit termination are omitted.

        """
        circuits = {}
        for a_int in self.fetch_device_interfaces():
            # b_int is either the patch panel interface facing out or the initial interface
            # if no patch panel
            # Using link_peers[0] to mimic pre-Netbox 3.3 behavior, when a cable only had one termination
            # per side. To be revisited if we start using the multi-termination feature
            if (a_int['link_peers'] and a_int['link_peers'][0]['__typename'] == 'FrontPortType'
                    and a_int['link_peers'][0]['rear_port']):
                b_int = a_int['link_peers'][0]['rear_port']
            else:  # If the patch panel isn't patched through
                b_int = a_int

            if b_int['link_peers'] and b_int['link_peers'][0]['__typename'] == 'CircuitTerminationType':
                circuits[a_int['name']] = dict(
                    self._api.circuits.circuits.get(int(b_int['link_peers'][0]['circuit']['id'])))

        return circuits

    def _get_inventory(self) -> Optional[List[Dict[Any, Any]]]:
        """Returns the list of inventory items on the device.

        Returns:
            A list of inventory items.

        """
        device_id = self._device.metadata['netbox_object'].id
        return [dict(i) for i in self._api.dcim.inventory_items.filter(device_id=device_id)]

    def _get_vlans(self) -> Dict[int, Any]:
        """Returns all the vlans defined on a device.

        The vlans are collected from the untagged and tagged vlans of every interface. In addition, for each
        interface whose name starts with ``irb`` the unit number (the part after the first dot) is used as
        vlan ID and, if not already collected, looked up in Netbox.

        Raises:
            homer.exceptions.HomerError: if an IRB interface does not match any vlan in Netbox.

        Returns:
            A dict of vlans keyed by VLAN ID. The first definition found for a given VLAN ID is kept.

        """
        vlans: Dict[int, Any] = {}
        for interface in self.fetch_device_interfaces():
            untagged_vlan = interface['untagged_vlan']
            if untagged_vlan:
                # Normalize the key to int before the membership test, as done for the tagged vlans
                vlans.setdefault(int(untagged_vlan['vid']), untagged_vlan)

            for tagged_vlan in interface['tagged_vlans']:
                vlans.setdefault(int(tagged_vlan['vid']), tagged_vlan)

            if interface['name'].startswith('irb'):
                vid = int(interface['name'].split('.')[1])
                if vid in vlans:
                    continue

                not_found = f'IRB interface {interface["name"]} does not match any Vlan in Netbox'
                # pynetbox returns None when nothing matches: check it before converting to dict,
                # otherwise a TypeError would hide the real problem.
                record = self._api.ipam.vlans.get(vid=vid)
                if record is None:
                    raise HomerError(not_found)

                vlan = dict(record)
                try:
                    vlans[vlan['vid']] = vlan
                except KeyError as e:
                    raise HomerError(not_found) from e

        return vlans
class NetboxInventory:
    """Inventory of the devices to manage, built from the data stored in Netbox."""

    def __init__(self, api: pynetbox.api, device_roles: Sequence[str], device_statuses: Sequence[str]):
        """Initialize the instance.

        Arguments:
            api: the Netbox API instance.
            device_roles: a sequence of Netbox device role slug strings to filter the devices.
            device_statuses: a sequence of Netbox device status label or value strings to filter the devices.
                They are stored lower-cased.

        """
        self._api = api
        self._device_roles = device_roles
        self._device_statuses = [device_status.lower() for device_status in device_statuses]

    def get_devices(self) -> Dict[str, Dict[str, str]]:
        """Get the devices based on role, status and virtual chassis membership.

        Devices for which neither the primary IPv4 nor the primary IPv6 has a DNS name are skipped with a
        debug log message. The IPv4 DNS name takes precedence over the IPv6 one.

        Returns:
            A dictionary with the device FQDN as keys and a metadata dictionary as value.

        """
        query = get_gql_query('device_list')
        filters = {'role': self._device_roles, 'status': self._device_statuses}
        inventory: Dict[str, Dict[str, str]] = {}

        for device in gql_execute(self._api, query, filters)['device_list']:
            primary_ip4 = device.get('primary_ip4')
            primary_ip6 = device.get('primary_ip6')

            if primary_ip4 and primary_ip4.get('dns_name'):
                fqdn = primary_ip4['dns_name']
            elif primary_ip6 and primary_ip6.get('dns_name'):
                fqdn = primary_ip6['dns_name']
            else:
                logger.debug('Unable to determine FQDN for device %s, skipping.', device['name'])
                continue

            metadata = {}
            metadata['id'] = device['id']
            metadata['role'] = device['role']['slug']
            metadata['site'] = device['site']['slug']
            metadata['type'] = device['device_type']['slug']
            metadata['status'] = device['status'].lower()
            metadata['manufacturer'] = device['device_type']['manufacturer']['slug']

            # Netbox stores interface addresses (with prefix length): keep only the IP part
            if primary_ip4 is not None:
                metadata['ip4'] = ipaddress.ip_interface(primary_ip4['address']).ip.compressed
            if primary_ip6 is not None:
                metadata['ip6'] = ipaddress.ip_interface(primary_ip6['address']).ip.compressed

            inventory[fqdn] = metadata

        return inventory
def get_gql_query(name: str) -> str:
    """Get one of the GraphQL query provided by Homer.

    The query is read from the ``graphql`` directory located next to this module.

    Arguments:
        name: the name of the query file without extension.

    Raises:
        OSError: if the query file cannot be read, e.g. :py:class:`FileNotFoundError` if it does not exist.

    Returns:
        the query as string.

    """
    query_path = Path(__file__).resolve().parent / 'graphql' / f'{name}.gql'
    # Decode explicitly as UTF-8 so that the result doesn't depend on the locale of the running process.
    return query_path.read_text(encoding='utf-8')
def _graphql_url(base_url: str) -> str:
    """Derive the Netbox GraphQL endpoint URL from the REST API base URL."""
    api_suffix = '/api'
    trimmed = base_url.rstrip('/')
    if trimmed.endswith(api_suffix):
        # Swap only the trailing path component: a plain str.replace() would also rewrite any other
        # '/api' occurrence, e.g. the host part of https://api.example.org/api
        return f'{trimmed[:-len(api_suffix)]}/graphql/'

    # Unexpected URL shape: keep the historical substitution.
    return base_url.replace(api_suffix, '/graphql/')


def gql_execute(api: pynetbox.api, query: str, variables: Optional[dict] = None) -> dict[str, Any]:
    """Execute a GraphQL query against Netbox and return the results.

    The query is sent with a POST request, with a 15 seconds timeout, through the HTTP session of the API
    instance. The authorization and content type headers are set on that session.

    Arguments:
        api: the Netbox API instance. Its base URL is expected to end with ``/api``.
        query: a string representing the gql query.
        variables: a dictionary of variables to send along with the query.

    Raises:
        homer.exceptions.HomerError: if failed to query Netbox or no data was returned.

    Returns:
        The content of the ``data`` key of the GraphQL response.

    """
    data: dict[str, Union[str, dict]] = {'query': query}
    if variables is not None:
        data['variables'] = variables

    session = api.http_session
    session.headers.update({
        'Authorization': f'Token {api.token}',
        'Content-Type': 'application/json',
    })

    response = None
    try:
        response = session.post(_graphql_url(api.base_url), json=data, timeout=15)
        response.raise_for_status()
        payload = response.json()
        result = payload['data']
    except RequestException as error:
        # The response is still None if the request failed before getting any reply
        response_text = f'\n{response.text}' if response is not None else ''
        raise HomerError(f'failed to fetch netbox data: {error}{response_text}') from error
    except KeyError as error:
        raise HomerError(f'No data found in GraphQL response: {error}') from error

    if result is None:
        # A failed query is reported with a null data and the details in the errors key
        raise HomerError(f'No data found in GraphQL response: {payload.get("errors")}')

    return result