Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Coverage controllers for use by pytest-cov and nose-cov.""" 

2import contextlib 

3import copy 

4import functools 

5import os 

6import random 

7import socket 

8import sys 

9 

10import coverage 

11from coverage.data import CoverageData 

12 

13from .compat import StringIO 

14from .embed import cleanup 

15 

16 

class _NullFile(object):
    """Write-only sink that discards everything handed to it.

    Passed as the ``file`` argument of a coverage report when only the
    report's return value is wanted and no text should be printed.
    """

    @staticmethod
    def write(v):
        """Accept ``v`` and drop it; always returns ``None``."""
        return None

21 

22 

@contextlib.contextmanager
def _backup(obj, attr):
    """Temporarily swap ``obj.<attr>`` for a shallow copy of itself.

    While the ``with`` body runs, the attribute holds ``copy.copy`` of the
    original value, so changes made to the attribute during the body land on
    the copy. On exit (normal or via exception) the original object is put
    back on ``obj``.
    """
    saved_value = getattr(obj, attr)
    try:
        scratch = copy.copy(saved_value)
        setattr(obj, attr, scratch)
        yield
    finally:
        # Always reinstate the untouched original, even if the body raised.
        setattr(obj, attr, saved_value)

31 

32 

def _ensure_topdir(meth):
    """Decorate a method so that it executes with ``self.topdir`` as the cwd.

    The wrapper changes into ``self.topdir``, calls ``meth`` and afterwards
    changes back to the directory that was current before the call, whether
    ``meth`` returned or raised.
    """

    @functools.wraps(meth)
    def ensure_topdir_wrapper(self, *args, **kwargs):
        try:
            previous_dir = os.getcwd()
        except OSError:
            # The current directory no longer exists. We cannot go back to
            # it afterwards, so the chdir below stays in effect as a
            # side-effect; nothing can be done about that.
            previous_dir = None

        os.chdir(self.topdir)
        try:
            result = meth(self, *args, **kwargs)
        finally:
            if previous_dir is not None:
                os.chdir(previous_dir)
        return result

    return ensure_topdir_wrapper

50 

51 

class CovController(object):
    """Base class for different plugin implementations."""

    def __init__(self, cov_source, cov_report, cov_config, cov_append, cov_branch, config=None, nodeid=None):
        """Get some common config used by multiple derived classes.

        The arguments are stored on the instance unchanged. The directory that
        is current at construction time is remembered as ``topdir``; methods
        decorated with ``_ensure_topdir`` run from there.
        """
        self.cov_source = cov_source
        self.cov_report = cov_report
        self.cov_config = cov_config
        self.cov_append = cov_append
        self.cov_branch = cov_branch
        self.config = config
        self.nodeid = nodeid

        self.cov = None
        self.combining_cov = None
        self.data_file = None
        self.node_descs = set()
        self.failed_workers = []
        self.topdir = os.getcwd()
        self.is_collocated = None

    @contextlib.contextmanager
    def ensure_topdir(self):
        """Context manager that runs its body with ``self.topdir`` as the cwd.

        The previous working directory is restored on exit, including when the
        body raises. If the previous directory cannot be determined
        (``os.getcwd()`` raises ``OSError``, e.g. because it was removed) the
        body still runs from ``topdir`` and no restore is attempted, mirroring
        the ``_ensure_topdir`` decorator.
        """
        try:
            original_cwd = os.getcwd()
        except OSError:
            # Nothing to go back to; the chdir below stays in effect.
            original_cwd = None
        os.chdir(self.topdir)
        try:
            yield
        finally:
            # Without try/finally an exception in the body would leave the
            # process stuck in topdir.
            if original_cwd is not None:
                os.chdir(original_cwd)

    @_ensure_topdir
    def pause(self):
        """Stop measuring and remove the coverage variables from the env."""
        self.cov.stop()
        self.unset_env()

    @_ensure_topdir
    def resume(self):
        """Start measuring again and re-export the coverage variables."""
        self.cov.start()
        self.set_env()

    @_ensure_topdir
    def set_env(self):
        """Put info about coverage into the env so that subprocesses can activate coverage."""
        # A lone os.pathsep is written when there is no value to pass on.
        if self.cov_source is None:
            os.environ['COV_CORE_SOURCE'] = os.pathsep
        else:
            os.environ['COV_CORE_SOURCE'] = os.pathsep.join(self.cov_source)
        config_file = os.path.abspath(self.cov_config)
        if os.path.exists(config_file):
            os.environ['COV_CORE_CONFIG'] = config_file
        else:
            os.environ['COV_CORE_CONFIG'] = os.pathsep
        os.environ['COV_CORE_DATAFILE'] = os.path.abspath(self.cov.config.data_file)
        if self.cov_branch:
            os.environ['COV_CORE_BRANCH'] = 'enabled'

    @staticmethod
    def unset_env():
        """Remove coverage info from env."""
        os.environ.pop('COV_CORE_SOURCE', None)
        os.environ.pop('COV_CORE_CONFIG', None)
        os.environ.pop('COV_CORE_DATAFILE', None)
        os.environ.pop('COV_CORE_BRANCH', None)
        os.environ.pop('COV_CORE_CONTEXT', None)

    @staticmethod
    def get_node_desc(platform, version_info):
        """Return a description of this node.

        ``version_info`` must be sliceable to a 5-tuple (as
        ``sys.version_info`` is); its first five fields are rendered as
        ``major.minor.micro-releaselevel-serial``.
        """
        return 'platform %s, python %s' % (platform, '%s.%s.%s-%s-%s' % version_info[:5])

    @staticmethod
    def sep(stream, s, txt):
        """Write a separator line containing ``txt`` to ``stream``.

        If the stream has its own ``sep`` method that is used. Otherwise a
        line is built by padding ``txt`` on both sides with the character
        ``s``, aiming at 70 columns with at least one ``s`` on each side.
        """
        if hasattr(stream, 'sep'):
            stream.sep(s, txt)
        else:
            sep_total = max((70 - 2 - len(txt)), 2)
            sep_len = sep_total // 2
            sep_extra = sep_total % 2
            out = '%s %s %s\n' % (s * sep_len, txt, s * (sep_len + sep_extra))
            stream.write(out)

    @_ensure_topdir
    def summary(self, stream):
        """Produce coverage reports.

        Writes every report named in ``self.cov_report`` (terminal, annotate,
        html, xml) and returns the value of the last coverage report call
        made, or ``None`` if none of those reports was requested. When
        ``cov_report`` is empty a silent report is run and its value returned.

        Each coverage call runs under ``_backup(self.cov, "config")``, so the
        call works on a copy of ``self.cov.config`` and the original config
        object is put back afterwards.
        """
        total = None

        if not self.cov_report:
            with _backup(self.cov, "config"):
                return self.cov.report(show_missing=True, ignore_errors=True, file=_NullFile)

        # Output coverage section header.
        if len(self.node_descs) == 1:
            self.sep(stream, '-', 'coverage: %s' % ''.join(self.node_descs))
        else:
            self.sep(stream, '-', 'coverage')
            for node_desc in sorted(self.node_descs):
                self.sep(stream, ' ', '%s' % node_desc)

        # Report on any failed workers.
        if self.failed_workers:
            self.sep(stream, '-', 'coverage: failed workers')
            stream.write('The following workers failed to return coverage data, '
                         'ensure that pytest-cov is installed on these workers.\n')
            for node in self.failed_workers:
                stream.write('%s\n' % node.gateway.id)

        # Produce terminal report if wanted.
        if any(x in self.cov_report for x in ['term', 'term-missing']):
            options = {
                'show_missing': ('term-missing' in self.cov_report) or None,
                'ignore_errors': True,
                'file': stream,
            }
            skip_covered = isinstance(self.cov_report, dict) and 'skip-covered' in self.cov_report.values()
            # False is turned into None for these options.
            options.update({'skip_covered': skip_covered or None})
            with _backup(self.cov, "config"):
                total = self.cov.report(**options)

        # Produce annotated source code report if wanted.
        if 'annotate' in self.cov_report:
            annotate_dir = self.cov_report['annotate']

            with _backup(self.cov, "config"):
                self.cov.annotate(ignore_errors=True, directory=annotate_dir)
            # We need to call Coverage.report here, just to get the total:
            # Coverage.annotate doesn't return any total and we need it for --cov-fail-under.

            with _backup(self.cov, "config"):
                total = self.cov.report(ignore_errors=True, file=_NullFile)
            if annotate_dir:
                stream.write('Coverage annotated source written to dir %s\n' % annotate_dir)
            else:
                stream.write('Coverage annotated source written next to source\n')

        # Produce html report if wanted.
        if 'html' in self.cov_report:
            output = self.cov_report['html']
            with _backup(self.cov, "config"):
                total = self.cov.html_report(ignore_errors=True, directory=output)
            stream.write('Coverage HTML written to dir %s\n' % (self.cov.config.html_dir if output is None else output))

        # Produce xml report if wanted.
        if 'xml' in self.cov_report:
            output = self.cov_report['xml']
            with _backup(self.cov, "config"):
                total = self.cov.xml_report(ignore_errors=True, outfile=output)
            stream.write('Coverage XML written to file %s\n' % (self.cov.config.xml_output if output is None else output))

        return total

200 

201 

class Central(CovController):
    """Implementation for centralised operation."""

    @_ensure_topdir
    def start(self):
        """Create the measuring and combining coverage objects and start measuring."""
        cleanup()

        # Both coverage objects are built from the same base options.
        shared_options = {
            'source': self.cov_source,
            'branch': self.cov_branch,
            'data_suffix': True,
            'config_file': self.cov_config,
        }
        self.cov = coverage.Coverage(**shared_options)
        # The combining object is pinned to the absolute path of the data
        # file that the measuring object resolved from its config.
        combined_data_file = os.path.abspath(self.cov.config.data_file)
        self.combining_cov = coverage.Coverage(data_file=combined_data_file, **shared_options)

        # Erase or load any previous coverage data and start coverage.
        if not self.cov_append:
            self.cov.erase()
        self.cov.start()
        self.set_env()

    @_ensure_topdir
    def finish(self):
        """Stop coverage, save data to file and set the list of coverage objects to report on."""
        self.unset_env()

        measuring = self.cov
        measuring.stop()
        measuring.save()

        # From here on self.cov is the combining object.
        combined = self.combining_cov
        self.cov = combined
        combined.load()
        combined.combine()
        combined.save()

        self.node_descs.add(self.get_node_desc(sys.platform, sys.version_info))

240 

241 

class DistMaster(CovController):
    """Implementation for distributed master."""

    @_ensure_topdir
    def start(self):
        """Create the master's coverage objects and start measuring."""
        cleanup()

        # Ensure coverage rc file rsynced if appropriate.
        rc_file = self.cov_config
        if rc_file and os.path.exists(rc_file):
            self.config.option.rsyncdir.append(rc_file)

        shared_options = {
            'source': self.cov_source,
            'branch': self.cov_branch,
            'data_suffix': True,
            'config_file': self.cov_config,
        }
        self.cov = coverage.Coverage(**shared_options)
        # Switch off these warning flags on the master's measuring object.
        for flag in ('_warn_no_data', '_warn_unimported_source', '_warn_preimported_source'):
            setattr(self.cov, flag, False)

        combined_data_file = os.path.abspath(self.cov.config.data_file)
        self.combining_cov = coverage.Coverage(data_file=combined_data_file, **shared_options)

        if not self.cov_append:
            self.cov.erase()
        self.cov.start()
        # Worker paths are appended to this list in testnodedown().
        self.cov.config.paths['source'] = [self.topdir]

    def configure_node(self, node):
        """Workers need to know if they are collocated and what files have moved."""
        worker_settings = {
            'cov_master_host': socket.gethostname(),
            'cov_master_topdir': self.topdir,
            'cov_master_rsync_roots': [str(root) for root in node.nodemanager.roots],
        }
        node.workerinput.update(worker_settings)

    def testnodedown(self, node, error):
        """Collect data file name from worker."""
        # If worker doesn't return any data then it is likely that this
        # plugin didn't get activated on the worker side.
        worker_output = getattr(node, 'workeroutput', {})
        if 'cov_worker_node_id' not in worker_output:
            self.failed_workers.append(node)
            return

        # If worker is not collocated then we must save the data file
        # that it returns to us.
        if 'cov_worker_data' in worker_output:
            suffix_fields = (
                socket.gethostname(),
                os.getpid(),
                random.randint(0, 999999),
                worker_output['cov_worker_node_id'],
            )
            data_suffix = '%s.%s.%06d.%s' % suffix_fields

            worker_cov = coverage.Coverage(source=self.cov_source,
                                           branch=self.cov_branch,
                                           data_suffix=data_suffix,
                                           config_file=self.cov_config)
            worker_cov.start()
            if coverage.version_info >= (5, 0):
                received = CoverageData(no_disk=True)
                received.loads(worker_output['cov_worker_data'])
                worker_cov.get_data().update(received)
            else:
                # coverage older than 5.0 reads the data from a file object.
                received = CoverageData()
                received.read_fileobj(StringIO(worker_output['cov_worker_data']))
                worker_cov.data.update(received)
            worker_cov.stop()
            worker_cov.save()

            worker_path = worker_output['cov_worker_path']
            self.cov.config.paths['source'].append(worker_path)

        # Record the worker types that contribute to the data file.
        remote_info = node.gateway._rinfo()
        self.node_descs.add(self.get_node_desc(remote_info.platform, remote_info.version_info))

    @_ensure_topdir
    def finish(self):
        """Combines coverage data and sets the list of coverage objects to report on."""
        # Combine all the suffix files into the data file.
        measuring = self.cov
        measuring.stop()
        measuring.save()

        combined = self.combining_cov
        self.cov = combined
        combined.load()
        combined.combine()
        combined.save()

