Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2# cardinal_pythonlib/athena_ohdsi.py 

3 

4""" 

5=============================================================================== 

6 

7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com). 

8 

9 This file is part of cardinal_pythonlib. 

10 

11 Licensed under the Apache License, Version 2.0 (the "License"); 

12 you may not use this file except in compliance with the License. 

13 You may obtain a copy of the License at 

14 

15 https://www.apache.org/licenses/LICENSE-2.0 

16 

17 Unless required by applicable law or agreed to in writing, software 

18 distributed under the License is distributed on an "AS IS" BASIS, 

19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

20 See the License for the specific language governing permissions and 

21 limitations under the License. 

22 

23=============================================================================== 

24 

25**Functions to assist with the Athena OHDSI vocabularies.** 

26 

27See https://athena.ohdsi.org/. 

28 

29""" 

30 

31import csv 

32import logging 

33from typing import Collection, Generator, Iterable, List 

34 

35from cardinal_pythonlib.logs import BraceStyleAdapter 

36from cardinal_pythonlib.reprfunc import simple_repr 

37from cardinal_pythonlib.snomed import SnomedConcept 

38 

39log = BraceStyleAdapter(logging.getLogger(__name__)) 

40 

41 

42# ============================================================================= 

43# Athena OHDSI mapping 

44# ============================================================================= 

45 

46# ----------------------------------------------------------------------------- 

47# Constants 

48# ----------------------------------------------------------------------------- 

49 

50class AthenaVocabularyId(object): 

51 """ 

52 Constant-holding class for Athena vocabulary IDs that we care about. 

53 """ 

54 ICD10CM = "ICD10CM" 

55 ICD9CM = "ICD9CM" 

56 OPCS4 = "OPCS4" 

57 SNOMED = "SNOMED" 

58 

59 

60class AthenaRelationshipId(object): 

61 r""" 

62 Constant-holding class for Athena relationship IDs that we care about. 

63 To show all (there are lots!): 

64 

65 .. code-block:: bash 

66 

67 awk 'BEGIN {FS="\t"}; {print $3}' CONCEPT_RELATIONSHIP.csv | sort -u 

68 

69 """ 

70 IS_A = "Is a" # "is a child of" 

71 MAPS_TO = "Maps to" # converting between vocabularies 

72 MAPPED_FROM = "Mapped from" # converting between vocabularies 

73 SUBSUMES = "Subsumes" # "is a parent of" 

74 

75 

76# ----------------------------------------------------------------------------- 

77# TSV row info classes 

78# ----------------------------------------------------------------------------- 

79 

80class AthenaConceptRow(object): 

81 """ 

82 Simple information-holding class for ``CONCEPT.csv`` file from 

83 https://athena.ohdsi.org/ vocabulary download. 

84 """ 

85 HEADER = [ 

86 "concept_id", "concept_name", "domain_id", "vocabulary_id", 

87 "concept_class_id", "standard_concept", "concept_code", 

88 "valid_start_date", "valid_end_date", "invalid_reason" 

89 ] 

90 

91 def __init__(self, 

92 concept_id: str, 

93 concept_name: str, 

94 domain_id: str, 

95 vocabulary_id: str, 

96 concept_class_id: str, 

97 standard_concept: str, 

98 concept_code: str, 

99 valid_start_date: str, 

100 valid_end_date: str, 

101 invalid_reason: str) -> None: 

102 """ 

103 Argument order is important. 

104 

105 Args: 

106 concept_id: Athena concept ID 

107 concept_name: Concept name in the originating system 

108 domain_id: e.g. "Observation", "Condition" 

109 vocabulary_id: e.g. "SNOMED", "ICD10CM" 

110 concept_class_id: e.g. "Substance", "3-char nonbill code" 

111 standard_concept: ?; e.g. "S" 

112 concept_code: concept code in the vocabulary (e.g. SNOMED-CT 

113 concept code like "3578611000001105" if vocabulary_id is 

114 "SNOMED"; ICD-10 code like "F32.2" if vocabulary_is is 

115 "ICD10CM"; etc.) 

116 valid_start_date: date in YYYYMMDD format 

117 valid_end_date: date in YYYYMMDD format 

118 invalid_reason: ? (but one can guess) 

119 """ 

120 self.concept_id = int(concept_id) 

121 self.concept_name = concept_name 

122 self.domain_id = domain_id 

123 self.vocabulary_id = vocabulary_id 

124 self.concept_class_id = concept_class_id 

