"""Automation and orchestration framework written in Python."""
import logging
import os
import subprocess
from importlib.metadata import PackageNotFoundError, version
import yaml
from ClusterShell.NodeSet import NodeSet, RESOLVER_NOGROUP
# Absolute path of the klist executable used to check for a valid Kerberos ticket.
KERBEROS_KLIST = '/usr/bin/klist'

try:
    __version__ = version(__name__)
    """:py:class:`str`: the version of the current Cumin module."""
except PackageNotFoundError:  # pragma: no cover - this happens only if the package is not installed
    # Support the use case of the Debian building system where tests are run without installation:
    # take the version from the environment when it is exported there.
    try:
        __version__ = os.environ['SETUPTOOLS_SCM_PRETEND_VERSION']
    except KeyError:
        pass  # No version available from the environment either: __version__ is left undefined.
class CuminError(Exception):
    """Root of the Cumin exception hierarchy: every custom Cumin exception derives from it."""
##############################################################################
# Add a custom log level TRACE to logging for development debugging

LOGGING_TRACE_LEVEL_NUMBER = 8
LOGGING_TRACE_LEVEL_NAME = 'TRACE'

# Abort the import when the numeric slot is already registered in the logging module while the TRACE
# name is not, i.e. when the slot has been taken by a level with a different name.
# The private mappings of the logging module are inspected on purpose: the alternative would be to
# match the default string returned by logging.getLevelName() for unused custom slots.
# NOTE(review): the error message below is runtime text and is kept verbatim, spelling included.
if (LOGGING_TRACE_LEVEL_NAME not in logging._nameToLevel  # pylint: disable=protected-access
        and LOGGING_TRACE_LEVEL_NUMBER in logging._levelToName):  # pylint: disable=protected-access
    raise CuminError(
        "Unable to set custom logging for trace, logging level {level} is alredy set for '{name}'.".format(
            level=LOGGING_TRACE_LEVEL_NUMBER,
            name=logging.getLevelName(LOGGING_TRACE_LEVEL_NUMBER),
        )
    )
def trace(self, msg, *args, **kwargs):
    """Log a message at the custom TRACE level, meant for development debugging.

    Written to be attached to :py:class:`logging.Logger` as a method, hence the explicit ``self``.

    Arguments:
        msg, *args, **kwargs: according to the :py:class:`logging.Logger` interface for log levels.

    """
    if not self.isEnabledFor(LOGGING_TRACE_LEVEL_NUMBER):
        return  # TRACE is filtered out for this logger, nothing to emit

    self._log(LOGGING_TRACE_LEVEL_NUMBER, msg, args, **kwargs)  # pragma: no cover, pylint: disable=protected-access
# Install the TRACE logging level and its Logger method, each one only if not already present.
# Register the level name first, unless the name is already known to the logging module.
if LOGGING_TRACE_LEVEL_NAME not in logging._nameToLevel:  # pylint: disable=protected-access
    logging.addLevelName(LOGGING_TRACE_LEVEL_NUMBER, LOGGING_TRACE_LEVEL_NAME)

# Then expose trace() on every Logger, without overriding an existing attribute with the same name.
if not hasattr(logging.Logger, 'trace'):
    logging.Logger.trace = trace  # type: ignore
##############################################################################
class Config(dict):
    """Singleton-like dictionary class to load the configuration from a given path only once.

    The loaded configurations are cached at class level, keyed by the path they were requested with.
    Instantiating the class returns the cached object produced by :py:func:`parse_config` for that
    path, not a new instance of this class.
    """

    _instances = {}  # Keep track of different loaded configurations, keyed by the given config path

    def __new__(cls, config='/etc/cumin/config.yaml'):
        """Load the given configuration if not already loaded and return it.

        Called by Python's data model for each new instantiation of the class.

        If an ``aliases.yaml`` file exists in the same directory of the configuration file, its parsed
        content is stored in the ``aliases`` key of the returned configuration.

        Arguments:
            config (str, optional): path to the configuration file to load.

        Returns:
            dict: the configuration dictionary.

        Raises:
            CuminError: if :py:func:`parse_config` is unable to read or parse the configuration or the
                aliases file.

        Examples:
            >>> import cumin
            >>> config = cumin.Config()

        """
        if config not in cls._instances:
            loaded = parse_config(config)
            alias_file = os.path.join(os.path.dirname(config), 'aliases.yaml')
            # parse_config() expands '~' before opening the file, so the existence check must look at
            # the expanded path too, otherwise aliases next to a '~/...' config would never be loaded.
            if os.path.isfile(os.path.expanduser(alias_file)):  # Load the aliases only if present
                loaded['aliases'] = parse_config(alias_file)

            # Cache the configuration only once it is fully loaded: if parsing the aliases fails, a
            # later call must retry the whole load instead of returning a partial configuration.
            cls._instances[config] = loaded

        return cls._instances[config]
