elasticsearch_cluster

ElasticsearchCluster module.

exception spicerack.elasticsearch_cluster.ElasticsearchClusterCheckError[source]

Bases: SpicerackCheckError

Custom Exception class for check errors of this module.

exception spicerack.elasticsearch_cluster.ElasticsearchClusterError[source]

Bases: SpicerackError

Custom Exception class for errors of this module.

class spicerack.elasticsearch_cluster.ElasticsearchCluster(elasticsearch: elasticsearch.client.Elasticsearch, remote: spicerack.remote.Remote, dry_run: bool = True) None[source]

Bases: object

Class to manage elasticsearch cluster.

Initialize ElasticsearchCluster.

Parameters:
  • elasticsearch (elasticsearch.client.Elasticsearch) -- elasticsearch instance.

  • remote (spicerack.remote.Remote) -- the Remote instance.

  • dry_run (bool, default: True) -- whether this is a DRY-RUN.

check_green() None[source]

Cluster health status.

Raises:
Return type:

None

check_yellow_w_no_moving_shards() None[source]

Cluster health status.

Raises:
Return type:

None

flush_markers(timeout: datetime.timedelta = datetime.timedelta(seconds=60)) None[source]

Flush markers unsynced.

Note

flush and flush_synced are called here because from experience, it results in fewer shards not syncing. This also makes the recovery faster.

Parameters:

timeout (datetime.timedelta, default: datetime.timedelta(seconds=60)) -- elasticsearch request timeout.

Return type:

None

force_allocation_of_all_unassigned_shards() None[source]

Manual allocation of unassigned shards.

Return type:

None

frozen_writes(reason: spicerack.administrative.Reason) collections.abc.Iterator[None][source]

Stop writes to all elasticsearch indices and enable them on exit.

Parameters:

reason (spicerack.administrative.Reason) -- the reason for freezing writes.

Return type:

collections.abc.Iterator[None]

get_nodes() dict[source]

Get all Elasticsearch Nodes in the cluster.

Return type:

dict

is_node_in_cluster_nodes(node: str) bool[source]

Checks if node is in a list of elasticsearch cluster nodes.

Parameters:

node (str) -- the elasticsearch host.

Return type:

bool

Returns:

True if node is present and False if not.

reset_indices_to_read_write() None[source]

Reset all readonly indices to read/write.

In some cases (running low on disk space), indices are switched to readonly. This method will update all readonly indices to read/write.

Return type:

None

stopped_replication() collections.abc.Iterator[None][source]

Context manager to perform actions while the cluster replication is stopped.

Return type:

collections.abc.Iterator[None]

class spicerack.elasticsearch_cluster.ElasticsearchClusters(clusters: collections.abc.Sequence[ElasticsearchCluster], remote: spicerack.remote.Remote, prometheus: wmflib.prometheus.Prometheus, write_queue_datacenters: collections.abc.Sequence[str], dry_run: bool = True) None[source]

Bases: object

Class to manage elasticsearch clusters.

Initialize ElasticsearchClusters.

Parameters:
flush_markers(timeout: datetime.timedelta = datetime.timedelta(seconds=60)) None[source]

Flush markers on all clusters.

Parameters:

timeout (datetime.timedelta, default: datetime.timedelta(seconds=60)) -- timedelta object for elasticsearch request timeout.

Return type:

None

force_allocation_of_all_unassigned_shards() None[source]

Force allocation of unassigned shards on all clusters.

Return type:

None

frozen_writes(reason: spicerack.administrative.Reason) collections.abc.Iterator[list[None]][source]

Freeze all writes to the clusters and then perform operations before unfreezing writes.

Parameters:

reason (spicerack.administrative.Reason) -- Reason for freezing writes.

Yields:

list -- a side-effect list of None, as a result of the stack of context managers.

Return type:

collections.abc.Iterator[list[None]]

get_next_clusters_nodes(started_before: datetime.datetime, size: int = 1) spicerack.elasticsearch_cluster.ElasticsearchHosts | None[source]

Get next set of cluster nodes for cookbook operations like upgrade, rolling restart etc.

Nodes are selected from the row with the least restarted nodes. This ensures that a row is fully upgraded before moving to the next row. Since shards cannot move to a node with an older version of elasticsearch, this should help to keep all shards allocated at all times.

Master capable nodes are returned after all other nodes have restarted to support version upgrades which strongly suggest the masters are upgraded last.

Parameters:
  • started_before (datetime.datetime) -- the time against after which we check if the node has been restarted.

  • size (int, default: 1) -- size of nodes not restarted in a row.

Return type:

typing.Optional[spicerack.elasticsearch_cluster.ElasticsearchHosts]

Returns:

