kalash.run
```python
import unittest
import argparse
import xmlrunner
import os.path
import platform
import inspect
import webbrowser

from unittest import TextTestRunner, TestLoader
from unittest import TestResult
from unittest.result import failfast
from unittest.main import TestProgram
from types import ModuleType
from parameterized import parameterized
from typing import Callable, Dict, Iterator, Optional, Tuple

from .utils import get_ts
from .filter import apply_filters
from .smuggle import smuggle
from .config import (Collector, CollectorArtifact, Config,
                     PathOrIdForWhatIf, CliConfig, Trigger)
from .test_case import TestCase
from .log import close_all
from .collectors import (_collect_test_case_v1_x,
                         _collect_test_case_v2_0,
                         _collect_test_case_from_module)

kalash = ModuleType('kalash')

__all__ = (
    'unittest', 'TextTestRunner', 'TestResult', 'TestProgram',
    'failfast', 'MetaLoader', 'main', 'TestCase', 'get_ts',
    'parameterized'
)


# ====================
# Public Utilities
# ====================

def find_my_yaml(filevar: str, path: str) -> str:
    """Figures out the path to the YAML file relative to a given test
    script. Should be used like:

    ```python
    YAML = find_my_yaml(__file__, "../yamls/yaml.yaml")
    ```

    Args:
        filevar (str): should always be set to `__file__`
        path (str): relative path component that points to the YAML

    Returns:
        Normalized absolute path to the correct YAML file
    """
    return os.path.normpath(
        os.path.abspath(
            os.path.join(os.path.dirname(filevar), path)
        )
    )


# =============================================================================
# ====================
# COLLECTOR LOOKUP
# ====================
# all 1.x versions will map to the v1_x function
COLLECTOR_FUNC_LOOKUP: Dict[str, Collector] = \
    {f'1.{k}': _collect_test_case_v1_x for k in range(0, 10)}
# all further declarations will be mapped manually
COLLECTOR_FUNC_LOOKUP['2.0'] = _collect_test_case_v2_0
# =============================================================================


def prepare_suite(
    kalash_trigger: Trigger
) -> Iterator[CollectorArtifact]:
    """Higher-order suite definition function. As opposed to `Collector`,
    this function iterates over the YAML config and calls `Collector`
    on a per-test basis.

    This function calls `apply_filters`. If no filters are provided the
    `kalash_test_loader` will be called directly; otherwise the collected
    tests will be skimmed to match the provided filters.

    Args:
        kalash_trigger (Trigger): `Trigger` object collecting
            all configuration elements.

    Yields:
        One or more `CollectorArtifact` elements.
    """
    for test_idx, test_conf in enumerate(kalash_trigger.tests):
        # set up path (if it exists) and non-filter keys
        path = test_conf.path
        if not path:
            path = '.'  # default to CWD
        # recursive directory search can be set in a config
        # or as a global flag when calling
        no_recurse_from_file: Optional[bool] = test_conf.no_recurse
        cli_config = kalash_trigger.cli_config
        if no_recurse_from_file is not None:
            cli_config.no_recurse = \
                cli_config.no_recurse or no_recurse_from_file
        yield apply_filters(
            test_conf,
            path,
            COLLECTOR_FUNC_LOOKUP,
            kalash_trigger
        )


class MetaLoader(TestLoader):

    def __init__(
        self,
        yaml_path: Optional[str] = None,
        trigger: Optional[Trigger] = None,
        local=True
    ):
        """Custom `TestLoader` for Kalash. This provides consistency
        between running local and remote tests.

        Args:
            yaml_path (str): for backwards compatibility with Kalash
                YAML files, set instantly to the `config_file_path` value
            trigger (Optional[Trigger]): `Trigger` instance providing the
                entire configuration model or `None` if the test is run
                in a local context
            local (bool): if True, run only this test even when `Trigger`
                or `yaml_path` is provided when in local context
        """
        if yaml_path and not trigger:
            self._kalash_trigger = Trigger()
            self._kalash_trigger.cli_config.file = yaml_path
        elif yaml_path and trigger:
            # bind the provided trigger, then point it at the YAML file
            self._kalash_trigger = trigger
            self._kalash_trigger.cli_config.file = yaml_path
        elif not yaml_path and trigger:
            self._kalash_trigger = trigger
        else:
            self._kalash_trigger = Trigger()
        self._local = local
        self.suite = unittest.TestSuite()

    @property
    def trigger(self) -> Trigger:
        """Typesafe handler of `KalashYamlObj`. Throws an Exception
        if the YAML object hasn't been parsed correctly.
        """
        if not self._kalash_trigger:
            raise Exception(
                "No `Trigger` on this `MetaLoader` instance"
            )
        else:
            return self._kalash_trigger

    def loadTestsFromKalashYaml(self) -> CollectorArtifact:
        """Loads tests from associated YAML or `Trigger`"""
        whatif_names: PathOrIdForWhatIf = []
        for a in prepare_suite(self.trigger):
            one_suite, one_whatif_names = a
            self.suite.addTests(one_suite)
            whatif_names.extend(one_whatif_names)
        return self.suite, list(set(whatif_names))

    def loadTestsFromModule(self, module, *args, pattern=None, **kws):

        def tests_generator(suite: unittest.TestSuite):
            """Recursive test generator for unittest.TestSuite
            (because a suite can contain other suites recursively).

            Args:
                suite (unittest.TestSuite): test suite to pull tests from

            Yields:
                unittest test functions
            """
            for test in suite:
                if not type(test) is unittest.TestSuite:
                    yield test
                else:
                    for t in tests_generator(test):
                        yield t

        if self.trigger.cli_config.file:
            # parse YAML if provided
            self._kalash_trigger = Trigger.infer_trigger(
                self.trigger.cli_config
            )

        if self._local and self.trigger.cli_config.file:
            # if YAML exists and isolated mode is on, make sure
            # values from YAML can be injected
            for test_idx, test_conf in enumerate(self.trigger.tests):
                # find whether any block declares a path that pertains
                # to this module config or if the module is placed in
                # any directory that should inherit config
                from pathlib import Path
                if test_conf.path:
                    if type(test_conf.path) is str:
                        path = Path(test_conf.path)
                    elif type(test_conf.path) is list:
                        path = Path(test_conf.path[0])
                    else:
                        path = Path(str(test_conf.path))
                else:
                    path = Path('.')
                files = [os.path.abspath(str(f)) for f in path.glob("**/*")]
                for file in files:
                    if os.path.normcase(
                        os.path.abspath(module.__file__)
                    ) == os.path.normcase(file):
                        self.suite.addTests([
                            suite for suite, _ in prepare_suite(self.trigger)
                        ])
        elif self.trigger.cli_config.file and not self._local:
            # if not running in isolated mode and the YAML is provided,
            # run all tests that are declared in the YAML
            self.loadTestsFromKalashYaml()
        else:
            tests, _ = _collect_test_case_from_module(module, None)
            # if no YAML provided just add tests from the
            # current module to the suite
            for test in tests_generator(tests):
                # suite is defined globally
                self.suite.addTest(test)
        return self.suite

    def _smuggle_fixture_module(self, is_setup: bool):
        cfg_section: Config = self.trigger.config
        if cfg_section:
            relpath_to_script = \
                cfg_section.setup if is_setup else cfg_section.teardown
            if relpath_to_script:
                p = os.path.abspath(relpath_to_script)
                smuggle(p)

    def one_time_setup(self):
        """Runs One-time-setup script"""
        self._smuggle_fixture_module(True)

    def one_time_teardown(self):
        """Runs One-time-teardown script"""
        self._smuggle_fixture_module(False)


# -----------------------------------------------------------------
main = TestProgram
# Exporting `unittest` components required in tests from `kalash`
# results in testers not needing to touch unittest itself.
# -----------------------------------------------------------------


def run_test_suite(
    loader: MetaLoader,
    kalash_trigger: Trigger,
    whatif_callback: Callable[[str], None] = print
) -> Tuple[Optional[xmlrunner.runner._XMLTestResult], int]:
    """Accepts a loader and a `Trigger` object and triggers the test run.

    Args:
        loader (MetaLoader): `MetaLoader` instance extending
            `unittest.TestLoader` with extra goodies
        kalash_trigger (Trigger): `Trigger` instance which directly
            translates to a YAML configuration file or an equivalent
            Python file
        whatif_callback (Callable[[str], None]): the function to call
            when running in a what-if mode

    Returns:
        A tuple of (unittest Result object, return code) or a tuple
        of (`None`, return code) when running in the what-if mode
    """
    suite, whatif_names = loader.loadTestsFromKalashYaml()
    return_code = 0
    if not kalash_trigger.cli_config.what_if:
        loader.one_time_setup()
        report = "."
        if kalash_trigger.config:
            report = kalash_trigger.config.report
        result: xmlrunner.runner._XMLTestResult = xmlrunner.XMLTestRunner(
            output=report,
            failfast=kalash_trigger.cli_config.fail_fast
        ).run(suite)
        loader.one_time_teardown()
        # PRODTEST-4708 -> Jenkins needs a non-zero return code
        # on test failure
        # return a valid return code depending on the result:
        if len(result.failures) > 0:
            return_code = 1
        elif len(result.errors) > 0:
            return_code = 2
        return result, return_code
    else:
        for n in whatif_names:
            whatif_callback(n)
        return None, return_code


def run(
    kalash_trigger: Trigger,
    whatif_callback: Callable[[str], None] = print
) -> int:
    """User-friendly alias of the `run_test_suite` command for
    importable use in Python-based files.

    Args:
        kalash_trigger (Trigger): `Trigger` instance which directly
            translates to a YAML configuration file or an equivalent
            Python file
        whatif_callback (Callable[[str], None]): the function to call
            when running in a what-if mode

    Returns:
        Return code, 0 if all collected tests are passing.
        A non-zero return code indicates failure.
    """
    frame = inspect.stack()[1]
    module = inspect.getmodule(frame[0])
    module_path = module.__file__ if module else None
    loader = MetaLoader(
        module_path,
        local=False,
        trigger=kalash_trigger
    )
    _, return_code = run_test_suite(loader, kalash_trigger, whatif_callback)
    close_all()
    return return_code


def make_loader_and_trigger_object(
    config: CliConfig
) -> Tuple[MetaLoader, Trigger]:
    """Prepares a `MetaLoader` based on the YAML file parameters.

    Args:
        config (CliConfig): a `CliConfig` object representing
            command-line parameters used to trigger the test run,
            modifying behavior of certain aspects of the application
            like logging or triggering speculative runs instead
            of real runs

    Returns:
        A tuple of (`MetaLoader` instance, `Trigger` instance)
    """
    kalash_trigger = Trigger.infer_trigger(config)
    loader = MetaLoader(
        local=False,
        trigger=kalash_trigger
    )
    return loader, kalash_trigger


def docs():
    """Open bundled documentation in the web browser."""
    base_dir = os.path.dirname(__file__)
    rel_docpath = ['built_docs', 'index.html']
    docpath = os.path.join(base_dir, *rel_docpath)
    _platform = platform.system()
    if _platform in ("Darwin", "Linux"):
        url = f"file://{os.path.join(docpath)}"
    elif _platform == "Windows":
        url = f"file:\\\\\\{os.path.join(docpath)}"
    else:
        raise SystemError(
            "Web browser handler for this platform is not supported")
    webbrowser.open(url, new=2)


def main_cli():
    """Main function. Expected to be run from CLI
    and used only in automated context.
    """
    config = CliConfig()

    parser = argparse.ArgumentParser(description='Test automation runner')
    subparsers = parser.add_subparsers()

    parser.add_argument(
        '-sc', '--spec-config', type=str,
        help='Path to the YAML specification file, default is `spec.yaml` '
             'from the package directory.'
    )
    parser.add_argument(
        '-dd', '--docs', action='store_true',
        help='Display bundled documentation'
    )

    # `run` subcommand:
    parser_run = subparsers.add_parser('run', help='run an analysis')
    parser_run.add_argument(
        '-f', '--file', type=str,
        help='Path to .kalash.yaml')
    parser_run.add_argument(
        '-n', '--no-recurse', action='store_true',
        help='Do not walk directories')
    parser_run.add_argument(
        '-d', '--debug', action='store_true',
        help='Run in debug mode')
    parser_run.add_argument(
        '-ff', '--fail-fast', action='store_true',
        help='Fail suite if at least one test fails')
    parser_run.add_argument(
        '-nl', '--no-log', action='store_true',
        help='Disable logging')
    parser_run.add_argument(
        '-ne', '--no-log-echo', action='store_true',
        help='If set, log calls will not be echoed to STDOUT')
    parser_run.add_argument(
        '-ld', '--log-dir', type=str,
        help='Log base directory')
    parser_run.add_argument(
        '-ll', '--log-level', type=int,
        help='Python `logging` log level ('
             'CRITICAL = 50, '
             'ERROR = 40, '
             'WARNING = 30, '
             'INFO = 20, '
             'DEBUG = 10, '
             'NOTSET = 0, default level is INFO)')
    parser_run.add_argument(
        '-lf', '--log-format', type=str,
        help='Log format string, default is '
             f'{config.spec.cli_config.log_formatter}')
    parser_run.add_argument(
        '-g', '--group-by', type=str,
        help='Log directories grouping: '
             f'<{config.spec.cli_config.group_device}|'
             f'{config.spec.cli_config.group_group}>')
    parser_run.add_argument(
        '-wi', '--what-if', type=str,
        help='Collects the tests but does not run them '
             'and produces a list of paths or IDs that have been '
             'collected for the run. Use '
             f'<{config.spec.cli_config.whatif_paths}|'
             f'{config.spec.cli_config.whatif_ids}> '
             'to modify the output behavior of the what-if flag.'
    )

    args = parser.parse_args()

    if args.docs:
        docs()
        return 0

    if args.file:
        config.file = args.file
    if args.spec_config:
        config.spec_path = args.spec_config
        config.__post_init__()
    if args.log_dir:
        config.log_dir = args.log_dir
    if args.group_by:
        config.group_by = args.group_by
    if args.log_level:
        config.log_level = args.log_level
    if args.log_format:
        config.log_format = args.log_format
    if args.what_if:
        config.what_if = args.what_if

    loader, kalash_trigger = make_loader_and_trigger_object(config)

    _, return_code = run_test_suite(loader, kalash_trigger)

    close_all()

    return return_code
```
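For orientation, here is a minimal sketch of driving this module programmatically instead of through `main_cli`. The `CliConfig` field assignments (`file`, `what_if`) mirror what `main_cli` does above; the spec path and the what-if value are illustrative, not confirmed defaults:

```python
# Hedged sketch: list what WOULD be collected for a YAML spec without
# running anything. '.kalash.yaml' and 'paths' are illustrative values.
from kalash.config import CliConfig
from kalash.run import make_loader_and_trigger_object, run_test_suite

cli = CliConfig()
cli.file = '.kalash.yaml'   # path to the YAML test specification
cli.what_if = 'paths'       # speculative run: report collected paths

loader, trigger = make_loader_and_trigger_object(cli)
_, return_code = run_test_suite(loader, trigger, whatif_callback=print)
```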
""" Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's Smalltalk testing framework (used with permission). This module contains the core framework classes that form the basis of specific test cases and suites (TestCase, TestSuite etc.), and also a text-based utility class for running the tests and reporting the results (TextTestRunner). Simple usage: import unittest class IntegerArithmeticTestCase(unittest.TestCase): def testAdd(self): # test method names begin with 'test' self.assertEqual((1 + 2), 3) self.assertEqual(0 + 1, 1) def testMultiply(self): self.assertEqual((0 * 10), 0) self.assertEqual((5 * 8), 40) if __name__ == '__main__': unittest.main() Further information is available in the bundled documentation, and from http://docs.python.org/library/unittest.html Copyright (c) 1999-2003 Steve Purcell Copyright (c) 2003-2010 Python Software Foundation This module is free software, and you may redistribute it and/or modify it under the same terms as Python itself, so long as this copyright message and disclaimer are retained in their original form. IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. """ __all__ = ['TestResult', 'TestCase', 'IsolatedAsyncioTestCase', 'TestSuite', 'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main', 'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless', 'expectedFailure', 'TextTestResult', 'installHandler', 'registerResult', 'removeResult', 'removeHandler', 'addModuleCleanup'] # Expose obsolete functions for backwards compatibility __all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases']) __unittest = True from .result import TestResult from .case import (addModuleCleanup, TestCase, FunctionTestCase, SkipTest, skip, skipIf, skipUnless, expectedFailure) from .suite import BaseTestSuite, TestSuite from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames, findTestCases) from .main import TestProgram, main from .runner import TextTestRunner, TextTestResult from .signals import installHandler, registerResult, removeResult, removeHandler # IsolatedAsyncioTestCase will be imported lazily. # deprecated _TextTestResult = TextTestResult # There are no tests here, so don't try to run anything discovered from # introspecting the symbols (e.g. FunctionTestCase). Instead, all our # tests come from within unittest.test. def load_tests(loader, tests, pattern): import os.path # top level directory cached on loader instance this_dir = os.path.dirname(__file__) return loader.discover(start_dir=this_dir, pattern=pattern) # Lazy import of IsolatedAsyncioTestCase from .async_case # It imports asyncio, which is relatively heavy, but most tests # do not need it. def __dir__(): return globals().keys() | {'IsolatedAsyncioTestCase'} def __getattr__(name): if name == 'IsolatedAsyncioTestCase': global IsolatedAsyncioTestCase from .async_case import IsolatedAsyncioTestCase return IsolatedAsyncioTestCase raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's Smalltalk testing framework (used with permission).
This module contains the core framework classes that form the basis of specific test cases and suites (TestCase, TestSuite etc.), and also a text-based utility class for running the tests and reporting the results (TextTestRunner).
Simple usage:

```python
import unittest

class IntegerArithmeticTestCase(unittest.TestCase):
    def testAdd(self):  # test method names begin with 'test'
        self.assertEqual((1 + 2), 3)
        self.assertEqual(0 + 1, 1)

    def testMultiply(self):
        self.assertEqual((0 * 10), 0)
        self.assertEqual((5 * 8), 40)

if __name__ == '__main__':
    unittest.main()
```
Further information is available in the bundled documentation, and from
http://docs.python.org/library/unittest.html
Copyright (c) 1999-2003 Steve Purcell
Copyright (c) 2003-2010 Python Software Foundation

This module is free software, and you may redistribute it and/or modify it under the same terms as Python itself, so long as this copyright message and disclaimer are retained in their original form.
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
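One implementation detail worth noting in the package source above: `IsolatedAsyncioTestCase` is imported lazily through a module-level `__getattr__` (PEP 562), so the relatively heavy `asyncio` import is only paid for on first access. A generic sketch of the same pattern, with `HeavyClass` and `_heavy_module` as placeholder names:

```python
# mypackage/__init__.py: lazy attribute loading per PEP 562.
def __getattr__(name):
    if name == 'HeavyClass':
        global HeavyClass
        from ._heavy_module import HeavyClass  # deferred, costly import
        return HeavyClass
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

def __dir__():
    # advertise the lazy attribute alongside the eagerly defined names
    return sorted(list(globals().keys()) + ['HeavyClass'])
```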
```python
class TextTestRunner(object):
    """A test runner class that displays results in textual form.

    It prints out the names of tests as they are run, errors as they
    occur, and a summary of the results at the end of the test run.
    """
    resultclass = TextTestResult

    def __init__(self, stream=None, descriptions=True, verbosity=1,
                 failfast=False, buffer=False, resultclass=None, warnings=None,
                 *, tb_locals=False):
        """Construct a TextTestRunner.

        Subclasses should accept **kwargs to ensure compatibility as the
        interface changes.
        """
        if stream is None:
            stream = sys.stderr
        self.stream = _WritelnDecorator(stream)
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.failfast = failfast
        self.buffer = buffer
        self.tb_locals = tb_locals
        self.warnings = warnings
        if resultclass is not None:
            self.resultclass = resultclass

    def _makeResult(self):
        return self.resultclass(self.stream, self.descriptions, self.verbosity)

    def run(self, test):
        "Run the given test case or test suite."
        result = self._makeResult()
        registerResult(result)
        result.failfast = self.failfast
        result.buffer = self.buffer
        result.tb_locals = self.tb_locals
        with warnings.catch_warnings():
            if self.warnings:
                # if self.warnings is set, use it to filter all the warnings
                warnings.simplefilter(self.warnings)
                # if the filter is 'default' or 'always', special-case the
                # warnings from the deprecated unittest methods to show them
                # no more than once per module, because they can be fairly
                # noisy.  The -Wd and -Wa flags can be used to bypass this
                # only when self.warnings is None.
                if self.warnings in ['default', 'always']:
                    warnings.filterwarnings(
                        'module',
                        category=DeprecationWarning,
                        message=r'Please use assert\w+ instead.')
            startTime = time.perf_counter()
            startTestRun = getattr(result, 'startTestRun', None)
            if startTestRun is not None:
                startTestRun()
            try:
                test(result)
            finally:
                stopTestRun = getattr(result, 'stopTestRun', None)
                if stopTestRun is not None:
                    stopTestRun()
            stopTime = time.perf_counter()
        timeTaken = stopTime - startTime
        result.printErrors()
        if hasattr(result, 'separator2'):
            self.stream.writeln(result.separator2)
        run = result.testsRun
        self.stream.writeln("Ran %d test%s in %.3fs" %
                            (run, run != 1 and "s" or "", timeTaken))
        self.stream.writeln()

        expectedFails = unexpectedSuccesses = skipped = 0
        try:
            results = map(len, (result.expectedFailures,
                                result.unexpectedSuccesses,
                                result.skipped))
        except AttributeError:
            pass
        else:
            expectedFails, unexpectedSuccesses, skipped = results

        infos = []
        if not result.wasSuccessful():
            self.stream.write("FAILED")
            failed, errored = len(result.failures), len(result.errors)
            if failed:
                infos.append("failures=%d" % failed)
            if errored:
                infos.append("errors=%d" % errored)
        else:
            self.stream.write("OK")
        if skipped:
            infos.append("skipped=%d" % skipped)
        if expectedFails:
            infos.append("expected failures=%d" % expectedFails)
        if unexpectedSuccesses:
            infos.append("unexpected successes=%d" % unexpectedSuccesses)
        if infos:
            self.stream.writeln(" (%s)" % (", ".join(infos),))
        else:
            self.stream.write("\n")
        return result
```
A test runner class that displays results in textual form.
It prints out the names of tests as they are run, errors as they occur, and a summary of the results at the end of the test run.
`TextTestRunner.__init__(self, stream=None, descriptions=True, verbosity=1, failfast=False, buffer=False, resultclass=None, warnings=None, *, tb_locals=False)`
Construct a TextTestRunner.
Subclasses should accept **kwargs to ensure compatibility as the interface changes.
`TextTestRunner.run(self, test)`
Run the given test case or test suite.
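A small self-contained usage sketch of `run`, directing the textual report to an in-memory stream (the test class is illustrative):

```python
import io
import unittest

class Arithmetic(unittest.TestCase):
    def test_add(self):
        self.assertEqual(1 + 2, 3)

suite = unittest.TestLoader().loadTestsFromTestCase(Arithmetic)
stream = io.StringIO()
result = unittest.TextTestRunner(stream=stream, verbosity=2).run(suite)

assert result.wasSuccessful()
print(stream.getvalue())  # "test_add ... ok" plus the summary block
```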
```python
class TextTestResult(result.TestResult):
    """A test result class that can print formatted text results to a stream.

    Used by TextTestRunner.
    """
    separator1 = '=' * 70
    separator2 = '-' * 70

    def __init__(self, stream, descriptions, verbosity):
        super(TextTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.showAll = verbosity > 1
        self.dots = verbosity == 1
        self.descriptions = descriptions

    def getDescription(self, test):
        doc_first_line = test.shortDescription()
        if self.descriptions and doc_first_line:
            return '\n'.join((str(test), doc_first_line))
        else:
            return str(test)

    def startTest(self, test):
        super(TextTestResult, self).startTest(test)
        if self.showAll:
            self.stream.write(self.getDescription(test))
            self.stream.write(" ... ")
            self.stream.flush()

    def addSuccess(self, test):
        super(TextTestResult, self).addSuccess(test)
        if self.showAll:
            self.stream.writeln("ok")
        elif self.dots:
            self.stream.write('.')
            self.stream.flush()

    def addError(self, test, err):
        super(TextTestResult, self).addError(test, err)
        if self.showAll:
            self.stream.writeln("ERROR")
        elif self.dots:
            self.stream.write('E')
            self.stream.flush()

    def addFailure(self, test, err):
        super(TextTestResult, self).addFailure(test, err)
        if self.showAll:
            self.stream.writeln("FAIL")
        elif self.dots:
            self.stream.write('F')
            self.stream.flush()

    def addSkip(self, test, reason):
        super(TextTestResult, self).addSkip(test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write("s")
            self.stream.flush()

    def addExpectedFailure(self, test, err):
        super(TextTestResult, self).addExpectedFailure(test, err)
        if self.showAll:
            self.stream.writeln("expected failure")
        elif self.dots:
            self.stream.write("x")
            self.stream.flush()

    def addUnexpectedSuccess(self, test):
        super(TextTestResult, self).addUnexpectedSuccess(test)
        if self.showAll:
            self.stream.writeln("unexpected success")
        elif self.dots:
            self.stream.write("u")
            self.stream.flush()

    def printErrors(self):
        if self.dots or self.showAll:
            self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    def printErrorList(self, flavour, errors):
        for test, err in errors:
            self.stream.writeln(self.separator1)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(self.separator2)
            self.stream.writeln("%s" % err)
```
A test result class that can print formatted text results to a stream.
Used by TextTestRunner.
Inherited Members
- unittest.runner.TextTestResult
- TextTestResult
- separator1
- separator2
- getDescription
- startTest
- addSuccess
- addError
- addFailure
- addSkip
- addExpectedFailure
- addUnexpectedSuccess
- printErrors
- printErrorList
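Because `TextTestRunner` accepts a `resultclass`, a `TextTestResult` subclass can be dropped in to adjust reporting. A minimal sketch; the class name and the extra output line are illustrative:

```python
import unittest

class TaggedResult(unittest.TextTestResult):
    """Adds a tag line after every recorded failure."""
    def addFailure(self, test, err):
        super().addFailure(test, err)            # keep normal bookkeeping
        self.stream.writeln("[triage-needed]")   # extra report line

runner = unittest.TextTestRunner(resultclass=TaggedResult, verbosity=2)
```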
```python
class TestResult(object):
    """Holder for test result information.

    Test results are automatically managed by the TestCase and TestSuite
    classes, and do not need to be explicitly manipulated by writers of tests.

    Each instance holds the total number of tests run, and collections of
    failures and errors that occurred among those test runs. The collections
    contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
    formatted traceback of the error that occurred.
    """
    _previousTestClass = None
    _testRunEntered = False
    _moduleSetUpFailed = False

    def __init__(self, stream=None, descriptions=None, verbosity=None):
        self.failfast = False
        self.failures = []
        self.errors = []
        self.testsRun = 0
        self.skipped = []
        self.expectedFailures = []
        self.unexpectedSuccesses = []
        self.shouldStop = False
        self.buffer = False
        self.tb_locals = False
        self._stdout_buffer = None
        self._stderr_buffer = None
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr
        self._mirrorOutput = False

    def printErrors(self):
        "Called by TestRunner after test run"

    def startTest(self, test):
        "Called when the given test is about to be run"
        self.testsRun += 1
        self._mirrorOutput = False
        self._setupStdout()

    def _setupStdout(self):
        if self.buffer:
            if self._stderr_buffer is None:
                self._stderr_buffer = io.StringIO()
                self._stdout_buffer = io.StringIO()
            sys.stdout = self._stdout_buffer
            sys.stderr = self._stderr_buffer

    def startTestRun(self):
        """Called once before any tests are executed.

        See startTest for a method called before each test.
        """

    def stopTest(self, test):
        """Called when the given test has been run"""
        self._restoreStdout()
        self._mirrorOutput = False

    def _restoreStdout(self):
        if self.buffer:
            if self._mirrorOutput:
                output = sys.stdout.getvalue()
                error = sys.stderr.getvalue()
                if output:
                    if not output.endswith('\n'):
                        output += '\n'
                    self._original_stdout.write(STDOUT_LINE % output)
                if error:
                    if not error.endswith('\n'):
                        error += '\n'
                    self._original_stderr.write(STDERR_LINE % error)

            sys.stdout = self._original_stdout
            sys.stderr = self._original_stderr
            self._stdout_buffer.seek(0)
            self._stdout_buffer.truncate()
            self._stderr_buffer.seek(0)
            self._stderr_buffer.truncate()

    def stopTestRun(self):
        """Called once after all tests are executed.

        See stopTest for a method called after each test.
        """

    @failfast
    def addError(self, test, err):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info().
        """
        self.errors.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    @failfast
    def addFailure(self, test, err):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info()."""
        self.failures.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    def addSubTest(self, test, subtest, err):
        """Called at the end of a subtest.
        'err' is None if the subtest ended successfully, otherwise it's a
        tuple of values as returned by sys.exc_info().
        """
        # By default, we don't do anything with successful subtests, but
        # more sophisticated test results might want to record them.
        if err is not None:
            if getattr(self, 'failfast', False):
                self.stop()
            if issubclass(err[0], test.failureException):
                errors = self.failures
            else:
                errors = self.errors
            errors.append((subtest, self._exc_info_to_string(err, test)))
            self._mirrorOutput = True

    def addSuccess(self, test):
        "Called when a test has completed successfully"
        pass

    def addSkip(self, test, reason):
        """Called when a test is skipped."""
        self.skipped.append((test, reason))

    def addExpectedFailure(self, test, err):
        """Called when an expected failure/error occurred."""
        self.expectedFailures.append(
            (test, self._exc_info_to_string(err, test)))

    @failfast
    def addUnexpectedSuccess(self, test):
        """Called when a test was expected to fail, but succeed."""
        self.unexpectedSuccesses.append(test)

    def wasSuccessful(self):
        """Tells whether or not this result was a success."""
        # The hasattr check is for test_result's OldResult test.  That
        # way this method works on objects that lack the attribute.
        # (where would such result instances come from? old stored pickles?)
        return ((len(self.failures) == len(self.errors) == 0) and
                (not hasattr(self, 'unexpectedSuccesses') or
                 len(self.unexpectedSuccesses) == 0))

    def stop(self):
        """Indicates that the tests should be aborted."""
        self.shouldStop = True

    def _exc_info_to_string(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string."""
        exctype, value, tb = err
        # Skip test runner traceback levels
        while tb and self._is_relevant_tb_level(tb):
            tb = tb.tb_next

        if exctype is test.failureException:
            # Skip assert*() traceback levels
            length = self._count_relevant_tb_levels(tb)
        else:
            length = None
        tb_e = traceback.TracebackException(
            exctype, value, tb, limit=length, capture_locals=self.tb_locals)
        msgLines = list(tb_e.format())

        if self.buffer:
            output = sys.stdout.getvalue()
            error = sys.stderr.getvalue()
            if output:
                if not output.endswith('\n'):
                    output += '\n'
                msgLines.append(STDOUT_LINE % output)
            if error:
                if not error.endswith('\n'):
                    error += '\n'
                msgLines.append(STDERR_LINE % error)
        return ''.join(msgLines)

    def _is_relevant_tb_level(self, tb):
        return '__unittest' in tb.tb_frame.f_globals

    def _count_relevant_tb_levels(self, tb):
        length = 0
        while tb and not self._is_relevant_tb_level(tb):
            length += 1
            tb = tb.tb_next
        return length

    def __repr__(self):
        return ("<%s run=%i errors=%i failures=%i>" %
                (util.strclass(self.__class__), self.testsRun,
                 len(self.errors), len(self.failures)))
```
Holder for test result information.
Test results are automatically managed by the TestCase and TestSuite classes, and do not need to be explicitly manipulated by writers of tests.
Each instance holds the total number of tests run, and collections of failures and errors that occurred among those test runs. The collections contain tuples of (testcase, exceptioninfo), where exceptioninfo is the formatted traceback of the error that occurred.
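The (testcase, exceptioninfo) layout is easy to see by running a deliberately failing case against a bare `TestResult`:

```python
import unittest

class Failing(unittest.TestCase):
    def test_off_by_one(self):
        self.assertEqual(1, 2)

suite = unittest.TestLoader().loadTestsFromTestCase(Failing)
result = unittest.TestResult()
suite.run(result)

print(result.testsRun)         # 1
print(result.wasSuccessful())  # False
test, exc_text = result.failures[0]
print(exc_text)                # formatted AssertionError traceback
```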
`TestResult.__init__(self, stream=None, descriptions=None, verbosity=None)`
`TestResult.printErrors(self)`
Called by TestRunner after test run
`TestResult.startTest(self, test)`
Called when the given test is about to be run
`TestResult.startTestRun(self)`
Called once before any tests are executed.
See startTest for a method called before each test.
`TestResult.stopTest(self, test)`
Called when the given test has been run
`TestResult.stopTestRun(self)`
Called once after all tests are executed.
See stopTest for a method called after each test.
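Overriding the run-level hooks is a common way to instrument a whole run; a sketch of a timing subclass (illustrative, not part of the library):

```python
import time
import unittest

class TimedResult(unittest.TestResult):
    def startTestRun(self):
        super().startTestRun()
        self._t0 = time.perf_counter()

    def stopTestRun(self):
        super().stopTestRun()
        print(f"run took {time.perf_counter() - self._t0:.3f}s")
```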
`TestResult.addError(self, test, err)`
Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info().
`TestResult.addFailure(self, test, err)`
Called when a failure has occurred. 'err' is a tuple of values as returned by sys.exc_info().
`TestResult.addSubTest(self, test, subtest, err)`
Called at the end of a subtest. 'err' is None if the subtest ended successfully, otherwise it's a tuple of values as returned by sys.exc_info().
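`addSubTest` is driven by `TestCase.subTest`; each failing iteration is recorded without aborting the loop:

```python
import unittest

class SubTestDemo(unittest.TestCase):
    def test_bounds(self):
        for i in (1, 2, 3):
            with self.subTest(i=i):
                self.assertLess(i, 3)  # only the i == 3 subtest fails

result = unittest.TestResult()
unittest.TestLoader().loadTestsFromTestCase(SubTestDemo).run(result)
print(len(result.failures))  # 1, recorded via addSubTest
```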
`TestResult.addSuccess(self, test)`
Called when a test has completed successfully
`TestResult.addSkip(self, test, reason)`
Called when a test is skipped.
`TestResult.addExpectedFailure(self, test, err)`
Called when an expected failure/error occurred.
`TestResult.addUnexpectedSuccess(self, test)`
Called when a test was expected to fail, but succeeded.
`TestResult.wasSuccessful(self)`
Tells whether or not this result was a success.
`TestResult.stop(self)`
Indicates that the tests should be aborted.
```python
class TestProgram(object):
    """A command-line program that runs a set of tests; this is primarily
    for making test modules conveniently executable.
    """
    # defaults for testing
    module = None
    verbosity = 1
    failfast = catchbreak = buffer = progName = warnings = testNamePatterns = None
    _discovery_parser = None

    def __init__(self, module='__main__', defaultTest=None, argv=None,
                 testRunner=None, testLoader=loader.defaultTestLoader,
                 exit=True, verbosity=1, failfast=None, catchbreak=None,
                 buffer=None, warnings=None, *, tb_locals=False):
        if isinstance(module, str):
            self.module = __import__(module)
            for part in module.split('.')[1:]:
                self.module = getattr(self.module, part)
        else:
            self.module = module
        if argv is None:
            argv = sys.argv

        self.exit = exit
        self.failfast = failfast
        self.catchbreak = catchbreak
        self.verbosity = verbosity
        self.buffer = buffer
        self.tb_locals = tb_locals
        if warnings is None and not sys.warnoptions:
            # even if DeprecationWarnings are ignored by default
            # print them anyway unless other warnings settings are
            # specified by the warnings arg or the -W python flag
            self.warnings = 'default'
        else:
            # here self.warnings is set either to the value passed
            # to the warnings args or to None.
            # If the user didn't pass a value self.warnings will
            # be None. This means that the behavior is unchanged
            # and depends on the values passed to -W.
            self.warnings = warnings
        self.defaultTest = defaultTest
        self.testRunner = testRunner
        self.testLoader = testLoader
        self.progName = os.path.basename(argv[0])
        self.parseArgs(argv)
        self.runTests()

    def usageExit(self, msg=None):
        if msg:
            print(msg)
        if self._discovery_parser is None:
            self._initArgParsers()
        self._print_help()
        sys.exit(2)

    def _print_help(self, *args, **kwargs):
        if self.module is None:
            print(self._main_parser.format_help())
            print(MAIN_EXAMPLES % {'prog': self.progName})
            self._discovery_parser.print_help()
        else:
            print(self._main_parser.format_help())
            print(MODULE_EXAMPLES % {'prog': self.progName})

    def parseArgs(self, argv):
        self._initArgParsers()
        if self.module is None:
            if len(argv) > 1 and argv[1].lower() == 'discover':
                self._do_discovery(argv[2:])
                return
            self._main_parser.parse_args(argv[1:], self)
            if not self.tests:
                # this allows "python -m unittest -v" to still work for
                # test discovery.
                self._do_discovery([])
                return
        else:
            self._main_parser.parse_args(argv[1:], self)

        if self.tests:
            self.testNames = _convert_names(self.tests)
            if __name__ == '__main__':
                # to support python -m unittest ...
                self.module = None
        elif self.defaultTest is None:
            # createTests will load tests from self.module
            self.testNames = None
        elif isinstance(self.defaultTest, str):
            self.testNames = (self.defaultTest,)
        else:
            self.testNames = list(self.defaultTest)
        self.createTests()

    def createTests(self, from_discovery=False, Loader=None):
        if self.testNamePatterns:
            self.testLoader.testNamePatterns = self.testNamePatterns
        if from_discovery:
            loader = self.testLoader if Loader is None else Loader()
            self.test = loader.discover(self.start, self.pattern, self.top)
        elif self.testNames is None:
            self.test = self.testLoader.loadTestsFromModule(self.module)
        else:
            self.test = self.testLoader.loadTestsFromNames(self.testNames,
                                                           self.module)

    def _initArgParsers(self):
        parent_parser = self._getParentArgParser()
        self._main_parser = self._getMainArgParser(parent_parser)
        self._discovery_parser = self._getDiscoveryArgParser(parent_parser)

    def _getParentArgParser(self):
        parser = argparse.ArgumentParser(add_help=False)

        parser.add_argument('-v', '--verbose', dest='verbosity',
                            action='store_const', const=2,
                            help='Verbose output')
        parser.add_argument('-q', '--quiet', dest='verbosity',
                            action='store_const', const=0,
                            help='Quiet output')
        parser.add_argument('--locals', dest='tb_locals',
                            action='store_true',
                            help='Show local variables in tracebacks')
        if self.failfast is None:
            parser.add_argument('-f', '--failfast', dest='failfast',
                                action='store_true',
                                help='Stop on first fail or error')
            self.failfast = False
        if self.catchbreak is None:
            parser.add_argument('-c', '--catch', dest='catchbreak',
                                action='store_true',
                                help='Catch Ctrl-C and display results so far')
            self.catchbreak = False
        if self.buffer is None:
            parser.add_argument('-b', '--buffer', dest='buffer',
                                action='store_true',
                                help='Buffer stdout and stderr during tests')
            self.buffer = False
        if self.testNamePatterns is None:
            parser.add_argument('-k', dest='testNamePatterns',
                                action='append', type=_convert_select_pattern,
                                help='Only run tests which match the given substring')
            self.testNamePatterns = []

        return parser

    def _getMainArgParser(self, parent):
        parser = argparse.ArgumentParser(parents=[parent])
        parser.prog = self.progName
        parser.print_help = self._print_help

        parser.add_argument('tests', nargs='*',
                            help='a list of any number of test modules, '
                            'classes and test methods.')

        return parser

    def _getDiscoveryArgParser(self, parent):
        parser = argparse.ArgumentParser(parents=[parent])
        parser.prog = '%s discover' % self.progName
        parser.epilog = ('For test discovery all test modules must be '
                         'importable from the top level directory of the '
                         'project.')

        parser.add_argument('-s', '--start-directory', dest='start',
                            help="Directory to start discovery ('.' default)")
        parser.add_argument('-p', '--pattern', dest='pattern',
                            help="Pattern to match tests ('test*.py' default)")
        parser.add_argument('-t', '--top-level-directory', dest='top',
                            help='Top level directory of project (defaults to '
                                 'start directory)')
        for arg in ('start', 'pattern', 'top'):
            parser.add_argument(arg, nargs='?',
                                default=argparse.SUPPRESS,
                                help=argparse.SUPPRESS)

        return parser

    def _do_discovery(self, argv, Loader=None):
        self.start = '.'
        self.pattern = 'test*.py'
        self.top = None
        if argv is not None:
            # handle command line args for test discovery
            if self._discovery_parser is None:
                # for testing
                self._initArgParsers()
            self._discovery_parser.parse_args(argv, self)

        self.createTests(from_discovery=True, Loader=Loader)

    def runTests(self):
        if self.catchbreak:
            installHandler()
        if self.testRunner is None:
            self.testRunner = runner.TextTestRunner
        if isinstance(self.testRunner, type):
            try:
                try:
                    testRunner = self.testRunner(verbosity=self.verbosity,
                                                 failfast=self.failfast,
                                                 buffer=self.buffer,
                                                 warnings=self.warnings,
                                                 tb_locals=self.tb_locals)
                except TypeError:
                    # didn't accept the tb_locals argument
                    testRunner = self.testRunner(verbosity=self.verbosity,
                                                 failfast=self.failfast,
                                                 buffer=self.buffer,
                                                 warnings=self.warnings)
            except TypeError:
                # didn't accept the verbosity, buffer or failfast arguments
                testRunner = self.testRunner()
        else:
            # it is assumed to be a TestRunner instance
            testRunner = self.testRunner
        self.result = testRunner.run(self.test)
        if self.exit:
            sys.exit(not self.result.wasSuccessful())
```
A command-line program that runs a set of tests; this is primarily for making test modules conveniently executable.
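`TestProgram` (re-exported by kalash as `main`) can also be driven programmatically; with `exit=False` it returns instead of calling `sys.exit`:

```python
import unittest

# Discover and run tests under ./tests (directory assumed to exist)
# without exiting the interpreter; argv[0] is just a program name.
prog = unittest.main(
    module=None,
    argv=['prog', 'discover', '-s', 'tests'],
    exit=False,
)
print(prog.result.wasSuccessful())
```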
`TestProgram.__init__(self, module='__main__', defaultTest=None, argv=None, testRunner=None, testLoader=loader.defaultTestLoader, exit=True, verbosity=1, failfast=None, catchbreak=None, buffer=None, warnings=None, *, tb_locals=False)`
`TestProgram.usageExit(self, msg=None)`
`TestProgram.parseArgs(self, argv)`
`TestProgram.createTests(self, from_discovery=False, Loader=None)`
`TestProgram.runTests(self)`
```python
def failfast(method):
    @wraps(method)
    def inner(self, *args, **kw):
        if getattr(self, 'failfast', False):
            self.stop()
        return method(self, *args, **kw)
    return inner
```
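The decorator calls `self.stop()` before delegating whenever `failfast` is set on the result. Applying it to a custom hook might look like this (the subclass is illustrative):

```python
import unittest
from unittest.result import failfast

class StopOnSkipResult(unittest.TestResult):
    @failfast  # abort the run on the first skip when failfast is enabled
    def addSkip(self, test, reason):
        super().addSkip(test, reason)
```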
`class MetaLoader(TestLoader)` (full source is shown in the kalash.run module listing above)

This class is responsible for loading tests according to various criteria and returning them wrapped in a `TestSuite`.
`MetaLoader.__init__(self, yaml_path: Optional[str] = None, trigger: Optional[Trigger] = None, local=True)`
Custom `TestLoader` for Kalash. This provides consistency between running local and remote tests.

Args:
- yaml_path (str): for backwards compatibility with Kalash YAML files; set instantly to the `config_file_path` value
- trigger (Optional[Trigger]): `Trigger` instance providing the entire configuration model, or `None` if the test is run in a local context
- local (bool): if True, run only this test even when a `Trigger` or `yaml_path` is provided in a local context
`MetaLoader.trigger` (property): Typesafe handler of the `KalashYamlObj`. Throws an `Exception` if the YAML object hasn't been parsed correctly.
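A sketch of the local usage pattern the constructor implies: a test module hands a `MetaLoader` to `main` so the same file works standalone and under a YAML-driven run. The YAML path is illustrative, and `TestCase` is assumed to mirror `unittest` assertions:

```python
from kalash.run import main, MetaLoader, TestCase, find_my_yaml

YAML = find_my_yaml(__file__, '../yamls/config.yaml')  # illustrative path

class TestSomething(TestCase):
    def test_smoke(self):
        self.assertTrue(True)

if __name__ == '__main__':
    main(testLoader=MetaLoader(YAML, local=True))
```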
`MetaLoader.loadTestsFromKalashYaml(self) -> CollectorArtifact`
Loads tests from associated YAML or Trigger
`MetaLoader.loadTestsFromModule(self, module, *args, pattern=None, **kws)`
Return a suite of all test cases contained in the given module
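Internally the override flattens nested suites with a recursive generator; the same idea in generic form:

```python
import unittest

def iter_tests(suite):
    """Yield individual test cases from an arbitrarily nested TestSuite."""
    for item in suite:
        if isinstance(item, unittest.TestSuite):
            yield from iter_tests(item)
        else:
            yield item
```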
`MetaLoader.one_time_setup(self)`
Runs One-time-setup script
`MetaLoader.one_time_teardown(self)`
Runs One-time-teardown script
Inherited Members
- unittest.loader.TestLoader
- testMethodPrefix
- sortTestMethodsUsing
- testNamePatterns
- suiteClass
- loadTestsFromTestCase
- loadTestsFromName
- loadTestsFromNames
- getTestCaseNames
- discover
View Source
class TestProgram(object):
    """A command-line program that runs a set of tests; this is primarily
       for making test modules conveniently executable.
    """
    # defaults for testing
    module=None
    verbosity = 1
    failfast = catchbreak = buffer = progName = warnings = testNamePatterns = None
    _discovery_parser = None

    def __init__(self, module='__main__', defaultTest=None, argv=None,
                    testRunner=None, testLoader=loader.defaultTestLoader,
                    exit=True, verbosity=1, failfast=None, catchbreak=None,
                    buffer=None, warnings=None, *, tb_locals=False):
        if isinstance(module, str):
            self.module = __import__(module)
            for part in module.split('.')[1:]:
                self.module = getattr(self.module, part)
        else:
            self.module = module
        if argv is None:
            argv = sys.argv

        self.exit = exit
        self.failfast = failfast
        self.catchbreak = catchbreak
        self.verbosity = verbosity
        self.buffer = buffer
        self.tb_locals = tb_locals
        if warnings is None and not sys.warnoptions:
            # even if DeprecationWarnings are ignored by default
            # print them anyway unless other warnings settings are
            # specified by the warnings arg or the -W python flag
            self.warnings = 'default'
        else:
            # here self.warnings is set either to the value passed
            # to the warnings args or to None.
            # If the user didn't pass a value self.warnings will
            # be None. This means that the behavior is unchanged
            # and depends on the values passed to -W.
            self.warnings = warnings
        self.defaultTest = defaultTest
        self.testRunner = testRunner
        self.testLoader = testLoader
        self.progName = os.path.basename(argv[0])
        self.parseArgs(argv)
        self.runTests()

    def usageExit(self, msg=None):
        if msg:
            print(msg)
        if self._discovery_parser is None:
            self._initArgParsers()
        self._print_help()
        sys.exit(2)

    def _print_help(self, *args, **kwargs):
        if self.module is None:
            print(self._main_parser.format_help())
            print(MAIN_EXAMPLES % {'prog': self.progName})
            self._discovery_parser.print_help()
        else:
            print(self._main_parser.format_help())
            print(MODULE_EXAMPLES % {'prog': self.progName})

    def parseArgs(self, argv):
        self._initArgParsers()
        if self.module is None:
            if len(argv) > 1 and argv[1].lower() == 'discover':
                self._do_discovery(argv[2:])
                return
            self._main_parser.parse_args(argv[1:], self)
            if not self.tests:
                # this allows "python -m unittest -v" to still work for
                # test discovery.
                self._do_discovery([])
                return
        else:
            self._main_parser.parse_args(argv[1:], self)

        if self.tests:
            self.testNames = _convert_names(self.tests)
            if __name__ == '__main__':
                # to support python -m unittest ...
                self.module = None
        elif self.defaultTest is None:
            # createTests will load tests from self.module
            self.testNames = None
        elif isinstance(self.defaultTest, str):
            self.testNames = (self.defaultTest,)
        else:
            self.testNames = list(self.defaultTest)
        self.createTests()

    def createTests(self, from_discovery=False, Loader=None):
        if self.testNamePatterns:
            self.testLoader.testNamePatterns = self.testNamePatterns
        if from_discovery:
            loader = self.testLoader if Loader is None else Loader()
            self.test = loader.discover(self.start, self.pattern, self.top)
        elif self.testNames is None:
            self.test = self.testLoader.loadTestsFromModule(self.module)
        else:
            self.test = self.testLoader.loadTestsFromNames(self.testNames,
                                                           self.module)

    def _initArgParsers(self):
        parent_parser = self._getParentArgParser()
        self._main_parser = self._getMainArgParser(parent_parser)
        self._discovery_parser = self._getDiscoveryArgParser(parent_parser)

    def _getParentArgParser(self):
        parser = argparse.ArgumentParser(add_help=False)

        parser.add_argument('-v', '--verbose', dest='verbosity',
                            action='store_const', const=2,
                            help='Verbose output')
        parser.add_argument('-q', '--quiet', dest='verbosity',
                            action='store_const', const=0,
                            help='Quiet output')
        parser.add_argument('--locals', dest='tb_locals',
                            action='store_true',
                            help='Show local variables in tracebacks')
        if self.failfast is None:
            parser.add_argument('-f', '--failfast', dest='failfast',
                                action='store_true',
                                help='Stop on first fail or error')
            self.failfast = False
        if self.catchbreak is None:
            parser.add_argument('-c', '--catch', dest='catchbreak',
                                action='store_true',
                                help='Catch Ctrl-C and display results so far')
            self.catchbreak = False
        if self.buffer is None:
            parser.add_argument('-b', '--buffer', dest='buffer',
                                action='store_true',
                                help='Buffer stdout and stderr during tests')
            self.buffer = False
        if self.testNamePatterns is None:
            parser.add_argument('-k', dest='testNamePatterns',
                                action='append', type=_convert_select_pattern,
                                help='Only run tests which match the given substring')
            self.testNamePatterns = []

        return parser

    def _getMainArgParser(self, parent):
        parser = argparse.ArgumentParser(parents=[parent])
        parser.prog = self.progName
        parser.print_help = self._print_help

        parser.add_argument('tests', nargs='*',
                            help='a list of any number of test modules, '
                            'classes and test methods.')

        return parser

    def _getDiscoveryArgParser(self, parent):
        parser = argparse.ArgumentParser(parents=[parent])
        parser.prog = '%s discover' % self.progName
        parser.epilog = ('For test discovery all test modules must be '
                         'importable from the top level directory of the '
                         'project.')

        parser.add_argument('-s', '--start-directory', dest='start',
                            help="Directory to start discovery ('.' default)")
        parser.add_argument('-p', '--pattern', dest='pattern',
                            help="Pattern to match tests ('test*.py' default)")
        parser.add_argument('-t', '--top-level-directory', dest='top',
                            help='Top level directory of project (defaults to '
                                 'start directory)')
        for arg in ('start', 'pattern', 'top'):
            parser.add_argument(arg, nargs='?',
                                default=argparse.SUPPRESS,
                                help=argparse.SUPPRESS)

        return parser

    def _do_discovery(self, argv, Loader=None):
        self.start = '.'
        self.pattern = 'test*.py'
        self.top = None
        if argv is not None:
            # handle command line args for test discovery
            if self._discovery_parser is None:
                # for testing
                self._initArgParsers()
            self._discovery_parser.parse_args(argv, self)

        self.createTests(from_discovery=True, Loader=Loader)

    def runTests(self):
        if self.catchbreak:
            installHandler()
        if self.testRunner is None:
            self.testRunner = runner.TextTestRunner
        if isinstance(self.testRunner, type):
            try:
                try:
                    testRunner = self.testRunner(verbosity=self.verbosity,
                                                 failfast=self.failfast,
                                                 buffer=self.buffer,
                                                 warnings=self.warnings,
                                                 tb_locals=self.tb_locals)
                except TypeError:
                    # didn't accept the tb_locals argument
                    testRunner = self.testRunner(verbosity=self.verbosity,
                                                 failfast=self.failfast,
                                                 buffer=self.buffer,
                                                 warnings=self.warnings)
            except TypeError:
                # didn't accept the verbosity, buffer or failfast arguments
                testRunner = self.testRunner()
        else:
            # it is assumed to be a TestRunner instance
            testRunner = self.testRunner
        self.result = testRunner.run(self.test)
        if self.exit:
            sys.exit(not self.result.wasSuccessful())
A command-line program that runs a set of tests; this is primarily for making test modules conveniently executable.
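kalash re-exports this class unchanged from `unittest`, so it can also be driven programmatically. A sketch using the stdlib signature shown above; `exit=False` suppresses the `sys.exit()` call in `runTests()` so the aggregated result can be inspected:

```python
# Sketch: driving TestProgram programmatically instead of from the
# command line, with kalash's MetaLoader plugged in as the test loader.
from kalash.run import TestProgram, MetaLoader

program = TestProgram(module='__main__', testLoader=MetaLoader(),
                      exit=False)
print(program.result.wasSuccessful())
```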
def __init__(self, module='__main__', defaultTest=None, argv=None,
                testRunner=None, testLoader=loader.defaultTestLoader,
                exit=True, verbosity=1, failfast=None, catchbreak=None,
                buffer=None, warnings=None, *, tb_locals=False):
    if isinstance(module, str):
        self.module = __import__(module)
        for part in module.split('.')[1:]:
            self.module = getattr(self.module, part)
    else:
        self.module = module
    if argv is None:
        argv = sys.argv

    self.exit = exit
    self.failfast = failfast
    self.catchbreak = catchbreak
    self.verbosity = verbosity
    self.buffer = buffer
    self.tb_locals = tb_locals
    if warnings is None and not sys.warnoptions:
        # even if DeprecationWarnings are ignored by default
        # print them anyway unless other warnings settings are
        # specified by the warnings arg or the -W python flag
        self.warnings = 'default'
    else:
        # here self.warnings is set either to the value passed
        # to the warnings args or to None.
        # If the user didn't pass a value self.warnings will
        # be None. This means that the behavior is unchanged
        # and depends on the values passed to -W.
        self.warnings = warnings
    self.defaultTest = defaultTest
    self.testRunner = testRunner
    self.testLoader = testLoader
    self.progName = os.path.basename(argv[0])
    self.parseArgs(argv)
    self.runTests()
def usageExit(self, msg=None):
    if msg:
        print(msg)
    if self._discovery_parser is None:
        self._initArgParsers()
    self._print_help()
    sys.exit(2)
def parseArgs(self, argv):
    self._initArgParsers()
    if self.module is None:
        if len(argv) > 1 and argv[1].lower() == 'discover':
            self._do_discovery(argv[2:])
            return
        self._main_parser.parse_args(argv[1:], self)
        if not self.tests:
            # this allows "python -m unittest -v" to still work for
            # test discovery.
            self._do_discovery([])
            return
    else:
        self._main_parser.parse_args(argv[1:], self)

    if self.tests:
        self.testNames = _convert_names(self.tests)
        if __name__ == '__main__':
            # to support python -m unittest ...
            self.module = None
    elif self.defaultTest is None:
        # createTests will load tests from self.module
        self.testNames = None
    elif isinstance(self.defaultTest, str):
        self.testNames = (self.defaultTest,)
    else:
        self.testNames = list(self.defaultTest)
    self.createTests()
def createTests(self, from_discovery=False, Loader=None):
    if self.testNamePatterns:
        self.testLoader.testNamePatterns = self.testNamePatterns
    if from_discovery:
        loader = self.testLoader if Loader is None else Loader()
        self.test = loader.discover(self.start, self.pattern, self.top)
    elif self.testNames is None:
        self.test = self.testLoader.loadTestsFromModule(self.module)
    else:
        self.test = self.testLoader.loadTestsFromNames(self.testNames,
                                                       self.module)
def runTests(self):
    if self.catchbreak:
        installHandler()
    if self.testRunner is None:
        self.testRunner = runner.TextTestRunner
    if isinstance(self.testRunner, type):
        try:
            try:
                testRunner = self.testRunner(verbosity=self.verbosity,
                                             failfast=self.failfast,
                                             buffer=self.buffer,
                                             warnings=self.warnings,
                                             tb_locals=self.tb_locals)
            except TypeError:
                # didn't accept the tb_locals argument
                testRunner = self.testRunner(verbosity=self.verbosity,
                                             failfast=self.failfast,
                                             buffer=self.buffer,
                                             warnings=self.warnings)
        except TypeError:
            # didn't accept the verbosity, buffer or failfast arguments
            testRunner = self.testRunner()
    else:
        # it is assumed to be a TestRunner instance
        testRunner = self.testRunner
    self.result = testRunner.run(self.test)
    if self.exit:
        sys.exit(not self.result.wasSuccessful())
class TestCase(unittest.TestCase):
    """
    Lightweight `unittest.TestCase` wrapper.
    When declaring your own tests you're supposed to
    inherit from this class and treat it pretty much
    the same as a good old-fashioned `unittest.TestCase`.
    For example:

    ```python
    \"\"\"
    META_START
    id: test_something_12345
    META_END
    \"\"\"
    from kalash.run import main, TestCase, MetaLoader

    class TestSomething(TestCase):

        def test_something(self):
            self.assertTrue(True)

    if __name__ == '__main__':
        main(testLoader=MetaLoader())
    ```

    Args:
        methodName (str): test method name
        id (str): test ID from the metadata tag
        trigger (Trigger): `Trigger` instance
    """

    def __init__(
        self,
        methodName: str,
        id: str,
        meta: Meta,
        trigger: Optional[Trigger]
    ) -> None:
        super().__init__(methodName=methodName)
        cli_config = trigger.cli_config if trigger else CliConfig()
        self._id = id
        self.log_base_path = cli_config.log_dir if cli_config else None
        self.groupby = cli_config.group_by if cli_config else None
        self.no_log_echo = cli_config.no_log_echo if cli_config else None
        self.meta = meta
        self.trigger = trigger
        # inject logger:
        if cli_config:
            if not cli_config.no_log:
                self.logger = get(
                    id,
                    self.__class__.__name__,
                    self.meta,
                    cli_config
                )
            else:
                # create dummy non-functional logger on the spot when
                # running with `log=False`
                self.logger = logging.getLogger(self.__class__.__name__)
                # close and clear all handlers that somebody could have
                # opened by accident
                for h in self.logger.handlers:
                    h.close()
                self.logger.handlers = []

    def allow_when(self, allowed_parameters_config_property: str,
                   parameter_on_test_case: str):
        """When running with a custom configuration class, you can use this
        method to tell your test case not to be skipped on some runtime
        filter. This is useful mostly when using Kalash with `parameterized`.
        Consider the following example:

        ```python
        class TestAdvancedFiltering1(TestCase):

            @parameterized.expand(['lincombo', 'cancombo'])
            def test_1(self, name):
                self.allow_when('run_only_with', name)
                print(f"Running for {name}")
        ```

        If at runtime the config object contains a `run_only_with=['cancombo']`
        value, the test will only be triggered for `cancombo`.

        Args:
            allowed_parameters_config_property (str): property name on the
                `config` section of the `Trigger` instance containing the
                skip/allow list (must be a `List`).
            parameter_on_test_case (str): parameter value to find in the
                allowed list, coming from the test case
        """
        if self.trigger:
            run_with: Optional[List[str]] = self.trigger.config.get(
                allowed_parameters_config_property)
            if run_with:
                if parameter_on_test_case in run_with:
                    return
                else:
                    import inspect
                    caller = inspect.stack()[1].function
                    self.skipTest(
                        f"{parameter_on_test_case} made test function "
                        f"{caller} skip")

    def __del__(self):
        if hasattr(self, 'logger'):
            close(self.logger)
Lightweight `unittest.TestCase` wrapper. When declaring your own tests you're supposed to inherit from this class and treat it pretty much the same as a good old-fashioned `unittest.TestCase`. For example:

```python
"""
META_START
id: test_something_12345
META_END
"""
from kalash.run import main, TestCase, MetaLoader

class TestSomething(TestCase):

    def test_something(self):
        self.assertTrue(True)

if __name__ == '__main__':
    main(testLoader=MetaLoader())
```

Args:
    methodName (str): test method name
    id (str): test ID from the metadata tag
    trigger (Trigger): `Trigger` instance
def __init__(
    self,
    methodName: str,
    id: str,
    meta: Meta,
    trigger: Optional[Trigger]
) -> None:
    super().__init__(methodName=methodName)
    cli_config = trigger.cli_config if trigger else CliConfig()
    self._id = id
    self.log_base_path = cli_config.log_dir if cli_config else None
    self.groupby = cli_config.group_by if cli_config else None
    self.no_log_echo = cli_config.no_log_echo if cli_config else None
    self.meta = meta
    self.trigger = trigger
    # inject logger:
    if cli_config:
        if not cli_config.no_log:
            self.logger = get(
                id,
                self.__class__.__name__,
                self.meta,
                cli_config
            )
        else:
            # create dummy non-functional logger on the spot when
            # running with `log=False`
            self.logger = logging.getLogger(self.__class__.__name__)
            # close and clear all handlers that somebody could have
            # opened by accident
            for h in self.logger.handlers:
                h.close()
            self.logger.handlers = []
Create an instance of the class that will use the named test method when executed. Raises a ValueError if the instance does not have a method with the specified name.
def allow_when(self, allowed_parameters_config_property: str,
               parameter_on_test_case: str):
    """When running with a custom configuration class, you can use this
    method to tell your test case not to be skipped on some runtime
    filter. This is useful mostly when using Kalash with `parameterized`.
    Consider the following example:

    ```python
    class TestAdvancedFiltering1(TestCase):

        @parameterized.expand(['lincombo', 'cancombo'])
        def test_1(self, name):
            self.allow_when('run_only_with', name)
            print(f"Running for {name}")
    ```

    If at runtime the config object contains a `run_only_with=['cancombo']`
    value, the test will only be triggered for `cancombo`.

    Args:
        allowed_parameters_config_property (str): property name on the
            `config` section of the `Trigger` instance containing the
            skip/allow list (must be a `List`).
        parameter_on_test_case (str): parameter value to find in the
            allowed list, coming from the test case
    """
    if self.trigger:
        run_with: Optional[List[str]] = self.trigger.config.get(
            allowed_parameters_config_property)
        if run_with:
            if parameter_on_test_case in run_with:
                return
            else:
                import inspect
                caller = inspect.stack()[1].function
                self.skipTest(
                    f"{parameter_on_test_case} made test function "
                    f"{caller} skip")
When running with a custom configuration class, you can use this method to tell your test case not to be skipped on some runtime filter. This is useful mostly when using Kalash with `parameterized`. Consider the following example:

```python
class TestAdvancedFiltering1(TestCase):

    @parameterized.expand(['lincombo', 'cancombo'])
    def test_1(self, name):
        self.allow_when('run_only_with', name)
        print(f"Running for {name}")
```

If at runtime the config object contains a `run_only_with=['cancombo']` value, the test will only be triggered for `cancombo`.

Args:
    allowed_parameters_config_property (str): property name on the `config` section of the `Trigger` instance containing the skip/allow list (must be a `List`).
    parameter_on_test_case (str): parameter value to find in the allowed list, coming from the test case
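On the configuration side this corresponds to a list-valued property in the `config` section of the `Trigger`. A hypothetical sketch; only the `get` lookup shown in the source above is guaranteed, and how the property is actually declared on `Config` may differ in a real setup:

```python
# Hypothetical config-side counterpart of the example above. The
# `run_only_with` name mirrors the docstring; attribute assignment on
# the config object is an assumption made for illustration.
from kalash.config import Config, Trigger

trigger = Trigger()
trigger.config = Config()
trigger.config.run_only_with = ['cancombo']
# With this trigger active, test_1 runs for 'cancombo' and skips
# for 'lincombo'.
```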
Inherited Members
- unittest.case.TestCase
- failureException
- longMessage
- maxDiff
- addTypeEqualityFunc
- addCleanup
- addClassCleanup
- setUp
- tearDown
- setUpClass
- tearDownClass
- countTestCases
- defaultTestResult
- shortDescription
- id
- subTest
- run
- doCleanups
- doClassCleanups
- debug
- skipTest
- fail
- assertFalse
- assertTrue
- assertRaises
- assertWarns
- assertLogs
- assertEqual
- assertNotEqual
- assertAlmostEqual
- assertNotAlmostEqual
- assertSequenceEqual
- assertListEqual
- assertTupleEqual
- assertSetEqual
- assertIn
- assertNotIn
- assertIs
- assertIsNot
- assertDictEqual
- assertDictContainsSubset
- assertCountEqual
- assertMultiLineEqual
- assertLess
- assertLessEqual
- assertGreater
- assertGreaterEqual
- assertIsNone
- assertIsNotNone
- assertIsInstance
- assertNotIsInstance
- assertRaisesRegex
- assertWarnsRegex
- assertRegex
- assertNotRegex
- failUnlessRaises
- failIf
- assertRaisesRegexp
- assertRegexpMatches
- assertNotRegexpMatches
- failUnlessEqual
- assertEquals
- failIfEqual
- assertNotEquals
- failUnlessAlmostEqual
- assertAlmostEquals
- failIfAlmostEqual
- assertNotAlmostEquals
- failUnless
- assert_
def get_ts(name='', format="%Y%m%d%H%M%S", sep='-'):
    """
    Attaches a formatted timestamp to a name passed
    as the first argument. Utility function used
    in test templates.

    Args:
        name (str): name to add `"-timestamp"` to
        format (str): `strftime` format string

    Returns:
        When called with e.g. `"Something"` at
        9 AM 2020.02.14, it will return a string like
        `"Something-20200214090000"`
    """
    return name + sep + datetime.datetime.now().strftime(format)
Attaches a formatted timestamp to a name passed as the first argument. Utility function used in test templates.

Args:
    name (str): name to add `"-timestamp"` to
    format (str): `strftime` format string

Returns:
    When called with e.g. `"Something"` at 9 AM on 2020.02.14, it will return a string like `"Something-20200214090000"`
class parameterized(object):
    """ Parameterize a test case::

            class TestInt(object):
                @parameterized([
                    ("A", 10),
                    ("F", 15),
                    param("10", 42, base=42)
                ])
                def test_int(self, input, expected, base=16):
                    actual = int(input, base=base)
                    assert_equal(actual, expected)

            @parameterized([
                (2, 3, 5),
                (3, 5, 8),
            ])
            def test_add(a, b, expected):
                assert_equal(a + b, expected)
        """

    def __init__(self, input, doc_func=None, skip_on_empty=False):
        self.get_input = self.input_as_callable(input)
        self.doc_func = doc_func or default_doc_func
        self.skip_on_empty = skip_on_empty

    def __call__(self, test_func):
        self.assert_not_in_testcase_subclass()

        @wraps(test_func)
        def wrapper(test_self=None):
            test_cls = test_self and type(test_self)
            if test_self is not None:
                if issubclass(test_cls, InstanceType):
                    raise TypeError((
                        "@parameterized can't be used with old-style classes, but "
                        "%r has an old-style class. Consider using a new-style "
                        "class, or '@parameterized.expand' "
                        "(see http://stackoverflow.com/q/54867/71522 for more "
                        "information on old-style classes)."
                    ) %(test_self, ))

            original_doc = wrapper.__doc__
            for num, args in enumerate(wrapper.parameterized_input):
                p = param.from_decorator(args)
                unbound_func, nose_tuple = self.param_as_nose_tuple(test_self, test_func, num, p)
                try:
                    wrapper.__doc__ = nose_tuple[0].__doc__
                    # Nose uses `getattr(instance, test_func.__name__)` to get
                    # a method bound to the test instance (as opposed to a
                    # method bound to the instance of the class created when
                    # tests were being enumerated). Set a value here to make
                    # sure nose can get the correct test method.
                    if test_self is not None:
                        setattr(test_cls, test_func.__name__, unbound_func)
                    yield nose_tuple
                finally:
                    if test_self is not None:
                        delattr(test_cls, test_func.__name__)
                    wrapper.__doc__ = original_doc

        input = self.get_input()
        if not input:
            if not self.skip_on_empty:
                raise ValueError(
                    "Parameters iterable is empty (hint: use "
                    "`parameterized([], skip_on_empty=True)` to skip "
                    "this test when the input is empty)"
                )
            wrapper = wraps(test_func)(skip_on_empty_helper)

        wrapper.parameterized_input = input
        wrapper.parameterized_func = test_func
        test_func.__name__ = "_parameterized_original_%s" %(test_func.__name__, )

        return wrapper

    def param_as_nose_tuple(self, test_self, func, num, p):
        nose_func = wraps(func)(lambda *args: func(*args[:-1], **args[-1]))
        nose_func.__doc__ = self.doc_func(func, num, p)
        # Track the unbound function because we need to setattr the unbound
        # function onto the class for nose to work (see comments above), and
        # Python 3 doesn't let us pull the function out of a bound method.
        unbound_func = nose_func
        if test_self is not None:
            # Under nose on Py2 we need to return an unbound method to make
            # sure that the `self` in the method is properly shared with the
            # `self` used in `setUp` and `tearDown`. But only there. Everyone
            # else needs a bound method.
            func_self = (
                None if PY2 and detect_runner() == "nose" else
                test_self
            )
            nose_func = make_method(nose_func, func_self, type(test_self))
        return unbound_func, (nose_func, ) + p.args + (p.kwargs or {}, )

    def assert_not_in_testcase_subclass(self):
        parent_classes = self._terrible_magic_get_defining_classes()
        if any(issubclass(cls, TestCase) for cls in parent_classes):
            raise Exception("Warning: '@parameterized' tests won't work "
                            "inside subclasses of 'TestCase' - use "
                            "'@parameterized.expand' instead.")

    def _terrible_magic_get_defining_classes(self):
        """ Returns the set of parent classes of the class currently being
            defined. Will likely only work if called from the
            ``parameterized`` decorator. This function is entirely
            @brandon_rhodes's fault, as he suggested the implementation:
            http://stackoverflow.com/a/8793684/71522
            """
        stack = inspect.stack()
        if len(stack) <= 4:
            return []
        frame = stack[4]
        code_context = frame[4] and frame[4][0].strip()
        if not (code_context and code_context.startswith("class ")):
            return []
        _, _, parents = code_context.partition("(")
        parents, _, _ = parents.partition(")")
        return eval("[" + parents + "]", frame[0].f_globals, frame[0].f_locals)

    @classmethod
    def input_as_callable(cls, input):
        if callable(input):
            return lambda: cls.check_input_values(input())
        input_values = cls.check_input_values(input)
        return lambda: input_values

    @classmethod
    def check_input_values(cls, input_values):
        # Explicitly convert non-list inputs to a list so that:
        # 1. A helpful exception will be raised if they aren't iterable, and
        # 2. Generators are unwrapped exactly once (otherwise `nosetests
        #    --processes=n` has issues; see:
        #    https://github.com/wolever/nose-parameterized/pull/31)
        if not isinstance(input_values, list):
            input_values = list(input_values)
        return [ param.from_decorator(p) for p in input_values ]

    @classmethod
    def expand(cls, input, name_func=None, doc_func=None, skip_on_empty=False,
               **legacy):
        """ A "brute force" method of parameterizing test cases. Creates new
            test cases and injects them into the namespace that the wrapped
            function is being defined in. Useful for parameterizing tests in
            subclasses of 'UnitTest', where Nose test generators don't work.

            >>> @parameterized.expand([("foo", 1, 2)])
            ... def test_add1(name, input, expected):
            ...     actual = add1(input)
            ...     assert_equal(actual, expected)
            ...
            >>> locals()
            ... 'test_add1_foo_0': <function ...> ...
            >>>
            """
        if "testcase_func_name" in legacy:
            warnings.warn("testcase_func_name= is deprecated; use name_func=",
                          DeprecationWarning, stacklevel=2)
            if not name_func:
                name_func = legacy["testcase_func_name"]

        if "testcase_func_doc" in legacy:
            warnings.warn("testcase_func_doc= is deprecated; use doc_func=",
                          DeprecationWarning, stacklevel=2)
            if not doc_func:
                doc_func = legacy["testcase_func_doc"]

        doc_func = doc_func or default_doc_func
        name_func = name_func or default_name_func

        def parameterized_expand_wrapper(f, instance=None):
            frame_locals = inspect.currentframe().f_back.f_locals

            parameters = cls.input_as_callable(input)()

            if not parameters:
                if not skip_on_empty:
                    raise ValueError(
                        "Parameters iterable is empty (hint: use "
                        "`parameterized.expand([], skip_on_empty=True)` to skip "
                        "this test when the input is empty)"
                    )
                return wraps(f)(skip_on_empty_helper)

            digits = len(str(len(parameters) - 1))
            for num, p in enumerate(parameters):
                name = name_func(f, "{num:0>{digits}}".format(digits=digits, num=num), p)
                # If the original function has patches applied by 'mock.patch',
                # re-construct all patches on the just former decoration layer
                # of param_as_standalone_func so as not to share
                # patch objects between new functions
                nf = reapply_patches_if_need(f)
                frame_locals[name] = cls.param_as_standalone_func(p, nf, name)
                frame_locals[name].__doc__ = doc_func(f, num, p)

            # Delete original patches to prevent new function from evaluating
            # original patching object as well as re-constructed patches.
            delete_patches_if_need(f)

            f.__test__ = False
        return parameterized_expand_wrapper

    @classmethod
    def param_as_standalone_func(cls, p, func, name):
        @wraps(func)
        def standalone_func(*a):
            return func(*(a + p.args), **p.kwargs)
        standalone_func.__name__ = name

        # place_as is used by py.test to determine what source file should be
        # used for this test.
        standalone_func.place_as = func

        # Remove __wrapped__ because py.test will try to look at __wrapped__
        # to determine which parameters should be used with this test case,
        # and obviously we don't need it to do any parameterization.
        try:
            del standalone_func.__wrapped__
        except AttributeError:
            pass
        return standalone_func

    @classmethod
    def to_safe_name(cls, s):
        return str(re.sub("[^a-zA-Z0-9_]+", "_", s))
Parameterize a test case::

    class TestInt(object):
        @parameterized([
            ("A", 10),
            ("F", 15),
            param("10", 42, base=42)
        ])
        def test_int(self, input, expected, base=16):
            actual = int(input, base=base)
            assert_equal(actual, expected)

    @parameterized([
        (2, 3, 5),
        (3, 5, 8),
    ])
    def test_add(a, b, expected):
        assert_equal(a + b, expected)
def __init__(self, input, doc_func=None, skip_on_empty=False):
    self.get_input = self.input_as_callable(input)
    self.doc_func = doc_func or default_doc_func
    self.skip_on_empty = skip_on_empty
def param_as_nose_tuple(self, test_self, func, num, p):
    nose_func = wraps(func)(lambda *args: func(*args[:-1], **args[-1]))
    nose_func.__doc__ = self.doc_func(func, num, p)
    # Track the unbound function because we need to setattr the unbound
    # function onto the class for nose to work (see comments above), and
    # Python 3 doesn't let us pull the function out of a bound method.
    unbound_func = nose_func
    if test_self is not None:
        # Under nose on Py2 we need to return an unbound method to make
        # sure that the `self` in the method is properly shared with the
        # `self` used in `setUp` and `tearDown`. But only there. Everyone
        # else needs a bound method.
        func_self = (
            None if PY2 and detect_runner() == "nose" else
            test_self
        )
        nose_func = make_method(nose_func, func_self, type(test_self))
    return unbound_func, (nose_func, ) + p.args + (p.kwargs or {}, )
def assert_not_in_testcase_subclass(self):
    parent_classes = self._terrible_magic_get_defining_classes()
    if any(issubclass(cls, TestCase) for cls in parent_classes):
        raise Exception("Warning: '@parameterized' tests won't work "
                        "inside subclasses of 'TestCase' - use "
                        "'@parameterized.expand' instead.")
@classmethod
def input_as_callable(cls, input):
    if callable(input):
        return lambda: cls.check_input_values(input())
    input_values = cls.check_input_values(input)
    return lambda: input_values
@classmethod
def check_input_values(cls, input_values):
    # Explicitly convert non-list inputs to a list so that:
    # 1. A helpful exception will be raised if they aren't iterable, and
    # 2. Generators are unwrapped exactly once (otherwise `nosetests
    #    --processes=n` has issues; see:
    #    https://github.com/wolever/nose-parameterized/pull/31)
    if not isinstance(input_values, list):
        input_values = list(input_values)
    return [ param.from_decorator(p) for p in input_values ]
@classmethod
def expand(cls, input, name_func=None, doc_func=None, skip_on_empty=False,
           **legacy):
    """ A "brute force" method of parameterizing test cases. Creates new
        test cases and injects them into the namespace that the wrapped
        function is being defined in. Useful for parameterizing tests in
        subclasses of 'UnitTest', where Nose test generators don't work.

        >>> @parameterized.expand([("foo", 1, 2)])
        ... def test_add1(name, input, expected):
        ...     actual = add1(input)
        ...     assert_equal(actual, expected)
        ...
        >>> locals()
        ... 'test_add1_foo_0': <function ...> ...
        >>>
        """
    if "testcase_func_name" in legacy:
        warnings.warn("testcase_func_name= is deprecated; use name_func=",
                      DeprecationWarning, stacklevel=2)
        if not name_func:
            name_func = legacy["testcase_func_name"]

    if "testcase_func_doc" in legacy:
        warnings.warn("testcase_func_doc= is deprecated; use doc_func=",
                      DeprecationWarning, stacklevel=2)
        if not doc_func:
            doc_func = legacy["testcase_func_doc"]

    doc_func = doc_func or default_doc_func
    name_func = name_func or default_name_func

    def parameterized_expand_wrapper(f, instance=None):
        frame_locals = inspect.currentframe().f_back.f_locals

        parameters = cls.input_as_callable(input)()

        if not parameters:
            if not skip_on_empty:
                raise ValueError(
                    "Parameters iterable is empty (hint: use "
                    "`parameterized.expand([], skip_on_empty=True)` to skip "
                    "this test when the input is empty)"
                )
            return wraps(f)(skip_on_empty_helper)

        digits = len(str(len(parameters) - 1))
        for num, p in enumerate(parameters):
            name = name_func(f, "{num:0>{digits}}".format(digits=digits, num=num), p)
            # If the original function has patches applied by 'mock.patch',
            # re-construct all patches on the just former decoration layer
            # of param_as_standalone_func so as not to share
            # patch objects between new functions
            nf = reapply_patches_if_need(f)
            frame_locals[name] = cls.param_as_standalone_func(p, nf, name)
            frame_locals[name].__doc__ = doc_func(f, num, p)

        # Delete original patches to prevent new function from evaluating
        # original patching object as well as re-constructed patches.
        delete_patches_if_need(f)

        f.__test__ = False
    return parameterized_expand_wrapper
A "brute force" method of parameterizing test cases. Creates new test cases and injects them into the namespace that the wrapped function is being defined in. Useful for parameterizing tests in subclasses of 'UnitTest', where Nose test generators don't work.
>>> @parameterized.expand([("foo", 1, 2)])
... def test_add1(name, input, expected):
... actual = add1(input)
... assert_equal(actual, expected)
...
>>> locals()
... 'test_add1_foo_0': <function ...> ...
>>>
@classmethod
def param_as_standalone_func(cls, p, func, name):
    @wraps(func)
    def standalone_func(*a):
        return func(*(a + p.args), **p.kwargs)
    standalone_func.__name__ = name

    # place_as is used by py.test to determine what source file should be
    # used for this test.
    standalone_func.place_as = func

    # Remove __wrapped__ because py.test will try to look at __wrapped__
    # to determine which parameters should be used with this test case,
    # and obviously we don't need it to do any parameterization.
    try:
        del standalone_func.__wrapped__
    except AttributeError:
        pass
    return standalone_func
@classmethod
def to_safe_name(cls, s):
    return str(re.sub("[^a-zA-Z0-9_]+", "_", s))