Coverage for linkage/helpers.py: 60%

248 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1r""" 

2crate_anon/linkage/helpers.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Helper functions for linkage tools.** 

27 

28Avoid using pickle for caching; it is insecure (arbitrary code execution). 

29 

30""" 

31 

32# ============================================================================= 

33# Imports 

34# ============================================================================= 

35 

36from argparse import ArgumentTypeError 

37from contextlib import contextmanager, ExitStack 

38from io import StringIO, TextIOWrapper 

39import logging 

40from math import log as math_ln 

41import os 

42import random 

43import re 

44import string 

45from typing import ( 

46 Any, 

47 Dict, 

48 Generator, 

49 List, 

50 Optional, 

51 Set, 

52 Tuple, 

53 Type, 

54 TYPE_CHECKING, 

55 Union, 

56) 

57import unicodedata 

58from zipfile import ZipFile 

59 

60import regex 

61from cardinal_pythonlib.datetimefunc import coerce_to_pendulum_date 

62from cardinal_pythonlib.fileops import mkdir_p 

63from fuzzy import DMetaphone 

64from numba import jit 

65from pendulum import Date 

66from pendulum.parsing.exceptions import ParserError 

67 

68from crate_anon.anonymise.anonregex import get_uk_postcode_regex_string 

69from crate_anon.common.logfunc import warn_once 

70from crate_anon.common.regex_helpers import anchor 

71from crate_anon.linkage.constants import ( 

72 FuzzyDefaults, 

73 MANGLE_PRETRANSLATE, 

74 MINUS_INFINITY, 

75 NONE_TYPE, 

76 SAFE_UPPER_PRETRANSLATE, 

77 SIMPLIFY_PUNCTUATION_WHITESPACE_TRANS, 

78) 

79 

80if TYPE_CHECKING: 

81 from crate_anon.linkage.matchconfig import MatchConfig 

82 

83log = logging.getLogger(__name__) 

84 

85 

86# ============================================================================= 

87# Notes 

88# ============================================================================= 

89 

90_ = """ 

91 

92Geography 

93--------- 

94 

95[COVERED IN THE PAPER. FURTHER DETAIL HERE.] 

96 

97UK postcodes have this format (A letter, 9 digit, plus Wikipedia concrete 

98examples; 

99https://en.wikipedia.org/wiki/Postcodes_in_the_United_Kingdom#Formatting): 

100 

101+------------+----------------+------------+----------+ 

102| Postcode | 

103+------------+----------------+------------+----------+ 

104| Outward code | Inward code | 

105+------------+----------------+------------+----------+ 

106| Area (1-2) | District (1-2) | Sector (1) | Unit (2) | 

107| (A, AA) | (9, 99, 9A) | (9) | (AA) | 

108+------------+----------------+------------+----------+ 

109| AA | 9A | 9 | AA | 

110| SW | 1W | 0 | NY | 

111| EC | 9A | 9 | AA | 

112+------------+----------------+------------+----------+ 

113| A | 9A | 9 | AA | 

114| W | 1A | 0 | AX | 

115+------------+----------------+------------+----------+ 

116| A | 9 | 9 | AA | 

117| M | 1 | 1 | AE | 

118+------------+----------------+------------+----------+ 

119| A | 99 | 9 | AA | 

120| B | 33 | 8 | TH | 

121+------------+----------------+------------+----------+ 

122| AA | 9 | 9 | AA | 

123| CR | 2 | 6 | XH | 

124+------------+----------------+------------+----------+ 

125| AA | 99 | 9 | AA | 

126| DN | 55 | 1 | PT | 

127+------------+----------------+------------+----------+ 

128 

129Then there are "special cases" that don't fit, but they are mostly 

130extraterritorial. See 

131https://en.wikipedia.org/wiki/Postcodes_in_the_United_Kingdom#Special_cases. 

132 

133UK census geography is described at 

134https://www.ons.gov.uk/methodology/geography/ukgeographies/censusgeography. 

135 

136The most important unit for our purposes is the Output Area (OA), the smallest 

137unit, which is made up of an integer number of postcode units. 

138 

139So an OA is bigger than a postcode unit. But is it bigger or smaller than a 

140postcode sector? Smaller, I think. 

141 

142- https://data.gov.uk/dataset/7f4e1818-4305-4962-adc4-e4e3effd7784/output-area-to-postcode-sector-december-2011-lookup-in-england-and-wales 

143- this allows you to look up *from* output area *to* postcode sector, implying 

144 that postcode sectors must be larger. 

145 

146""" # noqa: E501 

