diff options
Diffstat (limited to 'lib/spack/external/_pytest/pytester.py')
-rw-r--r-- | lib/spack/external/_pytest/pytester.py | 184 |
1 files changed, 106 insertions, 78 deletions
diff --git a/lib/spack/external/_pytest/pytester.py b/lib/spack/external/_pytest/pytester.py index d87c0a762a..2db85dff22 100644 --- a/lib/spack/external/_pytest/pytester.py +++ b/lib/spack/external/_pytest/pytester.py @@ -1,4 +1,6 @@ """ (disabled by default) support for testing pytest and pytest plugins. """ +from __future__ import absolute_import, division, print_function + import codecs import gc import os @@ -10,8 +12,9 @@ import time import traceback from fnmatch import fnmatch -from py.builtin import print_ +from weakref import WeakKeyDictionary +from _pytest.capture import MultiCapture, SysCapture from _pytest._code import Source import py import pytest @@ -22,13 +25,13 @@ from _pytest.assertion.rewrite import AssertionRewritingHook def pytest_addoption(parser): # group = parser.getgroup("pytester", "pytester (self-tests) options") parser.addoption('--lsof', - action="store_true", dest="lsof", default=False, - help=("run FD checks if lsof is available")) + action="store_true", dest="lsof", default=False, + help=("run FD checks if lsof is available")) parser.addoption('--runpytest', default="inprocess", dest="runpytest", - choices=("inprocess", "subprocess", ), - help=("run pytest sub runs in tests using an 'inprocess' " - "or 'subprocess' (python -m main) method")) + choices=("inprocess", "subprocess", ), + help=("run pytest sub runs in tests using an 'inprocess' " + "or 'subprocess' (python -m main) method")) def pytest_configure(config): @@ -59,7 +62,7 @@ class LsofFdLeakChecker(object): def _parse_lsof_output(self, out): def isopen(line): return line.startswith('f') and ("deleted" not in line and - 'mem' not in line and "txt" not in line and 'cwd' not in line) + 'mem' not in line and "txt" not in line and 'cwd' not in line) open_files = [] @@ -85,7 +88,7 @@ class LsofFdLeakChecker(object): return True @pytest.hookimpl(hookwrapper=True, tryfirst=True) - def pytest_runtest_item(self, item): + def pytest_runtest_protocol(self, item): lines1 = self.get_open_files() yield if hasattr(sys, "pypy_version_info"): @@ -104,7 +107,8 @@ class LsofFdLeakChecker(object): error.extend([str(f) for f in lines2]) error.append(error[0]) error.append("*** function %s:%s: %s " % item.location) - pytest.fail("\n".join(error), pytrace=False) + error.append("See issue #2366") + item.warn('', "\n".join(error)) # XXX copied from execnet's conftest.py - needs to be merged @@ -118,6 +122,7 @@ winpymap = { 'python3.5': r'C:\Python35\python.exe', } + def getexecutable(name, cache={}): try: return cache[name] @@ -126,19 +131,20 @@ def getexecutable(name, cache={}): if executable: import subprocess popen = subprocess.Popen([str(executable), "--version"], - universal_newlines=True, stderr=subprocess.PIPE) + universal_newlines=True, stderr=subprocess.PIPE) out, err = popen.communicate() if name == "jython": if not err or "2.5" not in err: executable = None if "2.5.2" in err: - executable = None # http://bugs.jython.org/issue1790 + executable = None # http://bugs.jython.org/issue1790 elif popen.returncode != 0: # Handle pyenv's 127. executable = None cache[name] = executable return executable + @pytest.fixture(params=['python2.6', 'python2.7', 'python3.3', "python3.4", 'pypy', 'pypy3']) def anypython(request): @@ -155,6 +161,8 @@ def anypython(request): return executable # used at least by pytest-xdist plugin + + @pytest.fixture def _pytest(request): """ Return a helper which offers a gethookrecorder(hook) @@ -163,6 +171,7 @@ def _pytest(request): """ return PytestArg(request) + class PytestArg: def __init__(self, request): self.request = request @@ -173,9 +182,9 @@ class PytestArg: return hookrecorder -def get_public_names(l): - """Only return names from iterator l without a leading underscore.""" - return [x for x in l if x[0] != "_"] +def get_public_names(values): + """Only return names from iterator values without a leading underscore.""" + return [x for x in values if x[0] != "_"] class ParsedCall: @@ -186,7 +195,7 @@ class ParsedCall: def __repr__(self): d = self.__dict__.copy() del d['_name'] - return "<ParsedCall %r(**%r)>" %(self._name, d) + return "<ParsedCall %r(**%r)>" % (self._name, d) class HookRecorder: @@ -226,15 +235,15 @@ class HookRecorder: name, check = entries.pop(0) for ind, call in enumerate(self.calls[i:]): if call._name == name: - print_("NAMEMATCH", name, call) + print("NAMEMATCH", name, call) if eval(check, backlocals, call.__dict__): - print_("CHECKERMATCH", repr(check), "->", call) + print("CHECKERMATCH", repr(check), "->", call) else: - print_("NOCHECKERMATCH", repr(check), "-", call) + print("NOCHECKERMATCH", repr(check), "-", call) continue i += ind + 1 break - print_("NONAMEMATCH", name, "with", call) + print("NONAMEMATCH", name, "with", call) else: pytest.fail("could not find %r check %r" % (name, check)) @@ -249,9 +258,9 @@ class HookRecorder: pytest.fail("\n".join(lines)) def getcall(self, name): - l = self.getcalls(name) - assert len(l) == 1, (name, l) - return l[0] + values = self.getcalls(name) + assert len(values) == 1, (name, values) + return values[0] # functionality for test reports @@ -260,9 +269,9 @@ class HookRecorder: return [x.report for x in self.getcalls(names)] def matchreport(self, inamepart="", - names="pytest_runtest_logreport pytest_collectreport", when=None): + names="pytest_runtest_logreport pytest_collectreport", when=None): """ return a testreport whose dotted import path matches """ - l = [] + values = [] for rep in self.getreports(names=names): try: if not when and rep.when != "call" and rep.passed: @@ -273,14 +282,14 @@ class HookRecorder: if when and getattr(rep, 'when', None) != when: continue if not inamepart or inamepart in rep.nodeid.split("::"): - l.append(rep) - if not l: + values.append(rep) + if not values: raise ValueError("could not find test report matching %r: " "no test reports at all!" % (inamepart,)) - if len(l) > 1: + if len(values) > 1: raise ValueError( - "found 2 or more testreports matching %r: %s" %(inamepart, l)) - return l[0] + "found 2 or more testreports matching %r: %s" % (inamepart, values)) + return values[0] def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'): @@ -294,7 +303,7 @@ class HookRecorder: skipped = [] failed = [] for rep in self.getreports( - "pytest_collectreport pytest_runtest_logreport"): + "pytest_collectreport pytest_runtest_logreport"): if rep.passed: if getattr(rep, "when", None) == "call": passed.append(rep) @@ -332,7 +341,9 @@ def testdir(request, tmpdir_factory): return Testdir(request, tmpdir_factory) -rex_outcome = re.compile("(\d+) ([\w-]+)") +rex_outcome = re.compile(r"(\d+) ([\w-]+)") + + class RunResult: """The result of running a command. @@ -348,6 +359,7 @@ class RunResult: :duration: Duration in seconds. """ + def __init__(self, ret, outlines, errlines, duration): self.ret = ret self.outlines = outlines @@ -367,15 +379,19 @@ class RunResult: for num, cat in outcomes: d[cat] = int(num) return d + raise ValueError("Pytest terminal report not found") - def assert_outcomes(self, passed=0, skipped=0, failed=0): + def assert_outcomes(self, passed=0, skipped=0, failed=0, error=0): """ assert that the specified outcomes appear with the respective numbers (0 means it didn't occur) in the text output from a test run.""" d = self.parseoutcomes() - assert passed == d.get("passed", 0) - assert skipped == d.get("skipped", 0) - assert failed == d.get("failed", 0) - + obtained = { + 'passed': d.get('passed', 0), + 'skipped': d.get('skipped', 0), + 'failed': d.get('failed', 0), + 'error': d.get('error', 0), + } + assert obtained == dict(passed=passed, skipped=skipped, failed=failed, error=error) class Testdir: @@ -401,6 +417,7 @@ class Testdir: def __init__(self, request, tmpdir_factory): self.request = request + self._mod_collections = WeakKeyDictionary() # XXX remove duplication with tmpdir plugin basetmp = tmpdir_factory.ensuretemp("testdir") name = request.function.__name__ @@ -414,7 +431,7 @@ class Testdir: self.plugins = [] self._savesyspath = (list(sys.path), list(sys.meta_path)) self._savemodulekeys = set(sys.modules) - self.chdir() # always chdir + self.chdir() # always chdir self.request.addfinalizer(self.finalize) method = self.request.config.getoption("--runpytest") if method == "inprocess": @@ -446,9 +463,10 @@ class Testdir: the module is re-imported. """ for name in set(sys.modules).difference(self._savemodulekeys): - # it seems zope.interfaces is keeping some state - # (used by twisted related tests) - if name != "zope.interface": + # some zope modules used by twisted-related tests keeps internal + # state and can't be deleted; we had some trouble in the past + # with zope.interface for example + if not name.startswith("zope"): del sys.modules[name] def make_hook_recorder(self, pluginmanager): @@ -468,7 +486,7 @@ class Testdir: if not hasattr(self, '_olddir'): self._olddir = old - def _makefile(self, ext, args, kwargs): + def _makefile(self, ext, args, kwargs, encoding="utf-8"): items = list(kwargs.items()) if args: source = py.builtin._totext("\n").join( @@ -488,8 +506,8 @@ class Testdir: source_unicode = "\n".join([my_totext(line) for line in source.lines]) source = py.builtin._totext(source_unicode) - content = source.strip().encode("utf-8") # + "\n" - #content = content.rstrip() + "\n" + content = source.strip().encode(encoding) # + "\n" + # content = content.rstrip() + "\n" p.write(content, "wb") if ret is None: ret = p @@ -565,7 +583,7 @@ class Testdir: def mkpydir(self, name): """Create a new python package. - This creates a (sub)direcotry with an empty ``__init__.py`` + This creates a (sub)directory with an empty ``__init__.py`` file so that is recognised as a python package. """ @@ -574,6 +592,7 @@ class Testdir: return p Session = Session + def getnode(self, config, arg): """Return the collection node of a file. @@ -654,13 +673,13 @@ class Testdir: """ p = self.makepyfile(source) - l = list(cmdlineargs) + [p] - return self.inline_run(*l) + values = list(cmdlineargs) + [p] + return self.inline_run(*values) def inline_genitems(self, *args): """Run ``pytest.main(['--collectonly'])`` in-process. - Retuns a tuple of the collected items and a + Returns a tuple of the collected items and a :py:class:`HookRecorder` instance. This runs the :py:func:`pytest.main` function to run all of @@ -733,7 +752,8 @@ class Testdir: if kwargs.get("syspathinsert"): self.syspathinsert() now = time.time() - capture = py.io.StdCapture() + capture = MultiCapture(Capture=SysCapture) + capture.start_capturing() try: try: reprec = self.inline_run(*args, **kwargs) @@ -748,13 +768,14 @@ class Testdir: class reprec: ret = 3 finally: - out, err = capture.reset() + out, err = capture.readouterr() + capture.stop_capturing() sys.stdout.write(out) sys.stderr.write(err) res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), - time.time()-now) + time.time() - now) res.reprec = reprec return res @@ -770,11 +791,11 @@ class Testdir: args = [str(x) for x in args] for x in args: if str(x).startswith('--basetemp'): - #print ("basedtemp exists: %s" %(args,)) + # print("basedtemp exists: %s" %(args,)) break else: args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp')) - #print ("added basetemp: %s" %(args,)) + # print("added basetemp: %s" %(args,)) return args def parseconfig(self, *args): @@ -812,7 +833,7 @@ class Testdir: self.request.addfinalizer(config._ensure_unconfigure) return config - def getitem(self, source, funcname="test_func"): + def getitem(self, source, funcname="test_func"): """Return the test item for a test function. This writes the source to a python file and runs pytest's @@ -829,10 +850,10 @@ class Testdir: for item in items: if item.name == funcname: return item - assert 0, "%r item not found in module:\n%s\nitems: %s" %( + assert 0, "%r item not found in module:\n%s\nitems: %s" % ( funcname, source, items) - def getitems(self, source): + def getitems(self, source): """Return all test items collected from the module. This writes the source to a python file and runs pytest's @@ -843,7 +864,7 @@ class Testdir: modcol = self.getmodulecol(source) return self.genitems([modcol]) - def getmodulecol(self, source, configargs=(), withinit=False): + def getmodulecol(self, source, configargs=(), withinit=False): """Return the module collection node for ``source``. This writes ``source`` to a file using :py:meth:`makepyfile` @@ -856,15 +877,16 @@ class Testdir: :py:meth:`parseconfigure`. :param withinit: Whether to also write a ``__init__.py`` file - to the temporarly directory to ensure it is a package. + to the temporary directory to ensure it is a package. """ kw = {self.request.function.__name__: Source(source).strip()} path = self.makepyfile(**kw) if withinit: - self.makepyfile(__init__ = "#") + self.makepyfile(__init__="#") self.config = config = self.parseconfigure(path, *configargs) node = self.getnode(config, path) + return node def collect_by_name(self, modcol, name): @@ -879,7 +901,9 @@ class Testdir: :param name: The name of the node to return. """ - for colitem in modcol._memocollect(): + if modcol not in self._mod_collections: + self._mod_collections[modcol] = list(modcol.collect()) + for colitem in self._mod_collections[modcol]: if colitem.name == name: return colitem @@ -896,8 +920,11 @@ class Testdir: env['PYTHONPATH'] = os.pathsep.join(filter(None, [ str(os.getcwd()), env.get('PYTHONPATH', '')])) kw['env'] = env - return subprocess.Popen(cmdargs, - stdout=stdout, stderr=stderr, **kw) + + popen = subprocess.Popen(cmdargs, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw) + popen.stdin.close() + + return popen def run(self, *cmdargs): """Run a command with arguments. @@ -914,14 +941,14 @@ class Testdir: cmdargs = [str(x) for x in cmdargs] p1 = self.tmpdir.join("stdout") p2 = self.tmpdir.join("stderr") - print_("running:", ' '.join(cmdargs)) - print_(" in:", str(py.path.local())) + print("running:", ' '.join(cmdargs)) + print(" in:", str(py.path.local())) f1 = codecs.open(str(p1), "w", encoding="utf8") f2 = codecs.open(str(p2), "w", encoding="utf8") try: now = time.time() popen = self.popen(cmdargs, stdout=f1, stderr=f2, - close_fds=(sys.platform != "win32")) + close_fds=(sys.platform != "win32")) ret = popen.wait() finally: f1.close() @@ -936,19 +963,19 @@ class Testdir: f2.close() self._dump_lines(out, sys.stdout) self._dump_lines(err, sys.stderr) - return RunResult(ret, out, err, time.time()-now) + return RunResult(ret, out, err, time.time() - now) def _dump_lines(self, lines, fp): try: for line in lines: - py.builtin.print_(line, file=fp) + print(line, file=fp) except UnicodeEncodeError: print("couldn't print to %s because of encoding" % (fp,)) def _getpytestargs(self): # we cannot use "(sys.executable,script)" # because on windows the script is e.g. a pytest.exe - return (sys.executable, _pytest_fullpath,) # noqa + return (sys.executable, _pytest_fullpath,) # noqa def runpython(self, script): """Run a python script using sys.executable as interpreter. @@ -975,12 +1002,12 @@ class Testdir: """ p = py.path.local.make_numbered_dir(prefix="runpytest-", - keep=None, rootdir=self.tmpdir) + keep=None, rootdir=self.tmpdir) args = ('--basetemp=%s' % p, ) + args - #for x in args: + # for x in args: # if '--confcutdir' in str(x): # break - #else: + # else: # pass # args = ('--confcutdir=.',) + args plugins = [x for x in self.plugins if isinstance(x, str)] @@ -998,7 +1025,7 @@ class Testdir: The pexpect child is returned. """ - basetemp = self.tmpdir.mkdir("pexpect") + basetemp = self.tmpdir.mkdir("temp-pexpect") invoke = " ".join(map(str, self._getpytestargs())) cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) return self.spawn(cmd, expect_timeout=expect_timeout) @@ -1019,12 +1046,13 @@ class Testdir: child.timeout = expect_timeout return child + def getdecoded(out): - try: - return out.decode("utf-8") - except UnicodeDecodeError: - return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( - py.io.saferepr(out),) + try: + return out.decode("utf-8") + except UnicodeDecodeError: + return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( + py.io.saferepr(out),) class LineComp: @@ -1054,7 +1082,7 @@ class LineMatcher: """ - def __init__(self, lines): + def __init__(self, lines): self.lines = lines self._log_output = [] @@ -1093,7 +1121,7 @@ class LineMatcher: """ for i, line in enumerate(self.lines): if fnline == line or fnmatch(line, fnline): - return self.lines[i+1:] + return self.lines[i + 1:] raise ValueError("line %r not found in output" % fnline) def _log(self, *args): |