125 self.standard_concept = standard_concept 

126 self.concept_code = concept_code 

127 self.valid_start_date = valid_start_date 

128 self.valid_end_date = valid_end_date 

129 self.invalid_reason = invalid_reason 

130 # self.sort_context_concept_to_match = None 

131 

132 def __repr__(self) -> str: 

133 return simple_repr(self, self.HEADER) 

134 

135 def __str__(self) -> str: 

136 return ( 

137 f"Vocabulary {self.vocabulary_id}, concept {self.concept_code} " 

138 f"({self.concept_name}) -> Athena concept {self.concept_id}" 

139 ) 

140 

141 # I looked at sorting them to find the best. Not wise; would need human 

142 # review. Just use all valid codes. 

143 

144 _ = ''' 

145 

146 def set_sort_context_concept_to_match(self, 

147 concept: "AthenaConceptRow") -> None: 

148 self.sort_context_concept_to_match = concept 

149 

150 def __lt__(self, other: "AthenaConceptRow") -> bool: 

151 """ 

152 Compares using "less than" being equivalent to "preferable to". 

153 

154 So, returns True if "self" is better than other, and False if "self" is 

155 worse than other; that is, all tests look like "return self is better 

156 than other". 

157 

158 BINNED. We will use human judgement. 

159 """ 

160 invalid_s = bool(self.invalid_reason) 

161 invalid_o = bool(other.invalid_reason) 

162 if invalid_s != invalid_o: 

163 # better not to have an "invalid" reason; 

164 # empty strings are "less than" full ones 

165 return invalid_s < invalid_o 

166 if self.valid_end_date != other.valid_end_date: 

167 # better to have a later end date 

168 return self.valid_end_date > other.valid_end_date 

169 if self.valid_start_date != other.valid_start_date: 

170 # better to have an earlier start date 

171 return self.valid_start_date < other.valid_end_date 

172 if self.sort_context_concept_to_match: 

173 # Which is closer to our target context? 

174 c = self.sort_context_concept_to_match 

175 sp = self.match_tuple(c) 

176 op = other.match_tuple(c) 

177 log.info( 

178 "Tie-breaking to match {c}: {s} ({sp} points) vs " 

179 "{o} ({op} points)", 

180 s=self, sp=sp, o=other, op=op, c=c 

181 ) 

182 # More matching points is better 

183 return self.match_tuple(c) > other.match_tuple(c) 

184 log.warning("Tie-breaking {} and {} by ID", self, other) 

185 # Arbitrarily, better to have an earlier (lower) concept ID. 

186 return self.concept_id < other.concept_id 

187 

188 def match_tuple(self, target: "AthenaConceptRow") -> Tuple[float, float]: 

189 """ 

190 Returns a score reflecting our similarity to the target. 

191 

192 See 

193 

194 - https://stackoverflow.com/questions/8897593/similarity-between-two-text-documents 

195 - https://stackoverflow.com/questions/2380394/simple-implementation-of-n-gram-tf-idf-and-cosine-similarity-in-python 

196 - https://spacy.io/usage/vectors-similarity -- data not included 

197 - https://radimrehurek.com/gensim/index.html 

198 - https://radimrehurek.com/gensim/tut3.html 

199 - https://scikit-learn.org/stable/ 

200 - https://www.nltk.org/ 

201 

202 BINNED. We will use human judgement. 

203 """ # noqa 

204 self_words = set(x.lower() for x in self.concept_name.split()) 

205 other_words = set(x.lower() for x in target.concept_name.split()) 

206 # More matching words better 

207 n_matching_words = len(self_words & other_words) 

208 # More words better (often more specific) 

209 n_words = len(self_words) 

210 return float(n_matching_words), float(n_words) 

211 

212 ''' 

213 

214 def snomed_concept(self) -> SnomedConcept: 

215 """ 

216 Assuming this Athena concept reflects a SnomedConcept, returns it. 

217 

218 (Asserts if it isn't.) 

219 """ 

220 assert self.vocabulary_id == AthenaVocabularyId.SNOMED 

221 return SnomedConcept(int(self.concept_code), self.concept_name) 

222 

223 

224class AthenaConceptRelationshipRow(object): 

225 """ 

226 Simple information-holding class for ``CONCEPT_RELATIONSHIP.csv`` file from 

227 https://athena.ohdsi.org/ vocabulary download. 

228 """ 