147 

148 

149# ============================================================================= 

150# Metaphones 

151# ============================================================================= 

152 

153dmeta = DMetaphone() 

154 

155_ = """ 

156 

157For a sense of metaphones: 

158 

159>>> dmeta("Rudolf") 

160[b'RTLF', None] 

161>>> dmeta("Cardinal") 

162[b'KRTN', None] 

163>>> dmeta("Supercalifragilistic") 

164[b'SPRK', None] 

165>>> dmeta("Christopher") 

166[b'KRST', None] 

167>>> dmeta("Chris") 

168[b'KRS', None] 

169>>> dmeta("C") 

170[b'K', None] 

171>>> dmeta("Philip") 

172[b'FLP', None] 

173>>> dmeta("Phil") 

174[b'FL', None] 

175>>> dmeta("Phi") 

176[b'F', None] 

177>>> dmeta("Knuth") # https://stackabuse.com/phonetic-similarity-of-words-a-vectorized-approach-in-python/ 

178[b'N0', b'NT'] 

179 

180>>> dmeta("Clérambault") # raises UnicodeEncodeError 

181 

182""" # noqa: E501 

183 

184 

185# ============================================================================= 

186# For caching 

187# ============================================================================= 

188 

189 

def mkdir_for_filename(filename: str) -> None:
    """
    Ensures that the directory which will contain ``filename`` exists.

    Args:
        filename:
            Path of a file (which need not exist yet). Must not be empty.

    A bare filename with no directory component (e.g. ``"cache.json"``) lives
    in the current directory, so there is nothing to create and the function
    simply returns.
    """
    assert filename
    directory = os.path.dirname(filename)
    if directory:
        # os.path.dirname() returns "" for a bare filename; an empty path
        # cannot be created, so only call mkdir_p() when there is a directory
        # component.
        mkdir_p(directory)

196 

197 

198# ============================================================================= 

199# Reading from file or zipped file 

200# ============================================================================= 

201 

202 

@contextmanager
def open_even_if_zipped(
    filename: str,
) -> Generator[TextIOWrapper, None, None]:
    """
    Yields (as a context manager) a text file, opened directly or through a
    ZIP file (distinguished by its ``.zip`` extension, case-insensitive)
    containing that file.

    For a ZIP file, the first non-directory member is read.

    Args:
        filename:
            Path to a plain text file or to a ZIP file.

    Yields:
        a text-mode file object, closed when the context exits

    Raises:
        ValueError: if the ZIP file contains no files
    """
    is_zip = os.path.splitext(filename)[1].lower() == ".zip"
    with ExitStack() as stack:
        if is_zip:
            log.info(f"Reading ZIP file: {filename}")
            z = stack.enter_context(ZipFile(filename))  # type: ZipFile
            # Directory entries carry no data; skip them so that we read a
            # real file even if the archive lists a directory first.
            members = [info for info in z.infolist() if not info.is_dir()]
            if not members:
                raise ValueError("ZIP file is empty")
            first_file = members[0]
            log.info(f"Within ZIP, reading: {first_file.filename}")
            # noinspection PyTypeChecker
            binary_file = stack.enter_context(z.open(first_file))
            # Register the text wrapper too, so it is closed deterministically
            # (before the underlying binary stream and the ZIP file itself).
            f = stack.enter_context(TextIOWrapper(binary_file))
        else:
            log.info(f"Reading file: {filename}")
            # noinspection PyTypeChecker
            f = stack.enter_context(open(filename, "rt"))
        yield f
        log.debug(f"... finished reading: {filename}")

228 

229 

230# ============================================================================= 

231# Name manipulation 

232# ============================================================================= 

233 

# Translation table that deletes ASCII punctuation and the space character.
# With its first two arguments empty, str.maketrans() maps the code point of
# every character in its third argument to None, which str.translate() treats
# as "delete this character".
REMOVE_PUNCTUATION_SPACE_TABLE = str.maketrans(
    "", "", string.punctuation + " "
)
NONWORD_REGEX = regex.compile(r"\W")  # one non-word character
ONE_OR_MORE_SPACE_REGEX = regex.compile(r"\s+")  # a run of whitespace

241 

242 

