multitax.multitax
from multitax.utils import (
    join_check,
    check_no_file,
    filter_function,
    reverse_dict,
    check_file,
    close_files,
    download_files,
    open_files,
    check_dir,
)
from collections import Counter
from datetime import datetime
from pylca.pylca import LCA


class MultiTax(object):
    """
    Base taxonomy tree.

    The tree is stored in three dicts keyed by node: node -> parent (_nodes),
    node -> rank (_ranks) and node -> name (_names). Reverse lookups, lineages,
    translations and the LCA structure are auxiliary data built on demand and
    dropped whenever the tree is modified.
    """

    _default_version = "current"

    _supported_versions = ["current"]
    _default_urls = {}
    _default_root_node = "1"
    _standard_ranks = [
        "domain",
        "phylum",
        "class",
        "order",
        "family",
        "genus",
        "species",
    ]

    def __init__(
        self,
        version: str = None,
        files: list = None,
        urls: list = None,
        output_prefix: str = None,
        root_node: str = None,
        root_parent: str = "0",
        root_name: str = None,
        root_rank: str = None,
        undefined_node: str = None,
        undefined_name: str = None,
        undefined_rank: str = None,
        build_name_nodes: bool = False,
        build_node_children: bool = False,
        build_rank_nodes: bool = False,
        extended_names: bool = False,
        empty: bool = False,
    ):
        """
        Main constructor of MultiTax and sub-classes

        Parameters:
        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
        * **files** *[str, list]*: One or more local files to parse.
        * **urls** *[str, list]*: One or more urls to download and parse.
        * **output_prefix** *[str]*: Directory to write downloaded files.
        * **root_node** *[str]*: Define an alternative root node.
        * **root_parent** *[str]*: Define the root parent node identifier.
        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name.
        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
        * **undefined_name** *[str]*: Define a default return value for undefined names.
        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
        * **extended_names** *[bool]*: Parse extended names if available.
        * **empty** *[bool]*: Create an empty instance.

        Raises: ValueError if no files/urls are given and the version is not supported,
        or if the resulting tree is not consistent (see check_consistency()).

        Example:

            tax_ncbi = NcbiTx()
            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
            tax_ott = OttTx(root_node="844192")
            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
        """
        if files:
            if isinstance(files, str):
                files = [files]
            for file in files:
                check_file(file)

        if output_prefix:
            check_dir(output_prefix)

        # Main structures
        self._nodes = {}
        self._ranks = {}
        self._names = {}

        # Aux. structures
        self._lineages = {}
        self._name_nodes = {}
        self._node_children = {}
        self._rank_nodes = {}
        self._translated_nodes = {}
        self._lca = None

        # Properties
        self.datetime = datetime.now()
        self.version = None
        self.undefined_node = undefined_node
        self.undefined_name = undefined_name
        self.undefined_rank = undefined_rank

        # Set version
        if files or urls:
            # Custom sources: any version label is accepted (may be None)
            self.version = version
        else:
            self.version = self._default_version if not version else version
            if self.version not in self._supported_versions:
                raise ValueError(
                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
                )

        # Store source of tax files (url or file)
        self.sources = []

        if not empty:
            # Open/Download/Write files
            fhs = {}
            if files:
                fhs = open_files(files)
            elif urls or self._default_urls.get(self.version):
                fhs = download_files(
                    urls=urls if urls else self._default_urls[self.version],
                    output_prefix=output_prefix,
                    retry_attempts=3,
                )

            if fhs:
                # Parse taxonomy, always releasing the handles even if parsing fails
                try:
                    self._nodes, self._ranks, self._names = self._parse(
                        fhs, extended_names=extended_names
                    )
                finally:
                    close_files(fhs)
                # Save sources for stats (files or urls)
                self.sources = list(fhs.keys())

        # Set root values
        self._set_root_node(
            root=root_node if root_node else self._default_root_node,
            parent=root_parent,
            name=root_name,
            rank=root_rank,
        )

        # build auxiliary structures
        if build_node_children:
            self._node_children = reverse_dict(self._nodes)
        if build_name_nodes:
            self._name_nodes = reverse_dict(self._names)
        if build_rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)

        self.check_consistency()

    def _exact_name(self, text: str, names: dict):
        """
        Returns list of nodes of a given exact name (case sensitive).
        """
        if text in names:
            return names[text]
        else:
            return []

    def _parse(self, fhs: dict, extended_names: bool = False):
        """
        main function to be overloaded
        receives a dictionary with {"url/file": file handler}
        return nodes, ranks and names dicts

        extended_names is accepted because the constructor always passes it
        as a keyword; this base implementation ignores it.
        """
        return {}, {}, {}

    def _partial_name(self, text: str, names: dict):
        """
        Searches names containing a certain text (case sensitive) and return their respective nodes.
        """
        matching_nodes = set()
        for name in names:
            if text in name:
                matching_nodes.update(names[name])
        return list(matching_nodes)

    def _recurse_leaves(self, node: str):
        """
        Recursive function returning leaf nodes
        """
        children = self.children(node)
        if not children:
            # A node without children is a leaf itself
            return [node]
        leaves = []
        for child in children:
            leaves.extend(self._recurse_leaves(child))
        return leaves

    def _remove(self, node: str):
        """
        Removes node from taxonomy, no checking, for internal use
        """
        del self._nodes[node]
        if node in self._names:
            del self._names[node]
        if node in self._ranks:
            del self._ranks[node]

    def _reset_aux_data(self):
        """
        Reset aux. data structures
        """
        self._lineages = {}
        self._name_nodes = {}
        self._node_children = {}
        self._rank_nodes = {}
        self._translated_nodes = {}
        self._lca = None

    def _set_root_node(self, root: str, parent: str, name: str, rank: str):
        """
        Set root node of the tree.
        The files are parsed based on the self._default_root_node for each class
        A user-defined root node can be:
        1) internal: will filter the tree accordingly and delete the default root_node
        2) external: will add node and link to the default
        """

        # Set parent/root with defaults
        self.root_parent = parent
        self.root_node = self._default_root_node
        self._nodes[self.root_node] = self.root_parent

        # Default root node is the top by definition
        if root != self._default_root_node:
            if root in self._nodes:
                # Not default but exists on tree, filter only descendants
                self.filter(root, desc=True)
                # Remove entry for _default_root_node
                self._remove(self._default_root_node)
            else:
                # Not on tree, link default node with new root
                self._nodes[self._default_root_node] = root
            # Change root to user defined
            self.root_node = root
            # Set/Update new root node parent link
            self._nodes[self.root_node] = self.root_parent

        # User-defined rank/name.
        # If provided, insert manually,
        # If None, check if is in the tree (defined in the given tax)
        # otherwise insert default "root"
        if name:
            self._names[self.root_node] = name
        elif self.root_node not in self._names:
            self._names[self.root_node] = "root"
        # Set static name
        self.root_name = self._names[self.root_node]

        if rank:
            self._ranks[self.root_node] = rank
        elif self.root_node not in self._ranks:
            self._ranks[self.root_node] = "root"
        # Set static rank
        self.root_rank = self._ranks[self.root_node]

    def add(self, node: str, parent: str, name: str = None, rank: str = None):
        """
        Adds node to taxonomy.
        Deletes built lineages, translations and lca.

        Raises: ValueError if the parent is not on the tree or the node already is.
        """
        if parent not in self._nodes:
            raise ValueError(f"Parent node [{parent}] not found.")
        elif node in self._nodes:
            raise ValueError(f"Node [{node}] already present.")

        self._nodes[node] = parent
        self._names[node] = name if name is not None else self.undefined_name
        self._ranks[node] = rank if rank is not None else self.undefined_rank
        self._reset_aux_data()

    def build_lca(self):
        """
        Builds LCA structure based on pylca.
        Optional function, LCA is built on first .lca() call.

        Returns: None
        """
        self._lca = LCA(self._nodes)

    def build_lineages(self, root_node: str = None, ranks: list = None):
        """
        Stores lineages in memory for faster access.
        It is valid for lineage(), rank_lineage() and name_lineage().
        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.

        Returns: None
        """
        self.clear_lineages()
        for node in self._nodes:
            self._lineages[node] = self.lineage(
                node=node, root_node=root_node, ranks=ranks
            )

    def build_translation(
        self, tax, representatives: bool = False, file: str = None, url: str = None
    ):
        """
        Create a translation of current taxonomy to another

        Parameters:

        * **tax** [MultiTax]: A target taxonomy to be translated to.
        * **representatives** *[bool]*: Use only GTDB representative genomes to translate nodes.
        * **file** *[str]*: Local file to parse.
        * **url** *[str]*: Url to download and parse.

        Example:

            from multitax import GtdbTx, NcbiTx
            gtdb_tax = GtdbTx()
            ncbi_tax = NcbiTx()

            # Automatically download translation files
            gtdb_tax.build_translation(ncbi_tax)
            gtdb_tax.translate("g__Escherichia")
                ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']

            # Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
            ncbi_tax.translate("620")
                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}

            # Translation based on GTDB representative genome only
            gtdb_tax.build_translation(ncbi_tax, representatives=True)
            gtdb_tax.translate("g__Escherichia")
                ['561', '547']
        """
        if file:
            check_file(file)

        # NOTE(review): _build_translation is not defined in this class as seen
        # here; presumably provided by sub-classes -- confirm.
        self._translated_nodes = self._build_translation(
            tax, representatives, file, url
        )

    def children(self, node: str):
        """
        Returns list of direct children nodes of a given node.
        """
        # Setup on first use
        if not self._node_children:
            self._node_children = reverse_dict(self._nodes)
        if node in self._node_children:
            return self._node_children[node]
        else:
            return []

    def check_consistency(self):
        """
        Checks consistency of the tree

        Returns: raise an Exception otherwise None
        """
        # Messages are built with f-strings so that non-string identifiers
        # (e.g. None) still produce the intended ValueError, not a TypeError.
        if self.root_node not in self._nodes:
            raise ValueError(f"Root node [{self.root_node}] not found.")
        if self.root_parent in self._nodes:
            raise ValueError(
                f"Root parent [{self.root_parent}] found but should not be on tree."
            )
        if self.undefined_node in self._nodes:
            raise ValueError(
                f"Undefined node [{self.undefined_node}] found but should not be on tree."
            )

        # Difference between values and keys should be only root_parent
        lost_nodes = set(self._nodes.values()).difference(self._nodes)
        if self.root_parent not in lost_nodes:
            raise ValueError(f"Root parent [{self.root_parent}] not properly defined.")
        # Remove root_parent from lost nodes to report only missing
        lost_nodes.remove(self.root_parent)
        if len(lost_nodes) > 0:
            # Sorted for a deterministic message (set order is arbitrary)
            raise ValueError(
                "Parent nodes missing: " + ",".join(sorted(map(str, lost_nodes)))
            )

        return None

    def clear_lca(self):
        """
        Clear built LCA.

        Returns: None
        """
        self._lca = None

    def clear_lineages(self):
        """
        Clear built lineages.

        Returns: None
        """
        self._lineages = {}

    def closest_parent(self, node: str, ranks: list):
        """
        Returns the closest parent node based on a defined list of ranks
        """
        # Rank of node is already on the list
        if self.rank(node) in ranks:
            return node
        else:
            # check lineage from back to front until find a valid node
            for n in self.lineage(node, ranks=ranks)[::-1]:
                if n != self.undefined_node:
                    return n
        # nothing found
        return self.undefined_node

    def filter(self, nodes: list, desc: bool = False):
        """
        Filters taxonomy given a list of nodes.
        By default keep all the ancestors of the given nodes.
        If desc=True, keep all descendants instead.
        Deletes built lineages, translations and lca.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()

            tax.lineage('s__Enterovibrio marina')
            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
            # Keep only ancestors of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio')

            # Reload taxonomy
            tax = GtdbTx()
            # Keep only descendants of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio', desc=True)
        """
        if isinstance(nodes, str):
            nodes = [nodes]

        # Keep track of nodes to be filtered out
        filtered_nodes = set(self._nodes)
        # Always keep root
        filtered_nodes.discard(self.root_node)

        if desc:
            # Keep descendants of the given nodes
            for node in nodes:
                # Check if node exists (skips root)
                if node in filtered_nodes:
                    # For each leaf of the selected nodes
                    for leaf in self.leaves(node):
                        # Build lineage of each leaf up-to node itself
                        for n in self.lineage(leaf, root_node=node):
                            # Discard nodes from set to be kept
                            filtered_nodes.discard(n)
                    # Link node to root
                    self._nodes[node] = self.root_node
        else:
            # Keep ancestors of the given nodes (full lineage up-to root)
            for node in nodes:
                # ranks=[] in case build_lineages() was used with specific ranks
                for n in self.lineage(node, ranks=[]):
                    # Discard nodes from set to be kept
                    filtered_nodes.discard(n)

        # Delete filtered nodes
        for node in filtered_nodes:
            self._remove(node)

        # Delete aux. data structures
        self._reset_aux_data()
        self.check_consistency()

    @classmethod
    def from_customtx(cls, ctx):
        """
        Initialize a Tx sub-class based on a CustomTx instance.

        The nodes, names and ranks dicts are assigned by reference (not copied),
        so both instances share the same underlying dicts.

        Example:

            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
            tax_ncbi = NcbiTx.from_customtx(tax_custom)
        """
        nc = cls(empty=True)
        nc.version = ctx.version
        nc.sources = ctx.sources
        nc._nodes = ctx._nodes
        nc._names = ctx._names
        nc._ranks = ctx._ranks
        return nc

    def latest(self, node: str):
        """
        Returns latest/updated version of a given node.
        If node is already the latest, returns itself.
        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
        """
        if node in self._nodes:
            return node
        else:
            return self.undefined_node

    def leaves(self, node: str = None):
        """
        Returns a list of leaf nodes of a given node.
        """
        if node is None or node == self.root_node:
            # Leaves are nodes not contained in _nodes.values() ("parents")
            return list(set(self._nodes).difference(self._nodes.values()))
        elif node in self._nodes:
            return self._recurse_leaves(node)
        else:
            return []

    def lca(self, nodes: list = None):
        """
        Returns the lowest common ancestor of two or more nodes.

        Raises: ValueError if any of the given nodes is not on the tree.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()
            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
        """
        for node in nodes:
            if node not in self._nodes:
                raise ValueError(f"Node [{node}] not found.")

        # Setup on first use
        if not self._lca:
            self.build_lca()

        return self._lca(*nodes)

    def lineage(self, node: str, root_node: str = None, ranks: list = None):
        """
        Returns a list with the lineage of a given node.
        If ranks is provided, returns only nodes annotated with such ranks.
        If root_node is provided, use it instead of default root of tree.
        """
        # If lineages were built with build_lineages() with matching params
        if node in self._lineages and root_node is None and ranks is None:
            return self._lineages[node]
        else:
            if not root_node:
                root_node = self.root_node

            n = node
            if ranks:
                # Fixed length lineage
                lin = [self.undefined_node] * len(ranks)
                # Loop until end of the tree (in case chosen root is not on lineage)
                while n != self.undefined_node:
                    r = self.rank(n)
                    if r in ranks:
                        lin[ranks.index(r)] = n
                    # If node is root, break (after adding)
                    if n == root_node:
                        break
                    n = self.parent(n)
            else:
                # Full lineage
                lin = []
                # Loop until end of the tree (in case chosen root is not on lineage)
                while n != self.undefined_node:
                    lin.append(n)
                    # If node is root, break (after adding)
                    if n == root_node:
                        break
                    n = self.parent(n)
                # Reverse order
                lin = lin[::-1]

            # last iteration node (n) != root_node: didn't find the root, invalid lineage
            if n != root_node:
                return []
            else:
                return lin

    def name(self, node: str):
        """
        Returns name of a given node.
        """
        if node in self._names:
            return self._names[node]
        else:
            return self.undefined_name

    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
        """
        Returns a list with the name lineage of a given node.
        """
        return list(
            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
        )

    def nodes_rank(self, rank: str):
        """
        Returns list of nodes of a given rank.
        """
        # Setup on first use
        if not self._rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)
        if rank in self._rank_nodes:
            return self._rank_nodes[rank]
        else:
            return []

    def parent(self, node: str):
        """
        Returns the direct parent node of a given node.
        """
        if node in self._nodes:
            return self._nodes[node]
        else:
            return self.undefined_node

    def parent_rank(self, node: str, rank: str):
        """
        Returns the parent node of a given node in the specified rank.
        """
        parent = self.lineage(node=node, ranks=[rank])
        return parent[0] if parent else self.undefined_node

    def prune(self, nodes: list):
        """
        Prunes branches of the tree under the given nodes.
        Deletes built lineages, translations and lca.

        Raises: ValueError if any of the given nodes is not on the tree.
        """

        if isinstance(nodes, str):
            nodes = [nodes]

        del_nodes = set()
        for node in nodes:
            if node not in self._nodes:
                raise ValueError(f"Node [{node}] not found.")
            for leaf in self.leaves(node):
                # [1:] skips the given node itself, only what is under it is removed
                for n in self.lineage(leaf, root_node=node)[1:]:
                    del_nodes.add(n)

        for n in del_nodes:
            self._remove(n)

        self._reset_aux_data()

    def rank(self, node: str):
        """
        Returns the rank of a given node.
        """
        if node in self._ranks:
            return self._ranks[node]
        else:
            return self.undefined_rank

    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
        """
        Returns a list with the rank lineage of a given node.
        """
        return list(
            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
        )

    def remove(self, node: str, check_consistency: bool = False):
        """
        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
        Running check consistency after removing a node is recommended.
        Deletes built lineages, translations and lca.

        Raises: ValueError if the node is not on the tree.
        """
        if node not in self._nodes:
            raise ValueError(f"Node [{node}] not found.")
        self._remove(node)
        self._reset_aux_data()
        if check_consistency:
            self.check_consistency()

    def search_name(self, text: str, rank: str = None, exact: bool = True):
        """
        Search node by exact or partial name

        Parameters:
        * **text** *[str]*: Text to search.
        * **rank** *[str]*: Filter results by rank.
        * **exact** *[bool]*: Exact or partial name search (both case sensitive).

        Returns: list of matching nodes
        """
        # Setup on first use
        if not self._name_nodes:
            self._name_nodes = reverse_dict(self._names)

        if exact:
            ret = self._exact_name(text, self._name_nodes)
        else:
            ret = self._partial_name(text, self._name_nodes)

        # Only return nodes of chosen rank
        if rank:
            return filter_function(ret, self.rank, rank)
        else:
            return ret

    def stats(self):
        """
        Returns a dict with general numbers of the taxonomic tree

        Example:

            from pprint import pprint
            from multitax import GtdbTx
            tax = GtdbTx()

            pprint(tax.stats())
            {'leaves': 30238,
             'names': 42739,
             'nodes': 42739,
             'ranked_leaves': Counter({'species': 30238}),
             'ranked_nodes': Counter({'species': 30238,
                                      'genus': 8778,
                                      'family': 2323,
                                      'order': 930,
                                      'class': 337,
                                      'phylum': 131,
                                      'domain': 1,
                                      'root': 1}),
             'ranks': 42739}
        """
        s = {}
        s["nodes"] = len(self._nodes)
        s["ranks"] = len(self._ranks)
        s["names"] = len(self._names)
        all_leaves = self.leaves(self.root_node)
        s["leaves"] = len(all_leaves)
        s["ranked_nodes"] = Counter(self._ranks.values())
        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
        return s

    def translate(self, node: str, top_perc: float | None = None, counts: bool = False):
        """
        Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes.
        `counts` additionally outputs the number of entries/genomes used to translate each node.
        The translation have to first be generated with the `build_translation` function.

        Parameters:
        * **node** *[str]*: Node to translate.
        * **top_perc** *[float]*: Keep translations summing up to `top_perc` of the nodes based on counts.
        * **counts** *[bool]*: Output a sorted list of tuples with the translated node and counts.

        Returns: List of translated nodes (or list of tuples with counts)
        """
        if node in self._translated_nodes:
            ret = Counter(self._translated_nodes[node])
            # i=None makes most_common() return every entry
            i = None
            if top_perc:
                total = ret.total()
                sm = 0
                # Stop at the first entry where the cumulative share reaches top_perc
                for i, (_, cnt) in enumerate(ret.most_common(), 1):
                    sm += cnt
                    if (sm / total) >= top_perc:
                        break
            return ret.most_common(i) if counts else [n[0] for n in ret.most_common(i)]

        return []

    def write(
        self,
        output_file: str,
        cols: list = None,
        sep: str = "\t",
        sep_multi: str = "|",
        ranks: list = None,
        gz: bool = False,
    ):
        """
        Writes loaded taxonomy to a file.

        Parameters:
        * **output_file** *[str]*: File to write (".gz" is appended with gz=True if missing).
        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage". Default: ["node", "parent", "rank", "name"]
        * **sep** *[str]*: Separator of fields
        * **sep_multi** *[str]*: Separator of multi-valued fields
        * **ranks** *[list]*: Ranks to report
        * **gz** *[bool]*: Gzip output

        Raises: ValueError if any entry of cols is not a valid field.

        Returns: None
        """
        import gzip

        # None stands for the default columns (avoids a mutable default argument)
        if cols is None:
            cols = ["node", "parent", "rank", "name"]

        write_field = {
            "node": lambda node: node,
            "latest": self.latest,
            "parent": self.parent,
            "rank": self.rank,
            "name": self.name,
            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
            "children": lambda node: join_check(self.children(node), sep_multi),
            "lineage": lambda node: join_check(
                self.lineage(node, ranks=ranks), sep_multi
            ),
            "rank_lineage": lambda node: join_check(
                self.rank_lineage(node, ranks=ranks), sep_multi
            ),
            "name_lineage": lambda node: join_check(
                self.name_lineage(node, ranks=ranks), sep_multi
            ),
        }

        # Validate before touching the file system, so an invalid column
        # does not leave an empty output file behind
        for c in cols:
            if c not in write_field:
                raise ValueError(
                    f"Field [{c}] is not valid. Options: " + ",".join(write_field)
                )

        if gz and not output_file.endswith(".gz"):
            output_file = output_file + ".gz"
        check_no_file(output_file)

        # Context manager closes the handle even if a field function raises
        with gzip.open(output_file, "wt") if gz else open(output_file, "w") as outf:
            if ranks:
                for rank in ranks:
                    for node in self.nodes_rank(rank):
                        print(
                            *[write_field[c](node) for c in cols],
                            sep=sep,
                            end="\n",
                            file=outf,
                        )
            else:
                for node in self._nodes:
                    print(
                        *[write_field[c](node) for c in cols],
                        sep=sep,
                        end="\n",
                        file=outf,
                    )
