
##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Execution of tests
******************

:module: test_execution

:synopsis: Execute a test environment based on the supplied configuration.

.. currentmodule:: test_execution

.. note::
    1. Glob a list of test-suite folders
    2. Generate a list of test-suites with a list of test-cases
    3. Loop per suite
    4. Gather result
"""
from __future__ import annotations

import os
from typing import TYPE_CHECKING
from unittest.loader import VALID_MODULE_NAME

from pykiso.test_result.multi_result import MultiTestResult

if TYPE_CHECKING:
    from .test_case import BasicTest
    from ..types import ConfigDict, SuiteConfig

import enum
import logging
import re
import time
import unittest
from collections import OrderedDict, namedtuple
from fnmatch import fnmatch
from pathlib import Path
from typing import Dict, List, Optional

import xmlrunner

import pykiso

from ..exceptions import AuxiliaryCreationError, TestCollectionError
from ..logging_initializer import get_logging_options
from ..test_result.assert_step_report import (
    StepReportData,
    assert_decorator,
    generate_step_report,
)
from ..test_result.text_result import BannerTestResult, ResultStream
from ..test_result.xml_result import XmlTestResult
from . import test_suite

log = logging.getLogger(__name__)

TestFilterPattern = namedtuple("TestFilterPattern", "test_file, test_class, test_case")


@enum.unique
class ExitCode(enum.IntEnum):
    """List of possible exit codes"""

    ALL_TESTS_SUCCEEDED = 0
    ONE_OR_MORE_TESTS_FAILED = 1
    ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION = 2
    ONE_OR_MORE_TESTS_FAILED_AND_RAISED_UNEXPECTED_EXCEPTION = 3
    AUXILIARY_CREATION_FAILED = 4
    BAD_CLI_USAGE = 5

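# Minimal usage sketch: since ExitCode is an IntEnum, its members can be
# compared with and converted to plain integers wherever a process exit code
# is expected, e.g.:
#   >>> int(ExitCode.ONE_OR_MORE_TESTS_FAILED)
#   1
#   >>> ExitCode.ALL_TESTS_SUCCEEDED == 0
#   True
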
def create_test_suite(test_suite_dict: SuiteConfig) -> test_suite.BasicTestSuite:
    """Create a test suite based on the config dict.

    :param test_suite_dict: dict created from config with keys
        'suite_dir', 'test_filter_pattern', 'test_suite_id'

    :return: the created test suite
    """
    return test_suite.BasicTestSuite(
        test_suite_dict["suite_dir"],
        test_suite_dict["test_filter_pattern"],
        test_suite_dict["test_suite_id"],
    )

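# Illustrative sketch of the expected config dict (hypothetical values):
#   create_test_suite(
#       {
#           "suite_dir": "test_suite_1",
#           "test_filter_pattern": "test_*.py",
#           "test_suite_id": 1,
#       }
#   )
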
def apply_tag_filter(
    all_tests_to_run: unittest.TestSuite, usr_tags: Dict[str, List[str]]
) -> None:
    """Filter the test cases based on user tags provided via CLI.

    :param all_tests_to_run: a test suite containing all test suites and test cases
    :param usr_tags: encapsulate user's variant choices
    """

    def format_tag_names(tags: Dict[str, List[str]]) -> Dict[str, List[str]]:
        """Remove any underscore or hyphen from the provided dict's keys."""
        for char in ["_", "-"]:
            tags = {name.replace(char, ""): value for name, value in tags.items()}
        return tags

    def should_skip(test_case: BasicTest) -> bool:
        """Check if a test shall be skipped by evaluating the test case's tag attribute.

        :param test_case: test case to check
        :return: True if the test shall be skipped else False
        """
        if test_case.tag is None:
            return False
        # Skip only the test cases that have a matching tag name but no matching tag value
        for cli_tag_id, cli_tag_value in usr_tags.items():
            # skip any test case that doesn't define a CLI-provided tag name
            if cli_tag_id not in test_case.tag.keys():
                test_case._skip_msg = (
                    f"provided tag {cli_tag_id!r} not present in test tags"
                )
                return True
            # skip any test case whose tag value doesn't match the provided tag's value
            cli_tag_values = (
                cli_tag_value if isinstance(cli_tag_value, list) else [cli_tag_value]
            )
            if not any(
                cli_val in test_case.tag[cli_tag_id] for cli_val in cli_tag_values
            ):
                test_case._skip_msg = f"non-matching value for tag {cli_tag_id!r}"
                return True
        return False

    def set_skipped(test_case: BasicTest) -> BasicTest:
        """Set a test case to skipped.

        :param test_case: test case to be skipped
        :return: the skipped test case as a new instance of the provided
            TestCase subclass, or the unchanged test case.
        """
        if should_skip(test_case):
            skipped_test_cls = unittest.skip(test_case._skip_msg)(test_case.__class__)
            return skipped_test_cls()
        return test_case

    # collect and reformat all CLI and test case tag names
    usr_tags = format_tag_names(usr_tags)
    all_test_tags = []
    for tc in test_suite.flatten(all_tests_to_run):
        if getattr(tc, "tag", None) is None:
            continue
        tc.tag = format_tag_names(tc.tag)
        all_test_tags.extend(tc.tag.keys())

    # skip the tests according to the provided CLI tags and the defined test tags
    list(map(set_skipped, test_suite.flatten(all_tests_to_run)))

    # verify that each provided tag name is defined in at least one test case
    all_test_tags = set(all_test_tags)
    for tag_name in usr_tags:
        if tag_name not in all_test_tags:
            raise NameError(
                f"Provided tag {tag_name!r} is not defined in any testcase.", tag_name
            )

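# Illustrative sketch with hypothetical tag values: given a test case defining
# ``tag = {"variant": ["var1", "var2"]}``:
#   apply_tag_filter(suite, {"variant": ["var1"]})  -> test case stays active
#   apply_tag_filter(suite, {"variant": ["var3"]})  -> skipped ("non-matching value for tag 'variant'")
#   apply_tag_filter(suite, {"branch": ["daily"]})  -> skipped ("provided tag 'branch' not
#                                                      present in test tags"), then a NameError
#                                                      is raised because "branch" is not defined
#                                                      in any collected test case
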
def apply_test_case_filter(
    all_tests_to_run: unittest.TestSuite,
    test_class_pattern: str,
    test_case_pattern: str,
) -> unittest.TestSuite:
    """Apply a filter to run only the test cases that match the given expression.

    :param all_tests_to_run: a test suite containing all test suites and test cases
    :param test_class_pattern: pattern to select a test class, as a unix filename pattern
    :param test_case_pattern: pattern to select a test case, as a unix filename pattern
    :return: new test suite with filtered test cases
    """

    def is_active_test(test_case: BasicTest) -> bool:
        """Check if a test case shall be active according to the given selection patterns.

        :param test_case: unittest test
        :return: True if the test matches the patterns else False
        """
        test_class_name = re.sub(r"-\d+-\d+", "", test_case.__class__.__name__)
        is_classname_match = bool(fnmatch(test_class_name, test_class_pattern))
        if is_classname_match and test_case_pattern is None:
            return is_classname_match
        elif is_classname_match and test_case_pattern:
            return bool(fnmatch(test_case._testMethodName, test_case_pattern))
        else:
            return False

    base_suite = test_suite.flatten(all_tests_to_run)
    filtered_suite = filter(is_active_test, base_suite)
    return unittest.TestSuite(filtered_suite)

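# Illustrative sketch with hypothetical names: unix-style wildcards are matched
# against the class and method names, e.g.
#   apply_test_case_filter(suite, "SuiteOne", "test_run")  -> only SuiteOne.test_run
#   apply_test_case_filter(suite, "Suite*", "test_run?")   -> test_run1, test_run2, ...
#                                                             in any Suite* class
#   apply_test_case_filter(suite, "Suite*", None)          -> every test of every
#                                                             Suite* class
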
def failure_and_error_handling(result: unittest.TestResult) -> int:
    """Provide the necessary information to Jenkins if an error occurs during test execution.

    :param result: encapsulates all test results from the current run
    :return: an ExitCode object
    """
    failed, errored = result.failures, result.errors
    if failed and errored:
        exit_code = ExitCode.ONE_OR_MORE_TESTS_FAILED_AND_RAISED_UNEXPECTED_EXCEPTION
    elif failed:
        exit_code = ExitCode.ONE_OR_MORE_TESTS_FAILED
    elif errored:
        exit_code = ExitCode.ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION
    else:
        exit_code = ExitCode.ALL_TESTS_SUCCEEDED
    return exit_code

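# The mapping implemented above, summarized:
#   failures | errors | exit code
#   ---------+--------+----------------------------------------------------------------
#   yes      | yes    | ONE_OR_MORE_TESTS_FAILED_AND_RAISED_UNEXPECTED_EXCEPTION (3)
#   yes      | no     | ONE_OR_MORE_TESTS_FAILED (1)
#   no       | yes    | ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION (2)
#   no       | no     | ALL_TESTS_SUCCEEDED (0)
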
def enable_step_report(
    all_tests_to_run: unittest.suite.TestSuite, step_report: Path
) -> None:
    """Decorate all assert methods of the test cases.

    This allows the assert inputs to be saved in order to generate the
    HTML step report.

    :param all_tests_to_run: a test suite containing all test suites and test cases
    :param step_report: file path for the step report or None
    """
    # Step report header fed during test
    base_suite = test_suite.flatten(all_tests_to_run)

    for tc in base_suite:
        tc.step_report = StepReportData()
        if step_report is not None:
            # for any test, show ITF version
            tc.step_report.header = OrderedDict({"ITF version": pykiso.__version__})
            # Decorate all assert methods
            assert_method_list = [
                method for method in dir(tc) if method.startswith("assert")
            ]
            for method_name in assert_method_list:
                # Get method from name
                method = getattr(tc, method_name)
                # Add decorator to the existing method
                setattr(tc, method_name, assert_decorator(method))

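# After decoration, a call such as ``self.assertEqual(a, b)`` inside a test
# case goes through ``assert_decorator``, which records the assert inputs in
# ``tc.step_report`` so they can later be rendered into the HTML step report.
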
def parse_test_selection_pattern(pattern: str) -> TestFilterPattern:
    """Parse the test selection pattern from the CLI.

    For example: ``test_file.py::TestCaseClass::test_method``

    :param pattern: test selection pattern
    :return: pattern for file, class name and test case name
    """
    if not pattern:
        return TestFilterPattern(None, None, None)
    parsed_patterns = []
    patterns = pattern.split("::")
    for pattern in patterns:
        if pattern == "":
            parsed_patterns.append(None)
        else:
            parsed_patterns.append(pattern)
    for _ in range(3 - len(parsed_patterns)):
        parsed_patterns.append(None)

    return TestFilterPattern(*parsed_patterns)

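# Examples of the resulting filter patterns (hypothetical values):
#   >>> parse_test_selection_pattern("test_file.py::TestCaseClass::test_method")
#   TestFilterPattern(test_file='test_file.py', test_class='TestCaseClass', test_case='test_method')
#   >>> parse_test_selection_pattern("test_file.py::TestCaseClass")
#   TestFilterPattern(test_file='test_file.py', test_class='TestCaseClass', test_case=None)
#   >>> parse_test_selection_pattern("test_file.py::::test_method")
#   TestFilterPattern(test_file='test_file.py', test_class=None, test_case='test_method')
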
def find_folders_between_paths(start_path: Path, end_path: Path) -> List[Path]:
    """Find and return a list of folders between two specified paths.

    :param start_path: the starting path
    :param end_path: the ending path
    :return: the chain of parent folders of `end_path`, from `end_path`
        (exclusive) up to `start_path` (inclusive). If `end_path` is not a
        subpath of `start_path`, an empty list is returned.
    """
    folders_between = []

    while start_path != end_path:
        folders_between.append(end_path.parent)
        end_path = end_path.parent
        if len(end_path.parents) == 1:
            return []

    return folders_between

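# Illustrative sketch with hypothetical absolute paths (on a POSIX system):
#   >>> find_folders_between_paths(Path("/proj/suite"), Path("/proj/suite/nested/deeper"))
#   [PosixPath('/proj/suite/nested'), PosixPath('/proj/suite')]
#   >>> find_folders_between_paths(Path("/proj/suite"), Path("/elsewhere/deeper"))
#   []
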
def _is_valid_module(start_dir: str, pattern: str) -> bool:
    """Check if the test files found in the specified directory and its
    subdirectories conform to the requirements of a valid module.

    :param start_dir: the directory to search
    :param pattern: pattern that matches the file names
    """
    start_dir = Path(start_dir)
    test_files_found = list(start_dir.glob(f"**/{pattern}"))
    # remove folders
    test_files_found = [
        test_file for test_file in test_files_found if test_file.is_file()
    ]

    # If the found test files are in the given test suite directory, don't check for __init__.py
    if any(test_file.parent == start_dir for test_file in test_files_found):
        return True

    # No file matches the test filter pattern
    if not test_files_found:
        log.critical(
            f"Test filter {pattern=} doesn't match any files in folder {start_dir}"
        )
        return False

    # Check that all subfolders of a nested test suite have an __init__.py file
    testsuite_folders = {filepath.parent for filepath in test_files_found}
    all_folders_to_check = []
    for folder in testsuite_folders:
        all_folders_to_check.extend(find_folders_between_paths(start_dir, folder))
    all_folders_to_check.extend(testsuite_folders)
    all_folders_to_check = list(set(all_folders_to_check))
    if start_dir in all_folders_to_check:
        all_folders_to_check.remove(start_dir)

    for folder in all_folders_to_check:
        if not any(file_path.name == "__init__.py" for file_path in folder.iterdir()):
            log.critical(f'Could not find file "__init__.py" in folder {folder}')
            return False

    # check that the test files have valid module names
    return all(VALID_MODULE_NAME.match(file.name) for file in test_files_found)

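# Illustrative sketch (hypothetical layout), with pattern "test_*.py":
#   suite/test_a.py         -> valid: files directly in the suite dir need no __init__.py
#   suite/nested/test_a.py  -> valid only if suite/nested/__init__.py exists and every
#                              matched file name is a valid Python module name
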
def filter_test_modules_by_suite(test_modules: List[SuiteConfig]) -> List[SuiteConfig]:
    """Filter a list of test modules by their suite directory to avoid running duplicates.

    :param test_modules: list of test modules to filter
    :return: filtered list of test modules with unique suite directories
    """
    seen_suite_dirs = {}
    filtered_data = []

    for module in test_modules:
        suite_dir = module["suite_dir"]
        # Check if the suite directory is seen for the first time
        if suite_dir not in seen_suite_dirs:
            seen_suite_dirs[suite_dir] = module
            filtered_data.append(module)

    return filtered_data

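# Illustrative sketch with hypothetical entries: only the first module per
# suite directory survives the filter.
#   modules = [{"suite_dir": "suite_a"}, {"suite_dir": "suite_a"}, {"suite_dir": "suite_b"}]
#   filter_test_modules_by_suite(modules)
#   -> [{"suite_dir": "suite_a"}, {"suite_dir": "suite_b"}]
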
def collect_test_suites(
    config_test_suite_list: List[SuiteConfig],
    test_filter_pattern: Optional[str] = None,
) -> List[test_suite.BasicTestSuite]:
    """Collect and load all test suites defined in the test configuration.

    :param config_test_suite_list: list of dictionaries from the configuration
        file corresponding each to one test suite.
    :param test_filter_pattern: optional filter pattern to overwrite
        the one defined in the test suite configuration.

    :raises pykiso.TestCollectionError: if any test case inside one of
        the configured test suites failed to be loaded.

    :return: a list of all loaded test suites.
    """
    list_of_test_suites = []
    valid_test_modules = []

    for test_suite_configuration in config_test_suite_list:
        if test_filter_pattern is not None:
            test_suite_configuration["test_filter_pattern"] = test_filter_pattern
        if _is_valid_module(
            start_dir=test_suite_configuration["suite_dir"],
            pattern=test_suite_configuration["test_filter_pattern"],
        ):
            valid_test_modules.append(test_suite_configuration)

    if test_filter_pattern:
        valid_test_modules = filter_test_modules_by_suite(valid_test_modules)

    if not valid_test_modules:
        raise TestCollectionError(
            [
                test_suite_config["suite_dir"]
                for test_suite_config in config_test_suite_list
            ]
        )

    for test_suite_configuration in valid_test_modules:
        try:
            current_test_suite = create_test_suite(test_suite_configuration)
            list_of_test_suites.append(current_test_suite)
        except BaseException as e:
            raise TestCollectionError(test_suite_configuration["suite_dir"]) from e

    return list_of_test_suites

def abort(reason: str = None) -> None:
    """Quit the ITF test session, logging the given reason if one is indicated.

    :param reason: reason to abort, defaults to None
    """
    if reason:
        log.critical(reason)
    log.critical(
        "Non recoverable error occurred during test execution, aborting test session."
    )
    os.kill(os.getpid(), ExitCode.ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION)

def execute(
    config: ConfigDict,
    report_type: str = "text",
    report_name: str = "",
    user_tags: Optional[Dict[str, List[str]]] = None,
    step_report: Optional[Path] = None,
    pattern_inject: Optional[str] = None,
    failfast: bool = False,
    junit_path: str = "reports",
) -> int:
    """Create a test environment based on the test configuration.

    :param config: dict from the converted YAML config file
    :param report_type: type of report to generate, i.e. text or junit
    :param report_name: name of the junit report
    :param user_tags: test case tags to execute
    :param step_report: file path for the step report or None
    :param pattern_inject: optional pattern that will override
        test_filter_pattern for all suites. Used in test development to run
        specific tests.
    :param failfast: stop the test run on the first error or failure.
    :param junit_path: path (file or dir) to the junit report

    :return: exit code corresponding to the result of the test execution
        (tests failed, unexpected exception, ...)
    """
    try:
        test_file_pattern = parse_test_selection_pattern(pattern_inject)

        test_suites = collect_test_suites(
            config["test_suite_list"], test_file_pattern.test_file
        )
        # Group all the collected test suites in one global test suite
        all_tests_to_run = unittest.TestSuite(test_suites)
        # filter test cases based on variant and branch-level options
        if user_tags:
            apply_tag_filter(all_tests_to_run, user_tags)

        # Enable step report
        enable_step_report(all_tests_to_run, step_report)

        if test_file_pattern.test_class:
            all_tests_to_run = apply_test_case_filter(
                all_tests_to_run,
                test_file_pattern.test_class,
                test_file_pattern.test_case,
            )

        log_file_path = get_logging_options().log_path
        # TestRunner selection: generate a junit report or not.
        # Start the tests and publish the results.
        if report_type == "junit":
            report_path = junit_path
            full_report_path = Path.cwd() / report_path
            if full_report_path.suffix == ".xml":
                junit_report_path = str(full_report_path)
                full_report_path.parent.mkdir(exist_ok=True)
            else:
                junit_report_path = str(
                    full_report_path
                    / Path(time.strftime(f"%Y-%m-%d_%H-%M-%S-{report_name}.xml"))
                )
                full_report_path.mkdir(exist_ok=True)
            with open(junit_report_path, "wb") as junit_output, ResultStream(
                log_file_path
            ) as stream:
                test_runner = xmlrunner.XMLTestRunner(
                    output=junit_output,
                    resultclass=MultiTestResult(XmlTestResult, BannerTestResult),
                    failfast=failfast,
                    verbosity=0,
                    stream=stream,
                )
                result = test_runner.run(all_tests_to_run)
        else:
            with ResultStream(log_file_path) as stream:
                test_runner = unittest.TextTestRunner(
                    stream=stream,
                    resultclass=MultiTestResult(BannerTestResult),
                    failfast=failfast,
                    verbosity=0,
                )
                result = test_runner.run(all_tests_to_run)

        # Generate the HTML step report
        if step_report is not None:
            generate_step_report(result, step_report)

        exit_code = failure_and_error_handling(result)
    except NameError:
        log.exception("Error occurred during tag evaluation.")
        exit_code = ExitCode.BAD_CLI_USAGE
    except TestCollectionError:
        log.exception("Error occurred during test collection.")
        exit_code = ExitCode.ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION
    except AuxiliaryCreationError:
        log.exception("Error occurred during auxiliary creation.")
        exit_code = ExitCode.AUXILIARY_CREATION_FAILED
    except KeyboardInterrupt:
        log.exception("Keyboard Interrupt detected")
        exit_code = ExitCode.ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION
    except Exception:
        log.exception(f'Issue detected in the test-suite: {config["test_suite_list"]}!')
        exit_code = ExitCode.ONE_OR_MORE_TESTS_RAISED_UNEXPECTED_EXCEPTION

    return int(exit_code)

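# Minimal usage sketch (hypothetical config and paths): ``config`` is the dict
# produced from the YAML configuration file; only "test_suite_list" is used here.
#   exit_code = execute(
#       config,
#       report_type="junit",
#       junit_path="reports/run.xml",  # ends in .xml -> used as-is as the report file
#       pattern_inject="test_file.py::MyTests::test_*",  # overrides the suite filters
#   )
#   # -> int(ExitCode.ALL_TESTS_SUCCEEDED) == 0 when every test passed,
#   #    see ExitCode for the other values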