Source code for homer.transports.junos

"""JunOS module."""
import logging

from contextlib import contextmanager
from typing import Callable, Iterator, List, Optional, Tuple, Union

from jnpr.junos import Device as JunOSDevice
from jnpr.junos.exception import CommitError, ConfigLoadError, ConnectError, RpcTimeoutError, UnlockError
from jnpr.junos.utils.config import Config
from ncclient.operations.errors import TimeoutExpiredError

from homer.exceptions import HomerConnectError, HomerError, HomerTimeoutError
from homer.transports import DEFAULT_PORT, DEFAULT_TIMEOUT


logger = logging.getLogger(__name__)
DIFF_ADDED_CODE = 32
DIFF_REMOVED_CODE = 31
DIFF_MOVED_CODE = 33


[docs] @contextmanager def connected_device(fqdn: str, *, username: str = '', ssh_config: Optional[str] = None, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT) -> Iterator['ConnectedDevice']: """Context manager to perform actions on a connected Juniper device. Arguments: fqdn (str): the FQDN of the Juniper device. username (str): the username to use to connect to the Juniper device. ssh_config (Optional[str]): an ssh_config file if you want other than ~/.ssh/config port (int, optional): the port to use to connect to the device. timeout (int, optional): the timeout in seconds to use when operating on the device. Yields: ConnectedDevice: the Juniper connected device instance. """ try: device = ConnectedDevice(fqdn, username=username, ssh_config=ssh_config, port=port, timeout=timeout) except ConnectError as e: raise HomerConnectError(f'Unable to connect to {fqdn}') from e try: yield device finally: device.close()
# pylint: disable=no-member # Pylint doesn't recognize the 'cu' member in junos.Device instances and generated-members seems to have a bug
[docs] class ConnectedDevice: """Juniper transport to manage a JunOS connected device.""" def __init__(self, fqdn: str, *, username: str = '', ssh_config: Optional[str] = None, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT): """Initialize the instance and open the connection to the device. Arguments: fqdn (str): the FQDN of the Juniper device. username (str): the username to use to connect to the Juniper device. ssh_config (Optional[str]): an ssh_config file if you want other than ~/.ssh/config port (int, optional): the port to use to connect to the device. timeout (int, optional): the timeout in seconds to use when operating on the device. """ self._fqdn = fqdn self._port = port self._timeout = timeout logger.debug('Connecting to device %s (user=%s ssh_config=%s timeout=%d)', self._fqdn, username, ssh_config, self._timeout) self._device = JunOSDevice(host=self._fqdn, user=username, port=self._port, ssh_config=ssh_config, conn_open_timeout=self._timeout) self._device.open() self._device.bind(cu=Config)
[docs] def commit(self, config: str, message: str, callback: Callable, *, # noqa, mccabe: MC0001 too complex (11) ignore_warning: Union[bool, str, List[str]] = False, is_retry: bool = False) -> None: """Commit the loaded configuration. Arguments: config (str): the device new configuration. message (str): the commit message to use. callback (callable): a callable function that accepts two parameters: a string with the FQDN of the current device and a string with the diff between the current configuration and the new one. The callback must raise any exception if the execution should be interrupted and the config rollbacked or return :py:data:`None`. ignore_warning (mixed, optional): the warnings to tell JunOS to ignore, see: https://junos-pyez.readthedocs.io/en/2.3.0/jnpr.junos.utils.html#jnpr.junos.utils.config.Config.load is_retry (bool, optional): whether this is a retry and the commit_check should be run anyway, also if the diff is empty. Raises: HomerTimeoutError: on timeout. HomerError: on commit error. Exception: on generic failure. """ diff = self._prepare(config, ignore_warning) if not diff: if not is_retry: logger.info('Empty diff for %s, skipping device.', self._fqdn) return else: try: callback(self._fqdn, diff) except Exception: self._rollback() raise logger.info('Committing the configuration on %s', self._fqdn) try: if diff: self._device.cu.commit(confirm=2, comment=message, timeout=self._timeout) self._device.cu.commit_check(timeout=self._timeout) except RpcTimeoutError as e: raise HomerTimeoutError(str(e)) from e except CommitError as e: raise HomerError(f'Commit error: {ConnectedDevice._parse_commit_error(e)}') from e
[docs] def commit_check(self, config: str, ignore_warning: Union[bool, str, List[str]] = False) -> Tuple[bool, Optional[str]]: """Perform commit check, reuturn the diff and rollback. Arguments: config (str): the device new configuration. Returns: tuple: a two-element tuple with a boolean as first item that is :py:data:`True` on success and :py:data:`False` on failure and a string as second item with the difference between the current configuration and the new one, empty string on no diff and :py:data:`None` on failure. """ success = False try: diff = self._prepare(config, ignore_warning) except Exception as e: # pylint: disable=broad-except logger.error('Failed to get diff for %s: %s', self._fqdn, e) logger.debug('Traceback:', exc_info=True) self._rollback() return False, None if not diff: logger.info('Empty diff for %s, skipping device.', self._fqdn) self._rollback() return True, diff logger.info('Running commit check on %s', self._fqdn) try: self._device.cu.commit_check(timeout=self._timeout) success = True except CommitError as e: logger.error('Commit check error on %s: %s', self._fqdn, ConnectedDevice._parse_commit_error(e)) except Exception as e: # pylint: disable=broad-except logger.error('Failed to commit check on %s: %s', self._fqdn, e) logger.debug('Traceback:', exc_info=True) finally: self._rollback() return success, diff
[docs] def close(self) -> None: """Close the connection with the device.""" try: self._device.cu.unlock() except UnlockError: pass try: self._device.close() except (RpcTimeoutError, TimeoutExpiredError) as e: logger.warning('Unable to close the connection to the device: %s', e)
def _prepare(self, config: str, ignore_warning: Union[bool, str, List[str]] = False) -> str: """Prepare the new configuration to be committed. Arguments: config (str): the device new configuration. Raises: Exception: on generic failure. Returns: str: the differences between the current config and the new one. """ logger.debug('Preparing the configuration on %s', self._fqdn) diff = '' try: self._device.cu.lock() self._device.cu.load(config, format='text', merge=False, ignore_warning=ignore_warning) diff = self._device.cu.diff() except ConfigLoadError: raise except Exception: self._rollback() raise if diff is None: diff = '' return color_diff(diff) def _rollback(self) -> None: """Rollback the current staged configuration.""" logger.debug('Rolling back staged config on %s', self._fqdn) try: self._device.cu.rollback() except ValueError as e: logger.error('Invalid rollback ID on %s: %s', self._fqdn, e) except Exception as e: # pylint: disable=broad-except logger.error('Failed to rollback on %s: %s', self._fqdn, e) logger.debug('Traceback:', exc_info=True) @staticmethod def _parse_commit_error(exc: CommitError) -> str: """Parse a CommitError exception and returnonly the reason. Arguments: exc (jnpr.junos.exception.CommitError): the exception to parse. Returns: str: the reason for the commit errror. """ if exc.rsp.find('.//ok') is None: try: path = exc.rsp.findtext('.//error-path').strip() element = exc.rsp.findtext('.//bad-element').strip() message = exc.rsp.findtext('.//error-message').strip() return f'{message}\nIn {path} ({element})' except AttributeError: pass return str(exc)
[docs] def color_diff(diff: str) -> str: """Color the diff based on JunOS diff syntax.""" lines = [] for line in diff.splitlines(): if line.startswith('+'): code = DIFF_ADDED_CODE elif line.startswith('-'): code = DIFF_REMOVED_CODE elif line.startswith('!'): code = DIFF_MOVED_CODE else: code = 0 if code: line = f'\x1b[{code}m{line}\x1b[39m' lines.append(line) colored_diff = '\n'.join(lines) if diff and diff[-1] == '\n': # Re-add the last trailing newline if present colored_diff += '\n' return colored_diff