def mangle_unicode_to_ascii(s: Any) -> str:
    """
    Reduce a value to plain ASCII text, discarding accents and anything else
    that has no ASCII form.

    ``None`` becomes the empty string; any other non-string is passed through
    ``str()`` first.

    This differs slightly from the version in cardinal_pythonlib, because a
    plain ASCII encode gives the Eszett a rough ride:

    .. code-block:: python

        "Straße Clérambault".encode("ascii", "ignore")  # b'Strae Clerambault'

    So the ``MANGLE_PRETRANSLATE`` table is applied before encoding.
    """
    if s is None:
        return ""
    text = s if isinstance(s, str) else str(s)
    # NFKD splits accented characters into a base character plus combining
    # marks, so that the marks can be dropped by the ASCII encoding below.
    decomposed = unicodedata.normalize("NFKD", text)
    pretranslated = decomposed.translate(MANGLE_PRETRANSLATE)
    ascii_bytes = pretranslated.encode("ascii", "ignore")  # drops non-ASCII
    return ascii_bytes.decode("ascii")  # back to a string

265 

266 

def safe_upper(name: str) -> str:
    """
    Upper-case a name without mangling a few specific accented characters.

    Plain ``str.upper()`` is not quite what we want, because:

    - 'ß'.upper() == 'SS' but 'ẞ'.upper() == 'ẞ'

    So ``SAFE_UPPER_PRETRANSLATE`` is applied first; the intention is that an
    upper-case Eszett is used here, with the "SS" form being dealt with later
    through transliteration.
    """
    pretranslated = name.translate(SAFE_UPPER_PRETRANSLATE)
    return pretranslated.upper()

277 

278 

def remove_redundant_whitespace(x: str) -> str:
    """
    Collapse every run of whitespace (spaces, tabs, newlines, ...) into a
    single space, then strip whitespace from both ends.
    """
    single_spaced = ONE_OR_MORE_SPACE_REGEX.sub(" ", x)
    return single_spaced.strip()

285 

286 

def simplify_punctuation_whitespace(x: str) -> str:
    """
    Apply the ``SIMPLIFY_PUNCTUATION_WHITESPACE_TRANS`` translation table to
    the string, to simplify punctuation and whitespace (e.g. curly to straight
    quotes, tab to space, en dash to hyphen).
    """
    simplified = x.translate(SIMPLIFY_PUNCTUATION_WHITESPACE_TRANS)
    return simplified

293 

294 

def standardize_name(name: str) -> str:
    """
    Put a name into a standard form: upper case (which also turns e.g. an
    Eszett into SS), with ASCII punctuation and spaces removed, and then
    reduced to ASCII.

    This is the format used by the US surname database, e.g. ACOSTAPEREZ for
    (probably) Acosta Perez, and just PEREZ without e.g. PÉREZ.

    We use this for our name frequency databases. For other purposes, we use
    a more sophisticated approach; see e.g. surname_alternative_fragments().

    Examples: see unit tests.
    """
    upper = name.upper()
    without_punctuation = upper.translate(REMOVE_PUNCTUATION_SPACE_TABLE)
    return mangle_unicode_to_ascii(without_punctuation)

311 

312 

def _gen_name_versions(
    x: str,
    accent_transliterations: Dict[
        int, Union[str, int, None]
    ] = FuzzyDefaults.ACCENT_TRANSLITERATIONS_TRANS,
) -> Generator[str, None, None]:
    """
    Generate up to three versions of a string, in this order:

    1. the string itself (stripped of surrounding whitespace);
    2. an accent-mangled version, e.g. Ü to U;
    3. an accent-transliterated version, e.g. Ü to UE.

    Nothing is generated if the string is blank. Duplicates are not removed.
    We assume that either nothing happens, or mangling happens, or
    transliteration happens, but not some nasty combination.

    Args:
        x:
            The string to process.
        accent_transliterations:
            Translation table (as used by ``str.translate``) for step 3.
    """
    stripped = x.strip()
    if stripped:
        yield stripped
        yield mangle_unicode_to_ascii(stripped)
        yield stripped.translate(accent_transliterations)

333 

334 

