From d65811f2d5b129cab76e7bd26fe68b2c1cd21eb7 Mon Sep 17 00:00:00 2001 From: Lindley Werner Date: Tue, 6 Jun 2023 12:22:06 -0300 Subject: [PATCH] Adding unit tests in pybox python scripts. Enabling automatic tests with tox and zuul for each new patchset. To see the unit test logs, go to: 1- Zuul Summary 2- tox-unittests 3- Logs 4- job-output.txt Test Plan: PASS: Run "tox -e unittests" in the terminal, this will: - Set the PYTHONPATH environment variable - Run the tests - Show the coverage report Task: 47929 Story: 2005051 Change-Id: I7f527860f3498c53b28691c654035d017d70f68b Signed-off-by: Lindley Werner --- .zuul.yaml | 9 + pylint.rc | 2 +- tox.ini | 17 +- virtualbox/pybox/helper/tests/__init__.py | 0 .../pybox/helper/tests/test_host_helper.py | 294 ++++ .../pybox/helper/tests/test_install_lab.py | 123 ++ .../pybox/helper/tests/test_vboxmanage.py | 1405 +++++++++++++++++ virtualbox/pybox/helper/vboxmanage.py | 5 +- virtualbox/pybox/tests/__init__.py | 0 virtualbox/pybox/tests/test_install_vbox.py | 623 ++++++++ virtualbox/pybox/utils/install_log.py | 8 +- virtualbox/pybox/utils/kpi.py | 9 +- virtualbox/pybox/utils/serial.py | 7 +- virtualbox/pybox/utils/sftp.py | 21 +- virtualbox/pybox/utils/tests/__init__.py | 0 .../pybox/utils/tests/test_install_log.py | 94 ++ virtualbox/pybox/utils/tests/test_kpi.py | 247 +++ virtualbox/pybox/utils/tests/test_serial.py | 201 +++ virtualbox/pybox/utils/tests/test_sftp.py | 162 ++ 19 files changed, 3200 insertions(+), 27 deletions(-) create mode 100644 virtualbox/pybox/helper/tests/__init__.py create mode 100644 virtualbox/pybox/helper/tests/test_host_helper.py create mode 100644 virtualbox/pybox/helper/tests/test_install_lab.py create mode 100644 virtualbox/pybox/helper/tests/test_vboxmanage.py create mode 100644 virtualbox/pybox/tests/__init__.py create mode 100644 virtualbox/pybox/tests/test_install_vbox.py create mode 100644 virtualbox/pybox/utils/tests/__init__.py create mode 100644 virtualbox/pybox/utils/tests/test_install_log.py create mode 100644 virtualbox/pybox/utils/tests/test_kpi.py create mode 100644 virtualbox/pybox/utils/tests/test_serial.py create mode 100644 virtualbox/pybox/utils/tests/test_sftp.py diff --git a/.zuul.yaml b/.zuul.yaml index d34f9e1..3891eb2 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -6,7 +6,16 @@ jobs: - openstack-tox-linters - openstack-tox-pylint + - tox-unittests gate: jobs: - openstack-tox-linters - openstack-tox-pylint + - tox-unittests + +- job: + name: tox-unittests + parent: tox + description: Run unit tests + vars: + tox_envlist: unittests diff --git a/pylint.rc b/pylint.rc index 77ce541..d4c7e6b 100755 --- a/pylint.rc +++ b/pylint.rc @@ -7,7 +7,7 @@ rcfile=pylint.rc #init-hook= # Add files or directories to the blacklist. Should be base names, not paths. -ignore= +ignore=tests # Pickle collected data for later comparisons. persistent=yes diff --git a/tox.ini b/tox.ini index e74ce01..9a1cde0 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = linters,pylint +envlist = linters,pylint,unittests minversion = 2.3 skipsdist = True @@ -37,6 +37,17 @@ deps = -r{env:BASEPATH}/requirements.txt {[testenv]deps} allowlist_externals = pylint -commands = - pylint {posargs} --rcfile=./pylint.rc virtualbox/pybox +commands = pylint {posargs} --rcfile=./pylint.rc virtualbox/pybox +[testenv:unittests] +basepython = python3 +setenv = + BASEPATH = {toxinidir}/virtualbox/pybox + PYTHONPATH= {env:BASEPATH}:{env:BASEPATH}/helper:{env:BASEPATH}/consts:{env:BASEPATH}/utils +deps = + -r{toxinidir}/virtualbox/pybox/requirements.txt + coverage +change_dir = {env:BASEPATH} +commands = + coverage run -m unittest discover + coverage report -m diff --git a/virtualbox/pybox/helper/tests/__init__.py b/virtualbox/pybox/helper/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/virtualbox/pybox/helper/tests/test_host_helper.py b/virtualbox/pybox/helper/tests/test_host_helper.py new file mode 100644 index 0000000..53d8645 --- /dev/null +++ b/virtualbox/pybox/helper/tests/test_host_helper.py @@ -0,0 +1,294 @@ +""" +Unit tests related to host_helper +""" + +import unittest +from unittest.mock import MagicMock, patch, call +import streamexpect + +import host_helper + + +class UnlockHostTestCase(unittest.TestCase): + """ + Class to test unlock_host method + """ + + mock_stream = MagicMock() + mock_hostname = "hostname" + + @patch("host_helper.serial") + def test_unlock_host_when_locked(self, mock_serial): + """ + Test unlock_host when locked + """ + + # Run + result = host_helper.unlock_host(self.mock_stream, self.mock_hostname) + + # Assert + mock_serial.send_bytes.assert_any_call(self.mock_stream, + f"system host-list | grep {self.mock_hostname}", + expect_prompt=False) + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, "locked") + mock_serial.send_bytes.assert_any_call(self.mock_stream, + f"system host-unlock {self.mock_hostname}", + expect_prompt=False) + self.assertIsNone(result) + self.assertEqual(mock_serial.send_bytes.call_count, 2) + + @patch("host_helper.serial") + def test_unlock_host_when_not_locked(self, mock_serial): + """ + Test unlock_host when not locked + """ + + # Setup + mock_serial.expect_bytes.side_effect = streamexpect.ExpectTimeout + + # Run + result = host_helper.unlock_host(self.mock_stream, self.mock_hostname) + + # Assert + mock_serial.send_bytes.assert_called_once_with(self.mock_stream, + f"system host-list | grep {self.mock_hostname}", + expect_prompt=False) + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, "locked") + self.assertEqual(result, 1) + + +class LockHostTestCase(unittest.TestCase): + """ + Class to test lock_host method + """ + + mock_stream = MagicMock() + mock_hostname = "hostname" + + @patch("host_helper.serial") + def test_lock_host_when_unlocked(self, mock_serial): + """ + Test lock_host when host is unlocked + """ + + # Run + result = host_helper.lock_host(self.mock_stream, self.mock_hostname) + + # Assert + mock_serial.send_bytes.assert_any_call(self.mock_stream, + f"system host-list |grep {self.mock_hostname}", + expect_prompt=False) + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, "unlocked") + mock_serial.send_bytes.assert_any_call(self.mock_stream, + f"system host-lock {self.mock_hostname}", + expect_prompt="keystone") + self.assertEqual(mock_serial.send_bytes.call_count, 2) + self.assertIsNone(result) + + @patch("host_helper.serial") + def test_lock_host_when_not_unlocked(self, mock_serial): + """ + Test lock_host when host is not unlocked + """ + + # Setup + mock_serial.expect_bytes.side_effect = streamexpect.ExpectTimeout + + # Run + result = host_helper.lock_host(self.mock_stream, self.mock_hostname) + + # Assert + mock_serial.send_bytes.assert_called_once_with(self.mock_stream, + f"system host-list |grep {self.mock_hostname}", + expect_prompt=False) + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, "unlocked") + self.assertEqual(result, 1) + + +class RebootHostTestCase(unittest.TestCase): + """ + Class to test reboot_host method + """ + + @patch("host_helper.serial") + @patch("host_helper.HostTimeout") + def test_reboot_host(self, mock_host_timeout, mock_serial): + """ + Test reboot_host method + """ + + # Setup + mock_stream = MagicMock() + mock_hostname = "hostname" + + # Run + host_helper.reboot_host(mock_stream, mock_hostname) + + # Assert + mock_serial.send_bytes.assert_called_once_with(mock_stream, + f"system host-reboot {mock_hostname}", + expect_prompt=False) + mock_serial.expect_bytes.assert_called_once_with(mock_stream, "rebooting", mock_host_timeout.REBOOT) + + +class InstallHostTestCase(unittest.TestCase): + """ + Class to test install_host method + """ + + mock_stream = MagicMock() + mock_hostname = "hostname" + mock_host_id = 1 + + @patch("host_helper.serial") + def test_install_host_controller(self, mock_serial): + """ + Test install_host for controller type host + """ + + # Setup + mock_host_type = "controller" + + # Run + host_helper.install_host(self.mock_stream, self.mock_hostname, mock_host_type, self.mock_host_id) + + # Assert + mock_serial.send_bytes.assert_called_once_with(self.mock_stream, + f"system host-update {self.mock_host_id} personality=controller", + expect_prompt=False) + + @patch("host_helper.serial") + def test_install_host_storage(self, mock_serial): + """ + Test install_host for storage type host + """ + + # Setup + mock_host_type = "storage" + + # Run + host_helper.install_host(self.mock_stream, self.mock_hostname, mock_host_type, self.mock_host_id) + + # Assert + mock_serial.send_bytes.assert_called_once_with(self.mock_stream, + f"system host-update {self.mock_host_id} personality=storage", + expect_prompt=False) + + @patch("host_helper.serial") + def test_install_host_compute(self, mock_serial): + """ + Test install_host for compute type host + """ + + # Setup + mock_host_type = "compute" + + # Run + host_helper.install_host(self.mock_stream, self.mock_hostname, mock_host_type, self.mock_host_id) + + # Assert + mock_serial.send_bytes.assert_called_once_with( + self.mock_stream, + f"system host-update {self.mock_host_id} personality=compute hostname={self.mock_hostname}", + expect_prompt=False) + + +class DisableLogoutTestCase(unittest.TestCase): + """ + Class to test disable_logout method + """ + + @patch("host_helper.serial") + def test_disable_logout(self, mock_serial): + """ + Test disable_logout method + """ + + # Setup + mock_stream = MagicMock() + + # Run + host_helper.disable_logout(mock_stream) + + # Assert + mock_serial.send_bytes.assert_called_once_with(mock_stream, "export TMOUT=0") + + +class ChangePasswordTestCase(unittest.TestCase): + """ + Class to test change_password method + """ + + @patch("host_helper.serial") + def test_change_password(self, mock_serial): + """ + Test change_password method + """ + + # Setup + mock_stream = MagicMock() + mock_username = "sysadmin" + mock_password = "Li69nux*" + + # Run + host_helper.change_password(mock_stream, mock_username, mock_password) + + # Assert + calls = [ + call.send_bytes(mock_stream, mock_username, expect_prompt=False), + call.expect_bytes(mock_stream, "Password:"), + call.send_bytes(mock_stream, mock_username, expect_prompt=False), + call.expect_bytes(mock_stream, "Current password:"), + call.send_bytes(mock_stream, mock_username, expect_prompt=False), + call.expect_bytes(mock_stream, "New password:"), + call.send_bytes(mock_stream, mock_password, expect_prompt=False), + call.expect_bytes(mock_stream, "Retype new password"), + call.send_bytes(mock_stream, mock_password) + ] + + mock_serial.assert_has_calls(calls, any_order=False) + + +class CheckPasswordTestCase(unittest.TestCase): + """ + Class to test check_password method + """ + + mock_stream = MagicMock() + + @patch("host_helper.serial") + def test_check_password_prompt_found(self, mock_serial): + """ + Test check_password method when password prompt is found + """ + + # Setup + mock_password = "Li69nux*" + mock_serial.expect_bytes.return_value = 0 + + # Run + host_helper.check_password(self.mock_stream, mock_password) + + # Assert + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, 'assword', fail_ok=True, timeout=5) + mock_serial.send_bytes.assert_called_once_with(self.mock_stream, mock_password, expect_prompt=False) + + @patch("host_helper.serial") + def test_check_password_prompt_not_found(self, mock_serial): + """ + Test check_password method when password prompt is not found + """ + + # Setup + mock_serial.expect_bytes.return_value = 1 + + # Run + host_helper.check_password(self.mock_stream, "") + + # Assert + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, 'assword', fail_ok=True, timeout=5) + mock_serial.send_bytes.assert_not_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/helper/tests/test_install_lab.py b/virtualbox/pybox/helper/tests/test_install_lab.py new file mode 100644 index 0000000..392b3c2 --- /dev/null +++ b/virtualbox/pybox/helper/tests/test_install_lab.py @@ -0,0 +1,123 @@ +""" +Unit tests related to install_lab +""" + +import unittest +from unittest.mock import MagicMock, patch +import install_lab + + +class UpdatePlatformCpusTestCase(unittest.TestCase): + """ + Class to test update_platform_cpus method + """ + + @patch("install_lab.serial") + def test_update_platform_cpus(self, mock_serial): + """ + Test update_platform_cpus method + """ + + # Setup + mock_stream = MagicMock() + mock_hostname = "hostname" + mock_cpu_num = 5 + + # Run + install_lab.update_platform_cpus(mock_stream, mock_hostname, cpu_num=mock_cpu_num) + + # Assert + command_string = ( + "\nsource /etc/platform/openrc; system host-cpu-modify " + f"{mock_hostname} -f platform -p0 {mock_cpu_num}" + ) + mock_serial.send_bytes.assert_called_once_with( + mock_stream, command_string, prompt="keystone", timeout=300 + ) + + +class SetDnsTestCase(unittest.TestCase): + """ + Class to test set_dns method + """ + + @patch("install_lab.serial") + def test_set_dns(self, mock_serial): + """ + Test set_dns method + """ + + # Setup + mock_stream = MagicMock() + mock_dns_ip = "8.8.8.8" + + # Run + install_lab.set_dns(mock_stream, mock_dns_ip) + + # Assert + command_string = ( + "source /etc/platform/openrc; system dns-modify " + f"nameservers={mock_dns_ip}" + ) + mock_serial.send_bytes.assert_called_once_with( + mock_stream, command_string, prompt="keystone" + ) + + +class ConfigControllerTestCase(unittest.TestCase): + """ + Class to test config_controller method + """ + + command_string = ( + "ansible-playbook /usr/share/ansible/stx-ansible/playbooks/bootstrap.yml" + ) + mock_stream = MagicMock() + mock_password = "Li69nux*" + + @patch("install_lab.serial") + @patch("install_lab.host_helper.check_password") + def test_config_controller_successful(self, mock_check_password, mock_serial): + """ + Test config_controller method with success + """ + + # Setup + mock_serial.expect_bytes.return_value = 0 + + # Run + install_lab.config_controller(self.mock_stream, password=self.mock_password) + + # Assert + mock_serial.send_bytes.assert_called_once_with( + self.mock_stream, self.command_string, expect_prompt=False + ) + mock_check_password.assert_called_once_with(self.mock_stream, password=self.mock_password) + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, "~$", + timeout=install_lab.HostTimeout.LAB_CONFIG) + + @patch("install_lab.serial") + @patch("install_lab.host_helper.check_password") + def test_config_controller_unsuccessful(self, mock_check_password, mock_serial): + """ + Test config_controller method without success raising an exception + """ + + # Setup + mock_serial.expect_bytes.return_value = 1 + + # Run + with self.assertRaises(Exception): + install_lab.config_controller(self.mock_stream, password=self.mock_password) + + # Assert + mock_serial.send_bytes.assert_called_once_with( + self.mock_stream, self.command_string, expect_prompt=False + ) + mock_check_password.assert_called_once_with(self.mock_stream, password=self.mock_password) + mock_serial.expect_bytes.assert_called_once_with(self.mock_stream, "~$", + timeout=install_lab.HostTimeout.LAB_CONFIG) + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/helper/tests/test_vboxmanage.py b/virtualbox/pybox/helper/tests/test_vboxmanage.py new file mode 100644 index 0000000..508001e --- /dev/null +++ b/virtualbox/pybox/helper/tests/test_vboxmanage.py @@ -0,0 +1,1405 @@ +""" +Unit tests related to vboxmanage +""" + +import unittest +from unittest.mock import patch, call +import vboxmanage + + +class VboxmanageVersionTestCase(unittest.TestCase): + """ + Class to test vboxmanage_version method + """ + + @patch("vboxmanage.subprocess") + def test_vboxmanage_version(self, mock_subprocess): + """ + Test vboxmanage_version method + """ + + # Setup + expected_version = "2.25.95" + mock_subprocess.check_output.return_value = expected_version + + # Run + version = vboxmanage.vboxmanage_version() + + # Assert + mock_subprocess.check_output.assert_called_once_with(["vboxmanage", "--version"], + stderr=mock_subprocess.STDOUT) + self.assertEqual(version, expected_version) + + +class VboxmanageExtpackTestCase(unittest.TestCase): + """ + Class to test vboxmanage_extpack method + """ + + @patch("vboxmanage.subprocess.check_output") + @patch("vboxmanage.vboxmanage_version") + def test_vboxmanage_extpack(self, mock_vboxmanage_version, mock_check_output): + """ + Test vboxmanage_extpack method + """ + + # Setup + mock_vboxmanage_version.return_value = b"2.25.95r123456\n" + version_path = "2.25.95" + filename = f"Oracle_VM_VirtualBox_Extension_Pack-{version_path}.vbox-extpack" + expected_wget_args = ["wget", f"http://download.virtualbox.org/virtualbox/{version_path}/{filename}", "-P", + "/tmp"] + expected_vboxmanage_args = ["vboxmanage", "extpack", "install", "/tmp/" + filename, "--replace"] + + # Run + vboxmanage.vboxmanage_extpack() + + # Assert + mock_vboxmanage_version.assert_called_once() + call_args_list = mock_check_output.call_args_list + self.assertEqual(call_args_list[0][0][0], expected_wget_args) + self.assertEqual(call_args_list[1][0][0], expected_vboxmanage_args) + + +class GetAllVmsTestCase(unittest.TestCase): + """ + Class to test get_all_vms method + """ + + @patch("vboxmanage.vboxmanage_list") + @patch("vboxmanage.vboxmanage_showinfo") + def test_get_all_vms(self, mock_showinfo, mock_list): + """ + Test get_all_vms method + """ + + # Setup + labname = "lab1" + mock_list.return_value = [ + b'"lab1-controller-0" {2f7f1b1c-40fe-4063-8182-ece45bbe229d}', + b'"lab1-compute-0" {1f7a1a1a-30ee-4062-8182-edc45bbe239d}', + b'"lab1-storage-0" {1f6a1a1b-30ee-4071-8182-edd45bbe239d}', + b'"not-matching-vm" {2f7f1b1c-40fe-4063-8182-ece45bbe229d}' + ] + mock_showinfo.return_value = b'groups="/lab1"\n' + + # Run + vms = vboxmanage.get_all_vms(labname) + + # Assert + mock_list.assert_called_once_with("vms") + expected_vms = [ + '"lab1-controller-0" {2f7f1b1c-40fe-4063-8182-ece45bbe229d}', + '"lab1-compute-0" {1f7a1a1a-30ee-4062-8182-edc45bbe239d}', + '"lab1-storage-0" {1f6a1a1b-30ee-4071-8182-edd45bbe239d}', + ] + self.assertCountEqual(vms, expected_vms) + + +class TakeSnapshotTestCase(unittest.TestCase): + """ + Class to test take_snapshot method + """ + + labname = "lab1" + snapshot_name = "snap1" + vms = ["vm1", "vm2"] + + @patch("vboxmanage._wait_for_vms_to_run", return_value=None) + @patch("vboxmanage._resume_running_vms", return_value=None) + @patch("vboxmanage.vboxmanage_takesnapshot", return_value=None) + @patch("vboxmanage._pause_running_vms", return_value=None) + @patch("vboxmanage.get_all_vms") + def test_take_snapshot_no_running_vms(self, mock_get_all_vms, mock_pause, mock_takesnapshot, + mock_resume, mock_wait): + """ + Test take_snapshot method with no running vms + """ + + # Setup + mock_get_all_vms.side_effect = [self.vms, []] + + # Run + vboxmanage.take_snapshot(self.labname, self.snapshot_name) + + # Assert + mock_get_all_vms.assert_any_call(self.labname, option="vms") + mock_get_all_vms.assert_any_call(self.labname, option="runningvms") + mock_pause.assert_called_once() + mock_takesnapshot.assert_called_once_with(self.vms, self.snapshot_name) + mock_resume.assert_called_once() + mock_wait.assert_not_called() + + @patch("vboxmanage._wait_for_vms_to_run", return_value=None) + @patch("vboxmanage._resume_running_vms", return_value=None) + @patch("vboxmanage.vboxmanage_takesnapshot", return_value=None) + @patch("vboxmanage._pause_running_vms", return_value=None) + @patch("vboxmanage.get_all_vms") + def test_take_snapshot_with_running_vms(self, mock_get_all_vms, mock_pause, mock_takesnapshot, + mock_resume, mock_wait): + """ + Test take_snapshot method with running vms + """ + + # Setup + running_vms = ["vm1"] + mock_get_all_vms.side_effect = [self.vms, running_vms] + + # Run + vboxmanage.take_snapshot(self.labname, self.snapshot_name) + + # Assert + mock_get_all_vms.assert_any_call(self.labname, option="vms") + mock_get_all_vms.assert_any_call(self.labname, option="runningvms") + mock_pause.assert_called_once() + mock_takesnapshot.assert_called_once_with(self.vms, self.snapshot_name) + mock_resume.assert_called_once_with(running_vms) + mock_wait.assert_called_once_with(self.labname, running_vms, self.vms) + + +class PauseRunningVmsTestCase(unittest.TestCase): + """ + Class to test _pause_running_vms method + """ + + @patch("vboxmanage.os.waitpid", return_value=None) + @patch("vboxmanage.os._exit", return_value=None) # pylint: disable=protected-access + @patch("vboxmanage.vboxmanage_controlvms", return_value=None) + @patch("vboxmanage.os.fork") + def test_pause_running_vms_no_vms(self, mock_fork, mock_controlvms, mock_exit, mock_waitpid): + """ + Test _pause_running_vms method with no running vms + """ + + # Setup + running_vms = [] + vms = [] + + # Run + vboxmanage._pause_running_vms(running_vms, vms) + + # Assert + mock_fork.assert_not_called() + mock_controlvms.assert_not_called() + mock_exit.assert_not_called() + mock_waitpid.assert_not_called() + + @patch("vboxmanage.os.waitpid", return_value=None) + @patch("vboxmanage.os._exit", return_value=None) # pylint: disable=protected-access + @patch("vboxmanage.vboxmanage_controlvms", return_value=None) + @patch("vboxmanage.os.fork") + def test_pause_running_vms_with_vms(self, mock_fork, mock_controlvms, mock_exit, mock_waitpid): + """ + Test _pause_running_vms method with running vms + """ + + # Setup + running_vms = ["vm1", "vm2"] + vms = ["vm1", "vm2", "vm3"] + mock_fork.return_value = 0 + + # Run + vboxmanage._pause_running_vms(running_vms, vms) + + # Assert + mock_fork.assert_has_calls([call() for _ in running_vms]) + mock_controlvms.assert_has_calls([call([vm], "pause") for vm in running_vms]) + mock_exit.assert_has_calls([call(0) for _ in running_vms]) + mock_waitpid.assert_has_calls([call(0, 0) for _ in vms]) + + +class ResumeRunningVMsTestCase(unittest.TestCase): + """ + Class to test _resume_running_vms method + """ + + @patch("vboxmanage.os.waitpid", return_value=None) + @patch("vboxmanage.os._exit", return_value=None) + @patch("vboxmanage.os.fork") + @patch("vboxmanage.vboxmanage_controlvms", return_value=None) + def test_resume_running_vms(self, mock_controlvms, mock_fork, mock_exit, mock_waitpid): + """ + Test _resume_running_vms method + """ + + # Setup + runningvms = ["vm1", "vm2"] + mock_fork.side_effect = [0, 0] + + # Run + vboxmanage._resume_running_vms(runningvms) + + # Assert + mock_controlvms.assert_has_calls([call([runningvms[0]], "resume"), call([runningvms[1]], "resume")]) + mock_exit.assert_has_calls([call(0), call(0)]) + mock_waitpid.assert_has_calls([call(0, 0), call(0, 0)]) + + @patch("vboxmanage.vboxmanage_controlvms", return_value=None) + def test_resume_running_vms_one_vm(self, mock_controlvms): + """ + Test _resume_running_vms method with one running vm + """ + + # Setup + runningvms = ["vm1"] + + # Run + vboxmanage._resume_running_vms(runningvms) + + # Assert + mock_controlvms.assert_not_called() + + +class WaitForVmsToRunTestCase(unittest.TestCase): + """ + Class to test _wait_for_vms_to_run method + """ + + @patch("vboxmanage.LOG.info", return_value=None) + @patch("vboxmanage.get_all_vms") + def test_wait_for_vms_to_run_successful(self, mock_get_all_vms, mock_log_info): + """ + Test _wait_for_vms_to_run method + """ + + # Setup + labname = "lab1" + runningvms = ["vm1"] + vms = ["vm1", "vm2", "vm3"] + mock_get_all_vms.side_effect = [vms, vms[0:2], [vms[0]]] + + # Run + vboxmanage._wait_for_vms_to_run(labname, runningvms, vms) + + # Assert + mock_get_all_vms.assert_has_calls([call(labname, option="runningvms"), call(labname, option="runningvms"), + call(labname, option="runningvms")]) + mock_log_info.assert_called_with("All VMs %s are up running after taking snapshot...", vms) + + @patch("vboxmanage.time.sleep", return_value=None) + @patch("vboxmanage.LOG.info", return_value=None) + @patch("vboxmanage.get_all_vms", return_value=["vm1"]) + def test_wait_for_vms_to_run_no_retry(self, mock_get_all_vms, mock_log_info, mock_sleep): + """ + Test _wait_for_vms_to_run method with no need for retry + """ + + # Setup + labname = "lab1" + runningvms = ["vm1"] + vms = ["vm1"] + + # Run + vboxmanage._wait_for_vms_to_run(labname, runningvms, vms) + + # Assert + mock_get_all_vms.assert_called_once_with(labname, option="runningvms") + mock_sleep.assert_not_called() + mock_log_info.assert_called_with("All VMs %s are up running after taking snapshot...", vms) + + +class RestoreSnapshotTestCase(unittest.TestCase): + """ + Class to test restore_snapshot method + """ + + @patch("vboxmanage.time.sleep", return_value=None) + @patch("vboxmanage.vboxmanage_controlvms", return_value=None) + @patch("vboxmanage.vboxmanage_restoresnapshot", return_value=None) + @patch("vboxmanage.vboxmanage_startvm", return_value=None) + def test_restore_snapshot(self, mock_startvm, mock_restoresnapshot, mock_controlvms, mock_sleep): + """ + Test restore_snapshot method + """ + + # Setup + node_list = ["controller-0", "vm1", "vm2"] + snapshot_name = "snapshot1" + + # Run + vboxmanage.restore_snapshot(node_list, snapshot_name) + + # Assert + mock_controlvms.assert_called_once_with(node_list, "poweroff") + mock_restoresnapshot.assert_has_calls( + [call(node_list[0], snapshot_name), call(node_list[1], snapshot_name), call(node_list[2], snapshot_name)]) + mock_startvm.assert_has_calls([call(node_list[1]), call(node_list[2]), call(node_list[0])]) + mock_sleep.assert_has_calls([call(5), call(5), call(5), call(10), call(10), call(10)]) + + @patch("vboxmanage.time.sleep", return_value=None) + @patch("vboxmanage.vboxmanage_controlvms", return_value=None) + @patch("vboxmanage.vboxmanage_restoresnapshot", return_value=None) + @patch("vboxmanage.vboxmanage_startvm", return_value=None) + def test_restore_snapshot_empty_node_list(self, mock_startvm, mock_restoresnapshot, mock_controlvms, mock_sleep): + """ + Test restore_snapshot method with empty node list + """ + + # Setup + node_list = [] + snapshot_name = "snapshot1" + + # Run + vboxmanage.restore_snapshot(node_list, snapshot_name) + + # Assert + mock_controlvms.assert_not_called() + mock_restoresnapshot.assert_not_called() + mock_startvm.assert_not_called() + mock_sleep.assert_not_called() + + +class VboxmanageListTestCase(unittest.TestCase): + """ + Class to test vboxmanage_list method + """ + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_list(self, mock_subprocess): + """ + Test vboxmanage_list method + """ + + # Setup + expected_vms = [b"vm1", b"vm2", b"vm3"] + mock_subprocess.return_value = b'"vm1"\n"vm2"\n"vm3"\n' + + # Run + vms = vboxmanage.vboxmanage_list() + + # Assert + mock_subprocess.assert_called_once_with(["vboxmanage", "list", "vms"], stderr=vboxmanage.subprocess.STDOUT) + self.assertCountEqual(vms, expected_vms) + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_list_custom_option(self, mock_subprocess): + """ + Test vboxmanage_list method with custom option + """ + + # Setup + expected_vms = [b"vm1", b"vm2"] + mock_subprocess.return_value = b'"vm1"\n"vm2"\n' + custom_option = "runningvms" + + # Run + vms = vboxmanage.vboxmanage_list(custom_option) + + # Assert + mock_subprocess.assert_called_once_with(["vboxmanage", "list", custom_option], + stderr=vboxmanage.subprocess.STDOUT) + self.assertCountEqual(vms, expected_vms) + + +class VboxmanageShowinfoTestCase(unittest.TestCase): + """ + Class to test vboxmanage_showinfo method + """ + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_showinfo(self, mock_subprocess): + """ + Test vboxmanage_showinfo method + """ + + # Setup + expected_info = "Some VM info" + mock_subprocess.return_value = expected_info + host = "vm1" + + # Run + info = vboxmanage.vboxmanage_showinfo(host) + + # Assert + mock_subprocess.assert_called_once_with(["vboxmanage", "showvminfo", host, "--machinereadable"], + stderr=vboxmanage.subprocess.STDOUT) + self.assertEqual(info, expected_info) + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_showinfo_bytes(self, mock_subprocess): + """ + Test vboxmanage_showinfo method with bytes input + """ + + # Setup + expected_info = "Some VM info" + mock_subprocess.return_value = expected_info + host = "vm1" + + # Run + info = vboxmanage.vboxmanage_showinfo(host) + + # Assert + mock_subprocess.assert_called_once_with(["vboxmanage", "showvminfo", "vm1", "--machinereadable"], + stderr=vboxmanage.subprocess.STDOUT) + self.assertEqual(info, expected_info) + + +class VboxmanageCreatevmTestCase(unittest.TestCase): + """ + Class to test vboxmanage_createvm method + """ + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_createvm(self, mock_subprocess): + """ + Test vboxmanage_createvm method + """ + + # Setup + hostname = "vm1" + labname = "lab1" + + # Run + vboxmanage.vboxmanage_createvm(hostname, labname) + + # Assert + mock_subprocess.assert_called_once_with( + ["vboxmanage", "createvm", "--name", hostname, "--register", "--ostype", "Linux_64", "--groups", + "/" + labname], + stderr=vboxmanage.subprocess.STDOUT + ) + + def test_vboxmanage_createvm_no_hostname(self): + """ + Test vboxmanage_createvm method with no hostname + """ + + # Setup + hostname = None + labname = "lab1" + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_createvm(hostname, labname) + + def test_vboxmanage_createvm_no_labname(self): + """ + Test vboxmanage_createvm method with no labname + """ + + # Setup + hostname = "vm1" + labname = None + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_createvm(hostname, labname) + + +class VboxmanageDeletevmsTestCase(unittest.TestCase): + """ + Class to test vboxmanage_deletevms method + """ + + @patch("vboxmanage.vboxmanage_deletemedium") + @patch("vboxmanage.vboxmanage_list") + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_deletevms(self, mock_subprocess, mock_list, mock_deletemedium): + """ + Test vboxmanage_deletevms method + """ + + # Setup + hosts = ["vm1", "vm2"] + mock_list.return_value = [] + + # Run + vboxmanage.vboxmanage_deletevms(hosts) + + # Assert + mock_subprocess.assert_has_calls( + [call(["vboxmanage", "unregistervm", host, "--delete"], stderr=vboxmanage.subprocess.STDOUT) for host in + hosts], + any_order=True + ) + mock_deletemedium.assert_has_calls( + [call(host) for host in hosts], + any_order=True + ) + mock_list.assert_called_once_with("vms") + + def test_vboxmanage_deletevms_no_hosts(self): + """ + Test vboxmanage_deletevms method with no hosts + """ + + # Setup + hosts = None + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_deletevms(hosts) + + +class VboxmanageHostonlyifcreateTestCase(unittest.TestCase): + """ + Class to test vboxmanage_hostonlyifcreate method + """ + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_hostonlyifcreate(self, mock_subprocess): + """ + Test vboxmanage_hostonlyifcreate method + """ + + # Setup + name = "vboxnet0" + oam_ip = "192.168.0.1" + netmask = "255.255.255.0" + + # Run + vboxmanage.vboxmanage_hostonlyifcreate(name, oam_ip, netmask) + + # Assert + mock_subprocess.assert_has_calls([ + call(["vboxmanage", "hostonlyif", "create"], stderr=vboxmanage.subprocess.STDOUT), + call(["vboxmanage", "hostonlyif", "ipconfig", name, "--ip", oam_ip, "--netmask", netmask], + stderr=vboxmanage.subprocess.STDOUT) + ]) + + def test_vboxmanage_hostonlyifcreate_no_name(self): + """ + Test vboxmanage_hostonlyifcreate method with no network name + """ + + # Setup + name = None + oam_ip = "192.168.0.1" + netmask = "255.255.255.0" + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_hostonlyifcreate(name, oam_ip, netmask) + + def test_vboxmanage_hostonlyifcreate_no_oam_ip(self): + """ + Test vboxmanage_hostonlyifcreate method with no OAM IP + """ + + # Setup + name = "vboxnet0" + oam_ip = None + netmask = "255.255.255.0" + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_hostonlyifcreate(name, oam_ip, netmask) + + def test_vboxmanage_hostonlyifcreate_no_netmask(self): + """ + Test vboxmanage_hostonlyifcreate method with no OAM Netmask + """ + + # Setup + name = "vboxnet0" + oam_ip = "192.168.0.1" + netmask = None + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_hostonlyifcreate(name, oam_ip, netmask) + + +class VboxmanageHostonlyifdeleteTestCase(unittest.TestCase): + """ + Class to test vboxmanage_hostonlyifdelete method + """ + + @patch("vboxmanage.subprocess.check_output") + def test_vboxmanage_hostonlyifdelete(self, mock_subprocess): + """ + Test vboxmanage_hostonlyifdelete method + """ + + # Setup + name = "vboxnet0" + + # Run + vboxmanage.vboxmanage_hostonlyifdelete(name) + + # Assert + mock_subprocess.assert_called_once_with(["vboxmanage", "hostonlyif", "remove", name], + stderr=vboxmanage.subprocess.STDOUT) + + def test_vboxmanage_hostonlyifdelete_no_name(self): + """ + Test vboxmanage_hostonlyifdelete method with no network name + """ + + # Setup + name = None + + # Assert + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_hostonlyifdelete(name) + + +class VboxmanageModifyvmTestCase(unittest.TestCase): + """ + Class to test vboxmanage_modifyvm method + """ + + @patch("vboxmanage.subprocess.check_output") + @patch("vboxmanage._contains_value") + @patch("vboxmanage._is_network_configured") + @patch("vboxmanage._is_nat_network_configured") + @patch("vboxmanage._is_uart_configured") + @patch("vboxmanage._get_network_configuration") + @patch("vboxmanage._add_uart") + def test_vboxmanage_modifyvm(self, mock_add_uart, mock_get_network_configuration, mock_is_uart_configured, + mock_is_nat_network_configured, mock_is_network_configured, + mock_contains_value, mock_subprocess): + """ + Test vboxmanage_modifyvm method + """ + + # Setup + hostname = "test_host" + vm_config = { + "cpus": "2", + "memory": "1024", + "nicnum": "1", + "nicbootprio2": "1" + } + + mock_contains_value.return_value = True + mock_is_network_configured.return_value = False + mock_is_nat_network_configured.return_value = False + mock_is_uart_configured.return_value = False + mock_get_network_configuration.return_value = [] + mock_add_uart.return_value = [] + + expected_cmd = ["vboxmanage", "modifyvm", hostname, "--cpus", "2", "--memory", "1024", "--nicbootprio2", "1", + "--boot4", "net"] + + # Run + vboxmanage.vboxmanage_modifyvm(hostname, vm_config) + + # Assert + mock_subprocess.assert_called_once_with(expected_cmd, stderr=vboxmanage.subprocess.STDOUT) + + +class IsNetworkConfiguredTestCase(unittest.TestCase): + """ + Class to test _is_network_configured method + """ + + @patch("vboxmanage._contains_value") + def test_is_network_configured(self, mock_contains_value): + """ + Test _is_network_configured method + """ + + # Setup + vm_config = { + "nic": "test_nic", + "nictype": "test_nictype", + "nicpromisc": "test_nicpromisc", + "nicnum": "test_nicnum" + } + + mock_contains_value.return_value = True + + # Run + result = vboxmanage._is_network_configured(vm_config) + + # Assert + self.assertTrue(result) + mock_contains_value.assert_any_call("nic", vm_config) + mock_contains_value.assert_any_call("nictype", vm_config) + mock_contains_value.assert_any_call("nicpromisc", vm_config) + mock_contains_value.assert_any_call("nicnum", vm_config) + + +class GetNetworkConfigurationTestCase(unittest.TestCase): + """ + Class to test _get_network_configuration method + """ + + @patch("vboxmanage._contains_value") + def test_get_network_configuration(self, mock_contains_value): + """ + Test _get_network_configuration method + """ + + # Setup + vm_config = { + "nic": "test_nic", + "nictype": "test_nictype", + "nicpromisc": "test_nicpromisc", + "nicnum": "1", + "intnet": "test_intnet", + "hostonlyadapter": "test_hostonlyadapter", + "natnetwork": "test_natnetwork", + "prefix": "test_prefix" + } + + mock_contains_value.return_value = True + + # Expected output + expected_output = [ + '--nic1', 'test_nic', + '--nictype1', 'test_nictype', + '--nicpromisc1', 'test_nicpromisc', + '--intnet1', 'test_prefix-test_intnet', + '--hostonlyadapter1', 'test_hostonlyadapter', + '--nat-network1', 'test_natnetwork' + ] + + # Run + result = vboxmanage._get_network_configuration(vm_config) + + # Assert + self.assertEqual(result, expected_output) + + +class IsNatNetworkConfiguredTestCase(unittest.TestCase): + """ + Class to test _is_nat_network_configured method + """ + + @patch("vboxmanage._contains_value") + def test_is_nat_network_configured_true(self, mock_contains_value): + """ + Test _is_nat_network_configured method with nat + """ + + # Setup + vm_config = { + "nicnum": "1", + "nictype": "nat", + } + + mock_contains_value.return_value = True + + # Run + result = vboxmanage._is_nat_network_configured(vm_config) + + # Assert + self.assertEqual(result, True) + + @patch("vboxmanage._contains_value") + def test_is_nat_network_configured_false(self, mock_contains_value): + """ + Test _is_nat_network_configured method with non nat + """ + + # Setup + vm_config = { + "nicnum": "1", + "nictype": "non-nat", + } + mock_contains_value.return_value = True + + # Run + result = vboxmanage._is_nat_network_configured(vm_config) + + # Assert + self.assertEqual(result, False) + + +class IsUartConfiguredTestCase(unittest.TestCase): + """ + Class to test _is_uart_configured method + """ + + @patch("vboxmanage._contains_value") + def test_is_uart_configured_true(self, mock_contains_value): + """ + Test _is_uart_configured method with all key values + """ + + # Setup + vm_config = { + "uartbase": "0x3F8", + "uartport": "1", + "uartmode": "server", + "uartpath": "/tmp/uart", + } + mock_contains_value.return_value = True + + # Run + result = vboxmanage._is_uart_configured(vm_config) + + # Assert + self.assertEqual(result, True) + + @patch("vboxmanage._contains_value") + def test_is_uart_configured_false(self, mock_contains_value): + """ + Test _is_uart_configured method without all key values + """ + + # Setup + vm_config = { + "uartbase": "0x3F8", + "uartport": "1", + "uartmode": "server", + } + mock_contains_value.side_effect = [True, True, True, False] + + # Run + result = vboxmanage._is_uart_configured(vm_config) + + # Assert + self.assertEqual(result, False) + + +class AddUartTestCase(unittest.TestCase): + """ + Class to test _add_uart method + """ + + def setUp(self): + self.hostname = "test-host" + self.vm_config = { + "uartbase": "0x3F8", + "uartport": "4", + "uartmode": "file", + "uartpath": "/path/to/uart/", + "prefix": "test-prefix" + } + + @patch('vboxmanage.env') + @patch('vboxmanage.platform', new='win32') + def test_add_uart_windows(self, mock_env): + """ + Test _add_uart method for Windows platform + """ + mock_env.PORT = 1 + + result = vboxmanage._add_uart(self.hostname, self.vm_config) + + expected = [ + '--uart1', '0x3F8', '4', '--uartmode1', 'file', '1' + ] + self.assertCountEqual(result, expected) + + @patch('vboxmanage.platform', new='linux') + def test_add_uart_linux_controller(self): + """ + Test _add_uart method for Linux platform with 'controller-0' in hostname + """ + self.hostname = "test-host-controller-0" + + result = vboxmanage._add_uart(self.hostname, self.vm_config) + + expected = [ + '--uart1', '0x3F8', '4', '--uartmode1', 'file', + '/path/to/uart/test-prefix_test-host-controller-0_serial' + ] + self.assertCountEqual(result, expected) + + @patch('vboxmanage.platform', new='linux') + def test_add_uart_linux_no_controller(self): + """ + Test _add_uart method for Linux platform without 'controller-0' in hostname + """ + result = vboxmanage._add_uart(self.hostname, self.vm_config) + + expected = [ + '--uart1', '0x3F8', '4', '--uartmode1', 'file', + '/path/to/uart/test-prefix_test-host' + ] + self.assertCountEqual(result, expected) + + @patch('vboxmanage.platform', new='linux') + def test_add_uart_linux_no_prefix(self): + """ + Test _add_uart method for Linux platform without prefix in vm_config + """ + del self.vm_config["prefix"] + + result = vboxmanage._add_uart(self.hostname, self.vm_config) + + expected = [ + '--uart1', '0x3F8', '4', '--uartmode1', 'file', + '/path/to/uart/test-host' + ] + self.assertCountEqual(result, expected) + + +class ContainsValueTestCase(unittest.TestCase): + """ + Class to test _contains_value method + """ + + def setUp(self): + self.dictionary = { + "key1": "value1", + "key2": None, + } + + def test_contains_value_key_present_value_truthy(self): + """ + Test _contains_value method with key present and value truthy + """ + result = vboxmanage._contains_value("key1", self.dictionary) + self.assertTrue(result) + + def test_contains_value_key_present_value_falsy(self): + """ + Test _contains_value method with key present and value falsy + """ + result = vboxmanage._contains_value("key2", self.dictionary) + self.assertFalse(result) + + def test_contains_value_key_absent(self): + """ + Test _contains_value method with key absent + """ + result = vboxmanage._contains_value("key3", self.dictionary) + self.assertFalse(result) + + +class PortForwardTestCase(unittest.TestCase): + """ + Class to test vboxmanage_port_forward method + """ + + @patch('vboxmanage.subprocess') + def test_vboxmanage_port_forward(self, mock_subprocess): + """ + Test vboxmanage_port_forward method + """ + + hostname = "test-host" + network = "natnet1" + local_port = 1022 + guest_port = 22 + guest_ip = "192.168.15.5" + + # Run + vboxmanage.vboxmanage_port_forward(hostname, network, local_port, guest_port, guest_ip) + + # Assert + rule_name = f"{hostname}-{guest_port}" + + # Assert delete command was called + delete_cmd = [ + "vboxmanage", + "natnetwork", + "modify", + "--netname", + network, + "--port-forward-4", + "delete", + rule_name, + ] + mock_subprocess.check_output.assert_any_call(delete_cmd, stderr=mock_subprocess.STDOUT) + + # Assert create command was called + rule = f"{rule_name}:tcp:[]:{local_port}:[{guest_ip}]:{guest_port}" + create_cmd = [ + "vboxmanage", + "natnetwork", + "modify", + "--netname", + network, + "--port-forward-4", + rule, + ] + mock_subprocess.check_output.assert_any_call(create_cmd, stderr=mock_subprocess.STDOUT) + + +class StorageCtlTestCase(unittest.TestCase): + """ + Class to test vboxmanage_storagectl method + """ + + @patch('vboxmanage.subprocess') + def test_vboxmanage_storagectl(self, mock_subprocess): + """ + Test vboxmanage_storagectl method + """ + + hostname = "test-host" + storectl = "sata" + hostiocache = "off" + + # Run + vboxmanage.vboxmanage_storagectl(hostname, storectl, hostiocache) + + # Assert + cmd = [ + "vboxmanage", + "storagectl", + hostname, + "--name", + storectl, + "--add", + storectl, + "--hostiocache", + hostiocache, + ] + mock_subprocess.check_output.assert_called_once_with(cmd, stderr=mock_subprocess.STDOUT) + + def test_vboxmanage_storagectl_no_hostname(self): + """ + Test vboxmanage_storagectl method without hostname + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_storagectl(None, "sata", "off") + + def test_vboxmanage_storagectl_no_storectl(self): + """ + Test vboxmanage_storagectl method without storectl + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_storagectl("test-host", None, "off") + + +class StorageAttachTestCase(unittest.TestCase): + """ + Class to test vboxmanage_storageattach method + """ + + @patch('vboxmanage.subprocess') + def test_vboxmanage_storageattach(self, mock_subprocess): + """ + Test vboxmanage_storageattach method + """ + + hostname = "test-host" + storage_config = { + "storectl": "sata", + "storetype": "hdd", + "disk": "disk1", + "port_num": "0", + "device_num": "0", + } + + # Run + vboxmanage.vboxmanage_storageattach(hostname, storage_config) + + # Assert + cmd = [ + "vboxmanage", + "storageattach", + hostname, + "--storagectl", + storage_config["storectl"], + "--medium", + storage_config["disk"], + "--type", + storage_config["storetype"], + "--port", + storage_config["port_num"], + "--device", + storage_config["device_num"], + ] + mock_subprocess.check_output.assert_called_once_with(cmd, stderr=mock_subprocess.STDOUT) + + def test_vboxmanage_storageattach_no_hostname(self): + """ + Test vboxmanage_storageattach method without hostname + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_storageattach(None, {"disk": "disk1"}) + + def test_vboxmanage_storageattach_no_storage_config(self): + """ + Test vboxmanage_storageattach method without storage_config + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_storageattach("test-host", None) + + def test_vboxmanage_storageattach_no_disk_in_config(self): + """ + Test vboxmanage_storageattach method without 'disk' in storage_config + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_storageattach("test-host", {}) + + +class DeleteMediumTestCase(unittest.TestCase): + """ + Class to test vboxmanage_deletemedium method + """ + + @patch('vboxmanage.subprocess') + @patch('vboxmanage.getpass') + @patch('vboxmanage.os') + def test_vboxmanage_deletemedium(self, mock_os, mock_getpass, mock_subprocess): + """ + Test vboxmanage_deletemedium method + """ + + hostname = "test-host" + vbox_home_dir = "/home" + username = "user1" + mock_getpass.getuser.return_value = username + mock_os.path.isfile.return_value = True + + vbox_home_dir = f"{vbox_home_dir}/{username}/vbox_disks/" + disk_list = ["test-host-user1.vdi"] + mock_os.listdir.return_value = disk_list + + # Run + vboxmanage.vboxmanage_deletemedium(hostname) + + # Assert + cmd = [ + "vboxmanage", + "closemedium", + "disk", + f"{vbox_home_dir}{disk_list[0]}", + "--delete", + ] + mock_subprocess.check_output.assert_called_once_with(cmd, stderr=mock_subprocess.STDOUT) + mock_os.remove.assert_called_once_with(f"{vbox_home_dir}{disk_list[0]}") + + def test_vboxmanage_deletemedium_no_hostname(self): + """ + Test vboxmanage_deletemedium method without hostname + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_deletemedium(None, "/home") + + +class CreateMediumTestCase(unittest.TestCase): + """ + Class to test vboxmanage_createmedium method + """ + + @patch('vboxmanage.subprocess') + @patch('vboxmanage.getpass') + @patch('vboxmanage.LOG') + @patch('vboxmanage.vboxmanage_storageattach') + def test_vboxmanage_createmedium(self, mock_storageattach, mock_log, mock_getpass, mock_subprocess): + """ + Test vboxmanage_createmedium method + """ + + hostname = "test-host" + disk_list = ["50000", "60000"] + vbox_home_dir = "/home" + username = "user1" + mock_getpass.getuser.return_value = username + + # Run + vboxmanage.vboxmanage_createmedium(hostname, disk_list, vbox_home_dir) + + # Assert + disk_count = 1 + port_num = 0 + device_num = 0 + for disk in disk_list: + file_name = f"{vbox_home_dir}/{username}/vbox_disks/{hostname}_disk_{disk_count}" + cmd = [ + "vboxmanage", + "createmedium", + "disk", + "--size", + disk, + "--filename", + file_name, + "--format", + "vdi", + "--variant", + "standard", + ] + # Assert the logs and command for each disk + mock_log.info.assert_any_call( + "Creating disk %s of size %s on VM %s on device %s port %s", + file_name, + disk, + hostname, + device_num, + port_num, + ) + mock_subprocess.check_output.assert_any_call(cmd, stderr=mock_subprocess.STDOUT) + # Assert the storageattach call for each disk + mock_storageattach.assert_any_call( + hostname, + { + "storectl": "sata", + "storetype": "hdd", + "disk": file_name + ".vdi", + "port_num": str(port_num), + "device_num": str(device_num), + }, + ) + # Update values for the next disk + disk_count += 1 + port_num += 1 + + def test_vboxmanage_createmedium_no_hostname(self): + """ + Test vboxmanage_createmedium method without hostname + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_createmedium(None, ["50000", "60000"], "/home") + + def test_vboxmanage_createmedium_no_disk_list(self): + """ + Test vboxmanage_createmedium method without disk_list + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_createmedium("test-host", None, "/home") + + +class StartVMTestCase(unittest.TestCase): + """ + Class to test vboxmanage_startvm method + """ + + @patch('vboxmanage.vboxmanage_list') + @patch('vboxmanage.LOG') + def test_vboxmanage_startvm(self, mock_log, mock_list): + """ + Test vboxmanage_startvm method + """ + + hostname = "test-host" + running_vms = [b"another-host", hostname.encode("utf-8")] + mock_list.return_value = running_vms + + # Run + vboxmanage.vboxmanage_startvm(hostname) + + # Assert + # Should check if VM is running and find it is running + mock_log.info.assert_any_call("Check if VM is running") + mock_list.assert_called_with(option="runningvms") + mock_log.info.assert_any_call("Host %s is already started", hostname) + + def test_vboxmanage_startvm_no_hostname(self): + """ + Test vboxmanage_startvm method without hostname + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_startvm(None) + + +class ControlVMsTestCase(unittest.TestCase): + """ + Class to test vboxmanage_controlvms method + """ + + @patch('vboxmanage.subprocess') + @patch('vboxmanage.LOG') + def test_vboxmanage_controlvms(self, mock_log, mock_subprocess): + """ + Test vboxmanage_controlvms method + """ + + hosts = ["test-host1", "test-host2"] + action = "pause" + + # Run + vboxmanage.vboxmanage_controlvms(hosts, action) + + # Assert + # Should execute action on each VM + for host in hosts: + mock_log.info.assert_any_call("Executing %s action on VM %s", action, host) + mock_subprocess.call.assert_any_call(["vboxmanage", "controlvm", host, action], + stderr=mock_subprocess.STDOUT) + + def test_vboxmanage_controlvms_no_hosts(self): + """ + Test vboxmanage_controlvms method without hosts + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_controlvms(None, "pause") + + def test_vboxmanage_controlvms_no_action(self): + """ + Test vboxmanage_controlvms method without action + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_controlvms(["test-host"], None) + + +class TakeSnapshotTestCase2(unittest.TestCase): + """ + Class to test vboxmanage_takesnapshot method + """ + + @patch('vboxmanage.subprocess') + def test_vboxmanage_takesnapshot(self, mock_subprocess): + """ + Test vboxmanage_takesnapshot method + """ + + hosts = ["test-host1", "test-host2"] + name = "test-snapshot" + + # Run + vboxmanage.vboxmanage_takesnapshot(hosts, name) + # Assert + # Should execute action on each VM + for host in hosts: + mock_subprocess.call.assert_any_call(["vboxmanage", "snapshot", host, "take", name], + stderr=mock_subprocess.STDOUT) + + def test_vboxmanage_takesnapshot_no_hosts(self): + """ + Test vboxmanage_takesnapshot method without hosts + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_takesnapshot(None, "test-snapshot") + + def test_vboxmanage_takesnapshot_no_name(self): + """ + Test vboxmanage_takesnapshot method without snapshot name + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_takesnapshot(["test-host"], None) + + +class RestoreSnapshotTestCase2(unittest.TestCase): + """ + Class to test vboxmanage_restoresnapshot method + """ + + @patch('vboxmanage.subprocess') + def test_vboxmanage_restoresnapshot(self, mock_subprocess): + """ + Test vboxmanage_restoresnapshot method + """ + + host = "test-host1" + name = "test-snapshot" + + # Run + vboxmanage.vboxmanage_restoresnapshot(host, name) + + # Assert + mock_subprocess.call.assert_called_once_with(["vboxmanage", "snapshot", host, "restore", name], + stderr=mock_subprocess.STDOUT) + + def test_vboxmanage_restoresnapshot_no_hosts(self): + """ + Test vboxmanage_restoresnapshot method without hosts + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_restoresnapshot(None, "test-snapshot") + + def test_vboxmanage_restoresnapshot_no_name(self): + """ + Test vboxmanage_restoresnapshot method without snapshot name + """ + + with self.assertRaises(AssertionError): + vboxmanage.vboxmanage_restoresnapshot(["test-host"], None) + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/helper/vboxmanage.py b/virtualbox/pybox/helper/vboxmanage.py index 75524be..9cb48b6 100644 --- a/virtualbox/pybox/helper/vboxmanage.py +++ b/virtualbox/pybox/helper/vboxmanage.py @@ -511,6 +511,7 @@ def _add_uart(hostname, vm_config): uart_config.extend([f'{vm_config["uartport"]}']) uart_config.extend(["--uartmode1"]) uart_config.extend([f'{vm_config["uartmode"]}']) + prefix = "" if platform in ("win32", "win64"): uart_config.extend([f"{env.PORT}"]) @@ -696,7 +697,7 @@ def vboxmanage_deletemedium(hostname, vbox_home_dir="/home"): "vboxmanage", "closemedium", "disk", - "{vbox_home_dir}{disk}", + f"{vbox_home_dir}{disk}", "--delete", ], stderr=subprocess.STDOUT, @@ -712,7 +713,7 @@ def vboxmanage_deletemedium(hostname, vbox_home_dir="/home"): ) LOG.info("Removing backing file %s", disk) try: - os.remove("{vbox_home_dir}{disk}") + os.remove(f"{vbox_home_dir}{disk}") except: # pylint: disable=bare-except pass diff --git a/virtualbox/pybox/tests/__init__.py b/virtualbox/pybox/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/virtualbox/pybox/tests/test_install_vbox.py b/virtualbox/pybox/tests/test_install_vbox.py new file mode 100644 index 0000000..44f3e25 --- /dev/null +++ b/virtualbox/pybox/tests/test_install_vbox.py @@ -0,0 +1,623 @@ +import unittest +from unittest.mock import MagicMock, patch, call, ANY +import install_vbox +from dataclasses import dataclass + + +@dataclass +class VirtualBoxOptions: + vboxnet_type: str + labname: str + + controllers: int + workers: int + storages: int + + username: str + password: str + + force_delete_lab: str + + setup_type: str + securityprofile: str + lowlatency: str + install_mode: str + + controller0_ip: str + controller1_ip: str + vboxnet_ip: str + + config_controller_ini: str + ini_oam_cidr: str + ini_oam_ip_start_address: str + ini_oam_ip_end_address: str + + ansible_controller_config: str + + nat_controller_floating_local_ssh_port: str + nat_controller0_local_ssh_port: str + nat_controller_1_local_ssh_port: str + controller_floating_ip: str + + config_files_dir: str + config_files_dir_dont_follow_links: str + lab_setup_conf: str + + script1: str + script2: str + script3: str + script4: str + script5: str + + hostiocache: str + + list_stages: str + logpath: str + custom_stages: str + from_stage: str + to_stage: str + + snapshot: str + + +class MenuSelectorTestCase(unittest.TestCase): + """ + Class to test menu_selector method + """ + + @patch("install_vbox.serial") + def test_menu_selector(self, mock_serial): + """ + Test menu_selector method + """ + + # Setup + mock_stream = MagicMock() + setup_type_aio_sx = "AIO-SX" + setup_type_aio_dx = "AIO-DX" + setup_type_other = "OTHER" + security_profile_extended = "extended" + security_profile_other = "other" + low_latency_true = True + low_latency_false = False + install_mode_serial = "serial" + install_mode_graphical = "graphical" + + permutations = [ + ((setup_type_aio_sx, security_profile_extended, low_latency_true, install_mode_serial), 6), + ((setup_type_aio_sx, security_profile_other, low_latency_false, install_mode_serial), 4), + ((setup_type_aio_dx, security_profile_extended, low_latency_true, install_mode_graphical), 7), + ((setup_type_aio_dx, security_profile_other, low_latency_false, install_mode_graphical), 5), + ((setup_type_other, security_profile_extended, low_latency_true, install_mode_serial), 4), + ((setup_type_other, security_profile_other, low_latency_false, install_mode_graphical), 4) + ] + + # Run + accumulated_calls = 0 + for permutation, calls_number in permutations: + with self.subTest(permutation=permutation): + install_vbox.menu_selector(mock_stream, *permutation) + + # Assert + accumulated_calls += calls_number + self.assertEqual(mock_serial.send_bytes.call_count, accumulated_calls) + + +class SetupNetworkingTestCase(unittest.TestCase): + """ + Class to test setup_networking method + """ + + mock_stream = MagicMock() + mock_ip = "192.168.1.1" + mock_gateway_ip = "192.168.1.254" + mock_password = "password" + + @patch("install_vbox.serial") + @patch("install_vbox.LOG") + @patch("install_vbox.subprocess.call") + @patch("install_vbox.host_helper") + def test_setup_networking( + self, + mock_host_helper, + mock_subprocess_call, + mock_log, + mock_serial, + ): + """ + Test setup_networking + """ + + # Setup + mock_subprocess_call.return_value = 0 + v_box = VirtualBoxOptions + v_box.vboxnet_type = "hostonly" + install_vbox.V_BOX_OPTIONS = v_box + + # Run + install_vbox.setup_networking(self.mock_stream, self.mock_ip, self.mock_gateway_ip, password=self.mock_password) + + # Assert + mock_serial.send_bytes.assert_any_call(self.mock_stream, + "/sbin/ip address list", + prompt=self.mock_ip, + fail_ok=True, + timeout=10) + mock_host_helper.check_password.assert_has_calls([ + call(self.mock_stream, password=self.mock_password), + call(self.mock_stream, password=self.mock_password), + call(self.mock_stream, password=self.mock_password) + ]) + mock_serial.send_bytes.assert_any_call(self.mock_stream, + f"sudo /sbin/ip addr add {self.mock_ip}/24 dev enp0s3", + expect_prompt=False) + mock_serial.send_bytes.assert_any_call(self.mock_stream, + "sudo /sbin/ip link set enp0s3 up", + expect_prompt=False) + mock_serial.send_bytes.assert_any_call(self.mock_stream, + f"sudo route add default gw {self.mock_gateway_ip}", + expect_prompt=False) + self.assertEqual(mock_subprocess_call.call_args_list, [call(['ping', '-c', '1', self.mock_ip])]) + mock_log.info.assert_any_call("Ping succeeded!") + + +class FixNetworkingTestCase(unittest.TestCase): + """ + Class to test fix_networking method + """ + + mock_stream = MagicMock() + mock_release_r2 = "R2" + mock_release_r3 = "R3" + mock_password = "Li69nux*" + + @patch("install_vbox.serial") + @patch("install_vbox.host_helper") + def test_fix_networking_r2(self, mock_host_helper, mock_serial): + """ + Test fix_networking for release R2 + """ + + # Run + install_vbox.fix_networking(self.mock_stream, self.mock_release_r2, self.mock_password) + + # Assert + mock_serial.send_bytes.assert_any_call(self.mock_stream, + "sudo /sbin/ip link set eth0 down", + expect_prompt=False) + mock_host_helper.check_password.assert_called_with(self.mock_stream, password=self.mock_password) + mock_serial.send_bytes.assert_any_call( + self.mock_stream, + "sudo /sbin/ip link set eth0 up", + expect_prompt=False) + mock_host_helper.check_password.assert_called_with(self.mock_stream, password=self.mock_password) + + @patch("install_vbox.serial") + @patch("install_vbox.host_helper") + def test_fix_networking_not_r2(self, mock_host_helper, mock_serial): + """ + Test fix_networking for releases other than R2 + """ + + # Run + install_vbox.fix_networking(self.mock_stream, self.mock_release_r3, self.mock_password) + + # Assert + mock_serial.send_bytes.assert_any_call(self.mock_stream, + "sudo /sbin/ip link set enp0s3 down", + expect_prompt=False) + mock_host_helper.check_password.assert_called_with(self.mock_stream, password=self.mock_password) + mock_serial.send_bytes.assert_any_call( + self.mock_stream, + "sudo /sbin/ip link set enp0s3 up", + expect_prompt=False) + mock_host_helper.check_password.assert_called_with(self.mock_stream, password=self.mock_password) + + +class InstallController0TestCase(unittest.TestCase): + """ + Class to test install_controller_0 method + """ + + mock_stream = MagicMock() + mock_menu_select_dict = { + "setup_type": "Duplex", + "securityprofile": "Standard", + "lowlatency": False, + "install_mode": "standard" + } + mock_network_dict = { + "ctrlr0_ip": "192.168.1.2", + "gateway_ip": "192.168.1.1", + "username": "wrsroot", + "password": "Li69nux*" + } + + @patch("install_vbox.serial") + @patch("install_vbox.host_helper") + @patch("install_vbox.menu_selector") + @patch("install_vbox.setup_networking") + def test_install_controller_0( + self, mock_setup_networking, mock_menu_selector, mock_host_helper, mock_serial + ): + """ + Test install_controller_0 + """ + + # Run + install_vbox.install_controller_0(self.mock_stream, self.mock_menu_select_dict, self.mock_network_dict) + + # Assert + mock_menu_selector.assert_called_once_with( + self.mock_stream, + self.mock_menu_select_dict["setup_type"], + self.mock_menu_select_dict["securityprofile"], + self.mock_menu_select_dict["lowlatency"], + self.mock_menu_select_dict["install_mode"] + ) + mock_serial.expect_bytes.assert_called_with( + self.mock_stream, + "login:", + timeout=ANY) + mock_host_helper.change_password.assert_called_once_with( + self.mock_stream, + username=self.mock_network_dict["username"], + password=self.mock_network_dict["password"] + ) + mock_host_helper.disable_logout.assert_called_once_with(self.mock_stream) + mock_setup_networking.assert_called_once_with( + self.mock_stream, + self.mock_network_dict["ctrlr0_ip"], + self.mock_network_dict["gateway_ip"], + password=self.mock_network_dict["password"] + ) + + @patch("serial.LOG.info") + @patch("install_vbox.serial") + @patch("install_vbox.host_helper") + @patch("install_vbox.time") + @patch("install_vbox.menu_selector") + @patch("install_vbox.setup_networking") + def test_install_controller_0_exception( + self, mock_setup_networking, mock_menu_selector, mock_time, mock_host_helper, mock_serial, mock_log_info + ): + """ + Test install_controller_0 when an exception occurs during login + """ + + # Setup + mock_serial.expect_bytes.side_effect = [Exception(), None] + mock_time.time.return_value = 0 + + # Run + install_vbox.install_controller_0(self.mock_stream, self.mock_menu_select_dict, self.mock_network_dict) + + # Assert + self.assertEqual(mock_serial.expect_bytes.call_count, 2) + self.assertEqual(mock_serial.expect_bytes.call_args_list[1][1]["timeout"], ANY) + mock_menu_selector.assert_called_once() + mock_setup_networking.assert_called_once() + mock_host_helper.change_password.assert_called_once() + mock_host_helper.disable_logout.assert_called_once() + self.assertEqual(mock_log_info.call_count, 4) + + +class DeleteLabTestCase(unittest.TestCase): + """ + Class to test delete_lab method + """ + + mock_labname = "test_lab" + mock_node_list = ["vm1", "vm2", "vm3"] + + @patch("install_vbox.vboxmanage") + @patch("install_vbox.LOG") + @patch("install_vbox.time") + @patch("install_vbox.input", return_value="y") + def test_delete_lab_not_force( + self, mock_input, mock_time, mock_log, mock_vboxmanage + ): + """ + Test delete_lab with force=False and user input 'y' + """ + + # Setup + mock_vboxmanage.get_all_vms.return_value = self.mock_node_list + + # Run + install_vbox.delete_lab(self.mock_labname, force=False) + + # Assert + mock_vboxmanage.get_all_vms.assert_called_once_with(self.mock_labname, option="vms") + mock_input.assert_called_once_with() + mock_log.info.assert_has_calls([ + call("This will delete lab %s with vms: %s", self.mock_labname, self.mock_node_list), + call("Continue? (y/N)"), + call("#### Deleting lab %s.", self.mock_labname), + call("VMs in lab: %s.", self.mock_node_list), + ]) + mock_vboxmanage.vboxmanage_controlvms.assert_called_once_with(self.mock_node_list, "poweroff") + mock_time.sleep.assert_called_once_with(2) + mock_vboxmanage.vboxmanage_deletevms.assert_called_once_with(self.mock_node_list) + + @patch("install_vbox.LOG") + @patch("install_vbox.vboxmanage") + @patch("install_vbox.input", return_value="n") + def test_delete_lab_not_force_abort( + self, mock_input, mock_vboxmanage, mock_log + ): + """ + Test delete_lab with force=False and user input 'n' + """ + + # Setup + mock_vboxmanage.get_all_vms.return_value = self.mock_node_list + + # Run + with self.assertRaises(SystemExit): + install_vbox.delete_lab(self.mock_labname, force=False) + + # Assert + mock_input.assert_called_once_with() + mock_log.info.assert_called_with("Aborting!") + + @patch("install_vbox.vboxmanage") + @patch("install_vbox.LOG") + @patch("install_vbox.time") + def test_delete_lab_force( + self, mock_time, mock_log, mock_vboxmanage + ): + """ + Test delete_lab with force=True + """ + + # Setup + mock_vboxmanage.get_all_vms.return_value = self.mock_node_list + + # Run + install_vbox.delete_lab(self.mock_labname, force=True) + + # Assert + mock_vboxmanage.get_all_vms.assert_called_once_with(self.mock_labname, option="vms") + mock_log.info.assert_has_calls([ + call("#### Deleting lab %s.", self.mock_labname), + call("VMs in lab: %s.", self.mock_node_list), + ]) + mock_vboxmanage.vboxmanage_controlvms.assert_called_once_with(self.mock_node_list, "poweroff") + mock_time.sleep.assert_called_once_with(2) + mock_vboxmanage.vboxmanage_deletevms.assert_called_once_with(self.mock_node_list) + + +class GetDiskSizesTestCase(unittest.TestCase): + """ + Class to test get_disk_sizes method + """ + + def test_get_disk_sizes_valid_input(self): + """ + Test get_disk_sizes with valid input + """ + + # Setup + valid_input = "100,200,300" + + # Run + result = install_vbox.get_disk_sizes(valid_input) + + # Assert + self.assertEqual(result, ['100', '200', '300']) + + @patch("install_vbox.LOG") + def test_get_disk_sizes_invalid_input(self, mock_log): + """ + Test get_disk_sizes with invalid input + """ + + # Setup + invalid_input = "-100,200,300" + + # Assert + with self.assertRaises(Exception) as context: + install_vbox.get_disk_sizes(invalid_input) + + self.assertTrue("Disk sizes must be a comma separated list of positive integers." in str(context.exception)) + mock_log.info.assert_called_once_with("Disk sizes must be a comma separated list of positive integers.") + + +class TestCreateLab(unittest.TestCase): + """ + Class to test create_lab method + """ + + # This function needs to be refactored in order to be tested + pass + + +class TestGetHostnames(unittest.TestCase): + """ + Class to test get_hostnames method + """ + + @patch.object(install_vbox, 'V_BOX_OPTIONS', create=True) + def test_get_hostnames(self, mock_options): + """ + Test get_hostnames + """ + + # Setup + mock_options.controllers = 2 + mock_options.workers = 2 + mock_options.storages = 2 + mock_options.labname = "test" + + expected = { + 'test-controller-0': f'controller-{id}', + 'test-controller-1': f'controller-{id}', + 'test-worker-0': f'worker-{id}', + 'test-worker-1': f'worker-{id}', + 'test-storage-0': 'storage-0', + 'test-storage-1': 'storage-1', + } + + # Run and Assert + self.assertEqual(install_vbox.get_hostnames(), expected) + + @patch.object(install_vbox, 'V_BOX_OPTIONS', create=True) + def test_get_hostnames_with_ignore(self, mock_options): + """ + Test get_hostnames with ignore + """ + + # Setup + mock_options.controllers = 2 + mock_options.workers = 2 + mock_options.storages = 2 + mock_options.labname = "test" + + ignore = ['test-controller-0', 'test-worker-1'] + expected = { + 'test-controller-1': f'controller-{id}', + 'test-worker-0': f'worker-{id}', + 'test-storage-0': 'storage-0', + 'test-storage-1': 'storage-1', + } + + # Run and Assert + self.assertEqual(install_vbox.get_hostnames(ignore=ignore), expected) + + @patch.object(install_vbox, 'V_BOX_OPTIONS', create=True) + def test_get_hostnames_with_selected_personalities(self, mock_options): + """ + Test get_hostnames with selected personalities + """ + + # Setup + mock_options.controllers = 2 + mock_options.workers = 2 + mock_options.storages = 2 + mock_options.labname = "test" + + personalities = ['controller', 'worker'] + expected = { + 'test-controller-0': f'controller-{id}', + 'test-controller-1': f'controller-{id}', + 'test-worker-0': f'worker-{id}', + 'test-worker-1': f'worker-{id}', + } + + # Run and Assert + self.assertEqual(install_vbox.get_hostnames(personalities=personalities), expected) + + +class TestGetPersonalities(unittest.TestCase): + """ + Class to test get_personalities method + """ + + @patch.object(install_vbox, 'V_BOX_OPTIONS', create=True) + def test_get_personalities(self, mock_options): + """ + Test get_personalities + """ + + # Setup + mock_options.controllers = 2 + mock_options.workers = 2 + mock_options.storages = 2 + mock_options.labname = "test" + + expected = { + 'test-controller-0': 'controller', + 'test-controller-1': 'controller', + 'test-worker-0': 'worker', + 'test-worker-1': 'worker', + 'test-storage-0': 'storage', + 'test-storage-1': 'storage', + } + + # Run and Assert + self.assertEqual(install_vbox.get_personalities(), expected) + + @patch.object(install_vbox, 'V_BOX_OPTIONS', create=True) + def test_get_personalities_with_ignore(self, mock_options): + """ + Test get_personalities with ignore + """ + + # Setup + mock_options.controllers = 2 + mock_options.workers = 2 + mock_options.storages = 2 + mock_options.labname = "test" + + ignore = ['test-controller-0', 'test-worker-1'] + expected = { + 'test-controller-1': 'controller', + 'test-worker-0': 'worker', + 'test-storage-0': 'storage', + 'test-storage-1': 'storage', + } + + # Run and Assert + self.assertEqual(install_vbox.get_personalities(ignore=ignore), expected) + + +class TestCreateHostBulkAdd(unittest.TestCase): + """ + Class to test create_host_bulk_add method + """ + + @patch.object(install_vbox, 'V_BOX_OPTIONS', create=True) + @patch.object(install_vbox, 'vboxmanage', create=True) + @patch.object(install_vbox, 'get_personalities') + @patch.object(install_vbox, 'get_hostnames') + def test_create_host_bulk_add(self, mock_get_hostnames, mock_get_personalities, mock_vboxmanage, mock_options): + """ + Test create_host_bulk_add + """ + + # Setup + mock_options.labname = "test" + mock_vboxmanage.get_all_vms.return_value = ['test-controller-0', 'test-controller-1', 'test-worker-0', + 'test-storage-0'] + mock_vboxmanage.vboxmanage_showinfo.return_value = b'macaddress2="080027C95571"\n' + mock_get_personalities.return_value = { + 'test-controller-1': 'controller', + 'test-worker-0': 'worker', + 'test-storage-0': 'storage', + } + mock_get_hostnames.return_value = { + 'test-controller-1': 'controller-1', + 'test-worker-0': 'worker-0', + 'test-storage-0': 'storage-0', + } + expected_xml = ( + '\n' + '\n' + ' \n' + ' controller-1\n' + ' controller\n' + ' 08:00:27:C9:55:71\n' + ' \n' + ' \n' + ' worker-0\n' + ' worker\n' + ' 08:00:27:C9:55:71\n' + ' \n' + ' \n' + ' storage-0\n' + ' storage\n' + ' 08:00:27:C9:55:71\n' + ' \n' + '\n' + ) + + # Run + actual_xml = install_vbox.create_host_bulk_add() + + # Assert + self.assertEqual(actual_xml, expected_xml) + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/utils/install_log.py b/virtualbox/pybox/utils/install_log.py index 37ee8ec..6ec919b 100644 --- a/virtualbox/pybox/utils/install_log.py +++ b/virtualbox/pybox/utils/install_log.py @@ -27,15 +27,15 @@ def init_logging(lab_name, log_path=None): LOGPATH constant in the env module. """ - global LOG, LOG_DIR # pylint: disable=global-statement, global-variable-not-assigned + global LOG, LOG_DIR # pylint: disable=global-statement, global-variable-not-assigned if not log_path: log_path = LOGPATH lab_log_path = log_path + "/" + lab_name # Setup log sub-directory for current run current_time = datetime.datetime.now() - LOG_DIR = f"{lab_log_path}/{current_time.year}_{current_time.month}_\ - {current_time.day}_{current_time.hour}_{current_time.minute}_{current_time.second}" + LOG_DIR = f"{lab_log_path}/{current_time.year}_{current_time.month}_" \ + f"{current_time.day}_{current_time.hour}_{current_time.minute}_{current_time.second}" if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) @@ -53,7 +53,7 @@ def init_logging(lab_name, log_path=None): # Create symbolic link to latest logs of this lab try: os.unlink(lab_log_path + "/latest") - except: # pylint: disable=bare-except + except: # pylint: disable=bare-except pass os.symlink(LOG_DIR, lab_log_path + "/latest") diff --git a/virtualbox/pybox/utils/kpi.py b/virtualbox/pybox/utils/kpi.py index efa291e..9528d95 100644 --- a/virtualbox/pybox/utils/kpi.py +++ b/virtualbox/pybox/utils/kpi.py @@ -14,13 +14,14 @@ STAGES = [] METRICS = {} START = 0 + def init_kpi_metrics(): """ Initializes the global variable START with the current time to start tracking the duration of a program. """ - global START # pylint: disable=global-statement + global START # pylint: disable=global-statement START = time.time() @@ -45,7 +46,7 @@ def get_formated_time(sec): def set_kpi_metric(metric, duration): """Sets the duration of a metric and adds the metric to the global list of STAGES.""" - global METRICS, STAGES # pylint: disable=global-statement, global-variable-not-assigned + global METRICS, STAGES # pylint: disable=global-statement, global-variable-not-assigned METRICS[metric] = duration STAGES.append(metric) @@ -67,10 +68,10 @@ def get_kpi_str(metric): msg = "" if metric in STAGES: sec = METRICS[metric] - msg += (f" Time in stage '{metric}': {get_formated_time(sec)} \n") + msg += f" Time in stage '{metric}': {get_formated_time(sec)} \n" elif metric == 'total' and START: duration = time.time() - START - msg += (f" Total time: {get_formated_time(duration)}\n") + msg += f" Total time: {get_formated_time(duration)}\n" return msg diff --git a/virtualbox/pybox/utils/serial.py b/virtualbox/pybox/utils/serial.py index 271990e..5537c6d 100644 --- a/virtualbox/pybox/utils/serial.py +++ b/virtualbox/pybox/utils/serial.py @@ -47,7 +47,8 @@ def connect(hostname, port=10000, prefix=""): # disconnect(sock) sock = None # TODO (WEI): double check this # pylint: disable=fixme - sock.setblocking(0) + if sock: + sock.setblocking(False) return sock @@ -67,7 +68,7 @@ def disconnect(sock): # pylint: disable=too-many-arguments, too-many-locals, too-many-branches -def get_output(stream, prompts=None, timeout=5, log=True, as_lines=True, flush=True): +def get_output(stream, cmd, prompts=None, timeout=5, log=True, as_lines=True, flush=True): # pylint: disable=fixme # TODO: Not tested, will not work if kernel or other processes throw data on stdout or stderr """ @@ -96,7 +97,7 @@ def get_output(stream, prompts=None, timeout=5, log=True, as_lines=True, flush=T pass # Send command - stream.sendall("{cmd}\n".encode('utf-8')) + stream.sendall(f"{cmd}\n".encode('utf-8')) # Get response patterns = [] diff --git a/virtualbox/pybox/utils/sftp.py b/virtualbox/pybox/utils/sftp.py index 7be9c6f..efa4c6f 100644 --- a/virtualbox/pybox/utils/sftp.py +++ b/virtualbox/pybox/utils/sftp.py @@ -23,13 +23,14 @@ def sftp_send(source, destination, client_dict): remote_host = client_dict["remote_host"] username = client_dict["username"] + sftp_client = None LOG.info("Connecting to server %s with username %s", remote_host, username) ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ## TODO(WEI): need to make this timeout handling better + # TODO(WEI): need to make this timeout handling better retry = 0 while retry < 8: try: @@ -49,9 +50,10 @@ def sftp_send(source, destination, client_dict): time.sleep(10) LOG.info("Sending file from %s to %s", source, destination) - sftp_client.put(source, destination) - LOG.info("Done") - sftp_client.close() + if sftp_client: + sftp_client.put(source, destination) + LOG.info("Done") + sftp_client.close() ssh_client.close() @@ -69,9 +71,9 @@ def send_dir(params_dict): - destination (str): The remote directory to copy `source` into. - username (str): The username for the SSH connection. - password (str): The password for the SSH connection. - - follow_links (bool, optional): Whether or not to follow symbolic links when + - follow_links (bool, optional): Whether to follow symbolic links when copying files. Default is True. - - clear_known_hosts (bool, optional): Whether or not to clear the known_hosts file + - clear_known_hosts (bool, optional): Whether to clear the known_hosts file before making the SSH connection. Default is True. Raises: @@ -144,20 +146,19 @@ def send_dir_fallback(source, remote_host, destination, username, password): allow_agent=False ) sftp_client = ssh_client.open_sftp() - path = '' send_img = False for items in os.listdir(source): - path = source+items + path = source + items if os.path.isfile(path): if items.endswith('.img'): - remote_path = destination+'images/'+items + remote_path = destination + 'images/' + items LOG.info("Sending file from %s to %s", path, remote_path) sftp_client.put(path, remote_path) send_img = True elif items.endswith('.iso'): pass else: - remote_path = destination+items + remote_path = destination + items LOG.info("Sending file from %s to %s", path, remote_path) sftp_client.put(path, remote_path) LOG.info("Done") diff --git a/virtualbox/pybox/utils/tests/__init__.py b/virtualbox/pybox/utils/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/virtualbox/pybox/utils/tests/test_install_log.py b/virtualbox/pybox/utils/tests/test_install_log.py new file mode 100644 index 0000000..01baab1 --- /dev/null +++ b/virtualbox/pybox/utils/tests/test_install_log.py @@ -0,0 +1,94 @@ +""" +Unit tests related to install_log +""" + +import unittest +from unittest.mock import patch +import install_log +import datetime + + +class InitLoggingTestCase(unittest.TestCase): + """ + Class to test init_logging method + """ + + @patch("os.makedirs") + @patch("os.path.exists", return_value=False) + @patch("os.symlink") + @patch("os.unlink") + @patch("logging.FileHandler") + @patch("logging.StreamHandler") + def test_init_logging(self, + mock_stream_handler, + mock_file_handler, + mock_unlink, + mock_symlink, + mock_exists, + mock_makedirs): + """ + Test init_logging method + """ + + # Setup + lab_name = "lab1" + log_path = "/some/log/path" + current_time = datetime.datetime(2023, 6, 6, 10, 20, 30) + expected_log_dir = f"{log_path}/{lab_name}/{current_time.year}_{current_time.month}_{current_time.day}_" \ + f"{current_time.hour}_{current_time.minute}_{current_time.second}" + + with patch('install_log.datetime') as mock_date: + mock_date.datetime.now.return_value = current_time + + # Run + install_log.init_logging(lab_name, log_path) + + # Assert + mock_exists.assert_called_once_with(expected_log_dir) + mock_makedirs.assert_called_once_with(expected_log_dir) + mock_file_handler.assert_called_once_with(f"{expected_log_dir}/install.log") + mock_stream_handler.assert_called_once_with() + mock_unlink.assert_called_once_with(f"{log_path}/{lab_name}/latest") + mock_symlink.assert_called_once_with(expected_log_dir, f"{log_path}/{lab_name}/latest") + + @patch("os.makedirs") + @patch("os.path.exists", return_value=False) + @patch("os.symlink") + @patch("os.unlink", side_effect=FileNotFoundError) + @patch("logging.FileHandler") + @patch("logging.StreamHandler") + def test_init_logging_no_latest_link(self, + mock_stream_handler, + mock_file_handler, + mock_unlink, + mock_symlink, + mock_exists, + mock_makedirs): + """ + Test init_logging method when there's no latest link + """ + + # Setup + lab_name = "lab1" + log_path = "/some/log/path" + current_time = datetime.datetime(2023, 6, 6, 10, 20, 30) + expected_log_dir = f"{log_path}/{lab_name}/{current_time.year}_{current_time.month}_{current_time.day}_" \ + f"{current_time.hour}_{current_time.minute}_{current_time.second}" + + with patch('install_log.datetime') as mock_date: + mock_date.datetime.now.return_value = current_time + + # Run + install_log.init_logging(lab_name, log_path) + + # Assert + mock_exists.assert_called_once_with(expected_log_dir) + mock_makedirs.assert_called_once_with(expected_log_dir) + mock_file_handler.assert_called_once_with(f"{expected_log_dir}/install.log") + mock_stream_handler.assert_called_once_with() + mock_unlink.assert_called_once_with(f"{log_path}/{lab_name}/latest") + mock_symlink.assert_called_once_with(expected_log_dir, f"{log_path}/{lab_name}/latest") + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/utils/tests/test_kpi.py b/virtualbox/pybox/utils/tests/test_kpi.py new file mode 100644 index 0000000..37615c2 --- /dev/null +++ b/virtualbox/pybox/utils/tests/test_kpi.py @@ -0,0 +1,247 @@ +import unittest +from unittest.mock import patch, call, ANY +import kpi + + +class InitKpiMetricsTestCase(unittest.TestCase): + """ + Class to test init_kpi_metrics method + """ + + @patch("time.time") + def test_init_kpi_metrics(self, mock_time): + """ + Test init_kpi_metrics method + """ + + # Setup + mock_time.return_value = 12345.67 + + # Run + kpi.init_kpi_metrics() + + # Assert + self.assertEqual(kpi.START, 12345.67) + + +class GetFormatedTimeTestCase(unittest.TestCase): + """ + Class to test get_formated_time method + """ + + def test_get_formated_time_hours(self): + """ + Test get_formated_time method with hours + """ + + # Setup + sec = 3665.67 + + # Run + result = kpi.get_formated_time(sec) + + # Assert + self.assertEqual(result, "1h 1m 5.67s") + + def test_get_formated_time_minutes(self): + """ + Test get_formated_time method with minutes + """ + + # Setup + sec = 65.67 + + # Run + result = kpi.get_formated_time(sec) + + # Assert + self.assertEqual(result, "1m 5.67s") + + def test_get_formated_time_seconds(self): + """ + Test get_formated_time method with seconds + """ + + # Setup + sec = 5.67 + + # Run + result = kpi.get_formated_time(sec) + + # Assert + self.assertEqual(result, "5.67s") + + +class SetKpiMetricTestCase(unittest.TestCase): + """ + Class to test set_kpi_metric method + """ + + def setUp(self): + kpi.METRICS = {} + kpi.STAGES = [] + + def test_set_kpi_metric(self): + """ + Test set_kpi_metric method + """ + + # Setup + metric = "some_metric" + duration = 123.45 + + # Run + kpi.set_kpi_metric(metric, duration) + + # Assert + self.assertEqual(kpi.METRICS[metric], duration) + self.assertIn(metric, kpi.STAGES) + + +class PrintKpiTestCase(unittest.TestCase): + """ + Class to test print_kpi method + """ + + @patch("kpi.LOG") + @patch("kpi.get_formated_time", return_value="1m 23.45s") + def test_print_kpi_metric(self, mock_get_formated_time, mock_log): + """ + Test print_kpi method with a metric + """ + + # Setup + kpi.STAGES = ["some_metric"] + kpi.METRICS = {"some_metric": 123.45} + metric = "some_metric" + + # Run + kpi.print_kpi(metric) + + # Assert + mock_get_formated_time.assert_called_once_with(123.45) + mock_log.info.assert_called_once_with(" Time in stage '%s': %s ", metric, "1m 23.45s") + + @patch("kpi.LOG") + @patch("kpi.get_formated_time", return_value="2m 46.90s") + @patch("time.time") + def test_print_kpi_total(self, mock_time, mock_get_formated_time, mock_log): + """ + Test print_kpi method with total + """ + + # Setup + kpi.START = 20 + metric = "total" + mock_time.return_value = 166.90 + + # Run + kpi.print_kpi(metric) + + # Assert + mock_get_formated_time.assert_called_once_with(146.9) + mock_log.info.assert_called_once_with(" Total time: %s", "2m 46.90s") + + +class GetKpiStrTestCase(unittest.TestCase): + """ + Class to test get_kpi_str method + """ + + @patch("kpi.get_formated_time", return_value="1m 23.45s") + def test_get_kpi_str_metric(self, mock_get_formated_time): + """ + Test get_kpi_str method with a metric + """ + + # Setup + kpi.STAGES = ["some_metric"] + kpi.METRICS = {"some_metric": 123.45} + metric = "some_metric" + + # Run + result = kpi.get_kpi_str(metric) + + # Assert + mock_get_formated_time.assert_called_once_with(123.45) + self.assertEqual(result, " Time in stage 'some_metric': 1m 23.45s \n") + + @patch("kpi.get_formated_time", return_value="2m 46.90s") + @patch("time.time") + def test_get_kpi_str_total(self, mock_time, mock_get_formated_time): + """ + Test get_kpi_str method with total + """ + + # Setup + kpi.START = 20 + metric = "total" + mock_time.return_value = 166.90 + + # Run + result = kpi.get_kpi_str(metric) + + # Assert + mock_get_formated_time.assert_called_once_with(146.9) + self.assertEqual(result, " Total time: 2m 46.90s\n") + + +class GetKpiMetricsStrTestCase(unittest.TestCase): + """ + Class to test get_kpi_metrics_str method + """ + + @patch("kpi.get_kpi_str") + def test_get_kpi_metrics_str(self, mock_get_kpi_str): + """ + Test get_kpi_metrics_str method + """ + + # Setup + kpi.STAGES = ["metric1", "metric2"] + kpi.METRICS = {"metric1": 123.45, "metric2": 166.9} + kpi.START = 20 + mock_get_kpi_str.side_effect = [" Time in stage 'metric1': 1m 23.45s \n", + " Time in stage 'metric2': 2m 46.90s \n", + " Total time: 4m 10.35s\n"] + + # Run + result = kpi.get_kpi_metrics_str() + + # Assert + expected_result = ("===================== Metrics ====================\n" + " Time in stage 'metric1': 1m 23.45s \n" + " Time in stage 'metric2': 2m 46.90s \n" + " Total time: 4m 10.35s\n" + "===============================================\n") + self.assertEqual(result, expected_result) + + +class PrintKpiMetricsTestCase(unittest.TestCase): + """ + Class to test print_kpi_metrics method + """ + + @patch("serial.LOG.info") + @patch("kpi.print_kpi") + def test_print_kpi_metrics(self, mock_print_kpi, mock_log_info): + """ + Test print_kpi_metrics method + """ + + # Setup + kpi.STAGES = ["metric1", "metric2"] + kpi.METRICS = {"metric1": 123.45, "metric2": 166.9} + kpi.START = 20 + + # Run + kpi.print_kpi_metrics() + + # Assert + calls = [call("metric1"), call("metric2"), call('total')] + mock_print_kpi.assert_has_calls(calls) + mock_log_info.assert_any_call(ANY) + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/utils/tests/test_serial.py b/virtualbox/pybox/utils/tests/test_serial.py new file mode 100644 index 0000000..ff27d8c --- /dev/null +++ b/virtualbox/pybox/utils/tests/test_serial.py @@ -0,0 +1,201 @@ +import unittest +from unittest.mock import MagicMock, patch, ANY +import serial +import socket + + +class ConnectTestCase(unittest.TestCase): + """ + Class to test connect method + """ + + @patch("serial.LOG.info") + @patch("socket.socket") + def test_connect_unix(self, mock_socket, mock_log_info): + """ + Test connect method for Unix platform + """ + + # Setup + serial.platform = 'linux' + mock_socket.return_value = mock_socket + hostname = 'hostname' + + # Run + result = serial.connect(hostname) + + # Assert + mock_socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_STREAM) + mock_socket.connect.assert_called_once_with(f"/tmp/{hostname}") + self.assertEqual(result, mock_socket) + + @patch("serial.LOG.info") + @patch("socket.socket") + def test_connect_windows(self, mock_socket, mock_log_info): + """ + Test connect method for Windows platform + """ + + # Setup + serial.platform = 'win32' + mock_socket.return_value = mock_socket + hostname = 'hostname' + port = 10000 + + # Run + result = serial.connect(hostname, port) + + # Assert + mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) + mock_socket.connect.assert_called_once_with(('localhost', port)) + self.assertEqual(result, mock_socket) + + @patch("serial.LOG.info") + @patch("socket.socket") + def test_connect_fail(self, mock_socket, mock_log_info): + """ + Test connect method when connection fails + """ + + # Setup + serial.platform = 'linux' + mock_socket.return_value = mock_socket + hostname = 'hostname' + mock_socket.connect.side_effect = Exception + + # Run + result = serial.connect(hostname) + + # Assert + mock_socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_STREAM) + mock_socket.connect.assert_called_once_with(f"/tmp/{hostname}") + mock_log_info.assert_called_with("Connection failed") + self.assertIsNone(result) + + +class DisconnectTestCase(unittest.TestCase): + """ + Class to test disconnect method + """ + + @patch("serial.LOG.info") + def test_disconnect(self, mock_log_info): + """ + Test disconnect method + """ + + # Setup + sock = MagicMock() + + # Run + serial.disconnect(sock) + + # Assert + sock.shutdown.assert_called_once_with(socket.SHUT_RDWR) + sock.close.assert_called_once() + mock_log_info.assert_any_call(ANY) + + +# TODO This test is just for coverage purposes, this function needs a heavy refactoring +class GetOutputTestCase(unittest.TestCase): + """ + Class to test get_output method + """ + + @patch("serial.LOG.info") + @patch("serial.time") + def test_get_output(self, mock_time, mock_log_info): + """ + Test get_output method + """ + + # Setup + stream = MagicMock() + stream.poll.return_value = None + stream.gettimeout.return_value = 1 + stream.recv.side_effect = ['cmd\n', 'test\n', ':~$ '] + mock_time.time.side_effect = [0, 1, 2, 3] + cmd = "cmd" + prompts = [':~$ ', ':~# ', ':/home/wrsroot# ', '(keystone_.*)]$ ', '(keystone_.*)]# '] + timeout = 2 + log = True + as_lines = True + flush = True + + # Run + with self.assertRaises(Exception): + serial.get_output(stream, cmd, prompts, timeout, log, as_lines, flush) + + # Assert + stream.sendall.assert_called_once_with(f"{cmd}\n".encode('utf-8')) + mock_log_info.assert_any_call('cmd') + mock_log_info.assert_any_call('test') + + +class ExpectBytesTestCase(unittest.TestCase): + """ + Class to test expect_bytes method + """ + + @patch("serial.LOG.info") + @patch("serial.stdout.write") + def test_expect_bytes(self, mock_stdout_write, mock_log_info): + """ + Test expect_bytes method + """ + + # Setup + stream = MagicMock() + stream.expect_bytes.return_value = None + stream.poll.return_value = None + text = "Hello, world!" + timeout = 180 + fail_ok = False + flush = True + + # Run + result = serial.expect_bytes(stream, text, timeout, fail_ok, flush) + + # Assert + self.assertEqual(result, 0) + stream.expect_bytes.assert_called_once_with(f"{text}".encode('utf-8'), timeout=timeout) + mock_stdout_write.assert_any_call('\n') + mock_log_info.assert_any_call("Expecting text within %s minutes: %s\n", timeout / 60, text) + mock_log_info.assert_any_call("Found expected text: %s", text) + + +class SendBytesTestCase(unittest.TestCase): + """ + Class to test send_bytes method + """ + + @patch("serial.LOG.info") + @patch("serial.expect_bytes") + def test_send_bytes(self, mock_expect_bytes, mock_log_info): + """ + Test send_bytes method + """ + + # Setup + stream = MagicMock() + stream.poll.return_value = None + text = "Hello, world!" + fail_ok = False + expect_prompt = True + prompt = None + timeout = 180 + send = True + flush = True + mock_expect_bytes.return_value = 0 + + # Run + result = serial.send_bytes(stream, text, fail_ok, expect_prompt, prompt, timeout, send, flush) + + # Assert + self.assertEqual(result, 0) + mock_expect_bytes.assert_called() + stream.sendall.assert_called_once_with(f"{text}\n".encode('utf-8')) + + +if __name__ == '__main__': + unittest.main() diff --git a/virtualbox/pybox/utils/tests/test_sftp.py b/virtualbox/pybox/utils/tests/test_sftp.py new file mode 100644 index 0000000..9733ffe --- /dev/null +++ b/virtualbox/pybox/utils/tests/test_sftp.py @@ -0,0 +1,162 @@ +import unittest +from unittest.mock import MagicMock, patch, call, ANY +import sftp + + +class SftpSendTestCase(unittest.TestCase): + """ + Class to test sftp_send method + """ + + @patch("serial.LOG.info") + @patch("sftp.paramiko.SSHClient") + def test_sftp_send(self, mock_ssh_client, mock_log_info): + """ + Test sftp_send method + """ + + # Setup + source = "/local/path/to/file" + destination = "/remote/path/to/file" + client_dict = { + "remote_host": "127.0.0.1", + "username": "username", + "remote_port": 22, + "password": "password" + } + + mock_ssh_instance = MagicMock() + mock_sftp_client_instance = MagicMock() + mock_ssh_client.return_value = mock_ssh_instance + mock_ssh_instance.open_sftp.return_value = mock_sftp_client_instance + + # Run + sftp.sftp_send(source, destination, client_dict) + + # Assert + mock_ssh_instance.connect.assert_called_once_with( + client_dict["remote_host"], + port=client_dict["remote_port"], + username=client_dict["username"], + password=client_dict["password"], + look_for_keys=False, + allow_agent=False + ) + mock_sftp_client_instance.put.assert_called_once_with(source, destination) + mock_sftp_client_instance.close.assert_called_once() + mock_ssh_instance.close.assert_called_once() + mock_log_info.assert_any_call(ANY) + + +class TestSendDir(unittest.TestCase): + """ + Class to test send_dir method + """ + + @patch("serial.LOG.info") + @patch('sftp.subprocess.Popen') + @patch('sftp.getpass.getuser') + def test_send_dir(self, mock_getuser, mock_popen, mock_log_info): + """ + Test send_dir method + """ + + # Setup + mock_getuser.return_value = 'username' + process_mock = MagicMock() + attrs = {'wait.return_value': None, + 'returncode': 0, + 'stdout.readline.side_effect': [b'line1\n', b'line2\n', b'']} + process_mock.configure_mock(**attrs) + mock_popen.return_value.__enter__.return_value = process_mock + + # Run and assert + # test without follow_links and clear_known_hosts + sftp.send_dir({ + 'source': 'source', + 'remote_host': 'remote_host', + 'remote_port': 'remote_port', + 'destination': 'destination', + 'username': 'username', + 'password': 'password', + 'follow_links': False, + 'clear_known_hosts': False, + }) + self.assertEqual(mock_popen.call_count, 1) + + # test with follow_links and clear_known_hosts and localhost + sftp.send_dir({ + 'source': 'source', + 'remote_host': '127.0.0.1', + 'remote_port': 'remote_port', + 'destination': 'destination', + 'username': 'username', + 'password': 'password', + 'follow_links': True, + 'clear_known_hosts': True, + }) + self.assertEqual(mock_popen.call_count, 3) + + # test with follow_links and clear_known_hosts and non-localhost + sftp.send_dir({ + 'source': 'source', + 'remote_host': 'remote_host', + 'remote_port': 'remote_port', + 'destination': 'destination', + 'username': 'username', + 'password': 'password', + 'follow_links': True, + 'clear_known_hosts': True, + }) + self.assertEqual(mock_popen.call_count, 5) + + # test with non-zero return code + process_mock.returncode = 1 + + +class TestSendDirFallback(unittest.TestCase): + """ + Class to test send_dir_fallback method + """ + + @patch("serial.LOG.info") + @patch('sftp.os.listdir') + @patch('sftp.os.path.isfile') + @patch('sftp.paramiko.SSHClient') + def test_send_dir_fallback(self, mock_ssh_client, mock_isfile, mock_listdir, mock_log_info): + """ + Test send_dir_fallback method + """ + + # Setup + mock_listdir.return_value = ['test1.img', 'test2.txt', 'test3.iso'] + mock_isfile.return_value = True + mock_sftp_client = MagicMock() + mock_ssh_client_instance = MagicMock() + mock_ssh_client_instance.open_sftp.return_value = mock_sftp_client + mock_ssh_client.return_value = mock_ssh_client_instance + + source = '/path/to/source' + remote_host = 'remote_host' + destination = '/path/to/destination' + username = 'username' + password = 'password' + + # Run + sftp.send_dir_fallback(source, remote_host, destination, username, password) + + # Assert + mock_ssh_client.assert_called_once() + mock_ssh_client_instance.connect.assert_called_once_with(remote_host, username=username, + password=password, look_for_keys=False, + allow_agent=False) + mock_sftp_client.put.assert_any_call(source + 'test1.img', destination + 'images/test1.img') + mock_sftp_client.put.assert_any_call(source + 'test2.txt', destination + 'test2.txt') + self.assertNotIn(call(source + 'test3.iso', ANY), mock_sftp_client.put.call_args_list) + mock_sftp_client.close.assert_called_once() + mock_ssh_client_instance.close.assert_called_once() + mock_log_info.assert_any_call(ANY) + + +if __name__ == '__main__': + unittest.main()