18class MultiTax(object): 19 _default_version = "current" 20 21 _supported_versions = ["current"] 22 _default_urls = {} 23 _default_root_node = "1" 24 _standard_ranks = [ 25 "domain", 26 "phylum", 27 "class", 28 "order", 29 "family", 30 "genus", 31 "species", 32 ] 33 34 def __init__( 35 self, 36 version: str = None, 37 files: list = None, 38 urls: list = None, 39 output_prefix: str = None, 40 root_node: str = None, 41 root_parent: str = "0", 42 root_name: str = None, 43 root_rank: str = None, 44 undefined_node: str = None, 45 undefined_name: str = None, 46 undefined_rank: str = None, 47 build_name_nodes: bool = False, 48 build_node_children: bool = False, 49 build_rank_nodes: bool = False, 50 extended_names: bool = False, 51 empty: bool = False, 52 ): 53 """ 54 Main constructor of MultiTax and sub-classes 55 56 Parameters: 57 * **version** *[str]*: Version to download/parse or custom version name (with files/urls). 58 * **files** *[str, list]*: One or more local files to parse. 59 * **urls** *[str, list]*: One or more urls to download and parse. 60 * **output_prefix** *[str]*: Directory to write downloaded files. 61 * **root_node** *[str]*: Define an alternative root node. 62 * **root_parent** *[str]*: Define the root parent node identifier. 63 * **root_name** *[str]*: Define an alternative root name. Set to None to use original name. 64 * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name. 65 * **undefined_node** *[str]*: Define a default return value for undefined nodes. 66 * **undefined_name** *[str]*: Define a default return value for undefined names. 67 * **undefined_rank** *[str]*: Define a default return value for undefined ranks. 68 * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use). 69 * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use). 
70 * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use). 71 * **extended_names** *[bool]*: Parse extended names if available. 72 * **empty** *[bool]*: Create an empty instance. 73 74 Example: 75 76 tax_ncbi = NcbiTx() 77 tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"]) 78 tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"]) 79 tax_ott = OttTx(root_node="844192") 80 tax_gg = GreengenesTx(output_prefix="save/to/prefix_") 81 """ 82 if files: 83 if isinstance(files, str): 84 files = [files] 85 for file in files: 86 check_file(file) 87 88 if output_prefix: 89 check_dir(output_prefix) 90 91 # Main structures 92 self._nodes = {} 93 self._ranks = {} 94 self._names = {} 95 96 # Aux. structures 97 self._lineages = {} 98 self._name_nodes = {} 99 self._node_children = {} 100 self._rank_nodes = {} 101 self._translated_nodes = {} 102 self._lca = None 103 104 # Properties 105 self.datetime = datetime.now() 106 self.version = None 107 self.undefined_node = undefined_node 108 self.undefined_name = undefined_name 109 self.undefined_rank = undefined_rank 110 111 # Set version 112 if files or urls: 113 self.version = version 114 else: 115 self.version = self._default_version if not version else version 116 if self.version not in self._supported_versions: 117 raise ValueError( 118 f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls." 
119 ) 120 121 # Store source of tax files (url or file) 122 self.sources = [] 123 124 if not empty: 125 # Open/Download/Write files 126 fhs = {} 127 if files: 128 fhs = open_files(files) 129 elif urls or self._default_urls.get(self.version): 130 fhs = download_files( 131 urls=urls if urls else self._default_urls[self.version], 132 output_prefix=output_prefix, 133 retry_attempts=3, 134 ) 135 136 if fhs: 137 # Parse taxonomy 138 self._nodes, self._ranks, self._names = self._parse( 139 fhs, extended_names=extended_names 140 ) 141 close_files(fhs) 142 # Save sources for stats (files or urls) 143 self.sources = list(fhs.keys()) 144 145 # Set root values 146 self._set_root_node( 147 root=root_node if root_node else self._default_root_node, 148 parent=root_parent, 149 name=root_name, 150 rank=root_rank, 151 ) 152 153 # build auxiliary structures 154 if build_node_children: 155 self._node_children = reverse_dict(self._nodes) 156 if build_name_nodes: 157 self._name_nodes = reverse_dict(self._names) 158 if build_rank_nodes: 159 self._rank_nodes = reverse_dict(self._ranks) 160 161 self.check_consistency() 162 163 def _exact_name(self, text: str, names: dict): 164 """ 165 Returns list of nodes of a given exact name (case sensitive). 166 """ 167 if text in names: 168 return names[text] 169 else: 170 return [] 171 172 def _parse(self, fhs: dict): 173 """ 174 main function to be overloaded 175 receives a dictionary with {"url/file": file handler} 176 return nodes, ranks and names dicts 177 """ 178 return {}, {}, {} 179 180 def _partial_name(self, text: str, names: dict): 181 """ 182 Searches names containing a certain text (case sensitive) and return their respective nodes. 
183 """ 184 matching_nodes = set() 185 for name in names: 186 if text in name: 187 matching_nodes.update(names[name]) 188 return list(matching_nodes) 189 190 def _recurse_leaves(self, node: str): 191 """ 192 Recursive function returning leaf nodes 193 """ 194 children = self.children(node) 195 if not children: 196 return [node] 197 leaves = [] 198 for child in children: 199 leaves.extend(self._recurse_leaves(child)) 200 return leaves 201 202 def _remove(self, node: str): 203 """ 204 Removes node from taxonomy, no checking, for internal use 205 """ 206 del self._nodes[node] 207 if node in self._names: 208 del self._names[node] 209 if node in self._ranks: 210 del self._ranks[node] 211 212 def _reset_aux_data(self): 213 """ 214 Reset aux. data structures 215 """ 216 self._lineages = {} 217 self._name_nodes = {} 218 self._node_children = {} 219 self._rank_nodes = {} 220 self._translated_nodes = {} 221 self._lca = None 222 223 def _set_root_node(self, root: str, parent: str, name: str, rank: str): 224 """ 225 Set root node of the tree. 
226 The files are parsed based on the self._default_root_node for each class 227 A user-defined root node can be: 228 1) internal: will filter the tree acodingly and delete the default root_node 229 2) external: will add node and link to the default 230 """ 231 232 # Set parent/root with defaults 233 self.root_parent = parent 234 self.root_node = self._default_root_node 235 self._nodes[self.root_node] = self.root_parent 236 237 # Default root node is the top by definition 238 if root != self._default_root_node: 239 if root in self._nodes: 240 # Not default but exists on tree, filter only descendants 241 self.filter(root, desc=True) 242 # Remove entry for _default_root_node 243 self._remove(self._default_root_node) 244 else: 245 # Not on tree, link default node with new root 246 self._nodes[self._default_root_node] = root 247 # Change root to user defined 248 self.root_node = root 249 # Set/Update new root node parent link 250 self._nodes[self.root_node] = self.root_parent 251 252 # User-defined rank/name. 253 # If provided, insert manually, 254 # If None, check if is in the tree (defined in the given tax) 255 # otherwise insert default "root" 256 if name: 257 self._names[self.root_node] = name 258 elif self.root_node not in self._names: 259 self._names[self.root_node] = "root" 260 # Set static name 261 self.root_name = self._names[self.root_node] 262 263 if rank: 264 self._ranks[self.root_node] = rank 265 elif self.root_node not in self._ranks: 266 self._ranks[self.root_node] = "root" 267 # Set static rank 268 self.root_rank = self._ranks[self.root_node] 269 270 def add(self, node: str, parent: str, name: str = None, rank: str = None): 271 """ 272 Adds node to taxonomy. 273 Deletes built lineages, translations and lca. 
274 """ 275 if parent not in self._nodes: 276 raise ValueError("Parent node [" + parent + "] not found.") 277 elif node in self._nodes: 278 raise ValueError("Node [" + node + "] already present.") 279 280 self._nodes[node] = parent 281 self._names[node] = name if name is not None else self.undefined_name 282 self._ranks[node] = rank if rank is not None else self.undefined_rank 283 self._reset_aux_data() 284 285 def build_lca(self): 286 """ 287 Builds LCA structure based on pylca. 288 Optional function, LCA is built on first .lca() call. 289 290 Returns: None 291 """ 292 self._lca = LCA(self._nodes) 293 294 def build_lineages(self, root_node: str = None, ranks: list = None): 295 """ 296 Stores lineages in memory for faster access. 297 It is valid for lineage(), rank_lineage() and name_lineage(). 298 If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used. 299 300 Returns: None 301 """ 302 self.clear_lineages() 303 for node in self._nodes: 304 self._lineages[node] = self.lineage( 305 node=node, root_node=root_node, ranks=ranks 306 ) 307 308 def build_translation( 309 self, tax, representatives: bool = False, file: str = None, url: str = None 310 ): 311 """ 312 Create a translation of current taxonomy to another 313 314 Parameters: 315 316 * **tax** [MultiTax]: A target taxonomy to be translated to. 317 * **representatives** *[bool]*: Use only GTDB representative genomes to translate nodes. 318 * **file** *[str]*: Local file to parse. 319 * **url** *[str]*: Url to download and parse. 
320 321 Example: 322 323 from multitax import GtdbTx, NcbiTx 324 gtdb_tax = GtdbTx() 325 ncbi_tax = NcbiTx() 326 327 # Automatically download translation files 328 gtdb_tax.build_translation(ncbi_tax) 329 gtdb_tax.translate("g__Escherichia") 330 ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024'] 331 332 # Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb 333 ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz") 334 ncbi_tax.translate("620") 335 {'g__Escherichia', 'g__Proteus', 'g__Serratia'} 336 337 # Translation based on GTDB representative genome only 338 gtdb_tax.build_translation(ncbi_tax, representatives=True) 339 gtdb_tax.translate("g__Escherichia") 340 ['561', '547'] 341 """ 342 if file: 343 check_file(file) 344 345 self._translated_nodes = self._build_translation( 346 tax, representatives, file, url 347 ) 348 349 def children(self, node: str): 350 """ 351 Returns list of direct children nodes of a given node. 352 """ 353 # Setup on first use 354 if not self._node_children: 355 self._node_children = reverse_dict(self._nodes) 356 if node in self._node_children: 357 return self._node_children[node] 358 else: 359 return [] 360 361 def check_consistency(self): 362 """ 363 Checks consistency of the tree 364 365 Returns: raise an Exception otherwise None 366 """ 367 if self.root_node not in self._nodes: 368 raise ValueError("Root node [" + self.root_node + "] not found.") 369 if self.root_parent in self._nodes: 370 raise ValueError( 371 "Root parent [" 372 + self.root_parent 373 + "] found but should not be on tree." 374 ) 375 if self.undefined_node in self._nodes: 376 raise ValueError( 377 "Undefined node [" 378 + self.undefined_node 379 + "] found but should not be on tree." 
380 ) 381 382 # Difference between values and keys should be only root_parent 383 lost_nodes = set(self._nodes.values()).difference(self._nodes) 384 if self.root_parent not in lost_nodes: 385 raise ValueError( 386 "Root parent [" + self.root_parent + "] not properly defined." 387 ) 388 # Remove root_parent from lost nodes to report only missing 389 lost_nodes.remove(self.root_parent) 390 if len(lost_nodes) > 0: 391 raise ValueError("Parent nodes missing: " + ",".join(lost_nodes)) 392 393 return None 394 395 def clear_lca(self): 396 """ 397 Clear built LCA. 398 399 Returns: None 400 """ 401 self._lca = None 402 403 def clear_lineages(self): 404 """ 405 Clear built lineages. 406 407 Returns: None 408 """ 409 self._lineages = {} 410 411 def closest_parent(self, node: str, ranks: str): 412 """ 413 Returns the closest parent node based on a defined list of ranks 414 """ 415 # Rank of node is already on the list 416 if self.rank(node) in ranks: 417 return node 418 else: 419 # check lineage from back to front until find a valid node 420 for n in self.lineage(node, ranks=ranks)[::-1]: 421 if n != self.undefined_node: 422 return n 423 # nothing found 424 return self.undefined_node 425 426 def filter(self, nodes: list, desc: bool = False): 427 """ 428 Filters taxonomy given a list of nodes. 429 By default keep all the ancestors of the given nodes. 430 If desc=True, keep all descendants instead. 431 Deletes built lineages, translations and lca. 
432 433 Example: 434 435 from multitax import GtdbTx 436 tax = GtdbTx() 437 438 tax.lineage('s__Enterovibrio marina') 439 # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina'] 440 # Keep only ancestors of 'g__Enterovibrio' 441 tax.filter('g__Enterovibrio') 442 443 # Reload taxonomy 444 tax = GtdbTx() 445 # Keep only descendants of 'g__Enterovibrio' 446 tax.filter('g__Enterovibrio', desc=True) 447 """ 448 if isinstance(nodes, str): 449 nodes = [nodes] 450 451 # Keep track of nodes to be filtered out 452 filtered_nodes = set(self._nodes) 453 # Always keep root 454 filtered_nodes.discard(self.root_node) 455 456 if desc: 457 # Keep descendants of the given nodes 458 for node in nodes: 459 # Check if node exists (skips root) 460 if node in filtered_nodes: 461 # For each leaf of the selected nodes 462 for leaf in self.leaves(node): 463 # Build lineage of each leaf up-to node itself 464 for n in self.lineage(leaf, root_node=node): 465 # Discard nodes from set to be kept 466 filtered_nodes.discard(n) 467 # Link node to root 468 self._nodes[node] = self.root_node 469 else: 470 # Keep ancestors of the given nodes (full lineage up-to root) 471 for node in nodes: 472 # ranks=[] in case build_lineages() was used with specific ranks 473 for n in self.lineage(node, ranks=[]): 474 # Discard nodes from set to be kept 475 filtered_nodes.discard(n) 476 477 # Delete filtered nodes 478 for node in filtered_nodes: 479 self._remove(node) 480 481 # Delete aux. data structures 482 self._reset_aux_data() 483 self.check_consistency() 484 485 @classmethod 486 def from_customtx(cls, ctx): 487 """ 488 Initialize a Tx sub-class based on a CustomTx instance. 
489 490 Example: 491 492 tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"]) 493 tax_ncbi = NcbiTx.from_customtx(tax_custom) 494 """ 495 nc = cls(empty=True) 496 nc.version = ctx.version 497 nc.sources = ctx.sources 498 nc._nodes = ctx._nodes 499 nc._names = ctx._names 500 nc._ranks = ctx._ranks 501 return nc 502 503 def latest(self, node: str): 504 """ 505 Returns latest/updated version of a given node. 506 If node is already the latests, returns itself. 507 Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv) 508 """ 509 if node in self._nodes: 510 return node 511 else: 512 return self.undefined_node 513 514 def leaves(self, node: str = None): 515 """ 516 Returns a list of leaf nodes of a given node. 517 """ 518 if node is None or node == self.root_node: 519 # Leaves are nodes not contained in _nodes.values() ("parents") 520 return list(set(self._nodes).difference(self._nodes.values())) 521 elif node in self._nodes: 522 return self._recurse_leaves(node) 523 else: 524 return [] 525 526 def lca(self, nodes: list = None): 527 """ 528 Returns the lowest common ancestor of two or more nodes. 529 530 Example: 531 532 from multitax import GtdbTx 533 tax = GtdbTx() 534 tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"]) 535 """ 536 for node in nodes: 537 if node not in self._nodes: 538 raise ValueError("Node [" + node + "] not found.") 539 540 # Setup on first use 541 if not self._lca: 542 self.build_lca() 543 544 return self._lca(*nodes) 545 546 def lineage(self, node: str, root_node: str = None, ranks: list = None): 547 """ 548 Returns a list with the lineage of a given node. 549 If ranks is provided, returns only nodes annotated with such ranks. 550 If root_node is provided, use it instead of default root of tree. 
551 """ 552 # If lineages were built with build_lineages() with matching params 553 if node in self._lineages and root_node is None and ranks is None: 554 return self._lineages[node] 555 else: 556 if not root_node: 557 root_node = self.root_node 558 559 n = node 560 if ranks: 561 # Fixed length lineage 562 lin = [self.undefined_node] * len(ranks) 563 # Loop until end of the tree (in case chosen root is not on lineage) 564 while n != self.undefined_node: 565 r = self.rank(n) 566 if r in ranks: 567 lin[ranks.index(r)] = n 568 # If node is root, break (after adding) 569 if n == root_node: 570 break 571 n = self.parent(n) 572 else: 573 # Full lineage 574 lin = [] 575 # Loop until end of the tree (in case chosen root is not on lineage) 576 while n != self.undefined_node: 577 lin.append(n) 578 # If node is root, break (after adding) 579 if n == root_node: 580 break 581 n = self.parent(n) 582 # Reverse order 583 lin = lin[::-1] 584 585 # last iteration node (n) != root_node: didn't find the root, invalid lineage 586 if n != root_node: 587 return [] 588 else: 589 return lin 590 591 def name(self, node: str): 592 """ 593 Returns name of a given node. 594 """ 595 if node in self._names: 596 return self._names[node] 597 else: 598 return self.undefined_name 599 600 def name_lineage(self, node: str, root_node: str = None, ranks: list = None): 601 """ 602 Returns a list with the name lineage of a given node. 603 """ 604 return list( 605 map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks)) 606 ) 607 608 def nodes_rank(self, rank: str): 609 """ 610 Returns list of nodes of a given rank. 611 """ 612 # Setup on first use 613 if not self._rank_nodes: 614 self._rank_nodes = reverse_dict(self._ranks) 615 if rank in self._rank_nodes: 616 return self._rank_nodes[rank] 617 else: 618 return [] 619 620 def parent(self, node: str): 621 """ 622 Returns the direct parent node of a given node. 
623 """ 624 if node in self._nodes: 625 return self._nodes[node] 626 else: 627 return self.undefined_node 628 629 def parent_rank(self, node: str, rank: str): 630 """ 631 Returns the parent node of a given rank in the specified rank. 632 """ 633 parent = self.lineage(node=node, ranks=[rank]) 634 return parent[0] if parent else self.undefined_node 635 636 def prune(self, nodes: list): 637 """ 638 Prunes branches of the tree under the given nodes. 639 Deletes built lineages, translations and lca. 640 """ 641 642 if isinstance(nodes, str): 643 nodes = [nodes] 644 645 del_nodes = set() 646 for node in nodes: 647 if node not in self._nodes: 648 raise ValueError("Node [" + node + "] not found.") 649 for leaf in self.leaves(node): 650 for n in self.lineage(leaf, root_node=node)[1:]: 651 del_nodes.add(n) 652 653 for n in del_nodes: 654 self._remove(n) 655 656 self._reset_aux_data() 657 658 def rank(self, node: str): 659 """ 660 Returns the rank of a given node. 661 """ 662 if node in self._ranks: 663 return self._ranks[node] 664 else: 665 return self.undefined_rank 666 667 def rank_lineage(self, node: str, root_node: str = None, ranks: list = None): 668 """ 669 Returns a list with the rank lineage of a given node. 670 """ 671 return list( 672 map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks)) 673 ) 674 675 def remove(self, node: str, check_consistency: bool = False): 676 """ 677 Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune. 678 Running check consistency after removing a node is recommended. 679 Deletes built lineages, translations and lca. 
680 """ 681 if node not in self._nodes: 682 raise ValueError("Node [" + node + "] not found.") 683 self._remove(node) 684 self._reset_aux_data() 685 if check_consistency: 686 self.check_consistency() 687 688 def search_name(self, text: str, rank: str = None, exact: bool = True): 689 """ 690 Search node by exact or partial name 691 692 Parameters: 693 * **text** *[str]*: Text to search. 694 * **rank** *[str]*: Filter results by rank. 695 * **exact** *[bool]*: Exact or partial name search (both case sensitive). 696 697 Returns: list of matching nodes 698 """ 699 # Setup on first use 700 if not self._name_nodes: 701 self._name_nodes = reverse_dict(self._names) 702 703 if exact: 704 ret = self._exact_name(text, self._name_nodes) 705 else: 706 ret = self._partial_name(text, self._name_nodes) 707 708 # Only return nodes of chosen rank 709 if rank: 710 return filter_function(ret, self.rank, rank) 711 else: 712 return ret 713 714 def stats(self): 715 """ 716 Returns a dict with general numbers of the taxonomic tree 717 718 Example: 719 720 from pprint import pprint 721 from multitax import GtdbTx 722 tax = GtdbTx() 723 724 pprint(tax.stats()) 725 {'leaves': 30238, 726 'names': 42739, 727 'nodes': 42739, 728 'ranked_leaves': Counter({'species': 30238}), 729 'ranked_nodes': Counter({'species': 30238, 730 'genus': 8778, 731 'family': 2323, 732 'order': 930, 733 'class': 337, 734 'phylum': 131, 735 'domain': 1, 736 'root': 1}), 737 'ranks': 42739} 738 """ 739 s = {} 740 s["nodes"] = len(self._nodes) 741 s["ranks"] = len(self._ranks) 742 s["names"] = len(self._names) 743 all_leaves = self.leaves(self.root_node) 744 s["leaves"] = len(all_leaves) 745 s["ranked_nodes"] = Counter(self._ranks.values()) 746 s["ranked_leaves"] = Counter(map(self.rank, all_leaves)) 747 return s 748 749 def translate(self, node: str, top_perc: float | None = None, counts: bool = False): 750 """ 751 Returns the translated node(s) from another taxonomy. 
One node may translate to none, one or several nodes. 752 `counts` additionally outputs the number of entries/genomes used to translate each node. 753 The translation have to first be generated with the `build_translation` function. 754 755 Parameters: 756 * **node** *[str]*: Node to translate. 757 * **top_perc** *[float]*: Keep translations summing up to `top_perc` of the nodes based on counts. 758 * **counts** *[bool]*: Output a sorted list of tuples with the translated node and counts. 759 760 Returns: List of translated nodes (or list of tuples with counts) 761 """ 762 if node in self._translated_nodes: 763 ret = Counter(self._translated_nodes[node]) 764 i = None 765 if top_perc: 766 total = ret.total() 767 sm = 0 768 for i, (_, cnt) in enumerate(ret.most_common(), 1): 769 sm += cnt 770 if (sm / total) >= top_perc: 771 break 772 return ret.most_common(i) if counts else [n[0] for n in ret.most_common(i)] 773 774 return [] 775 776 def write( 777 self, 778 output_file: str, 779 cols: list = ["node", "parent", "rank", "name"], 780 sep: str = "\t", 781 sep_multi: str = "|", 782 ranks: list = None, 783 gz: bool = False, 784 ): 785 """ 786 Writes loaded taxonomy to a file. 
787 788 Parameters: 789 * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage" 790 * **sep** *[str]*: Separator of fields 791 * **sep_multi** *[str]*: Separator of multi-valued fields 792 * **ranks** *[list]*: Ranks to report 793 * **gz** *[bool]*: Gzip output 794 795 Returns: None 796 """ 797 import gzip 798 799 if gz: 800 output_file = ( 801 output_file if output_file.endswith(".gz") else output_file + ".gz" 802 ) 803 check_no_file(output_file) 804 outf = gzip.open(output_file, "wt") 805 else: 806 check_no_file(output_file) 807 outf = open(output_file, "w") 808 809 write_field = { 810 "node": lambda node: node, 811 "latest": self.latest, 812 "parent": self.parent, 813 "rank": self.rank, 814 "name": self.name, 815 "leaves": lambda node: join_check(self.leaves(node), sep_multi), 816 "children": lambda node: join_check(self.children(node), sep_multi), 817 "lineage": lambda node: join_check( 818 self.lineage(node, ranks=ranks), sep_multi 819 ), 820 "rank_lineage": lambda node: join_check( 821 self.rank_lineage(node, ranks=ranks), sep_multi 822 ), 823 "name_lineage": lambda node: join_check( 824 self.name_lineage(node, ranks=ranks), sep_multi 825 ), 826 } 827 828 for c in cols: 829 if c not in write_field: 830 raise ValueError( 831 "Field [" + c + "] is not valid. Options: " + ",".join(write_field) 832 ) 833 834 if ranks: 835 for rank in ranks: 836 for node in self.nodes_rank(rank): 837 print( 838 *[write_field[c](node) for c in cols], 839 sep=sep, 840 end="\n", 841 file=outf, 842 ) 843 else: 844 for node in self._nodes: 845 print( 846 *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf 847 ) 848 849 outf.close()
def __init__(
    self,
    version: str = None,
    files: list = None,
    urls: list = None,
    output_prefix: str = None,
    root_node: str = None,
    root_parent: str = "0",
    root_name: str = None,
    root_rank: str = None,
    undefined_node: str = None,
    undefined_name: str = None,
    undefined_rank: str = None,
    build_name_nodes: bool = False,
    build_node_children: bool = False,
    build_rank_nodes: bool = False,
    extended_names: bool = False,
    empty: bool = False,
):
    """
    Main constructor of MultiTax and sub-classes

    Parameters:
    * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
    * **files** *[str, list]*: One or more local files to parse.
    * **urls** *[str, list]*: One or more urls to download and parse.
    * **output_prefix** *[str]*: Directory to write downloaded files.
    * **root_node** *[str]*: Define an alternative root node.
    * **root_parent** *[str]*: Define the root parent node identifier.
    * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
    * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original rank.
    * **undefined_node** *[str]*: Define a default return value for undefined nodes.
    * **undefined_name** *[str]*: Define a default return value for undefined names.
    * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
    * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
    * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
    * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
    * **extended_names** *[bool]*: Parse extended names if available.
    * **empty** *[bool]*: Create an empty instance.

    Raises:
    * **ValueError**: If neither files nor urls are given and the version is not in `_supported_versions`.
      Errors raised by the file checks, parsing or `check_consistency()` are propagated.

    Example:

        tax_ncbi = NcbiTx()
        tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
        tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
        tax_ott = OttTx(root_node="844192")
        tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
    """
    if files:
        # A single file can be given as a plain string
        if isinstance(files, str):
            files = [files]
        for file in files:
            check_file(file)

    if output_prefix:
        check_dir(output_prefix)

    # Main structures
    self._nodes = {}
    self._ranks = {}
    self._names = {}

    # Aux. structures
    self._lineages = {}
    self._name_nodes = {}
    self._node_children = {}
    self._rank_nodes = {}
    self._translated_nodes = {}
    self._lca = None

    # Properties
    self.datetime = datetime.now()
    self.version = None
    self.undefined_node = undefined_node
    self.undefined_name = undefined_name
    self.undefined_rank = undefined_rank

    # Set version
    if files or urls:
        # Custom sources: version is a free-form label (may be None)
        self.version = version
    else:
        self.version = self._default_version if not version else version
        if self.version not in self._supported_versions:
            raise ValueError(
                f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
            )

    # Store source of tax files (url or file)
    self.sources = []

    if not empty:
        # Open/Download/Write files
        fhs = {}
        if files:
            fhs = open_files(files)
        elif urls or self._default_urls.get(self.version):
            fhs = download_files(
                urls=urls if urls else self._default_urls[self.version],
                output_prefix=output_prefix,
                retry_attempts=3,
            )

        if fhs:
            try:
                # Parse taxonomy
                self._nodes, self._ranks, self._names = self._parse(
                    fhs, extended_names=extended_names
                )
            finally:
                # Always release the handles, even if parsing fails
                close_files(fhs)
            # Save sources for stats (files or urls)
            self.sources = list(fhs.keys())

    # Set root values
    self._set_root_node(
        root=root_node if root_node else self._default_root_node,
        parent=root_parent,
        name=root_name,
        rank=root_rank,
    )

    # build auxiliary structures
    if build_node_children:
        self._node_children = reverse_dict(self._nodes)
    if build_name_nodes:
        self._name_nodes = reverse_dict(self._names)
    if build_rank_nodes:
        self._rank_nodes = reverse_dict(self._ranks)

    self.check_consistency()