Next eligible nodes for ElasticsearchHosts or None when all nodes have been processed.

reset_indices_to_read_write() None[source]

Reset all readonly indices to read/write.

In some cases (running low on disk space), indices are switched to readonly. This method will update all readonly indices to read/write.

Return type:

None

stopped_replication() collections.abc.Iterator[list[None]][source]

Stops replication for all clusters.

Yields:

list -- a side-effect list of None, as a result of the stack of context managers.

Return type:

collections.abc.Iterator[list[None]]

wait_for_all_write_queues_empty() None[source]

Wait for all relevant CirrusSearch write queues to be empty.

Checks the Prometheus server in each of the wmflib.constants.CORE_DATACENTERS. At most waits for 60*60 seconds = 1 hour. Does not retry if prometheus returns empty results for all datacenters.

Return type:

None

wait_for_green(timeout: datetime.timedelta = datetime.timedelta(seconds=3600)) None[source]

Wait for green on all clusters.

Parameters:

timeout (datetime.timedelta, default: datetime.timedelta(seconds=3600)) -- timedelta object to represent how long to wait for green status on all clusters.

Return type:

None

wait_for_yellow_w_no_moving_shards(timeout: datetime.timedelta = datetime.timedelta(seconds=3600)) None[source]

Wait for a yellow cluster status with no relocating or initializing shards.

Parameters:

timeout (datetime.timedelta, default: datetime.timedelta(seconds=3600)) -- timedelta object to represent how long to wait for no yellow status with no initializing or relocating shards on all clusters.

Return type:

None

class spicerack.elasticsearch_cluster.ElasticsearchHosts(remote_hosts: spicerack.remote.RemoteHosts, nodes: collections.abc.Sequence[NodesGroup], dry_run: bool = True) None[source]

Bases: RemoteHostsAdapter

Remotehosts Adapter for managing elasticsearch nodes.

After calling the super's constructor, initialize other instance variables.

Parameters:
depool_nodes() None[source]

Depool the hosts.

Return type:

None

pool_nodes() None[source]

Pool the hosts.

Return type:

None

restart_elasticsearch() None[source]

Restarts all elasticsearch instances.

Return type:

None

start_elasticsearch() None[source]

Starts all elasticsearch instances.

Return type:

None

stop_elasticsearch() None[source]

Stops all elasticsearch instances.

Return type:

None

wait_for_elasticsearch_up(timeout: datetime.timedelta = datetime.timedelta(seconds=900)) None[source]

Check if elasticsearch instances on each node are up.

Parameters:

timeout (datetime.timedelta, default: datetime.timedelta(seconds=900)) -- represent how long to wait for all instances to be up.

Return type:

None

class spicerack.elasticsearch_cluster.NodesGroup(json_node: dict, cluster: spicerack.elasticsearch_cluster.ElasticsearchCluster) None[source]

Bases: object

Internal class, used for parsing responses from the elasticsearch node API.

Since the same server can host multiple elasticsearch instances, this class can consolidate those multiple instances in a single object.

Instantiate a new node.

Parameters:
accumulate(json_node: dict, cluster: spicerack.elasticsearch_cluster.ElasticsearchCluster) None[source]

Accumulate information from other elasticsearch instances running on the same server.

Parameters:
Return type:

None

check_all_nodes_up() None[source]

Check that all the nodes on this hosts are up and have joined their respective clusters.

Raises:

spicerack.elasticsearch_cluster.ElasticsearchClusterCheckError -- if not all nodes have joined.

Return type:

None

restarted_since(since: datetime.datetime) bool[source]

Check if node has been restarted.

Parameters:

since (datetime.datetime) -- the time against after which we check if the node has been restarted.

Return type:

bool

Returns:

True if the node has been restarted after since, False otherwise.

property clusters_instances: Sequence[ElasticsearchCluster]

Get the cluster instances running on this node group.

property fqdn: str

Get the Fully Qualified Domain Name.

property master_capable: set[str]

Get the set of clusters this node is master capable on.

property row: str

Get the datacenter row.

spicerack.elasticsearch_cluster.create_elasticsearch_clusters(configuration: dict[str, dict[str, dict[str, str]]], clustergroup: str, write_queue_datacenters: collections.abc.Sequence[str], remote: spicerack.remote.Remote, prometheus: wmflib.prometheus.Prometheus, dry_run: bool = True) spicerack.elasticsearch_cluster.ElasticsearchClusters[source]

Get an ElasticsearchClusters instance.

Parameters:
Raises:

spicerack.elasticsearch_cluster.ElasticsearchClusterError -- Thrown when the requested cluster configuration is not found.

Return type:

spicerack.elasticsearch_cluster.ElasticsearchClusters