"""Ganeti module."""
import logging
from typing import Dict, Optional, Tuple
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException
from wmflib.requests import http_session
from spicerack.constants import PUPPET_CA_PATH
from spicerack.exceptions import SpicerackError
from spicerack.remote import Remote, RemoteHosts
logger = logging.getLogger(__name__)
RAPI_URL_FORMAT: str = 'https://{cluster}:5080'
""":py:class:`str`: the template string to construct the Ganeti RAPI URL."""
CLUSTERS_AND_ROWS: Dict[str, Tuple[str, ...]] = {
'ganeti01.svc.eqiad.wmnet': ('A', 'B', 'C', 'D'),
'ganeti01.svc.codfw.wmnet': ('A', 'B', 'C', 'D'),
'ganeti01.svc.esams.wmnet': ('OE',),
'ganeti01.svc.ulsfo.wmnet': ('1',),
'ganeti01.svc.eqsin.wmnet': ('1',)
}
""":py:class:`dict`: the available Ganeti clusters with the set of available rows in each of them."""
INSTANCE_LINKS: Tuple[str, ...] = ('public', 'private', 'analytics')
""":py:class:`tuple`: the list of possible instance link types."""
[docs]class GanetiError(SpicerackError):
"""Raised on errors from Ganeti operations."""
[docs]class GanetiRAPI:
"""Class which wraps the read-only Ganeti RAPI."""
def __init__(self, cluster_url: str, username: str, password: str, timeout: int, ca_path: str):
"""Initialize the instance.
Arguments:
cluster_url (str): the URL of the RAPI endpoint.
username (str): the RAPI user name
password (str): the RAPI user's password
timeout (int): the timeout in seconds for each request
ca_path (str): the path to the signing certificate authority
"""
self._url = cluster_url
self._http_session = http_session('.'.join((self.__module__, self.__class__.__name__)), timeout=timeout)
self._http_session.auth = HTTPBasicAuth(username, password)
self._http_session.verify = ca_path
def _api_get_request(self, *targets: str) -> Dict:
"""Perform a RAPI request.
Arguments:
*targets: The path components of the request (minus the /2/ part of the path)
Returns:
dict: The decoded JSON response
Raises:
spicerack.ganeti.GanetiError: on non-200 responses
"""
full_url = '/'.join([self._url, '2'] + list(targets))
try:
result = self._http_session.get(full_url)
except RequestException as ex:
raise GanetiError('Error while performing request to RAPI') from ex
if result.status_code != 200:
raise GanetiError('Non-200 from API: {}: {}'.format(result.status_code, result.text))
return result.json()
@property
def info(self) -> Dict:
"""Return complete cluster information.
Returns:
dict: Cluster information dictionary
Raises:
spicerack.ganeti.GanetiError: API errors
"""
return self._api_get_request('info')
@property
def master(self) -> Optional[str]:
"""Return the internal name for the current ganeti master node.
Returns:
str: The hostname of the master node (or None if the data is missing).
Raises:
spicerack.ganeti.GanetiError: API errors
"""
return self.info.get('master')
[docs] def fetch_instance(self, fqdn: str) -> Dict:
"""Return full information about an instance.
Arguments:
fqdn: the FQDN of the instance in question
Returns:
dict: host information
Raises:
spicerack.ganeti.GanetiError: API errors
"""
return self._api_get_request('instances', fqdn)
[docs] def fetch_instance_mac(self, fqdn: str) -> str:
"""Convenience method to return the 0th adapter's MAC address for an instance.
Note that we don't allow the creation of instances with more than one MAC address at this time.
Arguments:
fqdn: the FQDN of the instance in question
Returns:
str: MAC address
Raises:
GanetiError: API errors
"""
instance_info = self.fetch_instance(fqdn)
if 'nic.macs' not in instance_info or not instance_info['nic.macs']:
raise GanetiError("Can't find any MACs for instance")
return instance_info['nic.macs'][0]
[docs]class GntInstance:
"""Class that wraps gnt-instance command execution on a Ganeti cluster master host."""
def __init__(self, master: RemoteHosts, cluster: str, instance: str):
"""Initialize the instance.
Arguments:
master (spicerack.remote.RemoteHosts): the Ganeti cluster master remote instance.
cluster (str): the Ganeti cluster name.
instance (str): the FQDN of the Ganeti VM instance to act upon.
"""
self._master = master
self._cluster = cluster
self._instance = instance
@property
def cluster(self) -> str:
"""Getter for the Ganeti cluster property.
Returns:
str: the Ganeti cluster name the instance belongs to.
"""
return self._cluster
[docs] def shutdown(self, *, timeout: int = 2) -> None:
"""Shutdown the Ganeti VM instance.
Arguments:
timeout (int): time in minutes to wait for a clean shutdown before pulling the plug.
"""
logger.info('Shutting down VM %s in cluster %s', self._instance, self._cluster)
self._master.run_sync('gnt-instance shutdown --timeout={timeout} {instance}'.format(
timeout=timeout, instance=self._instance))
[docs] def remove(self, *, shutdown_timeout: int = 2) -> None:
"""Shutdown and remove the VM instance from the Ganeti cluster, including its disks.
Arguments:
shutdown_timeout (int): time in minutes to wait for a clean shutdown before pulling the plug.
Note:
This action requires few minutes, inform the user about the waiting time when using this method.
"""
logger.info('Removing VM %s in cluster %s. This may take a few minutes.', self._instance, self._cluster)
self._master.run_sync('gnt-instance remove --shutdown-timeout={timeout} --force {instance}'.format(
timeout=shutdown_timeout, instance=self._instance))
[docs] def add(self, *, row: str, vcpus: int, memory: int, disk: int, link: str) -> None:
"""Create the VM for the instance in the Ganeti cluster with the specified characteristic.
Arguments:
row (str): the Datacenter physical row where to allocate the instance, one of
:py:const:`spicerack.ganeti.CLUSTERS_AND_ROWS` based on the current cluster.
vcpus (int): the number of virtual CPUs to assign to the instance.
memory (int): the amount of RAM to assign to the instance in gigabytes.
disk (int): the amount of disk to assign to the instance in gigabytes.
link (str): the type of network link to use, one of :py:const:`spicerack.ganeti.INSTANCE_LINKS`.
Raises:
spicerack.ganeti.GanetiError: on parameter validation error.
Note:
This action requires few minutes, inform the user about the waiting time when using this method.
"""
if link not in INSTANCE_LINKS:
raise GanetiError("Invalid link '{link}', expected one of: {links}".format(
link=link, links=INSTANCE_LINKS))
if row not in CLUSTERS_AND_ROWS[self._cluster]:
raise GanetiError("Invalid row '{row}' for cluster {cluster}, expected one of: {rows}".format(
row=row, cluster=self._cluster, rows=CLUSTERS_AND_ROWS[self._cluster]))
local_vars = locals()
for var_label in ('vcpus', 'memory', 'disk'):
if local_vars[var_label] <= 0:
raise GanetiError("Invalid value '{value}' for {label}, expected positive integer.".format(
value=local_vars[var_label], label=var_label))
command = ('gnt-instance add'
' -t drbd'
' -I hail'
' --net 0:link={link}'
' --hypervisor-parameters=kvm:boot_order=network'
' -o debootstrap+default'
' --no-install'
' -g row_{row}'
' -B vcpus={vcpus},memory={memory}g'
' --disk 0:size={disk}g'
' {fqdn}').format(link=link, row=row, vcpus=vcpus, memory=memory, disk=disk, fqdn=self._instance)
logger.info(('Creating VM %s in cluster %s with row=%s vcpus=%d memory=%dGB disk=%dGB link=%s. '
'This may take a few minutes.'), self._instance, self._cluster, row, vcpus, memory, disk, link)
results = self._master.run_sync(command)
for _, output in results:
logger.info(output.message().decode())
[docs]class Ganeti:
"""Class which wraps all Ganeti clusters."""
def __init__(self, username: str, password: str, timeout: int, remote: Remote):
"""Initialize the instance.
Arguments:
username (str): The RAPI username to use.
password (str): The RAPI password to use.
timeout (int): The timeout in seconds for each request to the API.
remote (spicerack.remote.Remote): the remote instance to connect to Ganeti hosts.
"""
self._username = username
self._password = password
self._timeout = timeout
self._remote = remote
[docs] def rapi(self, cluster: str) -> GanetiRAPI:
"""Return a RAPI object for a particular cluster.
Arguments:
cluster (str): the name of the Ganeti cluster to get a RAPI for.
Returns:
spicerack.ganeti.GanetiRAPI: the RAPI interface object
Raises:
spicerack.ganeti.GanetiError: on an invalid cluster name
"""
if cluster not in CLUSTERS_AND_ROWS:
raise GanetiError('Cannot find cluster {} (expected {}).'.format(cluster, tuple(CLUSTERS_AND_ROWS.keys())))
cluster_url = RAPI_URL_FORMAT.format(cluster=cluster)
return GanetiRAPI(cluster_url, self._username, self._password, self._timeout, PUPPET_CA_PATH)
[docs] def fetch_cluster_for_instance(self, fqdn: str) -> str:
"""Return the cluster name for a given FQDN if possible.
Arguments:
fqdn (str): The FQDN for the host to locate.
Returns:
str: The cluster name if found.
Raises:
spicerack.ganeti.GanetiError: if the host was not found in any configured cluster.
"""
for cluster in CLUSTERS_AND_ROWS:
cluster_rapi = self.rapi(cluster)
try:
cluster_rapi.fetch_instance(fqdn)
return cluster
except GanetiError:
continue
raise GanetiError("Cannot find {} in any configured cluster.".format(fqdn))
[docs] def instance(self, instance: str, *, cluster: str = '') -> GntInstance:
"""Return an instance of GntInstance to perform RW operation on the given Ganeti VM instance.
Arguments:
instance (str): the FQDN of the Ganeti VM instance to act upon.
cluster (str, optional): the name of the Ganeti cluster where to look for the instance.
Returns:
spicerack.ganeti.GntInstance: ready to perform RW actions.
"""
if not cluster:
cluster = self.fetch_cluster_for_instance(instance)
master = self.rapi(cluster).master
if master is None:
raise GanetiError('Master for cluster {cluster} is None'.format(cluster=cluster))
return GntInstance(self._remote.query(master), cluster, instance)