Module kalash.run
Expand source code
import unittest
import argparse
import xmlrunner
import os.path
import platform
import inspect
import webbrowser
from unittest import TextTestRunner, TestLoader
from unittest import TestResult
from unittest.result import failfast
from unittest.main import TestProgram
from types import ModuleType
from parameterized import parameterized
from typing import Callable, Dict, Iterator, Optional, Tuple
from .utils import get_ts
from .filter import apply_filters
from .smuggle import smuggle
from .config import (Collector, CollectorArtifact, Config,
PathOrIdForWhatIf, CliConfig, Trigger)
from .test_case import TestCase
from .log import close_all
from .collectors import (_collect_test_case_v1_x, _collect_test_case_v2_0,
_collect_test_case_from_module)
kalash = ModuleType('kalash')
__all__ = (
'unittest', 'TextTestRunner', 'TestResult',
'TestProgram', 'failfast', 'MetaLoader', 'main', 'TestCase',
'get_ts', 'parameterized'
)
# ====================
# Public Utilities
# ====================
def find_my_yaml(filevar: str, path: str) -> str:
"""
Figures out the path to the YAML file relative to a given
test script, should be used like:
```python
YAML = find_my_yaml(__file__, "../yamls/yaml.yaml")
```
Args:
filevar (str): should always be set to `__file__`
path (str): relative path component that points to the YAML
Returns:
Normalized absolute path to the correct YAML file
"""
return os.path.normpath(
os.path.abspath(
os.path.join(os.path.dirname(filevar), path)
)
)
# =============================================================================
# ====================
# COLLECTOR LOOKUP
# ====================
# all 1.x versions will map to v1_x function
COLLECTOR_FUNC_LOOKUP: Dict[str, Collector] = \
{f'1.{k}': _collect_test_case_v1_x for k in range(0, 10)}
# all further declarations will be mapped manually
COLLECTOR_FUNC_LOOKUP['2.0'] = _collect_test_case_v2_0
# =============================================================================
def prepare_suite(
kalash_trigger: Trigger
) -> Iterator[CollectorArtifact]:
"""
Higher-order suite definition function.
As opposed to `Collector`, this function iterates over the
YAML config and calls `Collector` on a per-test basis.
This function calls `apply_filters`. If no filters are
provided the `kalash_test_loader` will be called directly,
otherwise the collected tests will be skimmed to match the
provided filters.
Args:
kalash_trigger (Trigger): `Trigger` object collecting
all configuration elements.
Yields:
One or more `CollectArtifact` elements.
"""
for test_idx, test_conf in enumerate(kalash_trigger.tests):
# set up path (if exists) and non-filter keys
path = test_conf.path
if not path:
path = '.' # default to CWD
# recursive directory search can be set in a config or as a global flag when calling
no_recurse_from_file: Optional[bool] = test_conf.no_recurse
cli_config = kalash_trigger.cli_config
if no_recurse_from_file is not None:
cli_config.no_recurse = cli_config.no_recurse or no_recurse_from_file
yield apply_filters(
test_conf,
path,
COLLECTOR_FUNC_LOOKUP,
kalash_trigger
)
class MetaLoader(TestLoader):
def __init__(
self,
yaml_path: str = None,
trigger: Optional[Trigger] = None,
local=True
):
"""
Custom `TestLoader` for Kalash. This provides consistency
between running local and remote tests.
Args:
yaml_path (str): for backwards compatibility with Kalash YAML files,
set instantly to the `config_file_path` value
trigger (Optional[Trigger]): `Trigger` instance providing the
entire configuration model or `None` if the test is run
in a local context
local (bool): if True, run only this test even when `Trigger`
or `yaml_path` is provided when in local context
"""
if yaml_path and not trigger:
self._kalash_trigger = Trigger()
self._kalash_trigger.cli_config.file = yaml_path
elif yaml_path and trigger:
self._kalash_trigger.cli_config.file = yaml_path
elif not yaml_path and trigger:
self._kalash_trigger = trigger
else:
self._kalash_trigger = Trigger()
self._local = local
self.suite = unittest.TestSuite()
@property
def trigger(self) -> Trigger:
"""Typesafe handler of `KalashYamlObj`.
Throws an Exception if the YAML object
hasn't been parsed correctly.
"""
if not self._kalash_trigger:
raise Exception(
"No `Trigger` on this `MetaLoader` instance"
)
else:
return self._kalash_trigger
def loadTestsFromKalashYaml(self) -> CollectorArtifact:
"""Loads tests from associated YAML or `Trigger`"""
whatif_names: PathOrIdForWhatIf = []
for a in prepare_suite(
self.trigger
):
one_suite, one_whatif_names = a
self.suite.addTests(one_suite)
whatif_names.extend(one_whatif_names)
return self.suite, list(set(whatif_names))
def loadTestsFromModule(self, module, *args, pattern=None, **kws):
def tests_generator(suite: unittest.TestSuite):
"""
Recursive test generator for unittest.TestSuite
(because a suite can contain other suites recursively).
Args:
suite (unittest.TestSuite): tests suite to pull tests from
Yields:
unittest test functions
"""
for test in suite:
if not type(test) is unittest.TestSuite:
yield test
else:
for t in tests_generator(test):
yield t
if self.trigger.cli_config.file:
# parse YAML if provided
self._kalash_trigger = Trigger.infer_trigger(
self.trigger.cli_config
)
if self._local and self.trigger.cli_config.file:
# if YAML exists and isolated mode is on, make sure values from YAML can be injected
for test_idx, test_conf in enumerate(self.trigger.tests):
# find whether any block declares path that pertains to this module config
# or if the module is placed in any directory that should inherit config
from pathlib import Path
if test_conf.path:
if type(test_conf.path) is str:
path = Path(test_conf.path)
elif type(test_conf.path) is list:
path = Path(test_conf.path[0])
else:
path = Path(str(test_conf.path))
else:
path = Path('.')
files = [os.path.abspath(str(f)) for f in path.glob("**/*")]
for file in files:
if os.path.normcase(os.path.abspath(module.__file__)) == os.path.normcase(file):
self.suite.addTests([
suite for suite, _ in prepare_suite(
self.trigger
)
])
elif self.trigger.cli_config.file and not self._local:
# if not running in isolated mode and the YAML is provided, run all tests
# that are declared in the YAML
self.loadTestsFromKalashYaml()
else:
tests, _ = _collect_test_case_from_module(module, None)
# if no YAML provided just add tests from the current module to the suite
for test in tests_generator(tests):
# suite is defined globally
self.suite.addTest(test)
return self.suite
def _smuggle_fixture_module(self, is_setup: bool):
cfg_section: Config = self.trigger.config
if cfg_section:
relpath_to_script = cfg_section.setup if is_setup else cfg_section.teardown
if relpath_to_script:
p = os.path.abspath(relpath_to_script)
smuggle(p)
def one_time_setup(self):
"""Runs One-time-setup script"""
self._smuggle_fixture_module(True)
def one_time_teardown(self):
"""Runs One-time-teardown script"""
self._smuggle_fixture_module(False)
# -----------------------------------------------------------------
main = TestProgram
# Exporting `unittest` components required in tests from `kalash`
# results in testers not needing to touch unittest itself.
# -----------------------------------------------------------------
def run_test_suite(
loader: MetaLoader,
kalash_trigger: Trigger,
whatif_callback: Callable[[str], None] = print
) -> Tuple[Optional[xmlrunner.runner._XMLTestResult], int]:
"""Accepts a loader and a `Trigger` object
and triggers the test run.
Args:
loader (MetaLoader): `MetaLoader` instance extending
`unittest.TestLoader` with extra goodies
kalash_trigger (Trigger): `Trigger` instance which
directly translates to a YAML configuration file
or an equivalent Python file
whatif_callback (Callable[[str], None]): the function
to call when running in a what-if mode
Returns:
A tuple of (unittest Result object, return code) or
a tuple of (`None`, return code) when running
in the what-if mode
"""
suite, whatif_names = loader.loadTestsFromKalashYaml()
return_code = 0
if not kalash_trigger.cli_config.what_if:
loader.one_time_setup()
report = "."
if kalash_trigger.config:
report = kalash_trigger.config.report
result: xmlrunner.runner._XMLTestResult = xmlrunner.XMLTestRunner(
output=report,
failfast=kalash_trigger.cli_config.fail_fast
).run(suite)
loader.one_time_teardown()
# PRODTEST-4708 -> Jenkins needs a non-zero return code
# on test failure
# return a valid return code depending on the result:
if len(result.failures) > 0:
return_code = 1
elif len(result.errors) > 0:
return_code = 2
return result, return_code
else:
for n in whatif_names:
whatif_callback(n)
return None, return_code
def run(
kalash_trigger: Trigger,
whatif_callback: Callable[[str], None] = print
) -> int:
"""User-friendly alias of the `run_test_suite` command
for importable use in Python-based files.
Args:
kalash_trigger (Trigger): `Trigger` instance which
directly translates to a YAML configuration file
or an equivalent Python file
whatif_callback (Callable[[str], None]): the function
to call when running in a what-if mode
Returns:
Return code, 0 if all collected tests are passing.
A non-zero return code indicates failure.
"""
frame = inspect.stack()[1]
module = inspect.getmodule(frame[0])
module_path = module.__file__ if module else None
loader = MetaLoader(
module_path,
local=False,
trigger=kalash_trigger
)
_, return_code = run_test_suite(loader, kalash_trigger, whatif_callback)
close_all()
return return_code
def make_loader_and_trigger_object(
config: CliConfig
) -> Tuple[MetaLoader, Trigger]:
"""Prepares a ``MetaLoader`` based
on the YAML file parameters.
Args:
cli_config (CliConfig): a `CliConfig` object representing
command-line parameters used to trigger the test run
modifying behavior of certain aspects of the application
like logging or triggering speculative runs instead of
real runs
Returns:
A tuple of (`MetaLoader` instance, `Trigger` instance)
"""
kalash_trigger = Trigger.infer_trigger(config)
loader = MetaLoader(
local=False,
trigger=kalash_trigger
)
return loader, kalash_trigger
def docs():
"""Open bundled documentation in the web browser."""
base_dir = os.path.dirname(__file__)
rel_docpath = ['built_docs', 'html', 'kalash', 'index.html']
docpath = os.path.join(base_dir, *rel_docpath)
_platform = platform.system()
if _platform == "Darwin" or "Linux":
url = f"file://{os.path.join(docpath)}"
elif _platform == "Windows":
url = f"file:\\\\\\{os.path.join(docpath)}"
else:
raise SystemError("Web browser handler for this platform is not supported")
webbrowser.open(url, new=2)
def main_cli():
"""
Main function. Expected to be run from CLI and used
only in automated context.
"""
config = CliConfig()
parser = argparse.ArgumentParser(description='Test automation runner')
subparsers = parser.add_subparsers()
parser.add_argument(
'-sc', '--spec-config',
type=str, help='Path to YAML specification YAML, default is `spec.yaml` '
'from the package directory.'
)
parser.add_argument(
'-dd', '--docs',
action='store_true', help='Display bundled documentation'
)
# `run` subcommand:
parser_run = subparsers.add_parser('run', help='run an analysis')
parser_run.add_argument(
'-f', '--file',
type=str, help='Path to .kalash.yaml')
parser_run.add_argument(
'-n', '--no-recurse',
action='store_true', help='Do not walk directories')
parser_run.add_argument(
'-d', '--debug',
action='store_true', help='Run in debug mode')
parser_run.add_argument(
'-ff', '--fail-fast',
action='store_true', help='Fail suite if at least one test fails')
parser_run.add_argument(
'-nl', '--no-log',
action='store_true', help='Disable logging')
parser_run.add_argument(
'-ne', '--no-log-echo',
action='store_true', help='If set, log calls will not be echoed to STDOUT')
parser_run.add_argument(
'-ld', '--log-dir',
type=str, help='Log base directory')
parser_run.add_argument(
'-ll', '--log-level',
type=int, help='Python `logging` log level ('
'CRITICAL = 50, '
'ERROR = 40, '
'WARNING = 30, '
'INFO = 20, '
'DEBUG = 10, '
'NOTSET = 0, default level is INFO)')
parser_run.add_argument(
'-lf', '--log-format',
type=str, help=f'Log format string, default is %{config.spec.cli_config.log_formatter}')
parser_run.add_argument(
'-g', '--group-by',
type=str, help='Log directories grouping: '
f'<{config.spec.cli_config.group_device}|'
f'{config.spec.cli_config.group_group}>')
parser_run.add_argument(
'-wi', '--what-if',
type=str, help='Collects the tests but does not run them '
'and produces a list of paths or IDs that have been '
'collected for the run. Use '
f'<{config.spec.cli_config.whatif_paths}|'
f'{config.spec.cli_config.whatif_ids}> '
'to modify the output behavior of the what-if flag.'
)
args = parser.parse_args()
if args.docs:
docs()
return 0
if args.file:
config.file = args.file
if args.spec_config:
config.spec_path = args.spec_config
config.__post_init__()
if args.log_dir:
config.log_dir = args.log_dir
if args.group_by:
config.group_by = args.group_by
if args.log_level:
config.log_level = args.log_level
if args.log_format:
config.log_format = args.log_format
if args.what_if:
config.what_if = args.what_if
loader, kalash_trigger = make_loader_and_trigger_object(
config
)
_, return_code = run_test_suite(loader, kalash_trigger)
close_all()
return return_code
Functions
def failfast(method)
-
Expand source code
def failfast(method): @wraps(method) def inner(self, *args, **kw): if getattr(self, 'failfast', False): self.stop() return method(self, *args, **kw) return inner
def get_ts(name='', format='%Y%m%d%H%M%S', sep='-')
-
Attaches a formatted timestamp to a name passed as the first argument. Utility function used in test templates.
Args
name
:str
- name to add
"-timestamp"
to format
:str
strftime
format string
Returns
When called with e.g.
"Something"
at 9 AM 2020.02.14, it will return a string like"Something-20200214090000"
Expand source code
def get_ts(name='', format="%Y%m%d%H%M%S", sep='-'): """ Attaches a formatted timestamp to a name passed as the first argument. Utility function used in test templates. Args: name (str): name to add `"-timestamp"` to format (str): `strftime` format string Returns: When called with e.g. `"Something"` at 9 AM 2020.02.14, it will return a string like `"Something-20200214090000"` """ return name + sep + datetime.datetime.now().strftime(format)
Classes
class MetaLoader (yaml_path: str = None, trigger: Optional[Trigger] = None, local=True)
-
This class is responsible for loading tests according to various criteria and returning them wrapped in a TestSuite
Custom
TestLoader
for Kalash. This provides consistency between running local and remote tests.Args
yaml_path
:str
- for backwards compatibility with Kalash YAML files,
set instantly to the
config_file_path
value trigger
:Optional[Trigger]
Trigger
instance providing the entire configuration model orNone
if the test is run in a local contextlocal
:bool
- if True, run only this test even when
Trigger
oryaml_path
is provided when in local context
Expand source code
class MetaLoader(TestLoader): def __init__( self, yaml_path: str = None, trigger: Optional[Trigger] = None, local=True ): """ Custom `TestLoader` for Kalash. This provides consistency between running local and remote tests. Args: yaml_path (str): for backwards compatibility with Kalash YAML files, set instantly to the `config_file_path` value trigger (Optional[Trigger]): `Trigger` instance providing the entire configuration model or `None` if the test is run in a local context local (bool): if True, run only this test even when `Trigger` or `yaml_path` is provided when in local context """ if yaml_path and not trigger: self._kalash_trigger = Trigger() self._kalash_trigger.cli_config.file = yaml_path elif yaml_path and trigger: self._kalash_trigger.cli_config.file = yaml_path elif not yaml_path and trigger: self._kalash_trigger = trigger else: self._kalash_trigger = Trigger() self._local = local self.suite = unittest.TestSuite() @property def trigger(self) -> Trigger: """Typesafe handler of `KalashYamlObj`. Throws an Exception if the YAML object hasn't been parsed correctly. """ if not self._kalash_trigger: raise Exception( "No `Trigger` on this `MetaLoader` instance" ) else: return self._kalash_trigger def loadTestsFromKalashYaml(self) -> CollectorArtifact: """Loads tests from associated YAML or `Trigger`""" whatif_names: PathOrIdForWhatIf = [] for a in prepare_suite( self.trigger ): one_suite, one_whatif_names = a self.suite.addTests(one_suite) whatif_names.extend(one_whatif_names) return self.suite, list(set(whatif_names)) def loadTestsFromModule(self, module, *args, pattern=None, **kws): def tests_generator(suite: unittest.TestSuite): """ Recursive test generator for unittest.TestSuite (because a suite can contain other suites recursively). Args: suite (unittest.TestSuite): tests suite to pull tests from Yields: unittest test functions """ for test in suite: if not type(test) is unittest.TestSuite: yield test else: for t in tests_generator(test): yield t if self.trigger.cli_config.file: # parse YAML if provided self._kalash_trigger = Trigger.infer_trigger( self.trigger.cli_config ) if self._local and self.trigger.cli_config.file: # if YAML exists and isolated mode is on, make sure values from YAML can be injected for test_idx, test_conf in enumerate(self.trigger.tests): # find whether any block declares path that pertains to this module config # or if the module is placed in any directory that should inherit config from pathlib import Path if test_conf.path: if type(test_conf.path) is str: path = Path(test_conf.path) elif type(test_conf.path) is list: path = Path(test_conf.path[0]) else: path = Path(str(test_conf.path)) else: path = Path('.') files = [os.path.abspath(str(f)) for f in path.glob("**/*")] for file in files: if os.path.normcase(os.path.abspath(module.__file__)) == os.path.normcase(file): self.suite.addTests([ suite for suite, _ in prepare_suite( self.trigger ) ]) elif self.trigger.cli_config.file and not self._local: # if not running in isolated mode and the YAML is provided, run all tests # that are declared in the YAML self.loadTestsFromKalashYaml() else: tests, _ = _collect_test_case_from_module(module, None) # if no YAML provided just add tests from the current module to the suite for test in tests_generator(tests): # suite is defined globally self.suite.addTest(test) return self.suite def _smuggle_fixture_module(self, is_setup: bool): cfg_section: Config = self.trigger.config if cfg_section: relpath_to_script = cfg_section.setup if is_setup else cfg_section.teardown if relpath_to_script: p = os.path.abspath(relpath_to_script) smuggle(p) def one_time_setup(self): """Runs One-time-setup script""" self._smuggle_fixture_module(True) def one_time_teardown(self): """Runs One-time-teardown script""" self._smuggle_fixture_module(False)
Ancestors
- unittest.loader.TestLoader
Instance variables
var trigger : Trigger
-
Typesafe handler of
KalashYamlObj
. Throws an Exception if the YAML object hasn't been parsed correctly.Expand source code
@property def trigger(self) -> Trigger: """Typesafe handler of `KalashYamlObj`. Throws an Exception if the YAML object hasn't been parsed correctly. """ if not self._kalash_trigger: raise Exception( "No `Trigger` on this `MetaLoader` instance" ) else: return self._kalash_trigger
Methods
def loadTestsFromKalashYaml(self) ‑> Tuple[unittest.suite.TestSuite, List[str]]
-
Loads tests from associated YAML or
Trigger
Expand source code
def loadTestsFromKalashYaml(self) -> CollectorArtifact: """Loads tests from associated YAML or `Trigger`""" whatif_names: PathOrIdForWhatIf = [] for a in prepare_suite( self.trigger ): one_suite, one_whatif_names = a self.suite.addTests(one_suite) whatif_names.extend(one_whatif_names) return self.suite, list(set(whatif_names))
def loadTestsFromModule(self, module, *args, pattern=None, **kws)
-
Return a suite of all test cases contained in the given module
Expand source code
def loadTestsFromModule(self, module, *args, pattern=None, **kws): def tests_generator(suite: unittest.TestSuite): """ Recursive test generator for unittest.TestSuite (because a suite can contain other suites recursively). Args: suite (unittest.TestSuite): tests suite to pull tests from Yields: unittest test functions """ for test in suite: if not type(test) is unittest.TestSuite: yield test else: for t in tests_generator(test): yield t if self.trigger.cli_config.file: # parse YAML if provided self._kalash_trigger = Trigger.infer_trigger( self.trigger.cli_config ) if self._local and self.trigger.cli_config.file: # if YAML exists and isolated mode is on, make sure values from YAML can be injected for test_idx, test_conf in enumerate(self.trigger.tests): # find whether any block declares path that pertains to this module config # or if the module is placed in any directory that should inherit config from pathlib import Path if test_conf.path: if type(test_conf.path) is str: path = Path(test_conf.path) elif type(test_conf.path) is list: path = Path(test_conf.path[0]) else: path = Path(str(test_conf.path)) else: path = Path('.') files = [os.path.abspath(str(f)) for f in path.glob("**/*")] for file in files: if os.path.normcase(os.path.abspath(module.__file__)) == os.path.normcase(file): self.suite.addTests([ suite for suite, _ in prepare_suite( self.trigger ) ]) elif self.trigger.cli_config.file and not self._local: # if not running in isolated mode and the YAML is provided, run all tests # that are declared in the YAML self.loadTestsFromKalashYaml() else: tests, _ = _collect_test_case_from_module(module, None) # if no YAML provided just add tests from the current module to the suite for test in tests_generator(tests): # suite is defined globally self.suite.addTest(test) return self.suite
def one_time_setup(self)
-
Runs One-time-setup script
Expand source code
def one_time_setup(self): """Runs One-time-setup script""" self._smuggle_fixture_module(True)
def one_time_teardown(self)
-
Runs One-time-teardown script
Expand source code
def one_time_teardown(self): """Runs One-time-teardown script""" self._smuggle_fixture_module(False)
class TestCase (methodName: str, id: str, meta: Meta, trigger: Optional[Trigger])
-
Lightweight
unittest.TestCase
wrapper.When declaring your own tests you're supposed to inherit from this class and treat it pretty much the same as a good old-fashioned
unittest.TestCase
.For example:
""" META_START id: test_something_12345 META_END """ from kalash.run import main, TestCase, MetaLoader class TestSomething(TestCase): test_something(self): self.assertTrue(True) if __name__ == '__main__' main(testLoader=MetaLoader())
Args
methodName
:str
- test method
id
:str
- test ID from the metadata tag
trigger
:Trigger
Trigger
instance
Create an instance of the class that will use the named test method when executed. Raises a ValueError if the instance does not have a method with the specified name.
Expand source code
class TestCase(unittest.TestCase): """ Lightweight `unittest.TestCase` wrapper. When declaring your own tests you're supposed to inherit from this class and treat it pretty much the same as a good old-fashioned `unittest.TestCase`. For example: ```python \"\"\" META_START id: test_something_12345 META_END \"\"\" from kalash.run import main, TestCase, MetaLoader class TestSomething(TestCase): test_something(self): self.assertTrue(True) if __name__ == '__main__' main(testLoader=MetaLoader()) ``` Args: methodName (str): test method id (str): test ID from the metadata tag trigger (Trigger): `Trigger` instance """ def __init__( self, methodName: str, id: str, meta: Meta, trigger: Optional[Trigger] ) -> None: super().__init__(methodName=methodName) cli_config = trigger.cli_config if trigger else CliConfig() self._id = id self.log_base_path = cli_config.log_dir if cli_config else None self.groupby = cli_config.group_by if cli_config else None self.no_log_echo = cli_config.no_log_echo if cli_config else None self.meta = meta self.trigger = trigger # inject logger: if cli_config: if not cli_config.no_log: self.logger = get( id, self.__class__.__name__, self.meta, cli_config ) else: # create dummy non-functional logger on the spot when # running with `log=False` self.logger = logging.getLogger(self.__class__.__name__) # close and clear all handlers that sb could have opened # by accident for h in self.logger.handlers: h.close() self.logger.handlers = [] def allow_when(self, allowed_parameters_config_property: str, parameter_on_test_case: str): """When running with a custom configuration class, you can use this method to tell your test case to not be skipped on some runtime filter. This is useful mostly when using Kalash with `parameterized`. Consider the following example: ```python class TestAdvancedFiltering1(TestCase): @parameterized.expand(['lincombo', 'cancombo']) def test_1(self, name): self.allow_when('run_only_with', name) print(f"Running for {name}") ``` If at runtime the config object contains a `run_only_with=['cancombo']` value, the test will only be triggered for `cancombo`. Args: allowed_parameters_config_property (str): property name on the `config` section of the `Trigger` instance containing the skip/allow (must be a `List`). parameter_on_test_case (str): parameter value to find in the allowed list, coming from the test case """ if self.trigger: run_with: Optional[List[str]] = self.trigger.config.get( allowed_parameters_config_property) if run_with: if parameter_on_test_case in run_with: return else: import inspect caller = inspect.stack()[1].function self.skipTest(f"{parameter_on_test_case} made test function {caller} skip") def __del__(self): if hasattr(self, 'logger'): close(self.logger)
Ancestors
- unittest.case.TestCase
Methods
def allow_when(self, allowed_parameters_config_property: str, parameter_on_test_case: str)
-
When running with a custom configuration class, you can use this method to tell your test case to not be skipped on some runtime filter. This is useful mostly when using Kalash with
parameterized
.Consider the following example:
class TestAdvancedFiltering1(TestCase): @parameterized.expand(['lincombo', 'cancombo']) def test_1(self, name): self.allow_when('run_only_with', name) print(f"Running for {name}")
If at runtime the config object contains a
run_only_with=['cancombo']
value, the test will only be triggered forcancombo
.Args
allowed_parameters_config_property
:str
- property name on the
config
section of theTrigger
instance containing the skip/allow (must be aList
). parameter_on_test_case
:str
- parameter value to find in the allowed list, coming from the test case
Expand source code
def allow_when(self, allowed_parameters_config_property: str, parameter_on_test_case: str): """When running with a custom configuration class, you can use this method to tell your test case to not be skipped on some runtime filter. This is useful mostly when using Kalash with `parameterized`. Consider the following example: ```python class TestAdvancedFiltering1(TestCase): @parameterized.expand(['lincombo', 'cancombo']) def test_1(self, name): self.allow_when('run_only_with', name) print(f"Running for {name}") ``` If at runtime the config object contains a `run_only_with=['cancombo']` value, the test will only be triggered for `cancombo`. Args: allowed_parameters_config_property (str): property name on the `config` section of the `Trigger` instance containing the skip/allow (must be a `List`). parameter_on_test_case (str): parameter value to find in the allowed list, coming from the test case """ if self.trigger: run_with: Optional[List[str]] = self.trigger.config.get( allowed_parameters_config_property) if run_with: if parameter_on_test_case in run_with: return else: import inspect caller = inspect.stack()[1].function self.skipTest(f"{parameter_on_test_case} made test function {caller} skip")
class TestProgram (module='__main__', defaultTest=None, argv=None, testRunner=None, testLoader=<unittest.loader.TestLoader object>, exit=True, verbosity=1, failfast=None, catchbreak=None, buffer=None, warnings=None, *, tb_locals=False)
-
A command-line program that runs a set of tests; this is primarily for making test modules conveniently executable.
Expand source code
class TestProgram(object): """A command-line program that runs a set of tests; this is primarily for making test modules conveniently executable. """ # defaults for testing module=None verbosity = 1 failfast = catchbreak = buffer = progName = warnings = testNamePatterns = None _discovery_parser = None def __init__(self, module='__main__', defaultTest=None, argv=None, testRunner=None, testLoader=loader.defaultTestLoader, exit=True, verbosity=1, failfast=None, catchbreak=None, buffer=None, warnings=None, *, tb_locals=False): if isinstance(module, str): self.module = __import__(module) for part in module.split('.')[1:]: self.module = getattr(self.module, part) else: self.module = module if argv is None: argv = sys.argv self.exit = exit self.failfast = failfast self.catchbreak = catchbreak self.verbosity = verbosity self.buffer = buffer self.tb_locals = tb_locals if warnings is None and not sys.warnoptions: # even if DeprecationWarnings are ignored by default # print them anyway unless other warnings settings are # specified by the warnings arg or the -W python flag self.warnings = 'default' else: # here self.warnings is set either to the value passed # to the warnings args or to None. # If the user didn't pass a value self.warnings will # be None. This means that the behavior is unchanged # and depends on the values passed to -W. self.warnings = warnings self.defaultTest = defaultTest self.testRunner = testRunner self.testLoader = testLoader self.progName = os.path.basename(argv[0]) self.parseArgs(argv) self.runTests() def usageExit(self, msg=None): if msg: print(msg) if self._discovery_parser is None: self._initArgParsers() self._print_help() sys.exit(2) def _print_help(self, *args, **kwargs): if self.module is None: print(self._main_parser.format_help()) print(MAIN_EXAMPLES % {'prog': self.progName}) self._discovery_parser.print_help() else: print(self._main_parser.format_help()) print(MODULE_EXAMPLES % {'prog': self.progName}) def parseArgs(self, argv): self._initArgParsers() if self.module is None: if len(argv) > 1 and argv[1].lower() == 'discover': self._do_discovery(argv[2:]) return self._main_parser.parse_args(argv[1:], self) if not self.tests: # this allows "python -m unittest -v" to still work for # test discovery. self._do_discovery([]) return else: self._main_parser.parse_args(argv[1:], self) if self.tests: self.testNames = _convert_names(self.tests) if __name__ == '__main__': # to support python -m unittest ... self.module = None elif self.defaultTest is None: # createTests will load tests from self.module self.testNames = None elif isinstance(self.defaultTest, str): self.testNames = (self.defaultTest,) else: self.testNames = list(self.defaultTest) self.createTests() def createTests(self, from_discovery=False, Loader=None): if self.testNamePatterns: self.testLoader.testNamePatterns = self.testNamePatterns if from_discovery: loader = self.testLoader if Loader is None else Loader() self.test = loader.discover(self.start, self.pattern, self.top) elif self.testNames is None: self.test = self.testLoader.loadTestsFromModule(self.module) else: self.test = self.testLoader.loadTestsFromNames(self.testNames, self.module) def _initArgParsers(self): parent_parser = self._getParentArgParser() self._main_parser = self._getMainArgParser(parent_parser) self._discovery_parser = self._getDiscoveryArgParser(parent_parser) def _getParentArgParser(self): parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const', const=2, help='Verbose output') parser.add_argument('-q', '--quiet', dest='verbosity', action='store_const', const=0, help='Quiet output') parser.add_argument('--locals', dest='tb_locals', action='store_true', help='Show local variables in tracebacks') if self.failfast is None: parser.add_argument('-f', '--failfast', dest='failfast', action='store_true', help='Stop on first fail or error') self.failfast = False if self.catchbreak is None: parser.add_argument('-c', '--catch', dest='catchbreak', action='store_true', help='Catch Ctrl-C and display results so far') self.catchbreak = False if self.buffer is None: parser.add_argument('-b', '--buffer', dest='buffer', action='store_true', help='Buffer stdout and stderr during tests') self.buffer = False if self.testNamePatterns is None: parser.add_argument('-k', dest='testNamePatterns', action='append', type=_convert_select_pattern, help='Only run tests which match the given substring') self.testNamePatterns = [] return parser def _getMainArgParser(self, parent): parser = argparse.ArgumentParser(parents=[parent]) parser.prog = self.progName parser.print_help = self._print_help parser.add_argument('tests', nargs='*', help='a list of any number of test modules, ' 'classes and test methods.') return parser def _getDiscoveryArgParser(self, parent): parser = argparse.ArgumentParser(parents=[parent]) parser.prog = '%s discover' % self.progName parser.epilog = ('For test discovery all test modules must be ' 'importable from the top level directory of the ' 'project.') parser.add_argument('-s', '--start-directory', dest='start', help="Directory to start discovery ('.' default)") parser.add_argument('-p', '--pattern', dest='pattern', help="Pattern to match tests ('test*.py' default)") parser.add_argument('-t', '--top-level-directory', dest='top', help='Top level directory of project (defaults to ' 'start directory)') for arg in ('start', 'pattern', 'top'): parser.add_argument(arg, nargs='?', default=argparse.SUPPRESS, help=argparse.SUPPRESS) return parser def _do_discovery(self, argv, Loader=None): self.start = '.' self.pattern = 'test*.py' self.top = None if argv is not None: # handle command line args for test discovery if self._discovery_parser is None: # for testing self._initArgParsers() self._discovery_parser.parse_args(argv, self) self.createTests(from_discovery=True, Loader=Loader) def runTests(self): if self.catchbreak: installHandler() if self.testRunner is None: self.testRunner = runner.TextTestRunner if isinstance(self.testRunner, type): try: try: testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings, tb_locals=self.tb_locals) except TypeError: # didn't accept the tb_locals argument testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings) except TypeError: # didn't accept the verbosity, buffer or failfast arguments testRunner = self.testRunner() else: # it is assumed to be a TestRunner instance testRunner = self.testRunner self.result = testRunner.run(self.test) if self.exit: sys.exit(not self.result.wasSuccessful())
Subclasses
- xmlrunner.runner.XMLTestProgram
Class variables
var buffer
var catchbreak
var failfast
var module
var progName
var testNamePatterns
var verbosity
var warnings
Methods
def createTests(self, from_discovery=False, Loader=None)
-
Expand source code
def createTests(self, from_discovery=False, Loader=None): if self.testNamePatterns: self.testLoader.testNamePatterns = self.testNamePatterns if from_discovery: loader = self.testLoader if Loader is None else Loader() self.test = loader.discover(self.start, self.pattern, self.top) elif self.testNames is None: self.test = self.testLoader.loadTestsFromModule(self.module) else: self.test = self.testLoader.loadTestsFromNames(self.testNames, self.module)
def parseArgs(self, argv)
-
Expand source code
def parseArgs(self, argv): self._initArgParsers() if self.module is None: if len(argv) > 1 and argv[1].lower() == 'discover': self._do_discovery(argv[2:]) return self._main_parser.parse_args(argv[1:], self) if not self.tests: # this allows "python -m unittest -v" to still work for # test discovery. self._do_discovery([]) return else: self._main_parser.parse_args(argv[1:], self) if self.tests: self.testNames = _convert_names(self.tests) if __name__ == '__main__': # to support python -m unittest ... self.module = None elif self.defaultTest is None: # createTests will load tests from self.module self.testNames = None elif isinstance(self.defaultTest, str): self.testNames = (self.defaultTest,) else: self.testNames = list(self.defaultTest) self.createTests()
def runTests(self)
-
Expand source code
def runTests(self): if self.catchbreak: installHandler() if self.testRunner is None: self.testRunner = runner.TextTestRunner if isinstance(self.testRunner, type): try: try: testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings, tb_locals=self.tb_locals) except TypeError: # didn't accept the tb_locals argument testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings) except TypeError: # didn't accept the verbosity, buffer or failfast arguments testRunner = self.testRunner() else: # it is assumed to be a TestRunner instance testRunner = self.testRunner self.result = testRunner.run(self.test) if self.exit: sys.exit(not self.result.wasSuccessful())
def usageExit(self, msg=None)
-
Expand source code
def usageExit(self, msg=None): if msg: print(msg) if self._discovery_parser is None: self._initArgParsers() self._print_help() sys.exit(2)
class main (module='__main__', defaultTest=None, argv=None, testRunner=None, testLoader=<unittest.loader.TestLoader object>, exit=True, verbosity=1, failfast=None, catchbreak=None, buffer=None, warnings=None, *, tb_locals=False)
-
A command-line program that runs a set of tests; this is primarily for making test modules conveniently executable.
Expand source code
class TestProgram(object): """A command-line program that runs a set of tests; this is primarily for making test modules conveniently executable. """ # defaults for testing module=None verbosity = 1 failfast = catchbreak = buffer = progName = warnings = testNamePatterns = None _discovery_parser = None def __init__(self, module='__main__', defaultTest=None, argv=None, testRunner=None, testLoader=loader.defaultTestLoader, exit=True, verbosity=1, failfast=None, catchbreak=None, buffer=None, warnings=None, *, tb_locals=False): if isinstance(module, str): self.module = __import__(module) for part in module.split('.')[1:]: self.module = getattr(self.module, part) else: self.module = module if argv is None: argv = sys.argv self.exit = exit self.failfast = failfast self.catchbreak = catchbreak self.verbosity = verbosity self.buffer = buffer self.tb_locals = tb_locals if warnings is None and not sys.warnoptions: # even if DeprecationWarnings are ignored by default # print them anyway unless other warnings settings are # specified by the warnings arg or the -W python flag self.warnings = 'default' else: # here self.warnings is set either to the value passed # to the warnings args or to None. # If the user didn't pass a value self.warnings will # be None. This means that the behavior is unchanged # and depends on the values passed to -W. self.warnings = warnings self.defaultTest = defaultTest self.testRunner = testRunner self.testLoader = testLoader self.progName = os.path.basename(argv[0]) self.parseArgs(argv) self.runTests() def usageExit(self, msg=None): if msg: print(msg) if self._discovery_parser is None: self._initArgParsers() self._print_help() sys.exit(2) def _print_help(self, *args, **kwargs): if self.module is None: print(self._main_parser.format_help()) print(MAIN_EXAMPLES % {'prog': self.progName}) self._discovery_parser.print_help() else: print(self._main_parser.format_help()) print(MODULE_EXAMPLES % {'prog': self.progName}) def parseArgs(self, argv): self._initArgParsers() if self.module is None: if len(argv) > 1 and argv[1].lower() == 'discover': self._do_discovery(argv[2:]) return self._main_parser.parse_args(argv[1:], self) if not self.tests: # this allows "python -m unittest -v" to still work for # test discovery. self._do_discovery([]) return else: self._main_parser.parse_args(argv[1:], self) if self.tests: self.testNames = _convert_names(self.tests) if __name__ == '__main__': # to support python -m unittest ... self.module = None elif self.defaultTest is None: # createTests will load tests from self.module self.testNames = None elif isinstance(self.defaultTest, str): self.testNames = (self.defaultTest,) else: self.testNames = list(self.defaultTest) self.createTests() def createTests(self, from_discovery=False, Loader=None): if self.testNamePatterns: self.testLoader.testNamePatterns = self.testNamePatterns if from_discovery: loader = self.testLoader if Loader is None else Loader() self.test = loader.discover(self.start, self.pattern, self.top) elif self.testNames is None: self.test = self.testLoader.loadTestsFromModule(self.module) else: self.test = self.testLoader.loadTestsFromNames(self.testNames, self.module) def _initArgParsers(self): parent_parser = self._getParentArgParser() self._main_parser = self._getMainArgParser(parent_parser) self._discovery_parser = self._getDiscoveryArgParser(parent_parser) def _getParentArgParser(self): parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const', const=2, help='Verbose output') parser.add_argument('-q', '--quiet', dest='verbosity', action='store_const', const=0, help='Quiet output') parser.add_argument('--locals', dest='tb_locals', action='store_true', help='Show local variables in tracebacks') if self.failfast is None: parser.add_argument('-f', '--failfast', dest='failfast', action='store_true', help='Stop on first fail or error') self.failfast = False if self.catchbreak is None: parser.add_argument('-c', '--catch', dest='catchbreak', action='store_true', help='Catch Ctrl-C and display results so far') self.catchbreak = False if self.buffer is None: parser.add_argument('-b', '--buffer', dest='buffer', action='store_true', help='Buffer stdout and stderr during tests') self.buffer = False if self.testNamePatterns is None: parser.add_argument('-k', dest='testNamePatterns', action='append', type=_convert_select_pattern, help='Only run tests which match the given substring') self.testNamePatterns = [] return parser def _getMainArgParser(self, parent): parser = argparse.ArgumentParser(parents=[parent]) parser.prog = self.progName parser.print_help = self._print_help parser.add_argument('tests', nargs='*', help='a list of any number of test modules, ' 'classes and test methods.') return parser def _getDiscoveryArgParser(self, parent): parser = argparse.ArgumentParser(parents=[parent]) parser.prog = '%s discover' % self.progName parser.epilog = ('For test discovery all test modules must be ' 'importable from the top level directory of the ' 'project.') parser.add_argument('-s', '--start-directory', dest='start', help="Directory to start discovery ('.' default)") parser.add_argument('-p', '--pattern', dest='pattern', help="Pattern to match tests ('test*.py' default)") parser.add_argument('-t', '--top-level-directory', dest='top', help='Top level directory of project (defaults to ' 'start directory)') for arg in ('start', 'pattern', 'top'): parser.add_argument(arg, nargs='?', default=argparse.SUPPRESS, help=argparse.SUPPRESS) return parser def _do_discovery(self, argv, Loader=None): self.start = '.' self.pattern = 'test*.py' self.top = None if argv is not None: # handle command line args for test discovery if self._discovery_parser is None: # for testing self._initArgParsers() self._discovery_parser.parse_args(argv, self) self.createTests(from_discovery=True, Loader=Loader) def runTests(self): if self.catchbreak: installHandler() if self.testRunner is None: self.testRunner = runner.TextTestRunner if isinstance(self.testRunner, type): try: try: testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings, tb_locals=self.tb_locals) except TypeError: # didn't accept the tb_locals argument testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings) except TypeError: # didn't accept the verbosity, buffer or failfast arguments testRunner = self.testRunner() else: # it is assumed to be a TestRunner instance testRunner = self.testRunner self.result = testRunner.run(self.test) if self.exit: sys.exit(not self.result.wasSuccessful())
Subclasses
- xmlrunner.runner.XMLTestProgram
Class variables
var buffer
var catchbreak
var failfast
var module
var progName
var testNamePatterns
var verbosity
var warnings
Methods
def createTests(self, from_discovery=False, Loader=None)
-
Expand source code
def createTests(self, from_discovery=False, Loader=None): if self.testNamePatterns: self.testLoader.testNamePatterns = self.testNamePatterns if from_discovery: loader = self.testLoader if Loader is None else Loader() self.test = loader.discover(self.start, self.pattern, self.top) elif self.testNames is None: self.test = self.testLoader.loadTestsFromModule(self.module) else: self.test = self.testLoader.loadTestsFromNames(self.testNames, self.module)
def parseArgs(self, argv)
-
Expand source code
def parseArgs(self, argv): self._initArgParsers() if self.module is None: if len(argv) > 1 and argv[1].lower() == 'discover': self._do_discovery(argv[2:]) return self._main_parser.parse_args(argv[1:], self) if not self.tests: # this allows "python -m unittest -v" to still work for # test discovery. self._do_discovery([]) return else: self._main_parser.parse_args(argv[1:], self) if self.tests: self.testNames = _convert_names(self.tests) if __name__ == '__main__': # to support python -m unittest ... self.module = None elif self.defaultTest is None: # createTests will load tests from self.module self.testNames = None elif isinstance(self.defaultTest, str): self.testNames = (self.defaultTest,) else: self.testNames = list(self.defaultTest) self.createTests()
def runTests(self)
-
Expand source code
def runTests(self): if self.catchbreak: installHandler() if self.testRunner is None: self.testRunner = runner.TextTestRunner if isinstance(self.testRunner, type): try: try: testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings, tb_locals=self.tb_locals) except TypeError: # didn't accept the tb_locals argument testRunner = self.testRunner(verbosity=self.verbosity, failfast=self.failfast, buffer=self.buffer, warnings=self.warnings) except TypeError: # didn't accept the verbosity, buffer or failfast arguments testRunner = self.testRunner() else: # it is assumed to be a TestRunner instance testRunner = self.testRunner self.result = testRunner.run(self.test) if self.exit: sys.exit(not self.result.wasSuccessful())
def usageExit(self, msg=None)
-
Expand source code
def usageExit(self, msg=None): if msg: print(msg) if self._discovery_parser is None: self._initArgParsers() self._print_help() sys.exit(2)
class TestResult (stream=None, descriptions=None, verbosity=None)
-
Holder for test result information.
Test results are automatically managed by the TestCase and TestSuite classes, and do not need to be explicitly manipulated by writers of tests.
Each instance holds the total number of tests run, and collections of failures and errors that occurred among those test runs. The collections contain tuples of (testcase, exceptioninfo), where exceptioninfo is the formatted traceback of the error that occurred.
Expand source code
class TestResult(object): """Holder for test result information. Test results are automatically managed by the TestCase and TestSuite classes, and do not need to be explicitly manipulated by writers of tests. Each instance holds the total number of tests run, and collections of failures and errors that occurred among those test runs. The collections contain tuples of (testcase, exceptioninfo), where exceptioninfo is the formatted traceback of the error that occurred. """ _previousTestClass = None _testRunEntered = False _moduleSetUpFailed = False def __init__(self, stream=None, descriptions=None, verbosity=None): self.failfast = False self.failures = [] self.errors = [] self.testsRun = 0 self.skipped = [] self.expectedFailures = [] self.unexpectedSuccesses = [] self.shouldStop = False self.buffer = False self.tb_locals = False self._stdout_buffer = None self._stderr_buffer = None self._original_stdout = sys.stdout self._original_stderr = sys.stderr self._mirrorOutput = False def printErrors(self): "Called by TestRunner after test run" def startTest(self, test): "Called when the given test is about to be run" self.testsRun += 1 self._mirrorOutput = False self._setupStdout() def _setupStdout(self): if self.buffer: if self._stderr_buffer is None: self._stderr_buffer = io.StringIO() self._stdout_buffer = io.StringIO() sys.stdout = self._stdout_buffer sys.stderr = self._stderr_buffer def startTestRun(self): """Called once before any tests are executed. See startTest for a method called before each test. """ def stopTest(self, test): """Called when the given test has been run""" self._restoreStdout() self._mirrorOutput = False def _restoreStdout(self): if self.buffer: if self._mirrorOutput: output = sys.stdout.getvalue() error = sys.stderr.getvalue() if output: if not output.endswith('\n'): output += '\n' self._original_stdout.write(STDOUT_LINE % output) if error: if not error.endswith('\n'): error += '\n' self._original_stderr.write(STDERR_LINE % error) sys.stdout = self._original_stdout sys.stderr = self._original_stderr self._stdout_buffer.seek(0) self._stdout_buffer.truncate() self._stderr_buffer.seek(0) self._stderr_buffer.truncate() def stopTestRun(self): """Called once after all tests are executed. See stopTest for a method called after each test. """ @failfast def addError(self, test, err): """Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info(). """ self.errors.append((test, self._exc_info_to_string(err, test))) self._mirrorOutput = True @failfast def addFailure(self, test, err): """Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info().""" self.failures.append((test, self._exc_info_to_string(err, test))) self._mirrorOutput = True def addSubTest(self, test, subtest, err): """Called at the end of a subtest. 'err' is None if the subtest ended successfully, otherwise it's a tuple of values as returned by sys.exc_info(). """ # By default, we don't do anything with successful subtests, but # more sophisticated test results might want to record them. if err is not None: if getattr(self, 'failfast', False): self.stop() if issubclass(err[0], test.failureException): errors = self.failures else: errors = self.errors errors.append((subtest, self._exc_info_to_string(err, test))) self._mirrorOutput = True def addSuccess(self, test): "Called when a test has completed successfully" pass def addSkip(self, test, reason): """Called when a test is skipped.""" self.skipped.append((test, reason)) def addExpectedFailure(self, test, err): """Called when an expected failure/error occurred.""" self.expectedFailures.append( (test, self._exc_info_to_string(err, test))) @failfast def addUnexpectedSuccess(self, test): """Called when a test was expected to fail, but succeed.""" self.unexpectedSuccesses.append(test) def wasSuccessful(self): """Tells whether or not this result was a success.""" # The hasattr check is for test_result's OldResult test. That # way this method works on objects that lack the attribute. # (where would such result instances come from? old stored pickles?) return ((len(self.failures) == len(self.errors) == 0) and (not hasattr(self, 'unexpectedSuccesses') or len(self.unexpectedSuccesses) == 0)) def stop(self): """Indicates that the tests should be aborted.""" self.shouldStop = True def _exc_info_to_string(self, err, test): """Converts a sys.exc_info()-style tuple of values into a string.""" exctype, value, tb = err # Skip test runner traceback levels while tb and self._is_relevant_tb_level(tb): tb = tb.tb_next if exctype is test.failureException: # Skip assert*() traceback levels length = self._count_relevant_tb_levels(tb) else: length = None tb_e = traceback.TracebackException( exctype, value, tb, limit=length, capture_locals=self.tb_locals) msgLines = list(tb_e.format()) if self.buffer: output = sys.stdout.getvalue() error = sys.stderr.getvalue() if output: if not output.endswith('\n'): output += '\n' msgLines.append(STDOUT_LINE % output) if error: if not error.endswith('\n'): error += '\n' msgLines.append(STDERR_LINE % error) return ''.join(msgLines) def _is_relevant_tb_level(self, tb): return '__unittest' in tb.tb_frame.f_globals def _count_relevant_tb_levels(self, tb): length = 0 while tb and not self._is_relevant_tb_level(tb): length += 1 tb = tb.tb_next return length def __repr__(self): return ("<%s run=%i errors=%i failures=%i>" % (util.strclass(self.__class__), self.testsRun, len(self.errors), len(self.failures)))
Subclasses
- unittest.runner.TextTestResult
Methods
def addError(self, test, err)
-
Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info().
Expand source code
@failfast def addError(self, test, err): """Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info(). """ self.errors.append((test, self._exc_info_to_string(err, test))) self._mirrorOutput = True
def addExpectedFailure(self, test, err)
-
Called when an expected failure/error occurred.
Expand source code
def addExpectedFailure(self, test, err): """Called when an expected failure/error occurred.""" self.expectedFailures.append( (test, self._exc_info_to_string(err, test)))
def addFailure(self, test, err)
-
Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info().
Expand source code
@failfast def addFailure(self, test, err): """Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info().""" self.failures.append((test, self._exc_info_to_string(err, test))) self._mirrorOutput = True
def addSkip(self, test, reason)
-
Called when a test is skipped.
Expand source code
def addSkip(self, test, reason): """Called when a test is skipped.""" self.skipped.append((test, reason))
def addSubTest(self, test, subtest, err)
-
Called at the end of a subtest. 'err' is None if the subtest ended successfully, otherwise it's a tuple of values as returned by sys.exc_info().
Expand source code
def addSubTest(self, test, subtest, err): """Called at the end of a subtest. 'err' is None if the subtest ended successfully, otherwise it's a tuple of values as returned by sys.exc_info(). """ # By default, we don't do anything with successful subtests, but # more sophisticated test results might want to record them. if err is not None: if getattr(self, 'failfast', False): self.stop() if issubclass(err[0], test.failureException): errors = self.failures else: errors = self.errors errors.append((subtest, self._exc_info_to_string(err, test))) self._mirrorOutput = True
def addSuccess(self, test)
-
Called when a test has completed successfully
Expand source code
def addSuccess(self, test): "Called when a test has completed successfully" pass
def addUnexpectedSuccess(self, test)
-
Called when a test was expected to fail, but succeed.
Expand source code
@failfast def addUnexpectedSuccess(self, test): """Called when a test was expected to fail, but succeed.""" self.unexpectedSuccesses.append(test)
def printErrors(self)
-
Called by TestRunner after test run
Expand source code
def printErrors(self): "Called by TestRunner after test run"
def startTest(self, test)
-
Called when the given test is about to be run
Expand source code
def startTest(self, test): "Called when the given test is about to be run" self.testsRun += 1 self._mirrorOutput = False self._setupStdout()
def startTestRun(self)
-
Called once before any tests are executed.
See startTest for a method called before each test.
Expand source code
def startTestRun(self): """Called once before any tests are executed. See startTest for a method called before each test. """
def stop(self)
-
Indicates that the tests should be aborted.
Expand source code
def stop(self): """Indicates that the tests should be aborted.""" self.shouldStop = True
def stopTest(self, test)
-
Called when the given test has been run
Expand source code
def stopTest(self, test): """Called when the given test has been run""" self._restoreStdout() self._mirrorOutput = False
def stopTestRun(self)
-
Called once after all tests are executed.
See stopTest for a method called after each test.
Expand source code
def stopTestRun(self): """Called once after all tests are executed. See stopTest for a method called after each test. """
def wasSuccessful(self)
-
Tells whether or not this result was a success.
Expand source code
def wasSuccessful(self): """Tells whether or not this result was a success.""" # The hasattr check is for test_result's OldResult test. That # way this method works on objects that lack the attribute. # (where would such result instances come from? old stored pickles?) return ((len(self.failures) == len(self.errors) == 0) and (not hasattr(self, 'unexpectedSuccesses') or len(self.unexpectedSuccesses) == 0))
class TextTestRunner (stream=None, descriptions=True, verbosity=1, failfast=False, buffer=False, resultclass=None, warnings=None, *, tb_locals=False)
-
A test runner class that displays results in textual form.
It prints out the names of tests as they are run, errors as they occur, and a summary of the results at the end of the test run.
Construct a TextTestRunner.
Subclasses should accept **kwargs to ensure compatibility as the interface changes.
Expand source code
class TextTestRunner(object): """A test runner class that displays results in textual form. It prints out the names of tests as they are run, errors as they occur, and a summary of the results at the end of the test run. """ resultclass = TextTestResult def __init__(self, stream=None, descriptions=True, verbosity=1, failfast=False, buffer=False, resultclass=None, warnings=None, *, tb_locals=False): """Construct a TextTestRunner. Subclasses should accept **kwargs to ensure compatibility as the interface changes. """ if stream is None: stream = sys.stderr self.stream = _WritelnDecorator(stream) self.descriptions = descriptions self.verbosity = verbosity self.failfast = failfast self.buffer = buffer self.tb_locals = tb_locals self.warnings = warnings if resultclass is not None: self.resultclass = resultclass def _makeResult(self): return self.resultclass(self.stream, self.descriptions, self.verbosity) def run(self, test): "Run the given test case or test suite." result = self._makeResult() registerResult(result) result.failfast = self.failfast result.buffer = self.buffer result.tb_locals = self.tb_locals with warnings.catch_warnings(): if self.warnings: # if self.warnings is set, use it to filter all the warnings warnings.simplefilter(self.warnings) # if the filter is 'default' or 'always', special-case the # warnings from the deprecated unittest methods to show them # no more than once per module, because they can be fairly # noisy. The -Wd and -Wa flags can be used to bypass this # only when self.warnings is None. if self.warnings in ['default', 'always']: warnings.filterwarnings('module', category=DeprecationWarning, message=r'Please use assert\w+ instead.') startTime = time.perf_counter() startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: startTestRun() try: test(result) finally: stopTestRun = getattr(result, 'stopTestRun', None) if stopTestRun is not None: stopTestRun() stopTime = time.perf_counter() timeTaken = stopTime - startTime result.printErrors() if hasattr(result, 'separator2'): self.stream.writeln(result.separator2) run = result.testsRun self.stream.writeln("Ran %d test%s in %.3fs" % (run, run != 1 and "s" or "", timeTaken)) self.stream.writeln() expectedFails = unexpectedSuccesses = skipped = 0 try: results = map(len, (result.expectedFailures, result.unexpectedSuccesses, result.skipped)) except AttributeError: pass else: expectedFails, unexpectedSuccesses, skipped = results infos = [] if not result.wasSuccessful(): self.stream.write("FAILED") failed, errored = len(result.failures), len(result.errors) if failed: infos.append("failures=%d" % failed) if errored: infos.append("errors=%d" % errored) else: self.stream.write("OK") if skipped: infos.append("skipped=%d" % skipped) if expectedFails: infos.append("expected failures=%d" % expectedFails) if unexpectedSuccesses: infos.append("unexpected successes=%d" % unexpectedSuccesses) if infos: self.stream.writeln(" (%s)" % (", ".join(infos),)) else: self.stream.write("\n") return result
Subclasses
- xmlrunner.runner.XMLTestRunner
Class variables
var resultclass
-
A test result class that can print formatted text results to a stream.
Used by TextTestRunner.
Methods
def run(self, test)
-
Run the given test case or test suite.
Expand source code
def run(self, test): "Run the given test case or test suite." result = self._makeResult() registerResult(result) result.failfast = self.failfast result.buffer = self.buffer result.tb_locals = self.tb_locals with warnings.catch_warnings(): if self.warnings: # if self.warnings is set, use it to filter all the warnings warnings.simplefilter(self.warnings) # if the filter is 'default' or 'always', special-case the # warnings from the deprecated unittest methods to show them # no more than once per module, because they can be fairly # noisy. The -Wd and -Wa flags can be used to bypass this # only when self.warnings is None. if self.warnings in ['default', 'always']: warnings.filterwarnings('module', category=DeprecationWarning, message=r'Please use assert\w+ instead.') startTime = time.perf_counter() startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: startTestRun() try: test(result) finally: stopTestRun = getattr(result, 'stopTestRun', None) if stopTestRun is not None: stopTestRun() stopTime = time.perf_counter() timeTaken = stopTime - startTime result.printErrors() if hasattr(result, 'separator2'): self.stream.writeln(result.separator2) run = result.testsRun self.stream.writeln("Ran %d test%s in %.3fs" % (run, run != 1 and "s" or "", timeTaken)) self.stream.writeln() expectedFails = unexpectedSuccesses = skipped = 0 try: results = map(len, (result.expectedFailures, result.unexpectedSuccesses, result.skipped)) except AttributeError: pass else: expectedFails, unexpectedSuccesses, skipped = results infos = [] if not result.wasSuccessful(): self.stream.write("FAILED") failed, errored = len(result.failures), len(result.errors) if failed: infos.append("failures=%d" % failed) if errored: infos.append("errors=%d" % errored) else: self.stream.write("OK") if skipped: infos.append("skipped=%d" % skipped) if expectedFails: infos.append("expected failures=%d" % expectedFails) if unexpectedSuccesses: infos.append("unexpected successes=%d" % unexpectedSuccesses) if infos: self.stream.writeln(" (%s)" % (", ".join(infos),)) else: self.stream.write("\n") return result
class parameterized (input, doc_func=None, skip_on_empty=False)
-
Parameterize a test case::
class TestInt(object): @parameterized([ ("A", 10), ("F", 15), param("10", 42, base=42) ]) def test_int(self, input, expected, base=16): actual = int(input, base=base) assert_equal(actual, expected)
@parameterized([ (2, 3, 5) (3, 5, 8), ]) def test_add(a, b, expected): assert_equal(a + b, expected)
Expand source code
class parameterized(object): """ Parameterize a test case:: class TestInt(object): @parameterized([ ("A", 10), ("F", 15), param("10", 42, base=42) ]) def test_int(self, input, expected, base=16): actual = int(input, base=base) assert_equal(actual, expected) @parameterized([ (2, 3, 5) (3, 5, 8), ]) def test_add(a, b, expected): assert_equal(a + b, expected) """ def __init__(self, input, doc_func=None, skip_on_empty=False): self.get_input = self.input_as_callable(input) self.doc_func = doc_func or default_doc_func self.skip_on_empty = skip_on_empty def __call__(self, test_func): self.assert_not_in_testcase_subclass() @wraps(test_func) def wrapper(test_self=None): test_cls = test_self and type(test_self) if test_self is not None: if issubclass(test_cls, InstanceType): raise TypeError(( "@parameterized can't be used with old-style classes, but " "%r has an old-style class. Consider using a new-style " "class, or '@parameterized.expand' " "(see http://stackoverflow.com/q/54867/71522 for more " "information on old-style classes)." ) %(test_self, )) original_doc = wrapper.__doc__ for num, args in enumerate(wrapper.parameterized_input): p = param.from_decorator(args) unbound_func, nose_tuple = self.param_as_nose_tuple(test_self, test_func, num, p) try: wrapper.__doc__ = nose_tuple[0].__doc__ # Nose uses `getattr(instance, test_func.__name__)` to get # a method bound to the test instance (as opposed to a # method bound to the instance of the class created when # tests were being enumerated). Set a value here to make # sure nose can get the correct test method. if test_self is not None: setattr(test_cls, test_func.__name__, unbound_func) yield nose_tuple finally: if test_self is not None: delattr(test_cls, test_func.__name__) wrapper.__doc__ = original_doc input = self.get_input() if not input: if not self.skip_on_empty: raise ValueError( "Parameters iterable is empty (hint: use " "`parameterized([], skip_on_empty=True)` to skip " "this test when the input is empty)" ) wrapper = wraps(test_func)(skip_on_empty_helper) wrapper.parameterized_input = input wrapper.parameterized_func = test_func test_func.__name__ = "_parameterized_original_%s" %(test_func.__name__, ) return wrapper def param_as_nose_tuple(self, test_self, func, num, p): nose_func = wraps(func)(lambda *args: func(*args[:-1], **args[-1])) nose_func.__doc__ = self.doc_func(func, num, p) # Track the unbound function because we need to setattr the unbound # function onto the class for nose to work (see comments above), and # Python 3 doesn't let us pull the function out of a bound method. unbound_func = nose_func if test_self is not None: # Under nose on Py2 we need to return an unbound method to make # sure that the `self` in the method is properly shared with the # `self` used in `setUp` and `tearDown`. But only there. Everyone # else needs a bound method. func_self = ( None if PY2 and detect_runner() == "nose" else test_self ) nose_func = make_method(nose_func, func_self, type(test_self)) return unbound_func, (nose_func, ) + p.args + (p.kwargs or {}, ) def assert_not_in_testcase_subclass(self): parent_classes = self._terrible_magic_get_defining_classes() if any(issubclass(cls, TestCase) for cls in parent_classes): raise Exception("Warning: '@parameterized' tests won't work " "inside subclasses of 'TestCase' - use " "'@parameterized.expand' instead.") def _terrible_magic_get_defining_classes(self): """ Returns the set of parent classes of the class currently being defined. Will likely only work if called from the ``parameterized`` decorator. This function is entirely @brandon_rhodes's fault, as he suggested the implementation: http://stackoverflow.com/a/8793684/71522 """ stack = inspect.stack() if len(stack) <= 4: return [] frame = stack[4] code_context = frame[4] and frame[4][0].strip() if not (code_context and code_context.startswith("class ")): return [] _, _, parents = code_context.partition("(") parents, _, _ = parents.partition(")") return eval("[" + parents + "]", frame[0].f_globals, frame[0].f_locals) @classmethod def input_as_callable(cls, input): if callable(input): return lambda: cls.check_input_values(input()) input_values = cls.check_input_values(input) return lambda: input_values @classmethod def check_input_values(cls, input_values): # Explicitly convery non-list inputs to a list so that: # 1. A helpful exception will be raised if they aren't iterable, and # 2. Generators are unwrapped exactly once (otherwise `nosetests # --processes=n` has issues; see: # https://github.com/wolever/nose-parameterized/pull/31) if not isinstance(input_values, list): input_values = list(input_values) return [ param.from_decorator(p) for p in input_values ] @classmethod def expand(cls, input, name_func=None, doc_func=None, skip_on_empty=False, **legacy): """ A "brute force" method of parameterizing test cases. Creates new test cases and injects them into the namespace that the wrapped function is being defined in. Useful for parameterizing tests in subclasses of 'UnitTest', where Nose test generators don't work. >>> @parameterized.expand([("foo", 1, 2)]) ... def test_add1(name, input, expected): ... actual = add1(input) ... assert_equal(actual, expected) ... >>> locals() ... 'test_add1_foo_0': <function ...> ... >>> """ if "testcase_func_name" in legacy: warnings.warn("testcase_func_name= is deprecated; use name_func=", DeprecationWarning, stacklevel=2) if not name_func: name_func = legacy["testcase_func_name"] if "testcase_func_doc" in legacy: warnings.warn("testcase_func_doc= is deprecated; use doc_func=", DeprecationWarning, stacklevel=2) if not doc_func: doc_func = legacy["testcase_func_doc"] doc_func = doc_func or default_doc_func name_func = name_func or default_name_func def parameterized_expand_wrapper(f, instance=None): frame_locals = inspect.currentframe().f_back.f_locals parameters = cls.input_as_callable(input)() if not parameters: if not skip_on_empty: raise ValueError( "Parameters iterable is empty (hint: use " "`parameterized.expand([], skip_on_empty=True)` to skip " "this test when the input is empty)" ) return wraps(f)(skip_on_empty_helper) digits = len(str(len(parameters) - 1)) for num, p in enumerate(parameters): name = name_func(f, "{num:0>{digits}}".format(digits=digits, num=num), p) # If the original function has patches applied by 'mock.patch', # re-construct all patches on the just former decoration layer # of param_as_standalone_func so as not to share # patch objects between new functions nf = reapply_patches_if_need(f) frame_locals[name] = cls.param_as_standalone_func(p, nf, name) frame_locals[name].__doc__ = doc_func(f, num, p) # Delete original patches to prevent new function from evaluating # original patching object as well as re-constructed patches. delete_patches_if_need(f) f.__test__ = False return parameterized_expand_wrapper @classmethod def param_as_standalone_func(cls, p, func, name): @wraps(func) def standalone_func(*a): return func(*(a + p.args), **p.kwargs) standalone_func.__name__ = name # place_as is used by py.test to determine what source file should be # used for this test. standalone_func.place_as = func # Remove __wrapped__ because py.test will try to look at __wrapped__ # to determine which parameters should be used with this test case, # and obviously we don't need it to do any parameterization. try: del standalone_func.__wrapped__ except AttributeError: pass return standalone_func @classmethod def to_safe_name(cls, s): return str(re.sub("[^a-zA-Z0-9_]+", "_", s))
Static methods
def check_input_values(input_values)
-
Expand source code
@classmethod def check_input_values(cls, input_values): # Explicitly convery non-list inputs to a list so that: # 1. A helpful exception will be raised if they aren't iterable, and # 2. Generators are unwrapped exactly once (otherwise `nosetests # --processes=n` has issues; see: # https://github.com/wolever/nose-parameterized/pull/31) if not isinstance(input_values, list): input_values = list(input_values) return [ param.from_decorator(p) for p in input_values ]
def expand(input, name_func=None, doc_func=None, skip_on_empty=False, **legacy)
-
A "brute force" method of parameterizing test cases. Creates new test cases and injects them into the namespace that the wrapped function is being defined in. Useful for parameterizing tests in subclasses of 'UnitTest', where Nose test generators don't work.
>>> @parameterized.expand([("foo", 1, 2)]) ... def test_add1(name, input, expected): ... actual = add1(input) ... assert_equal(actual, expected) ... >>> locals() ... 'test_add1_foo_0': <function ...> ... >>>
Expand source code
@classmethod def expand(cls, input, name_func=None, doc_func=None, skip_on_empty=False, **legacy): """ A "brute force" method of parameterizing test cases. Creates new test cases and injects them into the namespace that the wrapped function is being defined in. Useful for parameterizing tests in subclasses of 'UnitTest', where Nose test generators don't work. >>> @parameterized.expand([("foo", 1, 2)]) ... def test_add1(name, input, expected): ... actual = add1(input) ... assert_equal(actual, expected) ... >>> locals() ... 'test_add1_foo_0': <function ...> ... >>> """ if "testcase_func_name" in legacy: warnings.warn("testcase_func_name= is deprecated; use name_func=", DeprecationWarning, stacklevel=2) if not name_func: name_func = legacy["testcase_func_name"] if "testcase_func_doc" in legacy: warnings.warn("testcase_func_doc= is deprecated; use doc_func=", DeprecationWarning, stacklevel=2) if not doc_func: doc_func = legacy["testcase_func_doc"] doc_func = doc_func or default_doc_func name_func = name_func or default_name_func def parameterized_expand_wrapper(f, instance=None): frame_locals = inspect.currentframe().f_back.f_locals parameters = cls.input_as_callable(input)() if not parameters: if not skip_on_empty: raise ValueError( "Parameters iterable is empty (hint: use " "`parameterized.expand([], skip_on_empty=True)` to skip " "this test when the input is empty)" ) return wraps(f)(skip_on_empty_helper) digits = len(str(len(parameters) - 1)) for num, p in enumerate(parameters): name = name_func(f, "{num:0>{digits}}".format(digits=digits, num=num), p) # If the original function has patches applied by 'mock.patch', # re-construct all patches on the just former decoration layer # of param_as_standalone_func so as not to share # patch objects between new functions nf = reapply_patches_if_need(f) frame_locals[name] = cls.param_as_standalone_func(p, nf, name) frame_locals[name].__doc__ = doc_func(f, num, p) # Delete original patches to prevent new function from evaluating # original patching object as well as re-constructed patches. delete_patches_if_need(f) f.__test__ = False return parameterized_expand_wrapper
def input_as_callable(input)
-
Expand source code
@classmethod def input_as_callable(cls, input): if callable(input): return lambda: cls.check_input_values(input()) input_values = cls.check_input_values(input) return lambda: input_values
def param_as_standalone_func(p, func, name)
-
Expand source code
@classmethod def param_as_standalone_func(cls, p, func, name): @wraps(func) def standalone_func(*a): return func(*(a + p.args), **p.kwargs) standalone_func.__name__ = name # place_as is used by py.test to determine what source file should be # used for this test. standalone_func.place_as = func # Remove __wrapped__ because py.test will try to look at __wrapped__ # to determine which parameters should be used with this test case, # and obviously we don't need it to do any parameterization. try: del standalone_func.__wrapped__ except AttributeError: pass return standalone_func
def to_safe_name(s)
-
Expand source code
@classmethod def to_safe_name(cls, s): return str(re.sub("[^a-zA-Z0-9_]+", "_", s))
Methods
def assert_not_in_testcase_subclass(self)
-
Expand source code
def assert_not_in_testcase_subclass(self): parent_classes = self._terrible_magic_get_defining_classes() if any(issubclass(cls, TestCase) for cls in parent_classes): raise Exception("Warning: '@parameterized' tests won't work " "inside subclasses of 'TestCase' - use " "'@parameterized.expand' instead.")
def param_as_nose_tuple(self, test_self, func, num, p)
-
Expand source code
def param_as_nose_tuple(self, test_self, func, num, p): nose_func = wraps(func)(lambda *args: func(*args[:-1], **args[-1])) nose_func.__doc__ = self.doc_func(func, num, p) # Track the unbound function because we need to setattr the unbound # function onto the class for nose to work (see comments above), and # Python 3 doesn't let us pull the function out of a bound method. unbound_func = nose_func if test_self is not None: # Under nose on Py2 we need to return an unbound method to make # sure that the `self` in the method is properly shared with the # `self` used in `setUp` and `tearDown`. But only there. Everyone # else needs a bound method. func_self = ( None if PY2 and detect_runner() == "nose" else test_self ) nose_func = make_method(nose_func, func_self, type(test_self)) return unbound_func, (nose_func, ) + p.args + (p.kwargs or {}, )