multitax.multitax
from multitax.utils import (
    join_check,
    check_no_file,
    filter_function,
    reverse_dict,
    check_file,
    close_files,
    download_files,
    open_files,
    check_dir,
)
from collections import Counter
from datetime import datetime
from pylca.pylca import LCA


class MultiTax(object):
    """
    Base class holding a taxonomy as three dicts keyed by node:
    node -> parent (_nodes), node -> rank (_ranks) and node -> name (_names).

    Sub-classes are expected to overload _parse() to fill those dicts from
    the opened files and to define the class-level defaults below.
    """

    # Class-level defaults, meant to be overridden by sub-classes
    _default_version = "current"
    _supported_versions = ["current"]
    _default_urls = {}
    _default_root_node = "1"

    def __init__(
        self,
        version: str = None,
        files: list = None,
        urls: list = None,
        output_prefix: str = None,
        root_node: str = None,
        root_parent: str = "0",
        root_name: str = None,
        root_rank: str = None,
        undefined_node: str = None,
        undefined_name: str = None,
        undefined_rank: str = None,
        build_name_nodes: bool = False,
        build_node_children: bool = False,
        build_rank_nodes: bool = False,
        extended_names: bool = False,
        empty: bool = False,
    ):
        """
        Main constructor of MultiTax and sub-classes

        Parameters:
        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
        * **files** *[str, list]*: One or more local files to parse.
        * **urls** *[str, list]*: One or more urls to download and parse.
        * **output_prefix** *[str]*: Directory to write downloaded files.
        * **root_node** *[str]*: Define an alternative root node.
        * **root_parent** *[str]*: Define the root parent node identifier.
        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original rank.
        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
        * **undefined_name** *[str]*: Define a default return value for undefined names.
        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
        * **extended_names** *[bool]*: Parse extended names if available.
        * **empty** *[bool]*: Create an empty instance.

        Raises: ValueError if no files/urls are given and the version is not supported,
        or if the resulting tree is not consistent (see check_consistency()).

        Example:

            tax_ncbi = NcbiTx()
            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
            tax_ott = OttTx(root_node="844192")
            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
        """
        if files:
            if isinstance(files, str):
                files = [files]
            for file in files:
                check_file(file)

        # urls is documented as [str, list]: normalize a single url the same
        # way as a single file, so it is never iterated character by character
        if isinstance(urls, str):
            urls = [urls]

        if output_prefix:
            check_dir(output_prefix)

        # Main structures
        self._nodes = {}
        self._ranks = {}
        self._names = {}

        # Aux. structures
        self._lineages = {}
        self._name_nodes = {}
        self._node_children = {}
        self._rank_nodes = {}
        self._translated_nodes = {}
        self._lca = None

        # Properties
        self.datetime = datetime.now()
        self.version = None
        self.undefined_node = undefined_node
        self.undefined_name = undefined_name
        self.undefined_rank = undefined_rank

        # Set version
        if files or urls:
            # Custom sources: version is a free label, not validated
            self.version = version
        else:
            self.version = self._default_version if not version else version
            if self.version not in self._supported_versions:
                raise ValueError(
                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
                )

        # Store source of tax files (url or file)
        self.sources = []

        if not empty:
            # Open/Download/Write files
            fhs = {}
            if files:
                fhs = open_files(files)
            elif urls or self._default_urls.get(self.version):
                fhs = download_files(
                    urls=urls if urls else self._default_urls[self.version],
                    output_prefix=output_prefix,
                    retry_attempts=3,
                )

            if fhs:
                # Parse taxonomy
                self._nodes, self._ranks, self._names = self._parse(
                    fhs, extended_names=extended_names
                )
                close_files(fhs)
                # Save sources for stats (files or urls)
                self.sources = list(fhs.keys())

        # Set root values
        self._set_root_node(
            root=root_node if root_node else self._default_root_node,
            parent=root_parent,
            name=root_name,
            rank=root_rank,
        )

        # build auxiliary structures
        if build_node_children:
            self._node_children = reverse_dict(self._nodes)
        if build_name_nodes:
            self._name_nodes = reverse_dict(self._names)
        if build_rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)

        self.check_consistency()

    def _exact_name(self, text: str, names: dict):
        """
        Returns list of nodes of a given exact name (case sensitive).
        """
        return names.get(text, [])

    def _parse(self, fhs: dict, extended_names: bool = False):
        """
        main function to be overloaded
        receives a dictionary with {"url/file": file handler}
        return nodes, ranks and names dicts

        extended_names is accepted (and ignored here) because __init__ always
        calls _parse(fhs, extended_names=...).
        """
        return {}, {}, {}

    def _partial_name(self, text: str, names: dict):
        """
        Searches names containing a certain text (case sensitive) and return their respective nodes.
        """
        matching_nodes = set()
        for name in names:
            if text in name:
                matching_nodes.update(names[name])
        return list(matching_nodes)

    def _recurse_leaves(self, node: str):
        """
        Recursive function returning leaf nodes
        """
        children = self.children(node)
        if not children:
            # No children: the node itself is a leaf
            return [node]
        leaves = []
        for child in children:
            leaves.extend(self._recurse_leaves(child))
        return leaves

    def _remove(self, node: str):
        """
        Removes node from taxonomy, no checking, for internal use
        (raises KeyError if node is not in the tree)
        """
        del self._nodes[node]
        if node in self._names:
            del self._names[node]
        if node in self._ranks:
            del self._ranks[node]

    def _reset_aux_data(self):
        """
        Reset aux. data structures
        """
        self._lineages = {}
        self._name_nodes = {}
        self._node_children = {}
        self._rank_nodes = {}
        self._translated_nodes = {}
        self._lca = None

    def _set_root_node(self, root: str, parent: str, name: str, rank: str):
        """
        Set root node of the tree.
        The files are parsed based on the self._default_root_node for each class
        A user-defined root node can be:
        1) internal: will filter the tree accordingly and delete the default root_node
        2) external: will add node and link to the default
        """

        # Set parent/root with defaults
        self.root_parent = parent
        self.root_node = self._default_root_node
        self._nodes[self.root_node] = self.root_parent

        # Default root node is the top by definition
        if root != self._default_root_node:
            if root in self._nodes:
                # Not default but exists on tree, filter only descendants
                self.filter(root, desc=True)
                # Remove entry for _default_root_node
                self._remove(self._default_root_node)
            else:
                # Not on tree, link default node with new root
                self._nodes[self._default_root_node] = root
            # Change root to user defined
            self.root_node = root
            # Set/Update new root node parent link
            self._nodes[self.root_node] = self.root_parent

        # User-defined rank/name.
        # If provided, insert manually,
        # If None, check if is in the tree (defined in the given tax)
        # otherwise insert default "root"
        if name:
            self._names[self.root_node] = name
        elif self.root_node not in self._names:
            self._names[self.root_node] = "root"
        # Set static name
        self.root_name = self._names[self.root_node]

        if rank:
            self._ranks[self.root_node] = rank
        elif self.root_node not in self._ranks:
            self._ranks[self.root_node] = "root"
        # Set static rank
        self.root_rank = self._ranks[self.root_node]

    def add(self, node: str, parent: str, name: str = None, rank: str = None):
        """
        Adds node to taxonomy.
        Deletes built lineages, translations and lca.

        Raises: ValueError if parent is not in the tree or node already is.
        """
        if parent not in self._nodes:
            raise ValueError(f"Parent node [{parent}] not found.")
        elif node in self._nodes:
            raise ValueError(f"Node [{node}] already present.")

        self._nodes[node] = parent
        self._names[node] = name if name is not None else self.undefined_name
        self._ranks[node] = rank if rank is not None else self.undefined_rank
        self._reset_aux_data()

    def build_lca(self):
        """
        Builds LCA structure based on pylca.
        Optional function, LCA is built on first .lca() call.

        Returns: None
        """
        self._lca = LCA(self._nodes)

    def build_lineages(self, root_node: str = None, ranks: list = None):
        """
        Stores lineages in memory for faster access.
        It is valid for lineage(), rank_lineage() and name_lineage().
        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.

        Returns: None
        """
        self.clear_lineages()
        for node in self._nodes:
            self._lineages[node] = self.lineage(
                node=node, root_node=root_node, ranks=ranks
            )

    def build_translation(self, tax, file: str = None, url: str = None):
        """
        Create a translation of current taxonomy to another

        Parameters:

        * **tax** [MultiTax]: A target taxonomy to be translated to.
        * **file** *[str]*: Local file to parse.
        * **url** *[str]*: Url to download and parse.

        Example:

            from multitax import GtdbTx, NcbiTx
            gtdb_tax = GtdbTx()
            ncbi_tax = NcbiTx()

            # Automatically download translation files
            gtdb_tax.build_translation(ncbi_tax)
            gtdb_tax.translate("g__Escherichia")
            {'1301', '547', '561', '570', '590', '620'}

            # Using local file
            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
            ncbi_tax.translate("620")
            {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
        """
        if file:
            check_file(file)

        # NOTE(review): _build_translation is not defined in this class;
        # presumably provided by sub-classes - confirm.
        self._translated_nodes = self._build_translation(tax, file, url)

    def children(self, node: str):
        """
        Returns list of direct children nodes of a given node.
        """
        # Setup on first use
        if not self._node_children:
            self._node_children = reverse_dict(self._nodes)
        return self._node_children.get(node, [])

    def check_consistency(self):
        """
        Checks consistency of the tree

        Returns: raise an Exception otherwise None
        """
        # Messages are built with f-strings so a non-str identifier
        # (e.g. undefined_node=None) is reported as ValueError, not TypeError
        if self.root_node not in self._nodes:
            raise ValueError(f"Root node [{self.root_node}] not found.")
        if self.root_parent in self._nodes:
            raise ValueError(
                f"Root parent [{self.root_parent}] found but should not be on tree."
            )
        if self.undefined_node in self._nodes:
            raise ValueError(
                f"Undefined node [{self.undefined_node}] found but should not be on tree."
            )

        # Difference between values and keys should be only root_parent
        lost_nodes = set(self._nodes.values()).difference(self._nodes)
        if self.root_parent not in lost_nodes:
            raise ValueError(f"Root parent [{self.root_parent}] not properly defined.")
        # Remove root_parent from lost nodes to report only missing
        lost_nodes.remove(self.root_parent)
        if len(lost_nodes) > 0:
            raise ValueError("Parent nodes missing: " + ",".join(map(str, lost_nodes)))

        return None

    def clear_lca(self):
        """
        Clear built LCA.

        Returns: None
        """
        self._lca = None

    def clear_lineages(self):
        """
        Clear built lineages.

        Returns: None
        """
        self._lineages = {}

    def closest_parent(self, node: str, ranks: list):
        """
        Returns the closest parent node based on a defined list of ranks
        (the node itself if its rank is already on the list).
        """
        # Rank of node is already on the list
        if self.rank(node) in ranks:
            return node
        else:
            # check lineage from back to front until find a valid node
            for n in self.lineage(node, ranks=ranks)[::-1]:
                if n != self.undefined_node:
                    return n
        # nothing found
        return self.undefined_node

    def filter(self, nodes: list, desc: bool = False):
        """
        Filters taxonomy given a list of nodes.
        By default keep all the ancestors of the given nodes.
        If desc=True, keep all descendants instead.
        Deletes built lineages, translations and lca.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()

            tax.lineage('s__Enterovibrio marina')
            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
            # Keep only ancestors of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio')

            # Reload taxonomy
            tax = GtdbTx()
            # Keep only descendants of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio', desc=True)
        """
        if isinstance(nodes, str):
            nodes = [nodes]

        # Keep track of nodes to be filtered out
        filtered_nodes = set(self._nodes)
        # Always keep root
        filtered_nodes.discard(self.root_node)

        if desc:
            # Keep descendants of the given nodes
            for node in nodes:
                # Check if node exists (skips root)
                if node in filtered_nodes:
                    # For each leaf of the selected nodes
                    for leaf in self.leaves(node):
                        # Build lineage of each leaf up-to node itself
                        for n in self.lineage(leaf, root_node=node):
                            # Discard nodes from set to be kept
                            filtered_nodes.discard(n)
                    # Link node to root
                    self._nodes[node] = self.root_node
        else:
            # Keep ancestors of the given nodes (full lineage up-to root)
            for node in nodes:
                # ranks=[] in case build_lineages() was used with specific ranks
                for n in self.lineage(node, ranks=[]):
                    # Discard nodes from set to be kept
                    filtered_nodes.discard(n)

        # Delete filtered nodes
        for node in filtered_nodes:
            self._remove(node)

        # Delete aux. data structures
        self._reset_aux_data()
        self.check_consistency()

    @classmethod
    def from_customtx(cls, ctx):
        """
        Initialize a Tx sub-class based on a CustomTx instance.
        The nodes, names and ranks dicts are shared with ctx (not copied).

        Example:

            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
            tax_ncbi = NcbiTx.from_customtx(tax_custom)
        """
        nc = cls(empty=True)
        nc.version = ctx.version
        nc.sources = ctx.sources
        nc._nodes = ctx._nodes
        nc._names = ctx._names
        nc._ranks = ctx._ranks
        return nc

    def latest(self, node: str):
        """
        Returns latest/updated version of a given node.
        If node is already the latest, returns itself.
        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
        """
        if node in self._nodes:
            return node
        else:
            return self.undefined_node

    def leaves(self, node: str = None):
        """
        Returns a list of leaf nodes of a given node.
        Without a node (or with the root node), returns all leaves of the tree.
        """
        if node is None or node == self.root_node:
            # Leaves are nodes not contained in _nodes.values() ("parents")
            return list(set(self._nodes).difference(self._nodes.values()))
        elif node in self._nodes:
            return self._recurse_leaves(node)
        else:
            return []

    def lca(self, nodes: list = None):
        """
        Returns the lowest common ancestor of two or more nodes.

        Raises: ValueError if any of the nodes is not in the tree.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()
            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
        """
        for node in nodes:
            if node not in self._nodes:
                raise ValueError(f"Node [{node}] not found.")

        # Setup on first use
        if not self._lca:
            self.build_lca()

        return self._lca(*nodes)

    def lineage(self, node: str, root_node: str = None, ranks: list = None):
        """
        Returns a list with the lineage of a given node.
        If ranks is provided, returns only nodes annotated with such ranks.
        If root_node is provided, use it instead of default root of tree.
        Returns an empty list if the root is not reached from the node.
        """
        # If lineages were built with build_lineages() with matching params
        if node in self._lineages and root_node is None and ranks is None:
            return self._lineages[node]

        if not root_node:
            root_node = self.root_node

        n = node
        if ranks:
            # Fixed length lineage
            lin = [self.undefined_node] * len(ranks)
            # Loop until end of the tree (in case chosen root is not on lineage)
            while n != self.undefined_node:
                r = self.rank(n)
                if r in ranks:
                    lin[ranks.index(r)] = n
                # If node is root, break (after adding)
                if n == root_node:
                    break
                n = self.parent(n)
        else:
            # Full lineage
            lin = []
            # Loop until end of the tree (in case chosen root is not on lineage)
            while n != self.undefined_node:
                lin.append(n)
                # If node is root, break (after adding)
                if n == root_node:
                    break
                n = self.parent(n)
            # Reverse order
            lin = lin[::-1]

        # last iteration node (n) != root_node: didn't find the root, invalid lineage
        if n != root_node:
            return []
        return lin

    def name(self, node: str):
        """
        Returns name of a given node.
        """
        return self._names.get(node, self.undefined_name)

    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
        """
        Returns a list with the name lineage of a given node.
        """
        return list(
            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
        )

    def nodes_rank(self, rank: str):
        """
        Returns list of nodes of a given rank.
        """
        # Setup on first use
        if not self._rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)
        return self._rank_nodes.get(rank, [])

    def parent(self, node: str):
        """
        Returns the direct parent node of a given node.
        """
        return self._nodes.get(node, self.undefined_node)

    def parent_rank(self, node: str, rank: str):
        """
        Returns the node of the specified rank in the lineage of a given node.
        """
        parent = self.lineage(node=node, ranks=[rank])
        return parent[0] if parent else self.undefined_node

    def prune(self, nodes: list):
        """
        Prunes branches of the tree under the given nodes.
        Deletes built lineages, translations and lca.

        Raises: ValueError if any of the nodes is not in the tree.
        """

        if isinstance(nodes, str):
            nodes = [nodes]

        del_nodes = set()
        for node in nodes:
            if node not in self._nodes:
                raise ValueError(f"Node [{node}] not found.")
            for leaf in self.leaves(node):
                # [1:] skips the given node itself, only what is under it is removed
                for n in self.lineage(leaf, root_node=node)[1:]:
                    del_nodes.add(n)

        for n in del_nodes:
            self._remove(n)

        self._reset_aux_data()

    def rank(self, node: str):
        """
        Returns the rank of a given node.
        """
        return self._ranks.get(node, self.undefined_rank)

    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
        """
        Returns a list with the rank lineage of a given node.
        """
        return list(
            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
        )

    def remove(self, node: str, check_consistency: bool = False):
        """
        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
        Running check consistency after removing a node is recommended.
        Deletes built lineages, translations and lca.

        Raises: ValueError if node is not in the tree.
        """
        if node not in self._nodes:
            raise ValueError(f"Node [{node}] not found.")
        self._remove(node)
        self._reset_aux_data()
        if check_consistency:
            self.check_consistency()

    def search_name(self, text: str, rank: str = None, exact: bool = True):
        """
        Search node by exact or partial name

        Parameters:
        * **text** *[str]*: Text to search.
        * **rank** *[str]*: Filter results by rank.
        * **exact** *[bool]*: Exact or partial name search (both case sensitive).

        Returns: list of matching nodes
        """
        # Setup on first use
        if not self._name_nodes:
            self._name_nodes = reverse_dict(self._names)

        if exact:
            ret = self._exact_name(text, self._name_nodes)
        else:
            ret = self._partial_name(text, self._name_nodes)

        # Only return nodes of chosen rank
        if rank:
            return filter_function(ret, self.rank, rank)
        else:
            return ret

    def stats(self):
        """
        Returns a dict with general numbers of the taxonomic tree

        Example:

            from pprint import pprint
            from multitax import GtdbTx
            tax = GtdbTx()

            pprint(tax.stats())
            {'leaves': 30238,
             'names': 42739,
             'nodes': 42739,
             'ranked_leaves': Counter({'species': 30238}),
             'ranked_nodes': Counter({'species': 30238,
                                      'genus': 8778,
                                      'family': 2323,
                                      'order': 930,
                                      'class': 337,
                                      'phylum': 131,
                                      'domain': 1,
                                      'root': 1}),
             'ranks': 42739}
        """
        s = {}
        s["nodes"] = len(self._nodes)
        s["ranks"] = len(self._ranks)
        s["names"] = len(self._names)
        all_leaves = self.leaves(self.root_node)
        s["leaves"] = len(all_leaves)
        s["ranked_nodes"] = Counter(self._ranks.values())
        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
        return s

    def translate(self, node: str):
        """
        Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.
        """
        return self._translated_nodes.get(node, [])

    def write(
        self,
        output_file: str,
        cols: list = None,
        sep: str = "\t",
        sep_multi: str = "|",
        ranks: list = None,
        gz: bool = False,
    ):
        """
        Writes loaded taxonomy to a file.

        Parameters:
        * **output_file** *[str]*: Output file (".gz" is appended with gz=True if not already present)
        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage". Default: ["node", "parent", "rank", "name"]
        * **sep** *[str]*: Separator of fields
        * **sep_multi** *[str]*: Separator of multi-valued fields
        * **ranks** *[list]*: Ranks to report
        * **gz** *[bool]*: Gzip output

        Raises: ValueError if any of the cols is not a valid option.

        Returns: None
        """
        import gzip

        # None stands for the default columns (avoids a shared mutable default)
        if cols is None:
            cols = ["node", "parent", "rank", "name"]

        if gz and not output_file.endswith(".gz"):
            output_file = output_file + ".gz"
        check_no_file(output_file)

        write_field = {
            "node": lambda node: node,
            "latest": self.latest,
            "parent": self.parent,
            "rank": self.rank,
            "name": self.name,
            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
            "children": lambda node: join_check(self.children(node), sep_multi),
            "lineage": lambda node: join_check(
                self.lineage(node, ranks=ranks), sep_multi
            ),
            "rank_lineage": lambda node: join_check(
                self.rank_lineage(node, ranks=ranks), sep_multi
            ),
            "name_lineage": lambda node: join_check(
                self.name_lineage(node, ranks=ranks), sep_multi
            ),
        }

        # Validate before opening the output, so an invalid field neither
        # leaves an empty file behind nor leaks an open file handle
        for c in cols:
            if c not in write_field:
                raise ValueError(
                    f"Field [{c}] is not valid. Options: {','.join(write_field)}"
                )

        # Context manager guarantees the file is closed even if writing fails
        with gzip.open(output_file, "wt") if gz else open(output_file, "w") as outf:
            if ranks:
                # Only nodes of the requested ranks, grouped in the given rank order
                for rank in ranks:
                    for node in self.nodes_rank(rank):
                        print(
                            *[write_field[c](node) for c in cols],
                            sep=sep,
                            end="\n",
                            file=outf,
                        )
            else:
                for node in self._nodes:
                    print(
                        *[write_field[c](node) for c in cols],
                        sep=sep,
                        end="\n",
                        file=outf,
                    )
