Remove tobiko.run module

This module seems that is not used anywhere and by anything so there's
no point to keep it in the repo.

Change-Id: I3486e7ecb4250d07c9e3d0d51323709d41bf8982
This commit is contained in:
Slawek Kaplonski 2024-11-28 15:23:34 +00:00
parent 3cdc46a98c
commit b23979f413
9 changed files with 0 additions and 591 deletions

View File

@ -35,7 +35,6 @@ Tobiko package
modules/http
modules/openstack/index
modules/podman
modules/run
modules/shell/index
modules/shiftstack
modules/tripleo/index

View File

@ -1,9 +0,0 @@
tobiko.run
----------
.. automodule:: tobiko.run
:members:
:imported-members:
:undoc-members:
:inherited-members:
:show-inheritance:

View File

@ -1,30 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from tobiko.run import _discover
from tobiko.run import _find
from tobiko.run import _run
discover_test_ids = _discover.discover_test_ids
find_test_ids = _discover.find_test_ids
forked_discover_test_ids = _discover.forked_discover_test_ids
find_test_files = _find.find_test_files
run_tests = _run.run_tests
run_test_ids = _run.run_test_ids

View File

@ -1,47 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import typing
import os
import tobiko
class RunConfigFixture(tobiko.SharedFixture):
test_path: typing.List[str]
test_filename: str = 'test_*.py'
python_path: typing.Optional[typing.List[str]] = None
workers_count: typing.Optional[int] = None
def setup_fixture(self):
package_file = os.path.realpath(os.path.realpath(tobiko.__file__))
package_dir = os.path.dirname(package_file)
tobiko_dir = os.path.dirname(package_dir)
self.test_path = [os.path.join(tobiko_dir, 'tobiko', 'tests', 'unit')]
@property
def forked(self) -> bool:
return self.workers_count is not None and self.workers_count != 1
def run_confing(obj=None) -> RunConfigFixture:
if obj is None:
return tobiko.setup_fixture(RunConfigFixture)
fixture = tobiko.get_fixture(obj)
tobiko.check_valid_type(fixture, RunConfigFixture)
return tobiko.setup_fixture(fixture)

View File

@ -1,165 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import inspect
import os
import sys
import typing
import unittest
from oslo_log import log
import tobiko
from tobiko.run import _config
from tobiko.run import _find
from tobiko.run import _worker
LOG = log.getLogger(__name__)
def find_test_ids(test_path: typing.Union[str, typing.Iterable[str]],
test_filename: str = None,
python_path: typing.Iterable[str] = None,
forked: bool = None,
config: _config.RunConfigFixture = None) \
-> typing.List[str]:
config = _config.run_confing(config)
test_files = _find.find_test_files(test_path=test_path,
test_filename=test_filename,
config=config)
if not python_path:
python_path = config.python_path
if forked is None:
forked = bool(config.forked)
if forked:
return forked_discover_test_ids(test_files=test_files,
python_path=python_path)
else:
return discover_test_ids(test_files=test_files,
python_path=python_path)
def discover_test_ids(test_files: typing.Iterable[str],
python_path: typing.Iterable[str] = None) \
-> typing.List[str]:
if not python_path:
python_path = sys.path
python_dirs = [os.path.realpath(p) + '/'
for p in python_path
if os.path.isdir(p)]
test_ids: typing.List[str] = []
for test_file in test_files:
test_ids.extend(discover_file_test_ids(test_file=test_file,
python_dirs=python_dirs))
return test_ids
def discover_file_test_ids(test_file: str,
python_dirs: typing.Iterable[str]) \
-> typing.List[str]:
test_file = os.path.realpath(test_file)
if not os.path.isfile(test_file):
raise ValueError(f"Test file doesn't exist: '{test_file}'")
if not test_file.endswith('.py'):
raise ValueError(f"Test file hasn't .py suffix: '{test_file}'")
for python_dir in python_dirs:
if test_file.startswith(python_dir):
module_name = test_file[len(python_dir):-3].replace('/', '.')
return discover_module_test_ids(module_name)
raise ValueError(f"Test file not in Python path: '{test_file}'")
def discover_module_test_ids(module_name: str) -> typing.List[str]:
LOG.debug(f"Load test module '{module_name}'...")
module = tobiko.load_module(module_name)
test_file = module.__file__
LOG.debug("Inspect test module:\n"
f" module: '{module_name}'\n"
f" filename: '{test_file}'\n")
test_ids: typing.List[str] = []
for obj_name in dir(module):
try:
obj = getattr(module, obj_name)
except AttributeError:
LOG.warning("Error getting object "
f"'{module_name}.{obj_name}'",
exc_info=1)
continue
if (inspect.isclass(obj) and
issubclass(obj, unittest.TestCase) and
not inspect.isabstract(obj)):
LOG.debug("Inspect test class members...\n"
f" file: '{test_file}'\n"
f" module: '{module_name}'\n"
f" object: '{obj_name}'\n")
for member_name in dir(obj):
if member_name.startswith('test_'):
member_id = f"{module_name}.{obj_name}.{member_name}"
try:
member = getattr(obj, member_name)
except Exception:
LOG.error(f'Error getting "{member_id}"', exc_info=1)
continue
if not callable(member):
LOG.error("Class member is not callable: "
f"'{member_id}'")
continue
test_ids.append(member_id)
return test_ids
def forked_discover_test_ids(test_files: typing.Iterable[str],
python_path: typing.Iterable[str] = None) \
-> typing.List[str]:
results = [_worker.call_async(discover_test_ids,
test_files=[test_file],
python_path=python_path)
for test_file in test_files]
test_ids: typing.List[str] = []
for result in results:
test_ids.extend(result.get())
return test_ids
def main(test_path: typing.Iterable[str] = None,
test_filename: str = None,
forked: bool = None,
python_path: typing.Iterable[str] = None):
if test_path is None:
test_path = sys.argv[1:]
try:
test_ids = find_test_ids(test_path=test_path,
test_filename=test_filename,
forked=forked,
python_path=python_path)
except Exception as ex:
sys.stderr.write(f'{ex}\n')
sys.exit(1)
else:
output = ''.join(f'{test_id}\n'
for test_id in test_ids)
sys.stdout.write(output)
sys.exit(0)
if __name__ == '__main__':
main()

