"""JunOS module."""
import logging
from contextlib import contextmanager
from typing import Callable, Iterator, List, Optional, Tuple, Union
from jnpr.junos import Device as JunOSDevice
from jnpr.junos.exception import CommitError, ConfigLoadError, ConnectError, RpcTimeoutError, UnlockError
from jnpr.junos.utils.config import Config
from ncclient.operations.errors import TimeoutExpiredError
from homer.exceptions import HomerConnectError, HomerError, HomerTimeoutError
from homer.transports import DEFAULT_PORT, DEFAULT_TIMEOUT
logger = logging.getLogger(__name__)
DIFF_ADDED_CODE = 32
DIFF_REMOVED_CODE = 31
DIFF_MOVED_CODE = 33
[docs]@contextmanager
def connected_device(fqdn: str, *, username: str = '', ssh_config: Optional[str] = None,
port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT) -> Iterator['ConnectedDevice']:
"""Context manager to perform actions on a connected Juniper device.
Arguments:
fqdn (str): the FQDN of the Juniper device.
username (str): the username to use to connect to the Juniper device.
ssh_config (Optional[str]): an ssh_config file if you want other than ~/.ssh/config
port (int, optional): the port to use to connect to the device.
timeout (int, optional): the timeout in seconds to use when operating on the device.
Yields:
ConnectedDevice: the Juniper connected device instance.
"""
try:
device = ConnectedDevice(fqdn, username=username, ssh_config=ssh_config, port=port, timeout=timeout)
except ConnectError as e:
raise HomerConnectError(f'Unable to connect to {fqdn}') from e
try:
yield device
finally:
device.close()
# pylint: disable=no-member
# Pylint doesn't recognize the 'cu' member in junos.Device instances and generated-members seems to have a bug
[docs]class ConnectedDevice:
"""Juniper transport to manage a JunOS connected device."""
def __init__(self, fqdn: str, *, username: str = '', ssh_config: Optional[str] = None,
port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT):
"""Initialize the instance and open the connection to the device.
Arguments:
fqdn (str): the FQDN of the Juniper device.
username (str): the username to use to connect to the Juniper device.
ssh_config (Optional[str]): an ssh_config file if you want other than ~/.ssh/config
port (int, optional): the port to use to connect to the device.
timeout (int, optional): the timeout in seconds to use when operating on the device.
"""
self._fqdn = fqdn
self._port = port
self._timeout = timeout
logger.debug('Connecting to device %s (user=%s ssh_config=%s timeout=%d)',
self._fqdn, username, ssh_config, self._timeout)
self._device = JunOSDevice(host=self._fqdn, user=username, port=self._port, ssh_config=ssh_config,
conn_open_timeout=self._timeout)
self._device.open()
self._device.bind(cu=Config)
[docs] def commit(self, config: str, message: str, callback: Callable, *, # noqa, mccabe: MC0001 too complex (11)
ignore_warning: Union[bool, str, List[str]] = False, is_retry: bool = False) -> None:
"""Commit the loaded configuration.
Arguments:
config (str): the device new configuration.
message (str): the commit message to use.
callback (callable): a callable function that accepts two parameters: a string with the FQDN of the
current device and a string with the diff between the current configuration and the new one. The
callback must raise any exception if the execution should be interrupted and the config rollbacked or
return :py:data:`None`.
ignore_warning (mixed, optional): the warnings to tell JunOS to ignore, see:
https://junos-pyez.readthedocs.io/en/2.3.0/jnpr.junos.utils.html#jnpr.junos.utils.config.Config.load
is_retry (bool, optional): whether this is a retry and the commit_check should be run anyway, also if the
diff is empty.
Raises:
HomerTimeoutError: on timeout.
HomerError: on commit error.
Exception: on generic failure.
"""
diff = self._prepare(config, ignore_warning)
if not diff:
if not is_retry:
logger.info('Empty diff for %s, skipping device.', self._fqdn)
return
else:
try:
callback(self._fqdn, diff)
except Exception:
self._rollback()
raise
logger.info('Committing the configuration on %s', self._fqdn)
try:
if diff:
self._device.cu.commit(confirm=2, comment=message, timeout=self._timeout)
self._device.cu.commit_check(timeout=self._timeout)
except RpcTimeoutError as e:
raise HomerTimeoutError(str(e)) from e
except CommitError as e:
raise HomerError(f'Commit error: {ConnectedDevice._parse_commit_error(e)}') from e
[docs] def commit_check(self, config: str,
ignore_warning: Union[bool, str, List[str]] = False) -> Tuple[bool, Optional[str]]:
"""Perform commit check, reuturn the diff and rollback.
Arguments:
config (str): the device new configuration.
Returns:
tuple: a two-element tuple with a boolean as first item that is :py:data:`True` on success and
:py:data:`False` on failure and a string as second item with the difference between the current
configuration and the new one, empty string on no diff and :py:data:`None` on failure.
"""
success = False
try:
diff = self._prepare(config, ignore_warning)
except Exception as e: # pylint: disable=broad-except
logger.error('Failed to get diff for %s: %s', self._fqdn, e)
logger.debug('Traceback:', exc_info=True)
self._rollback()
return False, None
if not diff:
logger.info('Empty diff for %s, skipping device.', self._fqdn)
self._rollback()
return True, diff
logger.info('Running commit check on %s', self._fqdn)
try:
self._device.cu.commit_check(timeout=self._timeout)
success = True
except CommitError as e:
logger.error('Commit check error on %s: %s', self._fqdn, ConnectedDevice._parse_commit_error(e))
except Exception as e: # pylint: disable=broad-except
logger.error('Failed to commit check on %s: %s', self._fqdn, e)
logger.debug('Traceback:', exc_info=True)
finally:
self._rollback()
return success, diff
[docs] def close(self) -> None:
"""Close the connection with the device."""
try:
self._device.cu.unlock()
except UnlockError:
pass
try:
self._device.close()
except (RpcTimeoutError, TimeoutExpiredError) as e:
logger.warning('Unable to close the connection to the device: %s', e)
def _prepare(self, config: str, ignore_warning: Union[bool, str, List[str]] = False) -> str:
"""Prepare the new configuration to be committed.
Arguments:
config (str): the device new configuration.
Raises:
Exception: on generic failure.
Returns:
str: the differences between the current config and the new one.
"""
logger.debug('Preparing the configuration on %s', self._fqdn)
diff = ''
try:
self._device.cu.lock()
self._device.cu.load(config, format='text', merge=False, ignore_warning=ignore_warning)
diff = self._device.cu.diff()
except ConfigLoadError:
raise
except Exception:
self._rollback()
raise
if diff is None:
diff = ''
return color_diff(diff)
def _rollback(self) -> None:
"""Rollback the current staged configuration."""
logger.debug('Rolling back staged config on %s', self._fqdn)
try:
self._device.cu.rollback()
except ValueError as e:
logger.error('Invalid rollback ID on %s: %s', self._fqdn, e)
except Exception as e: # pylint: disable=broad-except
logger.error('Failed to rollback on %s: %s', self._fqdn, e)
logger.debug('Traceback:', exc_info=True)
@staticmethod
def _parse_commit_error(exc: CommitError) -> str:
"""Parse a CommitError exception and returnonly the reason.
Arguments:
exc (jnpr.junos.exception.CommitError): the exception to parse.
Returns:
str: the reason for the commit errror.
"""
if exc.rsp.find('.//ok') is None:
return exc.rsp.findtext('.//error-message')
return str(exc)
[docs]def color_diff(diff: str) -> str:
"""Color the diff based on JunOS diff syntax."""
lines = []
for line in diff.splitlines():
if line.startswith('+'):
code = DIFF_ADDED_CODE
elif line.startswith('-'):
code = DIFF_REMOVED_CODE
elif line.startswith('!'):
code = DIFF_MOVED_CODE
else:
code = 0
if code:
line = f'\x1b[{code}m{line}\x1b[39m'
lines.append(line)
colored_diff = '\n'.join(lines)
if diff and diff[-1] == '\n': # Re-add the last trailing newline if present
colored_diff += '\n'
return colored_diff