Coverage for linkage/frequencies.py: 46%
285 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1r"""
2crate_anon/linkage/frequencies.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Frequency classes for linkage tools.**
28These record and calculate frequencies of real-world things (names, postcodes)
29from publicly available data.
31"""
33# =============================================================================
34# Imports
35# =============================================================================
37from collections import Counter, defaultdict
38import csv
39import json
40import logging
41from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
43from cardinal_pythonlib.reprfunc import auto_repr
44import jsonlines
46from crate_anon.common.logfunc import warn_once
47from crate_anon.linkage.constants import UK_POPULATION_2017
48from crate_anon.linkage.helpers import (
49 get_first_two_char,
50 get_metaphone,
51 get_postcode_sector,
52 is_pseudopostcode,
53 mkdir_for_filename,
54 open_even_if_zipped,
55 standardize_name,
56 standardize_postcode,
57)
59log = logging.getLogger(__name__)
62# =============================================================================
63# BasicNameMetaphoneFreq
64# =============================================================================
class BasicNameFreqInfo:
    """
    Frequency information for a single name (optionally within a gender).

    Used for calculating P(share F2C but not name or metaphone).

    Note that the metaphone can be "", e.g. if the name is "W". But we can
    still calculate the frequency of those metaphones cumulatively across all
    our names.
    """

    # Keys used in the dictionary (JSON) representation; see as_dict() and
    # from_dict().
    KEY_NAME = "name"
    KEY_P_NAME = "p_f"
    KEY_GENDER = "gender"
    KEY_METAPHONE = "metaphone"
    KEY_P_METAPHONE = "p_p1"
    KEY_P_METAPHONE_NOT_NAME = "p_p1nf"
    KEY_F2C = "f2c"
    KEY_P_F2C = "p_p2"
    KEY_P_F2C_NOT_NAME_METAPHONE = "p_p2np1"

    def __init__(
        self,
        name: str,
        p_name: float,
        gender: str = "",
        metaphone: str = "",
        p_metaphone: float = 0.0,
        p_metaphone_not_name: float = 0.0,
        f2c: str = "",
        p_f2c: float = 0.0,
        p_f2c_not_name_metaphone: float = 0.0,
        synthetic: bool = False,
    ) -> None:
        """
        The constructor allows initialization with just a name and its
        frequency (with other probabilities being set later), or from a saved
        representation with full details.

        Args:
            name:
                Name. It is passed through standardize_name() before being
                stored.
            p_name:
                Population probability (frequency) of this name, within the
                specified gender if there is one.
            gender:
                Specified gender, or a blank string for non-gender-associated
                names.
            metaphone:
                "Sounds-like" representation as the first part of a double
                metaphone. If blank, it is derived from the standardized name
                via get_metaphone().
            p_metaphone:
                Population frequency (probability) of the metaphone.
            p_metaphone_not_name:
                Probability that someone in the population shares this
                metaphone, but not this name. Usually this is ``p_metaphone -
                p_name``, but you may choose to impose a minimum frequency.
            f2c:
                First two characters (F2C) of the name. If blank, it is
                derived from the standardized name via get_first_two_char().
            p_f2c:
                Population probability of the F2C.
            p_f2c_not_name_metaphone:
                Probability that someone in the population shares this F2C, but
                not this name or metaphone.
            synthetic:
                Is this record made up (e.g. an unknown name, or a mean of two
                other records)?
        """
        name = standardize_name(name)
        self.name = name
        self.gender = gender
        self.p_name = p_name

        # A blank metaphone argument means "work it out from the name".
        self.metaphone = metaphone or get_metaphone(name)
        self.p_metaphone = p_metaphone
        self.p_metaphone_not_name = p_metaphone_not_name

        # Likewise, a blank F2C argument means "work it out from the name".
        self.f2c = f2c or get_first_two_char(name)
        self.p_f2c = p_f2c  # not important! For info only.
        self.p_f2c_not_name_metaphone = p_f2c_not_name_metaphone

        self.synthetic = synthetic

    def __repr__(self) -> str:
        return auto_repr(self, sort_attrs=False)

    @property
    def p_no_match(self) -> float:
        """
        Probability of matching on none of name, metaphone, or F2C.

        Since p_metaphone includes p_name, this is 1 minus p_metaphone minus
        P(F2C match but not name or metaphone match).
        """
        assert (
            self.p_metaphone >= self.p_name
        ), "Set p_metaphone before using p_no_match"
        return 1 - self.p_metaphone - self.p_f2c_not_name_metaphone
        # p_metaphone includes p_name

    def as_dict(self) -> Dict[str, Any]:
        """
        Returns a JSON representation.

        The ``synthetic`` flag is not part of the representation.
        """
        return {
            self.KEY_NAME: self.name,
            self.KEY_GENDER: self.gender,
            self.KEY_P_NAME: self.p_name,
            self.KEY_METAPHONE: self.metaphone,
            self.KEY_P_METAPHONE: self.p_metaphone,
            self.KEY_P_METAPHONE_NOT_NAME: self.p_metaphone_not_name,
            self.KEY_F2C: self.f2c,
            self.KEY_P_F2C: self.p_f2c,
            self.KEY_P_F2C_NOT_NAME_METAPHONE: self.p_f2c_not_name_metaphone,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "BasicNameFreqInfo":
        """
        Create from JSON representation (as produced by as_dict()).

        Raises:
            KeyError: if any of the expected keys is missing.
        """
        # Use cls (not the hard-coded class) so that subclasses round-trip to
        # their own type.
        return cls(
            name=d[cls.KEY_NAME],
            gender=d[cls.KEY_GENDER],
            p_name=d[cls.KEY_P_NAME],
            metaphone=d[cls.KEY_METAPHONE],
            p_metaphone=d[cls.KEY_P_METAPHONE],
            p_metaphone_not_name=d[cls.KEY_P_METAPHONE_NOT_NAME],
            f2c=d[cls.KEY_F2C],
            p_f2c=d[cls.KEY_P_F2C],
            p_f2c_not_name_metaphone=d[cls.KEY_P_F2C_NOT_NAME_METAPHONE],
        )

    @staticmethod
    def weighted_mean(
        objects: Sequence["BasicNameFreqInfo"], weights: Sequence[float]
    ) -> "BasicNameFreqInfo":
        """
        Returns an object with the weighted probabilities across the objects
        specified. Used for gender weighting.

        Args:
            objects:
                Records to combine (at least one). The result takes its name
                from the first.
            weights:
                One weight per object. Weights are applied as given; they are
                not normalized here.

        Returns:
            A new synthetic record whose probabilities are the weighted sums
            of the corresponding probabilities of ``objects``.
        """
        assert len(objects) == len(weights) > 0
        first = objects[0]
        result = BasicNameFreqInfo(name=first.name, p_name=0.0, synthetic=True)
        for obj, w in zip(objects, weights):
            result.p_name += w * obj.p_name
            # Weight the metaphone frequency from the metaphone frequency
            # (not from p_name), so that p_metaphone stays consistent with
            # p_metaphone_not_name and p_no_match.
            result.p_metaphone += w * obj.p_metaphone
            result.p_metaphone_not_name += w * obj.p_metaphone_not_name
            result.p_f2c += w * obj.p_f2c
            result.p_f2c_not_name_metaphone += w * obj.p_f2c_not_name_metaphone
        return result
213# =============================================================================
214# NameFrequencyInfo
215# =============================================================================
class NameFrequencyInfo:
    """
    Holds frequencies of a class of names (e.g. first names or surnames), and
    also of their fuzzy (metaphone) versions.

    We keep these frequency representations entirely here (source) and with
    the probands (storage); the config doesn't get involved except to define
    min_frequency at creation. We need to scan across all names for an estimate
    of the empty ("") metaphone, which does arise in our standard data. There
    is a process for obtaining default frequency information for any names not
    encountered in our name definitions, of course, but that is then stored
    with the (hashed) name representations and nothing needs to be recalculated
    at comparison time. (Compare postcodes, where further geographical
    adjustments may be required, depending on the comparison population.)
    """

    def __init__(
        self,
        csv_filename: str,
        cache_filename: str,
        by_gender: bool = False,
        min_frequency: float = 0,
    ) -> None:
        """
        Initializes the object from a cache file if one exists, or otherwise
        from a CSV file (in which case the cache is then written).
        Uses standardize_name().

        If either filename is blank, a "dummy" object with no name data is
        created.

        Args:
            csv_filename:
                CSV file, with no header, of "name, frequency" pairs (or
                "name, gender, frequency" rows if ``by_gender`` is set).
            cache_filename:
                File in which to cache information, for faster loading.
            by_gender:
                Is the source data split by gender?
            min_frequency:
                Minimum frequency to allow; see command-line help.

        Raises:
            KeyError, ValueError: if the cache file is present but unusable
                (a critical log message asks the user to delete it).
        """
        self._csv_filename = csv_filename
        self._cache_filename = cache_filename
        self._min_frequency = min_frequency
        self.by_gender = by_gender

        self.infolist = []  # type: List[BasicNameFreqInfo]

        # We key the following by (name, gender), even if gender is "".
        # This makes the code much simpler.
        self.name_gender_idx = (
            {}
        )  # type: Dict[Tuple[str, str], BasicNameFreqInfo]
        self.metaphone_freq = {}  # type: Dict[Tuple[str, str], float]
        self.f2c_freq = {}  # type: Dict[Tuple[str, str], float]
        self.f2c_to_infolist = defaultdict(
            list
        )  # type: Dict[Tuple[str, str], List[BasicNameFreqInfo]]

        if not csv_filename or not cache_filename:
            log.debug("Using dummy NameFrequencyInfo")
            return

        try:
            self._load_from_cache(cache_filename)
        except (KeyError, ValueError):
            # KeyError: a cached record lacks an expected key (raised by
            # BasicNameFreqInfo.from_dict). ValueError: other bad content.
            log.critical(f"Bad cache: please delete {cache_filename}")
            raise
        except FileNotFoundError:
            # No cache yet: build from the source data, then cache.
            self._load_from_csv(csv_filename)
            self._save_to_cache(cache_filename)

    def _load_from_cache(self, cache_filename: str) -> None:
        """
        Loads from a JSONL cache (one BasicNameFreqInfo dictionary per line),
        then rebuilds the lookup indexes. The per-name probabilities stored in
        the cache are used as they are.
        """
        log.info(f"Reading from cache: {cache_filename}")
        with jsonlines.open(cache_filename) as reader:
            self.infolist = [BasicNameFreqInfo.from_dict(d) for d in reader]
        log.debug(f"... finished reading from: {cache_filename}")
        self._index(update_infolist=False)

    def _save_to_cache(self, cache_filename: str) -> None:
        """
        Saves to a JSONL cache (one BasicNameFreqInfo dictionary per line).
        Does nothing if the filename is blank.
        """
        if not cache_filename:
            return
        log.info(f"Writing to cache: {cache_filename}")
        mkdir_for_filename(cache_filename)
        with jsonlines.open(cache_filename, mode="w") as writer:
            for info in self.infolist:
                writer.write(info.as_dict())
        log.debug(f"... finished writing to cache: {cache_filename}")

    def _load_from_csv(self, csv_filename: str) -> None:
        """
        Read from the original data, then index it and calculate the derived
        (metaphone, F2C) frequencies.

        Rows are "name, frequency", or "name, gender, frequency" if
        ``self.by_gender`` is set. Name frequencies are raised to the minimum
        frequency if they are below it. Blank lines are skipped.
        """
        log.info(f"Reading source data: {csv_filename}")
        by_gender = self.by_gender
        min_frequency = self._min_frequency
        self.infolist = []
        with open_even_if_zipped(csv_filename) as f:
            for row in csv.reader(f):
                if not row:
                    # csv.reader yields an empty list for a blank line.
                    continue
                if by_gender:
                    gender = row[1]
                    freq_str = row[2]
                else:
                    gender = ""
                    freq_str = row[1]
                self.infolist.append(
                    BasicNameFreqInfo(
                        name=row[0],
                        p_name=max(min_frequency, float(freq_str)),
                        gender=gender,
                    )
                )
        log.debug(f"... finished reading from: {csv_filename}")
        self._index(update_infolist=True)

    def _index(self, update_infolist: bool) -> None:
        """
        Build our internal indexes, having loaded `self.infolist`.

        Args:
            update_infolist:
                If true, also calculate the metaphone/F2C probabilities and
                write them back to every object in `self.infolist` (used
                after loading from CSV). If false, only the lookup indexes
                are rebuilt (used after loading from the cache).

        Example for thinking (with fictional metaphones; these might be
        wrong!):

        .. code-block:: none

            #   name    p       metaphone   f2c
            1   SMITH   0.2     SMT         SM
            2   SMYTHE  0.05    SMT         SM
            3   SCHMITH 0.01    SMT         SC
            4   SMALL   0.04    SML         SM
            5   JONES   0.2     JNS         JO
            6   JOPLIN  0.1     JPL         JO
            7   WALKER  0.2     WLK         WA
            8   ZEBRA   0.2     ZBR         ZE

        With respect to a proband called SMITH:

        - P(another person's name is SMITH) = 0.2 [1];

        - P(another person's metaphone is SMT) = 0.26 [1, 2, 3];
        - P(another person's metaphone is SMT but their name is not SMITH) =
          0.06 [2, 3], being the preceding minus [1];

        - P(another person's F2C is SM) = 0.29 [1, 2, 4];
        - P(another person's F2C is SM but their metaphone is not SMT and their
          name is not SMITH) = 0.04 [4].

        With respect to a proband called SMALL:

        - P(another person's name is SMALL) = 0.04 [4];

        - P(... metaphone SML) = 0.04 [4];
        - P(... metaphone SML, name not SMALL) = 0, being the preceding minus
          [4];

        - P(... F2C SM) = 0.29 [1, 2, 4];
        - P(... F2C SM but metaphone not SML and name not SMALL) = 0.25 [1, 2].

        This makes it apparent that:

        - P(another person matches on name) = P(name in the population).

        - Since names have a one-to-one or many-to-one relationship with
          metaphones (one name can only have one metaphone but two names can
          share a metaphone), P(metaphone match but not name match) is
          P(metaphone match) minus P(name match).

        - There is obviously a quantity P(F2C) that is constant for every F2C.
          Also, the relationship between names and F2C is one-to-one or
          many-to-one, as for metaphones. However, if F2C are second in the
          hierarchy, such that we need to calculate P(F2C match but not name OR
          METAPHONE match), it becomes relevant that the relationship between
          metaphones and F2C is many-to-many [see examples 1-4 above].

          THEREFORE, P(F2C match but not name or metaphone match) is SPECIFIC
          TO A NAME.

        """
        log.debug("Indexing name frequency info...")

        # Reset
        self.name_gender_idx = {}
        self.metaphone_freq = {}
        self.f2c_freq = {}
        self.f2c_to_infolist = defaultdict(list)

        # For extra speed:
        min_frequency = self._min_frequency
        name_gender_idx = self.name_gender_idx
        metaphone_freq = self.metaphone_freq
        f2c_freq = self.f2c_freq
        f2c_to_infolist = self.f2c_to_infolist

        meta_to_infolist = defaultdict(
            list
        )  # type: Dict[Tuple[str, str], List[BasicNameFreqInfo]]

        for i in self.infolist:
            name_key = i.name, i.gender
            metaphone_key = i.metaphone, i.gender
            f2c_key = i.f2c, i.gender
            p_name = i.p_name

            # Enable rapid lookup by name/gender
            name_gender_idx[name_key] = i

            # Calculate metaphone frequency (maybe for writing back to name
            # info objects, but certainly for frequency information relating to
            # unknown names with known metaphones).
            metaphone_freq[metaphone_key] = (
                metaphone_freq.get(metaphone_key, 0) + p_name
            )

            # Calculate F2C frequency (not very important!).
            f2c_freq[f2c_key] = f2c_freq.get(f2c_key, 0) + p_name

            # Enable lookup by F2C
            f2c_to_infolist[f2c_key].append(i)

            if update_infolist:
                # Enable temporary lookup by metaphone
                meta_to_infolist[metaphone_key].append(i)

        if update_infolist:
            log.info("... calculating additional frequency info (slow)...")
            # Store metaphone frequency for each name.
            for metaphone_key, metaphone_infolist in meta_to_infolist.items():
                p_meta = metaphone_freq[metaphone_key]
                for i in metaphone_infolist:  # type: BasicNameFreqInfo
                    i.p_metaphone = max(min_frequency, p_meta)
                    i.p_metaphone_not_name = max(
                        min_frequency, p_meta - i.p_name
                    )
            # This is not very important, but... store F2C frequency.
            for f2c_key, f2c_infolist in f2c_to_infolist.items():
                p_f2c = max(min_frequency, f2c_freq[f2c_key])
                for i in f2c_infolist:  # type: BasicNameFreqInfo
                    i.p_f2c = p_f2c
            # Calculate P(F2C match but not name or metaphone match).
            # This is name-specific; see above.
            for i in self.infolist:
                f2c_key = i.f2c, i.gender
                i.p_f2c_not_name_metaphone = 0.0
                # Every F2C key looked up here was inserted in the first loop
                # above, so this indexing never creates new entries.
                for other in f2c_to_infolist[f2c_key]:  # ... same F2C...
                    if other.name != i.name and other.metaphone != i.metaphone:
                        # ... but different name and metaphone...
                        i.p_f2c_not_name_metaphone += other.p_name
                i.p_f2c_not_name_metaphone = max(
                    min_frequency, i.p_f2c_not_name_metaphone
                )

        log.debug("... finished indexing name frequency info")

    def name_frequency_info(
        self, name: str, gender: str = "", prestandardized: bool = True
    ) -> "BasicNameFreqInfo":
        """
        Look up frequency information for a name (with gender, optionally).

        Args:
            name: the name to check
            gender: the gender, if created with ``by_gender=True``
            prestandardized: was the name pre-standardized in format?

        Returns:
            the stored record for a known name, or a newly created synthetic
            record for an unknown one
        """
        if not prestandardized:
            name = standardize_name(name)
        key = name, gender
        result = self.name_gender_idx.get(key, None)
        if result is not None:
            return result
        return self._unknown_name_info(name, gender)

    def _unknown_name_info(
        self, name: str, gender: str = ""
    ) -> "BasicNameFreqInfo":
        """
        Return a default set of information for unknown names. We do not alter
        our saved information.

        It's possible that an unknown name has a known metaphone or F2C,
        though, so we account for that.
        """
        min_frequency = self._min_frequency
        result = BasicNameFreqInfo(
            name=name,
            p_name=min_frequency,
            gender=gender,
            synthetic=True,
        )

        metaphone = result.metaphone
        meta_key = metaphone, gender
        result.p_metaphone = max(
            min_frequency, self.metaphone_freq.get(meta_key, min_frequency)
        )
        result.p_metaphone_not_name = max(
            min_frequency, result.p_metaphone - result.p_name
        )

        f2c_key = result.f2c, gender
        result.p_f2c = max(
            min_frequency, self.f2c_freq.get(f2c_key, min_frequency)
        )
        p_f2c_not_name_metaphone = 0.0
        # Use .get() rather than indexing: f2c_to_infolist is a defaultdict,
        # and indexing it with an unknown F2C would insert a new empty list,
        # altering (and gradually growing) our saved information.
        for i in self.f2c_to_infolist.get(f2c_key, ()):  # same F2C
            if i.metaphone != metaphone:  # but not same metaphone
                # and by definition not the same name, or we wouldn't be here
                p_f2c_not_name_metaphone += i.p_name
        result.p_f2c_not_name_metaphone = max(
            min_frequency, p_f2c_not_name_metaphone
        )

        return result

    def name_frequency(
        self, name: str, gender: str = "", prestandardized: bool = True
    ) -> float:
        """
        Returns the frequency of a name.

        Args:
            name: the name to check
            gender: the gender, if created with ``by_gender=True``
            prestandardized: was the name pre-standardized in format?

        Returns:
            the name's frequency in the population
        """
        return self.name_frequency_info(
            name, gender, prestandardized=prestandardized
        ).p_name

    def metaphone_frequency(self, metaphone: str, gender: str = "") -> float:
        """
        Returns the frequency of a metaphone (or the minimum frequency, if the
        metaphone is unknown).
        """
        key = metaphone, gender
        return self.metaphone_freq.get(key, self._min_frequency)

    def first_two_char_frequency(self, f2c: str, gender: str = "") -> float:
        """
        Returns the frequency of the first two characters of a name (or the
        minimum frequency, if that F2C is unknown).
        This one isn't very important; we want a more refined probability.
        """
        key = f2c, gender
        return self.f2c_freq.get(key, self._min_frequency)

    def get_names_for_metaphone(self, metaphone: str) -> List[str]:
        """
        Return (for debugging purposes) a sorted list of all names matching
        the specified metaphone (which is upper-cased before comparison),
        regardless of gender.
        """
        metaphone = metaphone.upper()
        return sorted(
            {info.name for info in self.infolist if info.metaphone == metaphone}
        )
576# =============================================================================
577# PostcodeFrequencyInfo
578# =============================================================================
class PostcodeFrequencyInfo:
    """
    Holds frequencies of UK postcodes, and also their hashed versions.
    Handles pseudo-postcodes somewhat separately.

    Frequencies are national estimates for known real postcodes. Any local
    correction or correction for unknown postcodes is done separately.

    We return explicit "don't know" values for unknown postcodes (including
    pseudopostcodes) since those values may be handled differently, in a way
    that is set at comparison time.
    """

    # Keys used in the JSON cache file.
    KEY_POSTCODE_UNIT_FREQ = "postcode_unit_freq"
    KEY_POSTCODE_SECTOR_FREQ = "postcode_sector_freq"

    def __init__(
        self,
        csv_filename: str,
        cache_filename: str,
        report_every: int = 10000,
    ) -> None:
        """
        Initializes the object from a cache file if one exists, or otherwise
        from a CSV file (in which case the cache is then written).

        If either filename is blank, a "dummy" object with no postcode data
        is created.

        Args:
            csv_filename:
                CSV file from the UK Office of National Statistics, e.g.
                ``ONSPD_MAY_2022_UK.csv``. Columns include "pcds" (one of the
                postcode formats) and "oa11" (Output Area from the 2011
                Census). A ZIP file containing a single CSV file is also
                permissible (distinguished by filename extension).
            cache_filename:
                Filename to hold JSON format cached data, because the CSV
                read process is slow (it's a 1.4 GB CSV).
            report_every:
                How often (in rows) to report progress during loading. Zero
                disables progress reports.

        Raises:
            KeyError, ValueError: if the cache file is present but unusable
                (a critical log message asks the user to delete it).
        """
        self._csv_filename = csv_filename
        self._cache_filename = cache_filename

        self._postcode_unit_freq = {}  # type: Dict[str, float]
        self._postcode_sector_freq = {}  # type: Dict[str, float]

        if not csv_filename or not cache_filename:
            log.debug("Using dummy PostcodeFrequencyInfo")
            return

        try:
            self._load_from_cache(cache_filename)
        except (KeyError, ValueError):
            log.critical(f"Bad cache: please delete {cache_filename}")
            raise
        except FileNotFoundError:
            # No cache yet: build from the source data, then cache.
            self._load_from_csv(
                csv_filename,
                report_every=report_every,
            )
            self._save_to_cache(cache_filename)

    def _load_from_cache(self, cache_filename: str) -> None:
        """
        Loads from a JSON cache.

        May raise KeyError, ValueError. Our frequency dictionaries are only
        replaced once the whole cache has been validated.
        """
        log.info(f"Reading from cache: {cache_filename}")
        with open(cache_filename) as file:
            d = json.load(file)

        # May raise KeyError:
        unit_freq = d[self.KEY_POSTCODE_UNIT_FREQ]
        sector_freq = d[self.KEY_POSTCODE_SECTOR_FREQ]

        if not isinstance(unit_freq, dict):
            raise ValueError(
                f"Bad cache: {self.KEY_POSTCODE_UNIT_FREQ} is of wrong type "
                f"{type(unit_freq)}"
            )
        if not isinstance(sector_freq, dict):
            raise ValueError(
                f"Bad cache: {self.KEY_POSTCODE_SECTOR_FREQ} is of wrong type "
                f"{type(sector_freq)}"
            )

        # Validated; safe to store.
        self._postcode_unit_freq = unit_freq
        self._postcode_sector_freq = sector_freq

        log.debug(f"... finished reading from: {cache_filename}")

    def _save_to_cache(self, cache_filename: str) -> None:
        """
        Saves to a JSON cache. Does nothing if the filename is blank.
        """
        if not cache_filename:
            return
        log.info(f"Writing to cache: {cache_filename}")
        mkdir_for_filename(cache_filename)
        d = {
            self.KEY_POSTCODE_UNIT_FREQ: self._postcode_unit_freq,
            self.KEY_POSTCODE_SECTOR_FREQ: self._postcode_sector_freq,
        }
        with open(cache_filename, mode="w") as file:
            json.dump(d, file)
        log.debug(f"... finished writing to cache: {cache_filename}")

    def _load_from_csv(self, csv_filename: str, report_every: int) -> None:
        """
        Read from the original data, and calculate postcode unit and sector
        frequencies from it.

        Args:
            csv_filename:
                Source CSV (or ZIP) file; the "pcds" and "oa11" columns are
                used.
            report_every:
                Report progress every this many rows; zero disables progress
                reports.
        """
        log.info(f"Reading source data: {csv_filename}")

        self._postcode_unit_freq = {}
        self._postcode_sector_freq = {}

        oa_unit_counter = Counter()  # Output Area -> number of unit rows
        unit_to_oa = {}  # type: Dict[str, str]
        sector_to_oas = {}  # type: Dict[str, Set[str]]

        # Load data
        with open_even_if_zipped(csv_filename) as f:
            csvreader = csv.DictReader(f)
            for rownum, row in enumerate(csvreader, start=1):
                unit = standardize_postcode(row["pcds"])
                sector = get_postcode_sector(unit)
                oa = row["oa11"]
                # Check report_every first, so that a value of zero turns
                # reporting off rather than causing a ZeroDivisionError.
                if report_every and rownum % report_every == 0:
                    log.debug(
                        f"Row# {rownum}: postcode unit {unit}, "
                        f"postcode sector {sector}, Output Area {oa}"
                    )

                unit_to_oa[unit] = oa
                oa_unit_counter[oa] += 1  # one more unit for this OA
                sector_to_oas.setdefault(sector, set()).add(oa)

        # Calculate. The absolute value of the population size of an OA is
        # irrelevant as it cancels out.
        log.info("Calculating population frequencies for postcodes...")
        unit_freq = self._postcode_unit_freq
        sector_freq = self._postcode_sector_freq
        total_n_oas = len(oa_unit_counter)
        log.info(f"Number of Output Areas: {total_n_oas}")
        for unit, oa in unit_to_oa.items():
            # Each unit is credited with an equal share of its OA.
            n_units_in_this_oa = oa_unit_counter[oa]
            unit_n_oas = 1 / n_units_in_this_oa
            unit_freq[unit] = unit_n_oas / total_n_oas
        for sector, oas in sector_to_oas.items():
            # Each sector is credited with every distinct OA seen within it.
            sector_n_oas = len(oas)
            sector_freq[sector] = sector_n_oas / total_n_oas

        log.debug(f"... finished reading from: {csv_filename}")

    def postcode_unit_sector_frequency(
        self, postcode_unit: str, prestandardized: bool = False
    ) -> Tuple[Optional[float], Optional[float]]:
        """
        Returns the frequency of a postcode unit and its associated sector.
        Performs an important check that the sector frequency is at least as
        big as the unit frequency.

        Args:
            postcode_unit: the postcode unit to check
            prestandardized: was the postcode pre-standardized in format?

        Returns:
            tuple: unit_frequency, sector_frequency. Both are ``None`` if
            either the unit or its sector is unknown (which includes
            pseudopostcodes).
        """
        unit = (
            postcode_unit
            if prestandardized
            else standardize_postcode(postcode_unit)
        )
        sector = get_postcode_sector(unit)
        try:
            unit_freq = self._postcode_unit_freq[unit]
            sector_freq = self._postcode_sector_freq[sector]
            assert unit_freq <= sector_freq, (
                f"Postcodes: unit_freq = {unit_freq}, "
                f"sector_freq = {sector_freq}, but should have "
                f"unit_freq <= sector_freq, "
                f"for unit = {unit}, sector = {sector}"
            )
        except KeyError:
            # Pseudopostcodes are expected to be absent; only remark upon
            # other unknown postcodes.
            if not is_pseudopostcode(unit, prestandardized=True):
                warn_once(
                    f"Unknown postcode: {unit}", log, level=logging.DEBUG
                )
            unit_freq = None
            sector_freq = None
        return unit_freq, sector_freq

    def debug_is_valid_postcode(
        self, postcode_unit: str, prestandardized: bool = False
    ) -> bool:
        """
        Is this a valid postcode (a known postcode unit, or a
        pseudopostcode)?

        Args:
            postcode_unit: the postcode unit to check
            prestandardized: was the postcode pre-standardized in format?
        """
        if not prestandardized:
            postcode_unit = standardize_postcode(postcode_unit)
        return postcode_unit in self._postcode_unit_freq or is_pseudopostcode(
            postcode_unit, prestandardized=True
        )

    def debug_postcode_unit_population(
        self,
        postcode_unit: str,
        prestandardized: bool = False,
        total_population: int = UK_POPULATION_2017,
    ) -> Optional[float]:
        """
        Returns the calculated population of a postcode unit (its frequency
        multiplied by the national population), or ``None`` if its frequency
        is unknown.

        Args:
            postcode_unit: the postcode unit to check
            prestandardized: was the postcode pre-standardized in format?
            total_population: national population
        """
        unit_freq, _ = self.postcode_unit_sector_frequency(
            postcode_unit, prestandardized
        )
        if unit_freq is None:
            return None
        return unit_freq * total_population

    def debug_postcode_sector_population(
        self,
        postcode_sector: str,
        prestandardized: bool = False,
        total_population: int = UK_POPULATION_2017,
    ) -> Optional[float]:
        """
        Returns the calculated population of a postcode sector (its frequency
        multiplied by the national population), or ``None`` if the sector is
        unknown.

        Args:
            postcode_sector: the postcode sector to check
            prestandardized: was the sector pre-standardized in format?
            total_population: national population
        """
        sector = (
            postcode_sector
            if prestandardized
            else standardize_postcode(postcode_sector)
        )
        sector_freq = self._postcode_sector_freq.get(sector)
        if sector_freq is None:
            return None
        return sector_freq * total_population