def parse_config(config_file):
    """Parse the YAML configuration file.

    A leading ``~`` in the path is expanded to the user's home directory before opening the file.

    Arguments:
        config_file (str): the path of the configuration file to load.

    Returns:
        dict: the configuration dictionary, an empty dictionary if the file has no content.

    Raises:
        CuminError: if unable to read or parse the configuration.

    """
    try:
        with open(os.path.expanduser(config_file), 'r', encoding='utf8') as f:
            config = yaml.safe_load(f)
    except OSError as e:
        raise CuminError('Unable to read configuration file: {message}'.format(message=e)) from e
    except yaml.YAMLError as e:
        # Catch the base class of all the PyYAML errors: invalid YAML can surface as scanner, composer
        # or constructor errors too, not only as parser errors.
        raise CuminError("Unable to parse configuration file '{config}':\n{message}".format(
            config=config_file, message=e)) from e

    # An empty YAML document is loaded as None, normalize it to an empty configuration.
    return {} if config is None else config
def nodeset(nodes=None):
    """Create a ClusterShell NodeSet that always uses the :py:const:`RESOLVER_NOGROUP` resolver.

    Forcing this resolver avoids any conflict with Cumin grammars.

    Arguments:
        nodes (optional): the initial content of the set, forwarded as-is to the NodeSet constructor.

    Returns:
        ClusterShell.NodeSet.NodeSet: the instantiated NodeSet.

    See Also:
        https://github.com/cea-hpc/clustershell/issues/368

    """
    return NodeSet(nodes=nodes, resolver=RESOLVER_NOGROUP)
def nodeset_fromlist(nodelist):
    """Create a ClusterShell NodeSet from a list, always using the :py:const:`RESOLVER_NOGROUP` resolver.

    Forcing this resolver avoids any conflict with Cumin grammars.

    Arguments:
        nodelist: the items to build the set from, forwarded as-is to ``NodeSet.fromlist()``.

    Returns:
        ClusterShell.NodeSet.NodeSet: the instantiated NodeSet.

    See Also:
        https://github.com/cea-hpc/clustershell/issues/368

    """
    return NodeSet.fromlist(nodelist, resolver=RESOLVER_NOGROUP)
def ensure_kerberos_ticket(config: Config) -> None:
    """Verify that the current user holds a valid Kerberos ticket, when the configuration asks for it.

    The check is skipped when the ``kerberos`` section is missing or empty, when its ``ensure_ticket``
    key is not set to a true value, and when running with effective user ID 0 unless the
    ``ensure_ticket_root`` key is set to a true value.

    Arguments:
        config (cumin.Config): the Cumin's configuration dictionary.

    Raises:
        CuminError: if the klist executable is not accessible as an executable or if ``klist -s``
            exits with a non-zero status.

    """
    settings = config.get('kerberos', {})
    if not (settings and settings.get('ensure_ticket', False)):
        return  # The check is not enabled in the configuration

    check_also_root = settings.get('ensure_ticket_root', False)
    if not check_also_root and os.geteuid() == 0:
        return  # Running as root and the check for root was not requested

    if not os.access(KERBEROS_KLIST, os.X_OK):
        missing_klist = ('The Kerberos config ensure_ticket is set to true, but {klist} executable was '
                         'not found.')
        raise CuminError(missing_klist.format(klist=KERBEROS_KLIST))

    try:
        # check=True turns a non-zero exit status of klist into a CalledProcessError
        subprocess.run([KERBEROS_KLIST, '-s'], check=True)  # nosec
    except subprocess.CalledProcessError as e:
        missing_ticket = ('The Kerberos config ensure_ticket is set to true, but no active Kerberos ticket was found, '
                          "please run 'kinit' and retry.")
        raise CuminError(missing_ticket) from e