Source code for spicerack.toolforge.etcdctl

"""Wrapper around etcdctl handling parameters and such."""

import json
import logging
from typing import Optional, Union, cast

from spicerack.exceptions import SpicerackError
from spicerack.remote import RemoteHosts, RemoteHostsAdapter

logger = logging.getLogger(__name__)
SimpleType = Union[str, int, bool]


[docs] class TooManyHosts(SpicerackError): """Raised when there's more hosts than supported passed."""
[docs] class UnableToParseOutput(SpicerackError): """Raised when there's an error trying to parse etcdctl output."""
[docs] class EtcdctlController(RemoteHostsAdapter): """Node that is able to run etcdctl and control an etcd cluster.""" def __init__(self, *, remote_host: RemoteHosts): """Init.""" if len(remote_host.hosts) > 1: raise TooManyHosts("EtcdctlController currently only supports running in one node.") super().__init__(remote_hosts=remote_host) endpoints = f"https://{self._remote_hosts.hosts}:2379" cert_file = f"/etc/etcd/ssl/{self._remote_hosts.hosts}.pem" ca_file = "/etc/etcd/ssl/ca.pem" key_file = f"/etc/etcd/ssl/{self._remote_hosts.hosts}.priv" self._base_args = [ "ETCDCTL_API=3", "etcdctl", "--endpoints", endpoints, "--cacert", ca_file, "--cert", cert_file, "--key", key_file, ]
[docs] def get_cluster_info(self) -> dict[str, dict[str, SimpleType]]: """Gets the current etcd cluster information.""" args = [*self._base_args, "member", "list", "-w=json"] raw_results = self._remote_hosts.run_sync(" ".join(args)) try: result = next(raw_results)[1].message().decode("utf8") except StopIteration as e: raise UnableToParseOutput("Got no results when trying to retrieve the etcdctl members list.") from e members = json.loads(result.strip()).get("members", []) if result else [] structured_result = {} for member in members: if "ID" not in member: raise UnableToParseOutput(f"Unable to parse etcdctl output (missing ID)\nFull output: {result}") if "peerURLs" not in member: raise UnableToParseOutput( f"Unable to parse etcdctl output (missing peerURLs)\nFull output: {result}" ) struct_elem: dict[str, SimpleType] = {} decimal_id = member["ID"] member_id = format(int(decimal_id), "x") struct_elem["member_id"] = member_id name = member.get("name", "") if name: struct_elem["name"] = name clienturls = member.get("clientURLs", [""])[0] if clienturls: struct_elem["clientURLs"] = clienturls peerurls = member.get("peerURLs", [""])[0] struct_elem["peerURLs"] = peerurls struct_elem["status"] = "up" if "clientURLs" in struct_elem else "unstarted" structured_result[member_id] = struct_elem return structured_result
[docs] def ensure_node_exists( self, new_member_fqdn: str, member_peer_url: Optional[str] = None, ) -> str: """Ensure the existance of an etcd member adding it if not present. Makes sure that the given new_member_fqdn member exists and is part of the etcd cluster, and returns its member id. """ if not member_peer_url: member_peer_url = f"https://{new_member_fqdn}:2380" before_members = self.get_cluster_info() current_entry = self._get_member_or_none( members=before_members, member_name=new_member_fqdn, member_peer_url=member_peer_url, ) extra_args = None if current_entry and current_entry["peerURLs"] == member_peer_url: logger.info( "Skipping addition of member %s as it already exists.", new_member_fqdn, ) return cast(str, current_entry["member_id"]) if current_entry and current_entry["peerURLs"] != member_peer_url: logger.info( "Updating url for already existing member %s.", new_member_fqdn, ) extra_args = [ "member", "update", cast(str, current_entry["member_id"]), member_peer_url, ] else: extra_args = ["member", "add", new_member_fqdn, "--peer-urls", member_peer_url] self._remote_hosts.run_sync(" ".join(self._base_args + extra_args)) if not current_entry: # unfortunately, etcdctl add does not give the member_id, but only the new # name, so we have to diff before and after to find out which one is the # new member id after_members = self.get_cluster_info() new_member_id = list(set(after_members.keys()) - set(before_members.keys()))[0] else: new_member_id = cast(str, current_entry["member_id"]) return new_member_id
[docs] def ensure_node_does_not_exist( self, member_fqdn: str, ) -> Optional[str]: """Ensure the non existance of an etcd member, removing it if present. Makes sure that the given member_fqdn member is not part of the etcd cluster, returns its old member id or None if it was not there. """ before_members = self.get_cluster_info() current_entry = self._get_member_or_none( members=before_members, member_name=member_fqdn, ) if not current_entry: logger.info( "Skipping removal of member %s as it does not exist.", member_fqdn, ) return None logger.info("Removing etcd member %s.", member_fqdn) extra_args = ["member", "remove", str(current_entry["member_id"])] self._remote_hosts.run_sync(" ".join(self._base_args + extra_args)) return str(current_entry["member_id"])
@staticmethod def _to_simple_type(maybe_not_string: str) -> SimpleType: """Simple type interpolation, etcdctl member list does not return json.""" if maybe_not_string == "true": return True if maybe_not_string == "false": return False try: return int(maybe_not_string) except ValueError: pass return maybe_not_string @classmethod def _to_simple_tuple(cls, elem: str) -> tuple[str, SimpleType]: elems = elem.split("=", 1) if len(elems) != 2: raise UnableToParseOutput(f"Malformed element '{elem}', has no '='.") return (elems[0], cls._to_simple_type(elems[1])) @staticmethod def _get_member_or_none( members: dict[str, dict[str, SimpleType]], member_name: str, member_peer_url: Optional[str] = None ) -> dict[str, SimpleType]: return next( ( member for member in members.values() if ( ("name" in member and member["name"] == member_name) # in case the member is not started, it does not show the name, # just the peer url or ("name" not in member and member["peerURLs"] == member_peer_url) ) ), {}, )