View File

@ -1,98 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import os
import subprocess
import sys
import typing
from oslo_log import log
from tobiko.run import _config
LOG = log.getLogger(__name__)
def find_test_files(test_path: typing.Union[str, typing.Iterable[str]] = None,
test_filename: str = None,
config: _config.RunConfigFixture = None) \
-> typing.List[str]:
config = _config.run_confing(config)
if test_path is None:
test_path = config.test_path
elif isinstance(test_path, str):
test_path = [test_path]
else:
test_path = list(test_path)
if not test_filename:
test_filename = config.test_filename
test_files: typing.List[str] = []
for path in test_path:
path = os.path.realpath(path)
if os.path.isfile(path):
test_files.append(path)
LOG.debug("Found test file:\n"
f" {path}\n",)
continue
if os.path.isdir(path):
find_dir = path
find_name = test_filename
else:
find_dir = os.path.dirname(path)
find_name = os.path.basename(path)
LOG.debug("Find test files...\n"
f" dir: '{find_dir}'\n"
f" name: '{find_name}'")
try:
output = subprocess.check_output(
['find', find_dir, '-name', find_name],
universal_newlines=True)
except subprocess.CalledProcessError as ex:
LOG.exception("Test files not found.")
raise FileNotFoundError('Test files not found: \n'
f" dir: '{find_dir}'\n"
f" name: '{find_name}'") from ex
for line in output.splitlines():
line = line.strip()
if line:
test_files.append(line)
LOG.debug("Found test file(s):\n"
" %s", ' \n'.join(test_files))
return test_files
def main(test_path: typing.List[str] = None):
if test_path is None:
test_path = sys.argv[1:]
try:
test_files = find_test_files(test_path=test_path)
except Exception as ex:
sys.stderr.write(f'{ex}\n')
sys.exit(1)
else:
output = ''.join(f'{test_file}\n'
for test_file in test_files)
sys.stdout.write(output)
sys.exit(0)
if __name__ == '__main__':
main()

View File