229 HEADER = [ 

230 "concept_id_1", "concept_id_2", "relationship_id", 

231 "valid_start_date", "valid_end_date", "invalid_reason", 

232 ] 

233 

234 def __init__(self, 

235 concept_id_1: str, 

236 concept_id_2: str, 

237 relationship_id: str, 

238 valid_start_date: str, 

239 valid_end_date: str, 

240 invalid_reason: str) -> None: 

241 """ 

242 Argument order is important. 

243 

244 Args: 

245 concept_id_1: Athena concept ID #1 

246 concept_id_2: Athena concept ID #2 

247 relationship_id: e.g. "Is a", "Has legal category" 

248 valid_start_date: date in YYYYMMDD format 

249 valid_end_date: date in YYYYMMDD format 

250 invalid_reason: ? (but one can guess) 

251 """ 

252 self.concept_id_1 = int(concept_id_1) 

253 self.concept_id_2 = int(concept_id_2) 

254 self.relationship_id = relationship_id 

255 self.valid_start_date = valid_start_date 

256 self.valid_end_date = valid_end_date 

257 self.invalid_reason = invalid_reason 

258 

259 def __repr__(self) -> str: 

260 return simple_repr(self, self.HEADER) 

261 

262 def __str__(self) -> str: 

263 return ( 

264 f"Athena concept relationship {self.concept_id_1} " 

265 f"{self.relationship_id!r} {self.concept_id_2}" 

266 ) 

267 

268 

269# ----------------------------------------------------------------------------- 

270# Fetch data from TSV files 

271# ----------------------------------------------------------------------------- 

272 

273# noinspection DuplicatedCode 

274def get_athena_concepts( 

275 tsv_filename: str = "", 

276 cached_concepts: Iterable[AthenaConceptRow] = None, 

277 vocabulary_ids: Collection[str] = None, 

278 concept_codes: Collection[str] = None, 

279 concept_ids: Collection[int] = None, 

280 not_vocabulary_ids: Collection[str] = None, 

281 not_concept_codes: Collection[str] = None, 

282 not_concept_ids: Collection[int] = None, 

283 encoding: str = "utf-8") -> List[AthenaConceptRow]: 

284 """ 

285 From the Athena ``CONCEPT.csv`` tab-separated value file, return a list 

286 of concepts matching the restriction criteria. 

287 

288 Args: 

289 tsv_filename: 

290 filename 

291 cached_concepts: 

292 alternative to tsv_filename 

293 vocabulary_ids: 

294 permissible ``vocabulary_id`` values, or None or an empty list for 

295 all 

296 concept_codes: 

297 permissible ``concept_code`` values, or None or an empty list for 

298 all 

299 concept_ids: 

300 permissible ``concept_id`` values, or None or an empty list for all 

301 not_vocabulary_ids: 

302 impermissible ``vocabulary_id`` values, or None or an empty list 

303 for none 

304 not_concept_codes: 

305 impermissible ``concept_code`` values, or None or an empty list for 

306 none 

307 not_concept_ids: 

308 impermissible ``concept_id`` values, or None or an empty list for 

309 none 

310 encoding: 

311 encoding for input files 

312 

313 Returns: 

314 list: of :class:`AthenaConceptRow` objects 

315 

316 Test and timing code: 

317 

318 .. code-block:: python 

319 

320 import logging 

321 import timeit 

322 logging.basicConfig(level=logging.DEBUG) 

323  

324 from cardinal_pythonlib.athena_ohdsi import ( 

325 get_athena_concepts, 

326 get_athena_concept_relationships, 

327 ) 

328  

329 concept_filename = "CONCEPT.csv" 

330 cr_filename = "CONCEPT_RELATIONSHIP.csv" 

331 testcode = "175898006" 

332 testid = 46067884 

333  

334 concept_testcode = ''' 

335 get_athena_concepts(concept_filename, concept_codes=[testcode]) 

336 ''' 

337 cr_testcode = ''' 

338 get_athena_concept_relationships(cr_filename, concept_id_1_values=[testid]) 

339 ''' 

340  

341 timeit.timeit(cr_testcode, number=1, globals=globals()) 

342 # Initial method: 33.6 s (for 9.9m rows on a Windows laptop). 

343 # Chain of generators: 21.5 s. Better. 

344  

345 timeit.timeit(concept_testcode, number=1, globals=globals()) 

346 # After speedup: 3.9 s for 1.1m rows. 

347 

348 """ # noqa 

