Source code for spicerack.remote

"""Remote module to execute commands on hosts via Cumin."""
import logging

from datetime import datetime, timedelta

from cumin import Config, CuminError, query, transport, transports
from cumin.cli import target_batch_size

from spicerack.decorators import retry
from spicerack.exceptions import SpicerackCheckError, SpicerackError


logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


class RemoteError(SpicerackError):
    """Custom exception class for errors of this module."""


class RemoteCheckError(SpicerackCheckError):
    """Custom exception class for check errors of this module."""


class RemoteExecutionError(RemoteError):
    """Custom exception class for remote execution errors."""

    def __init__(self, retcode, message):
        """Override parent constructor to add the return code attribute."""
        super().__init__('{msg} (exit_code={ret})'.format(msg=message, ret=retcode))
        self.retcode = retcode


class RemoteHostsAdapter:
    """Base adapter to write classes that expand the capabilities of RemoteHosts.

    This adapter is a helper to reduce duplication when writing classes that need to add capabilities to a
    RemoteHosts instance. The goal is not to extend RemoteHosts but to delegate to its instances. This class fits
    when a single RemoteHosts instance is enough; for more complex cases, in which multiple RemoteHosts instances
    must be orchestrated, it's fine to not extend this class and to create a standalone one instead.
    """

    def __init__(self, remote_hosts):
        """Initialize the instance.

        Arguments:
            remote_hosts (spicerack.remote.RemoteHosts): the instance to act on the remote hosts.

        """
        self._remote_hosts = remote_hosts

    def __str__(self):
        """String representation of the instance.

        Returns:
            str: the string representation of the target hosts.

        """
        return str(self._remote_hosts)

    def __len__(self):
        """Length of the instance.

        Returns:
            int: the number of target hosts.

        """
        return len(self._remote_hosts)


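# Example of a possible subclass (a minimal sketch, not part of this module): the adapter keeps the
# RemoteHosts instance private and the subclass adds higher level operations by delegating to it.
# The class name and the command below are hypothetical, for illustration only.
#
#   class ExampleHosts(RemoteHostsAdapter):
#       """Adapter adding an example capability on top of RemoteHosts."""
#
#       def kernel_version(self):
#           """Return a list of (NodeSet, kernel release) tuples."""
#           results = self._remote_hosts.run_sync('uname -r', is_safe=True)
#           return RemoteHosts.results_to_list(results)

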
class Remote:
    """Remote class to interact with Cumin."""

    def __init__(self, config, dry_run=True):
        """Initialize the instance.

        Arguments:
            config (str): the path of Cumin's configuration file.
            dry_run (bool, optional): whether this is a DRY-RUN.

        """
        self._config = Config(config)
        self._dry_run = dry_run

    def query(self, query_string):
        """Execute a Cumin query and return the matching hosts.

        Arguments:
            query_string (str): the Cumin query string to execute.

        Returns:
            spicerack.remote.RemoteHosts: RemoteHosts instance matching the given query.

        """
        try:
            hosts = query.Query(self._config).execute(query_string)
        except CuminError as e:
            raise RemoteError('Failed to execute Cumin query') from e

        return RemoteHosts(self._config, hosts, dry_run=self._dry_run)


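# Example usage (a minimal sketch, not part of this module): the configuration path and the
# query string below are hypothetical placeholders.
#
#   remote = Remote('/etc/cumin/config.yaml', dry_run=False)  # hypothetical path
#   hosts = remote.query('A:example-alias')  # returns a RemoteHosts instance
#   print(hosts, len(hosts))  # string representation and number of matching hosts

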
class RemoteHosts:
    """Remote Executor class.

    This class can be extended to customize the interaction with remote hosts, passing a custom factory function
    to `spicerack.remote.Remote.query`.
    """

    def __init__(self, config, hosts, dry_run=True):
        """Initialize the instance.

        Arguments:
            config (cumin.Config): the configuration for Cumin.
            hosts (ClusterShell.NodeSet.NodeSet): the hosts to target for the remote execution.
            dry_run (bool, optional): whether this is a DRY-RUN.

        Raises:
            spicerack.remote.RemoteError: if no hosts were provided.

        """
        if not hosts:
            raise RemoteError('No hosts provided')

        self._config = config
        self._hosts = hosts
        self._dry_run = dry_run

    @property
    def hosts(self):
        """Getter for the hosts property.

        Returns:
            ClusterShell.NodeSet.NodeSet: a copy of the targeted hosts.

        """
        return self._hosts.copy()

    def __str__(self):
        """String representation of the instance.

        Returns:
            str: the string representation of the target hosts.

        """
        return str(self._hosts)

    def __len__(self):
        """Length of the instance.

        Returns:
            int: the number of target hosts.

        """
        return len(self._hosts)

    def run_async(self, *commands, success_threshold=1.0, batch_size=None, batch_sleep=None, is_safe=False):
        """Execute commands on hosts matching a query via Cumin in async mode.

        Arguments:
            *commands (str, cumin.transports.Command): arbitrary number of commands to execute on the target hosts.
            success_threshold (float, optional): to consider the execution successful, must be between 0.0 and 1.0.
            batch_size (int, str, optional): the batch size for Cumin, either as percentage (e.g. ``25%``) or
                absolute number (e.g. ``5``).
            batch_sleep (float, optional): the batch sleep in seconds to use in Cumin before scheduling the next host.
            is_safe (bool, optional): whether the command is safe to run also in dry-run mode because it's a
                read-only command that doesn't modify the state.

        Returns:
            generator: as returned by :py:meth:`cumin.transports.BaseWorker.get_results`, to iterate over the results.

        Raises:
            RemoteExecutionError: if the Cumin execution returns a non-zero exit code.

        """
        return self._execute(list(commands), mode='async', success_threshold=success_threshold,
                             batch_size=batch_size, batch_sleep=batch_sleep, is_safe=is_safe)

    def run_sync(self, *commands, success_threshold=1.0, batch_size=None, batch_sleep=None, is_safe=False):
        """Execute commands on hosts matching a query via Cumin in sync mode.

        Arguments:
            *commands (str, cumin.transports.Command): arbitrary number of commands to execute on the target hosts.
            success_threshold (float, optional): to consider the execution successful, must be between 0.0 and 1.0.
            batch_size (int, str, optional): the batch size for Cumin, either as percentage (e.g. ``25%``) or
                absolute number (e.g. ``5``).
            batch_sleep (float, optional): the batch sleep in seconds to use in Cumin before scheduling the next host.
            is_safe (bool, optional): whether the command is safe to run also in dry-run mode because it's a
                read-only command that doesn't modify the state.

        Returns:
            generator: as returned by :py:meth:`cumin.transports.BaseWorker.get_results`, to iterate over the results.

        Raises:
            RemoteExecutionError: if the Cumin execution returns a non-zero exit code.

        """
        return self._execute(list(commands), mode='sync', success_threshold=success_threshold,
                             batch_size=batch_size, batch_sleep=batch_sleep, is_safe=is_safe)

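    # Example usage (a minimal sketch): running a read-only command and iterating over the results
    # generator, which yields 2-element tuples of hosts and output objects, decoded the same way
    # results_to_list() does below. The command and batch size are illustrative.
    #
    #   results = hosts.run_sync('uptime', is_safe=True, batch_size='50%')
    #   for nodeset, output in results:
    #       print('{hosts}: {out}'.format(hosts=nodeset, out=output.message().decode()))
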
    def reboot(self, batch_size=1, batch_sleep=180.0):
        """Reboot hosts.

        Arguments:
            batch_size (int, optional): how many hosts to reboot in parallel.
            batch_sleep (float, optional): how long to sleep between one reboot and the next.

        """
        if len(self._hosts) == 1:  # Temporary workaround until T213296 is fixed.
            batch_sleep = None

        logger.info('Rebooting %d hosts in batches of %d with %.1fs of sleep in between: %s',
                    len(self._hosts), batch_size, batch_sleep if batch_sleep is not None else 0.0, self._hosts)
        self.run_sync(transports.Command('reboot-host', timeout=30), batch_size=batch_size, batch_sleep=batch_sleep)

    @retry(tries=25, delay=timedelta(seconds=10), backoff_mode='linear',
           exceptions=(RemoteExecutionError, RemoteCheckError))
    def wait_reboot_since(self, since):
        """Poll the host until it is reachable and has an uptime lower than the provided datetime.

        Arguments:
            since (datetime.datetime): the time after which the host should have booted.

        Raises:
            spicerack.remote.RemoteCheckError: if unable to connect to the host or the uptime is higher than expected.

        """
        remaining = self.hosts
        delta = (datetime.utcnow() - since).total_seconds()
        for nodeset, uptime in self.uptime():
            if uptime >= delta:
                raise RemoteCheckError('Uptime for {hosts} higher than threshold: {uptime} > {delta}'.format(
                    hosts=nodeset, uptime=uptime, delta=delta))

            remaining.difference_update(nodeset)

        if remaining:
            raise RemoteCheckError('Unable to check uptime from {num} hosts: {hosts}'.format(
                num=len(remaining), hosts=remaining))

        logger.info('Found reboot since %s for hosts %s', since, self._hosts)

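    # Example usage (a minimal sketch): reboot() and wait_reboot_since() are meant to be combined,
    # recording the time just before triggering the reboot.
    #
    #   reboot_since = datetime.utcnow()
    #   hosts.reboot(batch_size=2, batch_sleep=60.0)
    #   hosts.wait_reboot_since(reboot_since)  # retried with linear backoff on failure
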
    def uptime(self):
        """Get current uptime.

        Returns:
            list: a list of 2-element :py:class:`tuple` instances with hosts :py:class:`ClusterShell.NodeSet.NodeSet`
            as first item and :py:class:`float` uptime as second item.

        """
        results = self.run_sync(transports.Command('cat /proc/uptime', timeout=10), is_safe=True)
        # Callback to extract the uptime from /proc/uptime (i.e. getting 12345.67 from '12345.67 123456789.00').
        return RemoteHosts.results_to_list(results, callback=lambda output: float(output.split()[0]))

    def init_system(self):
        """Detect the init system.

        Returns:
            list: a list of 2-element tuples with hosts :py:class:`ClusterShell.NodeSet.NodeSet` as first item and
            the init system :py:class:`str` as second item.

        """
        results = self.run_sync(transports.Command('ps --no-headers -o comm 1', timeout=10), is_safe=True)

        return RemoteHosts.results_to_list(results)

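    # Example usage (a minimal sketch): both uptime() and init_system() return a list of
    # (NodeSet, value) tuples; the hostnames and values shown are hypothetical.
    #
    #   hosts.uptime()       # e.g. [(NodeSet('host[1-3].example.com'), 12345.67)]
    #   hosts.init_system()  # e.g. [(NodeSet('host[1-3].example.com'), 'systemd')]
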
    @staticmethod
    def results_to_list(results, callback=None):
        """Extract execution results into a list, converting them with an optional callback.

        Todo:
            move it directly into Cumin.

        Arguments:
            results (generator): generator returned by run_sync() and run_async() to iterate over the results.
            callback (callable, optional): an optional callable to apply to each result output (it can be multiline).
                The callback will be called with the string output as its only parameter and must return the
                extracted value. The return type can be chosen freely.

        Returns:
            list: a list of 2-element tuples with hosts :py:class:`ClusterShell.NodeSet.NodeSet` as first item and
            the extracted outputs :py:class:`str` as second item. This is because NodeSet are not hashable.

        Raises:
            spicerack.remote.RemoteError: if unable to run the callback.

        """
        extracted = []
        for nodeset, output in results:
            result = output.message().decode().strip()
            if callback is not None:
                try:
                    result = callback(result)
                except Exception as e:
                    raise RemoteError('Unable to extract data with {cb} for {hosts} from: {output}'.format(
                        cb=callback.__name__, hosts=nodeset, output=result)) from e

            extracted.append((nodeset, result))

        return extracted

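    # Example usage (a minimal sketch): applying a callback to convert each output, here to an int;
    # the command is illustrative and the converted values are hypothetical.
    #
    #   results = hosts.run_sync('nproc', is_safe=True)
    #   RemoteHosts.results_to_list(results, callback=int)
    #   # -> e.g. [(NodeSet('host[1-2].example.com'), 8)]
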
    def _execute(self, commands, mode='sync', success_threshold=1.0,  # pylint: disable=too-many-arguments
                 batch_size=None, batch_sleep=None, is_safe=False):
        """Lower-level Cumin execution of commands on the target nodes.

        Arguments:
            commands (list): the commands to execute on the target hosts, either a list of command strings or a list
                of cumin.transports.Command instances.
            mode (str, optional): the Cumin execution mode. Accepted values: sync, async.
            success_threshold (float, optional): to consider the execution successful, must be between 0.0 and 1.0.
            batch_size (int, str, optional): the batch size for Cumin, either as percentage (e.g. ``25%``) or
                absolute number (e.g. ``5``).
            batch_sleep (float, optional): the batch sleep in seconds to use in Cumin before scheduling the next host.
            is_safe (bool, optional): whether the command is safe to run also in dry-run mode because it's a
                read-only command that doesn't modify the state.

        Returns:
            generator: as returned by :py:meth:`cumin.transports.BaseWorker.get_results`, to iterate over the results.

        Raises:
            RemoteExecutionError: if the Cumin execution returns a non-zero exit code.

        """
        if batch_size is None:
            parsed_batch_size = {'value': None, 'ratio': None}
        else:
            parsed_batch_size = target_batch_size(str(batch_size))

        target = transports.Target(
            self._hosts, batch_size=parsed_batch_size['value'], batch_size_ratio=parsed_batch_size['ratio'],
            batch_sleep=batch_sleep)
        worker = transport.Transport.new(self._config, target)
        worker.commands = commands
        worker.handler = mode
        worker.success_threshold = success_threshold

        logger.debug('Executing commands %s on %d hosts: %s', commands, len(target.hosts), str(target.hosts))

        if self._dry_run and not is_safe:
            return iter(())  # Empty generator

        ret = worker.execute()

        if ret != 0 and not self._dry_run:
            raise RemoteExecutionError(ret, 'Cumin execution failed')

        return worker.get_results()