Coverage for common/extendedconfigparser.py: 74%
162 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/common/extendedconfigparser.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Slightly extended ConfigParser.**
28"""
30import ast
31import configparser
32import logging
33import os.path
34from typing import (
35 Any,
36 Dict,
37 Iterable,
38 Generator,
39 List,
40 Optional,
41 TextIO,
42 TYPE_CHECKING,
43)
45from crate_anon.anonymise.dbholder import DatabaseHolder
46from crate_anon.nlp_manager.constants import DatabaseConfigKeys
48if TYPE_CHECKING:
49 from crate_anon.anonymise.config import DatabaseSafeConfig
51log = logging.getLogger(__name__)
54# =============================================================================
55# Helper functions
56# =============================================================================
def configfail(errmsg) -> None:
    """
    Report a configuration failure: log it, then abort with an exception.

    Args:
        errmsg: the message to log (at CRITICAL level) and to raise

    Raises:
        :exc:`ValueError`: always, carrying ``errmsg``

    """
    failure = ValueError(errmsg)
    log.critical(errmsg)
    raise failure
def gen_lines(multiline: str) -> Generator[str, None, None]:
    """
    Split a multi-line string into lines and yield each one with leading and
    trailing whitespace removed. Lines that are empty after stripping are
    skipped.

    Args:
        multiline: text that may span several lines

    Yields:
        each non-blank, stripped line, in order
    """
    stripped_lines = (raw.strip() for raw in multiline.splitlines())
    # filter(None, ...) drops the empty strings left by blank lines.
    yield from filter(None, stripped_lines)
def gen_words(lines: Iterable[str]) -> Generator[str, None, None]:
    """
    Yield every whitespace-separated word from each line in turn.

    Args:
        lines: iterable of strings

    Yields:
        individual words, in the order they appear
    """
    for text in lines:
        yield from text.split()
def gen_ints(
    words: Iterable[str],
    minimum: int = None,
    maximum: int = None,
    suppress_errors: bool = False,
) -> Generator[int, None, None]:
    """
    Convert words to integers, optionally enforcing a permitted range.

    Args:
        words: iterable of word strings
        minimum: minimum permissible value, or ``None`` for no lower bound
        maximum: maximum permissible value, or ``None`` for no upper bound
        suppress_errors: silently skip words that fail (not an integer, or
            outside the permitted range) rather than raising an exception

    Yields:
        integers

    Raises:
        :exc:`ValueError` if a word is not an integer or is out of range,
        unless ``suppress_errors`` is set.

    """
    for token in words:
        try:
            number = int(token)
            # configfail() raises ValueError, so range failures are handled
            # by the same "except" clause as conversion failures.
            if minimum is not None and number < minimum:
                configfail(f"Value {number} less than minimum of {minimum}")
            if maximum is not None and number > maximum:
                configfail(f"Value {number} more than maximum of {maximum}")
            yield number
        except ValueError:
            if suppress_errors:
                continue
            raise
130# =============================================================================
131# ExtendedConfigParser
132# =============================================================================
class ExtendedConfigParser(configparser.ConfigParser):
    """
    A version of ``configparser.ConfigParser`` with assistance functions for
    reading parameters.

    Interpolation is always disabled, and ``#`` and ``;`` are always treated
    as inline comment prefixes (see :meth:`__init__`). Most failures are
    reported via :func:`configfail`, i.e. logged and raised as
    :exc:`ValueError`.
    """

    def __init__(self, *args, case_sensitive: bool = False, **kwargs) -> None:
        """
        Args:
            args:
                Positional arguments passed to ``configparser.ConfigParser``.
            case_sensitive:
                Make the parser case-sensitive for option names?
            kwargs:
                Keyword arguments passed to ``configparser.ConfigParser``.
                Any ``interpolation`` or ``inline_comment_prefixes`` values
                supplied here are overridden.
        """
        kwargs["interpolation"] = None
        kwargs["inline_comment_prefixes"] = ("#", ";")
        # 'converters': Python 3.5 and up
        super().__init__(*args, **kwargs)
        if case_sensitive:
            # https://stackoverflow.com/questions/1611799/preserve-case-in-configparser  # noqa: E501
            self.optionxform = str

    # Use the underlying ConfigParser class for e.g.
    # getboolean(section, option)

    @staticmethod
    def raise_missing(section: str, option: str) -> None:
        """
        Raise :exc:`ValueError` to complain about a missing parameter.

        Args:
            section: section name
            option: parameter name
        """
        configfail(f"Config section [{section}]: missing parameter: {option}")

    def require_section(self, section: str) -> None:
        """
        Requires that a section be present, or raises :exc:`ValueError`.
        The names of the sections that are present are logged first, as a
        warning, to help diagnose the problem.

        Args:
            section: section name
        """
        if self.has_section(section):
            return
        log.warning(f"Sections: {list(self.keys())!r}")
        configfail(f"Config missing section: {section}")

    def require_option_to_be_absent(
        self, section: str, option: str, msg: str
    ) -> None:
        """
        Require that an option be absent in the specified section, or log
        a message and raise :exc:`ValueError`.

        Args:
            section: section name
            option: parameter name
            msg: message to log/raise if the option is present
        """
        if self.has_option(section, option):
            configfail(msg)

    def get_str(
        self,
        section: str,
        option: str,
        required: bool = False,
        default: Optional[str] = None,
    ) -> Optional[str]:
        """
        Returns a string parameter.

        Args:
            section: section name
            option: parameter name
            required: raise :exc:`ValueError` if the parameter is missing?
            default: value to return if parameter is missing and not required

        Returns:
            string parameter value, or ``default`` (which is also used if the
            parameter is present but blank)

        Raises:
            :exc:`AssertionError` if ``required`` is combined with a
            non-``None`` ``default``; :exc:`ValueError` if the parameter is
            required but missing or blank
        """
        if required and default is not None:
            raise AssertionError(
                "required and default are incompatible "
                f"(section={section!r}, option={option!r}, "
                f"required={required!r}; default={default!r})"
            )
        s = self.get(section, option, fallback=default)
        if not s:
            # ConfigParser.get() checks against None but not blank strings
            s = default
        if required and not s:
            self.raise_missing(section, option)
        return s

    def get_str_list(
        self,
        section: str,
        option: str,
        as_words: bool = True,
        lower: bool = False,
        required: bool = False,
    ) -> List[str]:
        """
        Returns a string list parameter.

        Args:
            section: section name
            option: parameter name
            as_words: break the value into words (rather than lines)?
            lower: force the return value into lower case?
            required: raise :exc:`ValueError` if the parameter is missing
                (or yields no items)?

        Returns:
            list of strings
        """
        multiline = self.get(section, option, fallback="")
        if lower:
            multiline = multiline.lower()
        lines = gen_lines(multiline)
        result = list(gen_words(lines) if as_words else lines)
        if required and not result:
            self.raise_missing(section, option)
        return result

    def get_int_default_if_failure(
        self, section: str, option: str, default: Optional[int] = None
    ) -> Optional[int]:
        """
        Returns an integer parameter, or a default if we can't read one.

        Args:
            section: section name
            option: parameter name
            default: value to return if the parameter cannot be read (missing
                or not an integer)

        Returns:
            an integer, or ``default``
        """
        try:
            return self.getint(section, option, fallback=default)
        except ValueError:  # e.g. invalid literal for int() with base 10
            return default

    def get_int_raise_if_no_default(
        self, section: str, option: str, default: Optional[int] = None
    ) -> int:
        """
        Like :meth:`get_int_default_if_failure`, but if the default is given
        as ``None`` and no value is found, raises an exception.

        Note that a value that is present but is not a valid integer is
        treated in the same way as a missing value.

        Raises:
            :exc:`ValueError` if no integer can be read and ``default`` is
            ``None``
        """
        result = self.get_int_default_if_failure(
            section=section, option=option, default=default
        )
        if result is None:
            self.raise_missing(section, option)
        return result

    def get_int_positive_raise_if_no_default(
        self, section: str, option: str, default: Optional[int] = None
    ) -> int:
        """
        Like :meth:`get_int_raise_if_no_default`, but also requires
        that the result be greater than or equal to 0.

        Raises:
            :exc:`ValueError` if no integer can be read and ``default`` is
            ``None``, or if the result is negative
        """
        result = self.get_int_raise_if_no_default(
            section=section, option=option, default=default
        )
        if result < 0:
            configfail(
                f"Config section [{section}]: option {option!r} "
                f"must not be negative"
            )
        return result

    def get_int_list(
        self,
        section: str,
        option: str,
        minimum: Optional[int] = None,
        maximum: Optional[int] = None,
        suppress_errors: bool = True,
    ) -> List[int]:
        """
        Returns a list of integers from a parameter.

        Args:
            section: config section name
            option: parameter name
            minimum: minimum permissible value, or ``None``
            maximum: maximum permissible value, or ``None``
            suppress_errors: suppress values that fail, rather than raising an
                exception

        Returns:
            list of integers

        """
        multiline = self.get(section, option, fallback="")
        return list(
            gen_ints(
                gen_words(gen_lines(multiline)),
                minimum=minimum,
                maximum=maximum,
                suppress_errors=suppress_errors,
            )
        )

    def get_bool(
        self, section: str, option: str, default: Optional[bool] = None
    ) -> bool:
        """
        Retrieves a boolean value from a parser.

        Args:
            section:
                section name within config file
            option:
                option (parameter) name within that section
            default:
                Value to return if the section or option is absent. If the
                default is not specified (``None``), and the value is
                missing, raise an error.

        Returns:
            Boolean value

        Raises:
            :exc:`ValueError` if the value is missing and no default was
            given, or (from ``ConfigParser.getboolean()``) if the value is
            present but is not a recognized boolean

        """
        # An explicit fallback (even None) stops getboolean() from raising
        # NoSectionError/NoOptionError; a missing value comes back as the
        # fallback instead.
        result = self.getboolean(section, option, fallback=default)
        if result is None:
            self.raise_missing(section, option)
        return result

    def get_pyvalue_list(
        self, section: str, option: str, default: Any = None
    ) -> List[Any]:
        """
        Returns a list of Python values, produced by applying
        :func:`ast.literal_eval` to the string parameter value, and checking
        that the result is a list.

        Args:
            section: config section name
            option: parameter name
            default: value to return if no string is found for the parameter
                (a falsy default is replaced by a new empty list)

        Returns:
            a Python list of some sort

        Raises:
            :exc:`ValueError` if a string is found but it doesn't evaluate to
            a list. Exceptions raised by :func:`ast.literal_eval` itself
            (e.g. :exc:`SyntaxError` for malformed text) are propagated
            unchanged.

        """
        default = default or []
        strvalue = self.get(section, option, fallback=None)
        if not strvalue:
            return default
        pyvalue = ast.literal_eval(strvalue)
        # Now, make sure it's a list:
        # https://stackoverflow.com/questions/1835018
        if not isinstance(pyvalue, list):
            configfail(
                f"Option {option} must evaluate to a Python list "
                f"using ast.literal_eval()"
            )
        return pyvalue

    def get_database(
        self,
        section: str,
        dbname: Optional[str] = None,
        srccfg: "DatabaseSafeConfig" = None,
        with_session: bool = False,
        with_conn: bool = False,
        reflect: bool = False,
    ) -> DatabaseHolder:
        """
        Gets a database description from the config file.

        The database URL is a required parameter of the section; the "echo"
        flag is optional and defaults to ``False``.

        Args:
            section: config section name
            dbname: name to give the database (if ``None``, the section name
                will be used)
            srccfg: :class:`crate_anon.anonymise.config.DatabaseSafeConfig`
            with_session: create an SQLAlchemy Session?
            with_conn: create an SQLAlchemy connection (via an Engine)?
            reflect: read the database structure (when required)?

        Returns:
            a :class:`crate_anon.anonymise.dbholder.DatabaseHolder` object

        Raises:
            :exc:`ValueError` if the URL parameter is missing

        """
        dbname = dbname or section
        url = self.get_str(section, DatabaseConfigKeys.URL, required=True)
        echo = self.get_bool(section, DatabaseConfigKeys.ECHO, default=False)
        return DatabaseHolder(
            dbname,
            url,
            srccfg=srccfg,
            with_session=with_session,
            with_conn=with_conn,
            reflect=reflect,
            echo=echo,
        )

    def get_env_dict(
        self, section: str, parent_env: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """
        Gets an operating system environment variable dictionary (``variable:
        value`` mapping) from the config file.

        Args:
            section: config section name
            parent_env: optional starting point (e.g. parent OS environment);
                it is copied, not modified

        Returns:
            a dictionary suitable for use as an OS environment: the contents
            of ``parent_env`` (if any), overridden/extended by the options in
            ``section``

        """
        env = parent_env.copy() if parent_env else {}  # type: Dict[str, str]
        # items() returns a list of (name, value) tuples. Build a dict (not a
        # set of tuples) so that the config file's order is preserved.
        env.update({str(k): str(v) for k, v in self.items(section)})
        return env
469# =============================================================================
470# ConfigSection
471# =============================================================================
class ConfigSection:
    """
    Represents a section within a config file.

    Wraps an :class:`ExtendedConfigParser` together with a fixed section
    name, so that options can be read without repeating the section name.
    """

    def __init__(
        self,
        section: str,
        parser: ExtendedConfigParser = None,
        filename: Optional[str] = None,
        fileobj: Optional[TextIO] = None,
        case_sensitive: bool = False,
        encoding: str = "utf8",
    ) -> None:
        """
        You must specify exactly one of ``parser``, ``filename``, or
        ``fileobj``.

        Args:
            section:
                The name of the section within the config file, e.g.
                ``main`` for the section marked by ``[main]``.
            parser:
                Specify this, a :class:`ExtendedConfigParser`, if you
                have already loaded the file into a parser.
            filename:
                The name of a file to open. Specify also the encoding.
            fileobj:
                A file-like object to read.
            case_sensitive:
                If a new parser is created (i.e. ``filename`` or ``fileobj``
                is used), make it case-sensitive for options? Ignored if
                ``parser`` is supplied.
            encoding:
                If ``filename`` is used, the character encoding.

        Raises:
            :exc:`ValueError` if the source arguments are not specified
            correctly, or if the section is absent; :exc:`RuntimeError` if
            ``filename`` does not refer to an existing file
        """
        self.section = section

        # Check parameters
        n_sources = sum(map(bool, (parser, filename, fileobj)))
        if n_sources != 1:
            raise ValueError("Specify exactly one of: parser, filename, fileobj")

        # Record or create parser
        if parser:
            assert isinstance(parser, ExtendedConfigParser)
            self.parser = parser
        else:
            self.parser = ExtendedConfigParser(case_sensitive=case_sensitive)
            if filename:
                log.info(f"Reading config file: {filename}")
                # ConfigParser.read() silently ignores missing files, so
                # check explicitly.
                if not os.path.isfile(filename):
                    raise RuntimeError(
                        f"Config file {filename} does not exist"
                    )
                self.parser.read(filename, encoding=encoding)
            else:
                self.parser.read_file(fileobj)

        # Check section exists
        self.parser.require_section(self.section)

    def opt_str(
        self, option: str, default: Optional[str] = None, required: bool = False
    ) -> Optional[str]:
        """
        Reads a string option.

        Args:
            option: parameter (option) name
            default: default if not found and not required
            required: is the parameter required?

        Returns:
            the string value, or ``default`` (which may be ``None``)
        """
        return self.parser.get_str(
            self.section, option, default=default, required=required
        )

    def opt_multiline(
        self,
        option: str,
        required: bool = False,
        lower: bool = False,
        as_words: bool = True,
    ) -> List[str]:
        """
        Reads a multiline string, returning a list of words or lines.
        Equivalent to :meth:`opt_strlist` (same arguments and defaults).

        Args:
            option: parameter (option) name
            required: is the parameter required?
            lower: convert to lower case?
            as_words: split as words, rather than as lines?
        """
        return self.parser.get_str_list(
            self.section,
            option,
            as_words=as_words,
            lower=lower,
            required=required,
        )

    def opt_strlist(
        self,
        option: str,
        required: bool = False,
        lower: bool = False,
        as_words: bool = True,
    ) -> List[str]:
        """
        Returns a list of strings from the config file.
        Equivalent to :meth:`opt_multiline` (same arguments and defaults).

        Args:
            option: parameter (option) name
            required: is the parameter required?
            lower: convert to lower case?
            as_words: split as words, rather than as lines?
        """
        return self.parser.get_str_list(
            self.section,
            option,
            as_words=as_words,
            lower=lower,
            required=required,
        )

    def opt_bool(self, option: str, default: Optional[bool] = None) -> bool:
        """
        Reads a boolean option.

        Args:
            option: parameter (option) name
            default: default if not found (if None, the parameter is required)
        """
        return self.parser.get_bool(self.section, option, default=default)

    def opt_int(self, option: str, default: Optional[int] = None) -> int:
        """
        Reads an integer option.

        Args:
            option: parameter (option) name
            default: default if not found (if None, the parameter is required)

        Raises:
            :exc:`ValueError` if no integer can be read and ``default`` is
            ``None``
        """
        return self.parser.get_int_raise_if_no_default(
            self.section, option, default=default
        )

    def opt_int_positive(
        self, option: str, default: Optional[int] = None
    ) -> int:
        """
        Reads an integer option that must be greater than or equal to 0.

        Args:
            option: parameter (option) name
            default: default if not found (if None, the parameter is required)

        Raises:
            :exc:`ValueError` if no integer can be read and ``default`` is
            ``None``, or if the value is negative
        """
        return self.parser.get_int_positive_raise_if_no_default(
            self.section, option, default=default
        )

    def opt_multiline_int(
        self,
        option: str,
        minimum: Optional[int] = None,
        maximum: Optional[int] = None,
    ) -> List[int]:
        """
        Returns a list of integers within the specified range.

        Unlike the parser's own default, errors are not suppressed here: a
        non-integer or out-of-range value raises :exc:`ValueError`.

        Args:
            option: parameter (option) name
            minimum: minimum permissible value, or ``None``
            maximum: maximum permissible value, or ``None``
        """
        return self.parser.get_int_list(
            self.section,
            option,
            minimum=minimum,
            maximum=maximum,
            suppress_errors=False,
        )

    def opt_multiline_csv_pairs(self, option: str) -> Dict[str, str]:
        """
        Reads a dictionary of key-value pairs, specified as lines each of
        the format ``key, value``.

        Args:
            option: name of the config file option

        Returns:
            dictionary mapping each (stripped) key to its (stripped) value;
            if a key is repeated, the last value wins

        Raises:
            :exc:`ValueError` if a line does not contain exactly one comma
        """
        d = {}  # type: Dict[str, str]
        for line in self.opt_multiline(option, as_words=False):
            pair = [item.strip() for item in line.split(",")]
            if len(pair) != 2:
                raise ValueError(
                    f"For option {option}: specify items as "
                    f"a list of comma-separated pairs"
                )
            key, value = pair
            d[key] = value
        return d

    def opt_pyvalue_list(self, option: str, default: Any = None) -> List[Any]:
        """
        Returns a list of evaluated Python values.

        Args:
            option: parameter (option) name
            default: value to return if the option is missing or blank
        """
        return self.parser.get_pyvalue_list(
            self.section, option, default=default
        )

    def require_absent(self, option: str, msg: str) -> None:
        """
        If an option is present, log the message and raise an exception.
        Use this for deprecated option names.

        Args:
            option: parameter (option) name
            msg: message to log/raise if the option is present
        """
        self.parser.require_option_to_be_absent(self.section, option, msg)

    def other_section(self, section: str) -> "ConfigSection":
        """
        Returns a :class:`ConfigSection` attached to a different section of
        the same parser.

        Args:
            section:
                The new section name.
        """
        return ConfigSection(section=section, parser=self.parser)