18class MultiTax(object): 19 _default_version = "current" 20 _supported_versions = ["current"] 21 _default_urls = {} 22 _default_root_node = "1" 23 24 def __init__( 25 self, 26 version: str = None, 27 files: list = None, 28 urls: list = None, 29 output_prefix: str = None, 30 root_node: str = None, 31 root_parent: str = "0", 32 root_name: str = None, 33 root_rank: str = None, 34 undefined_node: str = None, 35 undefined_name: str = None, 36 undefined_rank: str = None, 37 build_name_nodes: bool = False, 38 build_node_children: bool = False, 39 build_rank_nodes: bool = False, 40 extended_names: bool = False, 41 empty: bool = False, 42 ): 43 """ 44 Main constructor of MultiTax and sub-classes 45 46 Parameters: 47 * **version** *[str]*: Version to download/parse or custom version name (with files/urls). 48 * **files** *[str, list]*: One or more local files to parse. 49 * **urls** *[str, list]*: One or more urls to download and parse. 50 * **output_prefix** *[str]*: Directory to write downloaded files. 51 * **root_node** *[str]*: Define an alternative root node. 52 * **root_parent** *[str]*: Define the root parent node identifier. 53 * **root_name** *[str]*: Define an alternative root name. Set to None to use original name. 54 * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name. 55 * **undefined_node** *[str]*: Define a default return value for undefined nodes. 56 * **undefined_name** *[str]*: Define a default return value for undefined names. 57 * **undefined_rank** *[str]*: Define a default return value for undefined ranks. 58 * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use). 59 * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use). 60 * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use). 61 * **extended_names** *[bool]*: Parse extended names if available. 
62 * **empty** *[bool]*: Create an empty instance. 63 64 Example: 65 66 tax_ncbi = NcbiTx() 67 tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"]) 68 tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"]) 69 tax_ott = OttTx(root_node="844192") 70 tax_gg = GreengenesTx(output_prefix="save/to/prefix_") 71 """ 72 if files: 73 if isinstance(files, str): 74 files = [files] 75 for file in files: 76 check_file(file) 77 78 if output_prefix: 79 check_dir(output_prefix) 80 81 # Main structures 82 self._nodes = {} 83 self._ranks = {} 84 self._names = {} 85 86 # Aux. structures 87 self._lineages = {} 88 self._name_nodes = {} 89 self._node_children = {} 90 self._rank_nodes = {} 91 self._translated_nodes = {} 92 self._lca = None 93 94 # Properties 95 self.datetime = datetime.now() 96 self.version = None 97 self.undefined_node = undefined_node 98 self.undefined_name = undefined_name 99 self.undefined_rank = undefined_rank 100 101 # Set version 102 if files or urls: 103 self.version = version 104 else: 105 self.version = self._default_version if not version else version 106 if self.version not in self._supported_versions: 107 raise ValueError( 108 f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls." 
109 ) 110 111 # Store source of tax files (url or file) 112 self.sources = [] 113 114 if not empty: 115 # Open/Download/Write files 116 fhs = {} 117 if files: 118 fhs = open_files(files) 119 elif urls or self._default_urls.get(self.version): 120 fhs = download_files( 121 urls=urls if urls else self._default_urls[self.version], 122 output_prefix=output_prefix, 123 retry_attempts=3, 124 ) 125 126 if fhs: 127 # Parse taxonomy 128 self._nodes, self._ranks, self._names = self._parse( 129 fhs, extended_names=extended_names 130 ) 131 close_files(fhs) 132 # Save sources for stats (files or urls) 133 self.sources = list(fhs.keys()) 134 135 # Set root values 136 self._set_root_node( 137 root=root_node if root_node else self._default_root_node, 138 parent=root_parent, 139 name=root_name, 140 rank=root_rank, 141 ) 142 143 # build auxiliary structures 144 if build_node_children: 145 self._node_children = reverse_dict(self._nodes) 146 if build_name_nodes: 147 self._name_nodes = reverse_dict(self._names) 148 if build_rank_nodes: 149 self._rank_nodes = reverse_dict(self._ranks) 150 151 self.check_consistency() 152 153 def _exact_name(self, text: str, names: dict): 154 """ 155 Returns list of nodes of a given exact name (case sensitive). 156 """ 157 if text in names: 158 return names[text] 159 else: 160 return [] 161 162 def _parse(self, fhs: dict): 163 """ 164 main function to be overloaded 165 receives a dictionary with {"url/file": file handler} 166 return nodes, ranks and names dicts 167 """ 168 return {}, {}, {} 169 170 def _partial_name(self, text: str, names: dict): 171 """ 172 Searches names containing a certain text (case sensitive) and return their respective nodes. 
173 """ 174 matching_nodes = set() 175 for name in names: 176 if text in name: 177 matching_nodes.update(names[name]) 178 return list(matching_nodes) 179 180 def _recurse_leaves(self, node: str): 181 """ 182 Recursive function returning leaf nodes 183 """ 184 children = self.children(node) 185 if not children: 186 return [node] 187 leaves = [] 188 for child in children: 189 leaves.extend(self._recurse_leaves(child)) 190 return leaves 191 192 def _remove(self, node: str): 193 """ 194 Removes node from taxonomy, no checking, for internal use 195 """ 196 del self._nodes[node] 197 if node in self._names: 198 del self._names[node] 199 if node in self._ranks: 200 del self._ranks[node] 201 202 def _reset_aux_data(self): 203 """ 204 Reset aux. data structures 205 """ 206 self._lineages = {} 207 self._name_nodes = {} 208 self._node_children = {} 209 self._rank_nodes = {} 210 self._translated_nodes = {} 211 self._lca = None 212 213 def _set_root_node(self, root: str, parent: str, name: str, rank: str): 214 """ 215 Set root node of the tree. 
216 The files are parsed based on the self._default_root_node for each class 217 A user-defined root node can be: 218 1) internal: will filter the tree acodingly and delete the default root_node 219 2) external: will add node and link to the default 220 """ 221 222 # Set parent/root with defaults 223 self.root_parent = parent 224 self.root_node = self._default_root_node 225 self._nodes[self.root_node] = self.root_parent 226 227 # Default root node is the top by definition 228 if root != self._default_root_node: 229 if root in self._nodes: 230 # Not default but exists on tree, filter only descendants 231 self.filter(root, desc=True) 232 # Remove entry for _default_root_node 233 self._remove(self._default_root_node) 234 else: 235 # Not on tree, link default node with new root 236 self._nodes[self._default_root_node] = root 237 # Change root to user defined 238 self.root_node = root 239 # Set/Update new root node parent link 240 self._nodes[self.root_node] = self.root_parent 241 242 # User-defined rank/name. 243 # If provided, insert manually, 244 # If None, check if is in the tree (defined in the given tax) 245 # otherwise insert default "root" 246 if name: 247 self._names[self.root_node] = name 248 elif self.root_node not in self._names: 249 self._names[self.root_node] = "root" 250 # Set static name 251 self.root_name = self._names[self.root_node] 252 253 if rank: 254 self._ranks[self.root_node] = rank 255 elif self.root_node not in self._ranks: 256 self._ranks[self.root_node] = "root" 257 # Set static rank 258 self.root_rank = self._ranks[self.root_node] 259 260 def add(self, node: str, parent: str, name: str = None, rank: str = None): 261 """ 262 Adds node to taxonomy. 263 Deletes built lineages, translations and lca. 
264 """ 265 if parent not in self._nodes: 266 raise ValueError("Parent node [" + parent + "] not found.") 267 elif node in self._nodes: 268 raise ValueError("Node [" + node + "] already present.") 269 270 self._nodes[node] = parent 271 self._names[node] = name if name is not None else self.undefined_name 272 self._ranks[node] = rank if rank is not None else self.undefined_rank 273 self._reset_aux_data() 274 275 def build_lca(self): 276 """ 277 Builds LCA structure based on pylca. 278 Optional function, LCA is built on first .lca() call. 279 280 Returns: None 281 """ 282 self._lca = LCA(self._nodes) 283 284 def build_lineages(self, root_node: str = None, ranks: list = None): 285 """ 286 Stores lineages in memory for faster access. 287 It is valid for lineage(), rank_lineage() and name_lineage(). 288 If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used. 289 290 Returns: None 291 """ 292 self.clear_lineages() 293 for node in self._nodes: 294 self._lineages[node] = self.lineage( 295 node=node, root_node=root_node, ranks=ranks 296 ) 297 298 def build_translation(self, tax, file: str = None, url: str = None): 299 """ 300 Create a translation of current taxonomy to another 301 302 Parameters: 303 304 * **tax** [MultiTax]: A target taxonomy to be translated to. 305 * **file** *[str]*: Local file to parse. 306 * **url** *[str]*: Url to download and parse. 
307 308 Example: 309 310 from multitax import GtdbTx, NcbiTx 311 gtdb_tax = GtdbTx() 312 ncbi_tax = NcbiTx() 313 314 # Automatically download translation files 315 gtdb_tax.build_translation(ncbi_tax) 316 gtdb_tax.translate("g__Escherichia") 317 {'1301', '547', '561', '570', '590', '620'} 318 319 # Using local file 320 ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz") 321 ncbi_tax.translate("620") 322 {'g__Escherichia', 'g__Proteus', 'g__Serratia'} 323 """ 324 if file: 325 check_file(file) 326 327 self._translated_nodes = self._build_translation(tax, file, url) 328 329 def children(self, node: str): 330 """ 331 Returns list of direct children nodes of a given node. 332 """ 333 # Setup on first use 334 if not self._node_children: 335 self._node_children = reverse_dict(self._nodes) 336 if node in self._node_children: 337 return self._node_children[node] 338 else: 339 return [] 340 341 def check_consistency(self): 342 """ 343 Checks consistency of the tree 344 345 Returns: raise an Exception otherwise None 346 """ 347 if self.root_node not in self._nodes: 348 raise ValueError("Root node [" + self.root_node + "] not found.") 349 if self.root_parent in self._nodes: 350 raise ValueError( 351 "Root parent [" 352 + self.root_parent 353 + "] found but should not be on tree." 354 ) 355 if self.undefined_node in self._nodes: 356 raise ValueError( 357 "Undefined node [" 358 + self.undefined_node 359 + "] found but should not be on tree." 360 ) 361 362 # Difference between values and keys should be only root_parent 363 lost_nodes = set(self._nodes.values()).difference(self._nodes) 364 if self.root_parent not in lost_nodes: 365 raise ValueError( 366 "Root parent [" + self.root_parent + "] not properly defined." 
367 ) 368 # Remove root_parent from lost nodes to report only missing 369 lost_nodes.remove(self.root_parent) 370 if len(lost_nodes) > 0: 371 raise ValueError("Parent nodes missing: " + ",".join(lost_nodes)) 372 373 return None 374 375 def clear_lca(self): 376 """ 377 Clear built LCA. 378 379 Returns: None 380 """ 381 self._lca = None 382 383 def clear_lineages(self): 384 """ 385 Clear built lineages. 386 387 Returns: None 388 """ 389 self._lineages = {} 390 391 def closest_parent(self, node: str, ranks: str): 392 """ 393 Returns the closest parent node based on a defined list of ranks 394 """ 395 # Rank of node is already on the list 396 if self.rank(node) in ranks: 397 return node 398 else: 399 # check lineage from back to front until find a valid node 400 for n in self.lineage(node, ranks=ranks)[::-1]: 401 if n != self.undefined_node: 402 return n 403 # nothing found 404 return self.undefined_node 405 406 def filter(self, nodes: list, desc: bool = False): 407 """ 408 Filters taxonomy given a list of nodes. 409 By default keep all the ancestors of the given nodes. 410 If desc=True, keep all descendants instead. 411 Deletes built lineages, translations and lca. 
412 413 Example: 414 415 from multitax import GtdbTx 416 tax = GtdbTx() 417 418 tax.lineage('s__Enterovibrio marina') 419 # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina'] 420 # Keep only ancestors of 'g__Enterovibrio' 421 tax.filter('g__Enterovibrio') 422 423 # Reload taxonomy 424 tax = GtdbTx() 425 # Keep only descendants of 'g__Enterovibrio' 426 tax.filter('g__Enterovibrio', desc=True) 427 """ 428 if isinstance(nodes, str): 429 nodes = [nodes] 430 431 # Keep track of nodes to be filtered out 432 filtered_nodes = set(self._nodes) 433 # Always keep root 434 filtered_nodes.discard(self.root_node) 435 436 if desc: 437 # Keep descendants of the given nodes 438 for node in nodes: 439 # Check if node exists (skips root) 440 if node in filtered_nodes: 441 # For each leaf of the selected nodes 442 for leaf in self.leaves(node): 443 # Build lineage of each leaf up-to node itself 444 for n in self.lineage(leaf, root_node=node): 445 # Discard nodes from set to be kept 446 filtered_nodes.discard(n) 447 # Link node to root 448 self._nodes[node] = self.root_node 449 else: 450 # Keep ancestors of the given nodes (full lineage up-to root) 451 for node in nodes: 452 # ranks=[] in case build_lineages() was used with specific ranks 453 for n in self.lineage(node, ranks=[]): 454 # Discard nodes from set to be kept 455 filtered_nodes.discard(n) 456 457 # Delete filtered nodes 458 for node in filtered_nodes: 459 self._remove(node) 460 461 # Delete aux. data structures 462 self._reset_aux_data() 463 self.check_consistency() 464 465 @classmethod 466 def from_customtx(cls, ctx): 467 """ 468 Initialize a Tx sub-class based on a CustomTx instance. 
469 470 Example: 471 472 tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"]) 473 tax_ncbi = NcbiTx.from_customtx(tax_custom) 474 """ 475 nc = cls(empty=True) 476 nc.version = ctx.version 477 nc.sources = ctx.sources 478 nc._nodes = ctx._nodes 479 nc._names = ctx._names 480 nc._ranks = ctx._ranks 481 return nc 482 483 def latest(self, node: str): 484 """ 485 Returns latest/updated version of a given node. 486 If node is already the latests, returns itself. 487 Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv) 488 """ 489 if node in self._nodes: 490 return node 491 else: 492 return self.undefined_node 493 494 def leaves(self, node: str = None): 495 """ 496 Returns a list of leaf nodes of a given node. 497 """ 498 if node is None or node == self.root_node: 499 # Leaves are nodes not contained in _nodes.values() ("parents") 500 return list(set(self._nodes).difference(self._nodes.values())) 501 elif node in self._nodes: 502 return self._recurse_leaves(node) 503 else: 504 return [] 505 506 def lca(self, nodes: list = None): 507 """ 508 Returns the lowest common ancestor of two or more nodes. 509 510 Example: 511 512 from multitax import GtdbTx 513 tax = GtdbTx() 514 tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"]) 515 """ 516 for node in nodes: 517 if node not in self._nodes: 518 raise ValueError("Node [" + node + "] not found.") 519 520 # Setup on first use 521 if not self._lca: 522 self.build_lca() 523 524 return self._lca(*nodes) 525 526 def lineage(self, node: str, root_node: str = None, ranks: list = None): 527 """ 528 Returns a list with the lineage of a given node. 529 If ranks is provided, returns only nodes annotated with such ranks. 530 If root_node is provided, use it instead of default root of tree. 
531 """ 532 # If lineages were built with build_lineages() with matching params 533 if node in self._lineages and root_node is None and ranks is None: 534 return self._lineages[node] 535 else: 536 if not root_node: 537 root_node = self.root_node 538 539 n = node 540 if ranks: 541 # Fixed length lineage 542 lin = [self.undefined_node] * len(ranks) 543 # Loop until end of the tree (in case chosen root is not on lineage) 544 while n != self.undefined_node: 545 r = self.rank(n) 546 if r in ranks: 547 lin[ranks.index(r)] = n 548 # If node is root, break (after adding) 549 if n == root_node: 550 break 551 n = self.parent(n) 552 else: 553 # Full lineage 554 lin = [] 555 # Loop until end of the tree (in case chosen root is not on lineage) 556 while n != self.undefined_node: 557 lin.append(n) 558 # If node is root, break (after adding) 559 if n == root_node: 560 break 561 n = self.parent(n) 562 # Reverse order 563 lin = lin[::-1] 564 565 # last iteration node (n) != root_node: didn't find the root, invalid lineage 566 if n != root_node: 567 return [] 568 else: 569 return lin 570 571 def name(self, node: str): 572 """ 573 Returns name of a given node. 574 """ 575 if node in self._names: 576 return self._names[node] 577 else: 578 return self.undefined_name 579 580 def name_lineage(self, node: str, root_node: str = None, ranks: list = None): 581 """ 582 Returns a list with the name lineage of a given node. 583 """ 584 return list( 585 map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks)) 586 ) 587 588 def nodes_rank(self, rank: str): 589 """ 590 Returns list of nodes of a given rank. 591 """ 592 # Setup on first use 593 if not self._rank_nodes: 594 self._rank_nodes = reverse_dict(self._ranks) 595 if rank in self._rank_nodes: 596 return self._rank_nodes[rank] 597 else: 598 return [] 599 600 def parent(self, node: str): 601 """ 602 Returns the direct parent node of a given node. 
603 """ 604 if node in self._nodes: 605 return self._nodes[node] 606 else: 607 return self.undefined_node 608 609 def parent_rank(self, node: str, rank: str): 610 """ 611 Returns the parent node of a given rank in the specified rank. 612 """ 613 parent = self.lineage(node=node, ranks=[rank]) 614 return parent[0] if parent else self.undefined_node 615 616 def prune(self, nodes: list): 617 """ 618 Prunes branches of the tree under the given nodes. 619 Deletes built lineages, translations and lca. 620 """ 621 622 if isinstance(nodes, str): 623 nodes = [nodes] 624 625 del_nodes = set() 626 for node in nodes: 627 if node not in self._nodes: 628 raise ValueError("Node [" + node + "] not found.") 629 for leaf in self.leaves(node): 630 for n in self.lineage(leaf, root_node=node)[1:]: 631 del_nodes.add(n) 632 633 for n in del_nodes: 634 self._remove(n) 635 636 self._reset_aux_data() 637 638 def rank(self, node: str): 639 """ 640 Returns the rank of a given node. 641 """ 642 if node in self._ranks: 643 return self._ranks[node] 644 else: 645 return self.undefined_rank 646 647 def rank_lineage(self, node: str, root_node: str = None, ranks: list = None): 648 """ 649 Returns a list with the rank lineage of a given node. 650 """ 651 return list( 652 map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks)) 653 ) 654 655 def remove(self, node: str, check_consistency: bool = False): 656 """ 657 Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune. 658 Running check consistency after removing a node is recommended. 659 Deletes built lineages, translations and lca. 
660 """ 661 if node not in self._nodes: 662 raise ValueError("Node [" + node + "] not found.") 663 self._remove(node) 664 self._reset_aux_data() 665 if check_consistency: 666 self.check_consistency() 667 668 def search_name(self, text: str, rank: str = None, exact: bool = True): 669 """ 670 Search node by exact or partial name 671 672 Parameters: 673 * **text** *[str]*: Text to search. 674 * **rank** *[str]*: Filter results by rank. 675 * **exact** *[bool]*: Exact or partial name search (both case sensitive). 676 677 Returns: list of matching nodes 678 """ 679 # Setup on first use 680 if not self._name_nodes: 681 self._name_nodes = reverse_dict(self._names) 682 683 if exact: 684 ret = self._exact_name(text, self._name_nodes) 685 else: 686 ret = self._partial_name(text, self._name_nodes) 687 688 # Only return nodes of chosen rank 689 if rank: 690 return filter_function(ret, self.rank, rank) 691 else: 692 return ret 693 694 def stats(self): 695 """ 696 Returns a dict with general numbers of the taxonomic tree 697 698 Example: 699 700 from pprint import pprint 701 from multitax import GtdbTx 702 tax = GtdbTx() 703 704 pprint(tax.stats()) 705 {'leaves': 30238, 706 'names': 42739, 707 'nodes': 42739, 708 'ranked_leaves': Counter({'species': 30238}), 709 'ranked_nodes': Counter({'species': 30238, 710 'genus': 8778, 711 'family': 2323, 712 'order': 930, 713 'class': 337, 714 'phylum': 131, 715 'domain': 1, 716 'root': 1}), 717 'ranks': 42739} 718 """ 719 s = {} 720 s["nodes"] = len(self._nodes) 721 s["ranks"] = len(self._ranks) 722 s["names"] = len(self._names) 723 all_leaves = self.leaves(self.root_node) 724 s["leaves"] = len(all_leaves) 725 s["ranked_nodes"] = Counter(self._ranks.values()) 726 s["ranked_leaves"] = Counter(map(self.rank, all_leaves)) 727 return s 728 729 def translate(self, node: str): 730 """ 731 Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function. 
732 """ 733 if node in self._translated_nodes: 734 return self._translated_nodes[node] 735 else: 736 return [] 737 738 def write( 739 self, 740 output_file: str, 741 cols: list = ["node", "parent", "rank", "name"], 742 sep: str = "\t", 743 sep_multi: str = "|", 744 ranks: list = None, 745 gz: bool = False, 746 ): 747 """ 748 Writes loaded taxonomy to a file. 749 750 Parameters: 751 * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage" 752 * **sep** *[str]*: Separator of fields 753 * **sep_multi** *[str]*: Separator of multi-valued fields 754 * **ranks** *[list]*: Ranks to report 755 * **gz** *[bool]*: Gzip output 756 757 Returns: None 758 """ 759 import gzip 760 761 if gz: 762 output_file = ( 763 output_file if output_file.endswith(".gz") else output_file + ".gz" 764 ) 765 check_no_file(output_file) 766 outf = gzip.open(output_file, "wt") 767 else: 768 check_no_file(output_file) 769 outf = open(output_file, "w") 770 771 write_field = { 772 "node": lambda node: node, 773 "latest": self.latest, 774 "parent": self.parent, 775 "rank": self.rank, 776 "name": self.name, 777 "leaves": lambda node: join_check(self.leaves(node), sep_multi), 778 "children": lambda node: join_check(self.children(node), sep_multi), 779 "lineage": lambda node: join_check( 780 self.lineage(node, ranks=ranks), sep_multi 781 ), 782 "rank_lineage": lambda node: join_check( 783 self.rank_lineage(node, ranks=ranks), sep_multi 784 ), 785 "name_lineage": lambda node: join_check( 786 self.name_lineage(node, ranks=ranks), sep_multi 787 ), 788 } 789 790 for c in cols: 791 if c not in write_field: 792 raise ValueError( 793 "Field [" + c + "] is not valid. 
Options: " + ",".join(write_field) 794 ) 795 796 if ranks: 797 for rank in ranks: 798 for node in self.nodes_rank(rank): 799 print( 800 *[write_field[c](node) for c in cols], 801 sep=sep, 802 end="\n", 803 file=outf, 804 ) 805 else: 806 for node in self._nodes: 807 print( 808 *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf 809 ) 810 811 outf.close()
def __init__(
    self,
    version: str = None,
    files: list = None,
    urls: list = None,
    output_prefix: str = None,
    root_node: str = None,
    root_parent: str = "0",
    root_name: str = None,
    root_rank: str = None,
    undefined_node: str = None,
    undefined_name: str = None,
    undefined_rank: str = None,
    build_name_nodes: bool = False,
    build_node_children: bool = False,
    build_rank_nodes: bool = False,
    extended_names: bool = False,
    empty: bool = False,
):
    """
    Main constructor of MultiTax and sub-classes

    Local files take precedence over urls. If neither is given, the default
    urls registered for the selected version (if any) are downloaded.

    Parameters:
    * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
    * **files** *[str, list]*: One or more local files to parse.
    * **urls** *[str, list]*: One or more urls to download and parse.
    * **output_prefix** *[str]*: Directory to write downloaded files.
    * **root_node** *[str]*: Define an alternative root node.
    * **root_parent** *[str]*: Define the root parent node identifier.
    * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
    * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original rank.
    * **undefined_node** *[str]*: Define a default return value for undefined nodes.
    * **undefined_name** *[str]*: Define a default return value for undefined names.
    * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
    * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
    * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
    * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
    * **extended_names** *[bool]*: Parse extended names if available.
    * **empty** *[bool]*: Create an empty instance (nothing is opened, downloaded or parsed).

    Raises: ValueError if no files/urls are given and the version is not supported.

    Example:

        tax_ncbi = NcbiTx()
        tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
        tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
        tax_ott = OttTx(root_node="844192")
        tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
    """
    # Validate inputs up-front, before any state is created
    if files:
        # A single file name is accepted and wrapped into a list
        if isinstance(files, str):
            files = [files]
        for file in files:
            check_file(file)

    if output_prefix:
        check_dir(output_prefix)

    # Main structures
    self._nodes = {}
    self._ranks = {}
    self._names = {}

    # Aux. structures
    self._lineages = {}
    self._name_nodes = {}
    self._node_children = {}
    self._rank_nodes = {}
    self._translated_nodes = {}
    self._lca = None

    # Properties
    self.datetime = datetime.now()
    self.version = None
    self.undefined_node = undefined_node
    self.undefined_name = undefined_name
    self.undefined_rank = undefined_rank

    # Set version
    if files or urls:
        # Custom sources: the version is a free-form label (may stay None)
        self.version = version
    else:
        # No custom sources: version must be one of the supported ones
        self.version = self._default_version if not version else version
        if self.version not in self._supported_versions:
            raise ValueError(
                f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
            )

    # Store source of tax files (url or file)
    self.sources = []

    if not empty:
        # Open/Download/Write files
        fhs = {}
        if files:
            fhs = open_files(files)
        elif urls or self._default_urls.get(self.version):
            # User urls win over the default urls of the selected version
            fhs = download_files(
                urls=urls if urls else self._default_urls[self.version],
                output_prefix=output_prefix,
                retry_attempts=3,
            )

        if fhs:
            # Parse taxonomy
            self._nodes, self._ranks, self._names = self._parse(
                fhs, extended_names=extended_names
            )
            close_files(fhs)
            # Save sources for stats (files or urls)
            self.sources = list(fhs.keys())

        # Set root values
        self._set_root_node(
            root=root_node if root_node else self._default_root_node,
            parent=root_parent,
            name=root_name,
            rank=root_rank,
        )

        # build auxiliary structures
        if build_node_children:
            self._node_children = reverse_dict(self._nodes)
        if build_name_nodes:
            self._name_nodes = reverse_dict(self._names)
        if build_rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)

        # Fail early if the loaded tree is not well-formed
        self.check_consistency()
