Source code for homer

"""Homer package."""
import json
import logging
import os
import pathlib
import sys


from collections import defaultdict
from importlib import import_module
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from typing import Callable, DefaultDict, Dict, List, Mapping, Optional, Tuple

import pynetbox

from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from homer.capirca import CapircaGenerate
from homer.config import HierarchicalConfig, load_yaml_config
from homer.devices import Device, Devices
from homer.exceptions import HomerAbortError, HomerConnectError, HomerError, HomerTimeoutError
from homer.netbox import NetboxData, NetboxDeviceData, NetboxInventory
from homer.templates import DeviceConfigurationBase, JinjaDeviceConfiguration, JinjaRenderer, PythonRenderer
from homer.transports import DEFAULT_JSONRPC_PORT, DEFAULT_PORT, DEFAULT_TIMEOUT, json_rpc, junos

TIMEOUT_ATTEMPTS = 3
"""The number of attempts to try when there is a timeout."""
DIFF_EXIT_CODE = 99
"""The exit code used when the diff command is executed and there is a diff."""

try:
    __version__ = version(__name__)  # Must be the same used as 'name' in setup.py
    """The version of the current Homer package."""
except PackageNotFoundError:  # pragma: no cover - this should never happen during tests
    pass  # package is not installed

logger = logging.getLogger(__name__)


