# Source code for spicerack.mysql_legacy

"""MySQL module (legacy).

Todo:
    replace with a proper MySQL module that uses a Python MySQL client, preferably in a parallel way.

"""
import logging
from collections.abc import Iterator
from datetime import datetime
from typing import Optional, Union

from ClusterShell.MsgTree import MsgTreeElem
from cumin import NodeSet
from wmflib.constants import CORE_DATACENTERS

from spicerack.decorators import retry
from spicerack.exceptions import SpicerackError
from spicerack.remote import Remote, RemoteHostsAdapter

REPLICATION_ROLES: tuple[str, ...] = (
    "master",
    "slave",
    "standalone",
)
"""Replication roles accepted when filtering the database hosts."""
CORE_SECTIONS: tuple[str, ...] = (
    # Numbered "s" sections.
    "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8",
    # "x" section.
    "x1",
    # "es" sections.
    "es4", "es5",
)
"""Accepted MySQL RW section names (the external storage RO sections are deliberately left out)."""

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class MysqlLegacyError(SpicerackError):
    """Exception raised for any error originating from this module."""
class MysqlLegacyRemoteHosts(RemoteHostsAdapter):
    """Remote hosts adapter specialized in executing MySQL queries on the wrapped hosts."""

    def run_query(  # pylint: disable=too-many-arguments
        self,
        query: str,
        database: str = "",
        success_threshold: float = 1.0,
        batch_size: Optional[Union[int, str]] = None,
        batch_sleep: Optional[float] = None,
        is_safe: bool = False,
    ) -> Iterator[tuple[NodeSet, MsgTreeElem]]:
        """Run a MySQL query on the wrapped hosts through the ``mysql`` command line client.

        Arguments:
            query: the MySQL query to run. It is embedded between double quotes in the command line, hence any
                double quote it contains must be already escaped by the caller.
            database: optional name of the MySQL database to connect to before running the query.
            success_threshold: ratio of hosts, between 0.0 and 1.0, that must succeed for the execution to be
                considered successful.
            batch_size: the Cumin batch size, either as a percentage (e.g. ``25%``) or as an absolute number
                (e.g. ``5``).
            batch_sleep: the Cumin batch sleep, in seconds, to wait before scheduling the next host.
            is_safe: whether the query is read-only, does not modify the state and can hence be executed also in
                dry-run mode.

        Returns:
            Whatever the underlying ``run_sync()`` call returns, annotated as an iterator of
            ``(hosts, output)`` tuples.

        Raises:
            spicerack.remote.RemoteExecutionError: if the Cumin execution returns a non-zero exit code.

        """
        # The strip() drops the trailing space left behind when no database is given.
        mysql_command = f'mysql --skip-ssl --skip-column-names --batch -e "{query}" {database}'.strip()
        execution_options = {
            "success_threshold": success_threshold,
            "batch_size": batch_size,
            "batch_sleep": batch_sleep,
            "is_safe": is_safe,
        }
        return self._remote_hosts.run_sync(mysql_command, **execution_options)