Main constructor of MultiTax and sub-classes
Parameters:
- version [str]: Version to download/parse or custom version name (with files/urls).
- files [str, list]: One or more local files to parse.
- urls [str, list]: One or more urls to download and parse.
- output_prefix [str]: Directory to write downloaded files.
- root_node [str]: Define an alternative root node.
- root_parent [str]: Define the root parent node identifier.
- root_name [str]: Define an alternative root name. Set to None to use original name.
- root_rank [str]: Define an alternative root rank. Set to None to use original rank.
- undefined_node [str]: Define a default return value for undefined nodes.
- undefined_name [str]: Define a default return value for undefined names.
- undefined_rank [str]: Define a default return value for undefined ranks.
- build_node_children [bool]: Build node,children dict (otherwise it will be created on first use).
- build_name_nodes [bool]: Build name,nodes dict (otherwise it will be created on first use).
- build_rank_nodes [bool]: Build rank,nodes dict (otherwise it will be created on first use).
- extended_names [bool]: Parse extended names if available.
- empty [bool]: Create an empty instance.
Example:
tax_ncbi = NcbiTx()
tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
tax_ott = OttTx(root_node="844192")
tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
def add(self, node: str, parent: str, name: str = None, rank: str = None):
    """
    Adds a new node below an existing parent node.
    When name/rank are not given, undefined_name/undefined_rank are stored.
    Auxiliary data is reset through _reset_aux_data().

    Raises: ValueError if parent is not on the tree or node already is.
    """
    tree = self._nodes
    # Guard clauses: parent must exist, node must be new
    if parent not in tree:
        raise ValueError("Parent node [" + parent + "] not found.")
    if node in tree:
        raise ValueError("Node [" + node + "] already present.")

    tree[node] = parent
    self._names[node] = self.undefined_name if name is None else name
    self._ranks[node] = self.undefined_rank if rank is None else rank
    self._reset_aux_data()
