tobiko/tobiko/run/_discover.py
Federico Ressi 7453dd6f4f Fix test runner
- disable forked execution (by default)
- accept string as test path
- raise FileNotFoundError when test case files are not found
- create a new test result instance when any is given
- raise RunTestCasesFailed after test case errors or failures
- use 'spawn' context to create workers pool

Change-Id: Ifde3d43d023e7508ca099bee0a9e9bab0c639789
2022-04-01 08:42:37 +00:00

166 lines
5.8 KiB
Python

# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import inspect
import os
import sys
import typing
import unittest
from oslo_log import log
import tobiko
from tobiko.run import _config
from tobiko.run import _find
from tobiko.run import _worker
# Module-level logger, named after this module, used by the discovery helpers.
LOG = log.getLogger(__name__)
def find_test_ids(test_path: typing.Union[str, typing.Iterable[str]],
                  test_filename: str = None,
                  python_path: typing.Iterable[str] = None,
                  forked: bool = None,
                  config: _config.RunConfigFixture = None) \
        -> typing.List[str]:
    """Locate test files and return the IDs of the test cases they contain.

    :param test_path: forwarded to ``_find.find_test_files``.
    :param test_filename: forwarded to ``_find.find_test_files``.
    :param python_path: directories used to turn test file names into
        module names; when empty or ``None`` the ``python_path`` of the
        run configuration is used instead.
    :param forked: selects ``forked_discover_test_ids`` when true and
        ``discover_test_ids`` otherwise; when ``None`` the ``forked`` value
        of the run configuration decides.
    :param config: forwarded to ``_config.run_confing`` to obtain the run
        configuration actually used.
    :returns: the list of discovered test IDs.
    """
    config = _config.run_confing(config)
    test_files = _find.find_test_files(test_path=test_path,
                                       test_filename=test_filename,
                                       config=config)
    # Explicit arguments win; the run configuration only fills the gaps.
    python_path = python_path or config.python_path
    use_fork = bool(config.forked) if forked is None else forked
    discover = forked_discover_test_ids if use_fork else discover_test_ids
    return discover(test_files=test_files, python_path=python_path)
def discover_test_ids(test_files: typing.Iterable[str],
                      python_path: typing.Iterable[str] = None) \
        -> typing.List[str]:
    """Return the test IDs found in every one of the given test files.

    :param test_files: paths of the Python test files to inspect.
    :param python_path: candidate root directories for module names; when
        empty or ``None``, ``sys.path`` is used. Entries that are not
        existing directories are ignored.
    :returns: the test IDs of all files, concatenated in file order.
    """
    search_path = python_path or sys.path
    # Resolved directory names carry a trailing '/' so that they can be
    # used as path prefixes by discover_file_test_ids.
    python_dirs = []
    for entry in search_path:
        if os.path.isdir(entry):
            python_dirs.append(os.path.realpath(entry) + '/')
    return [test_id
            for test_file in test_files
            for test_id in discover_file_test_ids(test_file=test_file,
                                                  python_dirs=python_dirs)]
def discover_file_test_ids(test_file: str,
                           python_dirs: typing.Iterable[str]) \
        -> typing.List[str]:
    """Return the test IDs defined by a single Python test file.

    The file path is resolved with ``os.path.realpath`` and matched against
    each entry of ``python_dirs`` in order. The first directory containing
    the file determines the module name (the path relative to that
    directory, without the ``.py`` suffix and with ``/`` replaced by
    ``.``), which is then handed to ``discover_module_test_ids``.

    :param test_file: path of the test file.
    :param python_dirs: root directories, with or without a trailing path
        separator. They are compared textually against the resolved file
        path, so they are expected to be resolved paths as well.
    :returns: the test IDs found in the module.
    :raises ValueError: if the file doesn't exist, has no ``.py`` suffix or
        is not located below any of ``python_dirs``.
    """
    test_file = os.path.realpath(test_file)
    if not os.path.isfile(test_file):
        raise ValueError(f"Test file doesn't exist: '{test_file}'")
    if not test_file.endswith('.py'):
        raise ValueError(f"Test file hasn't .py suffix: '{test_file}'")

    for python_dir in python_dirs:
        # Make sure the prefix ends with a path separator: without it
        # '/a/ab' would wrongly match '/a/abc/test_x.py', and '/a/b' would
        # produce a module name starting with a dot. Entries that already
        # end with a separator are left unchanged.
        python_dir = os.path.join(python_dir, '')
        if test_file.startswith(python_dir):
            module_name = test_file[len(python_dir):-3].replace('/', '.')
            return discover_module_test_ids(module_name)
    raise ValueError(f"Test file not in Python path: '{test_file}'")
