ClusterShell

Transport ClusterShell: worker and event handlers.

class cumin.transports.clustershell.AsyncEventHandler(target, commands, success_threshold=1.0, progress_bars=True, **kwargs)[source]

Bases: cumin.transports.clustershell.BaseEventHandler

Custom ClusterShell event handler class that executes commands asynchronously.

The implemented logic is:

  • execute on all nodes independently every command in a sequence, aborting the execution on that node if any command fails.

  • The success ratio is checked at each node completion (either because it completed all commands or aborted earlier); however, nodes already scheduled for execution with ClusterShell will execute the commands anyway. Using batch_size makes it possible to control this aspect.

  • if the success ratio is met, schedule the execution of all commands to the next node.

The typical use case is to execute read-only commands to gather the status of a fleet without any special need of orchestration between the nodes.
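The per-node logic described above can be illustrated with a small, self-contained simulation. This is only a sketch and not Cumin's implementation: run_async, success_ratio and the run callback are hypothetical names, nodes are processed one after the other instead of in parallel, and batching is ignored.

```python
from typing import Callable, Dict, List


def run_async(nodes: List[str], commands: List[str],
              run: Callable[[str, str], int]) -> Dict[str, bool]:
    """Run all commands in order on each node, stopping at the first failure.

    `run` is a hypothetical callback returning the exit code of a command
    on a node (0 means success).
    """
    results = {}
    for node in nodes:
        # all() short-circuits, so the remaining commands are skipped on
        # this node as soon as one of them fails.
        results[node] = all(run(node, command) == 0 for command in commands)
    return results


def success_ratio(results: Dict[str, bool]) -> float:
    """Fraction of nodes that completed every command successfully."""
    return sum(results.values()) / len(results) if results else 0.0
```

With two nodes and two commands, a failure of the first command on the second node leaves that node marked as failed without ever running the second command there, while the first node is unaffected.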

__init__(target, commands, success_threshold=1.0, progress_bars=True, **kwargs)[source]

Define a custom ClusterShell event handler to execute commands asynchronously between nodes.

Parameters

according to parent BaseEventHandler.__init__().

_commands_output_report(buffer_iterator, command=None)

static inherited Print the commands output in a colored and tqdm-friendly way.

Parameters
  • buffer_iterator (mixed) -- any ClusterShell object that implements iter_buffers() like ClusterShell.Task.Task and all the Worker objects.

  • command (str, optional) -- the command the output is referring to.

_ev_routing(worker, arg)

static inherited Routing event (private). Called to indicate that a (meta)worker has just updated one of its route paths. You can safely ignore this event.

_failed_commands_report(filter_command_index=-1)

static inherited Print the nodes that failed to execute commands in a colored and tqdm-friendly way.

Parameters

filter_command_index (int, optional) -- print only the nodes that failed to execute the command specified by this command index.

_get_log_message(num, message, nodes=None)

static inherited Get a pre-formatted message suitable for logging or printing.

Parameters
  • num (int) -- the number of affected nodes.

  • message (str) -- the message to print.

  • nodes (list, optional) -- the list of nodes affected.

Returns

a tuple of (logging message, NodeSet of the affected nodes).

Return type

tuple

_get_short_command(command)

static inherited Return a shortened representation of a command omitting the central part, if it's too long.

Parameters

command (str) -- the command to be shortened.

Returns

the short command.

Return type

str

_global_timeout_nodes_report()

static inherited Print the nodes that were caught by the global timeout in a colored and tqdm-friendly way.

_print_report_line(message, color_func=<function ColoredType.__getattr__.<locals>.<lambda>>, nodes_string='')

static inherited Print a tqdm-friendly colored status line with success/failure ratio and optional list of nodes.

Parameters
  • message (str) -- the message to print.

  • color_func (function, optional) -- the coloring function, one of the cumin.color.Colored methods.

  • nodes_string (str, optional) -- the string representation of the affected nodes.

_success_nodes_report(command=None)

static inherited Print how many nodes successfully executed all commands in a colored and tqdm-friendly way.

Parameters

command (str, optional) -- the command the report is referring to.

close(task)[source]

Concrete implementation of parent abstract method to print the nodes reports and close progress bars.

Parameters

according to parent cumin.transports.clustershell.BaseEventHandler.close().

ev_close(worker, timedout)

static inherited Worker has finished or timed out.

This callback is triggered by ClusterShell when the execution has completed or timed out.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_close().

ev_hup(worker, node, rc)[source]

Command execution completed on a node.

This callback is triggered by ClusterShell for each node when it completes the execution of a command. Enqueue the next command if the success criteria are met, track the failure otherwise. Update the progress bars accordingly.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_hup().

ev_msg(port, msg)

static inherited Called to indicate that a message has been received on an EnginePort.

Used to deliver messages reliably between tasks.

Parameters
  • port -- EnginePort object on which a message has been received

  • msg -- the message object received

ev_pickup(worker, node)

static inherited Command execution started on a node, remove the command from the node's queue.

This callback is triggered by the ClusterShell library for each node when it starts executing a command.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_pickup().

ev_read(worker, node, _, msg)

static inherited Worker has data to read from a specific node. Print it if running on a single host.

This callback is triggered by ClusterShell for each node when output is available.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_read().

ev_start(worker)

static inherited Called to indicate that a worker has just started.

Parameters

worker -- Worker derived object

ev_timer(timer)[source]

Schedule the current command on the next node or the next command on the first batch of nodes.

This callback is triggered by ClusterShell when a scheduled Task.timer() goes off.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_timer().

ev_written(worker, node, sname, size)

static inherited Called to indicate that some writing has been done by the worker to a node on a given stream. This event is only generated when write() is previously called on the worker.

This handler may be called very often depending on the number of target nodes, the amount of data to write and the block size used by the worker.

New in version 1.7.

Parameters
  • worker -- Worker derived object

  • node -- node (or key)

  • sname -- stream name

  • size -- amount of bytes that has just been written to node/stream associated with this event

on_timeout(task)

static inherited Update the state of the nodes and the timeout counter.

Callback called by the ClusterShellWorker when a ClusterShell.Task.TimeoutError is raised. It means that the whole execution timed out.

Parameters

task (ClusterShell.Task.Task) -- a ClusterShell Task instance.

class cumin.transports.clustershell.BaseEventHandler(target, commands, success_threshold=1.0, progress_bars=True, **kwargs)[source]

Bases: ClusterShell.Event.EventHandler

ClusterShell event handler base class.

Inherit from ClusterShell.Event.EventHandler class and define a base EventHandler class to be used in Cumin. It can be subclassed to generate custom EventHandler classes while taking advantage of some common functionalities.

__init__(target, commands, success_threshold=1.0, progress_bars=True, **kwargs)[source]

Event handler ClusterShell extension constructor.

Parameters
  • target (cumin.transports.Target) -- a Target instance.

  • commands (list) -- the list of Command objects that have to be executed on the nodes.

  • success_threshold (float, optional) -- the success threshold, a float between 0 and 1, to consider the execution successful.

  • progress_bars (bool, optional) -- whether progress bars should be displayed.

  • **kwargs (optional) -- additional keyword arguments that might be used by derived classes.

_commands_output_report(buffer_iterator, command=None)[source]

Print the commands output in a colored and tqdm-friendly way.

Parameters
  • buffer_iterator (mixed) -- any ClusterShell object that implements iter_buffers() like ClusterShell.Task.Task and all the Worker objects.

  • command (str, optional) -- the command the output is referring to.

_ev_routing(worker, arg)

static inherited Routing event (private). Called to indicate that a (meta)worker has just updated one of its route paths. You can safely ignore this event.

_failed_commands_report(filter_command_index=-1)[source]

Print the nodes that failed to execute commands in a colored and tqdm-friendly way.

Parameters

filter_command_index (int, optional) -- print only the nodes that failed to execute the command specified by this command index.

_get_log_message(num, message, nodes=None)[source]

Get a pre-formatted message suitable for logging or printing.

Parameters
  • num (int) -- the number of affected nodes.

  • message (str) -- the message to print.

  • nodes (list, optional) -- the list of nodes affected.

Returns

a tuple of (logging message, NodeSet of the affected nodes).

Return type

tuple

_get_short_command(command)[source]

Return a shortened representation of a command omitting the central part, if it's too long.

Parameters

command (str) -- the command to be shortened.

Returns

the short command.

Return type

str
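As an illustration of what "omitting the central part" can look like, here is a minimal sketch. The function name, the "..." separator and the way the kept characters are split between head and tail are assumptions, not necessarily what _get_short_command() does; only the default length of 35 comes from the documented short_command_length attribute.

```python
SHORT_COMMAND_LENGTH = 35  # value of the documented short_command_length


def short_command(command: str, length: int = SHORT_COMMAND_LENGTH) -> str:
    """Return `command` unchanged if short enough, else drop its middle."""
    if len(command) <= length:
        return command
    # Characters kept on each side of the "..." separator (assumed layout).
    side = (length - 3) // 2
    if side < 1:
        # Not enough room for a separator: plain truncation instead.
        return command[:length]
    return command[:side] + "..." + command[-side:]
```

Keeping both the head and the tail of a long command preserves the executable name and the final arguments, which are usually the most recognizable parts in a report line.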

_global_timeout_nodes_report()[source]

Print the nodes that were caught by the global timeout in a colored and tqdm-friendly way.

_print_report_line(message, color_func=<function ColoredType.__getattr__.<locals>.<lambda>>, nodes_string='')[source]

Print a tqdm-friendly colored status line with success/failure ratio and optional list of nodes.

Parameters
  • message (str) -- the message to print.

  • color_func (function, optional) -- the coloring function, one of the cumin.color.Colored methods.

  • nodes_string (str, optional) -- the string representation of the affected nodes.

_success_nodes_report(command=None)[source]

Print how many nodes successfully executed all commands in a colored and tqdm-friendly way.

Parameters

command (str, optional) -- the command the report is referring to.

close(task)[source]

Additional method called at the end of the whole execution, useful for reporting and final actions.

Parameters

task (ClusterShell.Task.Task) -- a ClusterShell Task instance.

ev_close(worker, timedout)[source]

Worker has finished or timed out.

This callback is triggered by ClusterShell when the execution has completed or timed out.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_close().

ev_hup(worker, node, rc)

static inherited Called for each node to indicate that a worker command for a specific node has just finished.

Warning

The signature of EventHandler.ev_hup() changed in ClusterShell 1.8. Please update your EventHandler derived classes to add the node and rc arguments.

Parameters
  • worker -- Worker derived object

  • node -- node (or key)

  • rc -- command return code (or None if the worker doesn't support command return codes)
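A handler written against the newer signature might look like the sketch below. ReturnCodeCollector is a hypothetical class used only for illustration; the fallback base class is there just so that the snippet also runs where ClusterShell is not installed.

```python
try:
    from ClusterShell.Event import EventHandler
except ImportError:
    # Stand-in so the sketch is runnable without ClusterShell installed.
    class EventHandler:
        """Minimal placeholder for ClusterShell.Event.EventHandler."""


class ReturnCodeCollector(EventHandler):
    """Hypothetical handler that records the return code of each node."""

    def __init__(self):
        super().__init__()
        self.return_codes = {}

    def ev_hup(self, worker, node, rc):
        # Signature with the node and rc arguments, as required since
        # ClusterShell 1.8. rc may be None if the worker does not
        # support command return codes.
        self.return_codes[node] = rc
```

Calling the callback by hand, for example `ReturnCodeCollector().ev_hup(None, "node1", 0)`, stores the return code under the node name; in a real run ClusterShell invokes it once per node.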

ev_msg(port, msg)

static inherited Called to indicate that a message has been received on an EnginePort.

Used to deliver messages reliably between tasks.

Parameters
  • port -- EnginePort object on which a message has been received

  • msg -- the message object received

ev_pickup(worker, node)[source]

Command execution started on a node, remove the command from the node's queue.

This callback is triggered by the ClusterShell library for each node when it starts executing a command.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_pickup().

ev_read(worker, node, _, msg)[source]

Worker has data to read from a specific node. Print it if running on a single host.

This callback is triggered by ClusterShell for each node when output is available.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_read().

ev_start(worker)

static inherited Called to indicate that a worker has just started.

Parameters

worker -- Worker derived object

ev_timer(timer)

static inherited Called to indicate that a timer is firing.

Parameters

timer -- EngineTimer object that is firing

ev_written(worker, node, sname, size)

static inherited Called to indicate that some writing has been done by the worker to a node on a given stream. This event is only generated when write() is previously called on the worker.

This handler may be called very often depending on the number of target nodes, the amount of data to write and the block size used by the worker.

New in version 1.7.

Parameters
  • worker -- Worker derived object

  • node -- node (or key)

  • sname -- stream name

  • size -- amount of bytes that has just been written to node/stream associated with this event

on_timeout(task)[source]

Update the state of the nodes and the timeout counter.

Callback called by the ClusterShellWorker when a ClusterShell.Task.TimeoutError is raised. It means that the whole execution timed out.

Parameters

task (ClusterShell.Task.Task) -- a ClusterShell Task instance.

short_command_length = 35

the length to which a command should be shortened in various outputs.

Type

int

class cumin.transports.clustershell.ClusterShellWorker(config, target)[source]

Bases: cumin.transports.BaseWorker

It provides a Cumin worker for SSH using the ClusterShell library.

This transport uses the ClusterShell Python library to connect to the selected hosts and execute a list of commands. This transport accepts the following customizations:

  • sync execution mode: given a list of commands, the first one will be executed on all the hosts, then, if the success ratio is reached, the second one will be executed on all hosts where the first one was successful, and so on.

  • async execution mode: given a list of commands, on each host the commands will be executed sequentially, interrupting the execution on any single host at the first command that fails. The executions on the different hosts are independent of each other.

  • custom execution mode: can be achieved by creating a custom event handler class that extends the BaseEventHandler class defined in cumin/transports/clustershell.py, implementing its abstract methods, and setting that class as the handler of the transport.

__init__(config, target)[source]

Worker ClusterShell constructor.

Parameters

according to parent cumin.transports.BaseWorker.__init__().

property commands

Commands for the current execution.

Getter

Returns the current command list or an empty list if not set.

Setter

list[Command], list[str]: a list of Command objects or str to be executed on the hosts. The elements are converted to Command automatically.

Raises

cumin.transports.WorkerError -- if trying to set it with invalid data.
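The automatic conversion performed by the setter can be pictured with the following sketch. Command and WorkerError here are simplified stand-ins for the Cumin classes of the same name, and normalize_commands is a hypothetical helper; the exact validation rules of the real setter are an assumption.

```python
class WorkerError(Exception):
    """Stand-in for cumin.transports.WorkerError."""


class Command:
    """Stand-in for Cumin's Command class, holding only the command string."""

    def __init__(self, command):
        self.command = command


def normalize_commands(value):
    """Return a list of Command objects built from Command or str items."""
    if not isinstance(value, list):
        raise WorkerError(
            "commands must be a list, got {}".format(type(value).__name__))
    normalized = []
    for item in value:
        if isinstance(item, Command):
            normalized.append(item)  # already a Command, keep it as is
        elif isinstance(item, str):
            normalized.append(Command(item))  # convert plain strings
        else:
            raise WorkerError("invalid command: {!r}".format(item))
    return normalized
```

Mixing both kinds of items, as in `normalize_commands(["uptime", Command("date")])`, yields a uniform list of Command objects, while anything else raises the error.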

execute()[source]

Execute the commands on all the targets using the handler.

Concrete implementation of parent abstract method.

Parameters

according to parent cumin.transports.BaseWorker.execute().

get_results()[source]

Get the results of the last task execution.

Concrete implementation of parent abstract method.

Parameters

according to parent cumin.transports.BaseWorker.get_results().

property handler

Concrete implementation of parent abstract getter and setter.

Accepted values for the setter:

  • an instance of a custom handler class derived from BaseEventHandler.

  • a str with one of the available default handlers listed in DEFAULT_HANDLERS.

The event handler is mandatory for this transport.

Parameters

according to parent cumin.transports.BaseWorker.handler.
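One way to picture how the two accepted kinds of value could be resolved is sketched below. All classes are empty stand-ins named after the documented ones, resolve_handler is a hypothetical helper, and the sketch deliberately accepts both a class and an instance for custom handlers because it does not try to reproduce the exact check of the real setter.

```python
class WorkerError(Exception):
    """Stand-in for cumin.transports.WorkerError."""


class BaseEventHandler:
    """Stand-in for the documented BaseEventHandler."""


class SyncEventHandler(BaseEventHandler):
    """Stand-in for the documented SyncEventHandler."""


class AsyncEventHandler(BaseEventHandler):
    """Stand-in for the documented AsyncEventHandler."""


# Same shape as the documented DEFAULT_HANDLERS mapping.
DEFAULT_HANDLERS = {"async": AsyncEventHandler, "sync": SyncEventHandler}


def resolve_handler(value):
    """Map a handler name to its class, or pass a custom handler through."""
    if isinstance(value, str):
        if value not in DEFAULT_HANDLERS:
            raise WorkerError("unknown default handler: {!r}".format(value))
        return DEFAULT_HANDLERS[value]
    is_subclass = isinstance(value, type) and issubclass(value, BaseEventHandler)
    if is_subclass or isinstance(value, BaseEventHandler):
        return value
    raise WorkerError("invalid handler: {!r}".format(value))
```

The string form keeps common cases short (`"sync"` or `"async"`), while the pass-through branch leaves room for handlers with custom scheduling logic.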

property success_threshold

Success threshold for the current execution.

Getter

float: returns the current success_threshold or 1.0 (100%) if not set.

Setter

float, None: The success ratio threshold that must be reached to consider the run successful. A float between 0 and 1 or None to reset it. The specific meaning might change based on the chosen transport.

Raises

cumin.transports.WorkerError -- if trying to set it to an invalid value.
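The getter and setter behaviour described above (a default of 1.0 when unset, None to reset, an error on invalid values) can be sketched with a plain Python property. ThresholdHolder is a hypothetical class, WorkerError is a stand-in, and accepting only float instances is an assumption about how strict the real check is.

```python
class WorkerError(Exception):
    """Stand-in for cumin.transports.WorkerError."""


class ThresholdHolder:
    """Hypothetical object exposing a success_threshold property."""

    def __init__(self):
        self._success_threshold = None  # None means "not set"

    @property
    def success_threshold(self):
        # Fall back to 1.0 (100%) when no threshold has been set.
        if self._success_threshold is None:
            return 1.0
        return self._success_threshold

    @success_threshold.setter
    def success_threshold(self, value):
        valid = value is None or (
            isinstance(value, float) and 0.0 <= value <= 1.0)
        if not valid:
            raise WorkerError(
                "success_threshold must be a float between 0 and 1 or None, "
                "got {!r}".format(value))
        self._success_threshold = value
```

Storing None internally rather than the default keeps "explicitly set to 1.0" distinguishable from "never set", even though the getter returns the same value in both cases.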

property timeout

Global timeout for the current execution.

Getter

int: returns the current timeout or 0 (no timeout) if not set.

Setter

int, None: timeout for the current execution in seconds. Must be a positive integer or None to reset it.

Raises

cumin.transports.WorkerError -- if trying to set it to an invalid value.

cumin.transports.clustershell.DEFAULT_HANDLERS = {'async': <class 'cumin.transports.clustershell.AsyncEventHandler'>, 'sync': <class 'cumin.transports.clustershell.SyncEventHandler'>}

mapping of available default event handlers for ClusterShellWorker.

Type

dict

class cumin.transports.clustershell.Node(name, commands)[source]

Bases: object

Node class to represent each target node.

__init__(name, commands)[source]

Node class constructor with default values.

Parameters
  • name -- the name of the node.

  • commands -- the commands to be executed on the node.

class cumin.transports.clustershell.SyncEventHandler(target, commands, success_threshold=1.0, progress_bars=True, **kwargs)[source]

Bases: cumin.transports.clustershell.BaseEventHandler

Custom ClusterShell event handler class that executes commands synchronously.

The implemented logic is:

  • execute command #N on all nodes where command #N-1 was successful, according to batch_size.

  • the success ratio is checked at each command completion on every node, and the execution will abort if it is not met; however, nodes already scheduled for execution with ClusterShell will execute the command anyway. Using batch_size makes it possible to control this aspect.

  • if the execution of command #N is completed and the success ratio meets the success threshold, re-start from the top with N=N+1.

The typical use case is to orchestrate some operation across a fleet, ensuring that each command is completed by enough nodes before proceeding with the next one.
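The command-by-command logic described above can be illustrated with a small simulation. This is a sketch under stated assumptions, not Cumin's implementation: run_sync and the run callback are hypothetical names, the ratio is computed against the initial number of nodes, a ratio equal to the threshold counts as met, and batching and the per-node ratio checks are left out.

```python
from typing import Callable, List, Tuple


def run_sync(nodes: List[str], commands: List[str],
             run: Callable[[str, str], int],
             success_threshold: float = 1.0) -> Tuple[int, List[str]]:
    """Run each command on the nodes that succeeded so far.

    Returns the number of commands whose success ratio met the threshold
    and the nodes that were still successful when execution stopped.
    """
    total = len(nodes)
    alive = list(nodes)
    completed = 0
    for command in commands:
        # Command N only runs where command N-1 was successful.
        alive = [node for node in alive if run(node, command) == 0]
        ratio = len(alive) / total if total else 0.0
        if ratio < success_threshold:
            break  # threshold not met: do not proceed to the next command
        completed += 1
    return completed, alive
```

With four nodes, one of which fails the first command, a threshold of 0.75 lets all commands run on the remaining three nodes, while the default threshold of 1.0 stops after the first command.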

__init__(target, commands, success_threshold=1.0, progress_bars=True, **kwargs)[source]

Define a custom ClusterShell event handler to execute commands synchronously.

Parameters

according to parent BaseEventHandler.__init__().

_commands_output_report(buffer_iterator, command=None)

static inherited Print the commands output in a colored and tqdm-friendly way.

Parameters
  • buffer_iterator (mixed) -- any ClusterShell object that implements iter_buffers() like ClusterShell.Task.Task and all the Worker objects.

  • command (str, optional) -- the command the output is referring to.

_ev_routing(worker, arg)

static inherited Routing event (private). Called to indicate that a (meta)worker has just updated one of its route paths. You can safely ignore this event.

_failed_commands_report(filter_command_index=-1)

static inherited Print the nodes that failed to execute commands in a colored and tqdm-friendly way.

Parameters

filter_command_index (int, optional) -- print only the nodes that failed to execute the command specified by this command index.

_get_log_message(num, message, nodes=None)

static inherited Get a pre-formatted message suitable for logging or printing.

Parameters
  • num (int) -- the number of affected nodes.

  • message (str) -- the message to print.

  • nodes (list, optional) -- the list of nodes affected.

Returns

a tuple of (logging message, NodeSet of the affected nodes).

Return type

tuple

_get_short_command(command)

static inherited Return a shortened representation of a command omitting the central part, if it's too long.

Parameters

command (str) -- the command to be shortened.

Returns

the short command.

Return type

str

_global_timeout_nodes_report()

static inherited Print the nodes that were caught by the global timeout in a colored and tqdm-friendly way.

_print_report_line(message, color_func=<function ColoredType.__getattr__.<locals>.<lambda>>, nodes_string='')

static inherited Print a tqdm-friendly colored status line with success/failure ratio and optional list of nodes.

Parameters
  • message (str) -- the message to print.

  • color_func (function, optional) -- the coloring function, one of the cumin.color.Colored methods.

  • nodes_string (str, optional) -- the string representation of the affected nodes.

_success_nodes_report(command=None)

static inherited Print how many nodes successfully executed all commands in a colored and tqdm-friendly way.

Parameters

command (str, optional) -- the command the report is referring to.

close(task)[source]

Concrete implementation of parent abstract method to print the success nodes report.

Parameters

according to parent cumin.transports.clustershell.BaseEventHandler.close().

end_command()[source]

Command terminated, print the result and schedule the next command if criteria are met.

Executed at the end of each command inside a lock.

Returns

True if the next command should be scheduled, False otherwise.

Return type

bool

ev_close(worker, timedout)

static inherited Worker has finished or timed out.

This callback is triggered by ClusterShell when the execution has completed or timed out.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_close().

ev_hup(worker, node, rc)[source]

Command execution completed on a node.

This callback is triggered by ClusterShell for each node when it completes the execution of a command. Update the progress bars and keep track of nodes based on the success/failure of the command's execution. Schedule a timer for further decisions.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_hup().

ev_msg(port, msg)

static inherited Called to indicate that a message has been received on an EnginePort.

Used to deliver messages reliably between tasks.

Parameters
  • port -- EnginePort object on which a message has been received

  • msg -- the message object received

ev_pickup(worker, node)

static inherited Command execution started on a node, remove the command from the node's queue.

This callback is triggered by the ClusterShell library for each node when it starts executing a command.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_pickup().

ev_read(worker, node, _, msg)

static inherited Worker has data to read from a specific node. Print it if running on a single host.

This callback is triggered by ClusterShell for each node when output is available.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_read().

ev_start(worker)

static inherited Called to indicate that a worker has just started.

Parameters

worker -- Worker derived object

ev_timer(timer)[source]

Schedule the current command on the next node or the next command on the first batch of nodes.

This callback is triggered by ClusterShell when a scheduled Task.timer() goes off.

Parameters

according to parent ClusterShell.Event.EventHandler.ev_timer().

ev_written(worker, node, sname, size)

static inherited Called to indicate that some writing has been done by the worker to a node on a given stream. This event is only generated when write() is previously called on the worker.

This handler may be called very often depending on the number of target nodes, the amount of data to write and the block size used by the worker.

New in version 1.7.

Parameters
  • worker -- Worker derived object

  • node -- node (or key)

  • sname -- stream name

  • size -- amount of bytes that has just been written to node/stream associated with this event

on_timeout(task)[source]

Override parent class on_timeout method to run end_command.

Parameters

according to parent BaseEventHandler.on_timeout().

start_command(schedule=False)[source]

Initialize progress bars and variables for this command execution.

Executed at the start of each command.

Parameters

schedule (bool, optional) -- whether the next command should be sent to ClusterShell for execution or not.

cumin.transports.clustershell.worker_class

Required by the transport auto-loader in cumin.transport.Transport.new().

alias of cumin.transports.clustershell.ClusterShellWorker
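A module-level alias such as worker_class lets a loader pick the right class knowing only the module name. The sketch below shows that general pattern with the standard library; load_worker_class is a hypothetical helper and is not meant to describe how Transport.new() is actually implemented.

```python
import importlib


def load_worker_class(module_name: str):
    """Import a transport module and return its `worker_class` attribute."""
    module = importlib.import_module(module_name)
    try:
        return module.worker_class
    except AttributeError:
        raise ImportError(
            "module {!r} does not define worker_class".format(module_name)
        ) from None
```

With Cumin installed, `load_worker_class("cumin.transports.clustershell")` would be expected to return ClusterShellWorker, given the alias documented above.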