Adds node to taxonomy. Deletes built lineages, translations and lca.
def build_lca(self):
    """
    Creates the pylca LCA structure from the node->parent dict and stores it.
    Calling it is optional: lca() builds the structure on first use.

    Returns: None
    """
    parent_of = self._nodes
    self._lca = LCA(parent_of)
Builds LCA structure based on pylca. Optional function, LCA is built on first .lca() call.
Returns: None
def build_lineages(self, root_node: str = None, ranks: list = None):
    """
    Pre-computes and stores the lineage of every node for faster access.
    Stored lineages are used by lineage(), rank_lineage() and name_lineage()
    only when those are called without root_node and ranks.

    Returns: None
    """
    # Start from a clean cache so lineage() below computes every entry
    self.clear_lineages()
    self._lineages.update(
        (n, self.lineage(node=n, root_node=root_node, ranks=ranks))
        for n in self._nodes
    )
Stores lineages in memory for faster access. It is valid for lineage(), rank_lineage() and name_lineage(). If keyword arguments (root_node, ranks) are passed to those functions, the stored lineages are not used.
Returns: None
def build_translation(self, tax, file: str = None, url: str = None):
    """
    Builds and stores the translation from this taxonomy to another one.
    The result is what translate() looks nodes up in.

    Parameters:

    * **tax** [MultiTax]: A target taxonomy to be translated to.
    * **file** *[str]*: Local file to parse.
    * **url** *[str]*: Url to download and parse.

    Example:

        from multitax import GtdbTx, NcbiTx
        gtdb_tax = GtdbTx()
        ncbi_tax = NcbiTx()

        # Automatically download translation files
        gtdb_tax.build_translation(ncbi_tax)
        gtdb_tax.translate("g__Escherichia")
        {'1301', '547', '561', '570', '590', '620'}

        # Using local file
        ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
        ncbi_tax.translate("620")
        {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
    """
    # Only a user-provided local file is validated here
    if file:
        check_file(file)

    translated = self._build_translation(tax, file, url)
    self._translated_nodes = translated