Main constructor of MultiTax and sub-classes
Parameters:
- version [str]: Version to download/parse or custom version name (with files/urls).
- files [str, list]: One or more local files to parse.
- urls [str, list]: One or more urls to download and parse.
- output_prefix [str]: Directory to write downloaded files.
- root_node [str]: Define an alternative root node.
- root_parent [str]: Define the root parent node identifier.
- root_name [str]: Define an alternative root name. Set to None to use original name.
- root_rank [str]: Define an alternative root rank. Set to None to use original rank.
- undefined_node [str]: Define a default return value for undefined nodes.
- undefined_name [str]: Define a default return value for undefined names.
- undefined_rank [str]: Define a default return value for undefined ranks.
- build_node_children [bool]: Build node,children dict (otherwise it will be created on first use).
- build_name_nodes [bool]: Build name,nodes dict (otherwise it will be created on first use).
- build_rank_nodes [bool]: Build rank,nodes dict (otherwise it will be created on first use).
- extended_names [bool]: Parse extended names if available.
- empty [bool]: Create an empty instance.
Example:
tax_ncbi = NcbiTx()
tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
tax_ott = OttTx(root_node="844192")
tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
def add(self, node: str, parent: str, name: str = None, rank: str = None):
    """
    Inserts a new node below an existing parent node.
    If name or rank is None, undefined_name / undefined_rank is stored instead.
    Deletes built lineages, translations and lca.

    Raises: ValueError if the parent is not on the tree or the node already is.
    """
    tree = self._nodes
    if parent not in tree:
        raise ValueError("Parent node [" + parent + "] not found.")
    if node in tree:
        raise ValueError("Node [" + node + "] already present.")

    # Resolve fallbacks for missing annotations
    if name is None:
        name = self.undefined_name
    if rank is None:
        rank = self.undefined_rank

    tree[node] = parent
    self._names[node] = name
    self._ranks[node] = rank
    self._reset_aux_data()