@ -1,75 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import io
import sys
import typing
import unittest
import tobiko
def get_test_result() -> unittest.TestResult:
return tobiko.setup_fixture(TestResultFixture).result
class TestResultFixture(tobiko.SharedFixture):
verbosity = 2
stream = sys.stderr
description = True
result: unittest.TestResult
def setup_fixture(self):
self.result = TestResult(stream=self.stream,
verbosity=self.verbosity,
description=self.description)
class TestResult(unittest.TextTestResult):
def __init__(self,
stream: typing.TextIO,
description: bool,
verbosity: int):
super().__init__(stream=TextIOWrapper(stream),
descriptions=description,
verbosity=verbosity)
self.buffer = True
def startTest(self, test: unittest.TestCase):
tobiko.push_test_case(test)
super().startTest(test)
def stopTest(self, test: unittest.TestCase) -> None:
super().stopTestRun()
actual_test = tobiko.pop_test_case()
assert actual_test == test
tobiko.remove_test_from_all_shared_resources(test.id())
class TextIOWrapper(io.TextIOWrapper):
def __init__(self, stream: typing.TextIO):
super().__init__(buffer=stream.buffer,
encoding='UTF-8',
errors='strict',
line_buffering=True,
write_through=False)
def writeln(self, line: str):
self.write(line + '\n')

View File

@ -1,112 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import collections
import sys
import typing
import unittest
from oslo_log import log
import tobiko
from tobiko.run import _config
from tobiko.run import _discover
LOG = log.getLogger(__name__)
def run_tests(test_path: typing.Union[str, typing.Iterable[str]],
test_filename: str = None,
python_path: typing.Iterable[str] = None,
config: _config.RunConfigFixture = None,
result: unittest.TestResult = None,
check=True) -> unittest.TestResult:
test_ids = _discover.find_test_ids(test_path=test_path,
test_filename=test_filename,
python_path=python_path,
config=config)
return run_test_ids(test_ids=test_ids, result=result, check=check)
def run_test_ids(test_ids: typing.List[str],
result: unittest.TestResult = None,
check=True) \
-> unittest.TestResult:
test_classes: typing.Dict[str, typing.List[str]] = \
collections.defaultdict(list)
# regroup test ids my test class keeping test names order
test_ids = list(test_ids)
for test_id in test_ids:
test_class_id, test_name = test_id.rsplit('.', 1)
test_classes[test_class_id].append(test_name)
# add test cases to the suite ordered by class name
suite = unittest.TestSuite()
for test_class_id, test_names in sorted(test_classes.items()):
test_class = tobiko.load_object(test_class_id)
for test_name in test_names:
test = test_class(test_name)
suite.addTest(test)
LOG.info(f'Run {len(test_ids)} test(s)')
result = tobiko.run_test(case=suite, result=result, check=check)
LOG.info(f'{result.testsRun} test(s) run')
return result
class RunTestCasesFailed(tobiko.TobikoException):
message = ('Test case execution failed:\n'
'{errors}\n'
'{failures}\n')
def main(test_path: typing.Iterable[str] = None,
test_filename: str = None,
python_path: typing.Iterable[str] = None):
if test_path is None:
test_path = sys.argv[1:]
result = run_tests(test_path=test_path,
test_filename=test_filename,
python_path=python_path)
for case, exc_info in result.errors:
LOG.exception(f"Test case error: {case.id()}",
exc_info=exc_info)
for case, exc_info in result.errors:
LOG.exception(f"Test case failure: {case.id()}",
exc_info=exc_info)
for case, reason in result.skipped:
LOG.info(f"Test case skipped: {case.id()} ({reason})")
LOG.info(f"{result.testsRun} test case(s) executed:\n"
f" errors: {len(result.errors)}"
f" failures: {len(result.failures)}"
f" skipped: {len(result.skipped)}")
if result.errors or result.failures:
sys.exit(1)
else:
sys.exit(0)
if __name__ == '__main__':
main()

View File

@ -1,54 +0,0 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import multiprocessing
from multiprocessing import pool
import typing
import tobiko
from tobiko.run import _config
class WorkersPoolFixture(tobiko.SharedFixture):
config = tobiko.required_fixture(_config.RunConfigFixture)
pool: pool.Pool
workers_count: int = 0
def __init__(self, workers_count: int = None):
super().__init__()
if workers_count is not None:
self.workers_count = workers_count
def setup_fixture(self):
workers_count = self.workers_count
if not workers_count:
workers_count = self.config.workers_count
self.workers_count = workers_count or 0
context = multiprocessing.get_context('spawn')
self.pool = context.Pool(processes=workers_count or None)
def workers_pool() -> pool.Pool:
return tobiko.setup_fixture(WorkersPoolFixture).pool
def call_async(func: typing.Callable,
*args,
**kwargs):
return workers_pool().apply_async(func, args=args, kwds=kwargs)