Create a translation of current taxonomy to another
Parameters:
- tax [MultiTax]: A target taxonomy to be translated to.
- file [str]: Local file to parse.
- url [str]: Url to download and parse.
Example:
from multitax import GtdbTx, NcbiTx
gtdb_tax = GtdbTx()
ncbi_tax = NcbiTx()
# Automatically download translation files
gtdb_tax.build_translation(ncbi_tax)
gtdb_tax.translate("g__Escherichia")
{'1301', '547', '561', '570', '590', '620'}
# Using local file
ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
ncbi_tax.translate("620")
{'g__Escherichia', 'g__Proteus', 'g__Serratia'}
def children(self, node: str):
    """
    Returns the list of direct children of a given node
    (empty list when the node has no entry in the children dict).
    """
    # The node->children dict is derived from the parent links on first use
    if not self._node_children:
        self._node_children = reverse_dict(self._nodes)
    kids = self._node_children
    return kids[node] if node in kids else []
Returns list of direct children nodes of a given node.
def check_consistency(self):
    """
    Checks consistency of the tree:
    the root node must be on the tree, the root parent and the undefined node
    must not be, and every parent (other than the root parent) must itself
    be a node.

    Returns: None

    Raises: ValueError describing the first inconsistency found.
    """
    nodes = self._nodes

    # f-strings are used so that a None/non-str identifier is reported in the
    # ValueError instead of failing with a TypeError on string concatenation
    if self.root_node not in nodes:
        raise ValueError(f"Root node [{self.root_node}] not found.")
    if self.root_parent in nodes:
        raise ValueError(
            f"Root parent [{self.root_parent}] found but should not be on tree."
        )
    if self.undefined_node in nodes:
        raise ValueError(
            f"Undefined node [{self.undefined_node}] found but should not be on tree."
        )

    # Difference between values and keys should be only root_parent
    lost_nodes = set(nodes.values()).difference(nodes)
    if self.root_parent not in lost_nodes:
        raise ValueError(f"Root parent [{self.root_parent}] not properly defined.")
    # Remove root_parent from lost nodes to report only missing
    lost_nodes.remove(self.root_parent)
    if lost_nodes:
        # Sorted so the message is deterministic (set order is arbitrary)
        raise ValueError(
            "Parent nodes missing: " + ",".join(sorted(map(str, lost_nodes)))
        )

    return None