[docs] class Homer: # pylint: disable=too-many-instance-attributes """The instance to run Homer.""" OUT_EXTENSION = '.out' """The extension for the generated output files.""" def __init__(self, main_config: Mapping): # noqa: MC0001 """Initialize the instance. Arguments: main_config: the configuration dictionary. """ logger.debug('Initialized with configuration: %s', main_config) self._main_config = main_config self.private_base_path = self._main_config['base_paths'].get('private', '') self._config = HierarchicalConfig( self._main_config['base_paths']['public'], private_base_path=self.private_base_path) self._netbox_api = None self._device_plugin = None self._capirca = None if self._main_config.get('netbox', {}): self._netbox_api = pynetbox.api( self._main_config['netbox']['url'], token=self._main_config['netbox']['token'], threading=True) retry_session = Session() retry_adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)) retry_session.mount('http://', retry_adapter) retry_session.mount('https://', retry_adapter) retry_session.headers.update({'User-Agent': f'Homer {__version__}'}) self._netbox_api.http_session = retry_session if self._main_config['netbox'].get('plugin', ''): self._device_plugin = import_module( self._main_config['netbox']['plugin']).NetboxDeviceDataPlugin if not self._main_config.get('capirca', {}).get('disabled', False): self._capirca = CapircaGenerate(self._main_config, self._netbox_api) devices_all_config = load_yaml_config( os.path.join(self._main_config['base_paths']['public'], 'config', 'devices.yaml')) devices_config = {fqdn: data.get('config', {}) for fqdn, data in devices_all_config.items()} netbox_inventory = self._main_config.get('netbox', {}).get('inventory', {}) devices = devices_all_config.copy() for data in devices.values(): data.pop('config', None) if netbox_inventory: # Get the data from Netbox while keeping any existing metadata from the devices.yaml file. # The data from Netbox overrides the existing keys for each device, if both present. netbox_devices = NetboxInventory( self._netbox_api, netbox_inventory['device_roles'], netbox_inventory['device_statuses']).get_devices() for fqdn, data in netbox_devices.items(): if fqdn in devices: devices[fqdn].update(data) else: devices[fqdn] = data private_devices_config: Dict = {} if self.private_base_path: private_devices_config = load_yaml_config( os.path.join(self.private_base_path, 'config', 'devices.yaml')) self._ignore_warning = self._main_config.get('transports', {}).get('junos', {}).get('ignore_warning', False) self._transport_username = self._main_config.get('transports', {}).get('username', '') self._transport_password = self._main_config.get('transports', {}).get('password', '') self._transport_timeout = self._main_config.get('transports', {}).get('timeout', DEFAULT_TIMEOUT) self._transport_jsonrpc_port = self._main_config.get('transports', {}).get('jsonrpc_port', DEFAULT_JSONRPC_PORT) self._transport_jsonrpc_output = self._main_config.get('transports', {}).get('json_rpc_output', 'text') self._port = self._main_config.get('transports', {}).get('port', DEFAULT_PORT) transport_ssh_config = self._main_config.get('transports', {}).get('ssh_config', None) if transport_ssh_config is not None: transport_ssh_config = str(pathlib.Path(transport_ssh_config).expanduser()) self._transport_ssh_config = transport_ssh_config self._devices = Devices(devices, devices_config, private_devices_config) module_dir = Path(self._main_config['base_paths']['public']) / 'modules' if module_dir.is_dir(): sys.path.append(str(module_dir)) self._renderers = { 'python': PythonRenderer(self._main_config['base_paths']['public'], self.private_base_path), 'jinja': JinjaRenderer(self._main_config['base_paths']['public'], self.private_base_path), } self._transports = {'json_rpc': json_rpc, 'junos': junos} self._output_base_path = Path(self._main_config['base_paths']['output'])
[docs] def generate(self, query: str) -> int: """Generate the configuration only saving it locally, no remote action is performed. Arguments: query: the query to select the devices. Return: ``0`` on success, a small positive integer on failure. """ logger.info('Generating configuration for query %s', query) self._prepare_out_dir() successes, _ = self._execute(self._device_generate, query) return Homer._parse_results(successes)
[docs] def diff(self, query: str, *, omit_diff: bool = False) -> int: """Generate the configuration and check the diff with the current live one. Arguments: query: the query to select the devices. omit_diff: whether to not show the actual diff to avoid leak of private data. Return: ``0`` on success, a small positive integer on failure. """ logger.info('Generating diff for query %s', query) successes, diffs = self._execute(self._device_diff, query) has_diff = False for diff, diff_devices in diffs.items(): print(f'Changes for {len(diff_devices)} devices: {diff_devices}') if diff is None: print('# Failed') elif not diff: print('# No diff') else: has_diff = True if omit_diff: print('# Non-empty diff omitted, -o/--omit-diff set') else: print(diff) print('---------------') ret = Homer._parse_results(successes) if ret == 0 and has_diff: return DIFF_EXIT_CODE return ret
[docs] def commit(self, query: str, *, message: str = '-') -> int: """Commit the generated configuration asking for confirmation. Arguments: query: the query to select the devices. message: the commit message to use. Return: ``0`` on success, a small positive integer on failure. """ logger.info('Committing config for query %s with message: %s', query, message) successes, _ = self._execute(self._device_commit, query, message=message) return Homer._parse_results(successes)
def _device_generate(self, device: Device, device_config: DeviceConfigurationBase, _: int) -> Tuple[bool, Optional[str]]: """Save the generated configuration in a local file. Arguments: device (homer.devices.Device): the device instance. device_config (homer.templates.DeviceConfigurationBase): the generated configuration for the device. attempt (int, unused): the current attempt number. Returns: A two-element tuple with a boolean as first parameter that represent the success of the operation or not and a second element with a string or None that is not used but is required by the callback API. """ output_path = self._output_base_path / f'{device.fqdn}{Homer.OUT_EXTENSION}' with open(str(output_path), 'w', encoding='utf-8') as f: # Juniper (Jinja) if isinstance(device_config, JinjaDeviceConfiguration): f.write(str(device_config)) else: # Use JSON for Nokia generate json.dump(device_config, f, indent=4) logger.info('Written configuration for %s in %s', device.fqdn, output_path) return True, None def _device_diff(self, device: Device, device_config: str, _: int) -> Tuple[bool, Optional[str]]: """Perform a configuration diff between the generated configuration and the live one. Arguments: device: the device instance. device_config: the generated configuration for the device. attempt: the current attempt number. Returns: A two-element tuple with a boolean as first parameter that represent the success of the operation or not and a second element with a string that contains the configuration differences or None if unable to load the new configuration in the device to generate the diff. """ transport_type = device.metadata.get('transport', 'junos') with self._transports[transport_type].connected_device(**device.metadata['connect_args']) as connection: return connection.commit_check(device_config, self._ignore_warning) def _device_commit(self, device: Device, # noqa: MC0001 device_config: DeviceConfigurationBase, attempt: int, *, message: str = '-') -> Tuple[bool, Optional[str]]: """Commit a new configuration to the device. Arguments: device: the device instance. device_config: the generated configuration for the device. attempt: the current attempt number. message: the commit message to use. Raises: HomerTimeoutError: on timeout. Returns: A two-element tuple with a boolean as first parameter that represent the success of the operation or not and a second element with a string or None that is not used but is required by the callback API. """ is_retry = attempt != 1 transport_type = device.metadata.get('transport', 'junos') with self._transports[transport_type].connected_device(**device.metadata['connect_args']) as connection: try: connection.commit(device_config, message, ignore_warning=self._ignore_warning, is_retry=is_retry) return True, '' except HomerTimeoutError: raise # To be caught later for automatic retry except HomerAbortError as e: logger.warning('%s on %s', e, device.fqdn) except Exception as e: # pylint: disable=broad-except logger.error('Failed to commit on %s: %s', device.fqdn, e) logger.debug('Traceback:', exc_info=True) return False, '' def _prepare_out_dir(self) -> None: """Prepare the out directory creating the directory if doesn't exists and deleting any pre-generated file.""" self._output_base_path.mkdir(parents=True, exist_ok=True) for path in self._output_base_path.iterdir(): if path.is_file() and path.suffix == Homer.OUT_EXTENSION: path.unlink() # pylint: disable-msg=too-many-locals,too-many-nested-blocks,too-many-branches def _execute(self, callback: Callable, query: str, **kwargs: str) -> Tuple[Dict, DefaultDict]: # noqa: MC0001 """Execute Homer based on the given action and query. Arguments: callback: the callback to call for each device. query: the query to filter the devices to act on. **kwargs: any additional keyword argument to pass to the callback Returns: A two-element tuple, with the first item as a dictionary that contains two keys (:py:data:`True` and :py:data:`False`) and as value a list of device FQDN that were successful (True) or failed (False) the operation and a second element a :py:class:`collections.defaultdict` that has as keys the configuration differences and as values the list of device FQDN that reported that diff. """ diffs: DefaultDict[str, list] = defaultdict(list) successes: Dict[bool, list] = {True: [], False: []} netbox_data = None if self._netbox_api is not None: logger.info('Gathering global Netbox data') netbox_data = NetboxData(self._netbox_api, self._main_config['base_paths']) for device in self._devices.query(query): logger.info('Generating configuration for %s', device.fqdn) try: device_data = self._config.get(device) # Set renderer and transport based on match criteria in homer config and device metadata # TODO add a default or show a nice error if those keys are missing from config.yaml for device_attribute in ('renderer', 'transport'): selector = self._main_config['selectors'][device_attribute] for option_name, match_criteria in selector.items(): for elements in match_criteria: if all(device.metadata.get(key) == value for key, value in elements.items()): device.metadata[device_attribute] = option_name # Set the args for device connection we use for diff and commit device.metadata['connect_args'] = { 'fqdn': device.fqdn, 'username': self._transport_username, 'timeout': device.metadata.get('timeout', self._transport_timeout) } transport_type = device.metadata.get('transport', 'junos') if transport_type == 'junos': device.metadata['connect_args']['port'] = self._port device.metadata['connect_args']['ssh_config'] = self._transport_ssh_config elif transport_type == 'json_rpc': device.metadata['connect_args']['port'] = self._transport_jsonrpc_port device.metadata['connect_args']['password'] = self._transport_password device.metadata['connect_args']['output_format'] = self._transport_jsonrpc_output # Override global port and timeout if defined in the device config if 'port' in device.metadata: device.metadata['connect_args']['port'] = device.metadata['port'] if 'timeout' in device.metadata: device.metadata['connect_args']['timeout'] = device.metadata['timeout'] # Render the ACLs using Capirca if self._capirca and 'capirca' in device_data: generated_acls = self._capirca.generate_acls(device_data['capirca']) else: generated_acls = [] if netbox_data is not None: device_data['netbox'] = { 'global': netbox_data, 'device': NetboxDeviceData(self._netbox_api, self._main_config['base_paths'], device), } if self._device_plugin is not None: device_data['netbox']['device_plugin'] = self._device_plugin(self._netbox_api, self._main_config['base_paths'], device) # Render the configuration based on yaml + netbox data device_config = self._renderers[device.metadata.get('renderer', 'jinja')].render( device.metadata['role'], device_data, generated_acls) except HomerError: logger.exception('Device %s failed to render the template, skipping.', device.fqdn) successes[False].append(device.fqdn) continue for attempt in range(1, TIMEOUT_ATTEMPTS + 1): try: device_success, device_diff = callback(device, device_config, attempt, **kwargs) break except (HomerTimeoutError, HomerConnectError) as e: logger.error('Attempt %d/%d failed: %s', attempt, TIMEOUT_ATTEMPTS, e) else: device_success = False device_diff = '' successes[device_success].append(device.fqdn) diffs[device_diff].append(device.fqdn) return successes, diffs @staticmethod def _parse_results(successes: Mapping[bool, List[Device]]) -> int: """Parse the results dictionary, log and return the appropriate exit status code. Arguments: successes: a dictionary that contains two keys (:py:data:`True` and :py:data:`False`) and as value a list of device FQDN that were successful (True) or failed (False) the operation. Return: ``0`` on success, a small positive integer on failure. """ if successes[False]: logger.error('Homer run had issues on %d devices: %s', len(successes[False]), successes[False]) return 1 logger.info('Homer run completed successfully on %d devices: %s', len(successes[True]), successes[True]) return 0