def surname_alternative_fragments(
    surname: str,
    accent_transliterations: Dict[
        int, Union[str, int, None]
    ] = FuzzyDefaults.ACCENT_TRANSLITERATIONS_TRANS,
    nonspecific_name_components: Set[
        str
    ] = FuzzyDefaults.NONSPECIFIC_NAME_COMPONENTS,
) -> List[str]:
    """
    Return a list of fragments that may occur as substitutes for the name
    (including the name itself). Those fragments include:

    - Parts of double-barrelled surnames.
    - ASCII-mangled versions of accents (e.g. Ü to U).
    - Transliterated versions of accents (e.g. Ü to UE).

    Upper case is used throughout.

    Args:
        surname:
            The name to process. This should contain all original accents,
            spacing, and punctuation (i.e. should NOT have been through
            standardize_name()). Case is unimportant.
        accent_transliterations:
            A mapping from accents to potential transliterated versions, in
            the form of a Python string translation table.
        nonspecific_name_components:
            Name fragments that should not be produced in their own right,
            e.g. nobiliary particles such as "van" in "van Beethoven". They
            are compared against the upper-cased chunks of the name.

    Returns:
        A list of fragments: the full name (upper case, with non-word
        characters removed) first, then the other fragments in sorted order.
        An empty list if ``surname`` is empty.
    """
    if not surname:
        return []  # no name, nothing to do

    # Basic tidying first: sort out punctuation, then upper case.
    tidied = safe_upper(simplify_punctuation_whitespace(surname))

    # Split into word chunks. Empty chunks must be dropped, because e.g.
    # "hello ' world" splits into ['hello', '', '', 'world'].
    chunks = [chunk for chunk in NONWORD_REGEX.split(tidied) if chunk]

    # The whole name with punctuation/spaces removed (as per the US name
    # databases):
    full_name = "".join(chunks)

    # Collect into a set, which de-duplicates. Start with the whole name and
    # its accent-modified versions...
    fragments = set(
        _gen_name_versions(full_name, accent_transliterations)
    )  # type: Set[str]
    # ... then add each specific component and its accent-modified versions.
    for chunk in chunks:
        if chunk not in nonspecific_name_components:
            fragments.update(
                _gen_name_versions(chunk, accent_transliterations)
            )

    # The full name goes first; everything else follows in sorted order.
    fragments.discard(full_name)
    return [full_name] + sorted(fragments)

402 

403 

def get_metaphone(x: str) -> str:
    """
    Returns the primary (first) part of the Double Metaphone of a string, as
    a string. Returns an empty string if the input is empty or if there is no
    primary metaphone (in which case a one-off debug-level message is issued
    via ``warn_once``).

    See

    - https://www.b-eye-network.com/view/1596
    - https://dl.acm.org/citation.cfm?id=349132

    The implementation is from https://pypi.org/project/Fuzzy/.

    Alternatives (soundex, NYSIIS) are in ``fuzzy`` and also in ``jellyfish``
    (https://jellyfish.readthedocs.io/en/latest/).

    .. code-block:: python

        from crate_anon.linkage.helpers import get_metaphone
        get_metaphone("Alice")  # ALK
        get_metaphone("Alec")  # matches Alice; ALK
        get_metaphone("Mary Ellen")  # MRLN
        get_metaphone("D'Souza")  # TSS
        get_metaphone("de Clerambault")  # TKRM; won't do accents

    """
    if not x:
        return ""
    primary = dmeta(x)[0]  # dmeta gives [primary, secondary]; primary only
    if primary is not None:
        return primary.decode("ascii")  # bytes to str
    warn_once(f"No metaphone for {x!r}", log, level=logging.DEBUG)
    return ""

437 

438 

def get_first_two_char(x: str) -> str:
    """
    Returns the first two characters of a string (or the whole string, if it
    is shorter than that). A function for this is slight overkill, but gives
    the operation a name.
    """
    return x[0:2]

445 

446 

447# ============================================================================= 

448# Postcode manipulation 

449# ============================================================================= 

450 

451POSTCODE_REGEX = re.compile( 

452 anchor(get_uk_postcode_regex_string(at_word_boundaries_only=False)) 

453 # Need at_word_boundaries_only=False. 

454 # We don't want at_word_boundaries_only=True, since that matches e.g. 

455 # "VALID_POSTCODE JUNK". We want anchor() instead. 

456) 

457 

458 

def standardize_postcode(postcode_unit_or_sector: str) -> str:
    """
    Standardizes a postcode (unit or sector) to "no space" format: upper
    case, with spaces and ASCII punctuation removed.
    """
    upper = postcode_unit_or_sector.upper()
    return upper.translate(REMOVE_PUNCTUATION_SPACE_TABLE)

466 

467 

