Source code for homer.netbox

"""Netbox module."""
import ipaddress
import logging

from collections import UserDict
from typing import Any, Dict, List, Optional, Sequence, Union

import pynetbox
import requests

from requests.exceptions import RequestException

from homer.devices import Device
from homer.exceptions import HomerError


logger = logging.getLogger(__name__)


class BaseNetboxData(UserDict):
    """Dictionary whose values are fetched from Netbox on first access.

    A lookup of ``key`` is served by the instance method named ``_get_<key>``; the value it
    returns is stored in the underlying ``data`` mapping and reused by later lookups.
    """

    def __init__(self, api: pynetbox.api):
        """Create an empty dictionary bound to a Netbox API instance.

        Arguments:
            api (pynetbox.api): the Netbox API instance.

        """
        super().__init__()
        self._api = api

    def __getitem__(self, key: Any) -> Any:
        """Return the data for ``key``, gathering it through ``_get_<key>`` if not yet stored.

        Parameters according to Python's datamodel, see:
        https://docs.python.org/3/reference/datamodel.html#object.__getitem__

        Returns:
            mixed: the dynamically gathered data.

        Raises:
            KeyError: if the instance has no ``_get_<key>`` attribute.
            homer.exceptions.HomerError: if the ``_get_<key>`` call raises any exception.

        """
        getter_name = f'_get_{key}'
        # Only keys with a matching getter are valid, even if a value is already stored.
        if not hasattr(self, getter_name):
            raise KeyError(key)

        # Already gathered: serve the stored value without calling the getter again.
        if key in self.data:
            return self.data[key]

        try:
            value = getattr(self, getter_name)()
        except Exception as e:
            raise HomerError(f'Failed to get key {key}') from e

        self.data[key] = value
        return value
class BaseNetboxDeviceData(BaseNetboxData):
    """Base class for dynamic dictionaries holding the Netbox data of a single device."""

    def __init__(self, api: pynetbox.api, device: Device):
        """Bind the dictionary to a device and attach its Netbox record to the device metadata.

        The record returned by ``api.dcim.devices.get`` for the device's ``id`` metadata is
        stored under the ``netbox_object`` key of ``device.metadata``.

        Arguments:
            api (pynetbox.api): the Netbox API instance.
            device (homer.devices.Device): the device for which to gather the data.

        """
        super().__init__(api)
        self._device = device
        netbox_id = device.metadata['id']
        device.metadata['netbox_object'] = api.dcim.devices.get(id=netbox_id)
class NetboxData(BaseNetboxData):
    """Dynamic dictionary exposing the generic (not device-bound) data from Netbox."""

    def _get_vlans(self) -> List[Dict[str, Any]]:
        """Gather every vlan returned by the Netbox ``ipam.vlans`` endpoint.

        Returns:
            list: a list of vlans, each converted to a dictionary.

        """
        all_vlans = self._api.ipam.vlans.all()
        return list(map(dict, all_vlans))
class NetboxDeviceData(BaseNetboxDeviceData):
    """Dynamic dictionary exposing the Netbox data that belongs to one device."""

    def _get_virtual_chassis_members(self) -> Optional[List[Dict[str, Any]]]:
        """Gather the devices that share the device's virtual chassis.

        Returns:
            list: the devices in the same virtual chassis, each converted to a dictionary.
            None: the device is not part of a virtual chassis.

        """
        chassis = self._device.metadata['netbox_object'].virtual_chassis
        if not chassis:
            return None

        members = self._api.dcim.devices.filter(virtual_chassis_id=chassis.id)
        return [dict(member) for member in members]

    def _get_circuits(self) -> Optional[Dict[str, Dict[str, Any]]]:
        """Gather the circuits terminating on the device interfaces.

        Returns:
            dict: the circuits, as dictionaries, keyed by the name of the device interface.

        """
        netbox_device_id = self._device.metadata['netbox_object'].id
        found = {}
        for local_int in self._api.dcim.interfaces.filter(device_id=netbox_device_id):
            # By default look at the interface itself; this also covers a patch panel that
            # isn't patched through.
            far_int = local_int
            if local_int.link_peer_type == 'dcim.frontport':
                rear_port = local_int.link_peer.rear_port
                if rear_port:
                    # Patch panel in between: follow it to the interface facing out.
                    far_int = rear_port

            if far_int.link_peer_type != 'circuits.circuittermination':
                continue

            circuit = self._api.circuits.circuits.get(far_int.link_peer.circuit.id)
            found[local_int.name] = dict(circuit)

        return found

    def _get_inventory(self) -> Optional[List[Dict[Any, Any]]]:
        """Gather the inventory items registered on the device.

        Returns:
            list: the inventory items, each converted to a dictionary.

        """
        netbox_device_id = self._device.metadata['netbox_object'].id
        items = self._api.dcim.inventory_items.filter(device_id=netbox_device_id)
        return list(map(dict, items))

    def _get_vlans(self) -> Dict[int, Any]:
        """Gather the vlans referenced by the device interfaces.

        For each interface the untagged vlan is considered first, then the tagged ones; the
        first object seen for a given vlan ID is the one kept.

        Returns:
            dict: the vlan objects keyed by their vlan ID (``vid``).

        """
        netbox_device_id = self._device.metadata['netbox_object'].id
        by_vid = {}
        for iface in self._api.dcim.interfaces.filter(device_id=netbox_device_id):
            candidates = []
            if iface.untagged_vlan:
                candidates.append(iface.untagged_vlan)
            if iface.tagged_vlans:
                candidates.extend(iface.tagged_vlans)

            for vlan in candidates:
                # setdefault keeps an already recorded vlan untouched.
                by_vid.setdefault(vlan.vid, vlan)

        return by_vid