Adds node to taxonomy. Deletes built lineages, translations and lca.
def build_lca(self):
    """
    Creates the pylca LCA structure from the node,parent dict and stores it.
    Calling it is optional, since the structure is built on the first .lca() call.

    Returns: None
    """
    parents = self._nodes
    self._lca = LCA(parents)
Builds LCA structure based on pylca. Optional function, LCA is built on first .lca() call.
Returns: None
def build_lineages(self, root_node: str = None, ranks: list = None):
    """
    Pre-computes the lineage of every node and keeps it in memory for faster access.
    Stored lineages are used by lineage(), rank_lineage() and name_lineage()
    only when those are called without the keyword arguments (root_node, ranks).

    Returns: None
    """
    # Start from scratch so previously stored lineages are not re-used
    self.clear_lineages()
    for taxid in self._nodes:
        computed = self.lineage(node=taxid, root_node=root_node, ranks=ranks)
        self._lineages[taxid] = computed
Stores lineages in memory for faster access. It is valid for lineage(), rank_lineage() and name_lineage(). If keyword arguments (root_node, ranks) are passed to those functions, the stored lineages are not used.
Returns: None
def build_translation(
    self, tax, representatives: bool = False, file: str = None, url: str = None
):
    """
    Builds and stores the translation from the current taxonomy to another one.

    Parameters:

    * **tax** [MultiTax]: A target taxonomy to be translated to.
    * **representatives** *[bool]*: Use only GTDB representative genomes to translate nodes.
    * **file** *[str]*: Local file to parse.
    * **url** *[str]*: Url to download and parse.

    Example:

        from multitax import GtdbTx, NcbiTx
        gtdb_tax = GtdbTx()
        ncbi_tax = NcbiTx()

        # Automatically download translation files
        gtdb_tax.build_translation(ncbi_tax)
        gtdb_tax.translate("g__Escherichia")
        ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']

        # Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
        ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
        ncbi_tax.translate("620")
        {'g__Escherichia', 'g__Proteus', 'g__Serratia'}

        # Translation based on GTDB representative genome only
        gtdb_tax.build_translation(ncbi_tax, representatives=True)
        gtdb_tax.translate("g__Escherichia")
        ['561', '547']
    """
    # Validate the local file (if any) before building
    if file:
        check_file(file)

    translation = self._build_translation(tax, representatives, file, url)
    self._translated_nodes = translation
