Source code for transferpy.test.unit.test_transfer

"""Tests for transfer.py class."""
import os
import sys
from os import path
import logging
import unittest
from unittest.mock import patch, MagicMock

from transferpy.Transferer import Transferer
from transferpy.Firewall import Firewall
from transferpy.test.utils import hide_stderr
from transferpy.transfer import option_parse, \
    assign_default_options, configparser, parse_configurations, \
    split_target


[docs]class TestTransferer(unittest.TestCase):
[docs] @patch('transferpy.Transferer.RemoteExecution') def setUp(self, executor_mock): self.executor = MagicMock() executor_mock.return_value = self.executor self.options = {'verbose': False} self.transferer = Transferer('source', 'path', ['target'], ['path'], self.options)
[docs] def test_run_command(self): self.transferer.run_command('host', 'command') self.executor.run.assert_called_with('host', 'command')
[docs] def test_remote_execution_injection(self): bad_transferer = Transferer('$PS1', '$(ls /)', ['target `ls`'], ['~/path'], {}) self.assertEqual(bad_transferer.source_host, "'$PS1'") self.assertEqual(bad_transferer.source_path, "'$(ls /)'") self.assertEqual(bad_transferer.target_hosts, ["'target `ls`'"]) self.assertEqual(bad_transferer.target_paths, ["'~/path'"])
[docs] def test_is_dir(self): path = 'path' self.transferer.is_dir('host', path) args = self.executor.run.call_args[0] self.assertIn(r'"[ -d "{}" ]"'.format(path), args[1])
[docs] def test_file_exists(self): file = 'path' self.transferer.file_exists('host', file) args = self.executor.run.call_args[0] self.assertIn(r'"[ -a "{}" ]"'.format(file), args[1])
[docs] def test_calculate_checksum_for_dir(self): self.transferer.source_is_dir = True self.executor.run.return_value = MagicMock() self.executor.run.return_value.returncode = 0 self.transferer.calculate_checksum('host', 'path') args = self.executor.run.call_args[0] self.assertIn('find', args[1][-1]) self.assertIn('md5sum', args[1][-1])
[docs] def test_calculate_checksum_for_file(self): self.transferer.source_is_dir = False self.executor.run.return_value = MagicMock() self.executor.run.return_value.returncode = 0 self.transferer.calculate_checksum('host', 'path') args = self.executor.run.call_args[0] self.assertNotIn('find', args[1][-1]) self.assertIn('md5sum', args[1][-1])
[docs] def test_has_available_disk_space(self): self.executor.run.return_value = MagicMock() self.executor.run.return_value.returncode = 0 size = 100 self.executor.run.return_value.stdout = str(size + 1) result = self.transferer.has_available_disk_space('host', 'path', size) self.assertTrue(result)
[docs] def test_disk_usage(self): self.executor.run.return_value = MagicMock() self.executor.run.return_value.returncode = 0 size = 1024 self.executor.run.return_value.stdout = "{} path".format(size) result = self.transferer.disk_usage('host', 'path') self.assertEqual(size, result)
[docs] def test_compress_command_compressing(self): self.options['compress'] = True command = self.transferer.compress_command self.assertIn('pigz -c', command)
[docs] def test_compress_command_not_compressing(self): self.options['compress'] = False self.transferer.source_is_dir = True command = self.transferer.compress_command self.assertEqual('', command) self.transferer.source_is_dir = False command = self.transferer.compress_command self.assertIn('cat', command)
[docs] def test_decompress_command_compressing(self): self.options['compress'] = True command = self.transferer.decompress_command self.assertIn('pigz -c -d', command)
[docs] def test_decompress_command_not_compressing(self): self.options['compress'] = False command = self.transferer.decompress_command self.assertEqual('', command)
[docs] def test_encrypt_command_encrypting(self): self.options['encrypt'] = True command = self.transferer.encrypt_command self.assertIn('openssl enc', command)
[docs] def test_encrypt_command_not_encrypting(self): self.options['encrypt'] = False command = self.transferer.encrypt_command self.assertEqual('', command)
[docs] def test_decrypt_command_encrypting(self): self.options['encrypt'] = True command = self.transferer.decrypt_command self.assertIn('openssl enc -d', command)
[docs] def test_decrypt_command_not_encrypting(self): self.options['encrypt'] = False command = self.transferer.decrypt_command self.assertEqual('', command)
[docs] def test_parallel_checksum_source_and_target_command(self): """Test to check the parallel_checksum command""" self.options['parallel_checksum'] = False src_command = self.transferer.parallel_checksum_source_command trgt_command = self.transferer.parallel_checksum_target_command self.assertEqual('', src_command) self.assertEqual('', trgt_command) self.options['parallel_checksum'] = True src_command = self.transferer.parallel_checksum_source_command trgt_command = self.transferer.parallel_checksum_target_command self.assertEqual('| tee >(md5sum > {})'.format( self.transferer.parallel_checksum_source_path), src_command) self.assertEqual('| tee >(md5sum > {})'.format( self.transferer.parallel_checksum_target_path), trgt_command)
[docs] def test_run_sanity_checks_failing(self): """Test case for Transferer.run function which simulates sanity check failure.""" with patch.object(Transferer, 'sanity_checks') as mocked_sanity_check: mocked_sanity_check.side_effect = ValueError('Test sanity_checks') command = self.transferer.run() self.assertTrue(type(command) == list)
[docs] def test_run_stoping_slave(self): """Test case for Transferer.run function which provides stop_slave option""" with patch.object(Transferer, 'sanity_checks') as mocked_sanity_check,\ patch('transferpy.Transferer.MariaDB.stop_replication') as mocked_stop_replication: self.options['stop_slave'] = True self.options['checksum'] = False # Return value should be anything other than 0 for the if block to execute mocked_stop_replication.return_value = 1 mocked_sanity_check.called_once() command = self.transferer.run() self.assertTrue(type(command) == list)
[docs] def test_run_successfully(self): """Test case for Transferer.run function starting transfer successfully""" with patch.object(Transferer, 'sanity_checks') as mocked_sanity_check,\ patch('transferpy.Transferer.Firewall.open') as mocked_open_firewall,\ patch.object(Transferer, 'copy_to') as mocked_copy_to,\ patch('transferpy.Transferer.Firewall.close') as mocked_close_firewall,\ patch.object(Transferer, 'after_transfer_checks') as mocked_after_transfer_checks,\ patch('transferpy.Transferer.MariaDB.start_replication') as mocked_start_replication: self.options['port'] = 4444 self.options['checksum'] = False mocked_copy_to.return_value = 0 mocked_close_firewall.return_value = 0 mocked_after_transfer_checks.return_value = 0 mocked_start_replication.return_value = 0 mocked_sanity_check.called_once() mocked_open_firewall.called_once() command = self.transferer.run() self.assertTrue(type(command) == list)
[docs] def test_run_start_slave(self): """Test case for Transferer.run function for when it runs the start_slave function with the stop_slave option """ with patch('transferpy.Transferer.MariaDB.stop_replication') as mocked_stop_replication,\ patch.object(Transferer, 'sanity_checks') as mocked_sanity_check,\ patch('transferpy.Transferer.Firewall.open') as mocked_open_firewall,\ patch.object(Transferer, 'copy_to') as mocked_copy_to,\ patch('transferpy.Transferer.Firewall.close') as mocked_close_firewall,\ patch.object(Transferer, 'after_transfer_checks') as mocked_after_transfer_checks,\ patch('transferpy.Transferer.MariaDB.start_replication') as mocked_start_replication: self.options['port'] = 4444 self.options['checksum'] = False self.options['stop_slave'] = True # We need to skip the first if statement # which checks the stop slave option mocked_stop_replication.return_value = 0 mocked_copy_to.return_value = 0 mocked_close_firewall.return_value = 0 mocked_after_transfer_checks.return_value = 0 # Return value should be anything other than 0 # for this if block to execute mocked_start_replication.return_value = 1 mocked_sanity_check.called_once() mocked_open_firewall.called_once() command = self.transferer.run() self.assertTrue(type(command) == list)
[docs] def test_copy_to_success(self): """Test case for the successful run of Transferer.copy_to function""" self.options['compress'] = False self.options['parallel_checksum'] = False self.options['encrypt'] = False port = 4400 with patch('transferpy.Transferer.time'): target_host = 'target' target_path = 'target_path' self.executor.run.return_value.returncode = 0 returncode = self.transferer.copy_to(target_host, target_path, port) self.executor.start_job.assert_called_once() # Successful run should call the wait_job function self.executor.wait_job.assert_called_once() self.executor.kill_job.assert_not_called() self.assertEqual(returncode, 0)
[docs] def test_copy_to_failure(self): """Test case for the failed run of Transferer.copy_to function""" self.options['compress'] = False self.options['parallel_checksum'] = False self.options['encrypt'] = False port = 4400 with patch('transferpy.Transferer.time'): target_host = 'target' target_path = 'target_path' self.executor.run.return_value.returncode = 1 returncode = self.transferer.copy_to(target_host, target_path, port) self.executor.start_job.assert_called_once() # Failure should call the kill_job function self.executor.kill_job.assert_called_once() self.executor.wait_job.assert_not_called() self.assertEqual(returncode, 1)
[docs] def test_is_socket(self): """Test is_socket""" path = 'path' command = ['/bin/bash', '-c', r'"[ -S "{}" ]"'.format(path)] self.transferer.is_socket('source', path) self.executor.run.assert_called_once_with('source', command)
[docs] def test_host_exists(self): """Test host_exists""" command = ['/bin/true'] self.transferer.host_exists('source') self.executor.run.assert_called_once_with('source', command)
[docs] def test_dir_is_empty(self): """Test dir_is_empty""" directory = 'dir' command = ['/bin/bash', '-c', r'"[ -z \"$(/bin/ls -A {})\" ]"'.format(directory)] self.transferer.dir_is_empty(directory, 'source') self.executor.run.assert_called_once_with('source', command)
[docs] def test_parallel_checksum_source_command(self): """Test parallel_checksum_source_command""" self.options['parallel_checksum'] = True checksum_command = '| tee >(md5sum > {})'.format(self.transferer.parallel_checksum_source_path) self.assertEqual(checksum_command, self.transferer.parallel_checksum_source_command) # Make parallel_checksum False and try again self.options['parallel_checksum'] = False self.assertEqual('', self.transferer.parallel_checksum_source_command)
[docs] def test_parallel_checksum_target_command(self): """Test parallel_checksum_target_command""" self.options['parallel_checksum'] = True checksum_command = '| tee >(md5sum > {})'.format(self.transferer.parallel_checksum_target_path) self.assertEqual(checksum_command, self.transferer.parallel_checksum_target_command) # Make parallel_checksum False and try again self.options['parallel_checksum'] = False self.assertEqual('', self.transferer.parallel_checksum_target_command)
[docs] def test_read_checksum(self): """Test read_checksum""" path = 'path' command = ['/bin/bash', '-c', '/bin/cat < {} && /bin/rm {}'.format(path, path)] self.executor.run.return_value.returncode = 0 self.executor.run.return_value.stdout = "checksum - path" checksum = self.transferer.read_checksum('source', path) self.executor.run.assert_called_once_with('source', command) self.assertEqual(checksum, "checksum - path")
[docs] def test_netcat_send_command(self): """Test netcat_send_command""" target_host = 'source' port = 4400 expect_command = '| /bin/nc -q 0 -w 300 {} {}'.format(target_host, port) actual_command = self.transferer.netcat_send_command(target_host, port) self.assertEqual(expect_command, actual_command)
[docs] def test_netcat_listen_command(self): """Test netcat_listen_command""" port = 4400 expect_command = '/bin/nc -l -w 300 -p {}'.format(port) actual_command = self.transferer.netcat_listen_command(port) self.assertEqual(expect_command, actual_command)
[docs] def test_tar_command(self): """Test tar_command""" expected_command = '/bin/tar cf -' actual_command = self.transferer.tar_command self.assertEqual(expected_command, actual_command)
[docs] def test_untar_command(self): """Test untar_command""" expected_command_decompress = '| /bin/tar --strip-components=1 -xf -' expected_command_file = '| /bin/tar xf -' self.options['type'] = 'decompress' actual_command = self.transferer.untar_command self.assertEqual(actual_command, expected_command_decompress) self.options['type'] = 'file' actual_command = self.transferer.untar_command self.assertEqual(actual_command, expected_command_file)
[docs] def test_get_datadir_from_socket(self): """Test get_datadir_from_socket""" socket = 'mysqld.sock' datadir = '/srv/sqldata' actual_dir = self.transferer.get_datadir_from_socket(socket) self.assertEqual(datadir, actual_dir) socket = '/var/run/mysqld/test.mysqld.s1.sock' datadir = '/srv/sqldata.s1' actual_dir = self.transferer.get_datadir_from_socket(socket) self.assertEqual(datadir, actual_dir) # Test analytics use case socket = '/run/mysqld/mysqld.analytics_meta.sock' datadir = '/srv/sqldata.analytics_meta' actual_dir = self.transferer.get_datadir_from_socket(socket) self.assertEqual(datadir, actual_dir) # Give wrong socket input socket = 'test.mysqld.skt' with self.assertRaises(Exception): self.transferer.get_datadir_from_socket(socket)
[docs] def test_xtrabackup_command(self): """Test xtrabackup_command""" self.transferer.source_path = 'mysqld.sock' socket = self.transferer.source_path datadir = self.transferer.get_datadir_from_socket(socket) expected_command = 'xtrabackup --backup --target-dir /tmp ' \ '--user {} --socket={} --close-files --datadir={} --parallel={} ' \ '--stream=xbstream --slave-info --skip-ssl'.\ format('root', socket, datadir, 16) actual_command = self.transferer.xtrabackup_command self.assertEqual(expected_command, actual_command)
[docs] def test_mbstream_command(self): """Test mbstream_command""" expected_command = '| mbstream -x' actual_command = self.transferer.mbstream_command self.assertEqual(expected_command, actual_command)
[docs] def test_password(self): """Test password function""" self.transferer._password = None password = self.transferer.password self.assertNotEqual(password, None) self.transferer._password = 'password' password = self.transferer.password self.assertEqual('password', password)
[docs] def test_sanity_checks_file(self): """Test sanity_checks for file/dir""" with patch.object(Transferer, 'host_exists') as mocked_host_exists, \ patch.object(Transferer, 'disk_usage') as mocked_disk_usage, \ patch.object(Transferer, 'file_exists') as mocked_file_exists, \ patch.object(Transferer, 'has_available_disk_space') as mocked_disk_space, \ patch.object(Transferer, 'is_socket') as mocked_is_socket: self.transferer.target_hosts = ['target'] self.transferer.target_paths = ['path'] self.options['checksum'] = True self.options['type'] = 'file' mocked_disk_space.return_value = True mocked_is_socket.return_value = False mocked_file_exists.side_effect = [True, True, False] mocked_host_exists.return_value.returncode = 0 self.transferer.sanity_checks() self.assertEqual(mocked_host_exists.call_count, 2) mocked_disk_space.assert_called_once() mocked_disk_usage.assert_called_once() self.assertEqual(mocked_file_exists.call_count, 3)
[docs] def test_sanity_checks_xtrabackup(self): """Test sanity_checks for xtrabackup/decompress""" with patch.object(Transferer, 'host_exists') as mocked_host_exists, \ patch.object(Transferer, 'disk_usage') as mocked_disk_usage, \ patch.object(Transferer, 'file_exists') as mocked_file_exists, \ patch.object(Transferer, 'dir_is_empty') as mocked_dir_is_empty, \ patch.object(Transferer, 'has_available_disk_space') as mocked_disk_space, \ patch.object(Transferer, 'is_socket') as mocked_is_socket: self.transferer.target_hosts = ['target'] self.transferer.target_paths = ['path'] self.options['checksum'] = True self.options['type'] = 'xtrabackup' mocked_dir_is_empty.return_value = True mocked_disk_space.return_value = True mocked_is_socket.return_value = True mocked_file_exists.return_value = True mocked_host_exists.return_value.returncode = 0 self.transferer.sanity_checks() mocked_dir_is_empty.assert_called_once() self.assertEqual(mocked_host_exists.call_count, 2) mocked_disk_space.assert_called_once() mocked_disk_usage.assert_called_once() mocked_is_socket.assert_called_once() self.assertEqual(mocked_file_exists.call_count, 2)
[docs] def test_after_transfer_checks(self): """Test after_transfer_checks""" with patch.object(Transferer, 'disk_usage') as mocked_disk_usage, \ patch.object(Transferer, 'file_exists') as mocked_file_exists, \ patch.object(Transferer, 'calculate_checksum') as mocked_calculate_checksum, \ patch.object(Transferer, 'read_checksum') as mocked_read_checksum, \ patch.object(Transferer, 'remove_temp_paths') as mocked_remove_temp_paths: target_host = 'target' target_path = 'path' self.options['checksum'] = True self.options['parallel_checksum'] = True self.options['type'] = 'file' self.transferer.checksum = 'checksum' mocked_calculate_checksum.return_value = 'checksum' mocked_file_exists.return_value = True result = self.transferer.after_transfer_checks(0, target_host, target_path) mocked_disk_usage.assert_called_once() mocked_calculate_checksum.assert_called_once() mocked_file_exists.assert_called_once() mocked_remove_temp_paths.assert_called_once() self.assertEqual(mocked_read_checksum.call_count, 2) self.assertEqual(result, 0)
[docs]class TestFirewall(unittest.TestCase): """Test cases for Firewall module"""
[docs] @patch('transferpy.Transferer.RemoteExecution') def setUp(self, executor_mock): self.executor = MagicMock() executor_mock.return_value = self.executor self.firewall_handler = Firewall('target', self.executor)
[docs] def test_reserve_port(self): """Test for Firewall reserve_port function""" target_port = 4444 target_host = 'target' reserve_port_dir_name = path.normpath( path.join(self.firewall_handler.parent_tmp_dir, 'trnsfr_{}_{}.lock'. format(target_host, target_port))) command = ["/bin/mkdir {}".format(reserve_port_dir_name)] self.firewall_handler.reserve_port(target_port) self.executor.run.assert_called_with('target', command)
[docs] def test_unreserve_port(self): """Test for Firewall unreserve_port function""" target_port = 4444 target_host = 'target' # This assigning happens at Firewall.reserve_port function reserve_port_dir_name = path.normpath( path.join(self.firewall_handler.parent_tmp_dir, 'trnsfr_{}_{}.lock'. format(target_host, target_port))) self.firewall_handler.reserve_port_dir_name = reserve_port_dir_name command = ["/bin/rmdir {}".format(self.firewall_handler.reserve_port_dir_name)] self.firewall_handler.unreserve_port(target_port) self.executor.run.assert_called_with('target', command)
[docs] def test_find_available_port(self): """Test for find_available_port function""" self.executor.run.return_value.returncode = 0 self.executor.run.return_value.stdout = "4400\n4401" with patch('transferpy.Firewall.Firewall.reserve_port') as mocked_reserve_port: mocked_reserve_port.return_value = 1 target_port = self.firewall_handler.find_available_port() self.executor.run.assert_called_with('target', self.firewall_handler.find_used_ports_command) self.assertEqual(target_port, 4402)
[docs] def test_no_available_port(self): """Test for find_available_port function when no ports are available""" self.executor.run.return_value.returncode = 0 with patch('transferpy.Firewall.Firewall.reserve_port') as mocked_reserve_port: # reserve_port function failure lead to the idea of unavailability of the port mocked_reserve_port.return_value = 0 with self.assertRaises(ValueError): self.firewall_handler.find_available_port() self.executor.run.assert_called_with('target', self.firewall_handler.find_used_ports_command)
[docs] def test_open_with_auto_port_finding(self): """Test for open function with automatic port finding""" source_host = 'src_host' # When target_port is 0, Firewall automatically finds a free port target_port = 0 expected_port = 4400 self.executor.run.return_value.returncode = 0 with patch('transferpy.Firewall.Firewall.find_available_port') as mocked_find_available_port: mocked_find_available_port.return_value = expected_port port = self.firewall_handler.open(source_host, target_port) command = ['/sbin/iptables', '-A', 'INPUT', '-p', 'tcp', '-s', '{}'.format(source_host), '--dport', '{}'.format(expected_port), '-j', 'ACCEPT'] self.executor.run.assert_called_with('target', command) self.assertEqual(port, expected_port)
[docs] def test_open_with_given_port(self): """Test for open function with a given port""" source_host = 'src_host' target_port = 4400 expected_port = 4400 self.executor.run.return_value.returncode = 0 with patch('transferpy.Firewall.Firewall.reserve_port') as mocked_reserve_port: mocked_reserve_port.return_value = 1 port = self.firewall_handler.open(source_host, target_port) command = ['/sbin/iptables', '-A', 'INPUT', '-p', 'tcp', '-s', '{}'.format(source_host), '--dport', '{}'.format(expected_port), '-j', 'ACCEPT'] self.executor.run.assert_called_with('target', command) self.assertEqual(port, expected_port)
[docs] def test_open_with_given_non_available_port(self): """Test for open function with a given port which is not available at the target host""" source_host = 'src_host' target_port = 4400 self.executor.run.return_value.returncode = 0 with patch('transferpy.Firewall.Firewall.reserve_port') as mocked_reserve_port: mocked_reserve_port.return_value = 0 with self.assertRaises(ValueError): self.firewall_handler.open(source_host, target_port)
[docs] def test_open_failure(self): """Test for open function failure""" source_host = 'src_host' target_port = 4400 self.executor.run.return_value.returncode = 1 with patch('transferpy.Firewall.Firewall.reserve_port') as mocked_reserve_port: mocked_reserve_port.return_value = 1 with self.assertRaises(Exception): self.firewall_handler.open(source_host, target_port)
[docs] def test_close(self): """Test for close function failure""" source_host = 'src_host' target_port = 4400 self.executor.run.return_value.returncode = 1 with patch('transferpy.Firewall.Firewall.unreserve_port') as mocked_unreserve_port: command = ['/sbin/iptables', '-D', 'INPUT', '-p', 'tcp', '-s', '{}'.format(source_host), '--dport', '{}'.format(target_port), '-j', 'ACCEPT'] mocked_unreserve_port.return_value = 1 self.firewall_handler.target_port = 4400 self.firewall_handler.close(source_host) self.executor.run.assert_called_once_with('target', command) mocked_unreserve_port.assert_called_once_with(target_port)
[docs] def test_close_gets_port_from_open(self): """Test case for Firewall.close function, close takes port number from Firewall.target_port which is set by Firewall.open""" source_host = 'src_host' # When target_port is 0, Firewall automatically finds a free port target_port = 0 expected_port = 4400 self.executor.run.return_value.returncode = 0 with patch('transferpy.Firewall.Firewall.find_available_port') as mocked_find_available_port: mocked_find_available_port.return_value = expected_port port = self.firewall_handler.open(source_host, target_port) self.assertEqual(port, expected_port) self.assertEqual(self.firewall_handler.target_port, expected_port)
[docs] def test_find_pid(self): """Test for find_pid""" target_port = 4400 command = "/bin/fuser {}/tcp".format(target_port) self.executor.run.return_value.returncode = 0 self.executor.run.return_value.stdout = 'port:123' pid = self.firewall_handler.find_pid(target_port) self.executor.run.assert_called_once_with('target', command) self.assertEqual(pid, 123)
[docs] def test_kill_process(self): """Test for kill_process""" target_port = 4400 command = "/bin/fuser -k {}/tcp || echo 0".format(target_port) self.executor.run.return_value.returncode = 0 self.firewall_handler.kill_process(target_port) self.executor.run.assert_called_once_with('target', command)
[docs]class TestArgumentParsing(unittest.TestCase): """Test cases for the command line arguments parsing."""
[docs] def option_parse(self, args): """Call parse_args patching the arguments.""" with patch.object(sys, 'argv', args): return option_parse()
[docs] def check_bad_args(self, args, expected_error=SystemExit): """Check arg parsing fails for the given args.""" with self.assertRaises(expected_error) as exc: with hide_stderr(): self.option_parse(args) if expected_error == SystemExit: self.assertEquals(exc.exception.code, 2)
[docs] def test_missing_required_args(self): """Test errors with missing required args.""" missing_required_args_list = [ ['transfer'], ['transfer', 'src:path'], ['transfer', 'trg?:path'], ] for test_args in missing_required_args_list: self.check_bad_args(test_args)
[docs] def test_bad_source(self): """Test errors with the source.""" test_args = ['transfer', 'source', 'target:path'] self.check_bad_args(test_args)
[docs] def test_bad_target(self): """Test errors with the target.""" test_args = ['transfer', 'source:path', 'target'] self.check_bad_args(test_args)
[docs] def test_just_source_and_targets(self): """Test call with just source and targets.""" src = 'source' src_path = 'source_path' trg1 = 'target1' trg1_path = 'dst_path1' trg2 = 'target2' trg2_path = 'dst_path2' test_args = ['transfer', '{}:{}'.format(src, src_path), '{}:{}'.format(trg1, trg1_path), '{}:{}'.format(trg2, trg2_path)] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(test_args) self.assertEqual(src, source_host) self.assertEqual(src_path, source_path) self.assertEqual([trg1, trg2], target_hosts) self.assertEqual([trg1_path, trg2_path], target_paths) self.assertEqual(other_options['port'], 0) self.assertTrue(other_options['compress']) self.assertTrue(other_options['encrypt'])
[docs] def test_port(self): """Test port param.""" port = 12345 test_args = ['transfer', 'source:path', 'target:path', '--port', str(port)] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(test_args) self.assertEqual(other_options['port'], port) self.assertTrue(other_options['compress']) self.assertTrue(other_options['encrypt'])
[docs] def test_compress(self): """Test compress params.""" base_args = ['transfer', 'source:path', 'target:path'] compress_test_args = base_args + ['--compress'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(compress_test_args) self.assertTrue(other_options['compress']) self.assertTrue(other_options['encrypt']) no_compress_test_args = base_args + ['--no-compress'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(no_compress_test_args) self.assertFalse(other_options['compress']) self.assertTrue(other_options['encrypt'])
[docs] def test_encrypt(self): """Test encrypt params.""" base_args = ['transfer', 'source:path', 'target:path'] encrypt_test_args = base_args + ['--encrypt'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(encrypt_test_args) self.assertTrue(other_options['compress']) self.assertTrue(other_options['encrypt']) no_encrypt_test_args = base_args + ['--no-encrypt'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(no_encrypt_test_args) self.assertTrue(other_options['compress']) self.assertFalse(other_options['encrypt'])
[docs] def test_split_target(self): """Test split_target function""" # Correct input with spaces target = 'host : path ' host, path = split_target(target) self.assertEqual(host, 'host') self.assertEqual(path, 'path') # Input with no path but a space target = 'host: ' with self.assertRaises(SystemExit) as se: split_target(target) self.assertEqual(se.exception.code, 2) # Input with no colon target = 'host path' with self.assertRaises(SystemExit) as se: split_target(target) self.assertEqual(se.exception.code, 2) # Input with just colon target = ':' with self.assertRaises(SystemExit) as se: split_target(target) self.assertEqual(se.exception.code, 2) # Input with more than one colon target = 'host:path:' with self.assertRaises(SystemExit) as se: split_target(target) self.assertEqual(se.exception.code, 2)
[docs] def test_parallel_checksum(self): """Test parallel-checksum param.""" base_args = ['transfer', 'source:path', 'target:path'] # When normal checksum is enabled by the user, # parallel-checksum argument should be disabled. parallel_checksum_test_args = base_args + ['--parallel-checksum'] + ['--checksum'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(parallel_checksum_test_args) self.assertTrue(other_options['checksum']) self.assertFalse(other_options['parallel_checksum']) no_parallel_checksum_test_args = base_args + ['--no-parallel-checksum'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(no_parallel_checksum_test_args) self.assertTrue(other_options['checksum']) self.assertFalse(other_options['parallel_checksum']) # Now disable the normal checksum so that parallel-checksum can take effect base_args = ['transfer', 'source:path', 'target:path', '--no-checksum'] # By default, normal checksum is enabled. So, irrespective of the # --parallel-checksum argument, this option is disabled. parallel_checksum_test_args = base_args + ['--parallel-checksum'] (source_host, source_path, target_hosts, target_paths, other_options) \ = self.option_parse(parallel_checksum_test_args) self.assertFalse(other_options['checksum']) self.assertTrue(other_options['parallel_checksum']) no_parallel_checksum_test_args = base_args + ['--no-parallel-checksum'] (source_host, source_path, target_hosts, target_paths, other_options) \ = self.option_parse(no_parallel_checksum_test_args) self.assertFalse(other_options['checksum']) self.assertFalse(other_options['parallel_checksum'])
[docs] def test_verbose(self): """Test verbose param.""" base_args = ['transfer', 'source:path', 'target:path'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(base_args) self.assertFalse(other_options['verbose']) verbose_test_args = base_args + ['--verbose'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(verbose_test_args) self.assertTrue(other_options['verbose'])
[docs] def test_remote_execution_verbose(self): """Test the effect of verbose option on RemoteExecution""" base_args = ['transfer', 'source:path', 'target:path'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(base_args) t = Transferer(source_host, source_path, target_hosts, target_paths, other_options) self.assertEqual(t.remote_executor.options['verbose'], False) # Now enable verbose and check the effect on RemoteExecution verbose_test_args = base_args + ['--verbose'] (source_host, source_path, target_hosts, target_paths, other_options)\ = self.option_parse(verbose_test_args) t = Transferer(source_host, source_path, target_hosts, target_paths, other_options) self.assertEqual(t.remote_executor.options['verbose'], True)
[docs] def test_setup_logger(self): """Test the logger's global availability and the impact of the verbose option on the logger level.""" base_args = ['transfer', 'source:path', 'target:path'] self.option_parse(base_args) # By default, verbose is false, so logging level # should be INFO logger = logging.getLogger('transferpy') self.assertEqual(logger.level, logging.INFO) # If verbose mentioned, level should be DEBUG verbose_test_args = base_args + ['--verbose'] self.option_parse(verbose_test_args) logger = logging.getLogger('transferpy') self.assertEqual(logger.level, logging.DEBUG)
[docs] def test_assign_default_options(self): """Test assign_default_options function""" default_options = assign_default_options({}) self.assertEqual(default_options['port'], 0) self.assertEqual(default_options['transfer_type'], 'file') self.assertTrue(default_options['compress']) self.assertTrue(default_options['encrypt']) self.assertTrue(default_options['checksum']) self.assertFalse(default_options['verbose']) self.assertFalse(default_options['stop_slave']) # Check whether the values are getting accepted default_options = assign_default_options( {'port': 4444, 'transfer_type': 'xtrabackup', 'compress': False, 'encrypt': False, 'checksum': False, 'stop_slave': True, 'verbose': True }) self.assertEqual(default_options['port'], 4444) self.assertEqual(default_options['transfer_type'], 'xtrabackup') self.assertFalse(default_options['compress']) self.assertFalse(default_options['encrypt']) self.assertFalse(default_options['checksum']) self.assertTrue(default_options['verbose']) self.assertTrue(default_options['stop_slave'])
[docs] def test_parse_configurations(self): """test parse configuration file""" config_file = '/tmp/tmp_transferpy.conf' config = configparser.ConfigParser() config['DEFAULT'] = {'port': 4444, 'transfer_type': 'xtrabackup', 'compress': False, 'encrypt': False, 'checksum': False, 'stop_slave': True, 'verbose': True} # Write the config options to a file and use it as transferpy config file with open(config_file, 'w') as configfile: config.write(configfile) conf_args = dict(parse_configurations(config_file)) os.remove(config_file) self.assertEqual(conf_args['port'], '4444') self.assertEqual(conf_args['transfer_type'], 'xtrabackup') self.assertEqual(conf_args['compress'], 'False') self.assertEqual(conf_args['encrypt'], 'False') self.assertEqual(conf_args['checksum'], 'False') self.assertEqual(conf_args['stop_slave'], 'True') self.assertEqual(conf_args['verbose'], 'True')
[docs] def test_winner_among_cli_and_config(self): """Test the context of disagreement between command line arguments and configuration file arguments: command line arguments should get first preference.""" config_file = '/tmp/tmp_transferpy.conf' config = configparser.ConfigParser() config['DEFAULT'] = {'port': 4444, 'transfer_type': 'file', 'compress': True, 'encrypt': True, 'checksum': True, 'stop_slave': False, 'verbose': False} # Write the config options to a file and use it as transferpy config file with open(config_file, 'w') as configfile: config.write(configfile) # Give command line arguments opposite to the config file args = ['transfer', 'source:path', 'target:path', '--port', 0, '--type', 'xtrabackup', '--no-compress', '--no-encrypt', '--no-checksum', '--stop-slave', '--verbose', '--config', config_file] (source_host, source_path, target_hosts, target_paths, other_options) = \ self.option_parse(args) os.remove(config_file) # Command line arguments should get reflected self.assertEqual(other_options['port'], 0) self.assertEqual(other_options['type'], 'xtrabackup') self.assertFalse(other_options['compress']) self.assertFalse(other_options['encrypt']) self.assertFalse(other_options['checksum']) self.assertTrue(other_options['stop_slave']) self.assertTrue(other_options['verbose'])
[docs] def test_missing_argument(self): """Test missing argument""" config_file = '/tmp/tmp_transferpy.conf' config = configparser.ConfigParser() config['DEFAULT'] = {'port': 4444, 'transfer_type': 'file', 'compress': True, 'encrypt': True, 'checksum': True, 'stop_slave': False} # Write the config options except verbose with open(config_file, 'w') as configfile: config.write(configfile) # Give command line arguments without --verbose option args = ['transfer', 'source:path', 'target:path', '--port', 0, '--type', 'xtrabackup', '--no-compress', '--no-encrypt', '--no-checksum', '--stop-slave', '--config', config_file] (source_host, source_path, target_hosts, target_paths, other_options) = \ self.option_parse(args) os.remove(config_file) # The verbose is not mentioned anywhere, so default False should be taken self.assertFalse(other_options['verbose'])
[docs] def test_parallel_checksum_and_checksum_true_in_config(self): """Test for config given True for both parallel-checksum and checksum, checksum should win""" config_file = '/tmp/tmp_transferpy.conf' config = configparser.ConfigParser() # The config file has checksum True config['DEFAULT'] = {'port': 4444, 'transfer_type': 'file', 'compress': True, 'encrypt': True, 'checksum': True, 'parallel_checksum': True, 'stop_slave': False} # Write the config options except verbose with open(config_file, 'w') as configfile: config.write(configfile) # But command line has no specification args = ['transfer', 'source:path', 'target:path', '--stop-slave', '--config', config_file] (source_host, source_path, target_hosts, target_paths, other_options) = \ self.option_parse(args) os.remove(config_file) # The parallel-checksum should be True self.assertFalse(other_options['parallel_checksum']) self.assertTrue(other_options['checksum'])
[docs] def test_parallel_checksum_arg_when_checksum_true_by_default(self): """Test for user given --parallel-checksum argument when the --no-checksum is not mentioned or in other words just the --parallel-checksum should enable parallel-checksum""" config_file = '/tmp/tmp_transferpy.conf' config = configparser.ConfigParser() # The config file has checksum True config['DEFAULT'] = {'port': 4444, 'transfer_type': 'file', 'compress': True, 'encrypt': True, 'checksum': True, 'parallel_checksum': False, 'stop_slave': False} # Write the config options except verbose with open(config_file, 'w') as configfile: config.write(configfile) # But command line has given with --parallel-checksum args = ['transfer', 'source:path', 'target:path', '--parallel-checksum', '--stop-slave', '--config', config_file] (source_host, source_path, target_hosts, target_paths, other_options) = \ self.option_parse(args) os.remove(config_file) # The parallel-checksum should be True self.assertTrue(other_options['parallel_checksum']) self.assertFalse(other_options['checksum'])