def get_postcode_sector(
    postcode_unit: str, prestandardized: bool = False
) -> str:
    """
    Returns the postcode (area + district +) sector from a full postcode, by
    removing the final two characters. For example, "AB12 3CD" gives "AB123"
    (the postcode is standardized to "no space" format first).

    While the format and length of the first part (area + district) varies
    (2-4 characters), the format of the second (sector + unit) is fixed, of
    the format "9AA" (3 characters);
    https://en.wikipedia.org/wiki/Postcodes_in_the_United_Kingdom#Formatting.
    So chopping off the last two characters (the unit) leaves the sector.

    Args:
        postcode_unit:
            The full postcode.
        prestandardized:
            If True, the postcode is used exactly as supplied, without
            standardizing it first.
    """
    standardized = (
        postcode_unit
        if prestandardized
        else standardize_postcode(postcode_unit)
    )
    return standardized[:-2]

484 

485 

486# noinspection HttpUrlsUsage 

487_ = """ 

488PSEUDO_POSTCODES = set(standardize_postcode(p) for p in ( 

489 "ZZ99 3VZ", # No fixed abode [1, 2] 

490 "ZZ99 3WZ", # Address not known [2] 

491 "ZZ99 3CZ", # England/U.K, not otherwise specified [1, 3] (*) 

492 # ... or "Z99 3CZ"? [2] (*). 

493 "ZZ99 3GZ", # Wales, not otherwise specified [1, 2] 

494 "ZZ99 1WZ", # Scotland, not otherwise specified [1, 2] 

495 "ZZ99 2WZ", # Northern Ireland, not otherwise specified [1, 2] 

496 # Also: ZZ99 <nnn>, where <nnn> is a country code -- so that's a large 

497 # range. 

498 # [1] http://www.datadictionary.wales.nhs.uk/index.html#!WordDocuments/postcode.htm 

499 # [2] https://www.england.nhs.uk/wp-content/uploads/2021/03/commissioner-assignment-method-2122-guidance-v1.1.pdf 

500 # [3] https://afyonluoglu.org/PublicWebFiles/Reports-TR/Veri%20Sozlugu/international/2017-HES%20Admitted%20Patient%20Care%20Data%20Dictionary.pdf 

501 # (*) [2] uses "Z99 3CZ" (page 6); [1, 3] use "ZZ99 3CZ". 

502)) 

503PSEUDO_POSTCODE_SECTORS = set(get_postcode_sector(p) for p in PSEUDO_POSTCODES) 

504""" # noqa: E501 

505 

# Both constants are in standardized ("no space") postcode format.
# Prefix used by is_pseudopostcode() to recognise a pseudopostcode:
PSEUDO_POSTCODE_START = "ZZ99"
PSEUDOPOSTCODE_NFA = "ZZ993VZ"  # no fixed abode ("ZZ99 3VZ")

508 

509 

def is_pseudopostcode(
    postcode_unit: str, prestandardized: bool = False
) -> bool:
    """
    Is this a pseudopostcode, i.e. does it (once standardized) start with
    ``PSEUDO_POSTCODE_START``?

    Args:
        postcode_unit:
            The postcode to test.
        prestandardized:
            If True, the postcode is tested exactly as supplied, without
            standardizing it first.
    """
    standardized = (
        postcode_unit
        if prestandardized
        else standardize_postcode(postcode_unit)
    )
    return standardized.startswith(PSEUDO_POSTCODE_START)

519 

520 

def is_nfa_postcode(postcode_unit: str, prestandardized: bool = False) -> bool:
    """
    Is this the pseudopostcode meaning "no fixed abode", i.e. is it (once
    standardized) equal to ``PSEUDOPOSTCODE_NFA``?

    Args:
        postcode_unit:
            The postcode to test.
        prestandardized:
            If True, the postcode is tested exactly as supplied, without
            standardizing it first.
    """
    standardized = (
        postcode_unit
        if prestandardized
        else standardize_postcode(postcode_unit)
    )
    return standardized == PSEUDOPOSTCODE_NFA

528 

529 

530# ============================================================================= 

531# Functions to introduce errors (for testing) 

532# ============================================================================= 

533 

534 

