# -*- coding: utf-8 -*-
"""
scap.checks
~~~~~~~~~~~
Deployment checks.
Definitions are typically loaded from YAML of the following format:

    checks:
      some_unique_check_name:
        type: command
        command: /usr/local/bin/my_special_check
        stage: promote
      some_other_check_name:
        type: nrpe
        command: some_parsed_nrpe_command_name
        stage: promote

Copyright © 2014-2017 Wikimedia Foundation and Contributors.
This file is part of Scap.
Scap is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import codecs
import collections
import os
import select
import shlex
import subprocess
import time

import scap.utils as utils
# Registry of known check types: maps a check type name to the factory
# registered for it through register_type().
_TYPES = {}
class CheckInvalid(AssertionError):
    """Raised when a check definition is invalid (e.g. unknown type)."""
def checktype(type_of_check):
    """
    Class decorator for registering a new check type.

    The decorated class is registered as the factory for ``type_of_check``
    and returned unchanged.

    :param type_of_check: type name
    :returns: a decorator taking the class to register
    """

    def decorator(klass):
        # Registration is deferred until the decorator is applied to a class.
        register_type(type_of_check, klass)
        return klass

    return decorator
def execute(checks, logger, concurrency=1):
    """
    Execute the given checks in parallel.

    Checks are started up to the given concurrency level and polled for
    output. A check whose process exits is recorded in the returned list; a
    check that exceeds its timeout is killed and is not recorded, which makes
    the aggregate result False.

    :param checks: iterable of `checks.Check` objects, or a single
                   `checks.Check` object.
    :param logger: `logging.Logger` to send messages to
    :param concurrency: level of concurrency; values below 1 are treated as 1
    :returns: tuple of the aggregate check success and list of executed checks
    :rtype: (bool, list)
    """
    if isinstance(checks, Check):
        checks = [checks]
    else:
        # Materialize once so any iterable (dict view, generator, ...) can be
        # reversed and counted below.
        checks = list(checks)

    # A concurrency below 1 would never schedule a job and loop forever.
    concurrency = max(1, concurrency)

    epoll = select.epoll()
    # Reversed so that pop() hands the checks out in their original order.
    todo = checks[::-1]
    doing = {}
    done = []
    success = 0

    def handle_done(job):
        doing.pop(job.fd, None)

    def handle_failure(job, msg, kill=True):
        logger.warning(msg)
        handle_done(job)
        if kill:
            try:
                job.kill()
            except OSError:
                # Best effort: the process may already be gone.
                pass

    try:
        while todo or doing:
            # Schedule new jobs up to the concurrency level
            while todo and len(doing) < concurrency:
                check = todo.pop()
                logger.info("Executing check '{}'".format(check.name))
                job = check.run()
                doing[job.fd] = job
                # Note: we do not call epoll.unregister() since the call to
                # Proc.communicate() in CheckJob.wait() closes the file
                # descriptor.
                epoll.register(job.fd, select.EPOLLIN)

            # Poll for stdout events
            for fd, _event in epoll.poll(0.01):
                job = doing[fd]

                # Handle job completion
                if job.poll() is not None:
                    job.wait()
                    if job.isfailure():
                        msg = "Check '{}' failed: {}"
                        msg = msg.format(job.check.name, job.output)
                        handle_failure(job, msg, kill=False)
                    else:
                        msg = "Check '{}' completed, output: {}".format(
                            job.check.name, job.output
                        )
                        logger.debug(msg)
                        handle_done(job)
                        success += 1
                    done.append(job)

            # Enforce timeout on running jobs. Iterate over a snapshot because
            # handle_failure() removes entries from `doing`.
            for job in list(doing.values()):
                if job.timedout():
                    msg = "Check '{}' exceeded {}s timeout"
                    msg = msg.format(job.check.name, job.check.timeout)
                    handle_failure(job, msg)
    finally:
        try:
            # Anything still running here was interrupted by an error; kill
            # it. A snapshot is required since handle_failure() mutates
            # `doing`, which would otherwise raise RuntimeError mid-iteration.
            for job in list(doing.values()):
                msg = "Error running check '{}'".format(job.check.name)
                handle_failure(job, msg)
        finally:
            epoll.close()

    return (len(done) == len(checks) == success, done)
def load(cfg, environment=None):
    """
    Load checks from the given config dict.

    Checks are read from the ``"checks"`` key of ``cfg``. Entries with no
    options (blank checks) are discarded. The type of each check defaults to
    ``"command"`` and all options, including ``type``, are passed on to the
    registered factory as keyword arguments.

    :param cfg: config dict
    :param environment: environment in which to execute checks
    :returns: ordered mapping of check name to check object
    :rtype: collections.OrderedDict
    :raises CheckInvalid: if a check names an unregistered type
    """
    checks = collections.OrderedDict()
    if not cfg:
        return checks

    for name, options in (cfg.get("checks", None) or {}).items():
        if not options:
            # Discard blank checks. This must happen before options is
            # dereferenced, since a blank YAML entry is loaded as None.
            continue

        check_type = options.get("type", "command")
        if check_type not in _TYPES:
            msg = "unknown check type '{}'".format(check_type)
            raise CheckInvalid(msg)

        checks[name] = _TYPES[check_type](
            name=name, environment=environment, **options
        )

    return checks
def register_type(check_type, factory):
    """
    Register a new check type and factory.

    A later registration under the same name replaces the earlier one.

    :param check_type: type name
    :param factory: callable type factory
    """
    _TYPES[check_type] = factory
@checktype("command")
class Check(object):
    """
    Represent a loaded 'command' check.

    :param name: check name
    :param stage: (deprecated: use "after", ignored when "after" is set)
                  stage after which to run the check
    :param before: stage before which to run the check
    :param after: stage after which to run the check
    :param environment: environment in which to run checks; defaults to a
                        copy of the current process environment
    :param group: deploy group for which to run the check
    :param timeout: maximum time allowed for check execution, in seconds
    :param command: check command to run
    :param shell: If True, a shell is used to execute "command".
    :param opts: any further options, kept as-is in ``self.options``
    """

    def __init__(
        self,
        name,
        stage=None,
        before=None,
        after=None,
        environment=None,
        group=None,
        timeout=30.0,
        command="",
        shell=False,
        **opts
    ):
        self.name = name
        self.environment = environment
        self.before = before
        self.after = after or stage
        self.group = group
        self.timeout = timeout
        self.command = command
        self.shell = shell
        self.options = opts

        if self.environment is None:
            self.environment = os.environ.copy()

        # Avoid TypeError exceptions later on in Popen -> fsencode by removing
        # None values from the environment
        self.environment = {
            key: value
            for key, value in self.environment.items()
            if value is not None
        }

    @property
    def stage(self):
        """Return the stage this check is attached to ("after" wins)."""
        return self.after or self.before

    def run(self):
        """Return a running :class:`CheckJob`."""
        return CheckJob(self)

    def validate(self):
        """
        Validate check properties.

        :raises CheckInvalid: if no command is set
        """
        if not self.command:
            raise CheckInvalid("missing 'command'")

    def __repr__(self):
        labelled = (
            ("stage", self.stage),
            ("before", self.before),
            ("after", self.after),
        )
        # Separate the fragments so they do not run into each other.
        stages = ", ".join(
            "%s: %s" % (label, value) for label, value in labelled if value
        )
        return "<Check %s %s>" % (self.name, stages)
class CheckJob(object):
    """
    Represent and control a running check.

    A :class:`CheckJob` begins execution immediately and should be controlled
    within some kind of poll loop, typically `checks.execute`.

    The combined stdout/stderr of the process is accumulated in ``output`` as
    text. Bytes that are not valid UTF-8 are replaced rather than raising.
    """

    def __init__(self, check):
        """Initialize a new CheckJob and begin execution."""
        self.check = check

        # Without a shell the command string is split into an argument list.
        cmd = check.command if check.shell else shlex.split(check.command)

        self.proc = subprocess.Popen(
            cmd,
            env=check.environment,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=check.shell,
        )
        self.fd = self.proc.stdout.fileno()
        self.stream = self.proc.stdout
        self.output = ""
        # Output arrives in arbitrary chunks, so a multi-byte character may be
        # split between two reads. An incremental decoder buffers the partial
        # sequence instead of failing on it.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        self.started = time.time()
        self.ended = None

    def duration(self):
        """Return the current or final job duration."""
        ended = time.time() if self.ended is None else self.ended
        return ended - self.started

    def isfailure(self):
        """Return whether this check failed."""
        return self.proc.returncode != 0

    def kill(self):
        """Kill the executing process."""
        self.proc.kill()
        self.proc.communicate()

    def poll(self):
        """
        Read output and poll the process for exit status.

        If the process has exited, an (approximate) end time is recorded. This
        method is typically called within an event loop like
        `checks.execute`, once the file descriptor is readable.

        :returns: the process exit status, or None if it is still running
        """
        chunk = utils.eintr_retry(os.read, self.fd, 1048576)
        self.output += self._decoder.decode(chunk)

        result = utils.eintr_retry(self.proc.poll)
        if result is not None:
            self.ended = time.time()

        return result

    def timedout(self):
        """Whether the job duration has exceeded the job timeout."""
        return self.duration() > self.check.timeout

    def wait(self):
        """
        Block for the last stdout/stderr read of the check process.

        To ensure the least amount of blocking, this method should only be
        called within an event loop once `poll` has signaled that the process
        has exited.
        """
        # Note: communicate() closes the file descriptor after reading from it.
        # Closed file descriptors are automatically removed from the epoll set
        # by the kernel.
        for output in self.proc.communicate():
            if output is not None:
                self.output += self._decoder.decode(output)

        # Flush whatever incomplete byte sequence is still buffered.
        self.output += self._decoder.decode(b"", final=True)