Create a translation of current taxonomy to another
Parameters:
- tax [MultiTax]: A target taxonomy to be translated to.
- representatives [bool]: Use only GTDB representative genomes to translate nodes.
- file [str]: Local file to parse.
- url [str]: Url to download and parse.
Example:
from multitax import GtdbTx, NcbiTx
gtdb_tax = GtdbTx()
ncbi_tax = NcbiTx()
# Automatically download translation files
gtdb_tax.build_translation(ncbi_tax)
gtdb_tax.translate("g__Escherichia")
['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']
# Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
ncbi_tax.translate("620")
{'g__Escherichia', 'g__Proteus', 'g__Serratia'}
# Translation based on GTDB representative genome only
gtdb_tax.build_translation(ncbi_tax, representatives=True)
gtdb_tax.translate("g__Escherichia")
['561', '547']
def children(self, node: str):
    """
    Returns the list of direct children of a given node
    (empty list if the node has no entry in the node,children dict).
    """
    # Lazily build the node,children dict on first use
    if not self._node_children:
        self._node_children = reverse_dict(self._nodes)
    return self._node_children.get(node, [])
Returns list of direct children nodes of a given node.
def check_consistency(self):
    """
    Validates the node,parent structure of the tree.

    Checks, in order: the root node is on the tree, the root parent and the
    undefined node are not, the root parent is referenced as a parent, and
    every other referenced parent exists as a node.

    Returns: None (raises ValueError on the first failed check)
    """
    tree = self._nodes
    root_parent = self.root_parent

    if self.root_node not in tree:
        raise ValueError("Root node [" + self.root_node + "] not found.")
    if root_parent in tree:
        message = "Root parent [" + root_parent + "] found but should not be on tree."
        raise ValueError(message)
    if self.undefined_node in tree:
        message = (
            "Undefined node ["
            + self.undefined_node
            + "] found but should not be on tree."
        )
        raise ValueError(message)

    # Parents that are not nodes themselves: only root_parent is expected here
    orphans = set(tree.values()).difference(tree)
    if root_parent not in orphans:
        raise ValueError("Root parent [" + root_parent + "] not properly defined.")
    # Whatever is left besides root_parent is a missing parent node
    orphans.remove(root_parent)
    if orphans:
        raise ValueError("Parent nodes missing: " + ",".join(orphans))

    return None