class MysqlLegacy:
    """Class to manage MySQL servers."""

    heartbeat_query: str = (
        "SELECT ts FROM heartbeat.heartbeat WHERE datacenter = '{dc}' and shard = '{section}' "
        "ORDER BY ts DESC LIMIT 1"
    )
    """Query pattern to check the heartbeat for a given datacenter and section."""

    def __init__(self, remote: Remote, dry_run: bool = True) -> None:
        """Initialize the instance.

        Arguments:
            remote: the Remote instance.
            dry_run: whether this is a DRY-RUN.

        """
        self._remote = remote
        self._dry_run = dry_run

    def get_dbs(self, query: str) -> MysqlLegacyRemoteHosts:
        """Get a MysqlLegacyRemoteHosts instance for the matching hosts.

        Arguments:
            query: the Remote query to use to fetch the DB hosts.

        """
        return MysqlLegacyRemoteHosts(self._remote.query(query))

    def get_core_dbs(
        self,
        *,
        datacenter: Optional[str] = None,
        section: Optional[str] = None,
        replication_role: Optional[str] = None,
        excludes: tuple[str, ...] = (),
    ) -> MysqlLegacyRemoteHosts:
        """Get an instance to operate on the core databases matching the parameters.

        When ``replication_role`` is ``master`` the number of matched hosts is verified against the expected
        number of masters: one per selected datacenter and per selected section. If ``section`` is set, exactly one
        section is expected regardless of ``excludes``.

        Arguments:
            datacenter: the name of the datacenter to filter for, accepted values are those specified in
                :py:data:`wmflib.constants.CORE_DATACENTERS`.
            section: a specific section to filter for, accepted values are those specified in
                :py:data:`spicerack.mysql_legacy.CORE_SECTIONS`.
            replication_role: the replication role to filter for, accepted values are those specified in
                :py:data:`spicerack.mysql_legacy.REPLICATION_ROLES`.
            excludes: sections to exclude from getting, accepted values are those specified in
                :py:data:`spicerack.mysql_legacy.CORE_SECTIONS`. Repeated values are counted only once.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on invalid data or unexpected matching hosts.

        """
        query_parts = ["A:db-core"]
        dc_multiplier = len(CORE_DATACENTERS)
        section_multiplier = len(CORE_SECTIONS)

        if datacenter is not None:
            if datacenter not in CORE_DATACENTERS:
                raise MysqlLegacyError(f"Got invalid datacenter {datacenter}, accepted values are: {CORE_DATACENTERS}")

            dc_multiplier = 1
            query_parts.append("A:" + datacenter)

        # De-duplicate (preserving the order) so that a section listed twice is subtracted only once from the
        # expected number of masters and is not repeated in the query.
        for exclude in dict.fromkeys(excludes):
            if exclude not in CORE_SECTIONS:
                raise MysqlLegacyError(f"Got invalid excludes {exclude}, accepted values are: {CORE_SECTIONS}")

            section_multiplier -= 1
            query_parts.append(f"not A:db-section-{exclude}")

        if section is not None:
            if section not in CORE_SECTIONS:
                raise MysqlLegacyError(f"Got invalid section {section}, accepted values are: {CORE_SECTIONS}")

            # A single explicit section overrides whatever was computed from the excludes.
            section_multiplier = 1
            query_parts.append(f"A:db-section-{section}")

        if replication_role is not None:
            if replication_role not in REPLICATION_ROLES:
                raise MysqlLegacyError(
                    f"Got invalid replication_role {replication_role}, accepted values are: {REPLICATION_ROLES}"
                )

            query_parts.append(f"A:db-role-{replication_role}")

        mysql_hosts = MysqlLegacyRemoteHosts(self._remote.query(" and ".join(query_parts)))

        # Sanity check of matched hosts in case of master selection
        if replication_role == "master":
            expected_masters = dc_multiplier * section_multiplier
            if len(mysql_hosts) != expected_masters:
                raise MysqlLegacyError(f"Matched {len(mysql_hosts)} masters, expected {expected_masters}")

        return mysql_hosts

    def set_core_masters_readonly(self, datacenter: str) -> None:
        """Set the core masters in read-only.

        Arguments:
            datacenter: the name of the datacenter to filter for.

        Raises:
            spicerack.remote.RemoteExecutionError: on Remote failures.
            spicerack.mysql_legacy.MysqlLegacyError: on failing to verify the modified value.

        """
        logger.debug("Setting core DB masters in %s to be read-only", datacenter)
        target = self.get_core_dbs(datacenter=datacenter, replication_role="master")
        target.run_query("SET GLOBAL read_only=1")
        self.verify_core_masters_readonly(datacenter, True)

    def set_core_masters_readwrite(self, datacenter: str) -> None:
        """Set the core masters in read-write.

        Arguments:
            datacenter: the name of the datacenter to filter for.

        Raises:
            spicerack.remote.RemoteExecutionError: on Remote failures.
            spicerack.mysql_legacy.MysqlLegacyError: on failing to verify the modified value.

        """
        logger.debug("Setting core DB masters in %s to be read-write", datacenter)
        target = self.get_core_dbs(datacenter=datacenter, replication_role="master")
        target.run_query("SET GLOBAL read_only=0")
        self.verify_core_masters_readonly(datacenter, False)

    def verify_core_masters_readonly(self, datacenter: str, is_read_only: bool) -> None:
        """Verify that the core masters are in read-only or read-write mode.

        Every mismatch is logged as an error. In dry-run mode the mismatches are only logged and no exception is
        raised.

        Arguments:
            datacenter: the name of the datacenter to filter for.
            is_read_only: whether the read-only mode should be set or not.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure, unless in dry-run mode.

        """
        logger.debug(
            "Verifying core DB masters in %s have read-only=%d",
            datacenter,
            is_read_only,
        )
        target = self.get_core_dbs(datacenter=datacenter, replication_role="master")
        expected = str(int(is_read_only))  # Convert it to the returned value from MySQL: 1 or 0.
        failed = False

        # Go through all the results instead of stopping at the first mismatch so that every failure gets logged.
        for nodeset, output in target.run_query("SELECT @@global.read_only", is_safe=True):
            response = output.message().decode()
            if response != expected:
                logger.error(
                    "Expected output to be '%s', got '%s' for hosts %s",
                    expected,
                    response,
                    str(nodeset),
                )
                failed = True

        if failed and not self._dry_run:
            raise MysqlLegacyError(
                f"Verification failed that core DB masters in {datacenter} have read-only={is_read_only}"
            )

    def check_core_masters_in_sync(self, dc_from: str, dc_to: str) -> None:
        """Check that all core masters in dc_to are in sync with the core masters in dc_from.

        Arguments:
            dc_from: the name of the datacenter from where to get the master positions.
            dc_to: the name of the datacenter where to check that they are in sync.

        Raises:
            spicerack.remote.RemoteExecutionError: on failure.
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather or convert a heartbeat, or if the
                masters are not in sync.

        """
        logger.debug("Waiting for the core DB masters in %s to catch up", dc_to)
        # The reference values are the heartbeats of dc_from as seen by the masters of dc_from itself.
        heartbeats = self.get_core_masters_heartbeats(dc_from, dc_from)
        self.check_core_masters_heartbeats(dc_to, dc_from, heartbeats)

    def get_core_masters_heartbeats(self, datacenter: str, heartbeat_dc: str) -> dict[str, datetime]:
        """Get the current heartbeat values from core DB masters in DC for a given heartbeat DC.

        Arguments:
            datacenter: the name of the datacenter from where to get the heartbeat values.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.

        Returns:
            A dictionary with the section name :py:class:`str` as keys and their heartbeat
            :py:class:`datetime.datetime` as values. For example::

                {'s1': datetime.datetime(2018, 1, 2, 11, 22, 33, 123456)}

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime.

        """
        heartbeats = {}
        for section in CORE_SECTIONS:
            core_dbs = self.get_core_dbs(datacenter=datacenter, section=section, replication_role="master")
            heartbeats[section] = MysqlLegacy._get_heartbeat(core_dbs, section, heartbeat_dc)

        return heartbeats

    def check_core_masters_heartbeats(
        self, datacenter: str, heartbeat_dc: str, heartbeats: dict[str, datetime]
    ) -> None:
        """Check the current heartbeat values in the core DB masters in DC are in sync with the provided heartbeats.

        Arguments:
            datacenter: the name of the datacenter from where to get the heartbeat values.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.
            heartbeats: a dictionary with the section name :py:class:`str` as keys and heartbeat
                :py:class:`datetime.datetime` for each core section as values.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime.

        """
        for section, heartbeat in heartbeats.items():
            self._check_core_master_in_sync(datacenter, heartbeat_dc, section, heartbeat)

    @retry(exceptions=(MysqlLegacyError,))
    def _check_core_master_in_sync(
        self,
        datacenter: str,
        heartbeat_dc: str,
        section: str,
        parent_heartbeat: datetime,
    ) -> None:
        """Check and retry that the heartbeat value in a core DB master in DC is in sync with the provided heartbeat.

        Arguments:
            datacenter: the name of the datacenter from where to get the heartbeat value.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.
            section: the section name from where to get the heartbeat value and filter the heartbeat query.
            parent_heartbeat: the reference heartbeat from the parent master to use to verify this master is in
                sync with it.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime
                or not yet in sync.

        """
        core_dbs = self.get_core_dbs(datacenter=datacenter, section=section, replication_role="master")
        local_heartbeat = MysqlLegacy._get_heartbeat(core_dbs, section, heartbeat_dc)

        # The check requires that local_heartbeat is strictly greater than parent_heartbeat because heartbeat writes
        # also when the DB is in read-only mode and has a granularity of 1s (as of 2018-09), meaning that an event
        # could have been written after the last heartbeat but before the DB was set in read-only mode and that event
        # could not have been replicated, hence checking the next heartbeat to ensure they are in sync.
        if local_heartbeat <= parent_heartbeat:
            delta = (local_heartbeat - parent_heartbeat).total_seconds()
            raise MysqlLegacyError(
                f"Heartbeat from master {core_dbs} for section {section} not yet in sync: "
                f"{local_heartbeat} <= {parent_heartbeat} (delta={delta})"
            )

    @staticmethod
    def _get_heartbeat(mysql_hosts: MysqlLegacyRemoteHosts, section: str, heartbeat_dc: str) -> datetime:
        """Get the heartbeat from the remote host for a given DC.

        Only the first result returned by the query execution is used.

        Arguments:
            mysql_hosts: the instance for the target DB to query.
            section: the DB section for which to get the heartbeat.
            heartbeat_dc: the name of the datacenter for which to filter the heartbeat query.

        Returns:
            The heartbeat timestamp parsed with the ``%Y-%m-%dT%H:%M:%S.%f`` format.

        Raises:
            spicerack.mysql_legacy.MysqlLegacyError: on failure to gather the heartbeat or convert it into a datetime.

        """
        query = MysqlLegacy.heartbeat_query.format(dc=heartbeat_dc, section=section)

        for _, output in mysql_hosts.run_query(query, is_safe=True):
            # Bound before the try block: if getting or decoding the output fails (UnicodeDecodeError is a
            # ValueError) the handler below must still be able to format its message instead of failing with an
            # UnboundLocalError that would hide the real problem.
            heartbeat_str = "<undecodable output>"
            try:
                heartbeat_str = output.message().decode()
                return datetime.strptime(heartbeat_str, "%Y-%m-%dT%H:%M:%S.%f")
            except (TypeError, ValueError) as e:
                raise MysqlLegacyError(f"Unable to convert heartbeat '{heartbeat_str}' into datetime") from e

        # Reached only when the query execution did not return any result.
        raise MysqlLegacyError(f"Unable to get heartbeat from master {mysql_hosts} for section {section}")