332 

333 

class DistWorker(CovController):
    """Implementation for distributed workers."""

    @_ensure_topdir
    def start(self):
        """Work out collocation with the master, then start measuring."""
        cleanup()

        worker_input = self.config.workerinput
        master_host = worker_input['cov_master_host']
        master_topdir = worker_input['cov_master_topdir']

        # Determine whether we are collocated with master.
        on_master_host = socket.gethostname() == master_host
        self.is_collocated = on_master_host and self.topdir == master_topdir

        # If we are not collocated then rewrite master paths to worker paths.
        if not self.is_collocated:
            worker_topdir = self.topdir
            if self.cov_source is not None:
                rewritten = []
                for source in self.cov_source:
                    rewritten.append(source.replace(master_topdir, worker_topdir))
                self.cov_source = rewritten
            self.cov_config = self.cov_config.replace(master_topdir, worker_topdir)

        # Erase any previous data and start coverage.
        self.cov = coverage.Coverage(
            source=self.cov_source,
            branch=self.cov_branch,
            data_suffix=True,
            config_file=self.cov_config
        )
        self.cov.start()
        self.set_env()

    @_ensure_topdir
    def finish(self):
        """Stop coverage and send relevant info back to the master."""
        self.unset_env()
        self.cov.stop()

        if self.is_collocated:
            # We don't combine data if we're collocated - we can get
            # race conditions in the .combine() call (it's not atomic)
            # The data is going to be combined in the master.
            self.cov.save()

            # If we are collocated then just inform the master of our
            # data file to indicate that we have finished.
            self.config.workeroutput['cov_worker_node_id'] = self.nodeid
            return

        # If we are not collocated then add the current path
        # and coverage data to the output so we can combine
        # it on the master node.
        self.cov.combine()
        self.cov.save()

        # Send all the data to the master over the channel.
        if coverage.version_info >= (5, 0):
            data = self.cov.get_data().dumps()
        else:
            # coverage older than 5.0 writes the data to a file object.
            buff = StringIO()
            self.cov.data.write_fileobj(buff)
            data = buff.getvalue()

        payload = {
            'cov_worker_path': self.topdir,
            'cov_worker_node_id': self.nodeid,
            'cov_worker_data': data,
        }
        self.config.workeroutput.update(payload)

    def summary(self, stream):
        """Only the master reports so do nothing."""
        return None