Checks consistency of the tree
Returns: None if the tree is consistent; otherwise raises an Exception.
def clear_lca(self):
    """
    Discards the stored LCA structure (if any).

    Returns: None
    """
    # lca() treats a falsy value as "not built yet"
    self._lca = None
Clear built LCA.
Returns: None
def clear_lineages(self):
    """
    Discards all lineages stored by build_lineages().

    Returns: None
    """
    # Rebind to a fresh dict instead of emptying the existing one
    self._lineages = dict()
Clear built lineages.
Returns: None
def closest_parent(self, node: str, ranks: list):
    """
    Returns the closest parent node based on a defined list of ranks.

    Parameters:
    * **node** *[str]*: Node to start from.
    * **ranks** *[str, list]*: One or more ranks; a single rank can be given as a plain string.

    Returns: the node itself if its rank is in ranks, otherwise the last
    defined node of its lineage restricted to ranks, otherwise undefined_node.
    """
    # A bare string would otherwise be matched by substring ("genus" in "subgenus")
    # and handled character by character in lineage()
    if isinstance(ranks, str):
        ranks = [ranks]

    # Rank of node is already on the list
    if self.rank(node) in ranks:
        return node

    # Check lineage from back to front until a valid node is found
    for candidate in self.lineage(node, ranks=ranks)[::-1]:
        if candidate != self.undefined_node:
            return candidate

    # Nothing found
    return self.undefined_node