def mutate_name(name: str) -> str:
    """
    Introduces a typo into a (standardized, capitalized,
    no-space-no-punctuation) name, for testing.

    One randomly chosen character is replaced by a different, randomly chosen
    upper-case letter (A-Z).

    Args:
        name:
            The name to alter.

    Returns:
        The altered name, of the same length as the original. An empty name
        has no character to replace, so is returned unchanged.
    """
    if not name:
        # random.randrange(0) would raise ValueError.
        return name
    a = ord("A")
    z = ord("Z")
    which = random.randrange(len(name))  # position to alter
    start_ord = ord(name[which])
    # Keep drawing until the replacement differs from the original character.
    while True:
        replacement_ord = random.randint(a, z)
        if replacement_ord != start_ord:
            break
    return name[:which] + chr(replacement_ord) + name[which + 1 :]

550 

551 

def mutate_postcode(postcode: str, cfg: "MatchConfig") -> str:
    """
    Introduces a typo into a UK postcode, keeping the letter/digit format,
    for testing.

    A random non-space character is chosen. A digit is replaced by a
    different digit; anything else is replaced by a different upper-case
    letter. This is repeated (starting again from the original postcode)
    until the result is one that ``cfg`` accepts as a valid postcode.

    Args:
        postcode: the postcode to alter
        cfg: the main :class:`MatchConfig` object

    Returns:
        the altered postcode
    """
    length = len(postcode)
    digit_bounds = (ord("0"), ord("9"))
    letter_bounds = (ord("A"), ord("Z"))
    while True:
        # Choose a position that is not a space.
        pos = random.randrange(length)
        while postcode[pos] == " ":
            pos = random.randrange(length)
        old_char = postcode[pos]
        old_ord = ord(old_char)
        # Replace like with like, insisting on an actual change.
        low, high = digit_bounds if old_char.isdigit() else letter_bounds
        new_ord = random.randint(low, high)
        while new_ord == old_ord:
            new_ord = random.randint(low, high)
        candidate = postcode[:pos] + chr(new_ord) + postcode[pos + 1 :]
        if cfg.is_valid_postcode(candidate):
            return candidate

584 

585 

586# ============================================================================= 

587# Faster maths 

588# ============================================================================= 

589 

590 

@jit(nopython=True)
def ln(x: float) -> float:
    """
    Version of :func:`math.log` that treats log(0) as ``-inf``, rather than
    crashing with ``ValueError: math domain error``.

    Compiled with numba (``nopython`` mode) for speed.

    Args:
        x: parameter

    Returns:
        float: ln(x), the natural logarithm of x, or ``MINUS_INFINITY`` if
        the underlying logarithm fails for a non-negative x (expected only
        for x == 0)

    Raises:
        ValueError: if the underlying logarithm fails and x is negative
    """
    # noinspection PyBroadException
    try:
        return math_ln(x)
    except Exception:  # numba.jit can only cope with Exception
        if x < 0:
            raise ValueError("Can't take log of a negative number")
        # Either x > 0 but causing problems anyway (unlikely), or x == 0.
        return MINUS_INFINITY

611 

612 

@jit(nopython=True)
def log_posterior_odds_from_pdh_pdnh(
    log_prior_odds: float, p_d_given_h: float, p_d_given_not_h: float
) -> float:
    r"""
    Calculates log posterior odds: the log prior odds, plus the log of
    :math:`P(D | H)`, minus the log of :math:`P(D | \neg H)`.
    Fast (numba-compiled) implementation.

    Args:
        log_prior_odds:
            log prior odds of H, :math:`ln(\frac{ P(H) }{ P(\neg H) })`
        p_d_given_h:
            :math:`P(D | H)`
        p_d_given_not_h:
            :math:`P(D | \neg H)`

    Returns:
        float:
            log posterior odds of H,
            :math:`ln(\frac{ P(H | D) }{ P(\neg H | D) })`
    """
    ln_p_d_given_h = ln(p_d_given_h)
    ln_p_d_given_not_h = ln(p_d_given_not_h)
    return log_prior_odds + ln_p_d_given_h - ln_p_d_given_not_h

635 

636 

@jit(nopython=True)
def log_likelihood_ratio_from_p(
    p_d_given_h: float, p_d_given_not_h: float
) -> float:
    r"""
    Calculates the log likelihood ratio, as the difference between the logs
    of the two probabilities.
    Fast (numba-compiled) implementation.

    Args:
        p_d_given_h:
            :math:`P(D | H)`
        p_d_given_not_h:
            :math:`P(D | \neg H)`

    Returns:
        float:
            log likelihood ratio,
            :math:`ln(\frac{ P(D | H) }{ P(D | \neg H) })`
    """
    ln_p_d_given_h = ln(p_d_given_h)
    ln_p_d_given_not_h = ln(p_d_given_not_h)
    return ln_p_d_given_h - ln_p_d_given_not_h

