Source code for homer

"""Homer package."""
import logging
import os
import pathlib
import sys

from collections import defaultdict
from importlib import import_module
from typing import Callable, DefaultDict, Dict, List, Mapping, Optional, Tuple

import pynetbox

from pkg_resources import DistributionNotFound, get_distribution
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from homer.capirca import CapircaGenerate
from homer.config import HierarchicalConfig, load_yaml_config
from homer.devices import Device, Devices
from homer.exceptions import HomerAbortError, HomerConnectError, HomerError, HomerTimeoutError
from homer.netbox import NetboxData, NetboxDeviceData, NetboxInventory
from homer.templates import Renderer
from homer.transports import DEFAULT_PORT, DEFAULT_TIMEOUT
from homer.transports.junos import connected_device

""":py:class:`int`: the number of attempts to try when there is a timeout."""
""":py:class:`int`: the exit code used when the diff command is executed and there is a diff."""

    __version__ = get_distribution('homer').version  # Must be the same used as 'name' in
    """:py:class:`str`: the version of the current Homer package."""
except DistributionNotFound:  # pragma: no cover - this should never happen during tests
    pass  # package is not installed

logger = logging.getLogger(__name__)

[docs] class Homer: """The instance to run Homer.""" OUT_EXTENSION = '.out' """:py:class:`str`: the extension for the generated output files.""" def __init__(self, main_config: Mapping): """Initialize the instance. Arguments: main_config (dict): the configuration dictionary. """ logger.debug('Initialized with configuration: %s', main_config) self._main_config = main_config self.private_base_path = self._main_config['base_paths'].get('private', '') self._config = HierarchicalConfig( self._main_config['base_paths']['public'], private_base_path=self.private_base_path) self._netbox_api = None self._device_plugin = None if self._main_config.get('netbox', {}): self._netbox_api = pynetbox.api( self._main_config['netbox']['url'], token=self._main_config['netbox']['token'], threading=True) retry_session = Session() retry_adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)) retry_session.mount('http://', retry_adapter) retry_session.mount('https://', retry_adapter) self._netbox_api.http_session = retry_session if self._main_config['netbox'].get('plugin', ''): self._device_plugin = import_module( self._main_config['netbox']['plugin']).NetboxDeviceDataPlugin devices_all_config = load_yaml_config( os.path.join(self._main_config['base_paths']['public'], 'config', 'devices.yaml')) devices_config = {fqdn: data.get('config', {}) for fqdn, data in devices_all_config.items()} netbox_inventory = self._main_config.get('netbox', {}).get('inventory', {}) devices = devices_all_config.copy() for data in devices.values(): data.pop('config', None) if netbox_inventory: # Get the data from Netbox while keeping any existing metadata from the devices.yaml file. # The data from Netbox overrides the existing keys for each device, if both present. netbox_devices = NetboxInventory( self._main_config['netbox'], netbox_inventory['device_roles'], netbox_inventory['device_statuses']).get_devices() for fqdn, data in netbox_devices.items(): if fqdn in devices: devices[fqdn].update(data) else: devices[fqdn] = data private_devices_config: Dict = {} if self.private_base_path: private_devices_config = load_yaml_config( os.path.join(self.private_base_path, 'config', 'devices.yaml')) self._ignore_warning = self._main_config.get('transports', {}).get('junos', {}).get('ignore_warning', False) self._transport_username = self._main_config.get('transports', {}).get('username', '') self._transport_timeout = self._main_config.get('transports', {}).get('timeout', DEFAULT_TIMEOUT) self._port = self._main_config.get('transports', {}).get('port', DEFAULT_PORT) transport_ssh_config = self._main_config.get('transports', {}).get('ssh_config', None) if transport_ssh_config is not None: transport_ssh_config = str(pathlib.Path(transport_ssh_config).expanduser()) self._transport_ssh_config = transport_ssh_config self._devices = Devices(devices, devices_config, private_devices_config) self._renderer = Renderer(self._main_config['base_paths']['public'], self.private_base_path) self._output_base_path = pathlib.Path(self._main_config['base_paths']['output'])
[docs] def generate(self, query: str) -> int: """Generate the configuration only saving it locally, no remote action is performed. Arguments: query (str): the query to select the devices. Return: int: ``0`` on success, a small positive integer on failure. """'Generating configuration for query %s', query) self._prepare_out_dir() successes, _ = self._execute(self._device_generate, query) return Homer._parse_results(successes)
[docs] def diff(self, query: str, *, omit_diff: bool = False) -> int: """Generate the configuration and check the diff with the current live one. Arguments: query (str): the query to select the devices. omit_diff (bool, optional): whether to not show the actual diff to avoid leak of private data. Return: int: ``0`` on success, a small positive integer on failure. """'Generating diff for query %s', query) successes, diffs = self._execute(self._device_diff, query) has_diff = False for diff, diff_devices in diffs.items(): print(f'Changes for {len(diff_devices)} devices: {diff_devices}') if diff is None: print('# Failed') elif not diff: print('# No diff') else: has_diff = True if omit_diff: print('# Non-empty diff omitted, -o/--omit-diff set') else: print(diff) print('---------------') ret = Homer._parse_results(successes) if ret == 0 and has_diff: return DIFF_EXIT_CODE return ret
[docs] def commit(self, query: str, *, message: str = '-') -> int: """Commit the generated configuration asking for confirmation. Arguments: query (str): the query to select the devices. message (str): the commit message to use. Return: int: ``0`` on success, a small positive integer on failure. """'Committing config for query %s with message: %s', query, message) successes, _ = self._execute(self._device_commit, query, message=message) return Homer._parse_results(successes)
def _device_generate(self, device: Device, device_config: str, _: int) -> Tuple[bool, Optional[str]]: """Save the generated configuration in a local file. Arguments: device (homer.devices.Device): the device instance. device_config (str): the generated configuration for the device. attempt (int, unused): the current attempt number. Returns: tuple: a two-element tuple with a boolean as first parameter that represent the success of the operation or not and a second element with a string or None that is not used but is required by the callback API. """ output_path = self._output_base_path / f'{device.fqdn}{Homer.OUT_EXTENSION}' with open(str(output_path), 'w', encoding='utf-8') as f: f.write(device_config)'Written configuration for %s in %s', device.fqdn, output_path) return True, None def _device_diff(self, device: Device, device_config: str, _: int) -> Tuple[bool, Optional[str]]: """Perform a configuration diff between the generated configuration and the live one. Arguments: device (homer.devices.Device): the device instance. device_config (str): the generated configuration for the device. attempt (int, unused): the current attempt number. Returns: tuple: a two-element tuple with a boolean as first parameter that represent the success of the operation or not and a second element with a string that contains the configuration differences or None if unable to load the new configuration in the device to generate the diff. """ timeout = device.metadata.get('timeout', self._transport_timeout) port = device.metadata.get('port', self._port) with connected_device(device.fqdn, username=self._transport_username, port=port, ssh_config=self._transport_ssh_config, timeout=timeout) as connection: return connection.commit_check(device_config, self._ignore_warning) def _device_commit(self, device: Device, device_config: str, # noqa: MC0001; pylint: disable=no-self-use attempt: int, *, message: str = '-') -> Tuple[bool, Optional[str]]: """Commit a new configuration to the device. Arguments: device (homer.devices.Device): the device instance. device_config (str): the generated configuration for the device. attempt (int): the current attempt number. message (str): the commit message to use. Raises: HomerTimeoutError: on timeout. Returns: tuple: a two-element tuple with a boolean as first parameter that represent the success of the operation or not and a second element with a string or None that is not used but is required by the callback API. """ def callback(fqdn: str, diff: str) -> None: """Callback as required by :py:class:`homer.transports.junos.ConnectedDevice.commit`.""" if not sys.stdout.isatty(): raise HomerError('Not in a TTY, unable to ask for confirmation') print(f'Configuration diff for {fqdn}:\n{diff}') print('Type "yes" to commit, "no" to abort.') for _ in range(2): resp = input('> ') if resp == 'yes': break if resp == 'no': raise HomerAbortError('Commit aborted') print(('Invalid response, please type "yes" to commit or "no" to abort. After 2 wrong answers the ' 'commit will be aborted.')) else: raise HomerAbortError('Too many invalid answers, commit aborted') is_retry = attempt != 1 timeout = device.metadata.get('timeout', self._transport_timeout) port = device.metadata.get('port', self._port) with connected_device(device.fqdn, username=self._transport_username, port=port, ssh_config=self._transport_ssh_config, timeout=timeout) as connection: try: connection.commit(device_config, message, callback, ignore_warning=self._ignore_warning, is_retry=is_retry) return True, '' except HomerTimeoutError: raise # To be catched later for automatic retry except HomerAbortError as e: logger.warning('%s on %s', e, device.fqdn) except Exception as e: # pylint: disable=broad-except logger.error('Failed to commit on %s: %s', device.fqdn, e) logger.debug('Traceback:', exc_info=True) return False, '' def _prepare_out_dir(self) -> None: """Prepare the out directory creating the directory if doesn't exists and deleting any pre-generated file.""" self._output_base_path.mkdir(parents=True, exist_ok=True) for path in self._output_base_path.iterdir(): if path.is_file() and path.suffix == Homer.OUT_EXTENSION: path.unlink() def _execute(self, callback: Callable, query: str, **kwargs: str) -> Tuple[Dict, DefaultDict]: # noqa, mccabe: MC0001 too complex """Execute Homer based on the given action and query. Arguments: callback (Callable): the callback to call for each device. query (str): the query to filter the devices to act on. **kwargs (str): any additional keyword argument to pass to the callback Returns: tuple: a two-element tuple, with the first item as a dictionary that contains two keys (:py:data:`True` and :py:data:`False`) and as value a list of device FQDN that were successful (True) or failed (False) the operation and a second element a :py:class:`collections.defaultdict` that has as keys the configuration differences and as values the list of device FQDN that reported that diff. """ diffs: DefaultDict[str, list] = defaultdict(list) successes: Dict[bool, list] = {True: [], False: []} netbox_data = None if self._netbox_api is not None:'Gathering global Netbox data') netbox_data = NetboxData(self._netbox_api) for device in self._devices.query(query):'Generating configuration for %s', device.fqdn) try: device_config = [] device_data = self._config.get(device) # Render the ACLs using Capirca if 'capirca' in device_data and not self._main_config.get('capirca', {}).get('disabled', False): capirca = CapircaGenerate(self._main_config, device_data['capirca'], self._netbox_api) generated_acls = capirca.generate_acls() if generated_acls: device_config.extend(generated_acls) if netbox_data is not None: device_data['netbox'] = { 'global': netbox_data, 'device': NetboxDeviceData(self._netbox_api, device), } if self._device_plugin is not None: device_data['netbox']['device_plugin'] = self._device_plugin(self._netbox_api, device) # Render the Jinja templates based on yaml + netbox data device_config.append(self._renderer.render(device.metadata['role'], device_data)) except HomerError: logger.exception('Device %s failed to render the template, skipping.', device.fqdn) successes[False].append(device.fqdn) continue for attempt in range(1, TIMEOUT_ATTEMPTS + 1): try: device_success, device_diff = callback(device, '\n'.join(device_config), attempt, **kwargs) break except (HomerTimeoutError, HomerConnectError) as e: logger.error('Attempt %d/%d failed: %s', attempt, TIMEOUT_ATTEMPTS, e) else: device_success = False device_diff = '' successes[device_success].append(device.fqdn) diffs[device_diff].append(device.fqdn) return successes, diffs @staticmethod def _parse_results(successes: Mapping[bool, List[Device]]) -> int: """Parse the results dictionary, log and return the approriate exit status code. Arguments: successes (dict): a dictionary that contains two keys (:py:data:`True` and :py:data:`False`) and as value a list of device FQDN that were successful (True) or failed (False) the operation. Return: int: ``0`` on success, a small positive integer on failure. """ if successes[False]: logger.error('Homer run had issues on %d devices: %s', len(successes[False]), successes[False]) return 1'Homer run completed successfully on %d devices: %s', len(successes[True]), successes[True]) return 0