Returns the closest parent node based on a defined list of ranks
def filter(self, nodes: list, desc: bool = False):
    """
    Filters taxonomy given a list of nodes (a single node can be given as a string).
    By default keep all the ancestors of the given nodes.
    If desc=True, keep all descendants instead; each given node is then linked
    directly to the root node.
    The root node is always kept. The tree is modified in place.
    Deletes built lineages, translations and lca.

    Returns: None (check_consistency() is run at the end and may raise)

    Example:

        from multitax import GtdbTx
        tax = GtdbTx()

        tax.lineage('s__Enterovibrio marina')
        # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
        # Keep only ancestors of 'g__Enterovibrio'
        tax.filter('g__Enterovibrio')

        # Reload taxonomy
        tax = GtdbTx()
        # Keep only descendants of 'g__Enterovibrio'
        tax.filter('g__Enterovibrio', desc=True)
    """
    if isinstance(nodes, str):
        nodes = [nodes]

    # Keep track of nodes to be filtered out: start with every node
    # and discard the ones that must be kept
    filtered_nodes = set(self._nodes)
    # Always keep root
    filtered_nodes.discard(self.root_node)

    if desc:
        # Keep descendants of the given nodes
        for node in nodes:
            # Check if node exists (skips root, which was discarded above)
            if node in filtered_nodes:
                # For each leaf of the selected nodes
                for leaf in self.leaves(node):
                    # Build lineage of each leaf up-to node itself
                    for n in self.lineage(leaf, root_node=node):
                        # Discard nodes from set to be kept
                        filtered_nodes.discard(n)
                # Link node to root (its former ancestors are removed below)
                self._nodes[node] = self.root_node
    else:
        # Keep ancestors of the given nodes (full lineage up-to root)
        for node in nodes:
            # ranks=[] in case build_lineages() was used with specific ranks:
            # it bypasses the stored lineages and still yields the full lineage
            for n in self.lineage(node, ranks=[]):
                # Discard nodes from set to be kept
                filtered_nodes.discard(n)

    # Delete filtered nodes
    for node in filtered_nodes:
        self._remove(node)

    # Delete aux. data structures
    self._reset_aux_data()
    self.check_consistency()
Filters taxonomy given a list of nodes. By default keep all the ancestors of the given nodes. If desc=True, keep all descendants instead. Deletes built lineages, translations and lca.
Example:
from multitax import GtdbTx
tax = GtdbTx()
tax.lineage('s__Enterovibrio marina')
# ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
# Keep only ancestors of 'g__Enterovibrio'
tax.filter('g__Enterovibrio')
# Reload taxonomy
tax = GtdbTx()
# Keep only descendants of 'g__Enterovibrio'
tax.filter('g__Enterovibrio', desc=True)
@classmethod
def from_customtx(cls, ctx):
    """
    Builds an instance of this Tx sub-class out of a CustomTx instance.
    The version, sources and the node/name/rank dictionaries are taken from
    the given instance (assigned by reference, not copied).

    Parameters:
    * **ctx**: Instance to take the data from.

    Returns: new instance of the sub-class

    Example:

        tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
        tax_ncbi = NcbiTx.from_customtx(tax_custom)
    """
    converted = cls(empty=True)
    # Metadata first, then the three core dictionaries
    for attr in ("version", "sources", "_nodes", "_names", "_ranks"):
        setattr(converted, attr, getattr(ctx, attr))
    return converted
Initialize a Tx sub-class based on a CustomTx instance.
Example:
tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
tax_ncbi = NcbiTx.from_customtx(tax_custom)
def latest(self, node: str):
    """
    Returns the latest/updated version of a given node.
    Here a node present in the taxonomy is returned as is; any other node
    yields the undefined node.
    Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv).
    """
    return node if node in self._nodes else self.undefined_node
Returns the latest/updated version of a given node. If the node is already the latest, returns itself. Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv).
def leaves(self, node: str = None):
    """
    Returns a list of leaf nodes under a given node.
    Without a node (or with the root node) the leaves of the whole tree are returned.
    Returns an empty list if the node is not in the taxonomy.
    """
    if node is not None and node != self.root_node:
        if node not in self._nodes:
            return []
        return self._recurse_leaves(node)

    # Whole tree: a leaf is any node that never shows up as a parent
    all_nodes = set(self._nodes)
    return list(all_nodes.difference(self._nodes.values()))
Returns a list of leaf nodes of a given node.
def lca(self, nodes: list = None):
    """
    Returns the lowest common ancestor of two or more nodes.
    The lca structure is built on first use.

    Parameters:
    * **nodes** *[list]*: Nodes to find the common ancestor of.

    Raises: ValueError if any of the nodes is not in the taxonomy.

    Example:

        from multitax import GtdbTx
        tax = GtdbTx()
        tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
    """
    for candidate in nodes:
        if candidate in self._nodes:
            continue
        raise ValueError("Node [" + candidate + "] not found.")

    # Setup on first use
    if not self._lca:
        self.build_lca()

    return self._lca(*nodes)
Returns the lowest common ancestor of two or more nodes.
Example:
from multitax import GtdbTx
tax = GtdbTx()
tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
def lineage(self, node: str, root_node: str = None, ranks: list = None):
    """
    Returns a list with the lineage of a given node, ordered from root to node.
    If ranks is provided, returns a fixed-length list with one entry per rank
    (undefined node where the lineage has no node of that rank).
    If root_node is provided, use it instead of the default root of the tree.
    Returns an empty list if the root is not found on the path of the node.

    Parameters:
    * **node** *[str]*: Node to get the lineage of.
    * **root_node** *[str]*: Alternative root to stop the lineage at.
    * **ranks** *[list]*: Ranks to report.
    """
    # Lineages from build_lineages() are only reused when no custom root/ranks are requested
    prebuilt = node in self._lineages and root_node is None and ranks is None
    if prebuilt:
        return self._lineages[node]

    stop_at = root_node if root_node else self.root_node
    current = node

    if ranks:
        # One slot per requested rank, filled while climbing the tree
        path = [self.undefined_node] * len(ranks)
        # Climb until the tree ends (the chosen root may not be on the way up)
        while current != self.undefined_node:
            current_rank = self.rank(current)
            if current_rank in ranks:
                path[ranks.index(current_rank)] = current
            # The root is included before stopping
            if current == stop_at:
                break
            current = self.parent(current)
    else:
        # Collect node-to-root, then flip into root-to-node order
        climbed = []
        while current != self.undefined_node:
            climbed.append(current)
            if current == stop_at:
                break
            current = self.parent(current)
        path = climbed[::-1]

    # Ending anywhere but on the root means the root was never reached
    return path if current == stop_at else []
Returns a list with the lineage of a given node. If ranks is provided, returns only nodes annotated with such ranks. If root_node is provided, use it instead of default root of tree.
def name(self, node: str):
    """
    Returns the name of a given node, or the undefined name if the node has no name.
    """
    return self._names.get(node, self.undefined_name)
Returns name of a given node.
def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
    """
    Returns a list with the names along the lineage of a given node.
    Accepts the same root_node and ranks options as lineage().
    """
    path = self.lineage(node=node, root_node=root_node, ranks=ranks)
    return [self.name(entry) for entry in path]