657 

658 

659# ============================================================================= 

660# Read and check the type of dictionary values 

661# ============================================================================= 

662 

663 

def getdictval(
    d: Dict[str, Any],
    key: str,
    type_: Type,
    mandatory: bool = False,
    default: Any = None,
) -> Any:
    """
    Fetches a value from a dictionary, checking its type.

    Args:
        d:
            The dictionary.
        key:
            The key to look up.
        type_:
            The type that the value must have (``None`` is also accepted by
            the type check).
        mandatory:
            If True, the key must be present, and the value must not be
            ``None`` or a blank string.
        default:
            Returned (unchecked) if ``mandatory`` is False and the key is
            absent.

    Returns:
        the value, or ``default``

    Raises:
        ValueError: if a mandatory value is missing or blank, or if the value
            is of the wrong type
    """
    try:
        value = d[key]
    except KeyError:
        if not mandatory:
            return default
        raise ValueError(f"Missing key: {key}")
    if mandatory:
        if value is None or value == "":
            raise ValueError(f"Missing or blank value: {key}")
    if isinstance(value, (type_, NONE_TYPE)):
        return value
    raise ValueError(
        f"Value for {key!r} should be of type {type_} "
        f"but was of type {type(value)}; was {value!r}"
    )

694 

695 

def validate_prob(p: float, description: str) -> None:
    """
    Checks that a probability is in the closed range [0, 1].

    Args:
        p: the probability to check
        description: what the probability represents, for the error message

    Raises:
        ValueError: if ``p`` is not in the range [0, 1]
    """
    if 0 <= p <= 1:
        return
    raise ValueError(
        f"Bad probability for {description}: {p} "
        f"-- must be in range [0, 1]"
    )

705 

706 

def validate_uncertain_prob(p: float, description: str) -> None:
    """
    Checks that a probability is in the open range (0, 1), i.e. that it
    represents neither impossibility nor certainty.

    Args:
        p: the probability to check
        description: what the probability represents, for the error message

    Raises:
        ValueError: if ``p`` is not in the range (0, 1)
    """
    if 0 < p < 1:
        return
    raise ValueError(
        f"Bad probability for {description}: {p} "
        f"-- must be in range (0, 1)"
    )

716 

717 

def getdictprob(
    d: Dict[str, Any],
    key: str,
    mandatory: bool = False,
    default: Optional[float] = None,
) -> Optional[float]:
    """
    As for :func:`getdictval`, but fetches a probability: the value must be a
    ``float`` (or ``None``), and anything other than ``None`` is checked via
    :func:`validate_prob` to be in the range [0, 1].

    By default the value is non-mandatory, and ``None`` is returned if it is
    absent.

    Raises:
        ValueError: as for :func:`getdictval` and :func:`validate_prob`
    """
    prob = getdictval(d, key, float, mandatory=mandatory, default=default)
    if prob is not None:
        validate_prob(prob, key)
    return prob

733 

734 

735# ============================================================================= 

736# Dates 

737# ============================================================================= 

738 

# Matches the whole of a string in yyyy-MM-dd format, from the year 0000
# onwards. This checks the format only; it does not check that the day exists
# in that month.
ISO_DATE_REGEX = re.compile(
    r"^\d{4}"  # year: exactly four digits
    r"-(?:0[1-9]|1[0-2])"  # month: 01 to 12
    r"-(?:0[1-9]|[12][0-9]|3[01])$"  # day: 01 to 31
)

745# Also: https://stackoverflow.com/questions/3143070 

746 

747 

def is_valid_isoformat_date(x: str) -> bool:
    """
    Is this a string containing a valid ISO-format date with separators,
    e.g. '2022-12-31'?

    Anything that is not a string is invalid.
    """
    # The format is checked explicitly because e.g. "2020" would otherwise be
    # auto-converted by Pendulum (to 2020-01-01) below.
    if not isinstance(x, str) or not ISO_DATE_REGEX.match(x):
        return False
    try:
        coerce_to_pendulum_date(x)
    except (ParserError, ValueError):
        return False
    else:
        return True

763 

764 