Checks consistency of the tree
Returns: None if the tree is consistent; raises an exception otherwise.
def clear_lca(self):
    """
    Drops the stored LCA structure, so that lca() builds it again on next use.

    Returns: None
    """
    self._lca = None
    return None
Clear built LCA.
Returns: None
def clear_lineages(self):
    """
    Drops all stored lineages by replacing the cache with a new empty dict.

    Returns: None
    """
    self._lineages = dict()
    return None
Clear built lineages.
Returns: None
def closest_parent(self, node: str, ranks: list):
    """
    Returns the closest parent node based on a defined list of ranks.

    Parameters:
    * **node** *[str]*: Node to start from.
    * **ranks** *[str, list]*: Ranks to consider, in the order used by lineage(). A single rank can be given as a string.

    Returns: the node itself if its rank is in ranks, otherwise the last
    defined entry of its rank-filtered lineage, otherwise undefined_node.
    """
    # A bare string would be matched by substring and indexed per character
    # further down, so treat it as a one-element list of ranks
    if isinstance(ranks, str):
        ranks = [ranks]

    # Rank of node is already on the list
    if self.rank(node) in ranks:
        return node

    # check lineage from back to front until find a valid node
    for n in reversed(self.lineage(node, ranks=ranks)):
        if n != self.undefined_node:
            return n
    # nothing found
    return self.undefined_node
