Source code for spicerack.redis_cluster

"""Redis cluster module."""
import logging
from collections import defaultdict
from pathlib import Path
from typing import Any, Union

from redis import StrictRedis
from wmflib.config import load_yaml_config

from spicerack.exceptions import SpicerackError

logger = logging.getLogger(__name__)


[docs] class RedisClusterError(SpicerackError): """Custom exception class for errors in the RedisCluster class."""
[docs] class RedisCluster: """Class to manage a Redis Cluster.""" def __init__(self, cluster: str, config_dir: Path, *, dry_run: bool = True) -> None: """Initialize the instance. Arguments: cluster: the name of the cluster to connect to. config_dir: path to the directory containing the configuration files for the Redis clusters. dry_run: whether this is a DRY-RUN. """ self._dry_run = dry_run self._shards: defaultdict[str, dict] = defaultdict(dict) config = load_yaml_config(config_dir / f"{cluster}.yaml") for datacenter, shards in config["shards"].items(): for shard, data in shards.items(): self._shards[datacenter][shard] = RedisInstance( host=data["host"], port=data["port"], password=config["password"], decode_responses=True, )
[docs] def start_replica(self, datacenter: str, master_datacenter: str) -> None: """Start the cluster replica in a datacenter from a master datacenter. Arguments: datacenter: the datacenter on which to start the replica. master_datacenter: the datacenter from which to replicate. Raises: spicerack.redis.RedisClusterError: on error and invalid parameters. """ if master_datacenter == datacenter: raise RedisClusterError( f"Master datacenter must be different from the current datacenter, got {datacenter}" ) for shard, instance in sorted(self._shards[datacenter].items()): self._start_instance_replica(instance, self._shards[master_datacenter][shard])
[docs] def stop_replica(self, datacenter: str) -> None: """Stop the cluster replica in a datacenter. Arguments: datacenter: the datacenter on which to stop the replica. Raises: spicerack.redis.RedisClusterError: on error. """ for instance in self._shards[datacenter].values(): self._stop_instance_replica(instance)
def _start_instance_replica(self, instance: "RedisInstance", master: "RedisInstance") -> None: """Start the replica in a specific instance from a master instance. Arguments: instance: the instance where to start the replica. master: the master instance to replicate from. Raises: spicerack.redis.RedisClusterError: if unable to verify the replica has started. """ if instance.master_info == master.info: logger.debug("Replica already configured on %s", instance) return if self._dry_run: logger.debug("Skip starting replica on %s in dry-run mode", instance) else: logger.debug("Starting replica %s => %s", master, instance) instance.start_replica(master) if not self._dry_run and instance.master_info != master.info: raise RedisClusterError(f"Replica on {instance} is not correctly configured: {instance.master_info}") def _stop_instance_replica(self, instance: "RedisInstance") -> None: """Stop the replica in a specific instance. Arguments: instance: the instance where to stop the replica. Raises: spicerack.redis.RedisClusterError: on error. """ if instance.is_master: logger.debug("Instance %s is already master, doing nothing", instance) return if self._dry_run: logger.debug("Skip stopping replica on %s in dry-run mode", instance) else: logger.debug("Stopping replica on %s", instance) instance.stop_replica() if not self._dry_run and not instance.is_master: raise RedisClusterError(f"Instance {instance} is still a slave of {instance.master_info}, aborting")
[docs] class RedisInstance: """Class to manage a Redis instance, a simple wrapper around `redis.StrictRedis`.""" def __init__(self, **kwargs: Any) -> None: """Initialize the instance. Arguments: **kwargs: arbitrary keyword arguments, to be passed to the `redis.StrictRedis` constructor. """ self.host: str = kwargs.get("host", "") self.port: int = kwargs.get("port", 0) self._client = StrictRedis(**kwargs) @property def is_master(self) -> bool: """Returns :py:data:`True` if the current instance is a master, :py:data:`False` otherwise.""" return self._client.info("replication")["role"] == "master" @property def master_info(self) -> Union[tuple[None, None], tuple[str, int]]: """Getter to know the master of this instance. Returns: A 2-element tuple with (host/IP, port) of the master instance. If there is no master configured (:py:data:`None`, :py:data:`None`) is returned. """ data = self._client.info("replication") try: return data["master_host"], data["master_port"] except (KeyError, TypeError): return (None, None) @property def info(self) -> tuple[str, int]: """Getter to know the detail of this instance. Returns: A 2-element tuple with (host/IP, port) of the instance. """ return self.host, self.port
[docs] def stop_replica(self) -> None: """Stop the replica on the instance.""" self._client.slaveof()
[docs] def start_replica(self, master: "RedisInstance") -> None: """Start the replica from the given master instance. Arguments: master: the master instance. """ self._client.slaveof(master.host, master.port)
def __str__(self) -> str: """String representation of the instance. Returns: str: the host or IP and port of the instance. """ return f"{self.host}:{self.port}"