Coverage for common/extendedconfigparser.py: 74%

162 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/common/extendedconfigparser.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Slightly extended ConfigParser.** 

27 

28""" 

29 

30import ast 

31import configparser 

32import logging 

33import os.path 

34from typing import ( 

35 Any, 

36 Dict, 

37 Iterable, 

38 Generator, 

39 List, 

40 Optional, 

41 TextIO, 

42 TYPE_CHECKING, 

43) 

44 

45from crate_anon.anonymise.dbholder import DatabaseHolder 

46from crate_anon.nlp_manager.constants import DatabaseConfigKeys 

47 

48if TYPE_CHECKING: 

49 from crate_anon.anonymise.config import DatabaseSafeConfig 

50 

51log = logging.getLogger(__name__) 

52 

53 

54# ============================================================================= 

55# Helper functions 

56# ============================================================================= 

57 

58 

def configfail(errmsg) -> None:
    """
    Report a configuration error: log it, then abort by raising.

    Args:
        errmsg: error message

    Raises:
        :exc:`ValueError`: always, carrying ``errmsg``; the same message is
            logged at CRITICAL level first.

    """
    # Log before raising so the message is recorded even if a caller
    # catches the exception.
    log.critical(errmsg)
    raise ValueError(errmsg)

70 

71 

def gen_lines(multiline: str) -> Generator[str, None, None]:
    """
    Yield the non-blank lines of a multi-line string, each with leading and
    trailing whitespace removed.

    Lines that are empty once stripped are skipped.
    """
    stripped_lines = (raw.strip() for raw in multiline.splitlines())
    # filter(None, ...) drops the empty strings left by blank lines.
    yield from filter(None, stripped_lines)

80 

81 

def gen_words(lines: Iterable[str]) -> Generator[str, None, None]:
    """
    Yield every whitespace-separated word from each line, in order.
    """
    for text in lines:
        yield from text.split()

89 

90 

def gen_ints(
    words: Iterable[str],
    minimum: Optional[int] = None,
    maximum: Optional[int] = None,
    suppress_errors: bool = False,
) -> Generator[int, None, None]:
    """
    Generate integers from words.

    Args:
        words: iterable of word strings
        minimum: minimum permissible value, or ``None`` for no lower bound
        maximum: maximum permissible value, or ``None`` for no upper bound
        suppress_errors: suppress values that fail (non-integers, or
            integers outside the permitted range), rather than raising an
            exception. Out-of-range values are still reported through
            :func:`configfail`, which logs them, before being skipped.

    Yields:
        integers

    Raises:
        :exc:`ValueError` if bad values come through, unless
        ``suppress_errors`` is set.

    """
    for word in words:
        try:
            value = int(word)
            if minimum is not None and value < minimum:
                configfail(f"Value {value} less than minimum of {minimum}")
            if maximum is not None and value > maximum:
                configfail(f"Value {value} more than maximum of {maximum}")
        except ValueError:
            if not suppress_errors:
                raise
            continue
        # Yield outside the try block, so that a ValueError thrown into the
        # generator by its consumer is not mistaken for a bad word.
        yield value

128 

129 

130# ============================================================================= 

131# ExtendedConfigParser 

132# ============================================================================= 

133 

134 

class ExtendedConfigParser(configparser.ConfigParser):
    """
    A version of ``configparser.ConfigParser`` with assistance functions for
    reading parameters.

    Interpolation is always disabled, and ``#`` and ``;`` are always treated
    as inline comment prefixes (see :meth:`__init__`).
    """

    def __init__(self, *args, case_sensitive: bool = False, **kwargs) -> None:
        """
        Args:
            *args:
                Passed to ``configparser.ConfigParser``.
            case_sensitive:
                Make the parser case-sensitive for option names?
            **kwargs:
                Passed to ``configparser.ConfigParser``, except that
                ``interpolation`` and ``inline_comment_prefixes`` are always
                overridden here.
        """
        kwargs["interpolation"] = None
        kwargs["inline_comment_prefixes"] = ("#", ";")
        # 'converters': Python 3.5 and up
        super().__init__(*args, **kwargs)
        if case_sensitive:
            # https://stackoverflow.com/questions/1611799/preserve-case-in-configparser  # noqa: E501
            self.optionxform = str

    # Use the underlying ConfigParser class for e.g.
    #       getboolean(section, option)

    @staticmethod
    def raise_missing(section: str, option: str) -> None:
        """
        Raise :exc:`ValueError` to complain about a missing parameter.

        Args:
            section: section name
            option: parameter name
        """
        configfail(f"Config section [{section}]: missing parameter: {option}")

    def require_section(self, section: str) -> None:
        """
        Requires that a section be present, or raises :exc:`ValueError`.

        Args:
            section: section name
        """
        if not self.has_section(section):
            # Show what *is* there, to help diagnose typos in section names.
            log.warning(f"Sections: {list(self.keys())!r}")
            configfail(f"Config missing section: {section}")

    def require_option_to_be_absent(
        self, section: str, option: str, msg: str
    ) -> None:
        """
        Require that an option be absent in the specified section, or log
        a message and raise :exc:`ValueError`.

        Args:
            section: section name
            option: parameter name
            msg: message to log/raise if the option is present
        """
        if self.has_option(section, option):
            configfail(msg)

    def get_str(
        self,
        section: str,
        option: str,
        required: bool = False,
        default: Optional[str] = None,
    ) -> Optional[str]:
        """
        Returns a string parameter.

        Args:
            section: section name
            option: parameter name
            required: raise :exc:`ValueError` if the parameter is missing?
            default: value to return if parameter is missing and not required

        Returns:
            string parameter value, or ``default`` (also used when the
            parameter is present but blank)

        Raises:
            :exc:`AssertionError` if both ``required`` and a ``default`` are
            specified; :exc:`ValueError` if the parameter is required but
            missing or blank.
        """
        if required and default is not None:
            raise AssertionError(
                f"required and default are incompatible "
                f"(section={section!r}, option={option!r}, "
                f"required={required!r}; default={default!r})"
            )
        s = self.get(section, option, fallback=default)
        if not s:
            # ConfigParser.get() checks against None but not blank strings
            s = default
        if required and not s:
            self.raise_missing(section, option)
        return s

    def get_str_list(
        self,
        section: str,
        option: str,
        as_words: bool = True,
        lower: bool = False,
        required: bool = False,
    ) -> List[str]:
        """
        Returns a string list parameter.

        Args:
            section: section name
            option: parameter name
            as_words: break the value into words (rather than lines)?
            lower: force the return value into lower case?
            required: raise :exc:`ValueError` if the parameter is missing
                (or yields no items)?

        Returns:
            list of strings
        """
        multiline = self.get(section, option, fallback="")
        if lower:
            multiline = multiline.lower()
        if as_words:
            result = list(gen_words(gen_lines(multiline)))
        else:  # as lines
            result = list(gen_lines(multiline))
        if required and not result:
            self.raise_missing(section, option)
        return result

    def get_int_default_if_failure(
        self, section: str, option: str, default: Optional[int] = None
    ) -> Optional[int]:
        """
        Returns an integer parameter, or a default if we can't read one.

        Args:
            section: section name
            option: parameter name
            default: value to return if the parameter cannot be read (missing
                or not an integer)

        Returns:
            an integer, or ``default``
        """
        try:
            return self.getint(section, option, fallback=default)
        except ValueError:  # e.g. invalid literal for int() with base 10
            return default

    def get_int_raise_if_no_default(
        self, section: str, option: str, default: Optional[int] = None
    ) -> int:
        """
        Like :meth:`get_int_default_if_failure`, but if the default is given
        as ``None`` and no value is found, raises :exc:`ValueError`.
        """
        result = self.get_int_default_if_failure(
            section=section, option=option, default=default
        )
        if result is None:
            self.raise_missing(section, option)
        return result

    def get_int_positive_raise_if_no_default(
        self, section: str, option: str, default: Optional[int] = None
    ) -> int:
        """
        Like :meth:`get_int_raise_if_no_default`, but also requires that the
        result be greater than or equal to 0 (so, despite the method name,
        zero is accepted). Raises :exc:`ValueError` otherwise.
        """
        result = self.get_int_raise_if_no_default(
            section=section, option=option, default=default
        )
        if result < 0:
            configfail(
                f"Config section [{section}]: option {option!r} "
                f"must not be negative"
            )
        return result

    def get_int_list(
        self,
        section: str,
        option: str,
        minimum: Optional[int] = None,
        maximum: Optional[int] = None,
        suppress_errors: bool = True,
    ) -> List[int]:
        """
        Returns a list of integers from a parameter.

        Args:
            section: config section name
            option: parameter name
            minimum: minimum permissible value, or ``None``
            maximum: maximum permissible value, or ``None``
            suppress_errors: suppress values that fail, rather than raising an
                exception

        Returns:
            list of integers (empty if the parameter is missing)

        """
        multiline = self.get(section, option, fallback="")
        return list(
            gen_ints(
                gen_words(gen_lines(multiline)),
                minimum=minimum,
                maximum=maximum,
                suppress_errors=suppress_errors,
            )
        )

    def get_bool(
        self, section: str, option: str, default: Optional[bool] = None
    ) -> bool:
        """
        Retrieves a boolean value from a parser.

        Args:
            section:
                section name within config file
            option:
                option (parameter) name within that section
            default:
                Value to return if the section or option is absent. If the
                default is not specified (``None``) and the value is
                missing, raise an error.

        Returns:
            Boolean value

        Raises:
            :exc:`ValueError` if the section/option is absent and no default
            was given (via :meth:`raise_missing`), or if the value present is
            not one that ``getboolean()`` recognizes as a boolean.

        """
        # A fallback is always passed (even if None), so getboolean() does
        # not raise NoSectionError/NoOptionError; a missing value comes back
        # as the fallback instead.
        result = self.getboolean(section, option, fallback=default)
        if result is None:
            self.raise_missing(section, option)
        return result

    def get_pyvalue_list(
        self, section: str, option: str, default: Any = None
    ) -> List[Any]:
        """
        Returns a list of Python values, produced by applying
        :func:`ast.literal_eval` to the string parameter value, and checking
        that the result is a list.

        Args:
            section: config section name
            option: parameter name
            default: value to return if no string is found for the parameter
                (a falsy default, including ``None``, becomes ``[]``)

        Returns:
            a Python list of some sort

        Raises:
            :exc:`ValueError` if a string is found but it doesn't evaluate to
            a list. Exceptions raised by :func:`ast.literal_eval` itself for
            malformed input (e.g. :exc:`SyntaxError`) propagate unchanged.

        """
        default = default or []
        strvalue = self.get(section, option, fallback=None)
        if not strvalue:
            return default
        pyvalue = ast.literal_eval(strvalue)
        # Now, make sure it's a list:
        # https://stackoverflow.com/questions/1835018
        if not isinstance(pyvalue, list):
            configfail(
                f"Option {option} must evaluate to a Python list "
                f"using ast.literal_eval()"
            )
        return pyvalue

    def get_database(
        self,
        section: str,
        dbname: Optional[str] = None,
        srccfg: Optional["DatabaseSafeConfig"] = None,
        with_session: bool = False,
        with_conn: bool = False,
        reflect: bool = False,
    ) -> DatabaseHolder:
        """
        Gets a database description from the config file.

        Args:
            section: config section name
            dbname: name to give the database (if ``None``, the section name
                will be used)
            srccfg: :class:`crate_anon.anonymise.config.DatabaseSafeConfig`
            with_session: create an SQLAlchemy Session?
            with_conn: create an SQLAlchemy connection (via an Engine)?
            reflect: read the database structure (when required)?

        Returns:
            a :class:`crate_anon.anonymise.dbholder.DatabaseHolder` object

        Raises:
            :exc:`ValueError` if the (required) URL parameter is missing

        """
        dbname = dbname or section
        url = self.get_str(section, DatabaseConfigKeys.URL, required=True)
        echo = self.get_bool(section, DatabaseConfigKeys.ECHO, default=False)
        return DatabaseHolder(
            dbname,
            url,
            srccfg=srccfg,
            with_session=with_session,
            with_conn=with_conn,
            reflect=reflect,
            echo=echo,
        )

    def get_env_dict(
        self, section: str, parent_env: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """
        Gets an operating system environment variable dictionary (``variable:
        value`` mapping) from the config file.

        Args:
            section: config section name
            parent_env: optional starting point (e.g. parent OS environment);
                it is copied, not modified

        Returns:
            a dictionary suitable for use as an OS environment: the parent
            environment (if any), overlaid with every option in ``section``

        """
        env = parent_env.copy() if parent_env else {}  # type: Dict[str, str]
        # items() returns a list of (name, value) tuples. Build a dict (not a
        # set of tuples) so that the config file's option order is preserved.
        env.update({str(k): str(v) for k, v in self.items(section)})
        return env

467 

468 

469# ============================================================================= 

470# ConfigSection 

471# ============================================================================= 

472 

473 

class ConfigSection:
    """
    Represents a section within a config file.

    Wraps an :class:`ExtendedConfigParser` and a fixed section name, so that
    options can be read without repeating the section name each time.
    """

    def __init__(
        self,
        section: str,
        parser: Optional[ExtendedConfigParser] = None,
        filename: Optional[str] = None,
        fileobj: Optional[TextIO] = None,
        case_sensitive: bool = False,
        encoding: str = "utf8",
    ) -> None:
        """
        You must specify exactly one of ``parser``, ``filename``, or
        ``fileobj``.

        Args:
            section:
                The name of the section within the config file, e.g.
                ``main`` for the section marked by ``[main]``.
            parser:
                Specify this, a :class:`ExtendedConfigParser`, if you
                have already loaded the file into a parser.
            filename:
                The name of a file to open. Specify also the encoding.
            fileobj:
                A file-like object to read.
            case_sensitive:
                If a new parser is created (i.e. ``filename`` or ``fileobj``
                is used), make it case-sensitive for options? Not used if
                ``parser`` is supplied.
            encoding:
                If ``filename`` is used, the character encoding.

        Raises:
            :exc:`ValueError` if the source arguments are not mutually
            exclusive, or if the section is absent; :exc:`RuntimeError` if
            ``filename`` is given but is not an existing file.
        """
        self.section = section

        # Check parameters
        if bool(parser) + bool(filename) + bool(fileobj) != 1:
            raise ValueError("Specify exactly one of: parser, filename, fileobj")

        # Record or create parser
        if parser:
            assert isinstance(parser, ExtendedConfigParser)
            self.parser = parser
        else:
            self.parser = ExtendedConfigParser(case_sensitive=case_sensitive)
            if filename:
                log.info(f"Reading config file: {filename}")
                # Check explicitly, rather than relying on read() to
                # complain about a missing file.
                if not os.path.isfile(filename):
                    raise RuntimeError(
                        f"Config file {filename} does not exist"
                    )
                self.parser.read(filename, encoding=encoding)
            else:
                self.parser.read_file(fileobj)

        # Check section exists
        self.parser.require_section(self.section)

    def opt_str(
        self,
        option: str,
        default: Optional[str] = None,
        required: bool = False,
    ) -> Optional[str]:
        """
        Reads a string option.

        Args:
            option: parameter (option) name
            default: default if not found and not required
            required: is the parameter required?

        Returns:
            the string value, or ``default`` (which may be ``None``)
        """
        return self.parser.get_str(
            self.section, option, default=default, required=required
        )

    def opt_multiline(
        self,
        option: str,
        required: bool = False,
        lower: bool = False,
        as_words: bool = True,
    ) -> List[str]:
        """
        Reads a multiline string, returning a list of words or lines.
        Equivalent to :meth:`opt_strlist`.

        Args:
            option: parameter (option) name
            required: is the parameter required?
            lower: convert to lower case?
            as_words: split as words, rather than as lines?
        """
        return self.parser.get_str_list(
            self.section,
            option,
            as_words=as_words,
            lower=lower,
            required=required,
        )

    def opt_strlist(
        self,
        option: str,
        required: bool = False,
        lower: bool = False,
        as_words: bool = True,
    ) -> List[str]:
        """
        Returns a list of strings from the config file.
        Equivalent to :meth:`opt_multiline`, to which it delegates.

        Args:
            option: parameter (option) name
            required: is the parameter required?
            lower: convert to lower case?
            as_words: split as words, rather than as lines?
        """
        return self.opt_multiline(
            option, required=required, lower=lower, as_words=as_words
        )

    def opt_bool(self, option: str, default: Optional[bool] = None) -> bool:
        """
        Reads a boolean option.

        Args:
            option: parameter (option) name
            default: default if not found (if None, the parameter is required)
        """
        return self.parser.get_bool(self.section, option, default=default)

    def opt_int(self, option: str, default: Optional[int] = None) -> int:
        """
        Reads an integer option.

        Args:
            option: parameter (option) name
            default: default if not found or not an integer (if None, the
                parameter is required)

        Raises:
            :exc:`ValueError` if no integer can be read and no default was
            given
        """
        return self.parser.get_int_raise_if_no_default(
            self.section, option, default=default
        )

    def opt_int_positive(
        self, option: str, default: Optional[int] = None
    ) -> int:
        """
        Reads an integer option that must be greater than or equal to 0.

        Args:
            option: parameter (option) name
            default: default if not found or not an integer (if None, the
                parameter is required)

        Raises:
            :exc:`ValueError` if no integer can be read and no default was
            given, or if the result is negative
        """
        return self.parser.get_int_positive_raise_if_no_default(
            self.section, option, default=default
        )

    def opt_multiline_int(
        self,
        option: str,
        minimum: Optional[int] = None,
        maximum: Optional[int] = None,
    ) -> List[int]:
        """
        Returns a list of integers within the specified range.

        Args:
            option: parameter (option) name
            minimum: minimum permissible value, or ``None``
            maximum: maximum permissible value, or ``None``

        Raises:
            :exc:`ValueError` if any word is not an integer or is out of
            range (errors are not suppressed here)
        """
        return self.parser.get_int_list(
            self.section,
            option,
            minimum=minimum,
            maximum=maximum,
            suppress_errors=False,
        )

    def opt_multiline_csv_pairs(self, option: str) -> Dict[str, str]:
        """
        Reads a dictionary of key-value pairs, specified as lines each of
        the format ``key, value``.

        Args:
            option: name of the config file option

        Raises:
            :exc:`ValueError` if any line does not split, on commas, into
            exactly two items
        """
        d = {}  # type: Dict[str, str]
        for line in self.opt_multiline(option, as_words=False):
            pair = [item.strip() for item in line.split(",")]
            if len(pair) != 2:
                raise ValueError(
                    f"For option {option}: specify items as "
                    f"a list of comma-separated pairs"
                )
            key, value = pair
            d[key] = value
        return d

    def opt_pyvalue_list(self, option: str, default: Any = None) -> List[Any]:
        """
        Returns a list of evaluated Python values.

        Args:
            option: parameter (option) name
            default: value to return if the option is missing or blank
        """
        return self.parser.get_pyvalue_list(
            self.section, option, default=default
        )

    def require_absent(self, option: str, msg: str) -> None:
        """
        If an option is present, log the message and raise an exception.
        Use this for deprecated option names.
        """
        self.parser.require_option_to_be_absent(self.section, option, msg)

    def other_section(self, section: str) -> "ConfigSection":
        """
        Returns a :class:`ConfigSection` attached to a different section of
        the same parser.

        Args:
            section:
                The new section name.
        """
        return ConfigSection(section=section, parser=self.parser)