Returns the closest parent node based on a defined list of ranks
def filter(self, nodes: list, desc: bool = False):
    """
    Filters taxonomy given a list of nodes (a single node can be given as a string).
    By default keep all the ancestors of the given nodes.
    If desc=True, keep all descendants instead and link each given node
    directly to the root.
    The root node is always kept.
    Auxiliary data is reset through _reset_aux_data() and the tree is
    verified with check_consistency() afterwards.

    Example:

        from multitax import GtdbTx
        tax = GtdbTx()

        tax.lineage('s__Enterovibrio marina')
        # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
        # Keep only ancestors of 'g__Enterovibrio'
        tax.filter('g__Enterovibrio')

        # Reload taxonomy
        tax = GtdbTx()
        # Keep only descendants of 'g__Enterovibrio'
        tax.filter('g__Enterovibrio', desc=True)
    """
    if isinstance(nodes, str):
        nodes = [nodes]

    # Keep track of nodes to be filtered out:
    # start with every node and discard the ones that must be kept
    filtered_nodes = set(self._nodes)
    # Always keep root
    filtered_nodes.discard(self.root_node)

    if desc:
        # Keep descendants of the given nodes
        for node in nodes:
            # Check if node exists (skips root).
            # Nodes already marked to be kept by a previous iteration are
            # skipped as well, since they are no longer in the set.
            if node in filtered_nodes:
                # For each leaf of the selected nodes
                for leaf in self.leaves(node):
                    # Build lineage of each leaf up-to node itself
                    for n in self.lineage(leaf, root_node=node):
                        # Discard nodes from set to be kept
                        filtered_nodes.discard(n)
                # Link node to root (its former ancestors are removed below)
                self._nodes[node] = self.root_node
    else:
        # Keep ancestors of the given nodes (full lineage up-to root)
        for node in nodes:
            # ranks=[] in case build_lineages() was used with specific ranks
            # (a non-None ranks bypasses the stored lineages, empty means full lineage)
            for n in self.lineage(node, ranks=[]):
                # Discard nodes from set to be kept
                filtered_nodes.discard(n)

    # Delete filtered nodes
    for node in filtered_nodes:
        self._remove(node)

    # Delete aux. data structures
    self._reset_aux_data()
    self.check_consistency()
Filters taxonomy given a list of nodes. By default keep all the ancestors of the given nodes. If desc=True, keep all descendants instead. Deletes built lineages, translations and lca.
Example:
from multitax import GtdbTx
tax = GtdbTx()
tax.lineage('s__Enterovibrio marina')
# ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
# Keep only ancestors of 'g__Enterovibrio'
tax.filter('g__Enterovibrio')
# Reload taxonomy
tax = GtdbTx()
# Keep only descendants of 'g__Enterovibrio'
tax.filter('g__Enterovibrio', desc=True)
@classmethod
def from_customtx(cls, ctx):
    """
    Initialize a Tx sub-class based on a CustomTx instance.

    The node, name and rank dicts are shared with ctx (not copied), so
    changes made through one instance are visible in the other.
    Root attributes (root_node, root_parent, root_name, root_rank) are
    carried over from ctx when it defines them.

    Example:

        tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
        tax_ncbi = NcbiTx.from_customtx(tax_custom)
    """
    nc = cls(empty=True)
    nc.version = ctx.version
    nc.sources = ctx.sources
    nc._nodes = ctx._nodes
    nc._names = ctx._names
    nc._ranks = ctx._ranks
    # The tree comes from ctx, so its root definition has to come along too;
    # an instance created with empty=True has not loaded or rooted any tree
    for attr in ("root_node", "root_parent", "root_name", "root_rank"):
        if hasattr(ctx, attr):
            setattr(nc, attr, getattr(ctx, attr))
    return nc
Initialize a Tx sub-class based on a CustomTx instance.
Example:
tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
tax_ncbi = NcbiTx.from_customtx(tax_custom)
def latest(self, node: str):
    """
    Returns latest/updated version of a given node.
    In this implementation a node present on the tree is its own latest
    version; any other node yields undefined_node.
    Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
    """
    return node if node in self._nodes else self.undefined_node
Returns latest/updated version of a given node. If node is already the latest, returns itself. Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
def leaves(self, node: str = None):
    """
    Returns a list of leaf nodes under a given node.
    Without a node (or with the root node) returns all leaves of the tree;
    returns an empty list for a node that is not on the tree.
    """
    tree = self._nodes
    if node is None or node == self.root_node:
        # Leaves are nodes not contained in _nodes.values() ("parents")
        parents = tree.values()
        return list(set(tree).difference(parents))
    if node in tree:
        return self._recurse_leaves(node)
    return []
Returns a list of leaf nodes of a given node.
def lca(self, nodes: list = None):
    """
    Returns the lowest common ancestor of two or more nodes.

    Parameters:
    * **nodes** *[list]*: Nodes to find the lowest common ancestor of.

    Raises: ValueError if nodes is not provided or any node is not on the tree.

    Example:

        from multitax import GtdbTx
        tax = GtdbTx()
        tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
    """
    # The default (None) cannot be iterated: report it as a usage error
    if nodes is None:
        raise ValueError("No nodes provided.")

    for node in nodes:
        if node not in self._nodes:
            raise ValueError("Node [" + node + "] not found.")

    # Setup on first use (identity check: do not depend on the truthiness
    # of an already built LCA object)
    if self._lca is None:
        self.build_lca()

    return self._lca(*nodes)
Returns the lowest common ancestor of two or more nodes.
Example:
from multitax import GtdbTx
tax = GtdbTx()
tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
def lineage(self, node: str, root_node: str = None, ranks: list = None):
    """
    Returns a list with the lineage of a given node, ordered from root to node.
    If ranks is provided, returns a fixed-length list with one slot per
    requested rank (undefined node where the lineage has no such rank).
    If root_node is provided, use it instead of default root of tree.
    Returns an empty list when the chosen root is not found on the way up.
    """
    # Lineages pre-built by build_lineages() only apply to default params
    if node in self._lineages and root_node is None and ranks is None:
        return self._lineages[node]

    target_root = root_node if root_node else self.root_node
    missing = self.undefined_node

    # Walk up from the node, stopping after the root or at the end of the tree
    # (the latter happens when the chosen root is not on the lineage)
    path = []
    current = node
    while current != missing:
        path.append(current)
        if current == target_root:
            break
        current = self.parent(current)

    # Walk ended somewhere else than the root: invalid lineage
    if current != target_root:
        return []

    if ranks:
        # Fixed length lineage, nodes placed on the slot of their rank
        by_rank = [missing] * len(ranks)
        for visited in path:
            visited_rank = self.rank(visited)
            if visited_rank in ranks:
                by_rank[ranks.index(visited_rank)] = visited
        return by_rank

    # Full lineage, root first
    path.reverse()
    return path