Returns a list with the name lineage of a given node.
def nodes_rank(self, rank: str):
    """
    Returns the list of nodes of a given rank.
    Returns an empty list if no node has the given rank.
    """
    # Setup on first use: invert node->rank into rank->nodes
    if not self._rank_nodes:
        self._rank_nodes = reverse_dict(self._ranks)
    return self._rank_nodes[rank] if rank in self._rank_nodes else []
Returns list of nodes of a given rank.
def parent(self, node: str):
    """
    Returns the direct parent of a given node, or the undefined node if the
    node is not in the taxonomy.
    """
    return self._nodes.get(node, self.undefined_node)
Returns the direct parent node of a given node.
def parent_rank(self, node: str, rank: str):
    """
    Returns the parent node of a given node in the specified rank.
    Returns the undefined node if the lineage of the node cannot be resolved.
    """
    hits = self.lineage(node=node, ranks=[rank])
    if not hits:
        return self.undefined_node
    return hits[0]
Returns the parent node of a given node in the specified rank.
def prune(self, nodes: list):
    """
    Prunes the branches of the tree under the given nodes.
    The given nodes themselves are kept, everything below them is removed.
    Deletes built lineages, translations and lca.

    Parameters:
    * **nodes** *[str, list]*: One or more nodes to prune.

    Raises: ValueError if any of the nodes is not in the taxonomy.
    """
    targets = [nodes] if isinstance(nodes, str) else nodes

    doomed = set()
    for target in targets:
        if target not in self._nodes:
            raise ValueError("Node [" + target + "] not found.")
        for leaf in self.leaves(target):
            # Lineage starts at the target itself: skip it, collect the rest
            doomed.update(self.lineage(leaf, root_node=target)[1:])

    for obsolete in doomed:
        self._remove(obsolete)

    self._reset_aux_data()
Prunes branches of the tree under the given nodes. Deletes built lineages, translations and lca.
def rank(self, node: str):
    """
    Returns the rank of a given node, or the undefined rank if the node has no rank.
    """
    return self._ranks.get(node, self.undefined_rank)
Returns the rank of a given node.
def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
    """
    Returns a list with the ranks along the lineage of a given node.
    Accepts the same root_node and ranks options as lineage().
    """
    path = self.lineage(node=node, root_node=root_node, ranks=ranks)
    return [self.rank(entry) for entry in path]
Returns a list with the rank lineage of a given node.
def remove(self, node: str, check_consistency: bool = False):
    """
    Removes a single node from the taxonomy.
    Can break the tree if a parent node is removed. To remove a whole branch, use prune.
    Running the consistency check after removing a node is recommended.
    Deletes built lineages, translations and lca.

    Parameters:
    * **node** *[str]*: Node to remove.
    * **check_consistency** *[bool]*: Check the consistency of the tree after removal.

    Raises: ValueError if the node is not in the taxonomy.
    """
    known = node in self._nodes
    if not known:
        raise ValueError("Node [" + node + "] not found.")

    self._remove(node)
    self._reset_aux_data()

    if check_consistency:
        self.check_consistency()
Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune. Running check consistency after removing a node is recommended. Deletes built lineages, translations and lca.
def search_name(self, text: str, rank: str = None, exact: bool = True):
    """
    Searches nodes by exact or partial name.
    The name index is built on first use.

    Parameters:
    * **text** *[str]*: Text to search.
    * **rank** *[str]*: Filter results by rank.
    * **exact** *[bool]*: Exact or partial name search (both case sensitive).

    Returns: list of matching nodes
    """
    # Setup on first use: invert node->name into name->nodes
    if not self._name_nodes:
        self._name_nodes = reverse_dict(self._names)

    lookup = self._exact_name if exact else self._partial_name
    matches = lookup(text, self._name_nodes)

    # Without a rank every match is returned, otherwise only nodes of the chosen rank
    return filter_function(matches, self.rank, rank) if rank else matches
Search node by exact or partial name
Parameters:
- text [str]: Text to search.
- rank [str]: Filter results by rank.
- exact [bool]: Exact or partial name search (both case sensitive).
Returns: list of matching nodes
def stats(self):
    """
    Returns a dict with general numbers of the taxonomic tree:
    total nodes, ranks, names and leaves, plus counts of nodes and leaves per rank.

    Example:

        from pprint import pprint
        from multitax import GtdbTx
        tax = GtdbTx()

        pprint(tax.stats())
        {'leaves': 30238,
         'names': 42739,
         'nodes': 42739,
         'ranked_leaves': Counter({'species': 30238}),
         'ranked_nodes': Counter({'species': 30238,
                                  'genus': 8778,
                                  'family': 2323,
                                  'order': 930,
                                  'class': 337,
                                  'phylum': 131,
                                  'domain': 1,
                                  'root': 1}),
         'ranks': 42739}
    """
    # Leaves of the whole tree, reused for both leaf statistics
    tree_leaves = self.leaves(self.root_node)
    return {
        "nodes": len(self._nodes),
        "ranks": len(self._ranks),
        "names": len(self._names),
        "leaves": len(tree_leaves),
        "ranked_nodes": Counter(self._ranks.values()),
        "ranked_leaves": Counter(map(self.rank, tree_leaves)),
    }
Returns a dict with general numbers of the taxonomic tree
Example:
from pprint import pprint
from multitax import GtdbTx
tax = GtdbTx()
pprint(tax.stats())
{'leaves': 30238,
'names': 42739,
'nodes': 42739,
'ranked_leaves': Counter({'species': 30238}),
'ranked_nodes': Counter({'species': 30238,
'genus': 8778,
'family': 2323,
'order': 930,
'class': 337,
'phylum': 131,
'domain': 1,
'root': 1}),
'ranks': 42739}
749 def translate(self, node: str, top_perc: float | None = None, counts: bool = False): 750 """ 751 Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes. 752 `counts` additionally outputs the number of entries/genomes used to translate each node. 753 The translation have to first be generated with the `build_translation` function. 754 755 Parameters: 756 * **node** *[str]*: Node to translate. 757 * **top_perc** *[float]*: Keep translations summing up to `top_perc` of the nodes based on counts. 758 * **counts** *[bool]*: Output a sorted list of tuples with the translated node and counts. 759 760 Returns: List of translated nodes (or list of tuples with counts) 761 """ 762 if node in self._translated_nodes: 763 ret = Counter(self._translated_nodes[node]) 764 i = None 765 if top_perc: 766 total = ret.total() 767 sm = 0 768 for i, (_, cnt) in enumerate(ret.most_common(), 1): 769 sm += cnt 770 if (sm / total) >= top_perc: 771 break 772 return ret.most_common(i) if counts else [n[0] for n in ret.most_common(i)] 773 774 return []
Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes.
counts additionally outputs the number of entries/genomes used to translate each node.
The translation has to be generated first with the build_translation function.
Parameters:
- node [str]: Node to translate.
- top_perc [float]: Keep translations summing up to top_perc of the nodes based on counts.
- counts [bool]: Output a sorted list of tuples with the translated node and counts.
Returns: List of translated nodes (or list of tuples with counts)
def write(
    self,
    output_file: str,
    cols: list = ["node", "parent", "rank", "name"],
    sep: str = "\t",
    sep_multi: str = "|",
    ranks: list = None,
    gz: bool = False,
):
    """
    Writes loaded taxonomy to a file, one node per line.
    If ranks is provided, only nodes of those ranks are written (grouped by rank, in the given order)
    and the lineage fields are restricted to those ranks.

    Parameters:
    * **output_file** *[str]*: Path of the file to write (".gz" is appended with gz=True if missing).
    * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
    * **sep** *[str]*: Separator of fields
    * **sep_multi** *[str]*: Separator of multi-valued fields
    * **ranks** *[list]*: Ranks to report
    * **gz** *[bool]*: Gzip output

    Raises: ValueError if any entry of cols is not a valid field.

    Returns: None
    """
    import gzip

    # One formatter per supported column; multi-valued fields are joined with sep_multi
    write_field = {
        "node": lambda node: node,
        "latest": self.latest,
        "parent": self.parent,
        "rank": self.rank,
        "name": self.name,
        "leaves": lambda node: join_check(self.leaves(node), sep_multi),
        "children": lambda node: join_check(self.children(node), sep_multi),
        "lineage": lambda node: join_check(
            self.lineage(node, ranks=ranks), sep_multi
        ),
        "rank_lineage": lambda node: join_check(
            self.rank_lineage(node, ranks=ranks), sep_multi
        ),
        "name_lineage": lambda node: join_check(
            self.name_lineage(node, ranks=ranks), sep_multi
        ),
    }

    # Validate the columns before touching the filesystem, so an invalid
    # field does not leave an empty output file (or an open handle) behind
    for c in cols:
        if c not in write_field:
            raise ValueError(
                "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
            )

    if gz and not output_file.endswith(".gz"):
        output_file = output_file + ".gz"
    check_no_file(output_file)
    handle = gzip.open(output_file, "wt") if gz else open(output_file, "w")

    if ranks:
        # Nodes grouped by rank, following the order of the requested ranks
        selected = (node for rank in ranks for node in self.nodes_rank(rank))
    else:
        selected = self._nodes

    # The context manager closes the file even if a formatter raises mid-way
    with handle as outf:
        for node in selected:
            print(
                *[write_field[c](node) for c in cols],
                sep=sep,
                end="\n",
                file=outf,
            )
Writes loaded taxonomy to a file.
Parameters:
- cols [list]: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
- sep [str]: Separator of fields
- sep_multi [str]: Separator of multi-valued fields
- ranks [list]: Ranks to report
- gz [bool]: Gzip output
Returns: None