349 assert bool(tsv_filename) != bool(cached_concepts), ( 

350 "Specify either tsv_filename or cached_concepts" 

351 ) 

352 n_rows_read = 0 

353 

354 def gen_rows() -> Generator[AthenaConceptRow, None, None]: 

355 nonlocal n_rows_read 

356 with open(tsv_filename, 'r', encoding=encoding) as tsvin: 

357 reader = csv.reader(tsvin, delimiter="\t") 

358 header = next(reader, None) 

359 if header != AthenaConceptRow.HEADER: 

360 raise ValueError( 

361 f"Athena concept file has unexpected header: {header!r}; " 

362 f"expected {AthenaConceptRow.HEADER!r}") 

363 for row in reader: 

364 n_rows_read += 1 

365 concept = AthenaConceptRow(*row) 

366 yield concept 

367 

368 def filter_vocab(concepts_: Iterable[AthenaConceptRow]) \ 

369 -> Generator[AthenaConceptRow, None, None]: 

370 for concept in concepts_: 

371 if concept.vocabulary_id in vocabulary_ids: 

372 yield concept 

373 

374 def filter_code(concepts_: Iterable[AthenaConceptRow]) \ 

375 -> Generator[AthenaConceptRow, None, None]: 

376 for concept in concepts_: 

377 if concept.concept_code in concept_codes: 

378 yield concept 

379 

380 def filter_id(concepts_: Iterable[AthenaConceptRow]) \ 

381 -> Generator[AthenaConceptRow, None, None]: 

382 for concept in concepts_: 

383 if concept.concept_id in concept_ids: 

384 yield concept 

385 

386 def filter_not_vocab(concepts_: Iterable[AthenaConceptRow]) \ 

387 -> Generator[AthenaConceptRow, None, None]: 

388 for concept in concepts_: 

389 if concept.vocabulary_id not in not_vocabulary_ids: 

390 yield concept 

391 

392 def filter_not_code(concepts_: Iterable[AthenaConceptRow]) \ 

393 -> Generator[AthenaConceptRow, None, None]: 

394 for concept in concepts_: 

395 if concept.concept_code not in not_concept_codes: 

396 yield concept 

397 

398 def filter_not_id(concepts_: Iterable[AthenaConceptRow]) \ 

399 -> Generator[AthenaConceptRow, None, None]: 

400 for concept in concepts_: 

401 if concept.concept_id not in not_concept_ids: 

402 yield concept 

403 

404 # Build up the fastest pipeline we can. 

405 if tsv_filename: 

406 log.info(f"Loading Athena concepts from file: {tsv_filename}") 

407 gen = gen_rows() 

408 else: 

409 log.info("Using cached Athena concepts") 

410 gen = cached_concepts 

411 # Positive checks 

412 if vocabulary_ids: 

413 gen = filter_vocab(gen) 

414 if concept_codes: 

415 gen = filter_code(gen) 

416 if concept_ids: 

417 gen = filter_id(gen) 

418 # Negative checks 

419 if not_vocabulary_ids: 

420 gen = filter_not_vocab(gen) 

421 if not_concept_codes: 

422 gen = filter_not_code(gen) 

423 if not_concept_ids: 

424 gen = filter_not_id(gen) 

425 

426 concepts = list(concept for concept in gen) 

427 log.debug(f"Retrieved {len(concepts)} concepts from {n_rows_read} rows") 

428 return concepts 

429 

430 

431# noinspection DuplicatedCode 

432def get_athena_concept_relationships( 

433 tsv_filename: str = "", 

434 cached_concept_relationships: Iterable[AthenaConceptRelationshipRow] = None, # noqa 

435 concept_id_1_values: Collection[int] = None, 

436 concept_id_2_values: Collection[int] = None, 

437 relationship_id_values: Collection[str] = None, 

438 not_concept_id_1_values: Collection[int] = None, 

439 not_concept_id_2_values: Collection[int] = None, 

440 not_relationship_id_values: Collection[str] = None, 

441 encoding: str = "utf-8") \ 

442 -> List[AthenaConceptRelationshipRow]: 