def is_valid_isoformat_blurred_date(x: str) -> bool:
    """
    Is this a valid ISO-format date (as per :func:`is_valid_isoformat_date`)
    that is also the first day of its month?
    """
    return is_valid_isoformat_date(x) and coerce_to_pendulum_date(x).day == 1

774 

775 

def isoformat_optional_date_str(d: Optional[Date]) -> str:
    """
    Returns a date in ISO string format, or an empty string if there is no
    date.
    """
    return d.isoformat() if d else ""

783 

784 

def isoformat_date_or_none(d: Optional[Date]) -> Optional[str]:
    """
    Returns a date in ISO string format, or ``None`` if there is no date.
    """
    return d.isoformat() if d else None

792 

793 

def age_years(dob: Optional[Date], when: Optional[Date]) -> Optional[int]:
    """
    A person's age in whole years on the index date ``when``, given their
    date of birth ``dob``. Returns ``None`` if either date is unknown.
    """
    if not (dob and when):
        return None
    return (when - dob).in_years()

802 

803 

def mk_blurry_dates(d: Union[Date, str]) -> Tuple[str, str, str]:
    """
    Returns partial ("blurry") versions of a date, each omitting one of its
    three components, as a tuple of strings:

    - month and day, as ``MMDD``;
    - year and day, as ``YYYYDD``;
    - year and month, as ``YYYYMM``.

    These are the shortest full representations and are not intended to be
    human-legible. (For comparison, ISO format is ``%Y-%m-%d``; see
    https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.)

    Args:
        d: a date, or something that can be coerced to one (e.g. a string)
    """  # noqa: E501
    date = coerce_to_pendulum_date(d)
    return date.strftime("%m%d"), date.strftime("%Y%d"), date.strftime("%Y%m")

817 

818 

819# ============================================================================= 

820# argparse helpers 

821# ============================================================================= 

822 

823 

def optional_int(value: str) -> Optional[int]:
    """
    ``argparse`` argument type for an optional integer: accepts either the
    word "none" (in any case), giving ``None``, or an integer.

    Raises:
        ArgumentTypeError: if the value is neither "none" nor an integer
    """
    if value.lower() != "none":
        try:
            return int(value)
        except (AssertionError, TypeError, ValueError):
            raise ArgumentTypeError(f"{value!r} is an invalid optional int")
    return None

835 

836 

837# ============================================================================= 

838# Identity function 

839# ============================================================================= 

840 

841 

def identity(x: Any) -> Any:
    """
    The identity function: returns its argument, unchanged.
    """
    return x

847 

848 

849# ============================================================================= 

850# Perfect identifiers 

851# ============================================================================= 

852 

853 

def dict_from_str(x: str) -> Dict[str, str]:
    """
    Reads a dictionary like {'a': 'x', 'b': 'y'} from a string like "{a:x,
    b:y}".

    Whitespace around the string, the keys, and the values is ignored. An
    empty or blank string, or empty braces ("{}"), gives an empty dictionary.

    Args:
        x:
            The string to parse. Keys and values cannot themselves contain
            commas or colons.

    Returns:
        a dictionary mapping string keys to string values

    Raises:
        ValueError: if the string is not enclosed in braces, or if any
            comma-separated item does not contain exactly one colon
    """
    if not x:
        return {}
    w = x.strip()  # working
    if not w:
        # Whitespace only: treat like an empty string (and avoid indexing
        # into an empty string below).
        return {}
    if w[0] != "{" or w[-1] != "}":
        raise ValueError(f"Bad dict string: {x!r}")
    w = w[1:-1].strip()
    if not w:
        # Empty braces: "".split(",") would give [""], which is not a valid
        # key:value pair, but this is a legitimate empty dictionary.
        return {}
    d = {}
    for pair_str in w.split(","):
        if pair_str.count(":") != 1:
            raise ValueError(f"Bad dict string: {x!r}")
        k, v = pair_str.split(":")
        d[k.strip()] = v.strip()
    return d

872 

873 

def standardize_perfect_id_key(k: str) -> str:
    """
    Standardizes a perfect-ID key: surrounding whitespace is removed and the
    key is put into lower case, so that keys compare case-insensitively.
    """
    stripped = k.strip()
    return stripped.lower()

879 

880 

def standardize_perfect_id_value(k: Any) -> str:
    """
    Standardizes a perfect-ID value: it is forced to a string, surrounding
    whitespace is removed, and it is put into upper case, so that values
    compare case-insensitively.
    """
    as_text = str(k)
    return as_text.strip().upper()