# -*- coding: utf-8 -*-
"""
scap.cli
~~~~~~~~
Classes and helpers for creating command line interfaces
Copyright © 2014-2017 Wikimedia Foundation and Contributors.
This file is part of Scap.
Scap is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from contextlib import contextmanager
import errno
import locale
import logging
import operator
import os
import re
import shlex
import socket
import subprocess
import sys
import time
from functools import reduce
import scap.version as version
import scap.ansi as ansi
import scap.arg as arg
import scap.config as config
import scap.interaction as interaction
import scap.lock as lock
import scap.log as log
import scap.plugins
import scap.targets as targets
import scap.utils as utils
from scap.ssh import SSH_WITH_KEY
PRIMARY_DEPLOY_SERVER_ONLY_COMMAND = "_primary_deploy_server_only"
REQUIRE_TTY_MULTIPLEXER_COMMAND = "_require_tty_multiplexer"
[docs]class Application(object):
"""Base class for creating command line applications."""
program_name = None
_logger = None
_announce_logger = None
_have_announced = False
_stats = None
arguments = None
config = None
# Dictionary representing configuration options that were
# specified using -D on the command line. Populated by _load_config().
cli_defines = {}
[docs] def __init__(self, exe_name):
if self.program_name is None:
self.program_name = os.path.basename(exe_name)
self.exe_name = exe_name
self.start = time.time()
# Property and related code should be removed once a long-term solution for
# https://phabricator.wikimedia.org/T304557 is implemented
self.user_ssh_auth_sock = (
os.environ["SSH_AUTH_SOCK"] if "SSH_AUTH_SOCK" in os.environ else None
)
try:
locale.setlocale(locale.LC_ALL, "")
except locale.Error:
pass
[docs] def get_logger(self):
"""Lazy getter for a logger instance."""
if self._logger is None:
self._logger = logging.getLogger(self.program_name)
return self._logger
def get_stats(self):
if self._stats is None:
self._stats = log.Stats(
self.config["statsd_host"], int(self.config["statsd_port"])
)
return self._stats
[docs] def get_lock_file(self):
"""Get the path to scap.lock"""
if self.config["lock_file"] is not None:
return self.config["lock_file"]
if "git_repo" in self.config:
return "/var/lock/scap.%s.lock" % (
self.config["git_repo"].replace("/", "_")
)
# `scap sync*` can run from anywhere on the file system and
# doesn't actually use the value of `git_repo`. In contrast,
# `scap deploy` will fail almost instantly without a `git_repo`
# set. If we're attempting to create a lock file, and there is
# no git_repo, then it's likely for a sync* command and the
# correct git_repo is operations/mediawiki-config.
stage_dir = self.config["stage_dir"]
return os.path.join(
"/var/lock", "scap." + stage_dir.strip("/").replace("/", "_") + ".lock"
)
@property
def verbose(self):
return self.arguments.loglevel < logging.INFO
[docs] def get_duration(self):
"""Get the elapsed duration in seconds."""
return time.time() - self.start
[docs] def get_script_path(self, remote=False):
"""Returns the path to the scap script."""
scap = os.path.join(os.path.dirname(sys.argv[0]), "scap")
# For development
if remote:
scap = os.environ.get("REMOTE_SCAP", scap)
return scap
[docs] def get_keyholder_key(self, ssh_user=None, key_name=None):
"""Get the private key for IdentityFile use in ssh."""
if ssh_user is None:
ssh_user = self.config["ssh_user"]
if key_name is None:
key_name = self.config.get("keyholder_key")
key_dir = "/etc/keyholder.d"
key_safe_name = re.sub(r"\W", "_", ssh_user)
if key_name is None:
key_name = key_safe_name
key_path = os.path.join(key_dir, key_name)
if os.path.exists(key_path):
self.get_logger().debug("Using key: %s", key_path)
return key_path
self.get_logger().debug("Unable to find keyholder key for %s", key_safe_name)
return None
[docs] def get_master_list(self, limit_hosts=None, exclude_hosts=None):
"""Get list of deploy master hostnames that should be updated before
the rest of the cluster."""
return targets.get(
"dsh_masters",
self.config,
limit_hosts=limit_hosts,
exclude_hosts=exclude_hosts,
).all
[docs] def announce(self, *args):
"""
Announce a message to broadcast listeners.
Emits a logging event to the 'scap.announce' logger which can be
configured to broadcast messages via irc or another real-time
notification channel.
Announcements can be disabled by using '--no-log-message' in the
command invocation (e.g. `scap sync-file --no-log-message foo.php`). In
this case the log event will still be emitted to the normal logger as
though `self.get_logger().info()` was used instead of
`self.announce()`.
"""
if self.arguments.no_log_message:
# Do not log to the announce logger, but do log the event for the
# console and other non-broadcast log collectors.
self.get_logger().info(*args)
else:
if self._announce_logger is None:
self._announce_logger = logging.getLogger("scap.announce")
self._announce_logger.info(*args)
self._have_announced = True
[docs] def announce_final(self, *args):
"""
Use this method to avoid announcing (to IRC, etc) that an operation
has finished if there was no prior announcement that an operation
had started. This is useful is situations where self.announce()
is called in an `except` or `finally` clause surrounding a larger
operation.
"""
if self._have_announced:
self.announce(*args)
else:
self.get_logger().info(*args)
[docs] def active_wikiversions(self, source_tree="deploy", return_type=list):
"""
Get an ordered list or dictionary of active MediaWiki versions.
:param source_tree: Source tree to read file from: 'deploy' or 'stage'
:param return_type: One of list or dict.
:returns: If return_type is list (the default), returns a list of
version strings (like "1.38.0-wmf.4") in ascending order.
If return_type is dict, returns a collections.OrderedDict of
{version:wikidb} values sorted by version number in ascending
order. 'wikidb' will be the alphabetically-first wikidb for 'version'.
This can be used by operations that need a db but don't care
which wiki's db is used.
"""
return utils.get_active_wikiversions(
self.config[source_tree + "_dir"], self.config["wmf_realm"], return_type
)
@property
def message_argument(self) -> str:
"""
The given `message` :func:`argument` if one was defined, otherwise it
returns "(no justification provided)".
"""
if hasattr(self, "arguments") and hasattr(self.arguments, "message"):
return self.arguments.message
return "(no justification provided)"
[docs] def prompt_for_approval_or_exit(self, prompt_message, exit_message):
"""Exits successfully with a message if the user does not approve."""
approval = interaction.prompt_user_for_confirmation(prompt_message)
if not approval:
self.announce_final(exit_message)
sys.exit(os.EX_OK)
[docs] def _process_arguments(self, args, extra_args):
"""
Validate and process command line arguments.
Default behavior is to abort the application with an error if any
unparsed arguments were found.
:returns: Tuple of (args, extra_args) after processing
"""
if extra_args:
self._argparser.error("extra arguments found: %s" % " ".join(extra_args))
if hasattr(args, "message"):
args.message = " ".join(args.message) or "(no justification provided)"
return args, extra_args
def setup(self, use_global_config=True):
# Let each application handle `extra_args`
self.arguments, self.extra_arguments = self._process_arguments(
self.arguments, self.extra_arguments
)
self._load_config(use_global_config=use_global_config)
self._setup_loggers()
self._setup_environ()
[docs] def _load_config(self, use_global_config=True):
"""Load configuration."""
self.cli_defines = {}
if self.arguments.defines:
res = []
for string in self.arguments.defines:
pair = string.split(":", 1)
if len(pair) != 2:
raise SystemExit(
"Invalid configuration setting: {}\nSettings must be in 'key:value' format".format(
string
)
)
res.append(pair)
self.cli_defines = dict(res)
self.config = config.load(
cfg_file=self.arguments.conf_file,
environment=self.arguments.environment,
overrides=self.cli_defines,
use_global_config=use_global_config,
)
# self.cli_defines remains available for other code (such as
# deploy.py) to use.
[docs] def _setup_loggers(self):
"""Setup logging."""
log.setup_loggers(self.config, self.arguments.loglevel)
[docs] def _setup_environ(self):
"""Setup shell environment."""
auth_sock = self.config.get("ssh_auth_sock")
php_version = self.config.get("php_version")
if php_version is not None:
os.environ["PHP"] = php_version
if auth_sock is not None and self.arguments.shared_authsock:
os.environ["SSH_AUTH_SOCK"] = auth_sock
[docs] def main(self, *extra_args):
"""
Main business logic of the application.
Parsed command line arguments are available in self.arguments. Global
configuration is available in self.config. Unparsed command line
arguments are passed as positional arguments.
:returns: exit status
"""
raise NotImplementedError()
[docs] def handle_keyboard_interrupt(self):
"""
Handle ctrl-c from interactive user.
:returns: exit status
"""
# Only announce the abort if any previous call to
# self.announce() has generated a broadcast announcement
# (T329228)
if self._announce_logger:
self.announce(
"{} aborted: {} (duration: {})".format(
self.program_name,
self.message_argument,
utils.human_duration(self.get_duration()),
)
)
return 130
[docs] def scap_check_call(self, scap_cmd: list):
"""
Call scap subcommand
:raises CalledProcessError: if the subcommand fails
"""
args = [self.get_script_path()] + scap_cmd + self.format_passthrough_args()
with utils.suppress_backtrace():
subprocess.run(args, check=True, env=self.get_gerrit_ssh_env())
[docs] def scap_check_output(self, scap_cmd: list) -> str:
"""
Call scap subcommand and return output as text
:returns: the output of `scap_cmd` as text
:raises CalledProcessError: if the subcommand fails
"""
args = [self.get_script_path()] + scap_cmd + self.format_passthrough_args()
with utils.suppress_backtrace():
return subprocess.run(
args,
check=True,
stdout=subprocess.PIPE,
text=True,
env=self.get_gerrit_ssh_env(),
).stdout.strip()
[docs] def get_gerrit_ssh_env(self) -> dict:
"""
An environment to interact with Gerrit over ssh
When `gerrit_push_user` scap configuration is set, set the
`GIT_SSH_COMMAND` environment variable to have git ssh to use that user
instead of the username of the person that invoked scap.
Else if a user-supplied ssh agent socket was provided, override the one
set by scap which does not have the invoking user key.
Eventually we will always require `gerrit_push_user` to be set.
See https://phabricator.wikimedia.org/T304557
:returns: environment suitable for sshing to Gerrit
"""
# Copy from the user environment
gerrit_env = os.environ.copy()
if self.config["gerrit_push_user"]:
key_file = self.get_keyholder_key(
ssh_user=self.config["gerrit_push_user"],
)
ssh_command = SSH_WITH_KEY(
user=self.config["gerrit_push_user"], key=key_file
)
# Do not redirect ssh stdin from /dev/null since git sends data
# this way.
ssh_command.remove("-n")
# With python 3.8 we can use shlex.join(ssh_command)
gerrit_env["GIT_SSH_COMMAND"] = " ".join(
shlex.quote(arg) for arg in ssh_command
)
elif self.user_ssh_auth_sock:
gerrit_env["SSH_AUTH_SOCK"] = self.user_ssh_auth_sock
return gerrit_env
[docs] def _handle_exception(self, ex):
"""
Handle unhandled exceptions and errors.
:returns: exit status
"""
logger = self.get_logger()
exception_type = type(ex).__name__
backtrace = True
# Retrieve scap version
scap_version = version.__version__
message = "%s failed: <%s> %s (scap version: %s)"
if isinstance(ex, lock.LockFailedError) or getattr(
ex, "_scap_no_backtrace", False
):
backtrace = False
if backtrace:
logger.warning("Unhandled error:", exc_info=True)
logger.error(message, self.program_name, exception_type, ex, scap_version)
return 70
[docs] def _before_exit(self, exit_status):
"""
Do any final cleanup or processing before the application exits.
Called after :meth:`main` and before `sys.exit` even when an exception
occurs.
:returns: exit status
"""
if utils.should_colorize_output():
try:
sys.stdout.write(ansi.reset())
sys.stdout.flush()
except Exception:
pass
return exit_status
[docs] def _run_as(self, user):
"""Ensure that this program is run as the given user."""
if utils.get_username() != user:
self.get_logger().info("%s must be run as user %s", self.program_name, user)
# Replace the current process with a sudo call to the same script
os.execvp("sudo", ["sudo", "-u", user, "-n", "--"] + sys.argv)
[docs] def _assert_current_user(self, user):
"""Assert that this program is run as the given user."""
if utils.get_username() != user:
raise RuntimeError("%s must be run as user %s" % (self.program_name, user))
[docs] def _assert_auth_sock(self):
"""Assert that SSH_AUTH_SOCK is present in the environment."""
if "SSH_AUTH_SOCK" not in os.environ:
with utils.suppress_backtrace():
raise RuntimeError(
"%s requires SSH agent forwarding" % self.program_name
)
[docs] def _check_user_auth_sock(self):
"""
Similar to `_assert_auth_sock` method above.
Returns immediately when "gerrit_push_user" has been configured, it is
assumed the identity is provided in the ssh authentication socket /
keyholder.
Unlike `_assert_auth_sock`, this method:
* Checks the original SSH_AUTH_SOCK provided in the environment
by the user, before scap potentially overrides it
* Is meant to be called during operations where a user-provided
ssh-agent is required (e.g. deploy-promote). It is not a global
requirement of scap
"""
if self.config["gerrit_push_user"]:
return
if not self.user_ssh_auth_sock:
utils.abort(
"You need to provide access to your own ssh-agent\n"
"try: eval $(ssh-agent) && ssh-add"
)
def get_current_train_info(self):
return utils.get_current_train_info(
self.config["train_blockers_url"], self.config["web_proxy"]
)
[docs] @contextmanager
def lock(self, lock_file=None, **kwargs):
"""
Acquire a lock for the main work of this application.
"""
if lock_file is None:
lock_file = self.get_lock_file()
if "name" not in kwargs:
kwargs["name"] = self.program_name
if "reason" not in kwargs:
kwargs["reason"] = self.message_argument
with lock.Lock(lock_file, **kwargs):
yield
[docs] @contextmanager
def lock_and_announce(self, **kwargs):
"""
Acquire a lock for the main work of this application and announce its
start and finish.
"""
with self.lock(**kwargs):
start = time.time()
self.announce(
"Started scap %s: %s", self.program_name, self.message_argument
)
yield
self.announce(
"Finished scap %s: %s (duration: %s)",
self.program_name,
self.message_argument,
utils.human_duration(time.time() - start),
)
# NOTE: The 'scap' directory is excluded by scap pull (aka
# scap.tasks.sync_common()), and excluded when building mediawiki
# container images
# xref https://gitlab.wikimedia.org/repos/releng/release/-/blob/00aa9bd253c358fc7b4fb88498e4f9c806212f57/make-container-image/build-images.py#L95
def scap_history_dbfile(self):
return os.path.join(self.config["stage_dir"], "scap/log/history.db")
def spiderpig_dbfile(self):
return os.path.join(self.config["stage_dir"], "scap/log/spiderpig.db")
def spiderpig_logdir(self):
return os.path.join(self.config["stage_dir"], "scap/log/jobs")
[docs] def timed(self, fn, *args, **kwargs):
"""
Call a function wrapped in a log.Timer.
"""
with log.Timer(fn.__name__, self.get_stats()):
fn(*args, **kwargs)
@staticmethod
def factory(argv=None):
parser = arg.build_parser()
args, extra_args = parser.parse_known_args(argv)
if not hasattr(args, "which"):
sys.exit("MUST provide subcommand, run with --help for a list")
app = args.which(args.command)
app._argparser = parser
app.arguments = args
app.extra_arguments = extra_args
return app
[docs] @classmethod
def run(cls):
"""
Construct and run an application.
Calls ``sys.exit`` with exit status returned by application by
default. Setting ``exit`` to ``False`` will instead return the class
instance and it's exit status. This would generally only be done when
testing.
:param cls: Class to create and run
:param argv: Command line arguments
:returns: Tuple of class instance and exit status when not exiting
"""
# Bootstrap the logging system
logging.basicConfig(
level=logging.INFO,
format=log.CONSOLE_LOG_FORMAT,
datefmt=log.TIMESTAMP_FORMAT,
stream=sys.stdout,
)
# Silence noisy loggers early
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("scap.sh").setLevel(logging.WARNING)
# Setup instance for logger access
app = cls("scap")
exit_status = 0
try:
app = Application.factory()
if os.geteuid() == 0:
raise OSError(errno.EPERM, "Scap should not be run as root")
app.setup()
if getattr(app.main, PRIMARY_DEPLOY_SERVER_ONLY_COMMAND, False):
hostname = socket.gethostname()
if not any(
hostname.startswith(prefix) for prefix in ["deploy", "releases"]
):
utils.abort(
"This scap command can only be used on a deploy server or the releases Jenkins instance."
)
if hasattr(app, "config") and app.config["block_deployments"]:
utils.abort(
"This scap command is disabled on this host. If you really need to run it, you can override by"
""" passing "-Dblock_deployments:False" to the call"""
)
if (
app.config.get("require_terminal_multiplexer")
and getattr(app.main, REQUIRE_TTY_MULTIPLEXER_COMMAND, False)
and interaction.have_terminal()
):
if os.environ.get("TMUX") is None and os.environ.get("STY") is None:
utils.abort(
"No tmux or screen process found. Try either:\n"
'\tTMUX: tmux new-session -s "🚂🌈"\n'
"\tSCREEN: screen -D -RR train"
)
if "subcommand" in app.arguments and app.arguments.subcommand:
method = app.arguments.subcommand
exit_status = method(app, app.extra_arguments)
else:
exit_status = app.main(app.extra_arguments)
except KeyboardInterrupt:
# Handle ctrl-c from interactive user
exit_status = app.handle_keyboard_interrupt()
except Exception as ex:
# Handle all unhandled exceptions and errors
exit_status = app._handle_exception(ex)
finally:
exit_status = app._before_exit(exit_status)
# Flush logger before exiting
logging.shutdown()
# Make a beep
if exit_status != 0:
sys.stdout.write("\a")
sys.stdout.flush()
# Exit
sys.exit(exit_status)
[docs]def argument(*args, **kwargs):
"""
argument(option_flags..[,action='store'][,nargs=1]\
[,const=None][,default][,type=str][,choices][,required=False][,help]\
[,dest])
Decorator used to declare a command line argument on an
:class:`Application`
Maps a command line argument to the decorated class or method.
:param str option_flags: One or more option flags associated with this
argument, e.g. '-a', '--arg'
:param action: the action associated with this argument. e.g. 'store_true'
:param default: The default value for this argument if not specified by the
user.
:param str help: Short description of this argument, displayed in
``--help`` text.
:param list choices: List possible values for this argument. If specified,
argument values will be validated for you and
``--help`` will list the possible choices for the user.
:param bool required: True if your argument is required.
:param type type: The type of value accepted by your argument, e.g. int or
file.
:type type: type
:param nargs: The number of values accepted by this argument.
:type nargs: int, str
"""
def wrapper(func):
arguments = getattr(func, arg.ATTR_ARGUMENTS, [])
arguments.append(dict(_flags=args, **kwargs))
setattr(func, arg.ATTR_ARGUMENTS, arguments)
return func
return wrapper
COMMAND_REGISTRY = {}
ALL_COMMANDS = None
[docs]def all_commands():
"""
return a list of all commands that have been registered with the
command() decorator, including those registered via plugins.
"""
global COMMAND_REGISTRY, ALL_COMMANDS
if ALL_COMMANDS:
return ALL_COMMANDS
# prevent plugins from overwriting built-in commands by first copying the
# COMMAND_REGISTRY and then verifying that none of the registered plugins
# write to any of the keys used by built-in commands.
builtin_commands = COMMAND_REGISTRY.copy()
COMMAND_REGISTRY.clear()
ALL_COMMANDS = builtin_commands.copy()
plugin_dir = os.path.dirname(scap.plugins.__file__)
# The process of loading plugins will add entries into COMMAND_REGISTRY,
# but only the first time scap.plugins.load_plugins is called.
scap.plugins.load_plugins(plugin_dir=plugin_dir)
# At this point COMMAND_REGISTRY will only have information
# for commands added by plugins.
for key in COMMAND_REGISTRY.keys():
if key in builtin_commands:
logger = logging.getLogger()
msg = "Plugin (%s) attempted to overwrite builtin command: %s" % (
COMMAND_REGISTRY[key],
key,
)
logger.warning(msg)
else:
ALL_COMMANDS[key] = COMMAND_REGISTRY[key]
COMMAND_REGISTRY = builtin_commands
return ALL_COMMANDS
[docs]def command(*args, **kwargs):
"""
command(command_name, help="help text",[subcommands=False])
Map a `scap` sub-command to the decorated class.
:param str command_name: The name of the sub-command
:param str help: A summary of your command to be displayed in `--help` text
:type help: str or None
:raises ValueError: if there is already a command named `command_name`
**Usage Example**::
import scap.cli
@cli.command('hello', help='prints "hello world" and exits')
class HelloCommand(cli.Application):
@cli.argument('--goodbye', action='store_true',
help='Say goodbye instead.')
def main(extra_args):
if self.arguments.goodbye:
print('Goodbye, cruel world.')
else:
print('Hello, world.')
"""
def wrapper(cls):
global COMMAND_REGISTRY
name = args[0]
if name in COMMAND_REGISTRY:
err = 'Duplicate: A command named "%s" already exists.' % name
raise ValueError(err)
has_subcommands = kwargs.pop("subcommands", False)
if has_subcommands:
setattr(cls, arg.ATTR_SUBPARSER, True)
cmd = dict(name=name, cls=cls, args=args, kwargs=kwargs)
COMMAND_REGISTRY[name] = cmd
primary_deploy_server_only = kwargs.get("primary_deploy_server_only", False)
setattr(
cls.main, PRIMARY_DEPLOY_SERVER_ONLY_COMMAND, primary_deploy_server_only
)
kwargs.pop("primary_deploy_server_only", None)
require_tty_multiplexer = kwargs.get("require_tty_multiplexer", False)
setattr(cls.main, REQUIRE_TTY_MULTIPLEXER_COMMAND, require_tty_multiplexer)
kwargs.pop("require_tty_multiplexer", None)
return cls
return wrapper
[docs]def subcommand(name=None):
"""
subcommand(command_name)
Define an argparse subcommand by decorating a method on your
cli.Application subclass.
Use this decorator when you want to have a subcommand on a method other
than 'main'
In order for this to have any affect, your cli.Application must be
decorated with subcommands=True (see example below).
**Usage Example**::
import scap.cli
@cli.command('hello', subcommands=True,
help='prints "hello world" and exits',)
class HelloCommand(cli.Application):
@cli.subcommand('world')
def world_subcommand(extra_args):
print('hello world')
"""
def wrapper(func):
subcommand_name = func.__name__ if name is None else name
setattr(func, arg.ATTR_SUBCOMMAND, subcommand_name)
setattr(func, arg.ATTR_ARGUMENTS, [])
return func
return wrapper