Returns a list with the lineage of a given node. If ranks is provided, returns only nodes annotated with such ranks. If root_node is provided, use it instead of default root of tree.
def name(self, node: str):
    """
    Returns name of a given node, or the undefined name if the node has none.
    """
    return self._names.get(node, self.undefined_name)
Returns name of a given node.
def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
    """
    Returns a list with the name lineage of a given node:
    the lineage of the node with every entry replaced by its name.
    """
    nodes = self.lineage(node=node, root_node=root_node, ranks=ranks)
    return [self.name(n) for n in nodes]
Returns a list with the name lineage of a given node.
def nodes_rank(self, rank: str):
    """
    Returns list of nodes of a given rank (empty list for an unknown rank).
    """
    # rank -> nodes dict is created on first use by reversing node -> rank
    if not self._rank_nodes:
        self._rank_nodes = reverse_dict(self._ranks)
    return self._rank_nodes.get(rank, [])
Returns list of nodes of a given rank.
def parent(self, node: str):
    """
    Returns the direct parent node of a given node,
    or the undefined node if the given node is not in the tree.
    """
    return self._nodes.get(node, self.undefined_node)
Returns the direct parent node of a given node.
def parent_rank(self, node: str, rank: str):
    """
    Returns the parent node of a given node in the specified rank.
    Returns the undefined node if no lineage could be obtained.
    """
    # Single-rank lineage: either an empty list or a one-element list
    at_rank = self.lineage(node=node, ranks=[rank])
    if at_rank:
        return at_rank[0]
    return self.undefined_node
Returns the parent node of a given node in the specified rank.
def prune(self, nodes: list):
    """
    Prunes branches of the tree under the given nodes.
    The given nodes themselves are kept, everything below them is removed.
    Deletes built lineages, translations and lca.

    Parameters:
    * **nodes** *[str, list]*: One or more nodes whose branches are pruned.

    Raises: ValueError if any of the given nodes is not in the tree.
    """
    targets = [nodes] if isinstance(nodes, str) else nodes

    doomed = set()
    for branch_root in targets:
        if branch_root not in self._nodes:
            raise ValueError("Node [" + branch_root + "] not found.")
        for leaf in self.leaves(branch_root):
            # Lineage starts at the branch root, which is skipped ([1:])
            doomed.update(self.lineage(leaf, root_node=branch_root)[1:])

    for n in doomed:
        self._remove(n)

    self._reset_aux_data()
Prunes branches of the tree under the given nodes. Deletes built lineages, translations and lca.
def rank(self, node: str):
    """
    Returns the rank of a given node, or the undefined rank if the node has none.
    """
    ranks = self._ranks
    return ranks[node] if node in ranks else self.undefined_rank
Returns the rank of a given node.
def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
    """
    Returns a list with the rank lineage of a given node:
    the lineage of the node with every entry replaced by its rank.
    """
    nodes = self.lineage(node=node, root_node=root_node, ranks=ranks)
    return [self.rank(n) for n in nodes]
Returns a list with the rank lineage of a given node.
def remove(self, node: str, check_consistency: bool = False):
    """
    Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
    Running check_consistency after removing a node is recommended.
    Deletes built lineages, translations and lca.

    Parameters:
    * **node** *[str]*: Node to remove.
    * **check_consistency** *[bool]*: Run check_consistency() after the removal.

    Raises: ValueError if the node is not in the tree.
    """
    unknown = node not in self._nodes
    if unknown:
        raise ValueError("Node [" + node + "] not found.")

    self._remove(node)
    self._reset_aux_data()

    if not check_consistency:
        return
    self.check_consistency()
Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune. Running check_consistency after removing a node is recommended. Deletes built lineages, translations and lca.
def search_name(self, text: str, rank: str = None, exact: bool = True):
    """
    Search node by exact or partial name

    Parameters:
    * **text** *[str]*: Text to search.
    * **rank** *[str]*: Filter results by rank.
    * **exact** *[bool]*: Exact or partial name search (both case sensitive).

    Returns: list of matching nodes
    """
    # name -> nodes dict is created on first use by reversing node -> name
    if not self._name_nodes:
        self._name_nodes = reverse_dict(self._names)

    matcher = self._exact_name if exact else self._partial_name
    found = matcher(text, self._name_nodes)

    if not rank:
        return found
    # Keep only the nodes of the chosen rank
    return filter_function(found, self.rank, rank)
Search node by exact or partial name
Parameters:
- text [str]: Text to search.
- rank [str]: Filter results by rank.
- exact [bool]: Exact or partial name search (both case sensitive).
Returns: list of matching nodes
def stats(self):
    """
    Returns a dict with general numbers of the taxonomic tree:
    number of nodes, ranks, names and leaves plus node and leaf
    counts grouped by rank.

    Example:

        from pprint import pprint
        from multitax import GtdbTx
        tax = GtdbTx()

        pprint(tax.stats())
        {'leaves': 30238,
         'names': 42739,
         'nodes': 42739,
         'ranked_leaves': Counter({'species': 30238}),
         'ranked_nodes': Counter({'species': 30238,
                                  'genus': 8778,
                                  'family': 2323,
                                  'order': 930,
                                  'class': 337,
                                  'phylum': 131,
                                  'domain': 1,
                                  'root': 1}),
         'ranks': 42739}
    """
    # Leaves of the whole tree, reused for the total and the per-rank count
    leaf_nodes = self.leaves(self.root_node)
    return {
        "nodes": len(self._nodes),
        "ranks": len(self._ranks),
        "names": len(self._names),
        "leaves": len(leaf_nodes),
        "ranked_nodes": Counter(self._ranks.values()),
        "ranked_leaves": Counter(self.rank(leaf) for leaf in leaf_nodes),
    }
Returns a dict with general numbers of the taxonomic tree
Example:
from pprint import pprint
from multitax import GtdbTx
tax = GtdbTx()
pprint(tax.stats())
{'leaves': 30238,
'names': 42739,
'nodes': 42739,
'ranked_leaves': Counter({'species': 30238}),
'ranked_nodes': Counter({'species': 30238,
'genus': 8778,
'family': 2323,
'order': 930,
'class': 337,
'phylum': 131,
'domain': 1,
'root': 1}),
'ranks': 42739}
def translate(self, node: str):
    """
    Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.
    Returns an empty list if there is no translation for the node.
    """
    return self._translated_nodes.get(node, [])
Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.
def write(
    self,
    output_file: str,
    cols: list = None,
    sep: str = "\t",
    sep_multi: str = "|",
    ranks: list = None,
    gz: bool = False,
):
    """
    Writes loaded taxonomy to a file, one line per node.

    Requested columns are validated before the output file is touched, so
    an invalid column does not leave an empty file behind. The file handle
    is closed even if writing fails half-way.

    Parameters:
    * **output_file** *[str]*: File to write. With gz=True, ".gz" is appended if missing.
    * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage". Default (None): ["node", "parent", "rank", "name"]
    * **sep** *[str]*: Separator of fields
    * **sep_multi** *[str]*: Separator of multi-valued fields
    * **ranks** *[list]*: Ranks to report
    * **gz** *[bool]*: Gzip output

    Raises: ValueError if any entry of cols is not a valid option.

    Returns: None
    """
    import gzip

    # None instead of a list literal: avoids a mutable default argument
    if cols is None:
        cols = ["node", "parent", "rank", "name"]

    write_field = {
        "node": lambda node: node,
        "latest": self.latest,
        "parent": self.parent,
        "rank": self.rank,
        "name": self.name,
        "leaves": lambda node: join_check(self.leaves(node), sep_multi),
        "children": lambda node: join_check(self.children(node), sep_multi),
        "lineage": lambda node: join_check(
            self.lineage(node, ranks=ranks), sep_multi
        ),
        "rank_lineage": lambda node: join_check(
            self.rank_lineage(node, ranks=ranks), sep_multi
        ),
        "name_lineage": lambda node: join_check(
            self.name_lineage(node, ranks=ranks), sep_multi
        ),
    }

    # Validate first: nothing is created on disk for an invalid request
    for c in cols:
        if c not in write_field:
            raise ValueError(
                "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
            )

    if gz and not output_file.endswith(".gz"):
        output_file = output_file + ".gz"
    # NOTE(review): check_no_file presumably refuses to overwrite an existing
    # file -- confirm against multitax.utils
    check_no_file(output_file)

    if ranks:
        # Only nodes of the requested ranks, grouped in the order of ranks
        selected = (node for rank in ranks for node in self.nodes_rank(rank))
    else:
        selected = self._nodes

    outf = gzip.open(output_file, "wt") if gz else open(output_file, "w")
    with outf:
        for node in selected:
            print(
                *[write_field[c](node) for c in cols],
                sep=sep,
                end="\n",
                file=outf,
            )
Writes loaded taxonomy to a file.
Parameters:
- cols [list]: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
- sep [str]: Separator of fields
- sep_multi [str]: Separator of multi-valued fields
- ranks [list]: Ranks to report
- gz [bool]: Gzip output
Returns: None