# -*- coding: utf-8 -*-
"""
scap.main
~~~~~~~~~~
Command wrappers for scap tasks
Copyright © 2014-2017 Wikimedia Foundation and Contributors.
This file is part of Scap.
Scap is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import argparse
import errno
import getpass
import locale
import os
import select
import socket
import sys
import time
from concurrent.futures import ThreadPoolExecutor
import scap.arg as arg
import scap.checks as checks
import scap.cli as cli
import scap.interaction as interaction
import scap.lint as lint
import scap.lock as lock
import scap.log as log
import scap.php_fpm as php_fpm
import scap.ssh as ssh
import scap.targets as targets
import scap.tasks as tasks
import scap.utils as utils
import scap.version as scapversion
from scap import ansi, history
from scap.kubernetes import K8sOps, TEST_SERVERS, CANARIES, PRODUCTION, STAGES
from scap.runcmd import mwscript
[docs]class AbstractSync(cli.Application):
"""Base class for applications that want to sync one or more files from
the deployment server to the rest of the cluster."""
soft_errors = False
[docs] def __init__(self, exe_name):
super().__init__(exe_name)
self.includes = []
self.exclude_wikiversions_php = False
self.om = None
self.logo = True
# A list of hostnames that have been processed by self._perform_sync
self.already_synced = []
self.already_restarted = set()
self.k8s_ops = None
[docs] @cli.argument(
"--force",
action="store_true",
help="Skip canary checks, " "performs ungraceful php-fpm restarts",
)
@cli.argument(
"--stop-before-sync",
action="store_true",
help="Perform all operations up to but not including rsyncing to any host",
)
@cli.argument(
"--pause-after-testserver-sync",
action="store_true",
help="Pause after syncing testservers and prompt the user to confirm to continue syncing",
)
@cli.argument(
"--notify-user",
action="append",
default=[],
help="User to notify on IRC after sync to testservers."
" Can be used multiple times",
)
@cli.argument(
"--k8s-only",
action="store_true",
help="Deploy/sync to Kubernetes targets only",
)
@cli.argument(
"--k8s-confirm-diffs",
action="store_true",
help="Display and require confirmation of helmfile diffs before proceeding.",
)
@cli.argument("message", nargs="*", help="Log message for SAL")
def main(self, *extra_args):
"""Perform a sync operation to the cluster."""
if self.logo:
print(ansi.logo(color=sys.stdout.isatty() or "FORCE_COLOR" in os.environ))
self._assert_auth_sock()
with lock.Lock(
self.get_lock_file(), name="sync", reason=self.arguments.message
):
self.k8s_ops = K8sOps(self)
# Begin by confirming helmfile diffs, if enabled. This avoids having
# to clean up after operations with side-effects if cancelled.
self._confirm_k8s_diffs()
self._compile_wikiversions()
self._before_cluster_sync()
self._update_caches()
if not self.arguments.force:
self.get_logger().info("Checking for new runtime errors locally")
self._check_fatals()
else:
self.get_logger().warning("check_fatals skipped by --force")
self.k8s_ops.build_k8s_images()
if self.arguments.stop_before_sync:
self.get_logger().info("Stopping before sync operations")
return 0
# Preload MW multiversion image into K8s cluster nodes
self.k8s_ops.pull_image_on_nodes()
# Sync masters regardless of the --k8s-only flag since some of the
# sync'd information affects k8s deployments.
self._sync_masters()
if self._k8s_only_sync():
self._deploy_k8s_testservers()
if self.arguments.force:
self.get_logger().warning("Test server checks skipped by --force")
else:
self.check_testservers(baremetal_testservers=[])
self._pause_after_testserver_sync()
self._deploy_k8s_canaries()
self._deploy_k8s_production()
else:
full_target_list = self._get_target_list()
# Deploy K8s test releases
self._deploy_k8s_testservers()
baremetal_testservers = utils.list_intersection(
self._get_testserver_list(), full_target_list
)
if len(baremetal_testservers) > 0:
with log.Timer("sync-testservers", self.get_stats()):
self.sync_targets(baremetal_testservers, "testservers")
if self.arguments.force:
self.get_logger().warning("Test server checks skipped by --force")
else:
self.check_testservers(baremetal_testservers)
self._pause_after_testserver_sync()
# Deploy K8s canary releases
self._deploy_k8s_canaries()
canaries = utils.list_intersection(
self._get_canary_list(), full_target_list
)
with log.Timer("sync-check-canaries", self.get_stats()) as timer:
if canaries:
self.sync_targets(canaries, "canaries")
timer.mark("Canaries Synced")
if self.arguments.force:
self.get_logger().warning("Canary checks skipped by --force")
else:
self.canary_checks()
# Deploy K8s production releases
self._deploy_k8s_production()
# Update proxies
proxies = utils.list_intersection(
self._get_proxy_list(), full_target_list
)
if len(proxies) > 0:
with log.Timer("sync-proxies", self.get_stats()):
sync_cmd = self._apache_sync_command()
sync_cmd.append(socket.getfqdn())
self._perform_sync("proxies", sync_cmd, proxies)
# Update apaches
with log.Timer("sync-apaches", self.get_stats()):
self._perform_sync(
"apaches",
self._apache_sync_command(proxies),
full_target_list,
shuffle=True,
)
history.update_latest(self.config["history_log"], synced=True)
# php-fpm restarts happen in here
self._after_cluster_sync()
self._after_lock_release()
if self.soft_errors:
return 1
return 0
[docs] def increment_stat(self, stat, all_stat=True, value=1):
"""Increment a stat in deploy.*
:param stat: String name of stat to increment
:param all_stat: Whether to increment deploy.all as well
:param value: How many to increment by, default of 1 is normal
"""
self.get_stats().increment("deploy.%s" % stat, value)
if all_stat:
self.get_stats().increment("deploy.all", value)
[docs] def get_keyholder_key(self):
"""
Returns scap2-specific deploy key
This way we can set a key in the default scap config without
having all non-scap2 repos inherit that configuration.
"""
key = self.config.get("mediawiki_keyholder_key", None)
if key:
return key
return super().get_keyholder_key()
def _before_cluster_sync(self):
pass
def _pause_after_testserver_sync(self):
# Not all subclasses of AbstractSync define the --pause-after-testserver-sync argument,
# so we can't assume it is in self.arguments.
if not getattr(self.arguments, "pause_after_testserver_sync", False):
return
users = " and ".join(
set([getpass.getuser()] + getattr(self.arguments, "notify_user", []))
)
message = (
"%s: %s synced to the testservers (https://wikitech.wikimedia.org/wiki/Mwdebug)"
% (users, self.arguments.message)
)
self.announce(message)
self.prompt_for_approval_or_exit(
"Changes synced to the testservers. (see https://wikitech.wikimedia.org/wiki/Mwdebug)\n"
"Please do any necessary checks before continuing.\n"
"Continue with sync?",
"Sync cancelled.",
)
self.announce(f"{users}: Continuing with sync")
def _k8s_only_sync(self):
# Not all subclasses of AbstractSync define the --k8s-only option
return getattr(self.arguments, "k8s_only", False)
def _confirm_k8s_diffs(self):
if not getattr(self.arguments, "k8s_confirm_diffs", False):
return
logger = self.get_logger()
logger.info("Collecting helmfile diffs for review")
for stage in STAGES:
diffs = self.k8s_ops.helmfile_diffs_for_stage(stage)
if not diffs:
logger.warn("No diffs for stage %s", stage)
continue
formatted_diffs = [
"=== {datacenter}/{namespace}-{release} ===\n{diff_stdout}".format(
**diff
)
for diff in sorted(
diffs,
key=lambda diff: (
diff["datacenter"],
diff["namespace"],
diff["release"],
),
)
]
logger.info("Diffs for stage %s:\n%s" % (stage, "\n".join(formatted_diffs)))
self.prompt_for_approval_or_exit(
"Note: Diffs are relative to the current helm charts and helmfile values. These may "
"become outdated if new changes are merged during sync.\n"
"Continue with sync?",
"Sync cancelled.",
)
def _deploy_k8s_testservers(self):
with log.Timer("sync-testservers-k8s", self.get_stats()):
with utils.suppress_backtrace():
self.k8s_ops.deploy_k8s_images_for_stage(TEST_SERVERS)
def _deploy_k8s_canaries(self):
with log.Timer("sync-canaries-k8s", self.get_stats()):
with utils.suppress_backtrace():
self.k8s_ops.deploy_k8s_images_for_stage(CANARIES)
def _deploy_k8s_production(self):
with log.Timer("sync-prod-k8s", self.get_stats()):
with utils.suppress_backtrace():
self.k8s_ops.deploy_k8s_images_for_stage(PRODUCTION)
def _update_caches(self):
self._git_repo()
# Compute git version information
with log.Timer("cache_git_info", self.get_stats()):
for version in self.active_wikiversions("stage"):
tasks.cache_git_info(version, self.config)
def _check_fatals(self):
logger = self.get_logger()
for version, wikidb in self.active_wikiversions(
"stage", return_type=dict
).items():
logger.debug("Testing {} with eval.php using {}".format(version, wikidb))
with utils.suppress_backtrace():
stderr = mwscript("eval.php", "--wiki", wikidb)
if stderr:
raise SystemExit(
"'mwscript eval.php --wiki {}' generated unexpected output: {}".format(
wikidb, stderr
)
)
[docs] def _get_proxy_list(self):
"""Get list of sync proxy hostnames that should be updated before the
rest of the cluster."""
return targets.get("dsh_proxies", self.config).all
[docs] def _get_target_list(self):
"""Get list of hostnames that should be updated from the proxies."""
return list(
set(self._get_proxy_list())
| set(targets.get("dsh_targets", self.config).all)
)
[docs] def _get_testserver_list(self):
"""Get list of MediaWiki testservers."""
return targets.get("dsh_testservers", self.config).all
[docs] def _get_api_canary_list(self):
"""Get list of MediaWiki api canaries."""
return targets.get("dsh_api_canaries", self.config).all
[docs] def _get_app_canary_list(self):
"""Get list of MediaWiki api canaries."""
return targets.get("dsh_app_canaries", self.config).all
[docs] def _get_canary_list(self):
"""Get list of MediaWiki canary hostnames."""
return list(set(self._get_api_canary_list()) | set(self._get_app_canary_list()))
[docs] def _sync_masters(self):
"""Sync the staging directory across all other deploy master servers."""
us = socket.getfqdn()
other_masters = [master for master in self.get_master_list() if master != us]
if len(other_masters) > 0:
self.master_only_cmd("sync-masters", self._master_sync_command())
[docs] def master_only_cmd(self, timer, cmd):
"""
Run a command on all other master servers than the one we're on
:param timer: String name to use in timer/logging
:param cmd: List of command/parameters to be executed
"""
masters = self.get_master_list()
with log.Timer(timer, self.get_stats()):
update_masters = ssh.Job(
masters, user=self.config["ssh_user"], key=self.get_keyholder_key()
)
update_masters.exclude_hosts([socket.getfqdn()])
update_masters.command(cmd)
update_masters.progress(log.reporter(timer))
succeeded, failed = update_masters.run()
if failed:
self.get_logger().warning("%d masters had sync errors", failed)
self.soft_errors = True
[docs] def _master_sync_command(self):
"""Synchronization command to run on the master hosts."""
cmd = [self.get_script_path(remote=True), "pull-master"]
if self.verbose:
cmd.append("--verbose")
cmd.append(socket.getfqdn())
lang = os.getenv("SCAP_MW_LANG")
if lang:
cmd = ["env", "SCAP_MW_LANG={}".format(lang)] + cmd
return cmd
[docs] def _base_scap_pull_command(self) -> list:
"""
Returns (as a list) the basic scap pull command to run on a remote
target. Note that no source servers are specified in the command
so scap pull will default to pull from whatever `master_rsync` is
defined to be in the scap configuration on the target.
"""
cmd = [self.get_script_path(remote=True), "pull"]
cmd.extend(["--no-php-restart", "--no-update-l10n"])
if self.exclude_wikiversions_php:
cmd.append("--exclude-wikiversions.php")
if self.verbose:
cmd.append("--verbose")
for include in self.includes:
cmd.extend(["--include", include])
return cmd
[docs] def _apache_sync_command(self, proxies: list = (), **kwargs) -> list:
"""
Returns (as a list) the scap pull command to run on mediawiki
installation targets. This is comprised of the base scap pull command
(defined by _base_scap_pull_command) followed by the list of deployment
masters and the list of proxies.
:param proxies: A list of proxy hostnames that can be pulled from in addition
to the deployment masters. Default is empty (as a tuple to prevent mutable
list warning)
:param kwargs: Any remaining keyword arguments are passed on to _base_scap_pull_command.
"""
return self._base_scap_pull_command(**kwargs) + utils.list_union(
self.get_master_list(), proxies
)
[docs] def _compile_wikiversions(self):
"""Compile wikiversions.json to wikiversions.php in stage_dir"""
tasks.compile_wikiversions("stage", self.config)
[docs] def _git_repo(self):
"""Flatten deploy directory into shared git repo."""
if self.config["scap3_mediawiki"]:
self.get_logger().info("Setting up deploy git directory")
cmd = '{} deploy-mediawiki -v "{}"'.format(
self.get_script_path(), self.arguments.message
)
utils.sudo_check_call("mwdeploy", cmd)
def _after_cluster_sync(self):
pass
def _after_lock_release(self):
pass
def _after_sync_rebuild_cdbs(self, target_hosts):
# Ask target hosts to rebuild l10n CDB files
with log.Timer("scap-cdb-rebuild", self.get_stats()):
rebuild_cdbs = ssh.Job(
target_hosts, user=self.config["ssh_user"], key=self.get_keyholder_key()
)
rebuild_cdbs.shuffle()
rebuild_cdbs.command(
"sudo -u mwdeploy -n -- %s cdb-rebuild"
% self.get_script_path(remote=True)
)
rebuild_cdbs.progress(log.reporter("scap-cdb-rebuild"))
succeeded, failed = rebuild_cdbs.run()
if failed:
self.get_logger().warning(
"%d hosts had scap-cdb-rebuild errors", failed
)
self.soft_errors = True
def _after_sync_sync_wikiversions(self, target_hosts):
# Update and sync wikiversions.php
succeeded, failed = tasks.sync_wikiversions(
target_hosts, self.config, key=self.get_keyholder_key()
)
if failed:
self.get_logger().warning("%d hosts had sync_wikiversions errors", failed)
self.soft_errors = True
[docs] def sync_targets(self, targets=None, type=None):
"""
This function is used to sync to bare metal testservers and canaries.
Run scap pull on the targets, including l10n rebuild, wikiversions sync,
and php-fpm restart. The pull source will be this deploy server.
:param targets: Iterable of target servers to sync
:param type: A string like "testservers" or "canaries" naming the type of target.
"""
sync_cmd = self._apache_sync_command()
sync_cmd.append(socket.getfqdn())
self._perform_sync(type, sync_cmd, targets)
self._after_sync_rebuild_cdbs(targets)
self._after_sync_sync_wikiversions(targets)
self._restart_php_hostgroups([targets])
[docs] def retry_continue_exit(self, description, test_func):
"""
Runs test_func(). If it returns True, this function returns.
If test_func() does not return true:
* If an interactive terminal is not available, self.cancel() is called
and it is not expected to return.
* If an interactive terminal is available, ask the user if they want to
retry the test, continue with deployment anyway, or exit. If the user
chooses to retry, this function starts over. If the user chooses to
continue, a message is logged and this function returns. If the user
chooses to exit, self.cancel() is called and it is not expected to return.
'description' is used to describe the operation to retry in the
retry/continue/exit prompt.
"""
while True:
if test_func():
break
resp = interaction.prompt_choices(
"What do you want to do?",
{
f"Retry {description}": "r",
"Continue with deployment": "c",
"Exit scap": "e",
},
"e",
)
if resp == "r":
# loop around and try again
continue
elif resp == "c":
self.get_logger().info("Continuing with deployment")
break
elif resp == "e":
self.cancel()
break
else:
raise Exception("This should never happen")
def cancel(self):
self.announce("Scap cancelled\nWARNING: Nothing has been rolled back.")
sys.exit(1)
[docs] def canary_checks(self):
"""
Run logstash error rate checks (for bare metal and mw-on-k8s).
:raises SystemExit: on canary check failure
"""
# Deployment to canaries finishes just before canary_checks()
# is called, so this time is when the canary deployments
# finished.
start = time.time()
logger = self.get_logger()
###########################
# Wait for canary traffic #
###########################
canary_wait_time = self.config["canary_wait_time"]
logger.info(f"Waiting {canary_wait_time} seconds for canary traffic...")
time.sleep(canary_wait_time)
###################
# Logstash checks #
###################
def test_func() -> bool:
status = tasks.logstash_canary_checks(
self.config["canary_service"],
self.config["canary_threshold"],
self.config["logstash_host"],
time.time() - start,
)
if status == 0:
# Checks OK!
return True
elif status == 10:
logger.error(
"The average error rate across "
"canaries increased by {}x "
"(rerun with --force to override this check, "
"see {} for details).".format(
self.config["canary_threshold"],
self.config["canary_dashboard_url"],
)
)
# Checks not OK!
return False
else:
# Something weird happened with logstash_checker.py.
logger.warning("Failed to complete canary checks for some reason.")
return False
self.retry_continue_exit("canary checks", test_func)
[docs] def check_testservers(self, baremetal_testservers: list):
"""
Check bare metal and k8s testservers.
:raises SystemExit: on check failure
"""
baremetal_check_cmd = self.config["testservers_check_cmd_baremetal"]
k8s_check_cmd = self.config["testservers_check_cmd_k8s"]
checkslist = []
if baremetal_check_cmd and baremetal_testservers:
env = os.environ.copy()
env["BAREMETAL_TESTSERVERS"] = ",".join(baremetal_testservers)
checkslist.append(
checks.Check(
"check_testservers_baremetal",
command=baremetal_check_cmd,
timeout=120,
shell=True,
environment=env,
)
)
if k8s_check_cmd:
checkslist.append(
checks.Check(
"check_testservers_k8s",
command=k8s_check_cmd,
timeout=120,
shell=True,
)
)
if not checkslist:
return
logger = self.get_logger()
with log.Timer("check-testservers", self.get_stats()):
def test_func() -> bool:
success, jobs = checks.execute(checkslist, logger, concurrency=2)
return success
self.retry_continue_exit("testserver checks", test_func)
[docs] def _setup_php(self):
"""
Sets up the php_fpm instance if not already initialized.
Returns True if php_fpm restart has been configured, or False if not.
"""
if php_fpm.INSTANCE is None:
php_fpm.INSTANCE = php_fpm.PHPRestart(
self.config,
ssh.Job(key=self.get_keyholder_key(), user=self.config["ssh_user"]),
self.arguments.force,
self.get_logger(),
)
return php_fpm.INSTANCE.cmd is not None
[docs] def _restart_php(self):
"""
If php_fpm_restart_script is set in the configuration then
on all dsh groups referenced by the mw_web_clusters config parameter
do the following:
Check if php-fpm opcache is full, if so restart php-fpm. If
the php_fpm_always_restart config parameter is true, the
opcache is treated as always full, so php-fpm will always
restart.
If the operator invoked scap with the --force flag, restart
php-fpm unsafely (i.e., without depooling and repooling
around the service restart). T243009
Targets that have already been restarted will not be restarted again.
"""
if not self._setup_php():
return
# mw_web_clusters is expected to be a comma-separated string naming dsh
# groups.
# target_groups will be a list of objects representing representing
# each group.
target_groups = targets.DirectDshTargetList("mw_web_clusters", self.config)
# Convert the list of group objects into a
# list of lists of targets.
group_hosts = []
for group in target_groups.groups.values():
target_hosts = set(group.targets) - self.already_restarted
if target_hosts:
group_hosts.append(list(target_hosts))
self.already_restarted |= target_hosts
self._restart_php_hostgroups(group_hosts)
[docs] def _restart_php_hostgroups(self, target_hosts=None):
"""Perform php restart for sets of hosts (if configured).
Parameter target_hosts is a list of lists of hostnames.
"""
if not self._setup_php():
return
num_hosts = 0
for grp in target_hosts:
num_hosts += len(grp)
with log.Timer("php-fpm-restarts", self.get_stats()):
self.get_logger().info(
"Running '{}' on {} host(s)".format(php_fpm.INSTANCE.cmd, num_hosts)
)
with log.MultithreadedProgressReportCollection("php-fpm-restart") as q:
php_fpm.INSTANCE.set_progress_queue(q)
with ThreadPoolExecutor(max_workers=5) as pool:
results = pool.map(php_fpm.restart_helper, target_hosts)
for _, failed in results:
if failed:
self.get_logger().warning(
"%d hosts had failures restarting php-fpm", failed
)
[docs]@cli.command("security-check")
class SecurityPatchCheck(cli.Application):
"""
Check if security patches are applied.
class to check if patches in ``/srv/patches`` have been applied to the
active wikiversions
"""
[docs] def main(self, *extra_args):
for version in self.active_wikiversions():
tasks.check_patch_files(version, self.config)
return 0
[docs]@cli.command(
"wikiversions-compile", help=argparse.SUPPRESS, affected_by_blocked_deployments=True
)
class CompileWikiversions(cli.Application):
"""Compile wikiversions.json to wikiversions.php."""
# This flag is no longer needed, but left here for compatibility with any existing scripts which
# might still pass this flag. T357581, T329857
[docs] @cli.argument(
"--staging",
action="store_true",
help="Compile wikiversions in staging directory. This flag is deprecated since wikiversions-compile always uses the staging directory.",
)
def main(self, *extra_args):
if self.arguments.staging:
self.get_logger().warn(
"The --staging flag is deprecated and will be removed in a future release."
)
tasks.compile_wikiversions("stage", self.config)
return 0
[docs]@cli.command("wikiversions-inuse")
class MWVersionsInUse(cli.Application):
"""Get a list of the active MediaWiki versions."""
[docs] @cli.argument(
"--withdb",
action="store_true",
help="Add `=wikidb` with some wiki using the version.",
)
# This flag is no longer needed, but left here for compatibility with any existing scripts which
# might still pass this flag. T357581, T329857
@cli.argument(
"--staging",
action="store_true",
help="Read wikiversions from the staging directory. This flag is deprecated since wikiversions-inuse always uses the staging directory.",
)
def main(self, *extra_args):
if self.arguments.staging:
self.get_logger().warn(
"The --staging flag is deprecated and will be removed in a future release."
)
versions = self.active_wikiversions("stage", return_type=dict)
if self.arguments.withdb:
output = [
"%s=%s" % (version, wikidb) for version, wikidb in versions.items()
]
else:
output = [str(version) for version in versions.keys()]
print(" ".join(output))
return 0
[docs]@cli.command("cdb-rebuild", help=argparse.SUPPRESS)
class RebuildCdbs(cli.Application):
"""Rebuild localization cache CDB files from the JSON versions."""
[docs] @cli.argument(
"--version", type=arg.is_version, help="MediaWiki version (eg 1.27.0-wmf.7)"
)
@cli.argument(
"--no-progress",
action="store_true",
dest="mute",
help="Do not show progress indicator.",
)
def main(self, *extra_args):
user = "mwdeploy"
source_tree = "deploy"
root_dir = self.config["deploy_dir"]
self._run_as(user)
self._assert_current_user(user)
# Leave some of the cores free for apache processes
use_cores = utils.cpus_for_jobs()
versions = self.active_wikiversions(source_tree)
if self.arguments.version:
version = self.arguments.version
if version.startswith("php-"):
version = version[4:]
# Assert version is active
if version not in versions:
raise IOError(errno.ENOENT, "Version not active", version)
# Replace list of active versions with the single version selected
versions = [version]
# Rebuild the CDB files from the JSON versions
for version in versions:
cache_dir = os.path.join(root_dir, "php-%s" % version, "cache", "l10n")
tasks.merge_cdb_updates(cache_dir, use_cores, True, self.arguments.mute)
[docs]@cli.command(
"sync-world",
help="Deploy MediaWiki to the cluster",
affected_by_blocked_deployments=True,
)
class ScapWorld(AbstractSync):
"""
Deploy MediaWiki to the cluster.
#. Validate php syntax of wmf-config and multiversion
#. Compile wikiversions.json to php in staging directory
#. Update l10n files in staging area
#. Compute git version information
#. Ask scap masters to sync with current master
#. Ask scap proxies to sync with master server
#. Ask apaches to sync with fastest rsync server (excluding wikiversions.php)
#. Ask apaches to rebuild l10n CDB files
#. Ask apaches to sync wikiversions.php
#. Run purgeMessageBlobStore.php
#. Rolling invalidation of all opcache for php 7.x
"""
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# We want to exclude wikiversions.php during the normal rsync.
# wikiversions.php is handled separately in _after_cluster_sync.
self.exclude_wikiversions_php = True
[docs] @cli.argument(
"--force",
action="store_true",
help="Skip canary checks, " "performs ungraceful php-fpm restarts",
)
@cli.argument(
"-w",
"--canary-wait-time",
dest="canary_wait_time",
type=int,
help="Define how long new code will run on the "
"canary servers (default is 20s)",
metavar="<time in secs>",
)
@cli.argument(
"--skip-l10n-update",
action="store_true",
dest="skip_l10n_update",
help="Skip update of l10n files",
)
@cli.argument("-n", action="store_true", help="No-op for running tests")
@cli.argument(
"--stop-before-sync",
action="store_true",
help="Perform all operations up to but not including rsyncing to any host",
)
@cli.argument(
"--no-logo",
action="store_false",
help="Do not print the Scap logo",
dest="logo",
)
@cli.argument(
"--pause-after-testserver-sync",
action="store_true",
help="Pause after syncing testservers and prompt the user to confirm to continue syncing",
)
@cli.argument(
"--notify-user",
action="append",
default=[],
help="User to notify on IRC after sync to testservers."
" Can be used multiple times",
)
@cli.argument(
"--k8s-only",
action="store_true",
help="Deploy/sync to Kubernetes targets only",
)
@cli.argument(
"--k8s-confirm-diffs",
action="store_true",
help="Display and require confirmation of helmfile diffs before proceeding.",
)
@cli.argument("message", nargs="*", help="Log message for SAL")
def main(self, *extra_args):
try:
if any("canary_wait_time" in s for s in self.arguments.defines):
raise ValueError(
"Canary wait time must be defined with " "-w or --canary-wait-time"
)
except TypeError:
pass
wait = self.arguments.canary_wait_time
if wait is not None:
self.config["canary_wait_time"] = wait
self.logo = self.arguments.logo
if self.arguments.n:
return 0
return super().main(*extra_args)
def _before_cluster_sync(self):
self.announce("Started scap: %s", self.arguments.message)
# Validate php syntax of wmf-config and multiversion
lint.check_valid_syntax(
[
"%(stage_dir)s/wmf-config" % self.config,
"%(stage_dir)s/multiversion" % self.config,
],
utils.cpus_for_jobs(),
)
def _update_caches(self):
super()._update_caches()
# Update list of extension message files and regenerate the
# localisation cache.
if self.arguments.skip_l10n_update:
self.get_logger().warn("Skipping l10n-update")
else:
with log.Timer("l10n-update", self.get_stats()):
for version in self.active_wikiversions("stage"):
tasks.update_localization_cache(version, self)
def _after_cluster_sync(self):
target_hosts = self._get_target_list()
self._after_sync_rebuild_cdbs(target_hosts)
self._after_sync_sync_wikiversions(target_hosts)
self._restart_php()
tasks.clear_message_blobs(self.config)
def _after_lock_release(self):
self.announce(
"Finished scap: %s (duration: %s)",
self.arguments.message,
utils.human_duration(self.get_duration()),
)
self.increment_stat("scap")
[docs] def _handle_exception(self, ex):
# Logic copied from cli.py:_handle_exception. FIXME: There has to be a better
# way to do this.
backtrace = True
if isinstance(ex, lock.LockFailedError) or getattr(
ex, "_scap_no_backtrace", False
):
backtrace = False
if backtrace:
self.get_logger().warning("Unhandled error:", exc_info=True)
self.announce(
"scap failed: %s %s (duration: %s)",
type(ex).__name__,
ex,
utils.human_duration(self.get_duration()),
)
return 1
[docs] def _before_exit(self, exit_status):
if self.config:
self.get_stats().timing("scap.scap", self.get_duration() * 1000)
return exit_status
[docs]@cli.command("pull-master", help=argparse.SUPPRESS)
class SyncMaster(cli.Application):
"""Sync local MediaWiki staging directory with deploy server state."""
[docs] @cli.argument("master", help="Master rsync server to copy from")
def main(self, *extra_args):
tasks.sync_master(
self.config, master=self.arguments.master, verbose=self.verbose
)
return 0
[docs]@cli.command(
"pull",
help="Sync local MediaWiki deployment directory with "
"deploy server state (formerly sync-common)",
)
class SyncPull(cli.Application):
"""Sync local MediaWiki deployment directory with deploy server state."""
[docs] @cli.argument(
"--no-update-l10n",
action="store_false",
dest="update_l10n",
help="Do not update l10n cache files.",
)
@cli.argument(
"--exclude-wikiversions.php",
action="store_true",
dest="exclude_wikiversions_php",
help="Do not rsync wikiversions.php.",
)
@cli.argument(
"-i",
"--include",
default=None,
action="append",
help="Rsync include pattern to limit transfer to."
" End directories with a trailing `/***`."
" Can be used multiple times.",
)
@cli.argument(
"--delete-excluded",
action="store_true",
help="Also delete local files not found on the master.",
)
@cli.argument(
"--no-php-restart",
action="store_false",
dest="php_restart",
help="Don't restart php-fpm after the pull.",
)
@cli.argument(
"servers", nargs=argparse.REMAINDER, help="Rsync server(s) to copy from"
)
def main(self, *extra_args):
rsync_args = ["--delete-excluded"] if self.arguments.delete_excluded else []
tasks.sync_common(
self.config,
include=self.arguments.include,
sync_from=self.arguments.servers,
verbose=self.verbose,
rsync_args=rsync_args,
exclude_wikiversionsphp=self.arguments.exclude_wikiversions_php,
)
if self.arguments.update_l10n:
with log.Timer("scap-cdb-rebuild", self.get_stats()):
utils.sudo_check_call(
user="mwdeploy",
cmd=self.get_script_path() + " cdb-rebuild --no-progress",
app=self,
)
if self.arguments.php_restart:
fpm = php_fpm.PHPRestart(self.config, logger=self.get_logger())
self.get_logger().info("Checking if php-fpm restart needed")
failed = fpm.restart_self()
if failed:
self.get_logger().warning("php-fpm restart failed!")
return 0
[docs]@cli.command("sync-dir", help=argparse.SUPPRESS, affected_by_blocked_deployments=True)
@cli.command("sync-file", affected_by_blocked_deployments=True)
class SyncFile(AbstractSync):
"""Sync a specific file/directory to the cluster."""
[docs] @cli.argument("--force", action="store_true", help="Skip canary checks")
@cli.argument(
"--pause-after-testserver-sync",
action="store_true",
help="Pause after syncing testservers and prompt the user to confirm to continue syncing",
)
@cli.argument(
"--notify-user",
action="append",
default=[],
help="User to notify on IRC after sync to testservers."
" Can be used multiple times",
)
@cli.argument("file", help="File/directory to sync")
@cli.argument("message", nargs="*", help="Log message for SAL")
def main(self, *extra_args):
self.arguments.stop_before_sync = False
return super().main(*extra_args)
def _before_cluster_sync(self):
# assert file exists
abspath = os.path.join(self.config["stage_dir"], self.arguments.file)
if not os.path.exists(abspath):
raise IOError(errno.ENOENT, "File/directory not found", abspath)
relpath = os.path.relpath(abspath, self.config["stage_dir"])
if os.path.isdir(abspath):
relpath = "%s/***" % relpath
include = relpath
# Notify when syncing a symlink.
if os.path.islink(abspath):
symlink_dest = os.path.realpath(abspath)
self.get_logger().info(
"%s: syncing symlink, not its target [%s]", abspath, symlink_dest
)
else:
lint.check_valid_syntax(abspath, utils.cpus_for_jobs())
if "/" in include:
parts = include.split("/")
for i in range(1, len(parts)):
# Include parent directories in sync command or the default
# exclude will block them and by extension block the target
# file.
self.includes.append("/".join(parts[:i]))
self.includes.append(include)
def _after_cluster_sync(self):
self._restart_php()
def _after_lock_release(self):
self.announce(
"Synchronized %s: %s (duration: %s)",
self.arguments.file,
self.arguments.message,
utils.human_duration(self.get_duration()),
)
self.increment_stat("sync-file")
[docs]@cli.command("sync-wikiversions", affected_by_blocked_deployments=True)
class SyncWikiversions(AbstractSync):
"""Rebuild and sync wikiversions.php to the cluster."""
[docs] def _update_caches(self):
"""
Skip this step.
It currently consists only of cache_git_info and this class should
attempt to be fast where possible.
"""
pass
[docs] def _before_cluster_sync(self):
"""
check for the presence of ExtensionMessages and l10n cache
for every branch of mediawiki that is referenced in wikiversions.json
to avoid syncing a branch that is lacking these critical files.
"""
for version in self.active_wikiversions("stage"):
ext_msg = os.path.join(
self.config["stage_dir"],
"wmf-config",
"ExtensionMessages-%s.php" % version,
)
err_msg = "ExtensionMessages not found in %s" % ext_msg
utils.check_file_exists(ext_msg, err_msg)
cache_file = os.path.join(
self.config["stage_dir"],
"php-%s" % version,
"cache",
"l10n",
"l10n_cache-en.cdb",
)
err_msg = "l10n cache missing for %s" % version
utils.check_file_exists(cache_file, err_msg)
# Tell the remaining stages to only rsync wikiversions*.* files.
self.include = "wikiversions*.*"
def _after_lock_release(self):
self.announce(
"rebuilt and synchronized wikiversions files: %s", self.arguments.message
)
self.increment_stat("sync-wikiversions")
def _after_cluster_sync(self):
self._restart_php()
[docs]@cli.command(
"cdb-json-refresh", help=argparse.SUPPRESS, affected_by_blocked_deployments=True
)
class RefreshCdbJsonFiles(cli.Application):
"""
Create JSON/MD5 files for all CDB files in a directory.
This will put a JSON and MD5 file in /upstream for each CDB file.
This can be combined with rsync and the scap-rebuild-cdbs to
push out large CDB files with minimal traffic. CDB files change
drastically with small key/value changes, where as JSON files do not, and
thus they diff/rdiff much better.
When pushing updates with rsync, this should be run before running rsync.
The rsync command should exclude CDB files or at least use
-ignore-existing. After the rsync is done, scap-rebuild-cdbs can be
run on each server to apply the updates to the CDB files.
"""
[docs] @cli.argument(
"-d",
"--directory",
required=True,
type=arg.is_dir,
help="Directory containing cdb files",
)
@cli.argument(
"-t",
"--threads",
default=1,
type=int,
help="Number of threads to use to build json/md5 files",
)
def main(self, *extra_args):
cdb_dir = os.path.realpath(self.arguments.directory)
upstream_dir = os.path.join(cdb_dir, "upstream")
use_cores = self.arguments.threads
if not os.path.isdir(cdb_dir):
raise IOError(errno.ENOENT, "Directory does not exist", cdb_dir)
if use_cores < 1:
use_cores = utils.cpus_for_jobs()
if not os.path.isdir(upstream_dir):
os.mkdir(upstream_dir)
tasks.refresh_cdb_json_files(cdb_dir, use_cores, self.verbose)
[docs]@cli.command("version", help="Show the version number and exit")
class Version(cli.Application):
[docs] def main(self, *extra_args):
print(scapversion.__version__)
return 0
[docs]@cli.command(
"lock",
help="Temporarily lock deployment of this repository",
affected_by_blocked_deployments=True,
)
class LockManager(cli.Application):
"""
Holds a lock open for a given repository.
examples::
lock 'Testing something, do not deploy'
"""
[docs] @cli.argument(
"--all",
action="store_true",
help="Lock ALL repositories from deployment. "
+ "With great power comes great responsibility",
)
@cli.argument(
"--unlock-all",
action="store_true",
help="Remove global lock for all repositories",
)
@cli.argument("message", nargs="*", help="Log message for SAL/lock file")
def main(self, *extra_args):
logger = self.get_logger()
if self.arguments.unlock_all:
if self.arguments.message == "(no justification provided)":
logger.fatal("Cannot request to remove global lock without a reason")
return 1
self.announce(f"Forcefully removing global lock: {self.arguments.message}")
lock.Lock.signal_gl_release(self.arguments.message)
return
if self.arguments.message == "(no justification provided)":
logger.fatal("Cannot lock repositories without a reason")
return 1
if self.arguments.all:
lock_path = lock.GLOBAL_LOCK_FILE
repo = "ALL REPOSITORIES"
else:
lock_path = self.get_lock_file()
# This value is used only for log messages.
repo = self.config.get("git_repo", "mediawiki")
with lock.Lock(lock_path, name="lock-manager", reason=self.arguments.message):
forced_lock_release_r = None
forced_lock_release_w = None
if self.arguments.all:
def release_global_lock(*args):
# Signal forced abort
os.write(forced_lock_release_w, bytes(1))
forced_lock_release_r, forced_lock_release_w = os.pipe()
lock.Lock.watch_for_gl_release_signal(release_global_lock)
self.announce(
"Locking from deployment [%s]: %s", repo, self.arguments.message
)
logger.info("Press enter to unlock...")
try:
fds = [sys.stdin]
if forced_lock_release_r is not None:
fds.append(forced_lock_release_r)
rlist, _, _ = select.select(fds, [], [])
if sys.stdin in rlist:
sys.stdin.readline()
except KeyboardInterrupt:
pass # We don't care here
if forced_lock_release_r is not None:
os.close(forced_lock_release_r)
os.close(forced_lock_release_w)
self.announce(
"Unlocked for deployment [%s]: %s (duration: %s)",
repo,
self.arguments.message,
utils.human_duration(self.get_duration()),
)
return 0