"""MySQL module (legacy).
Todo:
replace with a proper MySQL module that uses a Python MySQL client, preferably in a parallel way.
"""
import logging
from collections.abc import Iterator
from datetime import datetime
from typing import Optional, Union
from ClusterShell.MsgTree import MsgTreeElem
from cumin import NodeSet
from wmflib.constants import CORE_DATACENTERS
from spicerack.decorators import retry
from spicerack.exceptions import SpicerackError
from spicerack.remote import Remote, RemoteHostsAdapter
# Module-level constants used to validate the arguments of MysqlLegacy.get_core_dbs().
REPLICATION_ROLES: tuple[str, ...] = ("master", "slave", "standalone")
"""Valid replication roles."""

CORE_SECTIONS: tuple[str, ...] = ("s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "x1", "es4", "es5")
"""Valid MySQL RW section names (external storage RO sections are not included here)."""

logger = logging.getLogger(__name__)
class MysqlLegacyError(SpicerackError):
    """Custom exception class for errors of this module."""
class MysqlLegacyRemoteHosts(RemoteHostsAdapter):
    """Custom RemoteHosts class to execute MySQL queries."""

    def run_query(  # pylint: disable=too-many-arguments
        self,
        query: str,
        database: str = "",
        success_threshold: float = 1.0,
        batch_size: Optional[Union[int, str]] = None,
        batch_sleep: Optional[float] = None,
        is_safe: bool = False,
    ) -> Iterator[tuple[NodeSet, MsgTreeElem]]:
        """Execute the query via Remote.

        The query is run through the ``mysql`` command line client in batch mode and without column names, so that
        the output contains only the raw values.

        Arguments:
            query: the mysql query to be executed. Double quotes must be already escaped.
            database: an optional MySQL database to connect to before executing the query.
            success_threshold: to consider the execution successful, must be between 0.0 and 1.0.
            batch_size: the batch size for cumin, either as percentage (e.g. ``25%``) or absolute number (e.g. ``5``).
            batch_sleep: the batch sleep in seconds to use in Cumin before scheduling the next host.
            is_safe: whether the command is safe to run also in dry-run mode because it's a read-only command that
                doesn't modify the state.

        Returns:
            The value returned by the wrapped hosts' ``run_sync()``: an iterator of ``(hosts, output)`` tuples.

        Raises:
            spicerack.remote.RemoteExecutionError: if the Cumin execution returns a non-zero exit code.

        """
        # The query is wrapped in double quotes on the command line, hence the caller must escape any double quote.
        # strip() drops the trailing space left when no database is given.
        command = f'mysql --skip-ssl --skip-column-names --batch -e "{query}" {database}'.strip()
        return self._remote_hosts.run_sync(
            command,
            success_threshold=success_threshold,
            batch_size=batch_size,
            batch_sleep=batch_sleep,
            is_safe=is_safe,
        )
class MysqlLegacy:
    """Class to manage MySQL servers."""

    heartbeat_query: str = (
        "SELECT ts FROM heartbeat.heartbeat WHERE datacenter = '{dc}' and shard = '{section}' "
        "ORDER BY ts DESC LIMIT 1"
    )
    """Query pattern to check the heartbeat for a given datacenter and section."""

    def __init__(self, remote: Remote, dry_run: bool = True) -> None:
        """Initialize the instance.

        Arguments:
            remote: the Remote instance.
            dry_run: whether this is a DRY-RUN.

        """
        self._remote = remote
        self._dry_run = dry_run

    def get_dbs(self, query: str) -> MysqlLegacyRemoteHosts:
        """Get a MysqlLegacyRemoteHosts instance for the matching hosts.

        Arguments:
            query: the Remote query to use to fetch the DB hosts.

        Returns:
            The hosts matching the query, wrapped to allow running MySQL queries on them.

        """
        return MysqlLegacyRemoteHosts(self._remote.query(query))

    def get_core_dbs(
        self,
        *,
        datacenter: Optional[str] = None,
        section: Optional[str] = None,
        replication_role: Optional[str] = None,
        excludes: tuple[str, ...] = (),
    ) -> MysqlLegacyRemoteHosts:
        """Get an instance to operate on the core databases matching the parameters.

        Arguments:
            datacenter: the name of the datacenter to filter for, accepted values are those specified in
                :py:data:`spicerack.constants.CORE_DATACENTERS`.
            section: a specific section to filter for, accepted values are those specified in
                :py:data:`spicerack.mysql_legacy.CORE_SECTIONS`.
            replication_role: the replication role to filter for, accepted values are those specified in
                :py:data:`spicerack.mysql_legacy.REPLICATION_ROLES`.
            excludes: sections to exclude from getting. Duplicated entries are counted only once.

        Returns:
            The matching core DB hosts, wrapped to allow running MySQL queries on them.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on invalid data or unexpected matching hosts.

        """
        query_parts = ["A:db-core"]
        # Expected number of matched hosts when selecting masters: one master per datacenter per section.
        dc_multiplier = len(CORE_DATACENTERS)
        section_multiplier = len(CORE_SECTIONS)

        if datacenter is not None:
            dc_multiplier = 1
            if datacenter not in CORE_DATACENTERS:
                raise MysqlLegacyError(f"Got invalid datacenter {datacenter}, accepted values are: {CORE_DATACENTERS}")
            query_parts.append("A:" + datacenter)

        # Deduplicate while preserving order: each excluded section must be subtracted only once from the expected
        # number of sections, otherwise the masters sanity check below would fail spuriously.
        for exclude in dict.fromkeys(excludes):
            if exclude not in CORE_SECTIONS:
                raise MysqlLegacyError(f"Got invalid excludes {exclude}, accepted values are: {CORE_SECTIONS}")
            section_multiplier -= 1
            query_parts.append(f"not A:db-section-{exclude}")

        if section is not None:
            section_multiplier = 1
            if section not in CORE_SECTIONS:
                raise MysqlLegacyError(f"Got invalid section {section}, accepted values are: {CORE_SECTIONS}")
            query_parts.append(f"A:db-section-{section}")

        if replication_role is not None:
            if replication_role not in REPLICATION_ROLES:
                raise MysqlLegacyError(
                    f"Got invalid replication_role {replication_role}, accepted values are: {REPLICATION_ROLES}"
                )
            query_parts.append(f"A:db-role-{replication_role}")

        mysql_hosts = MysqlLegacyRemoteHosts(self._remote.query(" and ".join(query_parts)))

        # Sanity check of matched hosts in case of master selection
        expected_masters = dc_multiplier * section_multiplier
        if replication_role == "master" and len(mysql_hosts) != expected_masters:
            raise MysqlLegacyError(f"Matched {len(mysql_hosts)} masters, expected {expected_masters}")

        return mysql_hosts

    def set_core_masters_readonly(self, datacenter: str) -> None:
        """Set the core masters in read-only.

        Arguments:
            datacenter: the name of the datacenter to filter for.

        Raises:
            spicerack.remote.RemoteExecutionError: on Remote failures.
            spicerack.mysql_legacy.MysqlLegacyError: on failing to verify the modified value.

        """
        logger.debug("Setting core DB masters in %s to be read-only", datacenter)
        target = self.get_core_dbs(datacenter=datacenter, replication_role="master")
        target.run_query("SET GLOBAL read_only=1")
        self.verify_core_masters_readonly(datacenter, True)

    def set_core_masters_readwrite(self, datacenter: str) -> None:
        """Set the core masters in read-write.

        Arguments:
            datacenter: the name of the datacenter to filter for.

        Raises:
            spicerack.remote.RemoteExecutionError: on Remote failures.
            spicerack.mysql_legacy.MysqlLegacyError: on failing to verify the modified value.

        """
        logger.debug("Setting core DB masters in %s to be read-write", datacenter)
        target = self.get_core_dbs(datacenter=datacenter, replication_role="master")
        target.run_query("SET GLOBAL read_only=0")
        self.verify_core_masters_readonly(datacenter, False)

    def verify_core_masters_readonly(self, datacenter: str, is_read_only: bool) -> None:
        """Verify that the core masters are in read-only or read-write mode.

        Mismatches are always logged, but the exception is raised only when not in DRY-RUN mode.

        Arguments:
            datacenter: the name of the datacenter to filter for.
            is_read_only: whether the read-only mode should be set or not.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure.

        """
        logger.debug(
            "Verifying core DB masters in %s have read-only=%d",
            datacenter,
            is_read_only,
        )
        target = self.get_core_dbs(datacenter=datacenter, replication_role="master")
        expected = str(int(is_read_only))  # Convert it to the returned value from MySQL: 1 or 0.
        failed = False
        for nodeset, output in target.run_query("SELECT @@global.read_only", is_safe=True):
            response = output.message().decode()
            if response != expected:
                logger.error(
                    "Expected output to be '%s', got '%s' for hosts %s",
                    expected,
                    response,
                    str(nodeset),
                )
                failed = True

        if failed and not self._dry_run:
            raise MysqlLegacyError(
                f"Verification failed that core DB masters in {datacenter} have read-only={is_read_only}"
            )

    def check_core_masters_in_sync(self, dc_from: str, dc_to: str) -> None:
        """Check that all core masters in dc_to are in sync with the core masters in dc_from.

        The heartbeats written by dc_from are read from the dc_from masters and then used as the reference that the
        dc_to masters must surpass.

        Arguments:
            dc_from: the name of the datacenter from where to get the master positions.
            dc_to: the name of the datacenter where to check that they are in sync.

        Raises:
            spicerack.remote.RemoteExecutionError: on failure.
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeats or if a master in dc_to is
                still not in sync after the retries.

        """
        logger.debug("Waiting for the core DB masters in %s to catch up", dc_to)
        heartbeats = self.get_core_masters_heartbeats(dc_from, dc_from)
        self.check_core_masters_heartbeats(dc_to, dc_from, heartbeats)

    def get_core_masters_heartbeats(self, datacenter: str, heartbeat_dc: str) -> dict[str, datetime]:
        """Get the current heartbeat values from core DB masters in DC for a given heartbeat DC.

        Arguments:
            datacenter: the name of the datacenter from where to get the heartbeat values.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.

        Returns:
            A dictionary with the section name :py:class:`str` as keys and their heartbeat
            :py:class:`datetime.datetime` as values. For example::

                {'s1': datetime.datetime(2018, 1, 2, 11, 22, 33, 123456)}

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime.

        """
        heartbeats = {}
        for section in CORE_SECTIONS:
            core_dbs = self.get_core_dbs(datacenter=datacenter, section=section, replication_role="master")
            heartbeats[section] = MysqlLegacy._get_heartbeat(core_dbs, section, heartbeat_dc)

        return heartbeats

    def check_core_masters_heartbeats(
        self, datacenter: str, heartbeat_dc: str, heartbeats: dict[str, datetime]
    ) -> None:
        """Check the current heartbeat values in the core DB masters in DC are in sync with the provided heartbeats.

        Arguments:
            datacenter: the name of the datacenter from where to get the heartbeat values.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.
            heartbeats: a dictionary with the section name :py:class:`str` as keys and heartbeat
                :py:class:`datetime.datetime` for each core section as values.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime.

        """
        for section, heartbeat in heartbeats.items():
            self._check_core_master_in_sync(datacenter, heartbeat_dc, section, heartbeat)

    @retry(exceptions=(MysqlLegacyError,))
    def _check_core_master_in_sync(
        self,
        datacenter: str,
        heartbeat_dc: str,
        section: str,
        parent_heartbeat: datetime,
    ) -> None:
        """Check and retry that the heartbeat value in a core DB master in DC is in sync with the provided heartbeat.

        Arguments:
            datacenter: the name of the datacenter from where to get the heartbeat value.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.
            section: the section name from where to get the heartbeat value and filter the heartbeat query.
            parent_heartbeat: the reference heartbeat from the parent master to use to verify this master is in sync
                with it.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime
                or not yet in sync.

        """
        core_dbs = self.get_core_dbs(datacenter=datacenter, section=section, replication_role="master")
        local_heartbeat = MysqlLegacy._get_heartbeat(core_dbs, section, heartbeat_dc)

        # The check requires that local_heartbeat is strictly greater than parent_heartbeat because heartbeat writes
        # also when the DB is in read-only mode and has a granularity of 1s (as of 2018-09), meaning that an event could
        # have been written after the last heartbeat but before the DB was set in read-only mode and that event could
        # not have been replicated, hence checking the next heartbeat to ensure they are in sync.
        if local_heartbeat <= parent_heartbeat:
            delta = (local_heartbeat - parent_heartbeat).total_seconds()
            raise MysqlLegacyError(
                f"Heartbeat from master {core_dbs} for section {section} not yet in sync: "
                f"{local_heartbeat} <= {parent_heartbeat} (delta={delta})"
            )

    @staticmethod
    def _get_heartbeat(mysql_hosts: MysqlLegacyRemoteHosts, section: str, heartbeat_dc: str) -> datetime:
        """Get the heartbeat from the remote host for a given DC.

        Only the first output returned by the query is considered. The callers in this class pass the single master
        of a section, as enforced by the sanity check in get_core_dbs().

        Arguments:
            mysql_hosts: the instance for the target DB to query.
            section: the DB section for which to get the heartbeat.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.

        Returns:
            The most recent heartbeat as a naive :py:class:`datetime.datetime`.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime.

        """
        query = MysqlLegacy.heartbeat_query.format(dc=heartbeat_dc, section=section)
        for _, output in mysql_hosts.run_query(query, is_safe=True):
            # Decode leniently: a UnicodeDecodeError (a ValueError subclass) would otherwise leave the string unbound
            # when building the error message. Undecodable bytes now make strptime() fail with a ValueError instead.
            heartbeat_str = output.message().decode(errors="replace")
            try:
                return datetime.strptime(heartbeat_str, "%Y-%m-%dT%H:%M:%S.%f")
            except (TypeError, ValueError) as e:
                raise MysqlLegacyError(f"Unable to convert heartbeat '{heartbeat_str}' into datetime") from e

        raise MysqlLegacyError(f"Unable to get heartbeat from master {mysql_hosts} for section {section}")