Source code for spicerack.ipmi

"""IPMI module.

Todo:
    replace with pyghmi.

"""
import logging

from datetime import timedelta
from subprocess import CalledProcessError, PIPE, run
from typing import Dict, List, Tuple

from spicerack.decorators import retry
from spicerack.exceptions import SpicerackCheckError, SpicerackError


# Longest password accepted by Ipmi._get_password_store_size(); also the storage size it returns for
# passwords longer than IPMI_PASSWORD_MIN_LEN.
IPMI_PASSWORD_MAX_LEN: int = 20
# Shortest password accepted by Ipmi._get_password_store_size(); also the storage size it returns for
# passwords of exactly this length.
IPMI_PASSWORD_MIN_LEN: int = 16
# Values of the 'Boot parameter data' field that Ipmi.check_bootparams() accepts without raising.
IPMI_SAFE_BOOT_PARAMS: Tuple[str, ...] = ('0000000000', '8000020000')  # No or unimportant overrides.
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class IpmiError(SpicerackError):
    """Error raised when an operation of the Ipmi class fails."""
class IpmiCheckError(SpicerackCheckError):
    """Error raised when a verification check performed by the Ipmi class fails."""
class Ipmi:
    """Class to manage remote IPMI via ipmitool."""

    def __init__(self, password: str, dry_run: bool = True) -> None:
        """Initialize the instance.

        Arguments:
            password (str): the password to use to connect via IPMI.
            dry_run (bool, optional): whether this is a DRY-RUN.

        """
        # The password is handed to ipmitool through the environment (see the -E flag in command()).
        self.env: Dict[str, str] = {'IPMITOOL_PASSWORD': password}
        self._dry_run = dry_run

    def command(
        self,
        mgmt_hostname: str,
        command_parts: List[str],
        is_safe: bool = False,
        hide_parts: Tuple[int, ...] = (),
    ) -> str:
        """Run an ipmitool command for a remote management console hostname.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.
            command_parts (list): a list of :py:class:`str` with the IPMI command components to execute.
            is_safe (bool, optional): if this is a safe command to run also in DRY RUN mode.
            hide_parts (tuple, optional): zero-based, non-negative indexes of the items of ``command_parts`` that
                hold sensitive data (e.g. a password) and must be redacted from the logs and error tracebacks.

        Returns:
            str: the output of the ipmitool command, or an empty string if the command was skipped because of
            DRY-RUN mode.

        Raises:
            spicerack.ipmi.IpmiError: on failure.

        """
        prefix = ['ipmitool', '-I', 'lanplus', '-H', mgmt_hostname, '-U', 'root', '-E']
        command = prefix + command_parts

        # Never write sensitive parts (like a new password) to the logs.
        hidden = set(hide_parts)
        loggable_parts = ['__REDACTED__' if index in hidden else part for index, part in enumerate(command_parts)]
        logger.info('Running IPMI command: %s', ' '.join(prefix + loggable_parts))

        if self._dry_run and not is_safe:
            return ''

        try:
            output = run(command, env=self.env.copy(), stdout=PIPE, check=True).stdout.decode()
        except CalledProcessError as e:
            # e.output holds the captured stdout as bytes: decode it to avoid a b'...' repr in the message.
            error = IpmiError('Remote IPMI for {mgmt} failed (exit={code}): {output}'.format(
                mgmt=mgmt_hostname, code=e.returncode, output=(e.output or b'').decode(errors='replace')))
            if hidden:
                # The message of CalledProcessError embeds the whole command line, including the sensitive
                # parts, so do not chain it to the exception that reaches the caller.
                raise error from None
            raise error from e

        logger.debug(output)
        return output

    def check_connection(self, mgmt_hostname: str) -> None:
        """Ensure that remote IPMI is working for the management console hostname.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.

        Raises:
            spicerack.ipmi.IpmiError: if unable to connect or execute a test command.

        """
        # Read-only command, hence executed also in DRY-RUN mode.
        status = self.command(mgmt_hostname, ['chassis', 'power', 'status'], is_safe=True)
        if not status.startswith('Chassis Power is'):
            raise IpmiError('Unexpected chassis status: {status}'.format(status=status))

    def check_bootparams(self, mgmt_hostname: str) -> None:
        """Check if the BIOS boot parameters are back to normal values.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.

        Raises:
            spicerack.ipmi.IpmiCheckError: if the BIOS boot parameters are incorrect.

        """
        param = self._get_boot_parameter(mgmt_hostname, 'Boot parameter data')
        if param not in IPMI_SAFE_BOOT_PARAMS:
            raise IpmiCheckError('Expected BIOS boot params in {accepted} got: {param}'.format(
                accepted=IPMI_SAFE_BOOT_PARAMS, param=param))

    @retry(tries=3, delay=timedelta(seconds=20), backoff_mode='linear', exceptions=(IpmiCheckError,))
    def force_pxe(self, mgmt_hostname: str) -> None:
        """Force PXE for the next boot and verify that the setting was applied.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.

        Raises:
            spicerack.ipmi.IpmiCheckError: if unable to verify the PXE mode within the retries.

        """
        self.command(mgmt_hostname, ['chassis', 'bootdev', 'pxe'])
        # Read the setting back to verify that it was actually applied.
        boot_device = self._get_boot_parameter(mgmt_hostname, 'Boot Device Selector')
        if boot_device != 'Force PXE':
            raise IpmiCheckError('Unable to verify that Force PXE is set. The host might reboot in the current OS')

    def _get_boot_parameter(self, mgmt_hostname: str, param_label: str) -> str:
        """Get a specific boot parameter of the host.

        The value is taken from the first line of the ``chassis bootparam get 5`` output that contains the label.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.
            param_label (str): the label of the boot parameter to lookout for.

        Raises:
            spicerack.ipmi.IpmiError: if unable to find the given label or to extract its value.

        Returns:
            str: the value of the parameter.

        """
        bootparams = self.command(mgmt_hostname, ['chassis', 'bootparam', 'get', '5'], is_safe=True)
        for line in bootparams.splitlines():
            if param_label not in line:
                continue

            try:
                # The value is the text between the first and the second colon of the line.
                return line.split(':')[1].strip(' \n')
            except IndexError as e:
                raise IpmiError("Unable to extract value for parameter '{label}' from line: {line}".format(
                    label=param_label, line=line)) from e

        raise IpmiError("Unable to find the boot parameter '{label}' in: {output}".format(
            label=param_label, output=bootparams))

    @staticmethod
    def _get_password_store_size(password: str) -> int:
        """Parse the password to determine the correct storage size.

        Ipmitool stores passwords in either 16 or 20 byte strings depending on the password length.

        Arguments:
            password (str): the password string to parse.

        Raises:
            spicerack.ipmi.IpmiError: if the password is too long or too short.

        Returns:
            int: a number representing the storage size.

        """
        length = len(password)
        if length > IPMI_PASSWORD_MAX_LEN:
            raise IpmiError('New passwords is greater then the {max_len} byte limit'.format(
                max_len=IPMI_PASSWORD_MAX_LEN))

        if length > IPMI_PASSWORD_MIN_LEN:
            return IPMI_PASSWORD_MAX_LEN

        if length == IPMI_PASSWORD_MIN_LEN:
            return IPMI_PASSWORD_MIN_LEN

        raise IpmiError('New passwords must be {min_len} bytes minimum'.format(
            min_len=IPMI_PASSWORD_MIN_LEN))

    def reset_password(self, mgmt_hostname: str, username: str, password: str) -> None:
        """Reset the given username's password to the one provided.

        When the password of the ``root`` user is changed, the new password is also used for the subsequent
        commands of this instance, after having verified that it works.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.
            username (str): the username whose password will be reset, must not be empty.
            password (str): the new password, its length must be between IPMI_PASSWORD_MIN_LEN and
                IPMI_PASSWORD_MAX_LEN.

        Raises:
            spicerack.ipmi.IpmiError: if unable to reset the password or the arguments are invalid.

        """
        if not username:
            raise IpmiError('Username can not be an empty string')

        password_store_size = str(Ipmi._get_password_store_size(password))
        try:
            user_id = self._get_user_id(mgmt_hostname, username)
        except IpmiError:
            # some systems (HP?) use channel 2
            logger.info('unable to find user in channel 1 testing channel 2')
            user_id = self._get_user_id(mgmt_hostname, username, 2)

        success = 'Set User Password command successful (user {user_id})\n'.format(user_id=user_id)
        # Index 4 of the command parts is the new password: keep it out of the logs.
        result = self.command(
            mgmt_hostname,
            ['user', 'set', 'password', user_id, password, password_store_size],
            hide_parts=(4,),
        )

        if self._dry_run:
            # The command was not executed, there is nothing to verify.
            return

        if result != success:
            raise IpmiError('Password reset failed for username: {username}'.format(username=username))

        if username == 'root':
            # Commands are run as root: switch to the new password and verify it, rolling back on failure.
            current_password = self.env['IPMITOOL_PASSWORD']
            self.env['IPMITOOL_PASSWORD'] = password
            try:
                self.check_connection(mgmt_hostname)
            except IpmiError as error:
                self.env['IPMITOOL_PASSWORD'] = current_password
                raise IpmiError('Password reset failed for username: root') from error

    def _get_user_id(self, mgmt_hostname: str, username: str, channel: int = 1) -> str:
        """Get the user ID associated with a given username.

        Arguments:
            mgmt_hostname (str): the FQDN of the management interface of the host to target.
            username (str): the username to search for.
            channel (int, optional): the channel number for the user list.

        Raises:
            spicerack.ipmi.IpmiError: if unable to find the given username.

        Returns:
            str: the user ID associated with the username.

        """
        userlist = self.command(mgmt_hostname, ['user', 'list', str(channel)], is_safe=True)
        for line in userlist.splitlines():
            words = line.split()
            # Skip blank or malformed lines (they would raise IndexError below) and the header line.
            if len(words) < 2 or words[0] == 'ID':
                continue

            if words[1] == username:
                return words[0]

        raise IpmiError("Unable to find ID for username: {username}".format(username=username))