Source code for pykiso.test_result.text_result

##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
Text Test Result with banners
*****************************

:module: text_result

:synopsis: implements a test result that displays the test execution
    wrapped in banners.

.. currentmodule:: text_result
"""

from __future__ import annotations

import logging
import os
import sys
import textwrap
import time
import typing
from contextlib import nullcontext
from shutil import get_terminal_size
from typing import List, Optional, TextIO, Union
from unittest import TextTestResult
from unittest.case import _SubTest

from ..test_coordinator.test_case import BasicTest
from ..test_coordinator.test_suite import BaseTestSuite

if typing.TYPE_CHECKING:
    from pykiso.types import ExcInfoType, PathType

log = logging.getLogger(__name__)


[docs]class ResultStream: """Class that duplicates sys.stderr to a log file if a file path is provided. When passed to a TestRunner or a TestResult, this allows to display the information from the test run in the log file. """ def __new__(cls, file: Optional[PathType]): """Customize class creation to return an instance of this class if a file path is provided, or simply ``sys.stderr`` if no file path is provided. :param file: the file to write stderr to. :return: an instance of this class or ``sys.stderr``. """ # don't bother instanciating and simply return stderr as context manager if file is None: return nullcontext(sys.stderr) # otherwise return a real instance return super().__new__(cls) def __init__(self, file: Optional[PathType]): """Initialize the streams. :param file: file where stderr should be written. """ self.stderr = sys.stderr self.file: TextIO = open(file, mode="a") sys.stderr = self def __del__(self): self.close() def __enter__(self): return self def __exit__(self, *args): self.close() def write(self, message: str): self.stderr.write(message) self.file.write(message) def flush(self): self.stderr.flush() self.file.flush() os.fsync(self.file.fileno())
[docs] def close(self): """Close or restore each stream.""" if self.stderr is not None: sys.stderr = self.stderr self.stderr = None if self.file is not None: self.file.close() self.file = None
[docs]class BannerTestResult(TextTestResult): """TextTestResult subclass showing results wrapped in banners (frames).""" BANNER_CHAR_WIDTH = 4 def __init__(self, stream: TextIO, descriptions: bool, verbosity: int): """Constructor. Initialize TextTestResult and the banner's width. The banner's width is set to the terminal size. In the case where this fails the fallback width corresponds to the default width of a Jenkins "console". :param stream: stream to print the result information (default: stderr) :param descriptions: unused (required for TextTestResult) :param verbosity: unused (required for TextTestResult) """ super().__init__(stream, descriptions, verbosity) # to determine whether the test succeeded or failed self._error_occurred = False # fallback is the default width in Jenkins size = get_terminal_size(fallback=(150, 24)) # avoid border effects due to newlines self.width = size.columns - 1 self.successes: List[Union[BasicTest, BaseTestSuite]] = [] @property def error_occurred(self): return self._error_occurred def _banner( self, text: Union[List, str], width: Optional[int] = None, sym: str = "#" ) -> str: """Format the provided text within a frame composed of the provided symbol. Works with multiline strings (either as a string containing newlines or split into a list with one entry per line). :param text: text to format :paran width: width of the banner :param sym: symbol used to compose the banner :return: the text enclosed in a banner """ width = width or self.width line = sym * width if isinstance(text, str): text = text.split("\n") if isinstance(text, list): text = "\n".join( f"{sym} {line: <{width-self.BANNER_CHAR_WIDTH}} {sym}" for line in text ) banner = f"{line}\n{text}\n{line}\n" return banner
[docs] def getDescription(self, test: Union[BasicTest, BaseTestSuite]) -> str: """Return the entire test method docstring. :param test: running testcase :return: the wrapped docstring """ doc = "" if getattr(test, "_testMethodDoc", None) is not None: for line in test._testMethodDoc.splitlines(): doc += "\n" + textwrap.fill( line.strip(), width=self.width - self.BANNER_CHAR_WIDTH ) return doc
[docs] def startTest(self, test: Union[BasicTest, BaseTestSuite]) -> None: """Print a banner containing the test information and the test method docstring when starting a test case. :param test: testcase that is about to be run """ super().startTest(test) self._error_occurred = False # gather test module, test class, test method and test description module_name = test.__module__ test_name = str(test) addendum = "" doc = self.getDescription(test).rstrip() # verify if the test case is skipped is_skipped = False if getattr(test, "__unittest_skip__", False): # in case of an entirely skipped test class only the top # banner is printed top_str = "SKIPPED TEST: " addendum += f"\nReason: {test.__unittest_skip_why__}" is_skipped = True else: top_str = "RUNNING TEST: " if len(module_name + test_name) < self.width - len(top_str): test_name = f"{module_name}.{test_name}" else: addendum += f"\nmodule: {module_name}" # create and print test start banner top_str += f"{test_name}{addendum}{doc}" top_banner = self._banner(top_str) if is_skipped: top_banner += "\n" self.stream.write(top_banner) self.stream.flush() # start monitoring test duration test.start_time = time.time()
[docs] def stopTest(self, test: Union[BasicTest, BaseTestSuite]) -> None: """Print a banner containing the test information and its result. :param test: terminated testcase """ if getattr(test, "__unittest_skip__", False): return super().stopTest(test) test.stop_time = time.time() test.elapsed_time = test.stop_time - test.start_time result = "FAILED" if self.error_occurred else "PASSED" bot_str = f"END OF TEST: {test}" result_str = f" -> {result} in {test.elapsed_time:.3f}s" if len(bot_str + result_str) < self.width - self.BANNER_CHAR_WIDTH: bot_str += result_str else: bot_str += "\n" + result_str bot_banner = self._banner(bot_str) + "\n" self.stream.write(bot_banner) self.stream.flush() super().stopTest(test)
[docs] def addFailure( self, test: Union[BasicTest, BaseTestSuite], err: ExcInfoType ) -> None: """Set the error flag when a failure occurs in order to get the individual test case result. :param test: testcase which failure will be reported :param err: tuple returned by sys.exc_info """ super().addFailure(test, err) self._error_occurred = True
[docs] def addSuccess(self, test: Union[BasicTest, BaseTestSuite]) -> None: """Add a testcase to the list of succeeded test cases. :param test: running testcase that succeeded """ self.successes.append(test)
[docs] def addError(self, test: Union[BasicTest, BaseTestSuite], err: ExcInfoType) -> None: """Set the error flag when an error occurs in order to get the individual test case result. :param test: running testcase that errored out :param err: tuple returned by sys.exc_info """ super().addError(test, err) self._error_occurred = True
[docs] def addSubTest( self, test: Union[BasicTest, BaseTestSuite], subtest: _SubTest, err: ExcInfoType, ) -> None: """Set the error flag when an error occurs in a subtest. :param test: running testcase :param subtest: subtest runned :param err: tuple returned by sys.exc_info """ super().addSubTest(test, subtest, err) if err is not None: self._error_occurred = True
[docs] def printErrorList(self, flavour: str, errors: List[tuple]): """Print all errors at the end of the whole tests execution. Overwrites the unittest method to have a nicer output. :param flavour: failure reason :param errors: list of failed tests with their error message """ for test, err in errors: self.stream.writeln(self.separator1) self.stream.writeln("%s" % test) self.stream.writeln("%s" % self.getDescription(test)) self.stream.writeln("%s: %s" % (flavour, err))