class NetboxInventory:
    """Use Netbox as inventory to gather the list of devices to manage."""

    def __init__(self, config: dict, device_roles: Sequence[str], device_statuses: Sequence[str]):
        """Initialize the instance.

        Arguments:
            config (dict): Homer's configuration section about Netbox. The ``url`` and ``token``
                keys are read when querying the API.
            device_roles (list): a sequence of Netbox device role slug strings to filter the devices.
            device_statuses (list): a sequence of Netbox device status label or value strings to filter
                the devices. They are stored lower-cased.

        """
        self._config = config
        self._device_roles = device_roles
        self._device_statuses = [status.lower() for status in device_statuses]

    def get_devices(self) -> Dict[str, Dict[str, str]]:
        """Return all the devices based on configuration with their role and site.

        Returns:
            dict: a dictionary with the device FQDN as keys and a metadata dictionary as value.

        """
        # TODO maybe remove this class
        return self._get_devices()

    def _gql_execute(self, query: str, variables: Optional[dict] = None) -> dict:
        """Send a GraphQL query to Netbox and return the ``data`` member of the response.

        Arguments:
            query: a string representing the gql query.
            variables: an optional dictionary of variables to send along with the query.

        Returns:
            dict: the content of the ``data`` key of the JSON response.

        Raises:
            homer.exceptions.HomerError: if the request fails, the server replies with an error
                status, or a ``KeyError`` is raised while building the request or reading the
                ``data`` key of the response.

        """
        data: Dict[str, Union[str, dict]] = {"query": query}
        if variables is not None:
            data["variables"] = variables

        # Stays None when the request fails before any response is received (connection error,
        # timeout, ...), so that the error handler below doesn't touch an unbound name.
        response = None
        try:
            # The context manager closes the session, releasing its pooled connections.
            with requests.Session() as session:
                session.headers.update(
                    {"Authorization": f"Token {self._config['token']}", "User-Agent": "Homer"}
                )
                response = session.post(f"{self._config['url']}/graphql/", json=data, timeout=15)
                response.raise_for_status()
                return response.json()['data']
        except RequestException as error:
            message = f"failed to fetch netbox data: {error}"
            if response is not None:
                # Include the server reply only when there is one.
                message = f"{message}\n{response.text}"
            raise HomerError(message) from error
        except KeyError as error:
            raise HomerError(f"No data found in GraphQL response: {error}") from error

    def _get_devices(self) -> Dict[str, Dict[str, str]]:
        """Get the devices matching the configured roles and statuses.

        The FQDN of a device is the ``dns_name`` of its primary IPv4 address or, when that is
        missing or empty, of its primary IPv6 address. Devices without any are skipped.

        Returns:
            dict: a dictionary with the device FQDN as keys and a metadata dictionary as value.

        """
        device_list_gql = """
        query ($role: [String], $status: [String]) {
            device_list(role: $role, status: $status) {
                id
                name
                status
                platform { slug }
                site { slug }
                device_type { slug }
                device_role { slug }
                primary_ip4 {
                    address
                    dns_name
                }
                primary_ip6 {
                    address
                    dns_name
                }
            }
        }
        """
        devices: Dict[str, Dict[str, str]] = {}
        variables = {"role": self._device_roles, "status": self._device_statuses}
        devices_gql = self._gql_execute(device_list_gql, variables)['device_list']
        for device in devices_gql:
            primary_ip4 = device.get('primary_ip4')
            primary_ip6 = device.get('primary_ip6')

            # Prefer the IPv4 DNS name, fall back to the IPv6 one.
            if primary_ip4 and primary_ip4.get('dns_name'):
                fqdn = primary_ip4['dns_name']
            elif primary_ip6 and primary_ip6.get('dns_name'):
                fqdn = primary_ip6['dns_name']
            else:
                logger.debug('Unable to determine FQDN for device %s, skipping.', device['name'])
                continue

            metadata = {
                'id': device['id'],
                'role': device['device_role']['slug'],
                'site': device['site']['slug'],
                'type': device['device_type']['slug'],
                'status': device['status'].lower(),
            }

            # Convert Netbox interfaces (address/prefix length) into plain IPs
            if primary_ip4 is not None:
                metadata['ip4'] = ipaddress.ip_interface(primary_ip4['address']).ip.compressed
            if primary_ip6 is not None:
                metadata['ip6'] = ipaddress.ip_interface(primary_ip6['address']).ip.compressed

            devices[fqdn] = metadata

        return devices