"""Redfish module."""
import ipaddress
import json
import logging
import re
import time
from abc import abstractmethod
from datetime import datetime, timedelta
from enum import Enum
from io import BufferedReader
from pathlib import Path
from typing import Any, Optional, Type, Union
import urllib3
from packaging import version
from requests import Response
try: # JSONDecodeError present since requests 2.27.0, bullseye has 2.25.1
from requests.exceptions import JSONDecodeError
CompatJSONDecodeError: Union[Type[ValueError], Type[JSONDecodeError]] = JSONDecodeError
except ImportError:
CompatJSONDecodeError = ValueError
from wmflib.requests import http_session
from spicerack.apiclient import APIClient, APIClientError, APIClientResponseError
from spicerack.decorators import retry
from spicerack.exceptions import SpicerackError
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
[docs]
class RedfishError(SpicerackError):
"""General errors raised by this module."""
[docs]
class RedfishTaskNotCompletedError(RedfishError):
"""Raised when a Redfish task is not found on the server."""
[docs]
class ChassisResetPolicy(Enum):
"""Subset of available Chassis.Reset policies compatible with all supported vendors (at this moment only Dell)."""
FORCE_OFF: str = "ForceOff"
"""Turn off the unit immediately (nongraceful shutdown)."""
FORCE_RESTART: str = "ForceRestart"
"""Shut down immediately and nongracefully and restart the system."""
GRACEFUL_RESTART: str = "GracefulRestart"
"""Shut down gracefully and power on."""
GRACEFUL_SHUTDOWN: str = "GracefulShutdown"
"""Shut down gracefully and power off."""
ON: str = "On"
"""Turn on the unit."""
[docs]
class DellSCPRebootPolicy(Enum):
"""Available Dell SCP (Server Configuration Profiles) reboot policies."""
FORCED: str = "Forced"
"""Issue an immediate hard reboot without notifying the operating system."""
GRACEFUL: str = "Graceful"
"""Issue a reboot notifying the operating system."""
NO_REBOOT: str = "NoReboot"
"""Do not reboot right now, the Redfish task will be pending the next reboot to apply the changes."""
[docs]
class DellSCPPowerStatePolicy(Enum):
"""Available Dell SCP (Server Configuration Profiles) final power state after an operation policies."""
OFF: str = "Off"
"""Keep the host powered off after the operation."""
ON: str = "On"
"""Turn the host power back on after the operation."""
[docs]
class DellSCPTargetPolicy(Enum):
"""Available sections of the Dell SCP (Server Configuration Profiles) to target."""
ALL = "ALL"
"""All settings."""
BIOS = "BIOS"
"""Only BIOS settings."""
IDRAC = "IDRAC"
"""Only iDRAC settings."""
NIC = "NIC"
"""Only network interfaces settings."""
RAID = "RAID"
"""Only RAID controller settings."""
[docs]
class RedfishUserRoles(Enum):
"""Allowed role values for new Redfish users."""
ADMINISTRATOR = "Administrator"
"""Administrator user role for Redfish."""
OPERATOR = "Operator"
"""Operator user role for Redfish."""
READONLY = "ReadOnly"
"""ReadOnly user role for Redfish."""
[docs]
class Redfish:
"""Manage Redfish operations on a specific device."""
# Properties to be defined by derived classes.
system = ""
"""The name of the System manager."""
manager = ""
"""The name of the Out of Band manager."""
log_service = ""
"""The name of the Log service."""
reboot_message_id = ""
"""The message ID for a reboot event."""
boot_mode_attribute = ""
"""The boot mode key in the Bios attributes."""
http_boot_target = ""
"""The value to the BootSourceOverrideTarget key for HTTP boot."""
def __init__(
self,
hostname: str,
interface: Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface],
username: str,
password: str,
*,
dry_run: bool = True,
):
"""Initialize the instance.
Arguments:
hostname: the hostname (not FQDN) the management console belongs to.
interface: the interface of the management console to connect to.
username: the API username.
password: the API password.
dry_run: whether this is a DRY-RUN.
"""
self._dry_run = dry_run
self._hostname = hostname
self._interface = interface
self._username = username
self._password = password
session = http_session(".".join((self.__module__, self.__class__.__name__)), timeout=10)
# TODO: evaluate if we should create an intermediate CA for managament consoles
session.verify = False # The devices have a self-signed certificate
session.auth = (self._username, self._password)
session.headers.update({"Accept": "application/json"})
self._api_client = APIClient(f"https://{self._interface.ip}", session, dry_run=self._dry_run)
self._upload_session = http_session(".".join((self.__module__, self.__class__.__name__)), timeout=60 * 30)
self._upload_session.verify = False # The devices have a self-signed certificate
self._upload_session.auth = (self._username, self._password)
self._upload_session.headers.update({"Accept": "application/json"})
self._oob_info: dict = {}
self._system_info: dict = {}
self._updateservice_info: dict = {}
def __str__(self) -> str:
"""String representation of the instance."""
return f"{self._username}@{self._hostname} ({self._interface.ip})"
@property
def system_manager(self) -> str:
"""Property to return the System manager."""
return f"/redfish/v1/Systems/{self.system}"
@property
def oob_manager(self) -> str:
"""Property to return the Out of Band manager."""
return f"/redfish/v1/Managers/{self.manager}"
@property
def storage_manager(self) -> str:
"""Property to return the Storage manager."""
return f"/redfish/v1/Systems/{self.system}/Storage"
@property
def account_manager(self) -> str:
"""Property to return the Storage manager."""
return "/redfish/v1/AccountService"
def _update_system_info(self) -> None:
"""Property to return a dict of manager metadata."""
self._system_info = self.request("get", self.system_manager).json()
def _update_oob_info(self) -> None:
"""Update the data Out of Band manager info."""
self._oob_info = self.request("get", self.oob_manager).json()
@property
def update_service(self) -> str:
"""Property to return the Out of Band manager."""
# for now this is the same for both dell and supermicro
return "/redfish/v1/UpdateService"
@property
def log_entries(self) -> str:
"""Property to return the uri for the log entries."""
return f"/redfish/v1/Managers/{self.manager}/LogServices/{self.log_service}/Entries"
@property
def hostname(self) -> str:
"""Getter for the device hostname."""
return self._hostname
@property
def interface(self) -> Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface]:
"""Getter for the management interface address with netmask."""
return self._interface
@property
def system_info(self) -> dict:
"""Property to return the system info as a dict."""
if not self._system_info:
self._update_system_info()
return self._system_info
@property
def oob_info(self) -> dict:
"""Property to return the oob info as a dict."""
if not self._oob_info:
self._update_oob_info()
return self._oob_info
@property
def updateservice_info(self) -> dict:
"""Property to return a dict of manager metadata."""
if not self._updateservice_info:
result = self.request("get", self.update_service)
self._updateservice_info = result.json()
return self._updateservice_info
@property
def oob_model(self) -> str:
"""Property to return a string representing the model."""
return self.oob_info["Model"]
@property
def firmware_version(self) -> version.Version:
"""Property to return a version instance representing the firmware version."""
self._update_oob_info()
return version.parse(self.oob_info["FirmwareVersion"])
@property
def bios_version(self) -> version.Version:
"""Property to return a instance representing the Bios version."""
self._update_system_info()
return version.parse(self.system_info["BiosVersion"])
@property
def model(self) -> str:
"""Property to return a string representing the model."""
return self.system_info["Model"]
@property
def manufacturer(self) -> str:
"""Property to return a string representing the model."""
return self.system_info["Manufacturer"]
@property
def pushuri(self) -> str:
"""Property representing the HttpPushUri of the idrac for uploading firmwares to it."""
try:
return self.updateservice_info["HttpPushUri"]
except KeyError:
raise NotImplementedError from KeyError
@property
def multipushuri(self) -> str:
"""Property representing the MultipartHttpPushUri of the idrac for uploading firmwares to it."""
return self.updateservice_info["MultipartHttpPushUri"]
[docs]
def upload_file(self, file_path: Path, reboot: bool = False) -> str:
"""Upload a file to the firmware directory via redfish and return the job_id URI.
Arguments:
file_path: The file path to upload.
reboot: if true immediately reboot the server.
"""
with file_path.open("rb") as file_handle:
return self.multipush_upload(file_handle, file_path.name, reboot)
[docs]
def multipush_upload(self, file_handle: BufferedReader, filename: str, reboot: bool = False) -> str:
"""Upload a file via redfish and return its job_id URI.
Arguments:
file_handle: On open file handle to the object to upload.
filename: filename name to use for upload.
reboot: if true immediately reboot the server.
"""
operation = "Immediate" if reboot else "OnReset"
payload = {"Targets": [], "@Redfish.OperationApplyTime": operation, "Oem": {}}
files = {
"UpdateParameters": (None, json.dumps(payload), "application/json"),
"UpdateFile": (filename, file_handle, "application/octet-stream"),
}
job_id = self.submit_files(files)
logger.debug("upload has task ID: %s", job_id)
return job_id
[docs]
@staticmethod
def most_recent_member(members: list[dict], key: str) -> dict:
"""Return the most recent member of members result from dell api.
Members will be sorted on key and the most recent value is returned.
The value of key is assumed to be an iso date.
Arguments:
members: A list of dicts returned from the dell api.
key: The key to search on.
"""
def sorter(element: dict) -> datetime:
"""Sort by datetime."""
return datetime.fromisoformat(element[key])
return sorted(members, key=sorter)[-1]
[docs]
def last_reboot(self) -> datetime:
"""Get the the last reboot time."""
# TODO: we can possibly use filter once all OOB's are updated. e.g.
# Lclog/Entries?$filter=MessageId eq 'reboot_code'
# currently we get the following on some older models
# Message=Querying is not supported by the implementation, MessageArgs=$filter"
last_reboot = datetime.fromisoformat("1970-01-01T00:00:00-00:00")
results = self.request("get", self.log_entries).json()
# use ends with as sometimes there is an additional string prefix to the code e.g. IDRAC.2.7.RAC0182
members = [m for m in results["Members"] if m["MessageId"].endswith(self.reboot_message_id)]
if members:
last_reboot = datetime.fromisoformat(self.most_recent_member(members, "Created")["Created"])
logger.debug("%s: last reboot %s", self._hostname, last_reboot)
return last_reboot
[docs]
@retry(
tries=240,
delay=timedelta(seconds=10),
backoff_mode="constant",
exceptions=(RedfishError,),
)
def wait_reboot_since(self, since: datetime) -> None:
"""Wait for idrac/redfish to become responsive.
Arguments:
since: The datetime of the last reboot.
"""
self.check_connection()
latest = self.last_reboot()
if since >= latest:
raise RedfishError("no new reboot detected")
logger.debug("%s: new management console reboot detected %s", self._hostname, latest)
[docs]
def request(self, method: str, uri: str, **kwargs: Any) -> Response:
"""Perform a request against the target Redfish instance with the provided HTTP method and data.
See :py:meth:`spicerack.apiclient.APIClient.request` for the arguments documentation.
Raises:
spicerack.redfish.RedfishError: if the response status code is
between 400 and 600 or if the given uri does not start with a slash
(/) or if the request couldn't be performed.
"""
try:
return self._api_client.request(method, uri, **kwargs)
except APIClientResponseError as e:
try:
logger.error("%s\nResponse payload: %s", e, e.response.json())
except CompatJSONDecodeError:
logger.error("The response payload does not contain any valid JSON to log.")
raise RedfishError(str(e)) from e
except APIClientError as e:
raise RedfishError(str(e)) from e
def _parse_submit_task(self, response: Response) -> str:
"""Submit a request that generates a task, return the URI of the submitted task.
Arguments:
response: the response to parse.
Raises:
spicerack.redfish.RedfishError: if the response status code is not 202 or there is no Location header.
"""
uri = response.request.path_url
if response.status_code != 202:
raise RedfishError(
f"Unable to start task for {uri}, expected HTTP 202, "
f"got HTTP {response.status_code} instead:\n{response.text}"
)
# requests allow to access headers with any capitalization
if "Location" not in response.headers or not response.headers["Location"]:
raise RedfishError(
"Unable to get the task URI to poll results for the {uri} request. Returned headers:\n"
f"{response.headers}"
)
return response.headers["Location"]
[docs]
def submit_files(self, files: dict) -> str:
"""Submit a upload file request that generates a task, return the URI of the submitted task.
Arguments:
uri: the relative URI to request.
files: the files to upload to send in the request.
"""
if self._dry_run:
return "/"
# BUG: timeout is not honoured by self.request
# response = self.request('post', self.multipushuri, files=files)
response = self._upload_session.post(f"https://{self.interface.ip}{self.multipushuri}", files=files)
return self._parse_submit_task(response)
[docs]
def submit_task(self, uri: str, data: Optional[dict] = None, method: str = "post") -> str:
"""Submit a request that generates a task, return the URI of the submitted task.
Arguments:
uri: the relative URI to request.
data: the data to send in the request.
method: the HTTP method to use, if not the default one.
"""
if self._dry_run:
return "/"
response = self.request(method, uri, json=data)
return self._parse_submit_task(response)
[docs]
def check_connection(self) -> None:
"""Check the connection with the Redfish API.
Raises:
spicerack.redfish.RedfishError: if unable to connect to Redfish API.
"""
logger.info("Testing Redfish API connection to %s (%s)", self._hostname, self._interface.ip)
self.request("get", "/redfish")
[docs]
@retry(
tries=30,
delay=timedelta(seconds=30),
backoff_mode="constant",
exceptions=(RedfishTaskNotCompletedError,),
failure_message="Polling task",
)
def poll_task(self, uri: str) -> dict:
"""Poll a Redfish task until the results are available and return them.
Arguments:
uri: the URI of the task, usually returned as Location header by the originating API call.
Raises:
spicerack.redfish.RedfishError: if the response from the server is outside the expected values of HTTP
200 or 202.
spicearck.redfish.RedfishTaskNotCompletedError: if the task is not yet completed.
"""
if self._dry_run:
return {}
response = self.request("get", uri)
if response.status_code not in (200, 202):
raise RedfishError(f"{uri} returned HTTP {response.status_code}:\n{response.text}")
results = response.json()
if response.status_code == 200 and "@odata.id" not in results: # Task completed, got data without metadata
return results
for message in results["Messages"]:
if "Oem" in message:
continue # Skip Oem messages, they might have any custom structure
# Some older Dell implementation use both keys in the same API response :/
message_id = message.get("MessageId", message.get("MessageID", "N.A."))
logger.info("[%s] %s", message_id, message["Message"])
try:
end_time = datetime.fromisoformat(results.get("EndTime", "1970-01-01T00:00:00")).timestamp()
except ValueError:
# Endtime is TIME_NA
end_time = 0
if end_time == 0:
raise RedfishTaskNotCompletedError(
f"{results['Id']} not completed yet: status={results['TaskStatus']}, state={results['TaskState']}, "
f"completed={results.get('PercentComplete', 'unknown')}%"
)
return results # When a task is polled after returning the data, will return again the metadata
[docs]
def find_account(self, username: str) -> tuple[str, str]:
"""Find the account for the given username and return its URI.
Arguments:
username: the username to search for.
Returns:
A 2-element tuple with the URI for the account and the ETag header value of the GET response.
Raises:
spicerack.redfish.RedfishError: if unable to find the account.
"""
accounts = self.request("get", f"{self.account_manager}/Accounts").json()
uris = [account["@odata.id"] for account in accounts["Members"]]
for uri in uris:
response = self.request("get", uri)
if response.json()["UserName"] == username:
return uri, response.headers.get("ETag", "")
raise RedfishError(f"Unable to find account for username {username}")
[docs]
def change_user_password(self, username: str, password: str) -> None:
"""Change the password for the account with the given username.
If the username matches the one used by the instance to connect to the API, automatically updates the instance
value so that the instance will keep working.
Arguments:
username: the username to search for.
password: the new password to set.
Raises:
spicerack.redfish.RedfishError: if unable to find the user or update its password.
"""
user_uri, etag = self.find_account(username)
logger.info("Changing password for the account with username %s: %s", username, user_uri)
response = self.request(
"patch", user_uri, json={"UserName": username, "Password": password}, headers={"If-Match": etag}
)
if response.status_code != 200:
raise RedfishError(f"Got unexpected HTTP {response.status_code}, expected 200:\n{response.text}")
if self._username == username and not self._dry_run:
self._password = password
self._api_client.http_session.auth = (self._username, self._password)
logger.info("Updated current instance password to the new password")
try:
for message in response.json().get("@Message.ExtendedInfo", []):
identifier = " ".join((message.get("MessageId", ""), message.get("Severity", ""))).strip()
logger.info(
"[%s] %s | Resolution: %s", identifier, message.get("Message", ""), message.get("Resolution", "")
)
except json.JSONDecodeError as e:
logger.info("No JSON response after changing the management password, moving on: %s", e)
[docs]
@abstractmethod
def get_power_state(self) -> str:
"""Return the current power state of the device."""
@property
def is_uefi(self) -> bool:
"""Return weather the host is legacy BIOS or UEFI."""
response = self.request("get", f"{self.system_manager}/Bios").json()
return "efi" in response["Attributes"].get(self.boot_mode_attribute, "").lower()
[docs]
def force_http_boot_once(self) -> None:
"""Force the host to boot over UEFI HTTP at the next reboot.
Raises:
spicerack.redfish.RedfishError: if unable to perform the config change or not UEFI host.
"""
if not self.is_uefi:
raise RedfishError("HTTP boot is only possible for UEFI hosts.")
logger.info("Setting the next boot to UEFI HTTP for %s", self._hostname)
efi_http_boot = {
"Boot": {
"BootSourceOverrideEnabled": "Once",
"BootSourceOverrideTarget": self.http_boot_target,
"BootSourceOverrideMode": "UEFI",
}
}
self.request(
"patch",
self.system_manager,
json=efi_http_boot,
)
[docs]
def chassis_reset(self, action: ChassisResetPolicy) -> None:
"""Perform a reset of the chassis power status.
Arguments:
action: the reset policy to use.
Raises:
spicerack.redfish.RedfishError: if unable to perform the reset.
"""
logger.info("Resetting chassis power status for %s to %s", self._hostname, action.value)
response = self.request(
"post",
f"{self.system_manager}/Actions/ComputerSystem.Reset",
json={"ResetType": action.value},
)
if response.status_code not in [204, 200] and not self._dry_run:
raise RedfishError(
f"Got unexpected response HTTP {response.status_code}, expected HTTP 200/204: {response.text}"
)
[docs]
class DellSCP:
"""Represent a Dell System Configuration Profile configuration as returned by Redfish API."""
def __init__(self, config: dict, target: DellSCPTargetPolicy, *, allow_new_attributes: bool = False):
"""Parse the Redfish API response.
Arguments:
config: the configuration as returned by Redfish API.
target: describe which sections of the configuration are represented in the loaded configuration.
allow_new_attributes: when set to :py:data:`True` it allows the creation of new attributes not
already present in the provided configuration that otherwise would raise an exception. This is
useful for example when changing the boot mode between Uefi and Bios that changes the keys present.
"""
self._config = config
self._target = target
self._allow_new_attributes = allow_new_attributes
# Track if the Components property have been emptied, allowing the creation of new components
self._emptied_components = False
@property
def config(self) -> dict:
"""Getter for the whole configuration in Dell's format."""
return self._config
@property
def target(self) -> DellSCPTargetPolicy:
"""Getter for the target that the current configuration represents."""
return self._target
@property
def service_tag(self) -> str:
"""Getter for the device Dell's Service Tag."""
return self._config["SystemConfiguration"]["ServiceTag"]
@property
def model(self) -> str:
"""Getter for the device Dell's model."""
return self._config["SystemConfiguration"]["Model"]
@property
def timestamp(self) -> datetime:
"""Getter for the timestamp when the configuration dump was generated."""
return datetime.strptime(self._config["SystemConfiguration"]["TimeStamp"], "%c")
@property
def comments(self) -> list[str]:
"""Getter for the comments associated with the configuration."""
return [
comment["Comment"]
for comment in self._config["SystemConfiguration"].get("Comments", [])
if "Comment" in comment
]
@property
def components(self) -> dict[str, dict[str, str]]:
"""Getter for the components present in the configuration in a simplified view.
The returned dictionary where all the keys are recursively sorted and has the following format::
{
'<component name>': {
'key1': 'value1',
'key2': 'value2',
},
}
"""
components: dict[str, dict[str, str]] = {}
for component in self._config["SystemConfiguration"].get("Components", []):
components[component["FQDD"]] = {}
for attribute in component.get("Attributes", []):
components[component["FQDD"]][attribute["Name"]] = attribute["Value"]
# Sort the components recursively
return {component: dict(sorted(components[component].items())) for component in sorted(components)}
[docs]
def set(self, component_name: str, attribute_name: str, attribute_value: str) -> bool:
"""Update the current configuration setting to the new value for the given key in the given component.
Notes:
This updates only the instance representation of the instance. To push and apply the changes to the server
see :py:meth:`spicerack.redfish.RedfishDell.scp_push`.
In order to add attributes not present the instance must have been created with `allow_new_attributes` set
to :py:data:`True` or the :py:meth:`spicerack.redfish.DellSCP.empty_component` method called. This last one
allows to automatically create any missing component while setting attributes.
Arguments:
component_name: the name of the component the settings belongs to.
attribute_name: the attribute name whose value needs to be updated.
attribute_value: the new value for the attribute to set.
Returns:
:py:data:`True` if the value was added or changed, :py:data:`False` if it had already the correct value.
Raises:
spicerack.redfish.RedfishError: if unable to find the given component or attribute and the creation of new
items is not allowed.
"""
def new_attribute() -> dict[str, str]:
"""Local helper that returns a new attribute to append to the component."""
attribute = {
"Name": attribute_name,
"Value": attribute_value,
"Set On Import": "True",
"Comment": "Read and Write",
}
logger.info(
"Created attribute %s -> %s (with Set On Import True) with value %s",
component_name,
attribute_name,
attribute_value,
)
return attribute
for component in self._config["SystemConfiguration"]["Components"]:
if component["FQDD"] != component_name:
continue
for i in range(len(component["Attributes"])):
attribute = component["Attributes"][i]
if attribute["Name"] != attribute_name:
continue
# Attribute found, update it if different, consider comma-separated lists identical with both ',' and
# ', ' as separators.
if attribute["Value"].replace(", ", ",") == attribute_value.replace(", ", ","):
logger.info(
"Skipped set of attribute %s -> %s, has already the correct value: %s",
component_name,
attribute_name,
attribute["Value"],
)
return False
logger.info(
"Updated value for attribute %s -> %s%s: %s => %s",
component_name,
attribute_name,
" (marked Set On Import to True)" if attribute["Set On Import"] == "False" else "",
attribute["Value"],
attribute_value,
)
attribute["Value"] = attribute_value
attribute["Set On Import"] = "True"
return True
# Attribute not found, add it or raise
if self._allow_new_attributes or self._emptied_components:
component["Attributes"].append(new_attribute())
return True
raise RedfishError(f"Unable to find attribute {component_name} -> {attribute_name}")
# Component not found, add it or raise
if self._emptied_components:
self._config["SystemConfiguration"]["Components"].append(
{"FQDD": component_name, "Attributes": [new_attribute()]}
)
return True
raise RedfishError(f"Unable to find component {component_name}")
[docs]
def update(self, changes: dict[str, dict[str, str]]) -> bool:
"""Bulk update the current configuration with the set of changes provided.
Notes:
This updates only the instance representation of the instance. To push and apply the changes to the server
see :py:meth:`spicerack.redfish.RedfishDell.scp_push`.
Arguments:
changes: a dictionary of changes to apply in the same format of the one returned by
:py:meth:`spicerack.redfish.DellSCP.components`.
Returns:
:py:data:`True` if any of the values produced a change, :py:data:`False` if no change was made.
Raises:
spicerack.redfish.RedfishError: if unable to apply all the changes.
"""
was_changed = False
for component, attributes in changes.items():
for name, value in attributes.items():
was_changed |= self.set(component, name, value)
return was_changed
[docs]
def empty_components(self) -> None:
"""Empty the current Components from the configuration, allowing to create a new configuration from scratch.
After calling this method is possible to set values for non-existing components that would otherwise raise an
exception.
"""
self._config["SystemConfiguration"]["Components"] = []
self._emptied_components = True
[docs]
class RedfishSupermicro(Redfish):
"""Redfish class for SuperMicro servers."""
system = "1"
"""The name of the System manager."""
manager = "1"
"""The name of the Out of Band manager."""
log_service = "Log1"
"""The name of the Log service."""
reboot_message_id = "Event.1.0.SystemPowerAction"
"""The message ID for a reboot event."""
boot_mode_attribute = "BootModeSelect"
"""The boot mode key in the Bios attributes."""
http_boot_target = "Pxe"
"""The value to the BootSourceOverrideTarget key for HTTP boot."""
[docs]
def get_power_state(self) -> str:
"""Return the current power state of the device."""
response = self.request("get", self.system_manager).json()
return response["PowerState"]
[docs]
def add_account(
self, username: str, password: str, role: RedfishUserRoles = RedfishUserRoles.ADMINISTRATOR
) -> None:
"""Create a new account with username and password.
Arguments:
username: the username to create.
password: the password to associate with the user.
role: a :py:class:`spicerack.redfish.RedfishUserRoles` value identifying the Redfish Role.
Default is set to Administrator.
Raises:
spicerack.redfish.RedfishError: if unable to create the account.
"""
new_user_data = {"UserName": username, "Password": password, "RoleId": role.value, "Enabled": True}
self.request("post", f"{self.account_manager}/Accounts", json=new_user_data)
[docs]
class RedfishDell(Redfish):
"""Dell specific Redfish support."""
system = "System.Embedded.1"
"""The name of the System manager."""
manager = "iDRAC.Embedded.1"
"""The name of the Out of Band manager."""
log_service = "Lclog"
"""The name of the Log service."""
reboot_message_id = "RAC0182"
"""The message ID for a reboot event."""
boot_mode_attribute = "BootMode"
"""The boot mode key in the Bios attributes."""
http_boot_target = "UefiHttp"
"""The value to the BootSourceOverrideTarget key for HTTP boot."""
scp_base_uri: str = "/redfish/v1/Managers/iDRAC.Embedded.1/Actions/Oem/EID_674_Manager"
"""The Dell's SCP push base URI."""
def __init__(
self,
hostname: str,
interface: Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface],
username: str,
password: str,
*,
dry_run: bool = True,
):
"""Override parent's constructor."""
super().__init__(hostname, interface, username, password, dry_run=dry_run)
self._generation = 0
@property
def log_entries(self) -> str:
"""String representing the log entries uri."""
if self.firmware_version < version.Version("4.10"):
return "/redfish/v1/Managers/Logs/Lclog"
return super().log_entries
@property
def generation(self) -> int:
"""Property representing the generation of the idrac.
This is often 13 for idrac8 and 14 for idrac9. This property allows us to add workarounds
for older idrac models
"""
if self._generation == 0:
match = re.search(r"\d+", self.oob_model)
if match is None:
logger.error("%s: Unrecognized iDRAC model %s, setting generation to 1", self._hostname, self.oob_model)
# Setting this to one allows use to continue but assumes the minimal level of support
self._generation = 1
else:
self._generation = int(match.group(0))
logger.debug("%s: iDRAC generation %s", self._hostname, self._generation)
return self._generation
[docs]
@retry(
tries=240,
delay=timedelta(seconds=10),
backoff_mode="constant",
exceptions=(RedfishError,),
)
def wait_reboot_since(self, since: datetime) -> None:
"""Wait for idrac/redfish to become responsive.
Arguments:
since: the datetime of the last reboot.
"""
self.check_connection()
if self._generation < 14:
# Probing the Gen13/iDRAC8 devices too early seems to cause the redfish deamon to crash
print("sleeping for 2 mins to let idrac boot")
time.sleep(120)
latest = self.last_reboot()
if since >= latest:
raise RedfishError("no new reboot detected")
logger.debug("%s: new management console reboot detected %s", self._hostname, latest)
# Its still takes a bit of time for redfish to fully come only so
# We just arbitrarily sleep for a bit
sleep_secs = 30
logger.debug("%s: sleeping for %d secs", self._hostname, sleep_secs)
time.sleep(sleep_secs)
[docs]
def scp_dump(
self, target: DellSCPTargetPolicy = DellSCPTargetPolicy.ALL, *, allow_new_attributes: bool = False
) -> DellSCP:
"""Dump and return the SCP (Server Configuration Profiles) configuration.
Arguments:
target: choose which sections to dump.
allow_new_attributes: when set to :py:data:`True` it allows the creation of new attributes not
already present in the retrieved configuration that otherwise would raise an exception. This is
useful for example when changing the boot mode between Uefi and Bios that changes the keys present.
Raises:
spicerack.redfish.RedfishError: if the API call fail.
spicerack.redfish.RedfishTaskNotCompletedError: if unable to fetch the dumped results.
"""
data = {"ExportFormat": "JSON", "ShareParameters": {"Target": target.value}}
task_uri = self.submit_task(f"{self.scp_base_uri}.ExportSystemConfiguration", data)
return DellSCP(self.poll_task(task_uri), target, allow_new_attributes=allow_new_attributes)
[docs]
def scp_push(
self,
scp: DellSCP,
*,
reboot: DellSCPRebootPolicy = DellSCPRebootPolicy.NO_REBOOT,
power_state: DellSCPPowerStatePolicy = DellSCPPowerStatePolicy.ON,
preview: bool = True,
) -> dict:
"""Push the SCP (Server Configuration Profiles) configuration.
Arguments:
scp: the configuration that will pushed to the server.
reboot: which reboot policy to use to apply the changes.
power_state: which final power state policy to use to apply to the host after the changes have been
applied.
preview: if :py:data:`True` perform a test push of the SCP data. This will tell if the file
parses correctly and would not result in any writes. The comments will tell if the new configuration
would not produce any changes. Forces the reboot parameter to be
:py:const:`spicerack.redfish.DellSCPRebootPolicy.NO_REBOOT`.
Returns:
The results of the push operation.
Raises:
spicerack.redfish.RedfishError: if the API call fail.
spicerack.redfish.RedfishTaskNotCompletedError: if unable to fetch the dumped results.
"""
if preview:
uri = "ImportSystemConfigurationPreview"
reboot = DellSCPRebootPolicy.NO_REBOOT
else:
uri = "ImportSystemConfiguration"
data = {
"ImportBuffer": json.dumps(scp.config), # The API requires a JSON-encoded string inside a JSON payload.
"ShareParameters": {"Target": scp.target.value},
"HostPowerState": power_state.value,
"ShutdownType": reboot.value,
}
task_id = self.submit_task(f"{self.scp_base_uri}.{uri}", data)
return self.poll_task(task_id)
[docs]
def get_power_state(self) -> str:
"""Return the current power state of the device."""
response = self.request("get", "/redfish/v1/Chassis/System.Embedded.1").json()
return response["PowerState"]