Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/pytest_cov/engine.py : 10%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Coverage controllers for use by pytest-cov and nose-cov."""
2import contextlib
3import copy
4import functools
5import os
6import random
7import socket
8import sys
10import coverage
11from coverage.data import CoverageData
13from .compat import StringIO
14from .embed import cleanup
17class _NullFile(object):
18 @staticmethod
19 def write(v):
20 pass
23@contextlib.contextmanager
24def _backup(obj, attr):
25 backup = getattr(obj, attr)
26 try:
27 setattr(obj, attr, copy.copy(backup))
28 yield
29 finally:
30 setattr(obj, attr, backup)
33def _ensure_topdir(meth):
34 @functools.wraps(meth)
35 def ensure_topdir_wrapper(self, *args, **kwargs):
36 try:
37 original_cwd = os.getcwd()
38 except OSError:
39 # Looks like it's gone, this is non-ideal because a side-effect will
40 # be introduced in the tests here but we can't do anything about it.
41 original_cwd = None
42 os.chdir(self.topdir)
43 try:
44 return meth(self, *args, **kwargs)
45 finally:
46 if original_cwd is not None:
47 os.chdir(original_cwd)
49 return ensure_topdir_wrapper
52class CovController(object):
53 """Base class for different plugin implementations."""
55 def __init__(self, cov_source, cov_report, cov_config, cov_append, cov_branch, config=None, nodeid=None):
56 """Get some common config used by multiple derived classes."""
57 self.cov_source = cov_source
58 self.cov_report = cov_report
59 self.cov_config = cov_config
60 self.cov_append = cov_append
61 self.cov_branch = cov_branch
62 self.config = config
63 self.nodeid = nodeid
65 self.cov = None
66 self.combining_cov = None
67 self.data_file = None
68 self.node_descs = set()
69 self.failed_workers = []
70 self.topdir = os.getcwd()
71 self.is_collocated = None
73 @contextlib.contextmanager
74 def ensure_topdir(self):
75 original_cwd = os.getcwd()
76 os.chdir(self.topdir)
77 yield
78 os.chdir(original_cwd)
80 @_ensure_topdir
81 def pause(self):
82 self.cov.stop()
83 self.unset_env()
85 @_ensure_topdir
86 def resume(self):
87 self.cov.start()
88 self.set_env()
90 @_ensure_topdir
91 def set_env(self):
92 """Put info about coverage into the env so that subprocesses can activate coverage."""
93 if self.cov_source is None:
94 os.environ['COV_CORE_SOURCE'] = os.pathsep
95 else:
96 os.environ['COV_CORE_SOURCE'] = os.pathsep.join(self.cov_source)
97 config_file = os.path.abspath(self.cov_config)
98 if os.path.exists(config_file):
99 os.environ['COV_CORE_CONFIG'] = config_file
100 else:
101 os.environ['COV_CORE_CONFIG'] = os.pathsep
102 os.environ['COV_CORE_DATAFILE'] = os.path.abspath(self.cov.config.data_file)
103 if self.cov_branch:
104 os.environ['COV_CORE_BRANCH'] = 'enabled'
106 @staticmethod
107 def unset_env():
108 """Remove coverage info from env."""
109 os.environ.pop('COV_CORE_SOURCE', None)
110 os.environ.pop('COV_CORE_CONFIG', None)
111 os.environ.pop('COV_CORE_DATAFILE', None)
112 os.environ.pop('COV_CORE_BRANCH', None)
113 os.environ.pop('COV_CORE_CONTEXT', None)
115 @staticmethod
116 def get_node_desc(platform, version_info):
117 """Return a description of this node."""
119 return 'platform %s, python %s' % (platform, '%s.%s.%s-%s-%s' % version_info[:5])
121 @staticmethod
122 def sep(stream, s, txt):
123 if hasattr(stream, 'sep'):
124 stream.sep(s, txt)
125 else:
126 sep_total = max((70 - 2 - len(txt)), 2)
127 sep_len = sep_total // 2
128 sep_extra = sep_total % 2
129 out = '%s %s %s\n' % (s * sep_len, txt, s * (sep_len + sep_extra))
130 stream.write(out)
132 @_ensure_topdir
133 def summary(self, stream):
134 """Produce coverage reports."""
135 total = None
137 if not self.cov_report:
138 with _backup(self.cov, "config"):
139 return self.cov.report(show_missing=True, ignore_errors=True, file=_NullFile)
141 # Output coverage section header.
142 if len(self.node_descs) == 1:
143 self.sep(stream, '-', 'coverage: %s' % ''.join(self.node_descs))
144 else:
145 self.sep(stream, '-', 'coverage')
146 for node_desc in sorted(self.node_descs):
147 self.sep(stream, ' ', '%s' % node_desc)
149 # Report on any failed workers.
150 if self.failed_workers:
151 self.sep(stream, '-', 'coverage: failed workers')
152 stream.write('The following workers failed to return coverage data, '
153 'ensure that pytest-cov is installed on these workers.\n')
154 for node in self.failed_workers:
155 stream.write('%s\n' % node.gateway.id)
157 # Produce terminal report if wanted.
158 if any(x in self.cov_report for x in ['term', 'term-missing']):
159 options = {
160 'show_missing': ('term-missing' in self.cov_report) or None,
161 'ignore_errors': True,
162 'file': stream,
163 }
164 skip_covered = isinstance(self.cov_report, dict) and 'skip-covered' in self.cov_report.values()
165 options.update({'skip_covered': skip_covered or None})
166 with _backup(self.cov, "config"):
167 total = self.cov.report(**options)
169 # Produce annotated source code report if wanted.
170 if 'annotate' in self.cov_report:
171 annotate_dir = self.cov_report['annotate']
173 with _backup(self.cov, "config"):
174 self.cov.annotate(ignore_errors=True, directory=annotate_dir)
175 # We need to call Coverage.report here, just to get the total
176 # Coverage.annotate don't return any total and we need it for --cov-fail-under.
178 with _backup(self.cov, "config"):
179 total = self.cov.report(ignore_errors=True, file=_NullFile)
180 if annotate_dir:
181 stream.write('Coverage annotated source written to dir %s\n' % annotate_dir)
182 else:
183 stream.write('Coverage annotated source written next to source\n')
185 # Produce html report if wanted.
186 if 'html' in self.cov_report:
187 output = self.cov_report['html']
188 with _backup(self.cov, "config"):
189 total = self.cov.html_report(ignore_errors=True, directory=output)
190 stream.write('Coverage HTML written to dir %s\n' % (self.cov.config.html_dir if output is None else output))
192 # Produce xml report if wanted.
193 if 'xml' in self.cov_report:
194 output = self.cov_report['xml']
195 with _backup(self.cov, "config"):
196 total = self.cov.xml_report(ignore_errors=True, outfile=output)
197 stream.write('Coverage XML written to file %s\n' % (self.cov.config.xml_output if output is None else output))
199 return total
202class Central(CovController):
203 """Implementation for centralised operation."""
205 @_ensure_topdir
206 def start(self):
207 cleanup()
209 self.cov = coverage.Coverage(source=self.cov_source,
210 branch=self.cov_branch,
211 data_suffix=True,
212 config_file=self.cov_config)
213 self.combining_cov = coverage.Coverage(source=self.cov_source,
214 branch=self.cov_branch,
215 data_suffix=True,
216 data_file=os.path.abspath(self.cov.config.data_file),
217 config_file=self.cov_config)
219 # Erase or load any previous coverage data and start coverage.
220 if not self.cov_append:
221 self.cov.erase()
222 self.cov.start()
223 self.set_env()
225 @_ensure_topdir
226 def finish(self):
227 """Stop coverage, save data to file and set the list of coverage objects to report on."""
229 self.unset_env()
230 self.cov.stop()
231 self.cov.save()
233 self.cov = self.combining_cov
234 self.cov.load()
235 self.cov.combine()
236 self.cov.save()
238 node_desc = self.get_node_desc(sys.platform, sys.version_info)
239 self.node_descs.add(node_desc)
242class DistMaster(CovController):
243 """Implementation for distributed master."""
245 @_ensure_topdir
246 def start(self):
247 cleanup()
249 # Ensure coverage rc file rsynced if appropriate.
250 if self.cov_config and os.path.exists(self.cov_config):
251 self.config.option.rsyncdir.append(self.cov_config)
253 self.cov = coverage.Coverage(source=self.cov_source,
254 branch=self.cov_branch,
255 data_suffix=True,
256 config_file=self.cov_config)
257 self.cov._warn_no_data = False
258 self.cov._warn_unimported_source = False
259 self.cov._warn_preimported_source = False
260 self.combining_cov = coverage.Coverage(source=self.cov_source,
261 branch=self.cov_branch,
262 data_suffix=True,
263 data_file=os.path.abspath(self.cov.config.data_file),
264 config_file=self.cov_config)
265 if not self.cov_append:
266 self.cov.erase()
267 self.cov.start()
268 self.cov.config.paths['source'] = [self.topdir]
270 def configure_node(self, node):
271 """Workers need to know if they are collocated and what files have moved."""
273 node.workerinput.update({
274 'cov_master_host': socket.gethostname(),
275 'cov_master_topdir': self.topdir,
276 'cov_master_rsync_roots': [str(root) for root in node.nodemanager.roots],
277 })
279 def testnodedown(self, node, error):
280 """Collect data file name from worker."""
282 # If worker doesn't return any data then it is likely that this
283 # plugin didn't get activated on the worker side.
284 output = getattr(node, 'workeroutput', {})
285 if 'cov_worker_node_id' not in output:
286 self.failed_workers.append(node)
287 return
289 # If worker is not collocated then we must save the data file
290 # that it returns to us.
291 if 'cov_worker_data' in output:
292 data_suffix = '%s.%s.%06d.%s' % (
293 socket.gethostname(), os.getpid(),
294 random.randint(0, 999999),
295 output['cov_worker_node_id']
296 )
298 cov = coverage.Coverage(source=self.cov_source,
299 branch=self.cov_branch,
300 data_suffix=data_suffix,
301 config_file=self.cov_config)
302 cov.start()
303 if coverage.version_info < (5, 0):
304 data = CoverageData()
305 data.read_fileobj(StringIO(output['cov_worker_data']))
306 cov.data.update(data)
307 else:
308 data = CoverageData(no_disk=True)
309 data.loads(output['cov_worker_data'])
310 cov.get_data().update(data)
311 cov.stop()
312 cov.save()
313 path = output['cov_worker_path']
314 self.cov.config.paths['source'].append(path)
316 # Record the worker types that contribute to the data file.
317 rinfo = node.gateway._rinfo()
318 node_desc = self.get_node_desc(rinfo.platform, rinfo.version_info)
319 self.node_descs.add(node_desc)
321 @_ensure_topdir
322 def finish(self):
323 """Combines coverage data and sets the list of coverage objects to report on."""
325 # Combine all the suffix files into the data file.
326 self.cov.stop()
327 self.cov.save()
328 self.cov = self.combining_cov
329 self.cov.load()
330 self.cov.combine()
331 self.cov.save()
334class DistWorker(CovController):
335 """Implementation for distributed workers."""
337 @_ensure_topdir
338 def start(self):
340 cleanup()
342 # Determine whether we are collocated with master.
343 self.is_collocated = (socket.gethostname() == self.config.workerinput['cov_master_host'] and
344 self.topdir == self.config.workerinput['cov_master_topdir'])
346 # If we are not collocated then rewrite master paths to worker paths.
347 if not self.is_collocated:
348 master_topdir = self.config.workerinput['cov_master_topdir']
349 worker_topdir = self.topdir
350 if self.cov_source is not None:
351 self.cov_source = [source.replace(master_topdir, worker_topdir)
352 for source in self.cov_source]
353 self.cov_config = self.cov_config.replace(master_topdir, worker_topdir)
355 # Erase any previous data and start coverage.
356 self.cov = coverage.Coverage(source=self.cov_source,
357 branch=self.cov_branch,
358 data_suffix=True,
359 config_file=self.cov_config)
360 self.cov.start()
361 self.set_env()
363 @_ensure_topdir
364 def finish(self):
365 """Stop coverage and send relevant info back to the master."""
366 self.unset_env()
367 self.cov.stop()
369 if self.is_collocated:
370 # We don't combine data if we're collocated - we can get
371 # race conditions in the .combine() call (it's not atomic)
372 # The data is going to be combined in the master.
373 self.cov.save()
375 # If we are collocated then just inform the master of our
376 # data file to indicate that we have finished.
377 self.config.workeroutput['cov_worker_node_id'] = self.nodeid
378 else:
379 self.cov.combine()
380 self.cov.save()
381 # If we are not collocated then add the current path
382 # and coverage data to the output so we can combine
383 # it on the master node.
385 # Send all the data to the master over the channel.
386 if coverage.version_info < (5, 0):
387 buff = StringIO()
388 self.cov.data.write_fileobj(buff)
389 data = buff.getvalue()
390 else:
391 data = self.cov.get_data().dumps()
393 self.config.workeroutput.update({
394 'cov_worker_path': self.topdir,
395 'cov_worker_node_id': self.nodeid,
396 'cov_worker_data': data,
397 })
399 def summary(self, stream):
400 """Only the master reports so do nothing."""
402 pass