def discover_module_test_ids(module_name: str) -> typing.List[str]:
    """Load a module and return the IDs of the test methods it exposes.

    Every attribute of the module that is a non-abstract subclass of
    ``unittest.TestCase`` is inspected; each of its callable members whose
    name starts with ``test_`` yields an ID of the form
    ``<module_name>.<class name>.<member name>``. Attributes or members
    that cannot be read, and ``test_*`` members that are not callable, are
    logged and skipped.

    :param module_name: dotted name passed to ``tobiko.load_module``.
    :returns: the list of test IDs, in ``dir()`` order.
    """
    LOG.debug(f"Load test module '{module_name}'...")
    module = tobiko.load_module(module_name)
    test_file = module.__file__
    LOG.debug("Inspect test module:\n"
              f" module: '{module_name}'\n"
              f" filename: '{test_file}'\n")

    found: typing.List[str] = []
    for attr_name in dir(module):
        try:
            candidate = getattr(module, attr_name)
        except AttributeError:
            LOG.warning("Error getting object "
                        f"'{module_name}.{attr_name}'",
                        exc_info=1)
            continue

        # Only concrete unittest.TestCase subclasses are of interest.
        if not inspect.isclass(candidate):
            continue
        if not issubclass(candidate, unittest.TestCase):
            continue
        if inspect.isabstract(candidate):
            continue

        LOG.debug("Inspect test class members...\n"
                  f" file: '{test_file}'\n"
                  f" module: '{module_name}'\n"
                  f" object: '{attr_name}'\n")
        for method_name in dir(candidate):
            if not method_name.startswith('test_'):
                continue
            member_id = f"{module_name}.{attr_name}.{method_name}"
            try:
                member = getattr(candidate, method_name)
            except Exception:
                LOG.error(f'Error getting "{member_id}"', exc_info=1)
                continue
            if callable(member):
                found.append(member_id)
            else:
                LOG.error("Class member is not callable: "
                          f"'{member_id}'")
    return found
def forked_discover_test_ids(test_files: typing.Iterable[str],
                             python_path: typing.Iterable[str] = None) \
        -> typing.List[str]:
    """Discover test IDs by dispatching one asynchronous call per file.

    ``discover_test_ids`` is submitted through ``_worker.call_async`` once
    for every test file (presumably to run in a worker process -- confirm
    against ``_worker``). All calls are submitted before any result is
    collected.

    :param test_files: paths of the Python test files to inspect.
    :param python_path: forwarded unchanged to ``discover_test_ids``.
    :returns: the test IDs of all files, concatenated in file order.
    """
    pending = []
    for test_file in test_files:
        pending.append(_worker.call_async(discover_test_ids,
                                          test_files=[test_file],
                                          python_path=python_path))
    # Results are gathered only after every call has been submitted.
    return [test_id
            for result in pending
            for test_id in result.get()]
def main(test_path: typing.Iterable[str] = None,
         test_filename: str = None,
         forked: bool = None,
         python_path: typing.Iterable[str] = None):
    """Print discovered test IDs to stdout, one per line, and exit.

    :param test_path: test paths to search; when ``None`` the command line
        arguments (``sys.argv[1:]``) are used.
    :param test_filename: forwarded to ``find_test_ids``.
    :param forked: forwarded to ``find_test_ids``.
    :param python_path: forwarded to ``find_test_ids``.

    The process exits with status 0 after printing the IDs. If discovery
    raises an exception, its message is written to stderr and the process
    exits with status 1.
    """
    if test_path is None:
        test_path = sys.argv[1:]

    try:
        test_ids = find_test_ids(test_path=test_path,
                                 test_filename=test_filename,
                                 forked=forked,
                                 python_path=python_path)
    except Exception as ex:
        sys.stderr.write(f'{ex}\n')
        sys.exit(1)

    # Reached only on success: the failure branch above always exits.
    sys.stdout.write(''.join(map('{}\n'.format, test_ids)))
    sys.exit(0)
# Script entry point: with no arguments given, main() reads the test paths
# from the command line and terminates the process via sys.exit().
if __name__ == '__main__':
    main()