443 """ 

444 From the Athena ``CONCEPT_RELATIONSHIP.csv`` tab-separated value file, 

445 return a list of relationships matching the restriction criteria. 

446 

447 Args: 

448 tsv_filename: 

449 filename 

450 cached_concept_relationships: 

451 alternative to tsv_filename 

452 concept_id_1_values: 

453 permissible ``concept_id_1`` values, or None or an empty list for 

454 all 

455 concept_id_2_values: 

456 permissible ``concept_id_2`` values, or None or an empty list for 

457 all 

458 relationship_id_values: 

459 permissible ``relationship_id`` values, or None or an empty list 

460 for all 

461 not_concept_id_1_values: 

462 impermissible ``concept_id_1`` values, or None or an empty list for 

463 none 

464 not_concept_id_2_values: 

465 impermissible ``concept_id_2`` values, or None or an empty list for 

466 none 

467 not_relationship_id_values: 

468 impermissible ``relationship_id`` values, or None or an empty list 

469 for none 

470 encoding: 

471 encoding for input files 

472 

473 Returns: 

474 list: of :class:`AthenaConceptRelationshipRow` objects 

475 

476 """ 

477 assert bool(tsv_filename) != bool(cached_concept_relationships), ( 

478 "Specify either tsv_filename or cached_concept_relationships" 

479 ) 

480 n_rows_read = 0 

481 

482 def gen_rows() -> Generator[AthenaConceptRelationshipRow, None, None]: 

483 nonlocal n_rows_read 

484 with open(tsv_filename, 'r', encoding=encoding) as tsvin: 

485 reader = csv.reader(tsvin, delimiter="\t") 

486 header = next(reader, None) 

487 if header != AthenaConceptRelationshipRow.HEADER: 

488 raise ValueError( 

489 f"Athena concept relationship file has unexpected header: " 

490 f"{header!r}; expected " 

491 f"{AthenaConceptRelationshipRow.HEADER!r}") 

492 for row in reader: 

493 n_rows_read += 1 

494 rel = AthenaConceptRelationshipRow(*row) 

495 yield rel 

496 

497 def filter_rel(rels: Iterable[AthenaConceptRelationshipRow]) \ 

498 -> Generator[AthenaConceptRelationshipRow, None, None]: 

499 for rel in rels: 

500 if rel.relationship_id in relationship_id_values: 

501 yield rel 

502 

503 def filter_c1(rels: Iterable[AthenaConceptRelationshipRow]) \ 

504 -> Generator[AthenaConceptRelationshipRow, None, None]: 

505 for rel in rels: 

506 if rel.concept_id_1 in concept_id_1_values: 

507 yield rel 

508 

509 def filter_c2(rels: Iterable[AthenaConceptRelationshipRow]) \ 

510 -> Generator[AthenaConceptRelationshipRow, None, None]: 

511 for rel in rels: 

512 if rel.concept_id_2 in concept_id_2_values: 

513 yield rel 

514 

515 def filter_not_rel(rels: Iterable[AthenaConceptRelationshipRow]) \ 

516 -> Generator[AthenaConceptRelationshipRow, None, None]: 

517 for rel in rels: 

518 if rel.relationship_id not in not_relationship_id_values: 

519 yield rel 

520 

521 def filter_not_c1(rels: Iterable[AthenaConceptRelationshipRow]) \ 

522 -> Generator[AthenaConceptRelationshipRow, None, None]: 

523 for rel in rels: 

524 if rel.concept_id_1 not in not_concept_id_1_values: 

525 yield rel 

526 

527 def filter_not_c2(rels: Iterable[AthenaConceptRelationshipRow]) \ 

528 -> Generator[AthenaConceptRelationshipRow, None, None]: 

529 for rel in rels: 

530 if rel.concept_id_2 not in not_concept_id_2_values: 

531 yield rel 

532 

533 # Build up the fastest pipeline we can. 

534 if tsv_filename: 

535 log.info(f"Loading Athena concept relationships from file: " 

536 f"{tsv_filename}") 

537 gen = gen_rows() 

538 else: 

539 log.info("Using cached Athena concept relationships") 

540 gen = cached_concept_relationships 

541 # Positive checks 

542 if relationship_id_values: 

543 gen = filter_rel(gen) 

544 if concept_id_1_values: 

545 gen = filter_c1(gen) 

546 if concept_id_2_values: 

547 gen = filter_c2(gen) 

548 # Negative checks 

549 if not_relationship_id_values: 

550 gen = filter_not_rel(gen) 

551 if not_concept_id_1_values: 

552 gen = filter_not_c1(gen) 

553 if not_concept_id_2_values: 

554 gen = filter_not_c2(gen) 

555 

556 relationships = list(rel for rel in gen) 

557 log.debug(f"Retrieved {len(relationships)} relationships from " 

558 f"{n_rows_read} rows") 

559 return relationships